Core Types

This page documents the main classes and types in metEAUdata.

Data Container Classes

Dataset

Bases: BaseModel, DisplayableBase

Collection of signals representing a complete monitoring dataset.

A Dataset groups multiple signals that are collected together as part of a monitoring project or analysis workflow. It provides project-level metadata and enables coordinated processing operations across multiple parameters.

Datasets support cross-signal processing operations and maintain consistent naming conventions across all contained signals. They provide the highest level of organization for environmental monitoring data with complete metadata preservation and serialization capabilities.

serialize_datetime(dt: datetime.datetime, _info) -> str

Serialize datetime to ISO 8601 string format.

replace_signal_base_name(signal_name: str, custom_name: str) -> str

Replace the base name of a signal while preserving the hash number.

Parameters:

Name Type Description Default
signal_name str

The signal name (e.g., "AVERAGE" or "AVERAGE#2")

required
custom_name str

The custom base name to use (e.g., "site_average")

required

Returns:

Type Description
str

The signal name with replaced base (e.g., "site_average" or "site_average#2")

Examples:

>>> dataset.replace_signal_base_name("AVERAGE", "combined")
"combined"
>>> dataset.replace_signal_base_name("AVERAGE#2", "combined")
"combined#2"

set_backend(backend: StorageBackend, auto_save: bool = False) -> Dataset

Configure storage backend for this dataset.

Parameters:

Name Type Description Default
backend StorageBackend

Storage backend instance to use

required
auto_save bool

If True, automatically save after process() operations

False

Returns:

Type Description
Dataset

Self for method chaining
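
Example

A minimal sketch; `backend` is assumed to be an already-constructed StorageBackend instance (the use_*_storage convenience methods below cover the common cases without manual backend construction):

>>> dataset.set_backend(backend, auto_save=True)
>>> # subsequent process() calls will now save results automatically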

save_all() -> None

Save all time series data to the configured backend.

Raises:

Type Description
ValueError

If no backend is configured

load_all() -> None

Load all time series data from the configured backend.

Raises:

Type Description
ValueError

If no backend is configured
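
Example

A minimal save/load round trip; it assumes a backend has been configured first (for instance with use_disk_storage below), otherwise both calls raise ValueError:

>>> dataset.use_disk_storage("./my_data", auto_save=False)
>>> dataset.save_all()   # write every time series to the backend
>>> # ... in a later session ...
>>> dataset.load_all()   # read the time series back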

use_disk_storage(path: Union[str, Path], auto_save: bool = True) -> Dataset

Configure this dataset to use disk-based storage.

This is a convenience method that configures pandas-disk backend storage for all signals and time series in this dataset. Data will be stored as Parquet files on disk. This replaces the need to manually create StorageConfig and call set_backend().

Parameters:

Name Type Description Default
path Union[str, Path]

Directory path where data will be stored

required
auto_save bool

If True, automatically save after modifications (default: True)

True

Returns:

Type Description
Dataset

Self for method chaining

Example

>>> dataset = Dataset(name="my_dataset", ...)
>>> dataset.use_disk_storage("./my_data")
>>> # Now all processing will automatically save to disk

use_sql_storage(connection_string: str, auto_save: bool = True) -> Dataset

Configure this dataset to use SQL database storage.

This is a convenience method that configures SQL backend storage for all signals and time series in this dataset. Supports SQLite, PostgreSQL, MySQL, and other SQLAlchemy-compatible databases. This replaces the need to manually create StorageConfig and call set_backend().

Parameters:

Name Type Description Default
connection_string str

Database connection string (e.g., 'sqlite:///my_data.db')

required
auto_save bool

If True, automatically save after modifications (default: True)

True

Returns:

Type Description
Dataset

Self for method chaining

Example

>>> dataset = Dataset(name="my_dataset", ...)
>>> dataset.use_sql_storage("sqlite:///my_data.db")
>>> # PostgreSQL example:
>>> dataset.use_sql_storage("postgresql://user:password@localhost/database")

use_memory_storage() -> Dataset

Configure this dataset to use in-memory storage.

This is a convenience method that explicitly configures in-memory storage (the default). Useful for switching back from disk/SQL storage.

Returns:

Type Description
Dataset

Self for method chaining

Example

>>> dataset = Dataset(name="my_dataset", ...)
>>> dataset.use_disk_storage("./data")  # Use disk
>>> # ... later ...
>>> dataset.use_memory_storage()  # Switch back to memory

save(directory: str, separator: str = ',', output_index_name: Optional[Union[str, tuple]] = None, output_value_names: Optional[Union[str, tuple, dict]] = None) -> Dataset

Save dataset data and metadata to disk as a zip archive.

Parameters:

Name Type Description Default
directory str

Directory path where the zip file will be created

required
separator str

CSV column separator character

','
output_index_name Optional[Union[str, tuple]]

Custom name for index column (applies to all signals):
- String: single-line header
- Tuple: multi-line header

None
output_value_names Optional[Union[str, tuple, dict]]

Custom names for value columns:
- String/Tuple: applies to all signals
- Dict[signal_name, str/tuple]: per-signal customization
- Dict[signal_name, Dict[ts_name, str/tuple]]: per-time-series customization
- "auto": auto-populate from each signal's units attribute

None

Returns:

Name Type Description
self Dataset

The Dataset object (for method chaining)

Examples:

>>> # Apply to all signals
>>> dataset.save("output/", output_index_name=("Time", "days"))
>>> # Per-signal customization
>>> dataset.save("output/",
...     output_value_names={
...         "temp_sensor": ("Temperature", "°C"),
...         "pH_sensor": ("pH", "unitless")
...     })
>>> # Auto-populate from units
>>> dataset.save("output/", output_value_names="auto")

process(input_time_series_names: list[str], transform_function: DatasetTransformFunctionProtocol, *args: Any, output_signal_names: Optional[list[str]] = None, output_ts_names: Optional[list[str]] = None, overwrite: bool = False, **kwargs: Any) -> Dataset

Processes the dataset data using a transformation function.

Parameters:

Name Type Description Default
input_time_series_names list[str]

List of names of the input time series to be processed.

required
transform_function DatasetTransformFunctionProtocol

The transformation function to be applied.

required
*args Any

Additional positional arguments to be passed to the transformation function.

()
output_signal_names Optional[list[str]]

Optional list of custom names for output signals. Must have the same length as the number of output signals. Example: ["site_average"] will create a signal named "site_average#1" instead of the default naming.

None
output_ts_names Optional[list[str]]

Optional list of custom names for time series within output signals. These replace the operation suffix in time series names. Example: ["combined"] will create time series like "site_average#1_combined#1".

None
overwrite bool

If True, overwrites the latest version instead of incrementing. Default is False (increment hash number). For example, if versions #1, #2, #3 exist, overwrite=True will replace #3, while overwrite=False will create #4.

False
**kwargs Any

Additional keyword arguments to be passed to the transformation function.

{}

Returns:

Name Type Description
Dataset Dataset

The updated Dataset object after processing. The transformation will produce new Signals with the processed time series data.
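
Example

A hedged sketch of a dataset-level processing call; `average_signals` is a hypothetical transform (any function satisfying DatasetTransformFunctionProtocol works the same way) and the time series names are illustrative:

>>> dataset.process(
...     ["temperature#1_RAW#1", "temperature#2_RAW#1"],
...     average_signals,
...     output_signal_names=["site_average"],  # creates signal "site_average#1"
...     output_ts_names=["combined"],          # time series "site_average#1_combined#1"
... )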

plot(signal_names: List[str], ts_names: List[str], title: Optional[str] = None, y_axis: Optional[str] = None, x_axis: Optional[str] = None, start: Optional[Union[str, datetime.datetime, pd.Timestamp]] = None, end: Optional[Union[str, datetime.datetime, pd.Timestamp]] = None) -> go.Figure

Create a multi-subplot visualization comparing time series across signals.

Each signal gets its own subplot with shared x-axis (time). Only time series that exist in each signal are plotted. Individual y-axis labels include units.

Parameters:

Name Type Description Default
signal_names List[str]

List of signal names to plot. Must exist in this dataset.

required
ts_names List[str]

List of time series names to plot from each signal.

required
title Optional[str]

Plot title. If None, uses "Time series plots of dataset {dataset_name}".

None
y_axis Optional[str]

Base Y-axis label. If None, uses "Values".

None
x_axis Optional[str]

X-axis label. If None, uses "Time".

None
start Optional[Union[str, datetime, Timestamp]]

Start date for filtering data (datetime string or object).

None
end Optional[Union[str, datetime, Timestamp]]

End date for filtering data (datetime string or object).

None

Returns:

Type Description
Figure

Plotly Figure object with subplots for each signal.
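
Example

A brief sketch; the signal and time series names are illustrative and must exist in this dataset (pass the time series names as they appear in each signal):

>>> fig = dataset.plot(
...     signal_names=["temperature#1", "pH#1"],
...     ts_names=["RAW#1"],
...     start="2024-01-01",
...     end="2024-01-31",
... )
>>> fig.show()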

Signal

Bases: BaseModel, DisplayableBase

Collection of related time series representing a measured parameter.

A Signal groups multiple time series that represent the same physical parameter (e.g., temperature) at different processing stages or from different processing paths. This enables comparison between raw and processed data, evaluation of different processing methods, and maintenance of data lineage.

Signals handle the naming conventions for time series, ensuring consistent identification across processing workflows. They support processing operations that can take multiple input time series and produce new processed versions with complete metadata preservation.

serialize_datetime(dt: datetime.datetime, _info) -> str

Serialize datetime to ISO 8601 string format.

extract_ts_base_and_number(ts_full_name: str) -> tuple[str, str, int] staticmethod

Extract signal name, base ts name, and number from a full time series name.

Parameters:

Name Type Description Default
ts_full_name str

Full time series name in format 'signalname_tsbase#number' e.g., 'temperature#1_raw#2' or 'test_signal#1_processed#1'

required

Returns:

Type Description
tuple[str, str, int]

Tuple of (signal_name, ts_base, number), e.g., ('temperature#1', 'raw', 2) or ('test_signal#1', 'processed', 1)

Examples:

>>> Signal.extract_ts_base_and_number('temperature#1_raw#2')
('temperature#1', 'raw', 2)
>>> Signal.extract_ts_base_and_number('test_signal#1_processed#1')
('test_signal#1', 'processed', 1)

make_ts_name(signal_name: str, ts_base: str, number: int) -> str staticmethod

Construct a full time series name from components.

Parameters:

Name Type Description Default
signal_name str

Signal name (e.g., 'temperature#1')

required
ts_base str

Base time series name (e.g., 'raw', 'processed')

required
number int

Version number (e.g., 1, 2)

required

Returns:

Type Description
str

Full time series name in format 'signalname_tsbase#number'

Examples:

>>> Signal.make_ts_name('temperature#1', 'raw', 2)
'temperature#1_raw#2'
>>> Signal.make_ts_name('test_signal#1', 'processed', 1)
'test_signal#1_processed#1'

replace_operation_suffix(ts_name: str, custom_suffix: str) -> str

Replace the operation suffix in a time series name with a custom suffix.

Preserves the signal name and hash number (if present), only replacing the middle operation part.

Parameters:

Name Type Description Default
ts_name str

The time series name (e.g., "A#1_RESAMPLED" or "A#1_RESAMPLED#1")

required
custom_suffix str

The custom suffix to use (e.g., "daily_avg")

required

Returns:

Type Description
str

The time series name with replaced suffix (e.g., "A#1_daily_avg" or "A#1_daily_avg#1")

Examples:

>>> signal.replace_operation_suffix("A#1_RESAMPLED", "hourly")
"A#1_hourly"
>>> signal.replace_operation_suffix("A#1_RESAMPLED#2", "hourly")
"A#1_hourly#2"

save_all(dataset_name: Optional[str] = None) -> None

Save all time series in this signal to the backend.

Parameters:

Name Type Description Default
dataset_name Optional[str]

Optional dataset name for namespacing keys. If not provided, uses _parent_dataset_name or "default".

None
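
Example

A one-line sketch; the dataset name is illustrative and is only used to namespace the storage keys:

>>> signal.save_all(dataset_name="pilot_plant")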

process(input_time_series_names: list[str], transform_function: SignalTransformFunctionProtocol, *args: Any, output_names: Optional[list[str]] = None, overwrite: bool = False, **kwargs: Any) -> Signal

Processes the signal data using a transformation function.

Parameters:

Name Type Description Default
input_time_series_names list[str]

List of names of the input time series to be processed.

required
transform_function SignalTransformFunctionProtocol

The transformation function to be applied.

required
*args Any

Additional positional arguments to be passed to the transformation function.

()
output_names Optional[list[str]]

Optional list of custom names to replace the operation suffix. Must have the same length as the number of outputs. Example: ["smoothed", "filtered"] will produce names like "A#1_smoothed#1" instead of "A#1_SMOOTH#1".

None
overwrite bool

If True, overwrites the latest version instead of incrementing. Default is False (increment hash number). For example, if versions #1, #2, #3 exist, overwrite=True will replace #3, while overwrite=False will create #4.

False
**kwargs Any

Additional keyword arguments to be passed to the transformation function.

{}

Returns:

Name Type Description
Signal Signal

The updated Signal object after processing.
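
Example

A hedged sketch; `smooth` stands in for any function satisfying SignalTransformFunctionProtocol, and the time series name is illustrative:

>>> signal.process(
...     ["A#1_RAW#1"],
...     smooth,
...     output_names=["smoothed"],  # yields "A#1_smoothed#1" instead of e.g. "A#1_SMOOTH#1"
... )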

save(destination: str, zip: bool = True, separator: str = ',', output_index_name: Optional[Union[str, tuple]] = None, output_value_names: Optional[Union[str, tuple, list, dict]] = None)

Save signal data and metadata to disk.

Parameters:

Name Type Description Default
destination str

Directory path where files will be saved

required
zip bool

If True, creates a .zip archive; if False, saves as directory

True
separator str

CSV column separator character (e.g., ',', ';', '\t')

','
output_index_name Optional[Union[str, tuple]]

Custom name for index column in CSV files:
- String: single-line header (e.g., "Time")
- Tuple: multi-line header (e.g., ("Time", "hours"))
- None: uses pandas default (index name from the Series)

None
output_value_names Optional[Union[str, tuple, list, dict]]

Custom names for value columns in CSV files:
- String/Tuple: applies to all time series
- Dict: maps time_series_name -> custom_name for per-series control
- List: must match the number of time series
- "auto": auto-creates the tuple (series_name, self.units) if units exist

None

Examples:

>>> # Single-line headers
>>> signal.save("output/", output_index_name="timestamp")
>>> # Multi-line headers
>>> signal.save("output/",
...     output_index_name=("Time", "hours"),
...     output_value_names=("Temperature", "°C"))
>>> # Auto-populate from units
>>> signal.save("output/", output_value_names="auto")
>>> # Per-series customization
>>> signal.save("output/",
...     output_value_names={
...         "A#1_RAW#1": ("Raw Temp", "°C"),
...         "A#1_SMOOTH#1": ("Smoothed Temp", "°C")
...     })

plot(ts_names: List[str], title: Optional[str] = None, y_axis: Optional[str] = None, x_axis: Optional[str] = None, start: Optional[Union[str, datetime.datetime, pd.Timestamp]] = None, end: Optional[Union[str, datetime.datetime, pd.Timestamp]] = None) -> go.Figure

Create an interactive Plotly plot with multiple time series from this signal.

Each time series is plotted with different colors and appropriate styling based on their processing types. Temporal shifting is applied automatically for prediction data.

Parameters:

Name Type Description Default
ts_names List[str]

List of time series names to plot. Must exist in this signal.

required
title Optional[str]

Plot title. If None, uses "Time series plot of {signal_name}".

None
y_axis Optional[str]

Y-axis label. If None, uses "{signal_name} ({units})".

None
x_axis Optional[str]

X-axis label. If None, uses "Time".

None
start Optional[Union[str, datetime, Timestamp]]

Start date for filtering data (datetime string or object).

None
end Optional[Union[str, datetime, Timestamp]]

End date for filtering data (datetime string or object).

None

Returns:

Type Description
Figure

Plotly Figure object with multiple time series traces.
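
Example

A brief sketch with illustrative time series names:

>>> fig = signal.plot(
...     ts_names=["A#1_RAW#1", "A#1_smoothed#1"],
...     title="Raw vs. smoothed",
...     start="2024-01-01",
... )
>>> fig.show()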

build_dependency_graph(ts_name: str) -> List[Dict[str, Any]]

Build a data structure representing all the processing steps and their dependencies for a given time series (see the example after plot_dependency_graph below).

plot_dependency_graph(ts_name: str) -> go.Figure

Create a dependency graph visualization showing processing lineage for a time series.

The graph displays time series as colored rectangles connected by lines representing processing functions. The flow is temporal from left to right.

Parameters:

Name Type Description Default
ts_name str

Name of the time series to trace dependencies for.

required

Returns:

Type Description
Figure

Plotly Figure object with the dependency graph visualization.
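
Example

A brief sketch covering both dependency-graph methods; the time series name is illustrative:

>>> steps = signal.build_dependency_graph("A#1_smoothed#1")  # list of processing-step records
>>> fig = signal.plot_dependency_graph("A#1_smoothed#1")
>>> fig.show()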

use_disk_storage(path: Union[str, Path], auto_save: bool = True) -> Signal

Configure this signal to use disk-based storage.

This is a convenience method that configures pandas-disk backend storage for all time series in this signal. Data will be stored as Parquet files on disk.

Parameters:

Name Type Description Default
path Union[str, Path]

Directory path where data will be stored

required
auto_save bool

If True, automatically save after modifications (default: True)

True

Returns:

Type Description
Signal

Self for method chaining

Example

>>> signal = Signal(input_data=my_data, name="temperature", ...)
>>> signal.use_disk_storage("./my_data")

use_sql_storage(connection_string: str, auto_save: bool = True) -> Signal

Configure this signal to use SQL database storage.

This is a convenience method that configures SQL backend storage for all time series in this signal. Supports SQLite, PostgreSQL, MySQL, and other SQLAlchemy-compatible databases.

Parameters:

Name Type Description Default
connection_string str

Database connection string (e.g., 'sqlite:///my_data.db')

required
auto_save bool

If True, automatically save after modifications (default: True)

True

Returns:

Type Description
Signal

Self for method chaining

Example

>>> signal = Signal(input_data=my_data, name="temperature", ...)
>>> signal.use_sql_storage("sqlite:///my_data.db")
>>> # PostgreSQL example:
>>> signal.use_sql_storage("postgresql://user:password@localhost/database")

use_memory_storage() -> Signal

Configure this signal to use in-memory storage.

This is a convenience method that explicitly configures in-memory storage (the default). Useful for switching back from disk/SQL storage.

Returns:

Type Description
Signal

Self for method chaining

Example

>>> signal = Signal(input_data=my_data, name="temperature", ...)
>>> signal.use_disk_storage("./data")  # Use disk
>>> # ... later ...
>>> signal.use_memory_storage()  # Switch back to memory

TimeSeries

Bases: BaseModel, DisplayableBase

Time series data with complete processing history and metadata.

This class represents a single time series with its associated pandas Series data, complete processing history, and index metadata. It maintains a full audit trail of all transformations applied to the data from its raw state to the current processed form.

The class handles serialization of pandas objects and preserves critical index information to ensure proper reconstruction. It's the fundamental building block for environmental time series analysis workflows.

serialize_datetime(dt: datetime.datetime, _info) -> str

Serialize datetime to ISO 8601 string format.

plot(title: Optional[str] = None, y_axis: Optional[str] = None, x_axis: Optional[str] = None, legend_name: Optional[str] = None, start: Optional[Union[str, datetime.datetime, pd.Timestamp]] = None, end: Optional[Union[str, datetime.datetime, pd.Timestamp]] = None) -> go.Figure

Create an interactive Plotly plot of the time series data.

The plot styling is automatically determined by the processing type of the time series. For prediction data, temporal shifting is applied to show future timestamps.

Parameters:

Name Type Description Default
title Optional[str]

Plot title. If None, uses the time series name.

None
y_axis Optional[str]

Y-axis label. If None, uses the time series name.

None
x_axis Optional[str]

X-axis label. If None, uses "Time".

None
legend_name Optional[str]

Legend entry name. If None, uses the time series name.

None
start Optional[Union[str, datetime, Timestamp]]

Start date for filtering data (datetime string or object).

None
end Optional[Union[str, datetime, Timestamp]]

End date for filtering data (datetime string or object).

None

Returns:

Type Description
Figure

Plotly Figure object with the time series plot.
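
Example

A brief sketch; all labels are optional and fall back to the defaults described above:

>>> fig = ts.plot(title="Raw temperature", y_axis="Temperature (°C)", legend_name="raw")
>>> fig.show()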

save_to_backend(key: str) -> None

Save this time series to the configured backend.

Parameters:

Name Type Description Default
key str

Unique storage key

required

Raises:

Type Description
ValueError

If no storage backend is configured

load_from_backend(key: str) -> None

Load this time series from the configured backend.

Parameters:

Name Type Description Default
key str

Storage key to load from

required

Raises:

Type Description
ValueError

If no storage backend is configured
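
Example

A minimal sketch covering both methods; it assumes a backend was configured first (e.g., with use_disk_storage below), and the key is an arbitrary unique identifier chosen by the caller:

>>> ts.use_disk_storage("./data")
>>> ts.save_to_backend("temperature#1_RAW#1")
>>> ts.load_from_backend("temperature#1_RAW#1")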

use_disk_storage(path: Union[str, Path], auto_save: bool = True) -> TimeSeries

Configure this time series to use disk-based storage.

This is a convenience method that configures pandas-disk backend storage for this time series. Data will be stored as Parquet files on disk.

Parameters:

Name Type Description Default
path Union[str, Path]

Directory path where data will be stored

required
auto_save bool

If True, automatically save after modifications (default: True)

True

Returns:

Type Description
TimeSeries

Self for method chaining

Example

>>> ts = TimeSeries(series=my_data)
>>> ts.use_disk_storage("./my_data", auto_save=True)

use_sql_storage(connection_string: str, auto_save: bool = True) -> TimeSeries

Configure this time series to use SQL database storage.

This is a convenience method that configures SQL backend storage for this time series. Supports SQLite, PostgreSQL, MySQL, and other SQLAlchemy-compatible databases.

Parameters:

Name Type Description Default
connection_string str

Database connection string (e.g., 'sqlite:///my_data.db')

required
auto_save bool

If True, automatically save after modifications (default: True)

True

Returns:

Type Description
TimeSeries

Self for method chaining

Example

>>> ts = TimeSeries(series=my_data)
>>> ts.use_sql_storage("sqlite:///my_data.db")
>>> # PostgreSQL example:
>>> ts.use_sql_storage("postgresql://user:password@localhost/database")

use_memory_storage() -> TimeSeries

Configure this time series to use in-memory storage.

This is a convenience method that explicitly configures in-memory storage (the default). Useful for switching back from disk/SQL storage.

Returns:

Type Description
TimeSeries

Self for method chaining

Example

>>> ts = TimeSeries(series=my_data)
>>> ts.use_disk_storage("./data")  # Use disk
>>> # ... later ...
>>> ts.use_memory_storage()  # Switch back to memory

Metadata Classes

DataProvenance

Bases: BaseModel, DisplayableBase

Information about the source and context of time series data.

This class captures essential metadata about where time series data originated, including the source repository, project context, physical location, equipment used, and the measured parameter. This information is crucial for data traceability and understanding measurement context in environmental monitoring.

Provenance information enables users to assess data quality, understand measurement conditions, and make informed decisions about data usage in analysis and modeling workflows.

ProcessingStep

Bases: BaseModel, DisplayableBase

Record of a single data processing operation applied to time series.

This class documents individual steps in a data processing pipeline, capturing the type of processing performed, when it was executed, the function used, and the parameters applied. Each step maintains a complete audit trail of data transformations.

Processing steps are chained together to form a complete processing history, enabling full traceability from raw data to final processed results. The step_distance field tracks temporal shifts introduced by operations like forecasting or lag analysis.

serialize_datetime(dt: datetime.datetime, _info) -> str

Serialize datetime to ISO 8601 string format.

serialize_type(t: ProcessingType, _info) -> str

Serialize ProcessingType enum to its string value.

FunctionInfo

Bases: BaseModel, DisplayableBase

Metadata about processing functions applied to time series data.

This class documents the functions used in data processing pipelines, capturing essential information for reproducibility including function name, version, author, and reference documentation. It can optionally capture the actual source code of the function for complete reproducibility.

Function information is critical for understanding how data has been processed and for reproducing analysis results. The automatic source code capture feature helps maintain processing lineage even when function implementations change over time.

Parameters

Bases: BaseModel, DisplayableBase

Container for processing function parameters with numpy array support.

This class stores parameters passed to time series processing functions, automatically handling complex data types like numpy arrays, nested objects, and custom classes. It provides serialization capabilities while preserving the ability to reconstruct original parameter values.

The class is particularly useful for maintaining reproducible processing pipelines where parameter values need to be stored as metadata alongside processed time series data.

handle_numpy_arrays(data: Any) -> Any classmethod

Prepare data for Pydantic validation.

as_dict() -> dict[str, Any]

Convert to dict, handling special types.

IndexMetadata

Bases: BaseModel, DisplayableBase

Metadata describing the characteristics of a pandas Index.

This class captures essential information about time series indices to enable proper reconstruction after serialization. It handles various pandas Index types including DatetimeIndex, PeriodIndex, RangeIndex, and CategoricalIndex.

The metadata preserves critical properties like timezone information for datetime indices, frequency for time-based indices, and categorical ordering, ensuring that reconstructed indices maintain their original behavior and constraints.

Enumerations

ProcessingType

Bases: Enum

Standardized categories for time series processing operations.

This enumeration defines the standard types of processing operations that can be applied to environmental time series data. Each type represents a distinct category of data transformation with specific characteristics and purposes in environmental monitoring and wastewater treatment analysis.

The processing types enable consistent categorization of operations across different processing pipelines and facilitate automated quality control, reporting, and method comparison workflows.

Protocols

SignalTransformFunctionProtocol

Bases: Protocol

Protocol defining the interface for Signal-level processing functions.

This protocol specifies the required signature for functions that can be used with the Signal.process() method. Transform functions take multiple input time series and return processed results with complete processing metadata.

Signal transform functions operate within a single measured parameter (Signal) and can take multiple time series representing different processing stages of that parameter. They are ideal for operations like smoothing, filtering, gap filling, and other single-parameter processing tasks.

The protocol ensures consistent interfaces across different processing functions while maintaining complete audit trails of all transformations applied to environmental monitoring data.
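
Example

A rough sketch of a conforming function. The return shape shown here (one pair of processed Series and its list of ProcessingStep records per output) is an assumption based on the description above, and `simple_offset` is a hypothetical name; consult the protocol definition for the exact signature:

>>> import pandas as pd
>>> def simple_offset(input_series: list[pd.Series], offset: float = 0.0, *args, **kwargs):
...     """Illustrative transform: add a constant offset to each input series."""
...     results = []
...     for series in input_series:
...         steps = []  # populate with ProcessingStep records documenting this operation
...         results.append((series + offset, steps))
...     return results
>>> signal.process(["A#1_RAW#1"], simple_offset, offset=2.5, output_names=["offset"])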

DatasetTransformFunctionProtocol

Bases: Protocol

Protocol defining the interface for Dataset-level processing functions.

This protocol specifies the required signature for functions that can be used with the Dataset.process() method. These functions can operate across multiple signals and create new signals with cross-parameter relationships.

Dataset transform functions are ideal for operations that require multiple parameters simultaneously, such as:

- Calculating derived parameters (e.g., BOD/COD ratios)
- Multivariate analysis and modeling
- Cross-parameter quality control
- System-wide fault detection
- Process efficiency calculations

The protocol ensures that new signals created by dataset processing maintain proper metadata inheritance and processing lineage from their input signals.

Note

New signals created by dataset processing will have their project property automatically updated to match the parent dataset's project. The transform function is responsible for setting appropriate signal names, units, provenance parameters, and purposes.