# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors
#
# SPDX-License-Identifier: Apache-2.0
#
# Portions derived from https://github.com/microsoft/autogen are under the MIT License.
# SPDX-License-Identifier: MIT
"""Create a compatible client for the Amazon Bedrock Converse API.

Example usage:
Install the `boto3` package by running `pip install --upgrade boto3`.
See the Converse API documentation: https://docs.aws.amazon.com/bedrock/latest/userguide/conversation-inference.html

```python
import autogen

config_list = [
    {
        "api_type": "bedrock",
        "model": "meta.llama3-1-8b-instruct-v1:0",
        "aws_region": "us-west-2",
        "aws_access_key": "",
        "aws_secret_key": "",
        "price": [0.003, 0.015],
    }
]

assistant = autogen.AssistantAgent("assistant", llm_config={"config_list": config_list})
```
"""

from __future__ import annotations

import base64
import json
import os
import re
import time
import warnings
from typing import Any, Literal, Optional

import requests
from pydantic import Field, SecretStr, field_serializer

from ..import_utils import optional_import_block, require_optional_import
from ..llm_config import LLMConfigEntry, register_llm_config
from .client_utils import validate_parameter
from .oai_models import ChatCompletion, ChatCompletionMessage, ChatCompletionMessageToolCall, Choice, CompletionUsage

with optional_import_block():
    import boto3
    from botocore.config import Config


@register_llm_config
class BedrockLLMConfigEntry(LLMConfigEntry):
    api_type: Literal["bedrock"] = "bedrock"
    aws_region: str
    aws_access_key: Optional[SecretStr] = None
    aws_secret_key: Optional[SecretStr] = None
    aws_session_token: Optional[SecretStr] = None
    aws_profile_name: Optional[str] = None
    temperature: Optional[float] = None
    topP: Optional[float] = None  # noqa: N815
    maxTokens: Optional[int] = None  # noqa: N815
    top_p: Optional[float] = None
    top_k: Optional[int] = None
    k: Optional[int] = None
    seed: Optional[int] = None
    cache_seed: Optional[int] = None
    supports_system_prompts: bool = True
    stream: bool = False
    price: Optional[list[float]] = Field(default=None, min_length=2, max_length=2)
    timeout: Optional[int] = None

    @field_serializer("aws_access_key", "aws_secret_key", "aws_session_token", when_used="unless-none")
    def serialize_aws_secrets(self, v: SecretStr) -> str:
        return v.get_secret_value()

    def create_client(self):
        raise NotImplementedError("BedrockLLMConfigEntry.create_client must be implemented.")
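
# A minimal usage sketch (not executed anywhere in this module): constructing a
# BedrockLLMConfigEntry directly. The model id and region are illustrative, and this
# assumes `model` and `aws_region` are the only required fields; credentials may
# instead come from the environment or an attached IAM role (see BedrockClient below).
def _example_bedrock_config_entry() -> BedrockLLMConfigEntry:
    return BedrockLLMConfigEntry(
        model="anthropic.claude-3-sonnet-20240229-v1:0",  # illustrative model id
        aws_region="us-east-1",
        temperature=0.2,
        maxTokens=1024,
    )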


@require_optional_import("boto3", "bedrock")
class BedrockClient:
    """Client for Amazon's Bedrock Converse API."""

    _retries = 5

    def __init__(self, **kwargs: Any):
        """Initialises BedrockClient for Amazon's Bedrock Converse API"""
        self._aws_access_key = kwargs.get("aws_access_key")
        self._aws_secret_key = kwargs.get("aws_secret_key")
        self._aws_session_token = kwargs.get("aws_session_token")
        self._aws_region = kwargs.get("aws_region")
        self._aws_profile_name = kwargs.get("aws_profile_name")
        self._timeout = kwargs.get("timeout")

        if not self._aws_access_key:
            self._aws_access_key = os.getenv("AWS_ACCESS_KEY")

        if not self._aws_secret_key:
            self._aws_secret_key = os.getenv("AWS_SECRET_KEY")

        if not self._aws_session_token:
            self._aws_session_token = os.getenv("AWS_SESSION_TOKEN")

        if not self._aws_region:
            self._aws_region = os.getenv("AWS_REGION")

        if self._aws_region is None:
            raise ValueError("Region is required to use the Amazon Bedrock API.")

        if self._timeout is None:
            self._timeout = 60

        # Initialize Bedrock client, session, and runtime
        bedrock_config = Config(
            region_name=self._aws_region,
            signature_version="v4",
            retries={"max_attempts": self._retries, "mode": "standard"},
            read_timeout=self._timeout,
        )

        if "response_format" in kwargs and kwargs["response_format"] is not None:
            warnings.warn("response_format is not supported for Bedrock, it will be ignored.", UserWarning)

        # If no access key or secret key was provided via arguments or environment variables,
        if (
            self._aws_access_key is None
            or self._aws_access_key == ""
            or self._aws_secret_key is None
            or self._aws_secret_key == ""
        ):
            # fall back to credentials from the attached role of a managed service (Lambda, EC2, ECS, etc.)
            self.bedrock_runtime = boto3.client(service_name="bedrock-runtime", config=bedrock_config)
        else:
            session = boto3.Session(
                aws_access_key_id=self._aws_access_key,
                aws_secret_access_key=self._aws_secret_key,
                aws_session_token=self._aws_session_token,
                profile_name=self._aws_profile_name,
            )
            self.bedrock_runtime = session.client(service_name="bedrock-runtime", config=bedrock_config)

    def message_retrieval(self, response):
        """Retrieve the messages from the response."""
        return [choice.message for choice in response.choices]

    def parse_custom_params(self, params: dict[str, Any]):
        """Parses custom parameters for logic in this client class"""
        # Whether to separate system messages into their own request parameter (default: True).
        # This is required because not all models support a system prompt (e.g. Mistral Instruct).
        self._supports_system_prompts = params.get("supports_system_prompts", True)

    def parse_params(self, params: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
        """Loads the valid parameters required to invoke Bedrock Converse
        Returns a tuple of (base_params, additional_params)
        """
        base_params = {}
        additional_params = {}

        # Amazon Bedrock base model IDs are listed here:
        # https://docs.aws.amazon.com/bedrock/latest/userguide/model-ids.html
        self._model_id = params.get("model")
        assert self._model_id, "Please provide the 'model' in the config_list to use Amazon Bedrock"

        # Parameters vary based on the model used.
        # As we can't cater for every model and parameter, it is the developer's
        # responsibility to put the parameters they need in the config; a parameter
        # is only included in the request if it is present in the config.
        #
        # Important:
        # No defaults will be used (as they can vary per model)
        # No ranges will be enforced (as they can vary per model)
        # We cover the main parameters, but others may need to be added later
        #
        # Here are some pages that show the parameters available for different models
        # https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-titan-text.html
        # https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-anthropic-claude-text-completion.html
        # https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-cohere-command-r-plus.html
        # https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-meta.html
        # https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-mistral-chat-completion.html

        # Here are the possible "base" parameters and their suitable types
        base_parameters = [["temperature", (float, int)], ["topP", (float, int)], ["maxTokens", int]]

        for param_name, suitable_types in base_parameters:
            if param_name in params:
                base_params[param_name] = validate_parameter(
                    params, param_name, suitable_types, False, None, None, None
                )

        # Here are the possible "model-specific" parameters and their suitable types, known as additional parameters
        additional_parameters = [
            ["top_p", (float, int)],
            ["top_k", int],
            ["k", int],
            ["seed", int],
        ]

        for param_name, suitable_types in additional_parameters:
            if param_name in params:
                additional_params[param_name] = validate_parameter(
                    params, param_name, suitable_types, False, None, None, None
                )

        # Streaming
        self._streaming = params.get("stream", False)

        # Streaming is not supported in this release, as many models do not support streaming with tool use
        if self._streaming:
            warnings.warn(
                "Streaming is not currently supported, streaming will be disabled.",
                UserWarning,
            )
            self._streaming = False

        return base_params, additional_params

    def create(self, params) -> ChatCompletion:
        """Run Amazon Bedrock inference and return AG2 response"""
        # Set custom client class settings
        self.parse_custom_params(params)

        # Parse the inference parameters
        base_params, additional_params = self.parse_params(params)

        has_tools = "tools" in params
        messages = oai_messages_to_bedrock_messages(params["messages"], has_tools, self._supports_system_prompts)

        if self._supports_system_prompts:
            system_messages = extract_system_messages(params["messages"])

        tool_config = format_tools(params["tools"] if has_tools else [])

        request_args = {"messages": messages, "modelId": self._model_id}

        # Base and additional args
        if len(base_params) > 0:
            request_args["inferenceConfig"] = base_params

        if len(additional_params) > 0:
            request_args["additionalModelRequestFields"] = additional_params

        if self._supports_system_prompts:
            request_args["system"] = system_messages

        if len(tool_config["tools"]) > 0:
            request_args["toolConfig"] = tool_config

        response = self.bedrock_runtime.converse(**request_args)
        if response is None:
            raise RuntimeError(f"Failed to get response from Bedrock after retrying {self._retries} times.")

        finish_reason = convert_stop_reason_to_finish_reason(response["stopReason"])
        response_message = response["output"]["message"]

        tool_calls = format_tool_calls(response_message["content"]) if finish_reason == "tool_calls" else None

        text = ""
        for content in response_message["content"]:
            if "text" in content:
                text = content["text"]
                # NOTE: other types of output may be dealt with here

        message = ChatCompletionMessage(role="assistant", content=text, tool_calls=tool_calls)

        response_usage = response["usage"]
        usage = CompletionUsage(
            prompt_tokens=response_usage["inputTokens"],
            completion_tokens=response_usage["outputTokens"],
            total_tokens=response_usage["totalTokens"],
        )

        return ChatCompletion(
            id=response["ResponseMetadata"]["RequestId"],
            choices=[Choice(finish_reason=finish_reason, index=0, message=message)],
            created=int(time.time()),
            model=self._model_id,
            object="chat.completion",
            usage=usage,
        )

    def cost(self, response: ChatCompletion) -> float:
        """Calculate the cost of the response."""
        return calculate_cost(response.usage.prompt_tokens, response.usage.completion_tokens, response.model)

    @staticmethod
    def get_usage(response) -> dict:
        """Get the usage of tokens and their cost information."""
        return {
            "prompt_tokens": response.usage.prompt_tokens,
            "completion_tokens": response.usage.completion_tokens,
            "total_tokens": response.usage.total_tokens,
            "cost": response.cost,
            "model": response.model,
        }
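
# A minimal usage sketch (normally AG2 drives this client through its config_list
# machinery rather than directly). The model id, region and message are illustrative
# assumptions; running this requires valid AWS credentials (from the environment or
# an attached role) and access to the named model.
def _example_bedrock_client_create() -> ChatCompletion:
    client = BedrockClient(aws_region="us-west-2")
    params = {
        "model": "meta.llama3-1-8b-instruct-v1:0",
        "messages": [{"role": "user", "content": "Say hello."}],
        "temperature": 0.5,
        "maxTokens": 256,
    }
    return client.create(params)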


def extract_system_messages(messages: list[dict[str, Any]]) -> list:
    """Extract the system messages from the list of messages.

    Args:
        messages (list[dict[str, Any]]): List of messages.

    Returns:
        list: List of Bedrock system message blocks, e.g. [{"text": "..."}].
    """
    """
    system_messages = [message.get("content")[0]["text"] for message in messages if message.get("role") == "system"]
    return system_messages # ''.join(system_messages)
    """

    for message in messages:
        if message.get("role") == "system":
            if isinstance(message["content"], str):
                return [{"text": message.get("content")}]
            else:
                return [{"text": message.get("content")[0]["text"]}]
    return []
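
# A minimal sketch of the extraction above: a plain-string system message becomes a
# single Bedrock system block. The messages are illustrative.
def _example_extract_system_messages() -> list:
    messages = [
        {"role": "system", "content": "Answer in French."},
        {"role": "user", "content": "Hello!"},
    ]
    return extract_system_messages(messages)  # -> [{"text": "Answer in French."}]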


def oai_messages_to_bedrock_messages(
    messages: list[dict[str, Any]], has_tools: bool, supports_system_prompts: bool
) -> list[dict[str, Any]]:
    """Convert messages from OAI format to Bedrock format.
    We correct for any specific role orders and types, etc.
    AWS Bedrock requires messages to alternate between user and assistant roles. This function ensures that the messages
    are in the correct order and format for Bedrock by inserting "Please continue" messages as needed.
    This is the same method as the one in the Autogen Anthropic client
    """
    # Track whether we have tools passed in. If not, tool use / tool result messages
    # should be converted to text messages, because Bedrock requires a tools parameter
    # (listing the tools) whenever other messages contain tool use or tool results.
    # This can occur when we don't need tool calling, such as for group chat speaker selection.

    # Convert messages to Bedrock compliant format

    # Remove system messages if the model supports system prompts (they are sent separately); otherwise convert them to user messages.
    if supports_system_prompts:
        messages = [x for x in messages if x["role"] != "system"]
    else:
        # Replace role="system" with role="user"
        for msg in messages:
            if msg["role"] == "system":
                msg["role"] = "user"

    processed_messages = []

    # Used to interweave user messages to ensure user/assistant alternating
    user_continue_message = {"content": [{"text": "Please continue."}], "role": "user"}
    assistant_continue_message = {
        "content": [{"text": "Please continue."}],
        "role": "assistant",
    }

    tool_use_messages = 0
    tool_result_messages = 0
    last_tool_use_index = -1
    last_tool_result_index = -1
    for message in messages:
        # New messages will be added here, manage role alternations
        expected_role = "user" if len(processed_messages) % 2 == 0 else "assistant"

        if "tool_calls" in message:
            # Map the tool call options to Bedrock's format
            tool_uses = []
            tool_names = []
            for tool_call in message["tool_calls"]:
                tool_uses.append({
                    "toolUse": {
                        "toolUseId": tool_call["id"],
                        "name": tool_call["function"]["name"],
                        "input": json.loads(tool_call["function"]["arguments"]),
                    }
                })
                if has_tools:
                    tool_use_messages += 1
                tool_names.append(tool_call["function"]["name"])

            if expected_role == "user":
                # Insert an extra user message as we will append an assistant message
                processed_messages.append(user_continue_message)

            if has_tools:
                processed_messages.append({"role": "assistant", "content": tool_uses})
                last_tool_use_index = len(processed_messages) - 1
            else:
                # Not using tools, so put in a plain text message
                processed_messages.append({
                    "role": "assistant",
                    "content": [{"text": f"Some internal function(s) that could be used: [{', '.join(tool_names)}]"}],
                })
        elif "tool_call_id" in message:
            if has_tools:
                # Map the tool usage call to tool_result for Bedrock
                tool_result = {
                    "toolResult": {
                        "toolUseId": message["tool_call_id"],
                        "content": [{"text": message["content"]}],
                    }
                }

                # If the previous message also had a tool_result, add it to that
                # Otherwise append a new message
                if last_tool_result_index == len(processed_messages) - 1:
                    processed_messages[-1]["content"].append(tool_result)
                else:
                    if expected_role == "assistant":
                        # Insert an extra assistant message as we will append a user message
                        processed_messages.append(assistant_continue_message)

                    processed_messages.append({"role": "user", "content": [tool_result]})
                    last_tool_result_index = len(processed_messages) - 1

                tool_result_messages += 1
            else:
                # Not using tools, so put in a plain text message
                processed_messages.append({
                    "role": "user",
                    "content": [{"text": f"Running the function returned: {message['content']}"}],
                })
        elif message["content"] == "":
            # Ignoring empty messages
            pass
        else:
            if expected_role != message["role"] and not (len(processed_messages) == 0 and message["role"] == "system"):
                # Inserting the alternating continue message (ignore if it's the first message and a system message)
                processed_messages.append(
                    user_continue_message if expected_role == "user" else assistant_continue_message
                )

            processed_messages.append({
                "role": message["role"],
                "content": parse_content_parts(message=message),
            })

    # We'll replace the last tool_use if there's no tool_result (occurs if we finish the conversation before running the function)
    if has_tools and tool_use_messages != tool_result_messages:
        processed_messages[last_tool_use_index] = assistant_continue_message

    # "name" is not a valid field on Bedrock messages, so remove it
    for message in processed_messages:
        if "name" in message:
            message.pop("name", None)

    # Note: when using reflection_with_llm we may end up with an "assistant" message as
    # the last message, which can cause a blank response.
    # So, if the last role is not "user", add a "user" continue message at the end.
    if processed_messages[-1]["role"] != "user":
        processed_messages.append(user_continue_message)

    return processed_messages
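
# A minimal sketch of the alternation handling above: two consecutive assistant turns
# get a "Please continue." user turn inserted between them, and the conversation is
# closed with a final user turn. The messages are illustrative.
def _example_role_alternation() -> list[dict[str, Any]]:
    oai_messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "What is the capital of France?"},
        {"role": "assistant", "content": "Paris."},
        {"role": "assistant", "content": "Anything else?"},
    ]
    # With supports_system_prompts=True the system message is removed here and
    # sent separately via extract_system_messages.
    return oai_messages_to_bedrock_messages(oai_messages, has_tools=False, supports_system_prompts=True)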


def parse_content_parts(
    message: dict[str, Any],
) -> list[dict[str, Any]]:
    content: str | list[dict[str, Any]] = message.get("content")
    if isinstance(content, str):
        return [
            {
                "text": content,
            }
        ]
    content_parts = []
    for part in content:
        if "text" in part:
            content_parts.append({
                "text": part.get("text"),
            })
        elif "image_url" in part:
            image_data, content_type = parse_image(part.get("image_url").get("url"))
            content_parts.append({
                "image": {
                    "format": content_type[6:],  # strip the "image/" prefix
                    "source": {"bytes": image_data},
                },
            })
        else:
            # Ignore unsupported content types
            continue
    return content_parts
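
# A minimal sketch of the conversion above: mixed text and image_url parts map to
# Bedrock "text" and "image" blocks. The base64 payload below is an illustrative
# PNG header fragment; a plain http(s) URL would instead be fetched via requests.
def _example_parse_content_parts() -> list[dict[str, Any]]:
    message = {
        "role": "user",
        "content": [
            {"type": "text", "text": "Describe this image."},
            {"type": "image_url", "image_url": {"url": "data:image/png;base64,iVBORw0KGgo="}},
        ],
    }
    return parse_content_parts(message=message)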


def parse_image(image_url: str) -> tuple[bytes, str]:
    """Try to get the raw data from an image url.

    Ref: https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ImageSource.html
    returns a tuple of (Image Data, Content Type)
    """
    pattern = r"^data:(image/[a-z]*);base64,\s*"
    content_type = re.search(pattern, image_url)
    # if already base64 encoded.
    # Only supports 'image/jpeg', 'image/png', 'image/gif' or 'image/webp'
    if content_type:
        image_data = re.sub(pattern, "", image_url)
        return base64.b64decode(image_data), content_type.group(1)

    # Send a request to the image URL
    response = requests.get(image_url)
    # Check if the request was successful
    if response.status_code == 200:
        content_type = response.headers.get("Content-Type", "")
        if not content_type.startswith("image"):
            content_type = "image/jpeg"
        # Get the image content
        image_content = response.content
        return image_content, content_type
    else:
        raise RuntimeError(f"Unable to access the image URL (status code {response.status_code})")


def format_tools(tools: list[dict[str, Any]]) -> dict[Literal["tools"], list[dict[str, Any]]]:
    converted_schema = {"tools": []}

    for tool in tools:
        if tool["type"] == "function":
            function = tool["function"]
            converted_tool = {
                "toolSpec": {
                    "name": function["name"],
                    "description": function["description"],
                    "inputSchema": {"json": {"type": "object", "properties": {}, "required": []}},
                }
            }

            for prop_name, prop_details in function["parameters"]["properties"].items():
                converted_tool["toolSpec"]["inputSchema"]["json"]["properties"][prop_name] = {
                    "type": prop_details["type"],
                    "description": prop_details.get("description", ""),
                }
                if "enum" in prop_details:
                    converted_tool["toolSpec"]["inputSchema"]["json"]["properties"][prop_name]["enum"] = prop_details[
                        "enum"
                    ]
                if "default" in prop_details:
                    converted_tool["toolSpec"]["inputSchema"]["json"]["properties"][prop_name]["default"] = (
                        prop_details["default"]
                    )

            if "required" in function["parameters"]:
                converted_tool["toolSpec"]["inputSchema"]["json"]["required"] = function["parameters"]["required"]

            converted_schema["tools"].append(converted_tool)

    return converted_schema
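
# A minimal sketch of the conversion above: an OpenAI-style function tool becomes a
# Bedrock toolSpec, carrying across properties, enum, default and required. The
# weather tool is an illustrative example, not part of this module.
def _example_format_tools() -> dict[str, Any]:
    oai_tools = [
        {
            "type": "function",
            "function": {
                "name": "get_weather",
                "description": "Look up the current weather for a city.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "city": {"type": "string", "description": "City name"},
                        "unit": {"type": "string", "enum": ["celsius", "fahrenheit"], "default": "celsius"},
                    },
                    "required": ["city"],
                },
            },
        }
    ]
    # Returns {"tools": [{"toolSpec": {"name": "get_weather", ...}}]}
    return format_tools(oai_tools)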


def format_tool_calls(content):
    """Converts Converse API response tool calls to AG2 format"""
    tool_calls = []
    for tool_request in content:
        if "toolUse" in tool_request:
            tool = tool_request["toolUse"]

            tool_calls.append(
                ChatCompletionMessageToolCall(
                    id=tool["toolUseId"],
                    function={
                        "name": tool["name"],
                        "arguments": json.dumps(tool["input"]),
                    },
                    type="function",
                )
            )
    return tool_calls


def convert_stop_reason_to_finish_reason(
    stop_reason: str,
) -> Optional[Literal["stop", "length", "tool_calls", "content_filter"]]:
    """Converts Bedrock finish reasons to our finish reasons, according to OpenAI:

    - stop: if the model hit a natural stop point or a provided stop sequence,
    - length: if the maximum number of tokens specified in the request was reached,
    - content_filter: if content was omitted due to a flag from our content filters,
    - tool_calls: if the model called a tool
    """
    if stop_reason:
        finish_reason_mapping = {
            "tool_use": "tool_calls",
            "finished": "stop",
            "end_turn": "stop",
            "max_tokens": "length",
            "stop_sequence": "stop",
            "complete": "stop",
            "content_filtered": "content_filter",
        }
        return finish_reason_mapping.get(stop_reason.lower(), stop_reason.lower())

    warnings.warn(f"Unsupported stop reason: {stop_reason}", UserWarning)
    return None


# NOTE: Model pricing changes frequently, so developers are expected to use the "price"
# parameter in their config for accurate costs. This table may be removed in future.
PRICES_PER_K_TOKENS = {
    "meta.llama3-8b-instruct-v1:0": (0.0003, 0.0006),
    "meta.llama3-70b-instruct-v1:0": (0.00265, 0.0035),
    "mistral.mistral-7b-instruct-v0:2": (0.00015, 0.0002),
    "mistral.mixtral-8x7b-instruct-v0:1": (0.00045, 0.0007),
    "mistral.mistral-large-2402-v1:0": (0.004, 0.012),
    "mistral.mistral-small-2402-v1:0": (0.001, 0.003),
}


def calculate_cost(input_tokens: int, output_tokens: int, model_id: str) -> float:
    """Calculate the cost of the completion using the Bedrock pricing."""
    if model_id in PRICES_PER_K_TOKENS:
        input_cost_per_k, output_cost_per_k = PRICES_PER_K_TOKENS[model_id]
        input_cost = (input_tokens / 1000) * input_cost_per_k
        output_cost = (output_tokens / 1000) * output_cost_per_k
        return input_cost + output_cost
    else:
        warnings.warn(
            f'Cannot get the costs for {model_id}. The cost will be 0. In your config_list, add field {{"price" : [prompt_price_per_1k, completion_token_price_per_1k]}} for customized pricing.',
            UserWarning,
        )
        return 0
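

# A worked example of the pricing arithmetic above, using the table entry for
# meta.llama3-8b-instruct-v1:0 at (0.0003, 0.0006) USD per 1K tokens:
#   (10000 / 1000) * 0.0003 + (1000 / 1000) * 0.0006 = 0.003 + 0.0006 = 0.0036
def _example_calculate_cost() -> float:
    return calculate_cost(10_000, 1_000, "meta.llama3-8b-instruct-v1:0")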