Source code for kpicalculator.security.credential_manager

# src/kpicalculator/security/credential_manager.py
"""Secure credential management for database connections."""

import json
import os
import stat
from abc import ABC, abstractmethod
from pathlib import Path

from ..common.logging_utils import get_database_logger, get_security_logger
from ..common.types import DatabaseCredentials
from ..exceptions import ConfigurationError, SecurityError


class CredentialManager(ABC):
    """Interface that every database credential provider implements."""

    @abstractmethod
    def get_database_credentials(self, host: str, port: int) -> DatabaseCredentials | None:
        """Look up the credentials registered for a database endpoint.

        Args:
            host: Database host
            port: Database port

        Returns:
            The matching DatabaseCredentials, or None when this provider
            has nothing for the given host/port pair.
        """

    @abstractmethod
    def supports_environment_credentials(self) -> bool:
        """Tell whether this provider can source credentials from the environment.

        Returns:
            True when the provider reads environment variables, False otherwise.
        """


class SecureCredentialManager(CredentialManager):
    """Credential provider backed by process environment variables."""

    def __init__(self) -> None:
        """Create the loggers and an empty host/port -> prefix cache."""
        self.security_logger = get_security_logger()
        self.db_logger = get_database_logger("credential_manager")
        self._env_prefix_cache: dict[tuple[str, int], str] = {}

    def _get_env_prefix(self, host: str, port: int) -> str:
        """Return the cached ``KPI_DB_{HOST}_{PORT}`` prefix for an endpoint.

        The host is uppercased and every "." or "-" becomes "_".
        """
        endpoint = (host, port)
        try:
            return self._env_prefix_cache[endpoint]
        except KeyError:
            env_safe_host = host.translate(str.maketrans(".-", "__")).upper()
            prefix = f"KPI_DB_{env_safe_host}_{port}"
            self._env_prefix_cache[endpoint] = prefix
            return prefix

    def get_database_credentials(self, host: str, port: int) -> DatabaseCredentials | None:
        """Build credentials for an endpoint from environment variables.

        Lookup order:
            1. KPI_DB_{HOST}_{PORT}_{FIELD} (primary), where HOST is uppercased
               and dots/hyphens are replaced with underscores.
            2. INFLUXDB_* (simulator-worker compatibility), consulted only when
               the primary username or password is missing; it fills in
               whichever of username, password and database are still unset.

        Args:
            host: Database host (e.g., "wu-profiles.esdl-beta.hesi.energy")
            port: Database port (e.g., 443)

        Returns:
            DatabaseCredentials when both a username and a password could be
            resolved, None otherwise
        """
        prefix = self._get_env_prefix(host, port)

        user = os.getenv(f"{prefix}_USERNAME")
        secret = os.getenv(f"{prefix}_PASSWORD")
        db_name = os.getenv(f"{prefix}_DATABASE")
        raw_ssl = os.getenv(f"{prefix}_SSL")
        raw_verify_ssl = os.getenv(f"{prefix}_VERIFY_SSL")

        if not (user and secret):
            # Primary set is incomplete: top up from the INFLUXDB_* variables.
            # INFLUXDB_HOSTNAME and INFLUXDB_PORT are informational only
            # (we already know host:port from the ESDL InfluxDBProfile)
            user = user or os.getenv("INFLUXDB_USERNAME")
            secret = secret or os.getenv("INFLUXDB_PASSWORD")
            db_name = db_name or os.getenv("INFLUXDB_DATABASE")

            if not (user and secret):
                self.db_logger.debug(
                    "No credentials found in environment", {"host": host, "port": port}
                )
                return None

        # Unset flags default to "false"; only these spellings count as enabled.
        enabled_spellings = ("true", "1", "yes", "on")
        use_ssl = (raw_ssl or "false").lower() in enabled_spellings
        check_certificate = (raw_verify_ssl or "false").lower() in enabled_spellings

        self.security_logger.log_credential_access(host, port, "environment_variables")
        self.db_logger.log_credential_load(host, port, "environment")

        return DatabaseCredentials(
            host=host,
            port=port,
            username=user,
            password=secret,
            database=db_name or "energy_profiles",
            ssl=use_ssl,
            verify_ssl=check_certificate,
        )

    def supports_environment_credentials(self) -> bool:
        """Return True: this provider is driven entirely by environment variables."""
        return True
