Rule framework functions
This article lists the available rule framework functions and attributes of the Rule base class that can be used in rules.
Store timeseries
store_timeseries(
self,
timeseries: pd.Series | pd.DataFrame,
channel_id: str | None = None,
*,
datasource_id: str | None = None,
store_in_flow_data: bool = True
) -> None:
"""
Persists the given pandas Series or DataFrame `timeseries`.
If `timeseries` is a pandas Series and `channel_id` is given, use given `channel_id` as the channel to store the
Series on.
The column names of the given `timeseries` are used as the channel classifiers to store the timeseries under. If
`timeseries` is a pandas Series without a name and `channel_id` is *None*, an error is raised.
Args:
timeseries: A pandas Series or DataFrame to store. By default, the name of the Series or DataFrame columns
is used as the channel classifiers.
channel_id: Optionally a specific channel classifier to store `timeseries` under. Only used if `timeseries`
is a pandas Series.
datasource_id: Optionally the ID of the datasource to store `timeseries` under. By default, the datasource
this rule is executed on is used.
store_in_flow_data: Whether to also store these timeseries in the used flow data. Timeseries stored in the
flow data will be available in `AbstractRule.dataframe` in following rules.
Raises:
TypeError:
- `timeseries` is not a pandas Series or DataFrame.
- `timeseries` has no DatetimeIndex as the index.
- `timeseries` has a DatetimeIndex that is timezone-naive.
- `timeseries` contains values that are not of type int or float.
ValueError:
- `timeseries` is a pandas Series without a name and `channel_id` is *None*.
- `timeseries` has a DatetimeIndex with duplicate entries.
- `timeseries` has a column that is not an existing channel classifier.
- `datasource_id` is the ID of an unprepared datasource.
"""
See Storing Timeseries for details.
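As a minimal sketch, a rule's `execute` method might store an unnamed Series like this (the channel classifier `ELECTRICITY_KWH` and the surrounding `execute` method are placeholders):

```python
import pandas as pd

# Build a timezone-aware DatetimeIndex without duplicates; a naive or
# duplicated index makes store_timeseries raise TypeError / ValueError.
index = pd.date_range("2024-01-01", periods=4, freq="15min", tz="UTC")
readings = pd.Series([1.0, 2.5, 3.0, 2.0], index=index)

def execute(self):
    # The Series has no name, so channel_id must be provided.
    self.store_timeseries(readings, channel_id="ELECTRICITY_KWH")
```

Because `store_in_flow_data` defaults to *True*, the stored values are also visible in `AbstractRule.dataframe` for the following rules in the flow.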
Store annotations
store_annotations(
self,
annotations: pd.Series | pd.DataFrame,
channel_id: str,
annotation_name: str | None = None,
*,
datasource_id: str | None = None,
store_in_flow_data: bool = True
) -> None:
"""
Persists the given pandas Series or DataFrame `annotations` under the provided `channel_id`.
The column names of the given `annotations` are used as the names of the annotations to store on the given
`channel_id`. If `annotations` is a pandas Series without a name and `annotation_name` is *None*, an error is
raised.
Args:
annotations: A pandas Series or DataFrame of annotations to store. The column names are used as the names of
the annotations.
channel_id: The channel classifier to store `annotations` under.
annotation_name: Optionally a specific annotation name to store `annotations` with. Only used if
`annotations` is a pandas Series.
datasource_id: Optionally the ID of the datasource to store `annotations` under. By default, the datasource
this rule is executed on is used.
store_in_flow_data: Whether to also store these annotations in the used flow data. Annotations stored in the
flow data will be available in `AbstractRule.dataframe` in following rules and are recognized by
`AbstractRule.data_filter`.
Raises:
TypeError:
- `annotations` is not a pandas Series or DataFrame.
- `annotations` has no DatetimeIndex as the index.
- `annotations` has a DatetimeIndex that is timezone-naive.
- `annotations` contains values that are not of type dict.
ValueError:
- `annotations` is a pandas Series without a name and `annotation_name` is *None*.
- `annotations` has a DatetimeIndex with duplicate entries.
- `channel_id` is not an existing channel classifier.
- `datasource_id` is the ID of an unprepared datasource.
"""
See Timeseries Storage for details.
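A sketch of storing annotations from a rule; the channel classifier, annotation name, and reasons below are placeholders:

```python
import pandas as pd

# Annotation values must be dicts, and the index must be timezone-aware
# with no duplicate entries, or TypeError / ValueError is raised.
index = pd.date_range("2024-01-01", periods=2, freq="1h", tz="UTC")
flags = pd.Series([{"reason": "spike"}, {"reason": "gap"}], index=index)

def execute(self):
    # The Series has no name, so annotation_name is required.
    self.store_annotations(
        flags,
        channel_id="ELECTRICITY_KWH",
        annotation_name="validation_flag",
    )
```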
Load timeseries
load_timeseries(
self, datasource_id: str,
channel_ids: list[str],
start: datetime,
end: datetime,
annotation_included: bool = False,
annotation_filter: list[str] = None,
version: datetime = None,
version_range: tuple[datetime, datetime] = None,
channel_family: str = None
):
"""
Get timeseries data from a channel with an option to include annotations. This is often used when another
channel apart from the source channel of the flow is needed in the flow.
The datasource id has to be included in the dict returned by the `prepare_context` function, under the key
`prepare_datasource_ids`.
Args:
datasource_id (str): the id of the datasource of the requested channel
channel_ids (list[str]): list of channel ids
start (datetime): start index of the timeseries data
end (datetime): end index of the timeseries data
annotation_included (bool): whether to include annotations associated to the channel
annotation_filter (list): a list of annotation names to retrieve, if not given, all annotations associated
to that channel will be retrieved
version (datetime): Get all timeseries with a specific version (currently kept only for backwards compatibility)
version_range (tuple[datetime, datetime]): Get all timeseries in a version range. Returns a multi-index
dataframe (currently kept only for backwards compatibility)
channel_family (str): The channel family to use for retrieving the data. If *None*, no channel family
filtering will be used.
Returns:
:class:`pd.DataFrame`
"""
See Loading Timeseries for details.
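A sketch of loading an additional channel from a rule; the datasource id, channel id, and annotation name are placeholders:

```python
from datetime import datetime, timezone

# Request window; start and end must be datetime objects.
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2024, 1, 2, tzinfo=timezone.utc)

def execute(self):
    # "extra-datasource" is a placeholder; it must appear under the key
    # prepare_datasource_ids in the dict returned by prepare_context.
    return self.load_timeseries(
        datasource_id="extra-datasource",
        channel_ids=["TEMPERATURE"],
        start=start,
        end=end,
        annotation_included=True,
        annotation_filter=["validation_flag"],
    )
```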
Tags methods
self.datasource.get_latest_tag_version(self, tag_name: str) -> Tag | None:
"""
Get latest tag version for a given datasource from rules.
Args:
tag_name: Name of the tag.
Returns:
tag: Latest tag version for the given datasource.
"""
self.datasource.get_active_tag_versions(self, tag_name: str) -> list[Tag]:
"""
Gets the active tag versions for a given tag and datasource from rules.
Args:
tag_name: Name of the tag.
Returns:
tags_list: List of the active tag versions for the given datasource.
"""
self.datasource.get_all_tag_versions(self, tag_name: str) -> list[Tag]:
"""
Gets all the tag versions for a given tag and datasource from rules.
Args:
tag_name: Name of the tag.
Returns:
tags_list: List of all tag versions for a given datasource.
"""
Note: The three methods mentioned above need to be called on the datasource object from within the rule, which is why they are prefixed with `self.datasource`. All other methods are inherited from the rule class.
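A hedged sketch of how these lookups might be combined in a rule (the tag name `contract_type` is a placeholder):

```python
def execute(self):
    # Tag lookups go through the datasource object, not the rule itself.
    latest = self.datasource.get_latest_tag_version("contract_type")
    if latest is None:
        return None  # no tag version exists for this datasource
    # Active and historical versions are available as well.
    active = self.datasource.get_active_tag_versions("contract_type")
    history = self.datasource.get_all_tag_versions("contract_type")
    return latest, active, history
```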
add_tags(self, datasource_id: str, tags: list[Tag]) -> int:
"""
Adds tags to a datasource from rules.
Args:
datasource_id: The datasource ID to store `tags` on.
tags: List of Tag objects to store on `datasource_id`.
Returns:
n_tags: Number of tags created.
"""
Notes: `datasource_id` must be the ID of a prepared datasource. The given `tags` must be an iterable of Tag objects.
The class Tag has the following structure:
Tag:
tag: str
description: str
valid_from: datetime
properties: list[KeyValueType]
removed: bool
version: datetime
is_active_scd: bool
tag_links: str
created_by: str
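A sketch of creating a tag from a rule. This assumes Tag accepts its fields as keyword arguments; the datasource id and values below are placeholders:

```python
from datetime import datetime, timezone

def execute(self):
    # Assumes Tag accepts its fields as keyword arguments; the
    # KeyValueType properties are omitted for brevity.
    tag = Tag(
        tag="contract_type",
        description="Fixed-price contract",
        valid_from=datetime(2024, 1, 1, tzinfo=timezone.utc),
    )
    # "my-datasource-id" must be a prepared datasource.
    n_created = self.add_tags("my-datasource-id", [tag])
    return n_created  # number of tags created
```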
Other methods
Upload files to file manager
additional_output(self, filename: str, file_content: str, tags: list[str], market_adapter_id: str = None):
"""
Creates a file in file manager
Args:
filename (str): Name of the file
file_content (str): Content of the file
tags (list[str]): Optional tags which should be set on the file
market_adapter_id (int or None): Optional Market Adapter ID that will trigger after the upload
Raises:
ValueError: Raised if `market_adapter_id` is not None and cannot be converted to int.
"""
Creates a new file in the file manager of this namespace. The content provided here must be a string and will be stored directly to a file that shows up in the File Management section. A list of strings can optionally be provided to add tags to the newly created file.
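A sketch of producing a file from a rule; the filename, tags, and CSV content are placeholders:

```python
import csv
import io

# Build a small CSV payload in memory; file_content must be a str.
buffer = io.StringIO()
writer = csv.writer(buffer, lineterminator="\n")
writer.writerow(["timestamp", "value"])
writer.writerow(["2024-01-01T00:00:00Z", 42])
report = buffer.getvalue()

def execute(self):
    # The created file appears in the File Management section.
    self.additional_output(
        filename="daily_report.csv",
        file_content=report,
        tags=["report", "daily"],
    )
```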
Load channel classifier
load_channel_classifier(self, channel_classifier_name)
Retrieve a Channel Classifier object based on a given ID. Mostly used for checking if a specific channel classifier exists in the platform.
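A sketch of an existence check; the classifier ID is a placeholder, and the falsy-result check is an assumption since the behaviour for unknown IDs is not specified here:

```python
def execute(self):
    # "ELECTRICITY_KWH" is a placeholder channel classifier ID.
    classifier = self.load_channel_classifier("ELECTRICITY_KWH")
    # Assumption: an unknown ID yields a falsy result.
    if not classifier:
        raise ValueError("Channel classifier ELECTRICITY_KWH does not exist")
    return classifier
```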
Load datasources of a virtual datasource
load_datasources_by_virtual_datasource_filter(self, virtual_datasource: Datasource, reference_date: datetime = None):
"""
Get the datasource ids and the time ranges in which they belong to this virtual datasource (the datasource that this rule is executed on, i.e. `self.datasource`).
Args:
virtual_datasource (VirtualDatasource):
reference_date (dt.Datetime): Only tag versions before reference_date (inclusive) will be considered
Returns:
dict[str, list[DateRange]]: a dictionary containing datasource ids as keys and lists of DateRanges as values
"""
Retrieves the datasources that belong to a Virtual Datasource by providing the Virtual Datasource object. The Virtual Datasource object must be an object of type Datasource whose filter will be used to retrieve the list of datasources.
If the datasource provided is not a valid Virtual Datasource it will return an empty dictionary.
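A sketch of resolving the members of a virtual datasource; the reference date is a placeholder:

```python
from datetime import datetime, timezone

# Only tag versions up to this date (inclusive) are considered.
reference = datetime(2024, 6, 1, tzinfo=timezone.utc)

def execute(self):
    # self.datasource is the virtual datasource this rule runs on.
    members = self.load_datasources_by_virtual_datasource_filter(
        self.datasource, reference_date=reference
    )
    # An empty dict means the datasource is not a valid Virtual Datasource.
    for datasource_id, date_ranges in members.items():
        for date_range in date_ranges:
            ...  # process each member datasource and its validity range
```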
Publish to custom pubsub topic
publish_to_custom_pubsub_topic(self, topic_name: str, data: bytes, attributes: dict = None):
"""
Publishes a message to a custom pubsub topic called `custom-<topic_name>`
Args:
topic_name (str): The name of the custom pubsub topic; prefixed with `custom-`
data (bytes): The message to be published
attributes (dict): The attributes to be published with the message
"""
Publishes arbitrary byte data from a rule to a Google Pub/Sub topic starting with `custom-` within the current environment.
topic_name: must be an existing Pub/Sub topic living in the current environment (this has to be arranged with the Energyworx support team in advance)
data: anything that can be published to a Pub/Sub topic (< 10 MB, but ideally less than 1 MB)
attributes: an optional dict[str, str] of keys and values to be passed along as Pub/Sub attributes.
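A sketch of publishing a JSON payload; the topic name, payload, and attributes are placeholders:

```python
import json

# Serialize a small payload; data must be bytes (< 10 MB, ideally < 1 MB).
payload = json.dumps({"event": "validation_done", "count": 3}).encode("utf-8")

def execute(self):
    # "my-topic" is a placeholder; the full topic name becomes
    # "custom-my-topic" and must already exist in the environment.
    self.publish_to_custom_pubsub_topic(
        topic_name="my-topic",
        data=payload,
        attributes={"source": "rule"},
    )
```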
Send a Collective Trigger message
send_collective_trigger(
self,
correlation_id: str,
expected_number_of_messages: int,
deadline: int | None,
deadline_delta: int | None,
timeout_between_messages: int | None,
late_data_behaviour: LateDataBehaviourEnum | str,
datasource_id: str,
flow_config_id: int,
source_channel_id: Optional[str],
flow_properties: Optional[dict[str, object]] = None,
):
"""
Sends a message to the Collective Trigger service.
Args:
correlation_id (str): Identifier that messages are aggregated on
expected_number_of_messages (int): The amount of messages that are expected
deadline (int): if this specific timestamp expires after receiving the first message, trigger the flow
deadline_delta (int): if this amount of seconds has passed after receiving the first message, trigger the flow
timeout_between_messages (int): if no new message has arrived after this amount of seconds, trigger the flow
late_data_behaviour (LateDataBehaviourEnum | str): determines what happens to messages that come in after
the deadline or timeout has passed.
datasource_id (str): id of the datasource on which to trigger the flow
flow_config_id (int): id of the configuration of the flow to be triggered
source_channel_id (Optional[str]): source channel id for flow
flow_properties (Optional[Dict[str, object]]): flow properties for the flow to be triggered
"""
This method sends a message to the Collective Trigger (see Collective Trigger for details) to aggregate messages and to trigger a new flow.
The LateDataBehaviourEnum options are represented this way:
class LateDataBehaviourEnum(EWXEnum):
discard = "discard" # 0 Discards the message
trigger_entire_message = "trigger_entire_message" # 1 Triggers the flow with the entire collection of messages
start_new_state = "start_new_state" # 2 Starts again the waiting window to collect new messages
trigger_single_message = "trigger_single_message" # 3 Triggers the flow with this single message
Timeslices methods
The following class definition represents a TimesliceGroup:
class TimesliceGroup:
"""
Attributes:
name (str): the name of the TimesliceGroup
version(str): the version of the TimesliceGroup
description (str): the description of the TimesliceGroup
properties (list[KeyValueType]): the properties of the TimesliceGroup
timeslices (list[Timeslice]): the timeslices of the TimesliceGroup
deleted (bool): if the TimesliceGroup is deleted
id (int): the id of the TimesliceGroup
valid_from (datetime): the point in time where this timeslice group is valid from
"""
and the Timeslice is represented this way:
class Timeslice:
"""
Attributes:
period_type
priority
starts (list[datetime]): the list of the start times of the timeslices
ends (list[datetime]): the list of the end times of the timeslices
value
"""
The properties use this definition:
class KeyValue:
"""
Attributes:
key (str): the name of the property
value (str): the value of the property
"""
The following functions are available:
```python
load_timeslice_group(self, timeslice_group_id: str):
"""
Get a time slice group by its id
Args:
timeslice_group_id (str):
Returns:
TimesliceGroup:
"""
load_timeslice_groups_by_name(
self,
timeslice_group_name,
start_date=None,
end_date=None,
include_versions=True,
):
"""
Get a time slice group by its name and optionally get only the valid ones in the given period
Args:
timeslice_group_name (str): The name of the timeslice group to retrieve
start_date (datetime): Optional start date of the period
end_date (datetime): Optional end date time of the period
include_versions(bool): Option to include older versions in result
Returns:
list(TimesliceGroup):
"""
load_timeslice_groups_by_properties(
self,
timeslice_group_properties,
start_date=None,
end_date=None,
timeslice_group_name=None,
include_versions=False,
):
"""
Get a time slice group by its properties and optionally get only the valid ones in the given period. You can
also filter on name with `timeslice_group_name`.
Args:
timeslice_group_properties (list[str]): [key=value, ...]
start_date (datetime): Optional start date of the period
end_date (datetime): Optional end date time of the period
timeslice_group_name(str): Optional timeslice group name
include_versions(bool): Option to include older versions in result
Returns:
Generator(TimesliceGroup):
"""
```
Trigger a new flow
trigger_flow(
self,
flow_config_id: str | int,
start_datetime: datetime,
end_datetime: datetime,
datasource_id: str = None,
custom_flow_config_parameters: list[CustomFlowConfigurationParameters] = None,
flow_properties: dict = None,
countdown: int = None,
datasource_object: Datasource = None,
tags_included: bool = True,
):
"""
Triggers a flow with the given flow_config_id and the provided start and end datetimes.
Args:
datasource_id (str): datasource id to execute the flow on.
flow_config_id (int): the id of the flow config to trigger.
start_datetime (datetime): start index of the timeseries data to use as source data of the flow.
end_datetime (datetime): end index of the timeseries data to use as source data of the flow.
custom_flow_config_parameters (list[CustomFlowConfigurationParameters]): if it's a custom flow config, provide
the parameters here.
flow_properties (dict): The flow properties to be passed to the flow.
countdown (int): the number of seconds to delay the start of the flow.
datasource_object (Datasource): datasource object to execute the flow on.
If provided the datasource_id parameter is not taken into account.
tags_included (bool): indicates if the context of tags is passed with the trigger
(only applied if the datasource_object is passed).
Raises:
TypeError: if passing wrong types to the function.
FlowRuleException: in the following cases:
- if the datasource is not found (when providing datasource_id and not datasource_object)
- if both datasource_id and datasource_object are not provided
"""
This method allows the system to trigger a new flow with different configurations.
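A sketch of triggering a follow-up flow from a rule; the flow config id, datasource id, and flow properties are placeholders:

```python
from datetime import datetime, timezone

# Window of source data for the triggered flow.
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2024, 1, 8, tzinfo=timezone.utc)

def execute(self):
    # Either datasource_id or datasource_object must be provided,
    # otherwise FlowRuleException is raised.
    self.trigger_flow(
        flow_config_id=123,
        start_datetime=start,
        end_datetime=end,
        datasource_id="my-datasource-id",
        flow_properties={"triggered_by": "parent_flow"},
        countdown=60,  # delay the new flow by 60 seconds
    )
```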
Attributes
market_adapter_split_count
This attribute provides the number of messages that the original file was split into upon use of the market adapter split function. (See Market Adapter for more info)
The main use of this attribute is to correctly set the number of expected messages of a Collective Trigger. For example, if a file ingestion updates two datasources and the final flow needs both datasources to be processed first, a collective trigger can be used. Here's an example of using this attribute in combination with a collective trigger:
self.send_collective_trigger(
correlation_id,
self.market_adapter_split_count,
deadline,
deadline_delta,
timeout_between_messages,
late_data_behaviour,
datasource_id,
flow_config_id,
source_channel_id,
flow_properties
)
If the flow is not started from an ingestion, the value of market_adapter_split_count will be set to None.