Build a Modern Async Configuration Management System with Type Safety and Hot Reloading

In this tutorial, we walk through the design and functionality of AsyncConfig, a modern async-first configuration management library for Python. We build it from scratch to support type-safe, dataclass-based configuration loading, multiple configuration sources (environment variables, files, and dictionaries), and hot reloading with watchdog. AsyncConfig has a clean API and solid validation, which makes it suitable for both development and production environments. Throughout the tutorial, we demonstrate its capabilities with simple, advanced, and validation-focused use cases, all powered by asyncio to support non-blocking workflows.

import asyncio
import json
import os
import yaml
from pathlib import Path
from typing import Any, Dict, Optional, Type, TypeVar, Union, get_type_hints
from dataclasses import dataclass, field, MISSING
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import logging


__version__ = "0.1.0"
__author__ = "AsyncConfig Team"


T = TypeVar('T')


logger = logging.getLogger(__name__)

We start by importing the Python modules the configuration system needs: asyncio for asynchronous operations, yaml and json for file parsing, dataclasses for structured configuration, and watchdog for hot reloading. We also define some metadata and set up a logger to track events throughout the system.

class ConfigError(Exception):
    """Base exception for configuration errors."""
    pass




class ValidationError(ConfigError):
    """Raised when configuration validation fails."""
    pass




class LoadError(ConfigError):
    """Raised when configuration loading fails."""
    pass




@dataclass
class ConfigSource:
    """Represents a configuration source with priority and reload capabilities."""
    path: Optional[Path] = None
    env_prefix: Optional[str] = None
    data: Optional[Dict[str, Any]] = None
    priority: int = 0
    watch: bool = False
   
    def __post_init__(self):
        if self.path:
            self.path = Path(self.path)

We define a hierarchy of custom exceptions to handle different configuration-related errors: ConfigError serves as the base class, while the more specific ValidationError and LoadError support targeted troubleshooting. We also create a ConfigSource dataclass to represent a single configuration source, which can be a file, a set of environment variables, or a dictionary, and which carries a priority and an optional hot-reload flag.
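To see why the shared base class is useful, here is a minimal sketch that redefines the three exceptions so it runs on its own. The helpers `read_setting` and `describe_failure` are hypothetical and exist only for illustration; they are not part of the library.

```python
class ConfigError(Exception):
    """Base exception for configuration errors."""


class ValidationError(ConfigError):
    """Raised when configuration validation fails."""


class LoadError(ConfigError):
    """Raised when configuration loading fails."""


def read_setting(settings: dict, key: str):
    """Hypothetical helper: raise ValidationError when a key is absent."""
    if key not in settings:
        raise ValidationError(f"Required field '{key}' not found in configuration")
    return settings[key]


def describe_failure(settings: dict, key: str) -> str:
    """Catch the base class so every configuration error lands in one handler."""
    try:
        return f"ok: {read_setting(settings, key)}"
    except ConfigError as exc:
        # type(exc).__name__ still tells us which specific error occurred.
        return f"{type(exc).__name__}: {exc}"


print(describe_failure({"port": 5432}, "port"))
print(describe_failure({"port": 5432}, "host"))
```

Catching ConfigError at the application boundary handles every library failure in one place, while code that cares about the difference can still catch LoadError or ValidationError individually.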

class ConfigWatcher(FileSystemEventHandler):
    """File system event handler for configuration hot reloading."""
   
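    def __init__(self, config_manager, paths: list[Path],
                 loop: Optional[asyncio.AbstractEventLoop] = None):
        self.config_manager = config_manager
        self.paths = {str(p.resolve()) for p in paths}
        # Watchdog calls handlers on its own thread, so remember the event
        # loop that owns the manager and schedule reloads on it later.
        self.loop = loop or asyncio.get_running_loop()
        super().__init__()

    def on_modified(self, event):
        # Resolve the event path so it compares equal to the resolved watch paths.
        src_path = str(Path(os.fsdecode(event.src_path)).resolve())
        if not event.is_directory and src_path in self.paths:
            logger.info(f"Configuration file changed: {src_path}")
            # asyncio.create_task() needs a running loop in the current thread;
            # from the watchdog thread we hand the coroutine to the loop instead.
            asyncio.run_coroutine_threadsafe(
                self.config_manager._reload_config(), self.loop
            )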

We create the ConfigWatcher class by extending FileSystemEventHandler to enable hot reloading of configuration files. The class monitors the specified file paths and, when one of them is modified, triggers an asynchronous reload through the associated manager. This lets our applications adapt to configuration changes at runtime without restarting.
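Watchdog delivers file events on its own observer thread, so a reload coroutine has to be handed across threads to the event loop. The following is a minimal sketch of that handoff, assuming a plain `threading.Thread` in place of watchdog; `reload_config` and `worker_thread_callback` are hypothetical stand-ins, not library functions.

```python
import asyncio
import threading


async def reload_config(events: list, path: str) -> None:
    """Stand-in for the manager's reload coroutine (hypothetical)."""
    events.append(f"reloaded after change to {path}")


def worker_thread_callback(loop, events, path):
    """Runs on a non-asyncio thread, as a watchdog handler would."""
    # asyncio.create_task() would raise RuntimeError here because this thread
    # has no running loop; run_coroutine_threadsafe targets the given loop.
    future = asyncio.run_coroutine_threadsafe(reload_config(events, path), loop)
    future.result(timeout=5)  # block this worker thread until the reload ran


async def main() -> list:
    events: list = []
    loop = asyncio.get_running_loop()
    thread = threading.Thread(
        target=worker_thread_callback, args=(loop, events, "config.yaml")
    )
    thread.start()
    # Join in a helper thread so the event loop stays free to run the reload.
    await asyncio.to_thread(thread.join)
    return events


print(asyncio.run(main()))
```

The key design point is that the coroutine always executes on the loop's own thread, so the manager's state is never touched concurrently from the watcher thread.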

class AsyncConfigManager:
    """
    Modern async configuration manager with type safety and hot reloading.
   
    Features:
    - Async-first design
    - Type-safe configuration classes
    - Environment variable support
    - Hot reloading
    - Multiple source merging
    - Validation with detailed error messages
    """
   
    def __init__(self):
        self.sources: list[ConfigSource] = []
        self.observers: list[Observer] = []
        self.config_cache: Dict[str, Any] = {}
        self.reload_callbacks: list[callable] = []
        self._lock = asyncio.Lock()
   
    def add_source(self, source: ConfigSource) -> "AsyncConfigManager":
        """Add a configuration source."""
        self.sources.append(source)
        self.sources.sort(key=lambda x: x.priority, reverse=True)
        return self
   
    def add_file(self, path: Union[str, Path], priority: int = 0, watch: bool = False) -> "AsyncConfigManager":
        """Add a file-based configuration source."""
        return self.add_source(ConfigSource(path=path, priority=priority, watch=watch))
   
    def add_env(self, prefix: str, priority: int = 100) -> "AsyncConfigManager":
        """Add environment variable source."""
        return self.add_source(ConfigSource(env_prefix=prefix, priority=priority))
   
    def add_dict(self, data: Dict[str, Any], priority: int = 50) -> "AsyncConfigManager":
        """Add dictionary-based configuration source."""
        return self.add_source(ConfigSource(data=data, priority=priority))
   
    async def load_config(self, config_class: Type[T]) -> T:
        """Load and validate configuration into a typed dataclass."""
        async with self._lock:
            config_data = await self._merge_sources()
           
            try:
                return self._validate_and_convert(config_data, config_class)
            except Exception as e:
                raise ValidationError(f"Failed to validate configuration: {e}")
   
    async def _merge_sources(self) -> Dict[str, Any]:
        """Merge configuration from all sources based on priority."""
        merged = {}
       
        for source in reversed(self.sources):  
            try:
                data = await self._load_source(source)
                if data:
                    merged.update(data)
            except Exception as e:
                logger.warning(f"Failed to load source {source}: {e}")
       
        return merged
   
    async def _load_source(self, source: ConfigSource) -> Optional[Dict[str, Any]]:
        """Load data from a single configuration source."""
        if source.data:
            return source.data.copy()
       
        if source.path:
            return await self._load_file(source.path)
       
        if source.env_prefix:
            return self._load_env_vars(source.env_prefix)
       
        return None
   
    async def _load_file(self, path: Path) -> Dict[str, Any]:
        """Load configuration from a file."""
        if not path.exists():
            raise LoadError(f"Configuration file not found: {path}")
       
        try:
            content = await asyncio.to_thread(path.read_text)
           
            if path.suffix.lower() == '.json':
                return json.loads(content)
            elif path.suffix.lower() in ['.yml', '.yaml']:
                return yaml.safe_load(content) or {}
            else:
                raise LoadError(f"Unsupported file format: {path.suffix}")
       
        except Exception as e:
            raise LoadError(f"Failed to load {path}: {e}")
   
    def _load_env_vars(self, prefix: str) -> Dict[str, Any]:
        """Load environment variables with given prefix."""
        env_vars = {}
        prefix = prefix.upper() + '_'
       
        for key, value in os.environ.items():
            if key.startswith(prefix):
                config_key = key[len(prefix):].lower()
                env_vars[config_key] = self._convert_env_value(value)
       
        return env_vars
   
    def _convert_env_value(self, value: str) -> Any:
        """Convert environment variable string to appropriate type."""
        if value.lower() in ('true', 'false'):
            return value.lower() == 'true'
       
        try:
            if '.' in value:
                return float(value)
            return int(value)
        except ValueError:
            pass
       
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            pass
       
        return value
   
    def _validate_and_convert(self, data: Dict[str, Any], config_class: Type[T]) -> T:
        """Validate and convert data to the specified configuration class."""
        if not hasattr(config_class, '__dataclass_fields__'):
            raise ValidationError(f"{config_class.__name__} must be a dataclass")
       
        type_hints = get_type_hints(config_class)
        field_values = {}
       
        for field_name, field_info in config_class.__dataclass_fields__.items():
            if field_name in data:
                field_value = data[field_name]
               
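                # Use the resolved hint so string annotations also work.
                field_type = type_hints.get(field_name, field_info.type)
                if hasattr(field_type, '__dataclass_fields__'):
                    if isinstance(field_value, dict):
                        field_value = self._validate_and_convert(field_value, field_type)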
               
                field_values[field_name] = field_value
            elif field_info.default is not MISSING:
                field_values[field_name] = field_info.default
            elif field_info.default_factory is not MISSING:
                field_values[field_name] = field_info.default_factory()
            else:
                raise ValidationError(f"Required field '{field_name}' not found in configuration")
       
        return config_class(**field_values)
   
    async def start_watching(self):
        """Start watching configuration files for changes."""
        watch_paths = []
       
        for source in self.sources:
            if source.watch and source.path:
                watch_paths.append(source.path)
       
        if watch_paths:
            observer = Observer()
            watcher = ConfigWatcher(self, watch_paths)
           
            for path in watch_paths:
                observer.schedule(watcher, str(path.parent), recursive=False)
           
            observer.start()
            self.observers.append(observer)
            logger.info(f"Started watching {len(watch_paths)} configuration files")
   
    async def stop_watching(self):
        """Stop watching configuration files."""
        for observer in self.observers:
            observer.stop()
            observer.join()
        self.observers.clear()
   
    async def _reload_config(self):
        """Reload configuration from all sources."""
        try:
            self.config_cache.clear()
            for callback in self.reload_callbacks:
                await callback()
            logger.info("Configuration reloaded successfully")
        except Exception as e:
            logger.error(f"Failed to reload configuration: {e}")
   
    def on_reload(self, callback: callable):
        """Register a callback to be called when configuration is reloaded."""
        self.reload_callbacks.append(callback)
   
    async def __aenter__(self):
        await self.start_watching()
        return self
   
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.stop_watching()

Now we implement the core of the system in the AsyncConfigManager class. It acts as the central controller for all configuration operations: it adds sources (files, environment variables, dictionaries), merges them by priority, loads files asynchronously, and converts the merged data into a typed dataclass, checking that required fields are present. The design is async-first, so file I/O does not block the event loop, and an asyncio.Lock serializes concurrent load_config calls. Additionally, we enable hot reloading by watching the specified configuration files and invoking the registered callbacks when a change is detected. This setup provides a flexible foundation for managing application configuration dynamically.
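Two of the manager's steps, environment value conversion and priority merging, can be tried in isolation. This is a stand-alone sketch that mirrors the logic of `_convert_env_value` and `_merge_sources` under simplified assumptions; the function names and the `(priority, data)` tuple format are illustrative, not the library's API.

```python
import json
from typing import Any, Dict, List, Tuple


def convert_env_value(value: str) -> Any:
    """Turn an environment string into a bool, number, JSON value, or str."""
    if value.lower() in ("true", "false"):
        return value.lower() == "true"
    try:
        return float(value) if "." in value else int(value)
    except ValueError:
        pass
    try:
        return json.loads(value)
    except json.JSONDecodeError:
        return value  # fall back to the raw string


def merge_by_priority(sources: List[Tuple[int, Dict[str, Any]]]) -> Dict[str, Any]:
    """Apply sources from lowest to highest priority; later updates win."""
    merged: Dict[str, Any] = {}
    for _, data in sorted(sources, key=lambda item: item[0]):
        merged.update(data)  # shallow: a nested dict is replaced as a whole
    return merged


file_values = {"debug": False, "max_workers": 4}
env_values = {
    "debug": convert_env_value("true"),
    "max_workers": convert_env_value("8"),
}
print(merge_by_priority([(100, env_values), (0, file_values)]))
```

Because the merge uses `dict.update`, a higher-priority source that supplies a nested dictionary replaces the lower-priority one entirely rather than merging key by key.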

async def load_config(config_class: Type[T],
                     config_file: Optional[Union[str, Path]] = None,
                     env_prefix: Optional[str] = None,
                     watch: bool = False) -> T:
    """
    Convenience function to quickly load configuration.
   
    Args:
        config_class: Dataclass to load configuration into
        config_file: Optional configuration file path
        env_prefix: Optional environment variable prefix
        watch: Whether to watch for file changes
   
    Returns:
        Configured instance of config_class
    """
    manager = AsyncConfigManager()
   
    if config_file:
        manager.add_file(config_file, priority=0, watch=watch)
   
    if env_prefix:
        manager.add_env(env_prefix, priority=100)
   
    return await manager.load_config(config_class)

We add a convenience helper, load_config, to simplify setup. With a single call, we can load settings from a file, from environment variables, or from both, and optionally mark the file for watching. The helper keeps the library beginner-friendly while still supporting advanced use cases under the hood.
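To make the file-then-environment flow concrete without the full manager, here is a rough stand-alone sketch. It assumes a JSON file written to a temporary directory and a hypothetical `DEMOAPP_` prefix; `load_file_then_env` is an illustrative function, and unlike the library it leaves environment values as strings.

```python
import asyncio
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Dict


async def load_file_then_env(path: Path, env_prefix: str) -> Dict[str, Any]:
    """Read a JSON file off the event loop, then overlay prefixed env vars."""
    # path.read_text blocks, so run it in a worker thread.
    content = await asyncio.to_thread(path.read_text)
    settings: Dict[str, Any] = json.loads(content)

    prefix = env_prefix.upper() + "_"
    for key, value in os.environ.items():
        if key.startswith(prefix):
            # DEMOAPP_LOG_LEVEL -> log_level; values stay strings in this sketch.
            settings[key[len(prefix):].lower()] = value
    return settings


async def main() -> Dict[str, Any]:
    with tempfile.TemporaryDirectory() as tmp:
        config_path = Path(tmp) / "config.json"
        config_path.write_text(json.dumps({"log_level": "INFO", "max_workers": 4}))
        os.environ["DEMOAPP_LOG_LEVEL"] = "DEBUG"  # hypothetical variable
        try:
            return await load_file_then_env(config_path, "demoapp")
        finally:
            del os.environ["DEMOAPP_LOG_LEVEL"]


print(asyncio.run(main()))
```

The environment value overrides the file value for `log_level`, which matches the priorities the helper assigns (0 for the file, 100 for environment variables).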

@dataclass
class DatabaseConfig:
    """Example database configuration."""
    host: str = "localhost"
    port: int = 5432
    username: str = "admin"
    password: str = ""
    database: str = "myapp"
    ssl_enabled: bool = False
    pool_size: int = 10




@dataclass
class AppConfig:
    """Example application configuration."""
    debug: bool = False
    log_level: str = "INFO"
    secret_key: str = ""
    database: DatabaseConfig = field(default_factory=DatabaseConfig)
    redis_url: str = "redis://localhost:6379"
    max_workers: int = 4




async def demo_simple_config():
    """Demo simple configuration loading."""
   
    sample_config = {
        "debug": True,
        "log_level": "DEBUG",
        "secret_key": "dev-secret-key",
        "database": {
            "host": "localhost",
            "port": 5432,
            "username": "testuser",
            "password": "testpass",
            "database": "testdb"
        },
        "max_workers": 8
    }
   
    manager = AsyncConfigManager()
    manager.add_dict(sample_config, priority=0)
   
    config = await manager.load_config(AppConfig)
   
    print("=== Simple Configuration Demo ===")
    print(f"Debug mode: {config.debug}")
    print(f"Log level: {config.log_level}")
    print(f"Database host: {config.database.host}")
    print(f"Database port: {config.database.port}")
    print(f"Max workers: {config.max_workers}")
   
    return config

We define two sample configuration dataclasses, DatabaseConfig and AppConfig, which show how nested, typed configurations are structured. To demonstrate actual usage, we write demo_simple_config(), where we load a basic dictionary into our configuration manager. This shows how structured data maps into type-safe Python objects, keeping configuration handling clean, readable, and maintainable.
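The mapping from a nested dictionary to nested dataclasses can be sketched independently of the manager. The example below uses trimmed-down versions of the two dataclasses and a hypothetical `from_dict` helper; it illustrates the recursion only and is not the library's implementation.

```python
from dataclasses import dataclass, field, fields, is_dataclass
from typing import Any, Dict, Type, TypeVar, get_type_hints

T = TypeVar("T")


@dataclass
class DatabaseConfig:
    host: str = "localhost"
    port: int = 5432


@dataclass
class AppConfig:
    debug: bool = False
    database: DatabaseConfig = field(default_factory=DatabaseConfig)


def from_dict(cls: Type[T], data: Dict[str, Any]) -> T:
    """Hypothetical helper: build a dataclass, recursing into nested ones."""
    hints = get_type_hints(cls)
    kwargs = {}
    for f in fields(cls):
        if f.name not in data:
            continue  # let the dataclass default apply
        value = data[f.name]
        # A dict supplied for a dataclass-typed field becomes a nested instance.
        if is_dataclass(hints[f.name]) and isinstance(value, dict):
            value = from_dict(hints[f.name], value)
        kwargs[f.name] = value
    return cls(**kwargs)


config = from_dict(AppConfig, {"debug": True, "database": {"host": "db.internal"}})
print(config)
```

Keys missing from the nested dictionary, such as `port` here, fall back to the dataclass defaults, which is the same behavior the demo relies on for `ssl_enabled` and `pool_size`.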

async def demo_advanced_config():
    """Demo advanced configuration with multiple sources."""
   
    base_config = {
        "debug": False,
        "log_level": "INFO",
        "secret_key": "production-secret",
        "max_workers": 4
    }
   
    override_config = {
        "debug": True,
        "log_level": "DEBUG",
        "database": {
            "host": "dev-db.example.com",
            "port": 5433
        }
    }
   
    env_config = {
        "secret_key": "env-secret-key",
        "redis_url": "redis://prod-redis:6379"
    }
   
    print("\n=== Advanced Configuration Demo ===")
   
    manager = AsyncConfigManager()
   
    manager.add_dict(base_config, priority=0)    
    manager.add_dict(override_config, priority=50)  
    manager.add_dict(env_config, priority=100)    
   
    config = await manager.load_config(AppConfig)
   
    print("Configuration sources merged:")
    print(f"Debug mode: {config.debug} (from override)")
    print(f"Log level: {config.log_level} (from override)")
    print(f"Secret key: {config.secret_key} (from env)")
    print(f"Database host: {config.database.host} (from override)")
    print(f"Redis URL: {config.redis_url} (from env)")
   
    return config




async def demo_validation():
    """Demo configuration validation."""
   
    print("\n=== Configuration Validation Demo ===")
   
    valid_config = {
        "debug": True,
        "log_level": "DEBUG",
        "secret_key": "test-key",
        "database": {
            "host": "localhost",
            "port": 5432
        }
    }
   
    manager = AsyncConfigManager()
    manager.add_dict(valid_config, priority=0)
   
    try:
        config = await manager.load_config(AppConfig)
        print("✓ Valid configuration loaded successfully")
        print(f"  Database SSL: {config.database.ssl_enabled} (default value)")
        print(f"  Database pool size: {config.database.pool_size} (default value)")
    except ValidationError as e:
        print(f"✗ Validation error: {e}")
   
    incomplete_config = {
        "debug": True,
        "log_level": "DEBUG"
    }
   
    manager2 = AsyncConfigManager()
    manager2.add_dict(incomplete_config, priority=0)
   
    try:
        config2 = await manager2.load_config(AppConfig)
        print("✓ Configuration with defaults loaded successfully")
        print(f"  Secret key: '{config2.secret_key}' (default empty string)")
    except ValidationError as e:
        print(f"✗ Validation error: {e}")

We demonstrate the more advanced features of the configuration system through two examples. In demo_advanced_config(), we show how multiple configuration sources (base, override, and environment) are merged by priority, with higher-priority sources taking precedence. This highlights the flexibility of managing environment-specific overrides. In demo_validation(), we validate both a complete and a partial configuration. The system automatically fills in missing fields from their defaults where possible, and when a required field is missing it raises a clear ValidationError, which keeps configuration management reliable in real-world applications.
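Every field of AppConfig has a default, so the demos above never reach the missing-field branch. The sketch below shows that check on its own, assuming a hypothetical `ServiceConfig` dataclass with one required field; `missing_required_fields` is an illustrative helper rather than a library function.

```python
from dataclasses import MISSING, dataclass, fields
from typing import Any, Dict, List


@dataclass
class ServiceConfig:
    """Hypothetical config: `api_key` has no default, so it is required."""
    api_key: str
    timeout: int = 30


def missing_required_fields(config_class: type, data: Dict[str, Any]) -> List[str]:
    """List fields absent from data that have no default to fall back on."""
    missing = []
    for f in fields(config_class):
        has_default = f.default is not MISSING or f.default_factory is not MISSING
        if f.name not in data and not has_default:
            missing.append(f.name)
    return missing


print(missing_required_fields(ServiceConfig, {"timeout": 10}))
print(missing_required_fields(ServiceConfig, {"api_key": "abc"}))
```

A manager built along these lines would raise ValidationError for each name in the returned list, which is the situation the error message "Required field ... not found in configuration" describes.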

async def run_demos():
    """Run all demonstration functions."""
    try:
        await demo_simple_config()
        await demo_advanced_config()
        await demo_validation()
        print("\n=== All demos completed successfully! ===")
    except Exception as e:
        print(f"Demo error: {e}")
        import traceback
        traceback.print_exc()






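# In a Jupyter/IPython notebook, run the demos with top-level await:
# await run_demos()


if __name__ == "__main__":
    try:
        # Raises RuntimeError when no event loop is running in this thread.
        asyncio.get_running_loop()
    except RuntimeError:
        # Standard Python script: start a fresh event loop.
        asyncio.run(run_demos())
    else:
        # A loop is already running, so asyncio.run() cannot be used here.
        print("Running in Jupyter/IPython environment")
        print("Use: await run_demos()")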

We end the tutorial with run_demos(), which runs all the demo functions in turn, covering simple loading, multi-source merging, and validation. To support both Jupyter and standard Python environments, we include conditional logic that runs the demos appropriately. This makes our configuration system easy to test, demonstrate, and integrate into a variety of workflows.

In conclusion, we have demonstrated how AsyncConfig provides a solid and extensible foundation for managing configuration in modern Python applications. We saw how easy it is to merge multiple sources, validate configurations against typed dataclasses, and respond to file changes in real time. Whether we are building microservices, async backends, or CLI tools, the library offers a flexible, developer-friendly way to manage configuration safely and efficiently.




Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of artificial intelligence for social good. His most recent endeavor is the launch of Marktechpost, an artificial intelligence media platform that stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understandable by a wide audience. The platform has over 2 million monthly views, illustrating its popularity among audiences.