# mirrored 8 minutes ago
# 0
# Timothyxxxupdate coact: add autogen/cache 15d9ddb
# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors
#
# SPDX-License-Identifier: Apache-2.0
#
# Portions derived from  https://github.com/microsoft/autogen are under the MIT License.
# SPDX-License-Identifier: MIT
from __future__ import annotations

from contextvars import ContextVar
from types import TracebackType
from typing import Any, Optional, Union

from ..doc_utils import export_module
from .abstract_cache_base import AbstractCache
from .cache_factory import CacheFactory


@export_module("autogen")
class Cache(AbstractCache):
    """A wrapper class for managing cache configuration and instances.

    This class provides a unified interface for creating and interacting with
    different types of cache (e.g., Redis, Disk, Cosmos DB). The concrete
    backend is built by ``CacheFactory.cache_factory`` from the configuration,
    and ``get``/``set``/``close`` are forwarded to it.

    While an instance is used as a context manager it is also published as the
    "current cache" (see ``get_current_cache``); the previous current cache is
    restored when the ``with`` block ends.

    Attributes:
        config (dict[str, Any]): A private copy of the cache configuration,
            with ``cache_seed`` normalised to a string.
        cache: The cache instance created based on the provided configuration.
    """

    # Cache most recently entered via ``with`` in the current context (None when there is none).
    _current_cache: ContextVar[Optional[Cache]] = ContextVar("current_cache", default=None)

    ALLOWED_CONFIG_KEYS = [
        "cache_seed",
        "redis_url",
        "cache_path_root",
        "cosmos_db_config",
    ]

    @staticmethod
    def redis(cache_seed: Union[str, int] = 42, redis_url: str = "redis://localhost:6379/0") -> Cache:
        """Create a Redis cache instance.

        Args:
            cache_seed (Union[str, int], optional): A seed for the cache. Defaults to 42.
            redis_url (str, optional): The URL for the Redis server. Defaults to "redis://localhost:6379/0".

        Returns:
            Cache: A Cache instance configured for Redis.
        """
        return Cache({"cache_seed": cache_seed, "redis_url": redis_url})

    @staticmethod
    def disk(cache_seed: Union[str, int] = 42, cache_path_root: str = ".cache") -> Cache:
        """Create a Disk cache instance.

        Args:
            cache_seed (Union[str, int], optional): A seed for the cache. Defaults to 42.
            cache_path_root (str, optional): The root path for the disk cache. Defaults to ".cache".

        Returns:
            Cache: A Cache instance configured for Disk caching.
        """
        return Cache({"cache_seed": cache_seed, "cache_path_root": cache_path_root})

    @staticmethod
    def cosmos_db(
        connection_string: Optional[str] = None,
        container_id: Optional[str] = None,
        cache_seed: Union[str, int] = 42,
        client: Optional[Any] = None,
    ) -> Cache:
        """Create a Cosmos DB cache instance with 'autogen_cache' as database ID.

        Args:
            connection_string (str, optional): Connection string to the Cosmos DB account.
            container_id (str, optional): The container ID for the Cosmos DB account.
            cache_seed (Union[str, int], optional): A seed for the cache. Defaults to 42.
            client (Any, optional): An existing Cosmos DB client to pass through to the backend.

        Returns:
            Cache: A Cache instance configured for Cosmos DB.
        """
        cosmos_db_config = {
            "connection_string": connection_string,
            "database_id": "autogen_cache",
            "container_id": container_id,
            "client": client,
        }
        return Cache({"cache_seed": str(cache_seed), "cosmos_db_config": cosmos_db_config})

    def __init__(self, config: dict[str, Any]):
        """Initialize the Cache with the given configuration.

        Validates the configuration keys and creates the cache instance. The
        caller's dictionary is not modified; ``self.config`` is a shallow copy.

        Args:
            config (dict[str, Any]): A dictionary containing the cache configuration.
                Only keys listed in ``ALLOWED_CONFIG_KEYS`` are accepted. A missing
                ``cache_seed`` defaults to 42.

        Raises:
            ValueError: If an invalid configuration key is provided.
        """
        # Validate before touching anything, so a rejected config has no side effects.
        for key in config:
            if key not in self.ALLOWED_CONFIG_KEYS:
                raise ValueError(f"Invalid config key: {key}")

        # Work on a shallow copy: normalising the seed must not rewrite the caller's dictionary.
        self.config = dict(config)
        # Ensure that the seed is always treated as a string before being passed to any cache factory or stored.
        self.config["cache_seed"] = str(self.config.get("cache_seed", 42))

        # One (token, previous cache) pair per active ``with`` block on this instance, innermost last.
        self._context_stack: list[tuple[Any, Optional[Cache]]] = []

        # create cache instance
        self.cache = CacheFactory.cache_factory(
            seed=self.config["cache_seed"],
            redis_url=self.config.get("redis_url"),
            cache_path_root=self.config.get("cache_path_root"),
            cosmosdb_config=self.config.get("cosmos_db_config"),
        )

    def __enter__(self) -> Cache:
        """Enter the runtime context related to the cache object.

        Publishes this instance as the current cache and then enters the
        underlying cache. If entering the underlying cache fails, the previous
        current cache is restored before the exception propagates.

        Returns:
            Whatever the underlying cache's ``__enter__`` returns, for use within
            the context block (this is the backend's value, not necessarily this wrapper).
        """
        current = self.__class__._current_cache
        # Remember what was current so it can be restored on exit.
        previous = current.get(None)
        token = current.set(self)

        # Instances created without running __init__ get their stack lazily.
        stack = getattr(self, "_context_stack", None)
        if stack is None:
            stack = self._context_stack = []
        stack.append((token, previous))

        try:
            return self.cache.__enter__()
        except BaseException:
            # __exit__ will not be called when __enter__ fails, so undo the publication here.
            self._restore_previous_cache()
            raise

    def __exit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Exit the runtime context related to the cache object.

        Exits the underlying cache context and then restores the cache that was
        current before the matching ``__enter__``. The restore happens even if
        the underlying cache's ``__exit__`` raises.

        Args:
            exc_type: The exception type if an exception was raised in the context.
            exc_value: The exception value if an exception was raised in the context.
            traceback: The traceback if an exception was raised in the context.

        Returns:
            The value returned by the underlying cache's ``__exit__``, passed through unchanged.
        """
        try:
            # First exit the underlying cache context
            return self.cache.__exit__(exc_type, exc_value, traceback)
        finally:
            # Then put the context variable back, whatever happened above.
            self._restore_previous_cache()

    def _restore_previous_cache(self) -> None:
        """Undo the most recent ``__enter__`` publication of this instance as current cache."""
        stack = getattr(self, "_context_stack", None)
        if not stack:
            # No matching __enter__ recorded; there is nothing to restore.
            return

        token, previous = stack.pop()
        current = self.__class__._current_cache
        try:
            current.reset(token)
        except (RuntimeError, ValueError):
            # reset() refuses a token that was already used (RuntimeError) or that was
            # created in a different Context (ValueError). Fall back to re-publishing the
            # remembered value; this also clears the variable when there was no previous cache.
            current.set(previous)

    def get(self, key: str, default: Optional[Any] = None) -> Optional[Any]:
        """Retrieve an item from the cache.

        Args:
            key (str): The key identifying the item in the cache.
            default (optional): The default value to return if the key is not found.
                                Defaults to None.

        Returns:
            The value associated with the key if found, else the default value.
        """
        return self.cache.get(key, default)

    def set(self, key: str, value: Any) -> None:
        """Set an item in the cache.

        Args:
            key (str): The key under which the item is to be stored.
            value: The value to be stored in the cache.
        """
        self.cache.set(key, value)

    def close(self) -> None:
        """Close the cache.

        Perform any necessary cleanup, such as closing connections or releasing resources.
        """
        self.cache.close()

    @classmethod
    def get_current_cache(cls, cache: "Optional[Cache]" = None) -> "Optional[Cache]":
        """Get the current cache instance.

        Args:
            cache (Optional[Cache]): An explicit cache. When it is not None it is
                returned unchanged and the current cache is not consulted.

        Returns:
            Optional[Cache]: ``cache`` if given; otherwise the instance most recently
            entered with ``with`` in the current context, or None if there is none.
        """
        if cache is not None:
            return cache
        # Passing an explicit default means get() cannot raise LookupError.
        return cls._current_cache.get(None)