#!/usr/bin/env python3
"""
Configuration Management System for VarAnnote v1.0.0
Provides comprehensive configuration management including:
- YAML configuration file loading
- Environment variable overrides
- User preference management
- Configuration validation
- Default value handling
"""
import copy
import json
import logging
import os
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import yaml

from .logger import get_logger
@dataclass
class DatabaseConfig:
    """Settings for one annotation database.

    ``name`` is the only required field; all other fields have defaults.
    ``ConfigManager`` rejects a priority outside 1-10 and a non-positive
    ``rate_limit`` or ``timeout`` when it validates the configuration.
    """

    name: str
    priority: int = 5  # higher values sort first in get_enabled_databases()
    api_key: Optional[str] = None
    rate_limit: float = 5.0  # presumably requests per second -- TODO confirm
    timeout: int = 30  # presumably seconds -- TODO confirm
    enabled: bool = True
@dataclass
class CacheConfig:
    """Cache configuration settings.

    ``strategies`` defaults to a fresh empty dict per instance (previously
    ``None``, which contradicted its ``Dict`` annotation). This matches the
    ``{}`` fallback ``ConfigManager`` uses when the section omits the key.
    """

    enabled: bool = True
    directory: str = "~/.varannote/cache"  # stored unexpanded ("~" is kept)
    max_age_days: int = 30
    max_size_gb: float = 2.0
    compression: bool = True
    # default_factory gives each instance its own dict, never a shared one
    strategies: Dict[str, str] = field(default_factory=dict)
@dataclass
class OutputConfig:
    """Output configuration settings.

    The collection fields default to fresh per-instance containers
    (previously ``None``, which contradicted their annotations). The
    defaults mirror the fallbacks ``ConfigManager`` applies when the
    ``output`` section omits a key.
    """

    default_format: str = "vcf"
    available_formats: List[str] = field(
        default_factory=lambda: ['vcf', 'tsv', 'json']
    )
    include_fields: List[str] = field(default_factory=list)
    filters: Dict[str, Any] = field(default_factory=dict)
@dataclass
class LoggingConfig:
    """Logging configuration settings.

    ``categories`` defaults to a fresh empty dict per instance (previously
    ``None``, which contradicted its ``Dict`` annotation). This matches the
    ``{}`` fallback ``ConfigManager`` uses when the section omits the key.
    """

    level: str = "INFO"  # ConfigManager accepts DEBUG/INFO/WARNING/ERROR/CRITICAL
    directory: str = "~/.varannote/logs"  # stored unexpanded ("~" is kept)
    max_file_size_mb: int = 10
    backup_count: int = 5
    # default_factory gives each instance its own dict, never a shared one
    categories: Dict[str, bool] = field(default_factory=dict)
