Unit Testing Rules: A Practical Guide
This guide shows how to unit test your custom rules locally using the ewx_public.testing module. With a single pip install ewx-public, you get everything you need — no mocks required.
The ewx_public.testing module is available from platform release 26.05 onwards (package version ewx-public==26.5.0). Pin to that version or later to use the testing utilities described in this guide.
For a complete working project with rules and tests, see the examples/my-ewx-rules/ directory in the ewx-public package.
1. Setup
Install ewx-public following the instructions in Local Rule Development, then add pytest and pytest-cov to the dev dependency group in pyproject.toml:
[tool.poetry.group.dev.dependencies]
pytest = ">=7.0"
pytest-cov = ">=4.0"
2. Your first test
Suppose you have a production rule that converts kWh to MWh:
# rules/unit_conversion_rule.py — uploaded to the platform
from ewx_public.flow_rule import FlowRule
from ewx_public import RuleResult
class UnitConversionRule(FlowRule):
    def prepare_context(self, **kwargs):
        return {}

    def apply(self, **kwargs):
        source = self.flow_properties.get("source_channel", "E_CONS_kWh")
        destination = self.flow_properties.get("destination_channel", "E_CONS_MWh")
        converted = self.dataframe[[source]] / 1000.0
        converted.columns = [destination]
        self.store_timeseries(converted, channel_id=destination)
        return RuleResult()
Test it with make_testable():
# tests/test_unit_conversion.py
from ewx_public.testing import (
    assert_timeseries_stored,
    make_datasource,
    make_testable,
    make_timeseries_df,
)
from rules.unit_conversion_rule import UnitConversionRule


def test_converts_kwh_to_mwh():
    rule = make_testable(
        UnitConversionRule,
        datasource=make_datasource(channels=["E_CONS_kWh", "E_CONS_MWh"]),
        dataframe=make_timeseries_df(columns=["E_CONS_kWh"], fill_value=1000.0),
        flow_properties={
            "source_channel": "E_CONS_kWh",
            "destination_channel": "E_CONS_MWh",
        },
    )
    rule.run()
    assert_timeseries_stored(rule, channel_id="E_CONS_MWh", count=1)
    stored = rule.backend.stored_timeseries[0]
    assert (stored["timeseries"]["E_CONS_MWh"] == 1.0).all()
Key points:
- make_testable(RuleClass, ...) wraps any FlowRule subclass for local testing
- rule.run() executes the full lifecycle: prepare_context() → apply()
- assert_timeseries_stored() checks what the rule stored; no mock assertions are needed
- rule.backend holds all side effects for direct inspection
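To illustrate the last point, the sketch below shows the general idea of recording side effects in memory rather than mocking them. It is not the ewx_public.testing implementation: RecordingBackend and ToyRule are hypothetical stand-ins, and only the stored_timeseries attribute and the channel_id and timeseries keys mirror what this guide shows.

```python
class RecordingBackend:
    """Hypothetical in-memory backend: records calls instead of doing I/O."""

    def __init__(self):
        self.stored_timeseries = []

    def store_timeseries(self, timeseries, channel_id):
        # Append a plain dict so tests can inspect exactly what was stored.
        self.stored_timeseries.append(
            {"channel_id": channel_id, "timeseries": timeseries}
        )


class ToyRule:
    """Hypothetical rule that converts kWh readings to MWh."""

    def __init__(self, backend, readings_kwh):
        self.backend = backend
        self.readings_kwh = readings_kwh

    def apply(self):
        converted = [value / 1000.0 for value in self.readings_kwh]
        self.backend.store_timeseries(converted, channel_id="E_CONS_MWh")


backend = RecordingBackend()
ToyRule(backend, [1000.0, 2500.0]).apply()

stored = backend.stored_timeseries[0]
assert stored["channel_id"] == "E_CONS_MWh"
assert stored["timeseries"] == [1.0, 2.5]
```

Because the backend keeps real values, a test can assert on the stored data itself instead of on how a mock was called.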
3. Arrange — Act — Assert
Every test follows the same structure:
- Arrange: create the rule with test data using make_testable() and factory functions
- Act: call rule.run() (full lifecycle) or rule.apply() (just the apply method)
- Assert: check what happened using assertion helpers or rule.backend
run() vs apply()
- rule.run(): full lifecycle. Calls prepare_context(), loads prepared datasources, validates flow_properties, then calls apply() and validates the RuleResult. Use this for most tests.
- rule.apply(): calls only apply(). Use this when you need to set up rule.context manually or test a specific code path:
def test_with_manual_context():
    rule = make_testable(MyRule, datasource=ds, dataframe=df)
    rule.context = {"custom_key": "value"}
    rule.prepared_datasources = {"other-ds": other_ds}
    result = rule.apply()
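The difference between the two entry points can be shown with a toy class. This is a simplified sketch under stated assumptions: ToyLifecycleRule is hypothetical, and its run() leaves out the datasource loading and validation steps that the real run() performs.

```python
class ToyLifecycleRule:
    """Hypothetical rule used only to show the order of lifecycle calls."""

    def __init__(self):
        self.context = {}
        self.calls = []

    def prepare_context(self):
        self.calls.append("prepare_context")
        return {"threshold": 10}

    def apply(self):
        self.calls.append("apply")
        return self.context.get("threshold")

    def run(self):
        # Full lifecycle: build the context first, then apply.
        self.context = self.prepare_context()
        return self.apply()


full = ToyLifecycleRule()
assert full.run() == 10
assert full.calls == ["prepare_context", "apply"]

manual = ToyLifecycleRule()
manual.context = {"threshold": 99}  # set by the test, not by prepare_context()
assert manual.apply() == 99
assert manual.calls == ["apply"]
```

Calling apply() directly skips prepare_context(), so the test decides what the context contains.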
4. Factories — creating test data
Factory functions create test objects with sensible defaults:
from ewx_public.testing import (
    make_datasource,
    make_channel,
    make_tag,
    make_tag_property,
    make_timeseries_df,
    make_namespace,
    make_flow_configuration,
)

# Datasource with channels as strings (auto-converted to Channel objects)
ds = make_datasource(
    id="meter-001",
    timezone="Europe/Amsterdam",
    channels=["E_CONS", "E_PROD"],
)

# Timeseries DataFrame with DatetimeIndex (the format FlowRule.dataframe uses)
df = make_timeseries_df(
    columns=["E_CONS", "E_PROD"],
    start="2024-06-01",
    periods=48,
    freq="30min",
    timezone="Europe/Amsterdam",
    fill_value=1.5,
)

# Namespace (for rules that access self.namespace)
ns = make_namespace(id="my-company.com", name="My Company")

# Tag with properties
tag = make_tag(
    tag="METER_TYPE",
    properties=[
        make_tag_property(key="type", value="smart"),
        make_tag_property(key="brand", value="Landis+Gyr"),
    ],
)
Deterministic IDs
Factories auto-increment IDs (ds-1, ds-2, ...). For deterministic tests, reset them in a fixture:
# tests/conftest.py
import pytest
from ewx_public.testing import reset_factory_counters
@pytest.fixture(autouse=True)
def _reset_ids():
    reset_factory_counters()
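To see why the reset matters, consider how auto-incrementing IDs typically behave. The following is a hypothetical sketch, not the library's code: next_id and reset_counters are illustrative names, and only the ds-1, ds-2 pattern comes from this guide.

```python
import itertools

_counters = {}


def next_id(prefix):
    """Return prefix-1, prefix-2, ... using one counter per prefix."""
    counter = _counters.setdefault(prefix, itertools.count(1))
    return f"{prefix}-{next(counter)}"


def reset_counters():
    """Forget all counters so numbering restarts at 1."""
    _counters.clear()


first = next_id("ds")
second = next_id("ds")
reset_counters()
after_reset = next_id("ds")
assert (first, second, after_reset) == ("ds-1", "ds-2", "ds-1")
```

Without a reset between tests, the ID a factory returns depends on how many objects earlier tests created, so assertions on IDs would change with test order.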
5. Assertion helpers
Assertion helpers check rule side effects with descriptive error messages:
from ewx_public.testing import (
    assert_timeseries_stored,
    assert_annotations_stored,
    assert_tags_added,
    assert_trigger_sent,
    assert_email_sent,
    assert_task_created,
    assert_counter_value,
    assert_logged,
)
# Timeseries
assert_timeseries_stored(rule, channel_id="E_CONS", count=1)
assert_timeseries_stored(rule, count=0) # assert nothing was stored
# Annotations
assert_annotations_stored(rule, channel_id="E_CONS", count=1)
# Tags
assert_tags_added(rule, datasource_id="meter-001", tag_name="VALIDATED")
# Triggers
assert_trigger_sent(rule, trigger_type="flow", count=1)
# Emails
assert_email_sent(rule, to_email="ops@example.com", subject_contains="Alert")
# Tasks
assert_task_created(rule, title_contains="Review gaps")
# Counters
assert_counter_value(rule, "processed_count", 42)
# Log messages
assert_logged(rule, level="warning", message_contains="Skipping")
assert_logged(rule, level="info", count=3)
The count parameter controls how many matches are expected: None (the default) requires at least one match, 0 requires none, and any other integer requires exactly that many.
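These count semantics can be written out as a small predicate. This is only a sketch of the rule as described here; count_matches is a hypothetical helper, not part of ewx_public.testing.

```python
def count_matches(actual, count=None):
    """Return True if `actual` matches satisfy the expected `count`."""
    if count is None:
        return actual >= 1  # default: at least one match
    return actual == count  # 0 means none; any other integer is exact


assert count_matches(3) is True
assert count_matches(0) is False
assert count_matches(0, count=0) is True
assert count_matches(2, count=1) is False
```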
Direct backend inspection
For content-level assertions beyond what the helpers offer, inspect rule.backend directly:
stored = rule.backend.stored_timeseries[0]
assert stored["channel_id"] == "E_CONS"
assert stored["datasource_id"] == "meter-001"
assert (stored["timeseries"]["E_CONS"] > 0).all()
trigger = rule.backend.triggers[0]
assert trigger["type"] == "flow"
assert trigger["flow_config_id"] == 42
6. Seeding data for lookups
Rules often load data during execution (datasources, timeseries, channel classifiers). Seed the backend before running the rule:
rule = make_testable(MyRule, datasource=make_datasource(id="meter-001"))
# Seed a datasource for load_datasource() or prepare_datasource_ids
other_ds = make_datasource(id="weather-station")
rule.backend.add_datasource(other_ds)
# Seed timeseries for timeseries_service.get_latest()
historical_df = make_timeseries_df(
    columns=["E_CONS"],
    start="2023-12-01",
    periods=720,
)
rule.backend.add_timeseries("meter-001", "E_CONS", historical_df)
# Seed a channel classifier for load_channel_classifier()
from ewx_public.testing import make_channel_classifier
rule.backend.add_channel_classifier(make_channel_classifier(name="E_CONS"))
rule.run()
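Conceptually, seeding means filling an in-memory store that later lookups read from. The sketch below illustrates that idea with plain dictionaries. SeededBackend is hypothetical, the datasource is represented as a dict rather than a real datasource object, and raising KeyError for an unseeded ID is an assumption of this sketch, not documented library behavior.

```python
class SeededBackend:
    """Hypothetical in-memory store that serves lookups from seeded data."""

    def __init__(self):
        self._datasources = {}

    def add_datasource(self, datasource):
        # Index by id so a later lookup finds it without any network call.
        self._datasources[datasource["id"]] = datasource

    def load_datasource(self, datasource_id):
        # Assumption for this sketch: unseeded ids raise KeyError.
        return self._datasources[datasource_id]


backend = SeededBackend()
backend.add_datasource({"id": "weather-station", "timezone": "Europe/Amsterdam"})
loaded = backend.load_datasource("weather-station")
assert loaded["timezone"] == "Europe/Amsterdam"
```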
7. Testing log messages
The default logger in make_testable() is a CapturingLogger that records all messages:
rule = make_testable(MyRule, ...)
rule.run()
# Using assertion helpers
assert_logged(rule, level="warning", message_contains="Skipping")
# Direct inspection
assert "Processing complete" in rule.rule_logger.infos
assert len(rule.rule_logger.warnings) == 1
assert rule.rule_logger.errors == []
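A capturing logger can be pictured as a set of lists, one per level. The class below is a hypothetical illustration, not the actual CapturingLogger; only the infos, warnings, and errors attribute names are taken from the example above.

```python
class ToyCapturingLogger:
    """Hypothetical logger that stores messages per level instead of emitting them."""

    def __init__(self):
        self.infos = []
        self.warnings = []
        self.errors = []

    def info(self, message):
        self.infos.append(message)

    def warning(self, message):
        self.warnings.append(message)

    def error(self, message):
        self.errors.append(message)


logger = ToyCapturingLogger()
logger.info("Processing complete")
logger.warning("Skipping channel E_PROD: no data")

# Membership in a list requires the exact message.
assert "Processing complete" in logger.infos
# A substring check needs to look inside each message.
assert any("Skipping" in message for message in logger.warnings)
assert logger.errors == []
```

Note that the in operator on a list of strings only matches a whole message, so a partial match calls for an explicit substring check like the one shown.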
8. Testing TransformRules
TransformRules are simpler — instantiate them directly (no make_testable() needed):
from rules.normalize_transform import NormalizeTransform
import pandas as pd
def test_rounds_to_three_decimals():
    index = pd.date_range("2024-01-01", periods=3, freq="h", tz="UTC")
    df = pd.DataFrame({"E_CONS": [1.23456, 2.34567, 3.45678]}, index=index)
    rule = NormalizeTransform(dataframe=df, context={"decimals": 3})
    result = rule.apply()
    assert result.result["E_CONS"].tolist() == [1.235, 2.346, 3.457]
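Exact equality on floats, as in the assertion above, works when both sides round to the same value, but it can become brittle once a transform does further arithmetic. One possible alternative is a tolerance-based comparison. The sketch below uses only the standard library and plain lists in place of the DataFrame column; the tolerance of 1e-9 is an arbitrary choice for illustration.

```python
import math

values = [1.23456, 2.34567, 3.45678]
rounded = [round(value, 3) for value in values]
expected = [1.235, 2.346, 3.457]

# Tolerance-based comparison: robust to tiny floating-point differences.
assert all(
    math.isclose(got, want, abs_tol=1e-9)
    for got, want in zip(rounded, expected)
)
```

In a pytest suite, pytest.approx serves the same purpose and can compare a whole list at once.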
9. Testing error paths
Exceptions
Test that your rule raises FlowCancelException or FlowStopException correctly:
import pandas as pd
import pytest
from ewx_public import FlowCancelException
from ewx_public.testing import make_testable


def test_cancels_on_invalid_data():
    rule = make_testable(MyRule, dataframe=pd.DataFrame())  # empty
    with pytest.raises(FlowCancelException):
        rule.run()
Simulating service failures
Use raising() to make a method raise when called:
from ewx_public.testing import raising
rule = make_testable(MyRule, ...)
rule.publish_to_custom_pubsub_topic = raising(ValueError, "Topic not found")
with pytest.raises(ValueError, match="Topic not found"):
    rule.run()
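This guide does not show how raising() is implemented. A helper with similar behavior could plausibly look like the sketch below, where make_raiser and ToyPublisher are hypothetical names used only for illustration.

```python
def make_raiser(exc_type, message):
    """Return a callable that raises exc_type(message) whenever it is called."""

    def _raise(*args, **kwargs):
        raise exc_type(message)

    return _raise


class ToyPublisher:
    """Hypothetical service whose method is replaced to simulate a failure."""

    def publish(self, payload):
        return "published"


publisher = ToyPublisher()
publisher.publish = make_raiser(ValueError, "Topic not found")

try:
    publisher.publish({"value": 1})
    outcome = "no error"
except ValueError as exc:
    outcome = str(exc)

assert outcome == "Topic not found"
```

Accepting arbitrary positional and keyword arguments lets the replacement stand in for any method, whatever its signature.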
10. Running tests
# Run all tests
pytest
# Run with verbose output
pytest -v
# Run a single test file
pytest tests/test_my_rule.py
# Run with coverage
pytest --cov=rules --cov-report=term-missing
11. Migrating from MagicMock
If you have existing tests using MagicMock, here's how to migrate:
| Old pattern (MagicMock) | New pattern (ewx_public.testing) |
|---|---|
| rule = MyRule(datasource=MagicMock(), ...) | rule = make_testable(MyRule, datasource=make_datasource(), ...) |
| rule.rule_logger = MagicMock() | Remove (CapturingLogger is the default) |
| rule.store_timeseries = MagicMock() | Remove; use rule.backend.stored_timeseries |
| rule.store_timeseries.assert_called_once() | assert_timeseries_stored(rule, count=1) |
| rule.load_datasource = MagicMock(return_value=ds) | rule.backend.add_datasource(ds) |
| rule.services = MagicMock() | Remove (NullObject is the default) |
| namespace=MagicMock(id="ns") | namespace=make_namespace(id="ns") |
| MagicMock(side_effect=ValueError(...)) | raising(ValueError, "message") |
The goal is zero unittest.mock imports. The testing module provides replacements for every common mock pattern.
Summary
| What | Import |
|---|---|
| Wrap a rule for testing | from ewx_public.testing import make_testable |
| Create test data | from ewx_public.testing import make_datasource, make_timeseries_df |
| Assert side effects | from ewx_public.testing import assert_timeseries_stored, assert_tags_added |
| Assert log messages | from ewx_public.testing import assert_logged |
| Simulate failures | from ewx_public.testing import raising |
| Reset factory IDs | from ewx_public.testing import reset_factory_counters |
For the complete API reference, see the ewx-public Package page.