Custom Market Adapter
The platform provides the ability to customized a Market Adapter for the specific customer requirements. This way customers can add and change the code directly on the platform. Note: At the time of writing (release 25.02), the UI still does not support this feature. So the only way to upload custom Market Adapters is through APIv1.
Market Adapter class overview
The design starts with chosing a technical name which must be unique.
Market adapters must extend the class PluggableMarketAdapter.
The process of normalizing data from the provided state to an ingestible form is done in three functions, which can be customized to perform the actions necessary. These functions must be implemented in your Market Adapter.They are:
collect()- Executed to retrieve the actual payload data from storage, decrypts and uncompresses the data if necessary.split()- Executed to split the payload in manageable and grouped chunks. Splitting the payload in chunks allows for greater scalability of the ingestion. Grouping can be applied across chunks to improve performance during ingestion.adapt()- Executed to adapt the payload supplied by the customer to the normalized structure.
There are two ways of using a Market Adapter: either outputting data from the adapt() step in a normalized structure, which will be used by the transformation configuration, or outputting domain objects directly from the adapt() step, which will be skip the transformation step and will be directly ingested.
Example of Market Adapter implementation
This is a simple example of how to implement a Market Adapter. It is necessary to import the base class, such as FileBasedMarketAdapter.
from energyworx_market_adapters.base_market_adapter.pluggable import PluggableMarketAdapter
class MarketAdapterExample(PluggableMarketAdapter):
def collect(self, stream_prepare_trigger, **kwargs):
"""
This method is executed to retrieve the actual payload data from storage, decrypts and decompresses the data if necessary.
Args:
stream_prepare_trigger (DataflowPrepareTrigger): The prepare trigger containing configurations and payloads
**kwargs: market adapter properties as key-values pairs
Returns:
payload (str): The payload in string format
"""
return super().collect(stream_prepare_trigger, unzip=False, **kwargs)
def split(
self,
content,
transformation_config=None,
**kwargs,
):
"""
This method is executed to split the payload in manageable and grouped chunks. Splitting the payload in chunks
allows for greater scalability of the ingestion. Grouping can be applied across chunks to improve performance
during ingestion.
Args:
content (str): The payload in string format
transformation_config (TransformationConfiguration): The transformation configuration
**kwargs: market adapter properties as key-values pairs
Returns:
Iterable[str]: An iterable(list or generator) of the split payload.
"""
for s in content.split(","):
yield s
def adapt(self, content, current_datetime, **kwargs):
"""
This method is executed to adapt the payload supplied by the customer to the normalized structure.
Args:
content (str): The payload in string format
current_datetime (datetime): Datetime object used for Beam's 'now' to make Market Adapters idempotent
**kwargs: market adapter properties as key-values pairs
Returns:
dict: The normalized structure which is used during transformation (keys in normalized structures
correspond to keys in transformation configuration).
"""
pass
Re-use a generic Market Adapter
A Market Adapter can also extend from a usable Market Adapter. In the example below there is a Market Adapter that inherit
from the json Market Adapter.
from energyworx_market_adapters.json_market_adapter.json_market_adapter import V1 as JSON_MA_V1
class V1(JSON_MA_V1):
def split(self, element, **kwargs):
yield element
def adapt(self, element, **kwargs):
pass
Re-use a user Market Adapter
It is also possible to import and re-use a user-defined Market Adapter. In this case, let's assume that the user created
a Market Adapter with the technical name my_json and the class ExampleMarketAdapter. Then, the code for the new Market Adapter will be:
from my_json import ExampleMarketAdapter as JSON_MA_V1
class AnotherExampleMarketAdapter(JSON_MA_V1):
def split(self, element, **kwargs):
yield element
def adapt(self, element, **kwargs):
pass
Outputting domain objects
In the adapt() step, it is also possible to output domain objects instead of a normalized structure. In this case, the transformation step will be skipped and the domain objects will be directly ingested.
Domain objects can be created by importing the corresponding classes from the ewx_public.domain.models module.
import datetime as dt
import pandas as pd
import pytz
from energyworx_market_adapters.base_market_adapter.pluggable import PluggableMarketAdapter
from ewx_public.domain.models.datasource import Datasource, Channel, Tag, TagProperty
class DomainObjectMarketAdapter(PluggableMarketAdapter):
def split(
self,
element,
**kwargs,
):
yield element
def adapt(
self,
element,
current_datetime: dt.datetime,
**kwargs
):
# Retrieving the transformation configuration (this is optional when outputting domain objects,
# but can be useful to create dynamic domain objects based on the transformation configuration)
transformation_configuration = self.get_transformation_configuration()
# Creating a Datasource
new_datasource = Datasource(
id=transformation_configuration.datasource.datasource_id.static_value,
name=transformation_configuration.datasource.name.static_value,
description="This is a new datasource",
classifier="POWER",
timezone="Europe/Amsterdam",
channels=[
Channel(
name=channel_mapping.name.static_value,
classifier=channel_mapping.channel_classifier.static_value,
is_source=True,
description=channel_mapping.description.static_value,
) for channel_mapping in transformation_configuration.datasource.channels
],
tags=[
Tag(
tag="location",
properties=[
TagProperty(
key="country",
value="Netherlands",
),
TagProperty(
key="city",
value="Amsterdam",
),
],
)
],
)
self.output_datasource(new_datasource)
# Create sample timeseries data with hourly values for 24 hours
timestamps = pd.date_range(
start=dt.datetime(2026, 1, 1, 0, 0, 0, tzinfo=pytz.UTC),
end=dt.datetime(2026, 1, 1, 23, 0, 0, tzinfo=pytz.UTC),
freq='1H'
)
df = pd.DataFrame({
'DOMAIN_OUTPUT_1': [20.0 + i * 0.5 for i in range(24)], # Temperature-like values
'DOMAIN_OUTPUT_2': [1013.0 + i * 0.2 for i in range(24)], # Pressure-like values
'DOMAIN_OUTPUT_1:quality': [{"value": "good"}] * 24, # Annotation for Channel 1
'DOMAIN_OUTPUT_2:source': [{"name": "sensor_A"}] * 24, # Annotation for Channel 2
'DOMAIN_OUTPUT_1.TEST': ['zero'] * 24, # Datapoint attribute for Channel 1
'DOMAIN_OUTPUT_2.TEST': ['three'] * 24, # Datapoint attribute for Channel 2
}, index=timestamps)
# Convert the dataframe to a timeseries object
# Use the current_datetime from kwargs as the version
timeseries = self.create_timeseries(
df=df,
datasource=new_datasource,
version=current_datetime
)
# Output the timeseries data
self.output_timeseries(timeseries)
In this example, the adapt() method creates a new Datasource with two channels and some tags. Then, it creates a sample timeseries data with hourly values for 24 hours and outputs the timeseries data using the output_timeseries() method. The current_datetime is used as the version for the timeseries data to ensure idempotency.
Right now, nothing is done with the element parameter, but it can be used to create dynamic domain objects based on the input data.
The following methods are available for outputting domain objects:
output_datasource(datasource: Datasource): This method is used to output aDatasourceobject.output_timeseries(timeseries: Timeseries): This method is used to output aTimeseriesobject.output_tags(tags: List[Tag]): This method is used to output a list ofTagobjects.output_flow_trigger(crunch_trigger: CrunchTrigger, source_data: Timeseries): This method is used to output aCrunchTriggerobject, which is used to trigger a flow based on the providedsource_datatimeseries.create_timeseries(df: pd.DataFrame, datasource: Datasource, version: dt.datetime) -> Timeseries: This method is used to create aTimeseriesobject from a pandas DataFrame, associating it with a specificDatasourceand version. The DataFrame should have a datetime index and columns corresponding to the channels and annotations.create_crunch_trigger(datasource: Datasource, flow_config_id: int, start_datetime: dt.datetime, end_datetime: dt.datetime, source_channel_id: str, flow_properties: dict): -> CrunchTrigger: This method is used to create aCrunchTriggerobject, which can be used to trigger a flow based on a specific time range and source channel from aDatasource. Theflow_propertiescan be used to pass additional parameters to the flow when it is triggered. Please see ewx_public for the actual implementation and required parameters for this method.get_datasource(datasource_id: str) -> Datasource: This method is used to retrieve aDatasourceobject by its ID. It can be useful when you want to create timeseries data for an existing datasource or when you want to check the details of a datasource before creating new domain objects.get_tags(datasource_id: str, tag_key: str) -> List[Tag]: This method is used to retrieve a list ofTagobjects for a specificDatasourceand tag key. It can be useful when you want to check the existing tags for a datasource or when you want to create new tags based on existing ones.get_transformation_configuration() -> TransformationConfiguration: This method is used to retrieve the transformation configuration for the current Market Adapter. It can be useful when you want to create domain objects based on the transformation configuration.
Important note on outputting domain objects
You can only output domain objects from the adapt() step if you do not output a normalized structure for the transformation configuration. You must choose between outputting a normalized structure or outputting domain objects.
Doing both inside the same Market Adapter implementation will result in an error.