Source code for kpicalculator.kpi_manager

# src/kpicalculator/kpi_manager.py
import logging
from typing import Any, TypedDict, cast

import pandas as pd  # type: ignore[import-untyped]
from esdl import esdl

from .adapters.common_model import EnergySystem
from .adapters.esdl_adapter import EsdlAdapter
from .adapters.simulator_adapter import SimulatorAdapter
from .calculators.emission_calculator import EmissionCalculator
from .calculators.energy_calculator import EnergyCalculator
from .calculators.financial_calculator import (
    PRODUCER_ASSET_TYPES,
    AssetFinancialResult,
    FinancialCalculator,
)
from .common.constants import (
    DEFAULT_DISCOUNT_RATE_PERCENT,
    DEFAULT_SYSTEM_LIFETIME_YEARS,
)

# Public API of this module — controls re-exports recognised by mypy and static analysers.
__all__ = [
    "AssetFinancialResult",
    "EmissionResults",
    "EnergyResults",
    "FinancialResults",
    "KpiManager",
    "KpiResults",
]

_logger = logging.getLogger(__name__)


[docs] class FinancialResults(TypedDict): """System-level financial KPI results. All monetary values are in EUR or EUR/year. Each field is the sum of the corresponding per-asset value across all assets in the system (except ``lcoe``, which is computed as ``sum(per-asset NPVs) / total_discounted_energy`` and is therefore *not* the average of per-asset LCOEs). ``capex`` and ``opex`` are broken down by asset category: ``"Production"``, ``"Consumption"``, ``"Storage"``, ``"Transport"``, ``"Conversion"``, and ``"All"`` (the system-wide sum). """ capex: dict[str, float] """CAPEX by asset category in EUR (investment + installation costs).""" opex: dict[str, float] """Annual OPEX by asset category in EUR/year (fixed + variable costs).""" npv: float """Net Present Value — discounted lifecycle cost in EUR.""" lcoe: float """Levelized Cost of Energy in EUR/MWh (``system_npv / discounted_energy``). ``system_npv`` is the sum of per-asset NPVs, each discounted at the asset's own rate (from ESDL ``costInformation.discountRate``, falling back to the system default). It is not the average of per-asset LCOEs. """ eac: float """Equivalent Annual Cost — sum of per-asset annualized costs in EUR/year.""" tco: float """Total Cost of Ownership — undiscounted lifecycle cost in EUR."""
[docs] class EnergyResults(TypedDict): """System-level energy KPI results. All values in Joules.""" consumption: float """Total thermal energy consumed by all consumer assets in J.""" demand: float """Total thermal energy demand from all consumer assets in J.""" production: float """Total thermal energy produced by all producer assets in J.""" efficiency: float """Distribution efficiency: consumption / production (0-1). Zero when production is zero."""
[docs] class EmissionResults(TypedDict): """System-level emission KPI results.""" total: float """Total greenhouse gas emissions in tonnes CO2e/year.""" per_mwh: float """Emission intensity in kg CO2e/MWh of energy consumed."""
[docs] class KpiResults(TypedDict): """Complete KPI results returned by ``KpiManager.calculate_all_kpis()`` and the top-level ``kpicalculator.calculate_kpis()`` function. Four top-level keys: - ``financials``: system-level monetary KPIs (CAPEX, OPEX, NPV, LCOE, EAC, TCO) - ``energy``: system-level energy totals in Joules - ``emissions``: system-level CO2e emissions - ``asset_financials``: per-asset financial breakdown keyed by asset ID; system totals in ``financials`` are derived by summing these values """ financials: FinancialResults energy: EnergyResults emissions: EmissionResults asset_financials: dict[str, AssetFinancialResult]
[docs] class KpiManager: """Main class for managing KPI calculations across different data sources. Cost unit conversion factors (EUR/kW, EUR/MW, EUR/km, EUR/kWh, EUR/MWh, % OF CAPEX, etc.) are built-in and used by the cost calculator when computing KPI values from ESDL costInformation elements. """ def __init__(self) -> None: """Initialize the KPI manager.""" self.energy_system: EnergySystem | None = None self.source_esdl_file: str | None = None
[docs] def load_from_esdl( self, esdl_file: str, time_series_file: str | None = None, timeseries_dataframes: dict[str, pd.DataFrame] | None = None, ) -> None: """Load energy system data from ESDL file. Cost data is extracted from ESDL costInformation elements. Note: InfluxDB profile loading is disabled here. To enable it, call ``EsdlAdapter().load_data(..., use_database_profiles=True)`` directly. Args: esdl_file: Path to ESDL file time_series_file: Optional path to time series file (when timeseries_dataframes not provided) timeseries_dataframes: Optional dict mapping asset IDs to pandas DataFrames with time-indexed energy/power data. When provided, takes precedence over database loading and time_series_file. """ adapter = EsdlAdapter() self.energy_system = adapter.load_data( esdl_file, time_series_file=time_series_file, timeseries_dataframes=timeseries_dataframes, use_database_profiles=False, ) self.source_esdl_file = esdl_file
[docs] def load_from_esdl_string( self, esdl_string: str, timeseries_dataframes: dict[str, pd.DataFrame] | None = None, ) -> None: """Load energy system data from ESDL XML string content. This method allows loading ESDL data directly from a string without needing a temporary file. Useful for integration with systems that provide ESDL content in memory (e.g., simulator_worker). Cost data is extracted from ESDL costInformation elements. Args: esdl_string: ESDL XML content as a string timeseries_dataframes: Optional dict mapping asset IDs to pandas DataFrames with time-indexed energy/power data. """ adapter = EsdlAdapter() self.energy_system = adapter.load_from_string( esdl_string, timeseries_dataframes=timeseries_dataframes, ) self.source_esdl_file = None
[docs] def load_from_simulator( self, simulator_result: pd.DataFrame, esdl_string: str, ) -> None: """Load energy system data from OMOTES Simulator results. Converts the simulator's port-indexed DataFrame to the asset-indexed common model and extracts cost data from the supplied ESDL string. Args: simulator_result: DataFrame produced by the simulator, with a DatetimeIndex and ``(port_id, property_name)`` tuple columns. esdl_string: The input ESDL as an XML string, used to resolve port IDs to their owning assets and to extract cost data. """ adapter = SimulatorAdapter() self.energy_system = adapter.load_data(simulator_result, esdl_string=esdl_string) self.source_esdl_file = None
[docs] def load_from_mesido(self, _mesido_data: Any) -> None: """Load energy system data from mesido data structure. Args: _mesido_data: Mesido data structure (unused, placeholder for future implementation) """ # TODO: Implement mesido adapter raise NotImplementedError("Mesido adapter not implemented yet")
[docs] def calculate_all_kpis( # pylint: disable=too-many-locals self, system_lifetime: float = DEFAULT_SYSTEM_LIFETIME_YEARS, discount_rate: float = DEFAULT_DISCOUNT_RATE_PERCENT, round_up_replacement: bool = True, ) -> KpiResults: """Calculate all KPIs for the energy system. Raises: ValueError: If no energy system is loaded, ``system_lifetime <= 0``, or ``discount_rate`` is outside [0, 100]. Args: system_lifetime: System lifetime in years. Must be positive. Default: ``DEFAULT_SYSTEM_LIFETIME_YEARS``. discount_rate: System-wide fallback discount rate in percentage (e.g. 5 for 5%). Must be in [0, 100]. Individual assets may override this via ``costInformation.discountRate`` in the ESDL — this method respects those overrides because it uses ``get_asset_financial_breakdown()`` internally. Note that calling ``FinancialCalculator.calculate_npv()`` directly does not respect per-asset overrides. Default: ``DEFAULT_DISCOUNT_RATE_PERCENT``. round_up_replacement: If True (default), NPV, LCOE, and TCO use ``ceil`` for the asset replacement count — the financially exact calculation. If False, uses the continuous factor ``max(1, system_lifetime / technical_lifetime)`` for compatibility with MESIDO optimizer output. Only set this to False when comparing results against MESIDO. Returns: ``KpiResults`` dict with ``financials``, ``energy``, ``emissions``, and ``asset_financials`` keys. """ if not self.energy_system: raise ValueError("No energy system loaded. Call one of the load methods first.") if system_lifetime <= 0: raise ValueError(f"system_lifetime must be positive (got {system_lifetime}).") if discount_rate < 0 or discount_rate > 100: raise ValueError(f"discount_rate must be between 0 and 100 (got {discount_rate}).") cost_calc = FinancialCalculator(self.energy_system) energy_calc = EnergyCalculator(self.energy_system) emission_calc = EmissionCalculator(self.energy_system) # Pre-compute annual energy production (MWh) for generating assets so # FinancialCalculator can compute per-asset LCOE in its single pass. # Non-generating assets are omitted; FinancialCalculator treats a missing key as 0. annual_energy_mwh_by_asset = { asset.id: energy_calc.get_asset_energy_production_per_year(asset) / 3.6e9 for asset in self.energy_system.assets if asset.asset_type in PRODUCER_ASSET_TYPES } asset_financials = cost_calc.get_asset_financial_breakdown( system_lifetime, discount_rate, round_up_replacement=round_up_replacement, annual_energy_mwh_by_asset=annual_energy_mwh_by_asset, ) system_npv = 0.0 system_eac = 0.0 system_tco = 0.0 for r in asset_financials.values(): system_npv += r["npv"] system_eac += r["eac"] system_tco += r["tco"] capex_by_cat, opex_by_cat = cost_calc.aggregate_by_category(asset_financials) results: KpiResults = { "financials": { "capex": capex_by_cat, "opex": opex_by_cat, "npv": system_npv, # System LCOE = total NPV / total discounted energy — not a sum of per-asset LCOEs. "lcoe": cost_calc.calculate_lcoe( system_lifetime, discount_rate, round_up_replacement=round_up_replacement, system_npv=system_npv, ), "eac": system_eac, # TCO is intentionally undiscounted — discount_rate is not used. "tco": system_tco, }, "energy": { "consumption": energy_calc.get_total_energy_consumption_per_year(), "demand": energy_calc.get_total_energy_demand_per_year(), "production": energy_calc.get_total_energy_production_per_year(), "efficiency": energy_calc.calculate_system_efficiency(), }, "emissions": { "total": emission_calc.get_total_emissions(), "per_mwh": emission_calc.get_emissions_per_mwh(), }, "asset_financials": asset_financials, } no_asset_has_time_series = not any(asset.time_series for asset in self.energy_system.assets) if no_asset_has_time_series: _logger.warning( "No time series data found for any asset. All energy and emission KPIs " "will be 0.0. Provide time_series_file or timeseries_dataframes to " "load_from_esdl(), or use load_from_esdl_string() with dataframes." ) return results
def _warn_if_level_not_system(self, level: str) -> None: """Emit a warning when a non-system export level is requested. Area-level and asset-level KPI placement are not yet implemented; all levels currently fall back to system-wide export. """ if level != "system": _logger.warning( "KPI export level '%s' is not yet implemented; falling back to 'system'.", level )
[docs] def export_to_esdl( self, results: KpiResults, output_file: str | None = None, level: str = "system" ) -> bool | esdl.EnergySystem: """Export KPI results to ESDL format. Args: results: KPI calculation results from calculate_all_kpis() output_file: Output ESDL file path. If None, returns data structure. level: KPI granularity — ``'system'``, ``'area'``, or ``'asset'``. Currently all levels write system-wide KPIs to the main area; area-level and asset-level placement are not yet implemented. Returns: bool: True if file export succeeded (when output_file provided) esdl.EnergySystem: ESDL data structure (when output_file is None) Raises: ValueError: If no energy system is loaded or invalid parameters """ if not self.energy_system: raise ValueError("No energy system loaded. Call one of the load methods first.") if self.energy_system.esdl_energy_system is None: raise ValueError( "No ESDL object available; the loaded adapter did not store one " "(e.g. load_from_simulator). Use load_from_esdl() or " "load_from_esdl_string(), or use build_esdl_string_with_kpis() " "to embed KPIs into an ESDL string without a loaded manager." ) self._warn_if_level_not_system(level) from .reporting.esdl_kpi_exporter import EsdlKpiExporter return EsdlKpiExporter().export( results, self.energy_system.esdl_energy_system, output_file, level=level, )
[docs] def build_esdl_with_kpis(self, results: KpiResults, level: str = "system") -> esdl.EnergySystem: """Build an ESDL energy system data structure with KPI results embedded. Args: results: KPI calculation results from calculate_all_kpis() level: KPI granularity — ``'system'``, ``'area'``, or ``'asset'``. Currently all levels write system-wide KPIs to the main area; area-level and asset-level placement are not yet implemented. Returns: esdl.EnergySystem: ESDL data structure with KPIs Raises: ValueError: If no energy system is loaded or invalid parameters """ result = self.export_to_esdl(results, output_file=None, level=level) if not isinstance(result, esdl.EnergySystem): raise ValueError("Failed to generate ESDL data structure") return result
[docs] def build_esdl_string_with_kpis( self, esdl_string: str, results: KpiResults, level: str = "system" ) -> str: """Embed KPI results into an ESDL XML string and return the updated string. This is the preferred integration method for systems that work with ESDL strings (e.g. simulator-worker). It operates entirely on a local ``EnergySystemHandler`` parsed from ``esdl_string`` and does not modify any manager state, making it safe to call repeatedly on the same instance. Args: esdl_string: Input ESDL XML string to embed KPIs into. results: KPI calculation results from calculate_all_kpis() level: KPI granularity — ``'system'``, ``'area'``, or ``'asset'``. Currently all levels write system-wide KPIs to the main area; area-level and asset-level placement are not yet implemented. Returns: ESDL XML string with KPIs embedded. Raises: ValueError: If esdl_string is empty or invalid parameters are provided. """ if not esdl_string.strip(): raise ValueError("esdl_string must not be empty.") self._warn_if_level_not_system(level) from esdl.esdl_handler import EnergySystemHandler from .reporting.esdl_kpi_exporter import EsdlKpiExporter esh = EnergySystemHandler() esh.load_from_string(esdl_string) EsdlKpiExporter().export(results, esh.energy_system, destination=None, level=level) return cast(str, esh.to_string())
# TODO: Add method to save the results