class ConfigFileCredentialManager(CredentialManager):
    """Load credentials from a JSON configuration file.

    The file is expected to hold a top-level object with a "databases" object
    whose keys are "host:port" strings and whose values describe one set of
    credentials each.
    """

    def __init__(self, config_path: Path | None = None) -> None:
        """Initialize with optional config file path.

        Args:
            config_path: Path to credentials config file.
                Defaults to ~/.kpi-calculator/credentials.json
        """
        self.config_path = config_path or Path.home() / ".kpi-calculator" / "credentials.json"
        # None means "not loaded yet"; an empty dict is a valid loaded state.
        self._cached_credentials: dict[str, DatabaseCredentials] | None = None
        self.security_logger = get_security_logger()
        self.db_logger = get_database_logger("credential_manager")

    def get_database_credentials(self, host: str, port: int) -> DatabaseCredentials | None:
        """Get credentials from config file.

        Args:
            host: Database host
            port: Database port

        Returns:
            DatabaseCredentials if found in config, None otherwise

        Raises:
            SecurityError: If the config file permissions are insecure
            ConfigurationError: If the config file is invalid
        """
        credentials = self._load_credentials()

        # The documented key format is "host:port". The spaced "host: port"
        # variant is what this method used to look up, so it is still accepted
        # as a fallback to keep existing config files working.
        for host_port_key in (f"{host}:{port}", f"{host}: {port}"):
            if host_port_key in credentials:
                self.security_logger.log_credential_access(host, port, "config_file")
                self.db_logger.log_credential_load(host, port, "config_file")
                return credentials[host_port_key]

        self.db_logger.debug("No credentials found in config file", {"host": host, "port": port})
        return None

    def _load_credentials(self) -> dict[str, DatabaseCredentials]:
        """Load credentials from config file with security validation.

        The parsed result is cached on the instance, so the file is read at
        most once per manager (a missing file is cached as "no credentials").

        Returns:
            Dictionary mapping host:port to DatabaseCredentials

        Raises:
            SecurityError: If file permissions are insecure
            ConfigurationError: If config file is invalid
        """
        if self._cached_credentials is not None:
            return self._cached_credentials

        if not self.config_path.exists():
            self.db_logger.info(
                "Credentials config file not found", {"config_path": str(self.config_path)}
            )
            self._cached_credentials = {}
            return self._cached_credentials

        # Validate file permissions for security
        self._validate_file_permissions()

        try:
            with self.config_path.open(encoding="utf-8") as f:
                config = json.load(f)
        except json.JSONDecodeError as e:
            raise ConfigurationError(
                f"Invalid JSON in credentials file: {self.config_path} (error: {e!s})"
            ) from e
        except Exception as e:
            raise ConfigurationError(
                f"Failed to read credentials file: {self.config_path} (error: {e!s})"
            ) from e

        # Valid JSON is not necessarily an object; reject other shapes with a
        # ConfigurationError instead of failing later with an AttributeError.
        if not isinstance(config, dict):
            raise ConfigurationError(
                f"Invalid credentials file: {self.config_path} "
                f"(error: top-level JSON value must be an object, "
                f"got {type(config).__name__})"
            )
        databases_config = config.get("databases", {})
        if not isinstance(databases_config, dict):
            raise ConfigurationError(
                f"Invalid credentials file: {self.config_path} "
                f"(error: 'databases' must be an object, "
                f"got {type(databases_config).__name__})"
            )

        # Parse and validate credentials
        credentials: dict[str, DatabaseCredentials] = {}
        for host_port, creds_config in databases_config.items():
            try:
                credentials[host_port] = DatabaseCredentials(
                    host=creds_config["host"],
                    port=creds_config["port"],
                    username=creds_config["username"],
                    password=creds_config["password"],
                    database=creds_config.get("database", "energy_profiles"),
                    ssl=creds_config.get("ssl", False),
                    verify_ssl=creds_config.get("verify_ssl", False),
                )
            except (KeyError, TypeError) as e:
                # Never echo the entry itself: it contains the password.
                # Report only its shape so the message stays safe to log.
                if isinstance(creds_config, dict):
                    entry_shape = f"provided keys: {sorted(map(str, creds_config))}"
                else:
                    entry_shape = f"entry type: {type(creds_config).__name__}"
                raise ConfigurationError(
                    f"Invalid credential configuration for {host_port} "
                    f"(error: {e!s}, {entry_shape})"
                ) from e

        self._cached_credentials = credentials
        self.db_logger.info(
            "Loaded credentials from config file",
            {"entry_count": len(credentials), "config_path": str(self.config_path)},
        )
        return credentials

    def _validate_file_permissions(self) -> None:
        """Validate that config file has secure permissions.

        On Windows the check is skipped and a low-severity validation failure
        is logged instead.

        Raises:
            SecurityError: If the file is readable by group or others
        """
        if os.name == "nt":
            # Windows does not expose POSIX group/other permissions; the mode
            # bits Python reports there are derived from the read-only flag,
            # so testing S_IRGRP/S_IROTH would reject every file.
            self.security_logger.log_validation_failure(
                "file_permissions",
                str(self.config_path),
                "Cannot validate permissions on this platform",
                "low",
            )
            return

        file_mode = self.config_path.stat().st_mode

        # Check if file is readable by group or others (Unix-like systems)
        if file_mode & (stat.S_IRGRP | stat.S_IROTH):
            raise SecurityError(
                f"Credentials file has insecure permissions: {self.config_path}. "
                f"File should only be readable by owner (chmod 600). "
                f"(file_mode: {file_mode & 0o777:03o})"
            )

    def supports_environment_credentials(self) -> bool:
        """Return False since this manager reads from config files, not environment."""
        return False
