
Rule framework functions

This article lists the available rule framework functions and attributes of the Rule base class that can be used in rules.

Timeseries and annotations

Storing and loading timeseries data have their own dedicated reference pages — in short:

  • store_timeseries(timeseries, channel_id=None, *, datasource_id=None, store_in_flow_data=True) — persist a pandas Series or DataFrame on this (or another prepared) datasource. Full signature and examples: Timeseries Storage.
  • store_annotations(annotations, channel_id, annotation_name=None, *, datasource_id=None, store_in_flow_data=True) — persist annotations on a channel. Full signature and examples: Timeseries Storage.
  • load_timeseries(datasource_id, channel_ids, start, end, ...) — legacy loader kept for backwards compatibility; prefer self.timeseries_service.get_latest(). Full signatures and caching behaviour: Timeseries Loading.

Tags methods

self.datasource.get_latest_tag_version(self, tag_name: str) -> Tag | None:  
"""
Get latest tag version for a given datasource from rules.

Args:
tag_name: Name of the tag.

Returns:
tag: Latest tag version for the given datasource.
"""
self.datasource.get_active_tag_versions(self, tag_name: str) -> list[Tag]:  
"""
Gets the active tag versions for a given tag and datasource from rules.

Args:
tag_name: Name of the tag.

Returns:
tags_list: List of the active tag versions for a given datasource.
"""
self.datasource.get_all_tag_versions(self, tag_name: str) -> list[Tag]:  
"""
Gets all the tag versions for a given tag and datasource from rules.

Args:
tag_name: Name of the tag.

Returns:
tags_list: List of all tag versions for a given datasource.
"""

Note: The three methods mentioned above must be called on the datasource object inside the rule, which is why they are prefixed with self.datasource. All other methods are inherited from the Rule class, except where a different object is shown (for example self.services.tag).
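Because get_latest_tag_version can return None, rule code usually needs a guard before reading tag properties. The sketch below shows one way to do that. The KeyValue and Tag dataclasses are hypothetical stand-ins for the platform classes (only the fields used here), it assumes each property exposes `key` and `value` attributes like the KeyValue class described under Timeslices methods, and the tag and property names in the usage comment are made up.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


# Hypothetical stand-ins mirroring part of the documented Tag structure.
# Inside a rule, self.datasource returns the real Tag objects instead.
@dataclass
class KeyValue:
    key: str
    value: str


@dataclass
class Tag:
    tag: str
    valid_from: datetime
    properties: list[KeyValue] = field(default_factory=list)


def tag_property(tag: Optional[Tag], key: str, default: Optional[str] = None) -> Optional[str]:
    """Return the value of property `key` on `tag`.

    Falls back to `default` when the tag is None (get_latest_tag_version can
    return None) or when the tag has no property with that key.
    """
    if tag is None:
        return default
    for prop in tag.properties:
        if prop.key == key:
            return prop.value
    return default


# Assumed usage inside a rule; "contract" and "grid_operator" are hypothetical:
# latest = self.datasource.get_latest_tag_version("contract")
# grid_operator = tag_property(latest, "grid_operator")
```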

add_tags(self, datasource_id: str, tags: list[Tag]) -> int:  
"""
Adds tags to a datasource from rules.

Args:
datasource_id: The datasource ID to store `tags` on.
tags: List of Tag objects to store on `datasource_id`.

Returns:
n_tags: Number of tags created.
"""

Notes: datasource_id must be a prepared datasource.

The given tags must be an Iterable of Tag objects.

The class Tag has the following structure:

Tag:  
tag: str
description: str
valid_from: datetime
properties: list[KeyValueType]
removed: bool
version: datetime
is_active_scd: bool
tag_links: str
created_by: str
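A minimal sketch of preparing a Tag for add_tags, assuming properties are key/value items with string values (as in the KeyValue class described under Timeslices methods). The dataclasses are hypothetical stand-ins holding a subset of the Tag fields listed above, their defaults are illustrative only, and the real Tag constructor may require other fields. The tag name and property in the usage comment are made up.

```python
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class KeyValue:
    key: str
    value: str


# Hypothetical stand-in holding a subset of the Tag fields listed above;
# the defaults are illustrative, not the platform's.
@dataclass
class Tag:
    tag: str
    valid_from: datetime
    description: str = ""
    properties: list[KeyValue] = field(default_factory=list)
    removed: bool = False


def build_tag(name: str, valid_from: datetime, props: dict[str, object]) -> Tag:
    """Build a Tag from a plain dict of properties; values are converted to
    str because KeyValue values are strings."""
    return Tag(
        tag=name,
        valid_from=valid_from,
        properties=[KeyValue(key=k, value=str(v)) for k, v in props.items()],
    )


# Assumed usage inside a rule ("contract" and "capacity_kw" are hypothetical);
# datasource_id must be a prepared datasource:
# tag = build_tag("contract", valid_from, {"capacity_kw": 250})
# n_created = self.add_tags(datasource_id, [tag])
```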

Other methods

Upload files to file manager

additional_output(self, filename: str, file_content: str, tags: list[str], market_adapter_id: str = None):  
"""
Creates a file in file manager
Args:
filename (str): Name of the file
file_content (str): Content of the file
tags (list[str]): Optional tags which should be set on the file
market_adapter_id (str | int | None): Optional Market Adapter ID that will trigger after the upload

Raises:
ValueError: If market_adapter_id is not None and cannot be converted to int.
"""

Creates a new file in the file manager of this namespace. The content must be a string; it is stored directly in a file that shows up in the File Management section. A list of strings can optionally be provided to set tags on the new file, and a market_adapter_id can be given to trigger that Market Adapter after the upload.
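Since file_content must be a string, tabular output has to be serialised before it is passed to additional_output. This sketch uses Python's standard csv module to do that; the helper name rows_to_csv, the file name and the tags in the usage comment are hypothetical.

```python
import csv
import io


def rows_to_csv(header: list[str], rows: list[list[object]]) -> str:
    """Serialise a header and rows to a CSV string, because additional_output
    expects file_content as str."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(header)
    writer.writerows(rows)
    return buffer.getvalue()


# Assumed usage inside a rule; the file name and tags are hypothetical:
# content = rows_to_csv(["timestamp", "value"], [["2024-01-01T00:00:00Z", 1.5]])
# self.additional_output("daily_report.csv", content, tags=["report", "daily"])
```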

Load channel classifier

load_channel_classifier(self, channel_classifier_name)

Retrieves a Channel Classifier object by its identifier, passed as channel_classifier_name. It is mostly used to check whether a specific channel classifier exists on the platform.

Load datasources of a virtual datasource

load_datasources_by_virtual_datasource_filter(self, virtual_datasource: Datasource, reference_date: datetime = None):  
"""
Get the IDs of the datasources that belong to the given virtual datasource (typically self.datasource, the datasource this rule is executed on), together with the time ranges during which they belong to it.
Args:
virtual_datasource (Datasource): the virtual datasource whose filter is used
reference_date (datetime): Only tag versions up to and including reference_date will be considered

Returns:
dict[str, list[DateRange]]: a dictionary containing datasource ids as keys and a list of DateRanges as values
"""

Retrieves the datasources that belong to a Virtual Datasource by providing the Virtual Datasource object. The Virtual Datasource object must be an object of type Datasource whose filter will be used to retrieve the list of datasources.

If the provided datasource is not a valid Virtual Datasource, an empty dictionary is returned.
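The returned dictionary maps each datasource ID to the date ranges during which it belongs to the virtual datasource. The sketch below answers "which datasources were members at a given moment?". It assumes DateRange exposes `start` and `end` attributes, treats start as inclusive, end as exclusive and a None end as open-ended; all of these are assumptions, so check them against the real DateRange class.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


# Hypothetical stand-in for DateRange: the attribute names and the meaning of
# end=None (still a member) are assumptions.
@dataclass
class DateRange:
    start: datetime
    end: Optional[datetime] = None


def members_at(ranges_by_id: dict[str, list[DateRange]], moment: datetime) -> list[str]:
    """Return the sorted datasource ids whose membership covers `moment`
    (start inclusive, end exclusive, end=None treated as open-ended)."""
    return sorted(
        ds_id
        for ds_id, ranges in ranges_by_id.items()
        if any(r.start <= moment and (r.end is None or moment < r.end) for r in ranges)
    )


# Assumed usage inside a rule:
# ranges_by_id = self.load_datasources_by_virtual_datasource_filter(self.datasource)
# active_ids = members_at(ranges_by_id, reference_moment)
```

An empty dictionary (for example, when the datasource is not a valid Virtual Datasource) simply yields an empty list.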

Load tags for multiple datasources

Retrieves tags for multiple datasources in a single batch call, without loading the full datasource objects. This makes it significantly more efficient than load_multi_datasources when only tag data is needed (e.g. reading tag property values for a large set of datasources).

The method can be called in both prepare_context and apply. Unlike load_timeseries, it does not require the datasource IDs to be declared in prepare_datasource_ids.

Available since version 26.04

Use self.services.tag.read_tags_multi_datasource(namespace_id, datasource_ids):

self.services.tag.read_tags_multi_datasource(
namespace_id: str,
datasource_ids: list[str]
) -> dict[str, list[Tag]]:
"""
Retrieves all tags for a list of datasource IDs.

Args:
namespace_id: The namespace ID (use self.namespace.id).
datasource_ids: List of datasource IDs to load tags for.

Returns:
Dict mapping datasource ID to list of Tags. IDs with no tags are absent.
"""

Example:

tags_by_datasource = self.services.tag.read_tags_multi_datasource(
self.namespace.id, datasource_ids
)

Available since version 26.06

A convenience wrapper self.load_tags_multi_datasource(datasource_ids) is available directly on the rule class, removing the need to pass namespace_id explicitly:

load_tags_multi_datasource(self, datasource_ids: list[str]) -> dict[str, list[Tag]]:
"""
Load only the tags for multiple datasources without fetching full datasource
objects. More efficient than load_multi_datasources when only tags are needed.

Args:
datasource_ids: List of datasource IDs to load tags for.

Returns:
Dict mapping datasource ID to list of Tags. IDs with no tags are absent.
"""

Example:

tags_by_datasource = self.load_tags_multi_datasource(datasource_ids)
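Because datasource IDs without tags are absent from the returned dictionary, lookups should fall back to an empty list rather than index it directly. A minimal sketch, using hypothetical stand-in Tag and KeyValue classes with only the fields needed here; the tag and property names are made up.

```python
from dataclasses import dataclass, field
from typing import Optional


# Hypothetical stand-ins with only the fields this sketch reads.
@dataclass
class KeyValue:
    key: str
    value: str


@dataclass
class Tag:
    tag: str
    properties: list[KeyValue] = field(default_factory=list)


def property_per_datasource(
    tags_by_datasource: dict[str, list[Tag]],
    datasource_ids: list[str],
    tag_name: str,
    key: str,
) -> dict[str, Optional[str]]:
    """Map every requested datasource id to the value of `key` on its tag
    named `tag_name`, or None if there is no such tag or property."""
    result: dict[str, Optional[str]] = {}
    for ds_id in datasource_ids:
        value = None
        # IDs without tags are absent from the result, so default to [].
        for tag in tags_by_datasource.get(ds_id, []):
            if tag.tag != tag_name:
                continue
            for prop in tag.properties:
                if prop.key == key:
                    # If several versions match, the last one in list order
                    # wins; filter on version first if that matters.
                    value = prop.value
        result[ds_id] = value
    return result


# Assumed usage inside a rule ("contract"/"grid_operator" are hypothetical):
# tags_by_datasource = self.load_tags_multi_datasource(datasource_ids)
# grid_by_id = property_per_datasource(tags_by_datasource, datasource_ids, "contract", "grid_operator")
```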

Publish to custom pubsub topic

publish_to_custom_pubsub_topic(self, topic_name: str, data: bytes, attributes: dict = None):  
"""
Publishes a message to a custom pubsub topic called `custom-<topic_name>`

Args:
topic_name (str): The name of the custom pubsub topic; the message is published to `custom-<topic_name>`
data (bytes): The message to be published
attributes (dict): Optional attributes to be published with the message
"""

Publishes arbitrary byte data from a rule to a Google Pub/Sub topic whose name starts with custom- in the current environment.

topic_name: must refer to an existing Pub/Sub topic in the current environment (this has to be arranged with the Energyworx support team in advance).

data: any bytes payload that can be published to a Pub/Sub topic, ideally smaller than 1MB. Payloads larger than ~9.5MB are automatically offloaded to Cloud Storage: the published message then carries a placeholder body plus pointer attributes (file_location, bucket_name, namespace_id), and the consuming service must rehydrate the payload from that pointer. Energyworx integration services do this automatically. If your custom topic is consumed elsewhere, keep payloads below the ~9.5MB offload threshold, because such consumers will only receive the pointer.

attributes: an optional dict[str, str] of keys and values to be passed along as Pub/Sub attributes.
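Because data must be bytes and attributes a dict[str, str], structured payloads need to be encoded first. The sketch below JSON-encodes a payload, converts attribute keys and values to strings and checks the size against the offload threshold described above. Reading ~9.5MB as 9,500,000 bytes is a conservative assumption, and the helper names, topic name and attribute keys are hypothetical.

```python
import json

# Conservative reading of the ~9.5MB offload threshold described above
# (decimal megabytes); the exact platform threshold is approximate.
OFFLOAD_THRESHOLD_BYTES = 9_500_000


def encode_message(payload: dict, attributes: dict) -> tuple[bytes, dict[str, str]]:
    """JSON-encode `payload` to UTF-8 bytes and coerce attribute keys and
    values to str, matching the bytes / dict[str, str] parameter types."""
    data = json.dumps(payload, separators=(",", ":")).encode("utf-8")
    str_attributes = {str(k): str(v) for k, v in attributes.items()}
    return data, str_attributes


def would_be_offloaded(data: bytes) -> bool:
    """True if the payload is likely above the Cloud Storage offload threshold."""
    return len(data) > OFFLOAD_THRESHOLD_BYTES


# Assumed usage inside a rule; "meter-events" is a hypothetical topic name,
# so the message would go to custom-meter-events:
# data, attrs = encode_message({"datasource_id": ds_id, "status": "done"}, {"version": 2})
# if would_be_offloaded(data):
#     raise ValueError("payload too large for consumers that cannot rehydrate")
# self.publish_to_custom_pubsub_topic("meter-events", data, attrs)
```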

Send a Collective Trigger message

send_collective_trigger(
self,
correlation_id: str,
expected_number_of_messages: int,
deadline: int | None,
deadline_delta: int | None,
timeout_between_messages: int | None,
late_data_behaviour: LateDataBehaviourEnum | str,
datasource_id: str,
flow_config_id: int,
source_channel_id: Optional[str],
flow_properties: Optional[dict[str, object]] = None,
):
"""
Sends a message to the Collective Trigger service.

Args:
correlation_id (str): Identifier that messages are aggregated on
expected_number_of_messages (int): The number of messages that are expected
deadline (int): if this timestamp is reached after the first message has been received, trigger the flow
deadline_delta (int): if this number of seconds has passed since the first message was received, trigger the flow
timeout_between_messages (int): if no new message has arrived within this number of seconds, trigger the flow
late_data_behaviour (LateDataBehaviourEnum | str): determines what happens to messages that come in after
the deadline or timeout have passed.
datasource_id (str): id of the datasource on which to trigger the flow
flow_config_id (int): id of the configuration of the flow to be triggered
source_channel_id (Optional[str]): source channel id for flow
flow_properties (Optional[Dict[str, object]]): flow properties for the flow to be triggered
"""

This method sends a message to the Collective Trigger (see Collective Trigger for details) to aggregate messages and to trigger a new flow.

The LateDataBehaviourEnum options are represented this way:

class LateDataBehaviourEnum(EWXEnum):
discard = "discard" # 0 Discards the message
trigger_entire_message = "trigger_entire_message" # 1 Triggers the flow with the entire collection of messages
start_new_state = "start_new_state" # 2 Restarts the waiting window to collect new messages
trigger_single_message = "trigger_single_message" # 3 Triggers the flow with this single message
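All messages that should be aggregated together must carry the same correlation_id. One way to make independent senders agree on it is to derive it deterministically from data they all share; the sketch below hashes a source file name and a business date. This derivation, the helper name and all values in the usage comment are hypothetical. late_data_behaviour is passed in its string form, which the signature accepts.

```python
import hashlib
from datetime import date


def make_correlation_id(source_file: str, business_date: date) -> str:
    """Derive a stable correlation id: every sender that knows the same file
    name and business date produces the same id, so their messages are
    aggregated together."""
    raw = f"{source_file}|{business_date.isoformat()}".encode("utf-8")
    return hashlib.sha256(raw).hexdigest()


# Assumed usage inside a rule; every value below is hypothetical:
# self.send_collective_trigger(
#     correlation_id=make_correlation_id("readings.csv", date(2024, 1, 1)),
#     expected_number_of_messages=2,
#     deadline=None,
#     deadline_delta=3600,  # trigger one hour after the first message at the latest
#     timeout_between_messages=None,
#     late_data_behaviour="discard",  # string form of LateDataBehaviourEnum.discard
#     datasource_id="aggregation-datasource",
#     flow_config_id=123,
#     source_channel_id=None,
# )
```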

Timeslices methods

The following class definition represents a TimesliceGroup:

class TimesliceGroup:
"""
Attributes:
name (str): the name of the TimesliceGroup
version (str): the version of the TimesliceGroup
description (str): the description of the TimesliceGroup
properties (list[KeyValueType]): the properties of the TimesliceGroup
timeslices (list[Timeslice]): the timeslices of the TimesliceGroup
deleted (bool): whether the TimesliceGroup is deleted
id (int): the id of the TimesliceGroup
valid_from (datetime): the point in time from which this timeslice group is valid
"""

and the Timeslice is represented this way:

class Timeslice:
"""
Attributes:
period_type
priority
starts (list[datetime]): the list of the start times of the timeslices
ends (list[datetime]): the list of the end times of the timeslices
value
"""

The properties use this definition:

class KeyValue:
"""
Attributes:
key (str): the name of the property
value (str): the value of the property
"""

The following functions are available:

```python
load_timeslice_group(self, timeslice_group_id: str):
"""
Get a time slice group by its id
Args:
timeslice_group_id (str):

Returns:
TimesliceGroup:
"""
load_timeslice_groups_by_name(
self,
timeslice_group_name,
start_date=None,
end_date=None,
include_versions=True,
):
"""
Get the timeslice groups with the given name, optionally only those valid in the given period
Args:
timeslice_group_name (str): The name of the timeslice group to retrieve
start_date (datetime): Optional start date of the period
end_date (datetime): Optional end date time of the period
include_versions(bool): Option to include older versions in result
Returns:
list(TimesliceGroup):
"""
load_timeslice_groups_by_properties(
self,
timeslice_group_properties,
start_date=None,
end_date=None,
timeslice_group_name=None,
include_versions=False,
):
"""
Get timeslice groups by their properties, optionally only those valid in the given period. You can also
filter on name with timeslice_group_name.
Args:
timeslice_group_properties (list[str]): [key=value, ...]
start_date (datetime): Optional start date of the period
end_date (datetime): Optional end date time of the period
timeslice_group_name(str): Optional timeslice group name
include_versions(bool): Option to include older versions in result

Returns:
Generator(TimesliceGroup):
"""
```

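load_timeslice_groups_by_properties expects property filters as `key=value` strings, and each Timeslice exposes parallel starts and ends lists. The sketch below builds such filters from a dict and checks whether a moment falls inside a timeslice. It assumes starts[i] pairs with ends[i] (start inclusive, end exclusive), and the Timeslice dataclass is a hypothetical stand-in with only those two fields; the property name in the usage comment is made up.

```python
from dataclasses import dataclass, field
from datetime import datetime


# Hypothetical stand-in exposing only the documented starts/ends lists.
@dataclass
class Timeslice:
    starts: list[datetime] = field(default_factory=list)
    ends: list[datetime] = field(default_factory=list)


def to_property_filters(properties: dict[str, str]) -> list[str]:
    """Build the [key=value, ...] list expected by
    load_timeslice_groups_by_properties."""
    return [f"{key}={value}" for key, value in properties.items()]


def covers(timeslice: Timeslice, moment: datetime) -> bool:
    """True if `moment` lies in one of the timeslice's intervals, assuming
    starts[i] pairs with ends[i], start inclusive and end exclusive."""
    return any(start <= moment < end for start, end in zip(timeslice.starts, timeslice.ends))


# Assumed usage inside a rule; the property "market" is hypothetical:
# groups = self.load_timeslice_groups_by_properties(
#     to_property_filters({"market": "NL"}), start_date=start, end_date=end
# )
# for group in groups:  # a generator, so it can be iterated only once
#     for timeslice in group.timeslices:
#         print(group.name, covers(timeslice, start))
```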
Trigger a new flow

trigger_flow(
self,
flow_config_id: str | int,
start_datetime: datetime,
end_datetime: datetime,
datasource_id: str = None,
custom_flow_config_parameters: list[CustomFlowConfigurationParameters] = None,
flow_properties: dict = None,
countdown: int = None,
datasource_object: Datasource = None,
tags_included: bool = True,
):
"""
Triggers a flow with the given flow_config_id and the provided start and end datetimes.
Args:
datasource_id (str): datasource id to execute the flow on.
flow_config_id (str | int): the id of the flow config to trigger.
start_datetime (datetime): start index of the timeseries data to use as source data of the flow.
end_datetime (datetime): end index of the timeseries data to use as source data of the flow.
custom_flow_config_parameters (list[CustomFlowConfigurationParameters]): if it is a custom flow config,
provide the parameters here.
flow_properties (dict): the flow properties to be passed to the flow.
countdown (int): the number of seconds to delay the start of the flow.
datasource_object (Datasource): datasource object to execute the flow on.
If provided, the datasource_id parameter is ignored.
tags_included (bool): whether the tag context is passed with the trigger
(only applied if datasource_object is passed).
Raises:
TypeError: if arguments of the wrong type are passed.
FlowRuleException: in the following cases:
- if the datasource is not found (when datasource_id is provided without datasource_object)
- if neither datasource_id nor datasource_object is provided
"""

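Use this method to trigger a new flow from within a rule, optionally on another datasource and with its own flow configuration, custom flow configuration parameters or flow properties.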
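A common pattern is to trigger a follow-up flow over a fixed window, for example the previous day. The sketch below computes that window in UTC and shows the call with keyword arguments. It assumes a half-open [start, end) window is what the target flow needs (whether end_datetime is treated as inclusive is not stated here), and the flow config id, datasource id and flow property are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def previous_utc_day(now: datetime) -> tuple[datetime, datetime]:
    """Return (start, end) of the UTC calendar day before `now`, where end is
    midnight UTC of `now`'s day (a half-open window). `now` should be
    timezone-aware; astimezone interprets a naive value as local time."""
    now_utc = now.astimezone(timezone.utc)
    end = now_utc.replace(hour=0, minute=0, second=0, microsecond=0)
    return end - timedelta(days=1), end


# Assumed usage inside a rule; the ids and flow property are hypothetical:
# start, end = previous_utc_day(datetime.now(timezone.utc))
# self.trigger_flow(
#     flow_config_id=123,
#     start_datetime=start,
#     end_datetime=end,
#     datasource_id="target-datasource-id",
#     flow_properties={"reason": "daily-recalculation"},
#     countdown=300,  # delay the start of the flow by five minutes
# )
```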

Execute a trigger schedule

execute_trigger_schedule(
self,
_id: int,
reference_date: datetime | None = None,
) -> None:
"""
Schedules a trigger schedule for execution by publishing a message to the
trigger-execute-schedule Pub/Sub topic. The ewx-intelligence service receives
the message and executes the trigger schedule with the given id.

Args:
_id (int): The id of the trigger schedule to execute.
reference_date (datetime | None): Optional reference date passed to the
trigger execution. Defaults to the current UTC time if not provided.
"""

Executes a trigger schedule by id from within a flow rule.

Data availability

This method publishes a Pub/Sub message that is processed asynchronously and independently of the current flow. The timeseries data written by store_timeseries in the same rule is committed to BigQuery after the flow completes, which happens after the Pub/Sub message is already on the queue.

If the triggered schedule reads data produced by the current flow (e.g. via a query), that data may not yet be available in BigQuery at the time the trigger executes, leading to missing or incomplete results.

Attributes

market_adapter_split_count

This attribute provides the number of messages that the original file was split into by the market adapter split function (see Market Adapter for more information).

The main use of this attribute is to set the expected number of messages of a Collective Trigger correctly. For example, if a file ingestion updates two datasources and a final flow must only run after both datasources have been processed, the flows processing them can each send a message with the same correlation_id to the Collective Trigger, which triggers the final flow once both messages have arrived. Here is an example of using this attribute in combination with a collective trigger:

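self.send_collective_trigger(
    correlation_id=correlation_id,
    expected_number_of_messages=self.market_adapter_split_count,
    deadline=deadline,
    deadline_delta=deadline_delta,
    timeout_between_messages=timeout_between_messages,
    late_data_behaviour=late_data_behaviour,
    datasource_id=datasource_id,
    flow_config_id=flow_config_id,
    source_channel_id=source_channel_id,
    flow_properties=flow_properties,
)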

If the flow is not started from an ingestion, the value of market_adapter_split_count will be set to None.
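Because market_adapter_split_count is None outside ingestion flows, code that passes it to send_collective_trigger may want an explicit fallback. The sketch below returns a default of 1 in that case; that default and the helper name expected_messages are choices made for this example, not platform behaviour.

```python
from typing import Optional


def expected_messages(split_count: Optional[int], default: int = 1) -> int:
    """Number of messages a collective trigger should wait for.

    market_adapter_split_count is None when the flow was not started from an
    ingestion; falling back to `default` then is a choice made for this
    sketch, not platform behaviour.
    """
    return default if split_count is None else split_count


# Assumed usage inside a rule:
# n_expected = expected_messages(self.market_adapter_split_count)
# then pass n_expected as expected_number_of_messages to self.send_collective_trigger
```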