Source code for kpicalculator.adapters.time_series_manager

# src/kpicalculator/adapters/time_series_manager.py
"""Centralized time series loading with multiple source support."""

import logging

import pandas as pd
from esdl import esdl

from ..common.constants import COMPOSITE_KEY_SEPARATOR, KNOWN_TIME_SERIES_FIELDS
from ..security.credential_manager import CredentialManager
from .base_adapter import ValidationResult
from .common_model import TimeSeries
from .database_time_series_loader import DatabaseTimeSeriesLoader
from .xml_time_series_adapter import PiXmlTimeSeries


class TimeSeriesManager:
    """Centralized time series loading with multiple source support.

    This class implements the Factory pattern to provide a single entry point
    for loading time series data from various sources (DataFrame, database, XML)
    with configurable priority and graceful degradation.
    """

    # Order in which load_time_series tries sources when no priority is given.
    _DEFAULT_SOURCE_PRIORITY: tuple[str, ...] = ("dataframes", "database", "xml", "empty")
    # Every source name load_time_series understands; others are ignored with a warning.
    _KNOWN_SOURCES: frozenset[str] = frozenset(_DEFAULT_SOURCE_PRIORITY)
    # Fallback time step (hourly) used when it cannot be derived from the data.
    _DEFAULT_TIME_STEP_SECONDS: float = 3600.0

    def __init__(self, credential_manager: CredentialManager | None = None):
        """Initialize TimeSeriesManager with required components.

        Args:
            credential_manager: Manager for secure database credentials; passed
                straight to DatabaseTimeSeriesLoader.
        """
        self.database_loader = DatabaseTimeSeriesLoader(credential_manager)
        self.logger = logging.getLogger(__name__)

    def load_time_series(
        self,
        energy_system: esdl.EnergySystem,
        timeseries_dataframes: dict[str, pd.DataFrame] | None = None,
        xml_file: str | None = None,
        source_priority: list[str] | None = None,
    ) -> tuple[dict[str, TimeSeries], ValidationResult]:
        """Single entry point for all time series loading with source priority.

        Sources are tried in order. A source is used when it is applicable
        (DataFrames supplied, InfluxDB profiles present in the ESDL, XML path
        supplied); the first applicable source that does not raise provides
        the result.

        Args:
            energy_system: ESDL energy system containing asset definitions
            timeseries_dataframes: Optional dict mapping asset IDs to pandas
                DataFrames with time-indexed energy/power data. With the default
                priority it takes precedence over database loading.
            xml_file: Optional XML time series file path (testing only)
            source_priority: Optional list defining loading priority.
                Default: ["dataframes", "database", "xml", "empty"].
                "empty" stops the search (sources listed after it are never
                tried). Unknown names are skipped with a logged warning.
                An empty list falls back to the default.

        Returns:
            Tuple of (time_series_dict, validation_result). When no source
            provides data, an empty dict is returned together with a valid
            ValidationResult whose warnings include every per-source failure.

        Note:
            Source failures are not raised: an exception from a source is
            logged, recorded as a warning, and the next source is tried.
            ``_load_from_xml`` reports its own failures as an invalid
            ValidationResult instead of raising, and that result is returned.
        """
        priority = source_priority or list(self._DEFAULT_SOURCE_PRIORITY)
        failures: list[str] = []

        for source in priority:
            if source == "empty":
                # Explicit terminator: do not try any lower-priority source.
                break
            if source not in self._KNOWN_SOURCES:
                self.logger.warning("Ignoring unknown time series source %r", source)
                continue

            try:
                if source == "dataframes" and timeseries_dataframes:
                    self.logger.info("Loading time series from pandas DataFrames")
                    return self._load_from_dataframes(timeseries_dataframes, energy_system)

                if source == "database" and self._has_database_profiles(energy_system):
                    self.logger.info("Loading time series from InfluxDB database")
                    return self.database_loader.load_time_series_from_esdl(energy_system)

                if source == "xml" and xml_file:
                    self.logger.info("Loading time series from XML file: %s", xml_file)
                    return self._load_from_xml(xml_file, energy_system)

            except Exception as e:
                # Graceful degradation: remember the failure and try the next source.
                error_msg = f"Failed to load time series from {source}: {e}"
                self.logger.warning(error_msg)
                failures.append(error_msg)

        # If no sources succeeded, return empty with warnings
        self.logger.info("No time series data loaded - using empty time series")
        return {}, ValidationResult(
            is_valid=True, warnings=["No time series data loaded", *failures]
        )

    def _load_from_dataframes(
        self, dataframes: dict[str, pd.DataFrame], energy_system: esdl.EnergySystem
    ) -> tuple[dict[str, TimeSeries], ValidationResult]:
        """Load time series from pandas DataFrames.

        Each DataFrame column is stored as a separate TimeSeries under a
        composite key (asset_id|column_name), matching the format used by
        DatabaseTimeSeriesLoader so the ESDL adapter can resolve them.

        Args:
            dataframes: Dict mapping asset IDs to pandas DataFrames with a
                DatetimeIndex. All numeric columns are stored; columns whose
                names are not recognised by the KPI calculators (e.g.
                unknown_field, custom_metric) are stored but will not contribute
                to any KPI — a warning is logged in that case. Non-numeric
                columns are skipped and reported as errors.
            energy_system: ESDL energy system for validation

        Returns:
            Tuple of (time_series_dict, validation_result). The result combines
            the structural validation with any conversion errors.
        """
        time_series_dict: dict[str, TimeSeries] = {}
        issues: list[str] = []

        # Structural/content validation. Its errors (including empty DataFrames)
        # are merged into the returned result below.
        validation_result = self._validate_dataframe_time_series(dataframes, energy_system)

        for asset_id, df in dataframes.items():
            try:
                if df.empty:
                    # Already reported by _validate_dataframe_time_series; adding
                    # it again here would duplicate the error message.
                    continue

                # Detect time step from DataFrame index (same for all columns).
                # Raises ValueError for non-uniform or non-increasing indexes.
                time_step = self._detect_time_step(df)

                # Each column represents a different field (e.g., heat_supplied, power, flow)
                for column_name in df.columns:
                    try:
                        if not pd.api.types.is_numeric_dtype(df[column_name]):
                            error_msg = (
                                f"Column '{column_name}' for asset {asset_id} has non-numeric "
                                f"dtype '{df[column_name].dtype}' - skipping"
                            )
                            self.logger.warning(error_msg)
                            issues.append(error_msg)
                            continue

                        # Composite key asset_id|field_name, as used by DatabaseTimeSeriesLoader.
                        composite_key = f"{asset_id}{COMPOSITE_KEY_SEPARATOR}{column_name}"
                        time_series_dict[composite_key] = TimeSeries(
                            time_step=time_step,
                            values=df[column_name].tolist(),
                        )

                        if column_name not in KNOWN_TIME_SERIES_FIELDS:
                            self.logger.warning(
                                "Column '%s' for asset %s is not a recognized KPI field "
                                "and will not contribute to any KPI",
                                column_name,
                                asset_id,
                            )
                        else:
                            self.logger.debug(
                                "Converted DataFrame column '%s' for asset %s: %d data points",
                                column_name,
                                asset_id,
                                len(df),
                            )
                    except Exception as e:
                        error_msg = (
                            f"Failed to convert DataFrame column '{column_name}' "
                            f"for asset {asset_id}: {e}"
                        )
                        self.logger.error(error_msg)
                        issues.append(error_msg)

            except Exception as e:
                error_msg = f"Failed to process DataFrame for asset {asset_id}: {e}"
                self.logger.error(error_msg)
                issues.append(error_msg)

        # Combine validation results
        combined_validation = ValidationResult(
            is_valid=validation_result.is_valid and not issues,
            errors=validation_result.errors + issues,
            warnings=validation_result.warnings,
        )
        return time_series_dict, combined_validation

    def _validate_dataframe_time_series(
        self, dataframes: dict[str, pd.DataFrame], energy_system: esdl.EnergySystem
    ) -> ValidationResult:
        """Validate DataFrame time series against ESDL assets.

        Args:
            dataframes: Dict mapping asset IDs to pandas DataFrames
            energy_system: ESDL energy system for validation

        Returns:
            ValidationResult. Errors: empty DataFrame, non-DatetimeIndex, null
            values. Warnings: asset-ID mismatches with the ESDL, negative values,
            values above 1e9 (possible unit issue).
        """
        issues: list[str] = []
        warnings: list[str] = []

        esdl_asset_ids = self._esdl_asset_ids(energy_system)

        # Check DataFrame keys match asset IDs (sorted for deterministic messages)
        dataframe_ids = set(dataframes.keys())
        missing_assets = esdl_asset_ids - dataframe_ids
        unknown_assets = dataframe_ids - esdl_asset_ids

        if missing_assets:
            warnings.append(f"Missing DataFrames for assets: {sorted(missing_assets)}")
        if unknown_assets:
            warnings.append(f"Unknown assets in DataFrames: {sorted(unknown_assets)}")

        # Validate DataFrame structure
        for asset_id, df in dataframes.items():
            if df.empty:
                issues.append(f"Empty DataFrame for asset {asset_id}")
                continue

            if not isinstance(df.index, pd.DatetimeIndex):
                issues.append(f"DataFrame for {asset_id} must have DatetimeIndex")
                continue

            if df.isnull().any().any():
                issues.append(f"DataFrame for {asset_id} contains null values")
                continue

            # Range checks on numeric columns only. Non-numeric columns are
            # rejected with an error in _load_from_dataframes during conversion.
            for col, series in df.select_dtypes(include="number").items():
                if (series < 0).any():
                    warnings.append(
                        f"DataFrame for {asset_id}, column '{col}' contains negative values"
                    )
                if series.max() > 1e9:  # Very large values might indicate unit issues
                    warnings.append(
                        f"DataFrame for {asset_id}, column '{col}' contains very large values"
                        " - check units"
                    )

        return ValidationResult(is_valid=not issues, errors=issues, warnings=warnings)

    def _detect_time_step(self, df: pd.DataFrame) -> float:
        """Detect the time step from a DataFrame's DatetimeIndex.

        Args:
            df: DataFrame with DatetimeIndex

        Returns:
            Time step in seconds. Defaults to 3600.0 (hourly) when there are
            fewer than two rows or when the index is not datetime-like.

        Raises:
            ValueError: If time steps are not uniform, or the uniform step is
                zero or negative (index not strictly increasing).
        """
        if len(df) < 2:
            self.logger.warning("DataFrame has less than 2 points, defaulting to hourly time step")
            return self._DEFAULT_TIME_STEP_SECONDS

        try:
            # Distinct consecutive differences, deduplicated on the exact
            # timedelta values. The .dt accessor converts them to float seconds
            # independent of the pandas version. It raises AttributeError for
            # non-datetime-like indexes, which falls through to the hourly default.
            deltas = pd.Series(df.index).diff().iloc[1:]
            unique_seconds = [
                float(s) for s in deltas.drop_duplicates().dt.total_seconds()
            ]

            if len(unique_seconds) > 1:
                # For now, require equally spaced time steps
                # TODO: Future enhancement - support variable time steps
                min_diff = min(unique_seconds)
                max_diff = max(unique_seconds)
                mean_diff = sum(unique_seconds) / len(unique_seconds)
                count_unique = len(unique_seconds)
                raise ValueError(
                    f"Non-uniform time steps detected: min={min_diff}, max={max_diff}, "
                    f"mean={mean_diff:.1f}, count={count_unique}. "
                    "Currently only equally spaced time series are supported."
                )

            time_step_seconds = unique_seconds[0]
            if time_step_seconds <= 0:
                # Descending or duplicated timestamps would otherwise yield a
                # meaningless zero/negative time step.
                raise ValueError(
                    f"Non-positive time step detected: {time_step_seconds} seconds. "
                    "The DatetimeIndex must be strictly increasing."
                )

            self.logger.debug("Detected uniform time step: %s seconds", time_step_seconds)
            return time_step_seconds

        except ValueError:
            # Re-raise validation errors (non-uniform / non-positive time steps)
            raise
        except Exception as e:
            self.logger.warning("Failed to detect time step: %s, defaulting to hourly", e)
            return self._DEFAULT_TIME_STEP_SECONDS

    def _has_database_profiles(self, energy_system: esdl.EnergySystem) -> bool:
        """Check if the energy system contains InfluxDB profile references.

        Args:
            energy_system: ESDL energy system to check

        Returns:
            True as soon as one InfluxDBProfile is found, False otherwise.
        """
        return any(
            isinstance(element, esdl.InfluxDBProfile)
            for element in energy_system.eAllContents()
        )

    @staticmethod
    def _esdl_asset_ids(energy_system: esdl.EnergySystem) -> set[str]:
        """Return the non-empty IDs of all ESDL assets in the energy system."""
        return {
            asset.id
            for asset in energy_system.eAllContents()
            if isinstance(asset, esdl.Asset) and asset.id
        }

    def _load_from_xml(
        self, xml_file: str, energy_system: esdl.EnergySystem
    ) -> tuple[dict[str, TimeSeries], ValidationResult]:
        """Load time series from an XML file (testing/fallback).

        The parameter-aware interface is tried first. If it yields nothing, the
        simplified interface is used instead.

        Args:
            xml_file: Path to XML time series file
            energy_system: ESDL energy system used to filter series by asset ID

        Returns:
            Tuple of (time_series_dict, validation_result). On any failure an
            empty dict and an invalid ValidationResult are returned (no raise).
        """
        try:
            xml_adapter = PiXmlTimeSeries(xml_file, "locationId", "parameterId")
            esdl_asset_ids = self._esdl_asset_ids(energy_system)

            time_series_dict, warnings = self._load_xml_with_parameters(xml_adapter, esdl_asset_ids)

            if not time_series_dict:
                time_series_dict, fallback_warnings = self._load_xml_fallback(
                    xml_adapter, esdl_asset_ids
                )
                warnings.extend(fallback_warnings)

            return time_series_dict, ValidationResult(is_valid=True, warnings=warnings)

        except Exception as e:
            error_msg = f"Failed to load XML time series from {xml_file}: {e}"
            self.logger.error(error_msg)
            return {}, ValidationResult(is_valid=False, errors=[error_msg])

    def _load_xml_with_parameters(
        self, xml_adapter: PiXmlTimeSeries, esdl_asset_ids: set[str]
    ) -> tuple[dict[str, TimeSeries], list[str]]:
        """Load XML time series using the parameter-aware interface.

        Args:
            xml_adapter: Initialised PiXmlTimeSeries adapter.
            esdl_asset_ids: Set of asset IDs from the ESDL for filtering.

        Returns:
            Tuple of (time_series_dict, warnings). Series are keyed by
            asset_id|parameter_name. On failure, an empty dict and a single
            warning describing the error are returned.
        """
        try:
            time_series_dict: dict[str, TimeSeries] = {}
            asset_parameters = xml_adapter.get_time_series_with_parameters()
            for asset_id, parameters in asset_parameters.items():
                if asset_id not in esdl_asset_ids:
                    continue
                for parameter_name, (values, time_step) in parameters.items():
                    composite_key = f"{asset_id}{COMPOSITE_KEY_SEPARATOR}{parameter_name}"
                    time_series_dict[composite_key] = TimeSeries(
                        time_step=time_step, values=values
                    )
                    self.logger.debug(
                        "Loaded XML time series for asset %s parameter %s with time step %ss",
                        asset_id,
                        parameter_name,
                        time_step,
                    )
            return time_series_dict, []
        except Exception as e:
            return {}, [f"Failed to access XML time series with parameter info: {e}"]

    def _load_xml_fallback(
        self, xml_adapter: PiXmlTimeSeries, esdl_asset_ids: set[str]
    ) -> tuple[dict[str, TimeSeries], list[str]]:
        """Load XML time series using the simplified fallback interface.

        Used when the parameter-aware interface fails or returns no data. No
        time step information is available here, so hourly (3600 s) is assumed.

        Args:
            xml_adapter: Initialised PiXmlTimeSeries adapter.
            esdl_asset_ids: Set of asset IDs from the ESDL for filtering.

        Returns:
            Tuple of (time_series_dict, warnings). One warning per loaded series
            notes the missing parameter information.
        """
        try:
            time_series_dict: dict[str, TimeSeries] = {}
            warnings: list[str] = []
            for key, value in xml_adapter.time_series.items():
                if key in esdl_asset_ids and value:
                    time_series_dict[key] = TimeSeries(
                        time_step=self._DEFAULT_TIME_STEP_SECONDS, values=value
                    )
                    warnings.append(
                        f"XML time series for {key} loaded without parameter information"
                    )
            return time_series_dict, warnings
        except Exception as e:
            return {}, [f"Failed to access time_series property: {e}"]