Timeseries Storage
Two methods make it easy to store both timeseries and annotations on different channels. The destination channel classifiers can be set dynamically, that is, determined during execution of the flow.
Some characteristics of these methods are:
- Storing timeseries or annotations can be done independently of each other.
- Both methods can take a Series or a DataFrame.
- Timeseries and annotations can have any timezone.
- Channels are only created on the datasource when there is actual data to store on them.
- A DataFrame of timeseries stores each column independently on the channel named by that column. Any column name that is not an existing channel classifier on the namespace raises an error and aborts the flow.
- A DataFrame of annotations stores each column independently as annotations on the channel given by channel_id.
- Annotations can have any name you want (series name or column names are used). The only requirement is that this name is unique. Within self.dataframe, they will appear as <channel_name>:<annotation_name>:<rule_name>:<sequence_index>.
- Annotations can be created on non-existing datapoints. This also means that gap annotations will now actually be stored on the gap itself instead of one annotation after the gap.
- Multiple annotations can be created on the same datapoint within the same rule.
- Multiple versions of annotations can be stored on the same timeseries version.
- Timeseries and annotations can be stored on any datasource without requiring it to be prepared.
- Timeseries and annotations stored with store_XXX can optionally also be added to the flow data (which self.dataframe is created from), which is turned on by default. This only works for data stored on this datasource. Data stored on other (prepared) datasources will never be added to the flow data.
- Annotations stored with store_annotations are recognized by self.data_filter.
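The annotation column naming described in the list above can be illustrated with a small parser. This is a minimal sketch, not part of the platform: the helper `parse_annotation_column`, the `AnnotationColumn` type, and the sample names are hypothetical, and it assumes that none of the four parts contains a colon and that the sequence index is an integer.

```python
from typing import NamedTuple


class AnnotationColumn(NamedTuple):
    """Parts of an annotation column name (hypothetical helper type)."""

    channel_name: str
    annotation_name: str
    rule_name: str
    sequence_index: int


def parse_annotation_column(column: str) -> AnnotationColumn:
    """Split '<channel_name>:<annotation_name>:<rule_name>:<sequence_index>'.

    Assumes no part contains ':' and that the sequence index is an integer.
    """
    parts = column.split(":")
    if len(parts) != 4:
        raise ValueError(f"expected 4 colon-separated parts, got {len(parts)}: {column!r}")
    channel_name, annotation_name, rule_name, sequence_index = parts
    return AnnotationColumn(channel_name, annotation_name, rule_name, int(sequence_index))


# Illustrative column name; the channel, annotation and rule names are made up.
parsed = parse_annotation_column("ACTIVE_POWER:gap:gap_check:0")
print(parsed.channel_name, parsed.annotation_name)
```

A helper like this can be handy when selecting annotation columns from self.dataframe by channel or by rule.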
Storing Timeseries
Below is the signature of the method that stores timeseries.
def store_timeseries(
    self,
    timeseries: pd.Series | pd.DataFrame,
    channel_id: str | None = None,
    *,
    datasource_id: str | None = None,
    store_in_flow_data: bool = True,
) -> None:
    """
    Persists the given pandas Series or DataFrame `timeseries`.

    If `timeseries` is a pandas Series and `channel_id` is given, use given `channel_id` as the channel to store the
    Series on.

    The column names of the given `timeseries` are used as the channel classifiers to store the timeseries under. If
    `timeseries` is a pandas Series without a name and `channel_id` is *None*, an error is raised.

    Args:
        timeseries: A pandas Series or DataFrame to store. By default, the name of the Series or DataFrame columns
            is used as the channel classifiers.
        channel_id: Optionally a specific channel classifier to store `timeseries` under. Only used if `timeseries`
            is a pandas Series.
        datasource_id: Optionally the ID of the datasource to store `timeseries` under. By default, this datasource
            is used.
        store_in_flow_data: Whether to also store these timeseries in the used flow data. Timeseries stored in the
            flow data will be available in `AbstractRule.dataframe` in following rules.

    Raises:
        TypeError:
            - `timeseries` is not a pandas Series or DataFrame.
            - `timeseries` has no DatetimeIndex as the index.
            - `timeseries` has a DatetimeIndex that is timezone-naive.
            - `timeseries` contains values that are not of type int or float.
        ValueError:
            - `timeseries` is a pandas Series without a name and `channel_id` is *None*.
            - `timeseries` has a DatetimeIndex with duplicate entries.
            - `timeseries` has a column that is not an existing channel classifier.
            - `datasource_id` is the ID of an unprepared datasource.
    """
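As an illustration of inputs that satisfy the documented requirements, the sketch below builds a DataFrame and a Series with a timezone-aware DatetimeIndex and runs a local pre-check. The helper `check_timeseries` is hypothetical and only mirrors some of the conditions in the Raises section; it does not check value types, channel classifiers, or datasources, which only the platform can verify. The channel names are made up.

```python
import pandas as pd


def check_timeseries(timeseries, channel_id=None):
    """Hypothetical pre-check mirroring part of the documented Raises section."""
    if not isinstance(timeseries, (pd.Series, pd.DataFrame)):
        raise TypeError("timeseries must be a pandas Series or DataFrame")
    if not isinstance(timeseries.index, pd.DatetimeIndex):
        raise TypeError("timeseries must have a DatetimeIndex")
    if timeseries.index.tz is None:
        raise TypeError("the DatetimeIndex must be timezone-aware")
    if timeseries.index.has_duplicates:
        raise ValueError("the DatetimeIndex contains duplicate entries")
    if isinstance(timeseries, pd.Series) and timeseries.name is None and channel_id is None:
        raise ValueError("an unnamed Series needs an explicit channel_id")


# A DataFrame: each column name is used as a channel classifier.
index = pd.date_range("2024-01-01", periods=4, freq="15min", tz="UTC")
frame = pd.DataFrame(
    {"ACTIVE_POWER": [1.0, 1.5, 2.0, 2.5], "VOLTAGE": [230, 231, 229, 230]},
    index=index,
)
check_timeseries(frame)

# A timezone-naive Series is rejected; localizing it to any timezone fixes that.
naive = pd.Series(
    [1.0, 2.0],
    index=pd.date_range("2024-01-01", periods=2, freq="15min"),
    name="ACTIVE_POWER",
)
aware = naive.tz_localize("Europe/Amsterdam")
check_timeseries(aware)

# Inside a rule, the calls would then look like:
#   self.store_timeseries(frame)
#   self.store_timeseries(aware, channel_id="ACTIVE_POWER")
```

Running such a check before the store call is optional; it only surfaces the most common input mistakes earlier in the rule.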
Writing Datapoint Attributes with no timeseries value attached
Please note that Datapoint Attributes and the Datapoint Value they are attached to are stored atomically: any update to one of the two updates both.
Beware that leaving the Datapoint Attributes or the Datapoint Value unassigned overwrites the existing data with 'null', which may not be your intention in the business rule.
Storing Annotations
Below is the signature of the method that stores annotations.
def store_annotations(
    self,
    annotations: pd.Series | pd.DataFrame,
    channel_id: str,
    annotation_name: str | None = None,
    *,
    datasource_id: str | None = None,
    store_in_flow_data: bool = True,
) -> None:
    """
    Persists the given pandas Series or DataFrame `annotations` under the provided `channel_id`.

    The column names of the given `annotations` are used as the names of the annotations to store on the given
    `channel_id`. If `annotations` is a pandas Series without a name and `annotation_name` is *None*, an error is
    raised.

    Args:
        annotations: A pandas Series or DataFrame of annotations to store. The column names are used as the names of
            the annotations.
        channel_id: The channel classifier to store `annotations` under.
        annotation_name: Optionally a specific annotation name to store `annotations` with. Only used if
            `annotations` is a pandas Series.
        datasource_id: Optionally the ID of the datasource to store `annotations` under. By default, this datasource
            is used.
        store_in_flow_data: Whether to also store these annotations in the used flow data. Annotations stored in the
            flow data will be available in `AbstractRule.dataframe` in following rules and are recognized by
            `AbstractRule.data_filter`.

    Raises:
        TypeError:
            - `annotations` is not a pandas Series or DataFrame.
            - `annotations` has no DatetimeIndex as the index.
            - `annotations` has a DatetimeIndex that is timezone-naive.
            - `annotations` contains values that are not of type dict.
        ValueError:
            - `annotations` is a pandas Series without a name and `annotation_name` is *None*.
            - `annotations` has a DatetimeIndex with duplicate entries.
            - `channel_id` is not an existing channel classifier.
            - `datasource_id` is the ID of an unprepared datasource.
    """
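Because annotations can be created on non-existing datapoints, a gap can be annotated at the timestamps of the gap itself. The sketch below shows one possible way to build such a Series with plain pandas; the 15-minute grid, the channel name, the annotation name, and the dict keys are all illustrative assumptions, not a required schema.

```python
import pandas as pd

# Expected 15-minute grid; the readings at 00:30 and 00:45 are missing.
expected = pd.date_range("2024-01-01 00:00", periods=6, freq="15min", tz="UTC")
readings = pd.Series(
    [1.0, 1.2, 1.1, 1.3],
    index=expected[[0, 1, 4, 5]],
    name="ACTIVE_POWER",
)

# Timestamps on the expected grid that have no datapoint.
gap_timestamps = expected.difference(readings.index)

# One dict per gap timestamp, indexed on the gap itself.
gap_annotations = pd.Series(
    [{"reason": "missing datapoint"} for _ in gap_timestamps],
    index=gap_timestamps,
    name="gap",
)

# Inside a rule, the call would then look like:
#   self.store_annotations(gap_annotations, "ACTIVE_POWER")
```

The resulting Series has a timezone-aware DatetimeIndex without duplicates and dict values, which matches the input requirements listed in the docstring.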
Example call:
```python
# start_datetime and end_datetime are assumed to be defined earlier in the rule.
timestamps = pd.date_range(start=start_datetime,
                           end=end_datetime,
                           freq='15min',
                           tz='utc')
data = [{'annotation_title': "annotation_detail"}] * len(timestamps)
annotations_series = pd.Series(data=data,
                               index=timestamps,
                               name='annotation_name')
self.store_annotations(annotations_series, "channel_name")
```
Examples
The following ZIP file contains some flow and rule configs that demonstrate several features of the new store methods. Use the ewx-cli tool to upload these configs to a namespace.