/
OS-World15d9ddb
# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors
#
# SPDX-License-Identifier: Apache-2.0
#
# Portions derived from https://github.com/microsoft/autogen are under the MIT License.
# SPDX-License-Identifier: MIT
from __future__ import annotations
from contextvars import ContextVar
from types import TracebackType
from typing import Any, Optional, Union
from ..doc_utils import export_module
from .abstract_cache_base import AbstractCache
from .cache_factory import CacheFactory
@export_module("autogen")
class Cache(AbstractCache):
    """A wrapper class for managing cache configuration and instances.

    This class provides a unified interface for creating and interacting with
    different types of cache (e.g., Redis, Disk). It abstracts the underlying
    cache implementation details, providing methods for cache operations.

    Attributes:
        config (Dict[str, Any]): A dictionary containing cache configuration.
            This is a shallow copy of the dict passed to ``__init__`` with
            ``cache_seed`` normalized to a string.
        cache: The cache instance created based on the provided configuration.
    """

    # The Cache most recently entered via ``with`` in the current context;
    # ``None`` (the declared default) when no Cache block is active.
    _current_cache: ContextVar[Optional[Cache]] = ContextVar("current_cache", default=None)

    ALLOWED_CONFIG_KEYS = [
        "cache_seed",
        "redis_url",
        "cache_path_root",
        "cosmos_db_config",
    ]

    @staticmethod
    def redis(cache_seed: Union[str, int] = 42, redis_url: str = "redis://localhost:6379/0") -> Cache:
        """Create a Redis cache instance.

        Args:
            cache_seed (Union[str, int], optional): A seed for the cache. Defaults to 42.
            redis_url (str, optional): The URL for the Redis server. Defaults to "redis://localhost:6379/0".

        Returns:
            Cache: A Cache instance configured for Redis.
        """
        return Cache({"cache_seed": cache_seed, "redis_url": redis_url})

    @staticmethod
    def disk(cache_seed: Union[str, int] = 42, cache_path_root: str = ".cache") -> Cache:
        """Create a Disk cache instance.

        Args:
            cache_seed (Union[str, int], optional): A seed for the cache. Defaults to 42.
            cache_path_root (str, optional): The root path for the disk cache. Defaults to ".cache".

        Returns:
            Cache: A Cache instance configured for Disk caching.
        """
        return Cache({"cache_seed": cache_seed, "cache_path_root": cache_path_root})

    @staticmethod
    def cosmos_db(
        connection_string: Optional[str] = None,
        container_id: Optional[str] = None,
        cache_seed: Union[str, int] = 42,
        client: Optional[Any] = None,
    ) -> Cache:
        """Create a Cosmos DB cache instance with 'autogen_cache' as database ID.

        Args:
            connection_string (str, optional): Connection string to the Cosmos DB account.
            container_id (str, optional): The container ID for the Cosmos DB account.
            cache_seed (Union[str, int], optional): A seed for the cache. Defaults to 42.
            client: Optional[CosmosClient]: Pass an existing Cosmos DB client.

        Returns:
            Cache: A Cache instance configured for Cosmos DB.
        """
        cosmos_db_config = {
            "connection_string": connection_string,
            "database_id": "autogen_cache",
            "container_id": container_id,
            "client": client,
        }
        return Cache({"cache_seed": str(cache_seed), "cosmos_db_config": cosmos_db_config})

    def __init__(self, config: dict[str, Any]):
        """Initialize the Cache with the given configuration.

        Validates the configuration keys and creates the cache instance.
        The caller's ``config`` dict is not modified; a shallow copy is stored
        on ``self.config``.

        Args:
            config (Dict[str, Any]): A dictionary containing the cache configuration.

        Raises:
            ValueError: If an invalid configuration key is provided.
        """
        # Validate first so a rejected config has no side effects at all.
        for key in config:
            if key not in self.ALLOWED_CONFIG_KEYS:
                raise ValueError(f"Invalid config key: {key}")

        # Shallow-copy so normalizing the seed below never leaks into the
        # dict the caller handed us.
        self.config = dict(config)
        # Ensure that the seed is always treated as a string before being
        # passed to any cache factory or stored.
        self.config["cache_seed"] = str(self.config.get("cache_seed", 42))

        # create cache instance
        self.cache = CacheFactory.cache_factory(
            seed=self.config["cache_seed"],
            redis_url=self.config.get("redis_url"),
            cache_path_root=self.config.get("cache_path_root"),
            cosmosdb_config=self.config.get("cosmos_db_config"),
        )

    def __enter__(self) -> Cache:
        """Enter the runtime context related to the cache object.

        Registers this instance as the current cache and then enters the
        underlying cache.

        Returns:
            The value returned by the underlying cache's ``__enter__``.
            NOTE(review): the annotation says ``Cache``; presumably the
            underlying cache returns itself here -- confirm against the
            concrete cache implementations.
        """
        # Store the previous cache so we can restore it
        self._previous_cache = self.__class__._current_cache.get(None)
        # Set the current cache to this instance
        self._token = self.__class__._current_cache.set(self)
        # Call the underlying cache's __enter__ method
        return self.cache.__enter__()

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Exit the runtime context related to the cache object.

        Exits the underlying cache and restores the previously current cache.
        The current-cache restoration happens even if the underlying cache's
        ``__exit__`` raises.

        Args:
            exc_type: The exception type if an exception was raised in the context.
            exc_value: The exception value if an exception was raised in the context.
            traceback: The traceback if an exception was raised in the context.

        Returns:
            Whatever the underlying cache's ``__exit__`` returns.
        """
        try:
            # First exit the underlying cache context
            return self.cache.__exit__(exc_type, exc_value, traceback)
        finally:
            # Always restore the context variable, otherwise a failing
            # underlying __exit__ would leave this instance registered as
            # the current cache after its block has ended.
            try:
                self.__class__._current_cache.reset(self._token)
            except RuntimeError:
                # The token has already been used (e.g. this instance was
                # entered more than once). Fall back to re-installing the
                # previous value; None is valid here because it is the
                # variable's declared default.
                self.__class__._current_cache.set(self._previous_cache)

    def get(self, key: str, default: Optional[Any] = None) -> Optional[Any]:
        """Retrieve an item from the cache.

        Args:
            key (str): The key identifying the item in the cache.
            default (optional): The default value to return if the key is not found.
                Defaults to None.

        Returns:
            The value associated with the key if found, else the default value.
        """
        return self.cache.get(key, default)

    def set(self, key: str, value: Any) -> None:
        """Set an item in the cache.

        Args:
            key (str): The key under which the item is to be stored.
            value: The value to be stored in the cache.
        """
        self.cache.set(key, value)

    def close(self) -> None:
        """Close the cache.

        Perform any necessary cleanup, such as closing connections or releasing resources.
        """
        self.cache.close()

    @classmethod
    def get_current_cache(cls, cache: Optional[Cache] = None) -> Optional[Cache]:
        """Get the current cache instance.

        Args:
            cache: An explicit cache. When not None it is returned as-is and
                takes precedence over the context-managed cache.

        Returns:
            Cache: ``cache`` if given, otherwise the cache registered by the
            innermost active ``with`` block, or None if there is none.
        """
        if cache is not None:
            return cache
        # The ContextVar is declared with default=None, so get() cannot raise
        # LookupError and no exception handling is needed.
        return cls._current_cache.get()