class ConfigManager:
    """
    Comprehensive configuration manager for VarAnnote

    Features:
    - YAML configuration file loading
    - Environment variable overrides
    - Configuration validation
    - Default value management
    - User preference handling
    - Configuration merging and inheritance

    Sources are layered in this order, later ones winning: the main config
    file (or the built-in defaults when it is missing, empty or unreadable),
    ``user_config.yaml`` in the user config directory, then ``VARANNOTE_*``
    environment variables.
    """

    # Databases that have built-in defaults and whose API key can be supplied
    # through a VARANNOTE_<NAME>_API_KEY environment variable.
    _KNOWN_DATABASES = (
        'clinvar', 'gnomad', 'dbsnp', 'ensembl', 'cosmic',
        'pharmgkb', 'omim', 'clingen', 'hgmd',
    )

    def __init__(self,
                 config_file: Optional[Union[str, Path]] = None,
                 user_config_dir: Optional[Union[str, Path]] = None):
        """
        Initialize configuration manager

        Args:
            config_file: Path to main configuration file
            user_config_dir: Directory for user-specific configurations

        Raises:
            ValueError: If the merged configuration fails validation or a
                numeric environment override cannot be parsed.
        """
        self.logger = get_logger("config_manager")

        # Configuration paths
        self.config_file = self._resolve_config_file(config_file)
        self.user_config_dir = self._resolve_user_config_dir(user_config_dir)

        # Configuration data
        self._config_data: Dict[str, Any] = {}
        self._user_config: Dict[str, Any] = {}
        self._env_overrides: Dict[str, Any] = {}

        # Configuration objects (placeholders, replaced by the load below)
        self.databases: Dict[str, DatabaseConfig] = {}
        self.performance: PerformanceConfig = PerformanceConfig()
        self.cache: CacheConfig = CacheConfig()
        self.output: OutputConfig = OutputConfig()
        self.logging_config: LoggingConfig = LoggingConfig()

        # Load configurations
        self._load_configurations()

        self.logger.info("Configuration manager initialized")
        self.logger.info(f"Config file: {self.config_file}")
        self.logger.info(f"User config dir: {self.user_config_dir}")

    def _resolve_config_file(self, config_file: Optional[Union[str, Path]]) -> Path:
        """Resolve configuration file path.

        An explicit ``config_file`` is expanded and resolved as given.
        Otherwise the first existing ``config.yaml`` is used, searching the
        current directory, the package parent directory and ``~/.varannote``
        in that order. If none exists, the package-parent path is returned
        even though the file is absent.
        """
        if config_file:
            return Path(config_file).expanduser().resolve()

        package_default = Path(__file__).parent.parent / "config.yaml"
        search_paths = [
            Path.cwd() / "config.yaml",
            package_default,
            Path.home() / ".varannote" / "config.yaml",
        ]
        for path in search_paths:
            if path.exists():
                return path

        # Nothing found: callers fall back to built-in defaults
        return package_default

    def _resolve_user_config_dir(self, user_config_dir: Optional[Union[str, Path]]) -> Path:
        """Resolve user configuration directory, creating it if missing."""
        if user_config_dir:
            path = Path(user_config_dir).expanduser().resolve()
        else:
            path = Path.home() / ".varannote"
        path.mkdir(parents=True, exist_ok=True)
        return path

    def _load_configurations(self):
        """Load, merge, materialise and validate all configuration sources.

        Raises:
            ValueError: Propagated from environment parsing or validation.
        """
        # 1. Load main configuration file
        self._load_main_config()
        # 2. Load user-specific configuration
        self._load_user_config()
        # 3. Load environment variable overrides
        self._load_env_overrides()
        # 4. Merge configurations
        self._merge_configurations()
        # 5. Create configuration objects
        self._create_config_objects()
        # 6. Validate configuration
        self._validate_configuration()

    def _load_main_config(self):
        """Load main configuration file.

        Falls back to the built-in defaults when the file is missing, empty,
        unreadable, or when its top-level YAML value is not a mapping.
        """
        try:
            if not self.config_file.exists():
                self.logger.warning(f"Main config file not found: {self.config_file}")
                self._config_data = self._get_default_config()
                return
            with open(self.config_file, 'r', encoding='utf-8') as f:
                loaded_data = yaml.safe_load(f)
        except Exception as e:
            # Deliberate best effort: a broken file must not prevent start-up
            self.logger.error(f"Error loading main config: {e}")
            self._config_data = self._get_default_config()
            return

        if loaded_data and not isinstance(loaded_data, dict):
            # A list or scalar at top level would break every later .get()
            self.logger.error(
                f"Error loading main config: top-level value in {self.config_file} "
                f"must be a mapping, got {type(loaded_data).__name__}"
            )
            self._config_data = self._get_default_config()
            return

        self._config_data = loaded_data if loaded_data else self._get_default_config()
        self.logger.info(f"Loaded main config from {self.config_file}")

    def _load_user_config(self):
        """Load user-specific configuration from ``user_config.yaml``.

        A missing, empty, unreadable or non-mapping file yields an empty
        user configuration.
        """
        user_config_file = self.user_config_dir / "user_config.yaml"
        try:
            if not user_config_file.exists():
                self._user_config = {}
                return
            with open(user_config_file, 'r', encoding='utf-8') as f:
                loaded_data = yaml.safe_load(f) or {}
        except Exception as e:
            # Deliberate best effort: ignore a broken user file
            self.logger.error(f"Error loading user config: {e}")
            self._user_config = {}
            return

        if not isinstance(loaded_data, dict):
            self.logger.error(
                f"Error loading user config: top-level value in {user_config_file} "
                f"must be a mapping, got {type(loaded_data).__name__}"
            )
            self._user_config = {}
            return

        self._user_config = loaded_data
        self.logger.info(f"Loaded user config from {user_config_file}")

    def _load_env_overrides(self):
        """Load environment variable overrides.

        Reads ``VARANNOTE_<DB>_API_KEY`` for every known database plus
        ``VARANNOTE_LOG_LEVEL``, ``VARANNOTE_CACHE_DIR``,
        ``VARANNOTE_MAX_WORKERS`` (integer) and ``VARANNOTE_DEBUG``
        (true for ``true``/``1``/``yes``, case-insensitive).

        Raises:
            ValueError: If ``VARANNOTE_MAX_WORKERS`` is not an integer.
        """
        self._env_overrides = {}
        applied = 0  # number of individual variables honoured

        # Database API keys
        for db_name in self._KNOWN_DATABASES:
            env_key = f"VARANNOTE_{db_name.upper()}_API_KEY"
            if env_key in os.environ:
                self._set_nested_value(
                    self._env_overrides,
                    ['databases', 'api_keys', db_name],
                    os.environ[env_key],
                )
                applied += 1

        # Other environment variables
        env_mappings = {
            'VARANNOTE_LOG_LEVEL': ['logging', 'level'],
            'VARANNOTE_CACHE_DIR': ['cache', 'directory'],
            'VARANNOTE_MAX_WORKERS': ['performance', 'max_workers'],
            'VARANNOTE_DEBUG': ['advanced', 'debug_mode'],
        }
        for env_var, config_path in env_mappings.items():
            if env_var not in os.environ:
                continue
            value = os.environ[env_var]
            if config_path[-1] == 'max_workers':
                try:
                    value = int(value)
                except ValueError as err:
                    # Same exception type as before, but name the variable
                    raise ValueError(
                        f"{env_var} must be an integer, got {value!r}"
                    ) from err
            elif config_path[-1] == 'debug_mode':
                value = value.lower() in ('true', '1', 'yes')
            self._set_nested_value(self._env_overrides, config_path, value)
            applied += 1

        if applied:
            self.logger.info(f"Loaded {applied} environment overrides")

    def _merge_configurations(self):
        """Merge main config, user config and environment overrides."""
        # _deep_merge returns an independent copy, so no source is aliased
        merged_config = self._deep_merge(self._config_data, self._user_config)
        merged_config = self._deep_merge(merged_config, self._env_overrides)
        self._config_data = merged_config

    def _get_section(self, name: str) -> Dict[str, Any]:
        """Return a top-level config section, or ``{}`` if absent or not a mapping."""
        section = self._config_data.get(name)
        return section if isinstance(section, dict) else {}

    def _create_config_objects(self):
        """Create typed configuration objects from the merged configuration."""
        # Database configurations: one entry per key in 'priorities'.
        # "or {}" tolerates sub-sections that are present but null in YAML.
        db_config = self._get_section('databases')
        priorities = db_config.get('priorities') or {}
        api_keys = db_config.get('api_keys') or {}
        rate_limits = db_config.get('rate_limits') or {}
        timeouts = db_config.get('timeouts') or {}

        # Rebuilt from scratch so a reload cannot leave stale databases behind
        self.databases = {
            db_name: DatabaseConfig(
                name=db_name,
                priority=priority,
                api_key=api_keys.get(db_name),
                rate_limit=rate_limits.get(db_name, 5.0),
                timeout=timeouts.get(db_name, 30),
                enabled=True,
            )
            for db_name, priority in priorities.items()
        }

        # Performance configuration
        perf_config = self._get_section('performance')
        self.performance = PerformanceConfig(
            max_workers=perf_config.get('max_workers', 4),
            use_parallel=perf_config.get('use_parallel', True),
            batch_size=perf_config.get('batch_size', 50),
            max_concurrent_requests=perf_config.get('max_concurrent_requests', 30),
            max_cache_size_mb=perf_config.get('max_cache_size_mb', 500),
            cache_cleanup_interval=perf_config.get('cache_cleanup_interval', 3600),
            max_connections=perf_config.get('max_connections', 100),
            max_connections_per_host=perf_config.get('max_connections_per_host', 30),
        )

        # Cache configuration
        cache_config = self._get_section('cache')
        self.cache = CacheConfig(
            enabled=cache_config.get('enabled', True),
            directory=cache_config.get('directory', "~/.varannote/cache"),
            max_age_days=cache_config.get('max_age_days', 30),
            max_size_gb=cache_config.get('max_size_gb', 2.0),
            compression=cache_config.get('compression', True),
            strategies=cache_config.get('strategies', {}),
        )

        # Output configuration
        output_config = self._get_section('output')
        self.output = OutputConfig(
            default_format=output_config.get('default_format', 'vcf'),
            available_formats=output_config.get('available_formats', ['vcf', 'tsv', 'json']),
            include_fields=output_config.get('include_fields', []),
            filters=output_config.get('filters', {}),
        )

        # Logging configuration
        log_config = self._get_section('logging')
        self.logging_config = LoggingConfig(
            level=log_config.get('level', 'INFO'),
            directory=log_config.get('directory', "~/.varannote/logs"),
            max_file_size_mb=log_config.get('max_file_size_mb', 10),
            backup_count=log_config.get('backup_count', 5),
            categories=log_config.get('categories', {}),
        )

    def _validate_configuration(self):
        """Validate configuration values.

        Raises:
            ValueError: Listing every violated constraint.
        """
        errors = []

        # Validate database priorities
        for db_name, db_config in self.databases.items():
            if not (1 <= db_config.priority <= 10):
                errors.append(f"Database {db_name} priority must be between 1-10")
            if db_config.rate_limit <= 0:
                errors.append(f"Database {db_name} rate limit must be positive")
            if db_config.timeout <= 0:
                errors.append(f"Database {db_name} timeout must be positive")

        # Validate performance settings
        if self.performance.max_workers <= 0:
            errors.append("max_workers must be positive")
        if self.performance.batch_size <= 0:
            errors.append("batch_size must be positive")

        # Validate cache settings
        if self.cache.max_age_days <= 0:
            errors.append("cache max_age_days must be positive")
        if self.cache.max_size_gb <= 0:
            errors.append("cache max_size_gb must be positive")

        # Validate logging level; str() so a non-string YAML value is reported
        # as a validation error instead of crashing on .upper()
        valid_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
        if str(self.logging_config.level).upper() not in valid_levels:
            errors.append(f"logging level must be one of {valid_levels}")

        if errors:
            error_msg = "Configuration validation errors:\n" + "\n".join(f" - {e}" for e in errors)
            self.logger.error(error_msg)
            raise ValueError(error_msg)

        self.logger.info("Configuration validation passed")

    def get(self, key: str, default: Any = None) -> Any:
        """Get configuration value by dotted key, e.g. ``"cache.directory"``.

        Returns ``default`` when any path component is missing.
        """
        return self._get_nested_value(self._config_data, key.split('.'), default)

    def set(self, key: str, value: Any):
        """Set configuration value by dotted key.

        Only the merged dictionary read by :meth:`get` is updated. The typed
        objects (``cache``, ``logging_config``, ...) are not rebuilt and
        nothing is written to disk.
        """
        self._set_nested_value(self._config_data, key.split('.'), value)

    def get_database_config(self, database_name: str) -> Optional["DatabaseConfig"]:
        """Get database configuration, or ``None`` for an unknown database."""
        return self.databases.get(database_name)

    def get_enabled_databases(self) -> List[str]:
        """Get names of enabled databases, highest priority first."""
        enabled_dbs = [(name, config) for name, config in self.databases.items() if config.enabled]
        enabled_dbs.sort(key=lambda item: item[1].priority, reverse=True)
        return [name for name, _ in enabled_dbs]

    def save_user_config(self, config_updates: Dict[str, Any]):
        """Save user configuration updates.

        Deep-merges ``config_updates`` into the current user configuration,
        writes the result to ``user_config.yaml`` and reloads every source.

        Raises:
            Exception: Whatever writing or reloading raises, after logging.
        """
        user_config_file = self.user_config_dir / "user_config.yaml"

        # Merge with existing user config
        updated_config = self._deep_merge(self._user_config, config_updates)

        try:
            with open(user_config_file, 'w', encoding='utf-8') as f:
                yaml.dump(updated_config, f, default_flow_style=False, indent=2)
            self.logger.info(f"Saved user config to {user_config_file}")

            # Reload configurations
            self._load_configurations()
        except Exception as e:
            self.logger.error(f"Error saving user config: {e}")
            raise

    def export_config(self, output_file: Union[str, Path], format: str = "yaml"):
        """Export current merged configuration to file.

        Args:
            output_file: Destination path.
            format: ``"yaml"`` or ``"json"`` (case-insensitive).

        Raises:
            ValueError: For an unsupported ``format``.
            Exception: Any I/O or serialisation error, after logging.
        """
        output_path = Path(output_file)
        fmt = format.lower()
        try:
            if fmt == "yaml":
                with open(output_path, 'w', encoding='utf-8') as f:
                    yaml.dump(self._config_data, f, default_flow_style=False, indent=2)
            elif fmt == "json":
                with open(output_path, 'w', encoding='utf-8') as f:
                    json.dump(self._config_data, f, indent=2)
            else:
                raise ValueError(f"Unsupported format: {format}")
            self.logger.info(f"Exported config to {output_path}")
        except Exception as e:
            self.logger.error(f"Error exporting config: {e}")
            raise

    def get_config_summary(self) -> Dict[str, Any]:
        """Get a summary of paths, databases, performance, cache and logging."""
        return {
            'config_file': str(self.config_file),
            'user_config_dir': str(self.user_config_dir),
            'databases': {
                'enabled': len([db for db in self.databases.values() if db.enabled]),
                'total': len(self.databases),
                'priorities': {name: config.priority for name, config in self.databases.items()}
            },
            'performance': asdict(self.performance),
            'cache': {
                'enabled': self.cache.enabled,
                'directory': self.cache.directory,
                'max_size_gb': self.cache.max_size_gb
            },
            'logging': {
                'level': self.logging_config.level,
                'directory': self.logging_config.directory
            }
        }

    def _get_default_config(self) -> Dict[str, Any]:
        """Get the built-in default configuration (a fresh dict on every call)."""
        return {
            'databases': {
                'priorities': {
                    'clinvar': 10, 'gnomad': 9, 'dbsnp': 8, 'ensembl': 7,
                    'cosmic': 6, 'pharmgkb': 5, 'omim': 4, 'clingen': 3, 'hgmd': 2
                },
                'api_keys': {db: None for db in self._KNOWN_DATABASES},
                'rate_limits': {
                    'clinvar': 3.0, 'gnomad': 10.0, 'dbsnp': 3.0, 'ensembl': 15.0,
                    'cosmic': 1.0, 'pharmgkb': 5.0, 'omim': 2.0, 'clingen': 5.0, 'hgmd': 1.0
                },
                'timeouts': {db: 30 for db in self._KNOWN_DATABASES}
            },
            'performance': {
                'max_workers': 4, 'use_parallel': True, 'batch_size': 50,
                'max_concurrent_requests': 30, 'max_cache_size_mb': 500,
                'cache_cleanup_interval': 3600, 'max_connections': 100,
                'max_connections_per_host': 30
            },
            'cache': {
                'enabled': True, 'directory': "~/.varannote/cache",
                'max_age_days': 30, 'max_size_gb': 2.0, 'compression': True
            },
            'output': {
                'default_format': 'vcf',
                'available_formats': ['vcf', 'tsv', 'json'],
                'include_fields': ['variant_id', 'gene_symbol', 'consequence'],
                'filters': {}
            },
            'logging': {
                'level': 'INFO', 'directory': "~/.varannote/logs",
                'max_file_size_mb': 10, 'backup_count': 5
            }
        }

    def _deep_merge(self, dict1: Dict[str, Any], dict2: Dict[str, Any]) -> Dict[str, Any]:
        """Deep merge two dictionaries, ``dict2`` winning on conflicts.

        Nested dicts present on both sides are merged recursively; any other
        value from ``dict2`` replaces the one from ``dict1``. The result
        shares no mutable objects with either input, so later mutation of the
        merged config (e.g. via :meth:`set`) cannot leak into the sources.
        """
        result = copy.deepcopy(dict1)
        for key, value in dict2.items():
            if isinstance(result.get(key), dict) and isinstance(value, dict):
                result[key] = self._deep_merge(result[key], value)
            else:
                result[key] = copy.deepcopy(value)
        return result

    def _get_nested_value(self, data: Dict[str, Any], keys: List[str], default: Any = None) -> Any:
        """Get nested dictionary value, or ``default`` if the path breaks."""
        current = data
        for key in keys:
            if isinstance(current, dict) and key in current:
                current = current[key]
            else:
                return default
        return current

    def _set_nested_value(self, data: Dict[str, Any], keys: List[str], value: Any):
        """Set nested dictionary value, creating missing intermediate dicts."""
        current = data
        for key in keys[:-1]:
            if key not in current:
                current[key] = {}
            current = current[key]
        current[keys[-1]] = value
# Process-wide ConfigManager, created on first use by get_config_manager()
_config_manager: Optional[ConfigManager] = None


def get_config_manager(**kwargs) -> ConfigManager:
    """Return the shared ConfigManager, creating it on the first call.

    ``kwargs`` are forwarded to ``ConfigManager`` only when the instance is
    first created; on later calls they are ignored.
    """
    global _config_manager
    if _config_manager is not None:
        return _config_manager
    _config_manager = ConfigManager(**kwargs)
    return _config_manager
def get_config(key: str, default: Any = None) -> Any:
    """Look up a dotted configuration key on the shared manager.

    Returns ``default`` when the key is not present.
    """
    manager = get_config_manager()
    return manager.get(key, default)
def get_database_config(database_name: str) -> Optional[DatabaseConfig]:
    """Return the shared manager's settings for ``database_name``.

    Returns ``None`` when the database is not configured.
    """
    manager = get_config_manager()
    return manager.get_database_config(database_name)
def get_enabled_databases() -> List[str]:
    """Return enabled database names from the shared manager, highest priority first."""
    manager = get_config_manager()
    return manager.get_enabled_databases()