
Extending the plugin

👥 APIv1 and APIv2​

Some steps need to have logic for both APIv1 and APIv2. The user can then choose which API version the step should run against.

Guidelines:

  • Don't interleave APIv1 and APIv2 logic.
  • Don't map APIv2 responses to APIv1 responses.
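These guidelines can be sketched as a version-keyed dispatch: each API version gets its own implementation class, and the chosen version selects exactly one of them. The names below are hypothetical stand-ins for the plugin's `ApiVersionEnum` and implementation classes:

```python
from enum import Enum


class ApiVersion(Enum):
    # Hypothetical stand-in for the plugin's ApiVersionEnum.
    V1 = "v1"
    V2 = "v2"


class MyStepV1:
    """All APIv1 logic stays in its own class."""

    @staticmethod
    def run(clients: dict) -> str:
        return clients["timeseries"]


class MyStepV2(MyStepV1):
    """APIv2 logic is kept separate; the APIv2 response is used as-is,
    never mapped back onto the APIv1 shape."""

    @staticmethod
    def run(clients: dict) -> str:
        return clients["timeseriesV2"]


# The selected API version picks exactly one implementation;
# v1 and v2 code paths never interleave.
IMPLEMENTATIONS = {ApiVersion.V1: MyStepV1, ApiVersion.V2: MyStepV2}


def run_step(version: ApiVersion, clients: dict) -> str:
    return IMPLEMENTATIONS[version].run(clients)
```

This mirrors the mapping-based structure used in the steps section below: keeping the versions in separate classes means a bug fix in one version cannot silently change the other.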

Automated fixture loading​

The plugin is equipped with a mechanism that automatically loads any extensions made to it. After adding new fixtures or steps, they are picked up automatically; no further action is needed.
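The exact loader is internal to the plugin, but auto-discovery of this kind is typically built on the standard library's pkgutil; a minimal sketch under that assumption (the module names are illustrative):

```python
import pkgutil
from types import ModuleType

# Hypothetical module names a loader would look for in each entity directory.
EXTENSION_MODULES = ("client", "fixtures", "steps")


def discover_extension_modules(
    package: ModuleType, names: tuple = EXTENSION_MODULES
) -> list:
    """Return the submodules of `package` whose name matches an extension module."""
    return sorted(
        name
        for _, name, _ in pkgutil.iter_modules(package.__path__)
        if name in names
    )
```

A loader built this way only needs the entity directory to be an importable package; any `fixtures.py` or `steps.py` placed inside it is found without further registration.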

Adding a client​

If an entity does not have a client yet, add it to its directory in a client.py file. The client must be a class that extends pytest_bdd_ewx.client_base.BaseClient. Then make it available in a fixture inside fixtures.py. Use the following decorator to make sure it's automatically made available to the entire test session:

@fixture(scope="session", autouse=True)
def datasource_client(clients: Dict[str, BaseClient]) -> DataSourceClient:
    client = DataSourceClient()
    clients[client._OBJECT_TYPE] = client
    return client
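The clients dictionary acts as a registry keyed by each client's _OBJECT_TYPE, which is what lets steps look a client up by entity name. A minimal sketch with hypothetical stand-in classes:

```python
class BaseClient:
    """Stand-in for pytest_bdd_ewx.client_base.BaseClient."""

    _OBJECT_TYPE = "base"


class DataSourceClient(BaseClient):
    _OBJECT_TYPE = "datasource"


# What the session-scoped fixture does: register the client under its object type.
clients: dict = {}
client = DataSourceClient()
clients[client._OBJECT_TYPE] = client

# A step can then retrieve the client without knowing which fixture created it.
datasource_client = clients["datasource"]
```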

Adding client capabilities​

Add any new client capabilities to the client class in the client.py file of the corresponding entity. Now that we have APIv2, it may be necessary to add a specific client for it, for example:

client.py

class TimeSeriesClientV2(client_base.BaseClientV2):
    """Client for interacting with the timeseries API endpoints."""

    _LIST_FLOW_URL = "datasources"
    _LIST_URL = "datasources"
    _OBJECT_TYPE = "timeseries"
    _PREFIX = "datasource"

fixtures.py

@fixture(scope="session", autouse=True)
def timeseries_apiv2_client(clients: dict[str, BaseClientV2]) -> Optional[TimeSeriesClientV2]:
    """Creates a fixture for requesting a timeseries V2 client.

    .. note::
        Use in your own fixtures by adding a `timeseries_apiv2_client` parameter.

    Args:
        clients (dict[str, BaseClientV2]): The clients dictionary.

    Returns:
        TimeSeriesClientV2: The timeseries client.

    """
    client = TimeSeriesClientV2()
    clients[client._OBJECT_TYPE + "V2"] = client
    return client

Adding fixtures​

Add any new fixtures to the fixtures.py file of the corresponding entity, creating the file if needed. Mark each one as a fixture with pytest's @fixture decorator. Name the fixture function the way you want other fixtures to request it as a parameter, or use the @fixture decorator's name parameter if the request name should differ from the function name.
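For example, the @fixture decorator's name parameter lets the request name differ from the function name. The fixture below is purely illustrative:

```python
from pytest import fixture


@fixture(name="ts_context")
def timeseries_context_fixture() -> dict:
    """Other fixtures and steps request this as `ts_context`,
    not by the function name `timeseries_context_fixture`."""
    return {"datapoints": []}
```

A dependent fixture would then declare `def my_fixture(ts_context): ...` to receive the dictionary.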

Adding steps​

Add any new steps to the steps.py file of the corresponding entity, creating the file if needed. Mark them with the appropriate pytest-bdd decorators: @given, @when and/or @then. Since APIv2 was introduced, each step needs a base implementation. Then follow this structure:

steps.py

from pytest_bdd import then
from pytest_bdd_ewx.api_version_enum import ApiVersionEnum
from pytest_bdd_ewx.client_base import BaseClientV1
from pytest_bdd_ewx.time_series.steps_implementation import TimeseriesBaseImplementation, TimeseriesV2Implementation
from pytest_bdd_ewx.utils import get_step_implementation_class

TIMESERIES_STEP_IMPLEMENTATION_CLASS_MAPPING = {
    ApiVersionEnum.API_1_5: TimeseriesBaseImplementation,
    ApiVersionEnum.API_2: TimeseriesV2Implementation,
}


@then('my step')
def my_step(context: dict[str, ...], clients: dict[str, BaseClientV1]):
    """Just a sample.

    Step definition:
        my step

    Args:
        context (dict[str, ...]): The tests context dictionary.
        clients (dict[str, BaseClientV1]): Entities clients dictionary.

    Raises:
        AssertionError: If the structure of the API response is not as expected.
        AssertionError: If the data point is not present in the timeseries set.

    """
    step_implementation_class = get_step_implementation_class(context, TIMESERIES_STEP_IMPLEMENTATION_CLASS_MAPPING)
    step_implementation_class.my_step(clients)

steps_implementation.py

class TimeseriesBaseImplementation:
    """Base class (APIv1) that implements Timeseries steps."""

    @staticmethod
    def my_step(clients):
        timeseries_client = clients["timeseries"]
        return True


class TimeseriesV2Implementation(TimeseriesBaseImplementation):
    """Class that implements Timeseries steps for APIv2."""

    @staticmethod
    def my_step(clients):
        timeseries_client = clients["timeseriesV2"]
        return True

If a step has no APIv2-specific implementation, it must be implemented in the base class. Note: best practice is to keep test steps standalone.
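Because the V2 class inherits from the base class, any step it does not override automatically falls back to the APIv1 implementation. A minimal sketch of that fallback:

```python
class BaseImplementation:
    """APIv1 step implementations."""

    @staticmethod
    def my_other_step() -> str:
        return "handled by APIv1 logic"


class V2Implementation(BaseImplementation):
    """APIv2 overrides only the steps that differ.

    `my_other_step` is intentionally not overridden here, so calls on this
    class fall back to the base (APIv1) implementation via inheritance.
    """
```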

Adding an entity​

If a completely new entity is to be added, create a new directory for it at the level of the other entity directories. Then follow the process described above for adding a client, fixtures and/or steps.

Skipping scenarios​

Sometimes during test development you might want some tests to be skipped automatically when a certain condition is met. You can use the following methods:

import pytest
from pytest_bdd import scenario


@pytest.mark.skip(reason="To be implemented")
@scenario("features/my_example.feature", "Scenario name")
def test_run_scenario():
    pass

Scenarios can also be skipped conditionally using the skipif marker:

import os
import pytest
from pytest_bdd import scenario


@pytest.mark.skipif(
    # Skip when NOT on the required namespace; .get avoids a KeyError at collection time.
    condition=os.environ.get("AUTOMATED_TESTS_NAMESPACE") != "enrx_org_012",
    reason="This step can only be executed on the 'enrx_org_012' namespace!",
)
@scenario("features/test_flow.feature", "Scenario name")
def test_run_scenario():
    pass

Utilizing DataTable​

Starting from version 8, pytest-bdd supports DataTable type in step parameters. The step implementation function should refer to it via reserved word datatable. For more information on Gherkin data tables, refer to this guide.

Usage​

This parameter is particularly useful when writing scenarios like:

Scenario: Validate DPA in APIv1.5
    Then I expect the following datapoint attribute-value pairs in datasource with id "DS_WITH_DPA_FROM_FLOW_${test_timestamp}" in channel "DPA_CHANNEL_A_FAMILY_TS_${test_timestamp}" on timestamp "1704067200000":
        | TEST_FLOAT_DPA     | None          |
        | TEST_ENUM_DPA      | zero          |
        | TEST_TIMESTAMP_DPA | 1735689600000 |
        | TEST_BOOLEAN_DPA   | True          |
        | TEST_LONG_DPA      | 12345         |

By passing this datatable to the step, you can make a single API call and validate multiple data points efficiently. This approach reduces the amount of code needed, as it consolidates what would otherwise be multiple steps into a single, more manageable step.

The step definition in this case looks like this:

@then(
    parsers.parse(
        'I expect the following datapoint attribute-value pairs in datasource with id "{datasource_id}" '
        'in channel "{channel_classifier}" on timestamp "{timestamp}":'
    )
)
def check_timeseries_datapoint_attribute(
    datasource_id: str,
    channel_classifier: str,
    timestamp: int,
    datatable: DataTable,
    ...
):

Additionally, we have added support for more familiar step definitions that provide values one by one, such as:

Then I expect the datapoint attribute-value pairs "TEST_FLOAT_DPA" and "12.34" in datasource with id "DS_WITH_DPA_FROM_FLOW_${test_timestamp}" in channel "DPA_CHANNEL_A_FAMILY_TS_${test_timestamp}" on timestamp "1704156300000"

This also uses the same underlying implementation, ensuring consistency while keeping complexity low.
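One way to share a single implementation between the two entry points (a sketch, not the plugin's actual code) is to normalize both into the same attribute-value dictionary. pytest-bdd passes the Gherkin data table as a list of rows, each a list of cell strings:

```python
def _check_pairs(expected: dict, actual: dict) -> None:
    """Shared implementation: validate every expected attribute-value pair."""
    for attribute, value in expected.items():
        assert actual.get(attribute) == value, (
            f"{attribute}: expected {value!r}, got {actual.get(attribute)!r}"
        )


def check_with_datatable(datatable: list, actual: dict) -> None:
    """Datatable entry point: each two-column row becomes one expected pair."""
    _check_pairs({row[0]: row[1] for row in datatable}, actual)


def check_single_pair(attribute: str, value: str, actual: dict) -> None:
    """One-by-one entry point: delegates to the same shared implementation."""
    _check_pairs({attribute: value}, actual)
```

Both step definitions then stay thin wrappers, so a fix to the validation logic applies to both at once.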

Benefits​

  • Efficiency: Make a single API call to verify multiple data points.
  • Less Code: Reduces the need for multiple, separate step definitions.
  • Flexibility: Supports both datatable and individual key-value pairs in step definitions.