Source code for agentbx.core.base_client

"""
Base client class for Redis operations.
"""

from abc import ABC
from typing import Any
from typing import Dict
from typing import Optional

from .redis_manager import RedisManager


class BaseClient(ABC):
    """
    Common foundation for clients that talk to Redis.

    Keeps a Redis manager together with a client identifier and forwards
    bundle and cache operations to that manager.
    """

    def __init__(self, redis_manager: RedisManager, client_id: str):
        """
        Set up the base client.

        Args:
            redis_manager: Redis manager instance that every operation is
                delegated to
            client_id: Unique identifier for this client
        """
        self.redis_manager, self.client_id = redis_manager, client_id

    def store_bundle(self, bundle: Any, bundle_id: Optional[str] = None) -> str:
        """
        Hand a bundle to the Redis manager for storage.

        Args:
            bundle: Object to store
            bundle_id: Optional identifier, passed to the manager unchanged
                (``None`` when omitted)

        Returns:
            Whatever the manager's ``store_bundle`` returns.
        """
        manager = self.redis_manager
        return manager.store_bundle(bundle, bundle_id)

    def get_bundle(self, bundle_id: str) -> Any:
        """
        Fetch a bundle through the Redis manager.

        Args:
            bundle_id: Identifier of the bundle to look up

        Returns:
            Whatever the manager's ``get_bundle`` returns.
        """
        manager = self.redis_manager
        return manager.get_bundle(bundle_id)

    def delete_bundle(self, bundle_id: str) -> bool:
        """
        Ask the Redis manager to delete a bundle.

        Args:
            bundle_id: Identifier of the bundle to delete

        Returns:
            Whatever the manager's ``delete_bundle`` returns.
        """
        manager = self.redis_manager
        return manager.delete_bundle(bundle_id)

    def list_bundles(self, bundle_type: Optional[str] = None) -> list[str]:
        """
        List bundles known to the Redis manager.

        Args:
            bundle_type: Optional type filter, passed to the manager unchanged
                (``None`` when omitted)

        Returns:
            Whatever the manager's ``list_bundles`` returns.
        """
        manager = self.redis_manager
        return manager.list_bundles(bundle_type)

    def cache_get(self, key: str) -> Any:
        """
        Read a value from the cache via the Redis manager.

        Args:
            key: Cache key to look up

        Returns:
            Whatever the manager's ``cache_get`` returns.
        """
        manager = self.redis_manager
        return manager.cache_get(key)

    def cache_set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """
        Write a value to the cache via the Redis manager.

        Args:
            key: Cache key to write
            value: Value to store under ``key``
            ttl: Optional time-to-live, passed to the manager unchanged
                (``None`` when omitted)

        Returns:
            Whatever the manager's ``cache_set`` returns.
        """
        manager = self.redis_manager
        return manager.cache_set(key, value, ttl)

    def get_client_info(self) -> Dict[str, Any]:
        """
        Describe this client.

        Returns:
            A dict with the keys ``client_id``, ``client_type`` (the name of
            the concrete class) and ``redis_healthy`` (the result of the
            manager's ``is_healthy()``).
        """
        info: Dict[str, Any] = {}
        info["client_id"] = self.client_id
        info["client_type"] = self.__class__.__name__
        # The health probe is evaluated last, after the static fields.
        info["redis_healthy"] = self.redis_manager.is_healthy()
        return info