class ChainedCredentialManager(CredentialManager):
    """Consult several credential managers in priority order."""

    def __init__(self, *managers: CredentialManager):
        """Store the managers to consult, highest priority first.

        Args:
            *managers: Credential managers in priority order (first has highest priority)

        Raises:
            ValueError: If no manager is supplied
        """
        if len(managers) == 0:
            raise ValueError("At least one credential manager must be provided")

        self.managers = managers
        self.db_logger = get_database_logger("credential_manager")

    def get_database_credentials(self, host: str, port: int) -> DatabaseCredentials | None:
        """Return the first credentials any chained manager can provide.

        Managers are queried lazily in priority order; later managers are not
        consulted once one of them yields credentials.

        Args:
            host: Database host
            port: Database port

        Returns:
            DatabaseCredentials from first manager that has them, None if none found
        """
        lookups = (manager.get_database_credentials(host, port) for manager in self.managers)
        found = next((candidate for candidate in lookups if candidate), None)

        if found is None:
            self.db_logger.warning(
                "No credentials found in any manager", {"host": host, "port": port}
            )
            return None
        return found

    def supports_environment_credentials(self) -> bool:
        """Return True if at least one chained manager supports environment credentials."""
        for manager in self.managers:
            if manager.supports_environment_credentials():
                return True
        return False


def create_default_credential_manager() -> CredentialManager:
    """Build the default credential lookup chain.

    Returns:
        ChainedCredentialManager that tries environment variables first and
        falls back to the config file
    """
    environment_manager = SecureCredentialManager()  # Primary: environment variables
    config_file_manager = ConfigFileCredentialManager()  # Fallback: config file
    return ChainedCredentialManager(environment_manager, config_file_manager)