# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors
#
# SPDX-License-Identifier: Apache-2.0
#
# Portions derived from https://github.com/microsoft/autogen are under the MIT License.
# SPDX-License-Identifier: MIT
"""Create a compatible client for the Amazon Bedrock Converse API.
Example usage:
Install the `boto3` package by running `pip install --upgrade boto3`.
- https://docs.aws.amazon.com/bedrock/latest/userguide/conversation-inference.html
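
Credentials can be supplied in the config (`aws_access_key`, `aws_secret_key`,
`aws_session_token`), via the `AWS_ACCESS_KEY`, `AWS_SECRET_KEY`, `AWS_SESSION_TOKEN`
and `AWS_REGION` environment variables, or picked up from an attached IAM role when
running on AWS infrastructure (e.g. Lambda, EC2, ECS).

Example usage: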
```python
import autogen
config_list = [
{
"api_type": "bedrock",
"model": "meta.llama3-1-8b-instruct-v1:0",
"aws_region": "us-west-2",
"aws_access_key": "",
"aws_secret_key": "",
"price": [0.003, 0.015],
}
]
assistant = autogen.AssistantAgent("assistant", llm_config={"config_list": config_list})
```
"""
from __future__ import annotations
import base64
import json
import os
import re
import time
import warnings
from typing import Any, Literal, Optional
import requests
from pydantic import Field, SecretStr, field_serializer
from ..import_utils import optional_import_block, require_optional_import
from ..llm_config import LLMConfigEntry, register_llm_config
from .client_utils import validate_parameter
from .oai_models import ChatCompletion, ChatCompletionMessage, ChatCompletionMessageToolCall, Choice, CompletionUsage
with optional_import_block():
import boto3
from botocore.config import Config
@register_llm_config
class BedrockLLMConfigEntry(LLMConfigEntry):
api_type: Literal["bedrock"] = "bedrock"
aws_region: str
aws_access_key: Optional[SecretStr] = None
aws_secret_key: Optional[SecretStr] = None
aws_session_token: Optional[SecretStr] = None
aws_profile_name: Optional[str] = None
temperature: Optional[float] = None
topP: Optional[float] = None # noqa: N815
maxTokens: Optional[int] = None # noqa: N815
top_p: Optional[float] = None
top_k: Optional[int] = None
k: Optional[int] = None
seed: Optional[int] = None
cache_seed: Optional[int] = None
supports_system_prompts: bool = True
stream: bool = False
price: Optional[list[float]] = Field(default=None, min_length=2, max_length=2)
timeout: Optional[int] = None
@field_serializer("aws_access_key", "aws_secret_key", "aws_session_token", when_used="unless-none")
def serialize_aws_secrets(self, v: SecretStr) -> str:
return v.get_secret_value()
def create_client(self):
raise NotImplementedError("BedrockLLMConfigEntry.create_client must be implemented.")
@require_optional_import("boto3", "bedrock")
class BedrockClient:
"""Client for Amazon's Bedrock Converse API."""
_retries = 5
def __init__(self, **kwargs: Any):
"""Initialises BedrockClient for Amazon's Bedrock Converse API"""
self._aws_access_key = kwargs.get("aws_access_key")
self._aws_secret_key = kwargs.get("aws_secret_key")
self._aws_session_token = kwargs.get("aws_session_token")
self._aws_region = kwargs.get("aws_region")
self._aws_profile_name = kwargs.get("aws_profile_name")
self._timeout = kwargs.get("timeout")
if not self._aws_access_key:
self._aws_access_key = os.getenv("AWS_ACCESS_KEY")
if not self._aws_secret_key:
self._aws_secret_key = os.getenv("AWS_SECRET_KEY")
if not self._aws_session_token:
self._aws_session_token = os.getenv("AWS_SESSION_TOKEN")
if not self._aws_region:
self._aws_region = os.getenv("AWS_REGION")
if self._aws_region is None:
raise ValueError("Region is required to use the Amazon Bedrock API.")
if self._timeout is None:
self._timeout = 60
        # Initialize the Bedrock configuration and runtime client
bedrock_config = Config(
region_name=self._aws_region,
signature_version="v4",
retries={"max_attempts": self._retries, "mode": "standard"},
read_timeout=self._timeout,
)
if "response_format" in kwargs and kwargs["response_format"] is not None:
warnings.warn("response_format is not supported for Bedrock, it will be ignored.", UserWarning)
        # If no access key or secret key was provided via arguments or environment
        # variables, fall back to credentials available in the runtime environment.
        if not self._aws_access_key or not self._aws_secret_key:
            # Attempt to create the client from the attached role of a managed
            # service (Lambda, EC2, ECS, etc.)
self.bedrock_runtime = boto3.client(service_name="bedrock-runtime", config=bedrock_config)
else:
session = boto3.Session(
aws_access_key_id=self._aws_access_key,
aws_secret_access_key=self._aws_secret_key,
aws_session_token=self._aws_session_token,
profile_name=self._aws_profile_name,
)
self.bedrock_runtime = session.client(service_name="bedrock-runtime", config=bedrock_config)
def message_retrieval(self, response):
"""Retrieve the messages from the response."""
return [choice.message for choice in response.choices]
def parse_custom_params(self, params: dict[str, Any]):
"""Parses custom parameters for logic in this client class"""
        # Should we separate system messages into their own request parameter? Default is True.
        # This is required because not all models support a system prompt (e.g. Mistral Instruct).
self._supports_system_prompts = params.get("supports_system_prompts", True)
def parse_params(self, params: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
"""Loads the valid parameters required to invoke Bedrock Converse
Returns a tuple of (base_params, additional_params)
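
        Example (illustrative, where `client` is a BedrockClient instance):
            base, additional = client.parse_params({
                "model": "meta.llama3-1-8b-instruct-v1:0",
                "temperature": 0.5,  # base parameter -> "inferenceConfig"
                "top_k": 250,        # model-specific -> "additionalModelRequestFields"
            })
            # base == {"temperature": 0.5}, additional == {"top_k": 250}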
"""
base_params = {}
additional_params = {}
# Amazon Bedrock base model IDs are here:
# https://docs.aws.amazon.com/bedrock/latest/userguide/model-ids.html
self._model_id = params.get("model")
        assert self._model_id, "Please provide the 'model' in the config_list to use Amazon Bedrock"
        # Parameters vary based on the model used.
        # As we won't cater for all models and parameters, it's the developer's
        # responsibility to set suitable parameters; they will only be included
        # in the request if present in the config.
#
# Important:
# No defaults will be used (as they can vary per model)
# No ranges will be used (as they can vary)
# We will cover all the main parameters but there may be others
# that need to be added later
#
# Here are some pages that show the parameters available for different models
# https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-titan-text.html
# https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-anthropic-claude-text-completion.html
# https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-cohere-command-r-plus.html
# https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-meta.html
# https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-mistral-chat-completion.html
# Here are the possible "base" parameters and their suitable types
        base_parameters = [["temperature", (float, int)], ["topP", (float, int)], ["maxTokens", int]]
for param_name, suitable_types in base_parameters:
if param_name in params:
base_params[param_name] = validate_parameter(
params, param_name, suitable_types, False, None, None, None
)
# Here are the possible "model-specific" parameters and their suitable types, known as additional parameters
        additional_parameters = [
            ["top_p", (float, int)],
            ["top_k", int],
            ["k", int],
            ["seed", int],
        ]
for param_name, suitable_types in additional_parameters:
if param_name in params:
additional_params[param_name] = validate_parameter(
params, param_name, suitable_types, False, None, None, None
)
# Streaming
self._streaming = params.get("stream", False)
# For this release we will not support streaming as many models do not support streaming with tool use
if self._streaming:
warnings.warn(
"Streaming is not currently supported, streaming will be disabled.",
UserWarning,
)
self._streaming = False
return base_params, additional_params
    def create(self, params: dict[str, Any]) -> ChatCompletion:
"""Run Amazon Bedrock inference and return AG2 response"""
# Set custom client class settings
self.parse_custom_params(params)
# Parse the inference parameters
base_params, additional_params = self.parse_params(params)
has_tools = "tools" in params
messages = oai_messages_to_bedrock_messages(params["messages"], has_tools, self._supports_system_prompts)
if self._supports_system_prompts:
system_messages = extract_system_messages(params["messages"])
tool_config = format_tools(params["tools"] if has_tools else [])
request_args = {"messages": messages, "modelId": self._model_id}
# Base and additional args
if len(base_params) > 0:
request_args["inferenceConfig"] = base_params
if len(additional_params) > 0:
request_args["additionalModelRequestFields"] = additional_params
if self._supports_system_prompts:
request_args["system"] = system_messages
if len(tool_config["tools"]) > 0:
request_args["toolConfig"] = tool_config
response = self.bedrock_runtime.converse(**request_args)
if response is None:
raise RuntimeError(f"Failed to get response from Bedrock after retrying {self._retries} times.")
finish_reason = convert_stop_reason_to_finish_reason(response["stopReason"])
response_message = response["output"]["message"]
tool_calls = format_tool_calls(response_message["content"]) if finish_reason == "tool_calls" else None
text = ""
for content in response_message["content"]:
if "text" in content:
text = content["text"]
# NOTE: other types of output may be dealt with here
message = ChatCompletionMessage(role="assistant", content=text, tool_calls=tool_calls)
response_usage = response["usage"]
usage = CompletionUsage(
prompt_tokens=response_usage["inputTokens"],
completion_tokens=response_usage["outputTokens"],
total_tokens=response_usage["totalTokens"],
)
return ChatCompletion(
id=response["ResponseMetadata"]["RequestId"],
choices=[Choice(finish_reason=finish_reason, index=0, message=message)],
created=int(time.time()),
model=self._model_id,
object="chat.completion",
usage=usage,
)
def cost(self, response: ChatCompletion) -> float:
"""Calculate the cost of the response."""
return calculate_cost(response.usage.prompt_tokens, response.usage.completion_tokens, response.model)
@staticmethod
def get_usage(response) -> dict:
"""Get the usage of tokens and their cost information."""
return {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens,
"cost": response.cost,
"model": response.model,
}
def extract_system_messages(messages: list[dict[str, Any]]) -> list:
"""Extract the system messages from the list of messages.
Args:
messages (list[dict[str, Any]]): List of messages.
    Returns:
        list: System message content blocks in Bedrock format, e.g. [{"text": "..."}].
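
    Example (illustrative):
        [{"role": "system", "content": "Be brief."}] -> [{"text": "Be brief."}]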
"""
"""
system_messages = [message.get("content")[0]["text"] for message in messages if message.get("role") == "system"]
return system_messages # ''.join(system_messages)
"""
for message in messages:
if message.get("role") == "system":
if isinstance(message["content"], str):
return [{"text": message.get("content")}]
else:
return [{"text": message.get("content")[0]["text"]}]
return []
def oai_messages_to_bedrock_messages(
messages: list[dict[str, Any]], has_tools: bool, supports_system_prompts: bool
) -> list[dict[str, Any]]:
"""Convert messages from OAI format to Bedrock format.
We correct for any specific role orders and types, etc.
AWS Bedrock requires messages to alternate between user and assistant roles. This function ensures that the messages
are in the correct order and format for Bedrock by inserting "Please continue" messages as needed.
This is the same method as the one in the Autogen Anthropic client
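
    Example (illustrative):
        Two consecutive "user" messages have an assistant "Please continue." message
        interleaved to maintain alternation:

        [{"role": "user", "content": "Hi"}, {"role": "user", "content": "Hello?"}]
        becomes
        [{"role": "user", "content": [{"text": "Hi"}]},
         {"role": "assistant", "content": [{"text": "Please continue."}]},
         {"role": "user", "content": [{"text": "Hello?"}]}]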
"""
# Track whether we have tools passed in. If not, tool use / result messages should be converted to text messages.
# Bedrock requires a tools parameter with the tools listed, if there are other messages with tool use or tool results.
# This can occur when we don't need tool calling, such as for group chat speaker selection
# Convert messages to Bedrock compliant format
# Take out system messages if the model supports it, otherwise leave them in.
if supports_system_prompts:
messages = [x for x in messages if x["role"] != "system"]
else:
# Replace role="system" with role="user"
for msg in messages:
if msg["role"] == "system":
msg["role"] = "user"
processed_messages = []
# Used to interweave user messages to ensure user/assistant alternating
user_continue_message = {"content": [{"text": "Please continue."}], "role": "user"}
assistant_continue_message = {
"content": [{"text": "Please continue."}],
"role": "assistant",
}
tool_use_messages = 0
tool_result_messages = 0
last_tool_use_index = -1
last_tool_result_index = -1
for message in messages:
# New messages will be added here, manage role alternations
expected_role = "user" if len(processed_messages) % 2 == 0 else "assistant"
if "tool_calls" in message:
# Map the tool call options to Bedrock's format
tool_uses = []
tool_names = []
for tool_call in message["tool_calls"]:
tool_uses.append({
"toolUse": {
"toolUseId": tool_call["id"],
"name": tool_call["function"]["name"],
"input": json.loads(tool_call["function"]["arguments"]),
}
})
if has_tools:
tool_use_messages += 1
tool_names.append(tool_call["function"]["name"])
if expected_role == "user":
# Insert an extra user message as we will append an assistant message
processed_messages.append(user_continue_message)
if has_tools:
processed_messages.append({"role": "assistant", "content": tool_uses})
last_tool_use_index = len(processed_messages) - 1
else:
# Not using tools, so put in a plain text message
processed_messages.append({
"role": "assistant",
"content": [{"text": f"Some internal function(s) that could be used: [{', '.join(tool_names)}]"}],
})
elif "tool_call_id" in message:
if has_tools:
# Map the tool usage call to tool_result for Bedrock
tool_result = {
"toolResult": {
"toolUseId": message["tool_call_id"],
"content": [{"text": message["content"]}],
}
}
# If the previous message also had a tool_result, add it to that
# Otherwise append a new message
if last_tool_result_index == len(processed_messages) - 1:
processed_messages[-1]["content"].append(tool_result)
else:
if expected_role == "assistant":
# Insert an extra assistant message as we will append a user message
processed_messages.append(assistant_continue_message)
processed_messages.append({"role": "user", "content": [tool_result]})
last_tool_result_index = len(processed_messages) - 1
tool_result_messages += 1
else:
# Not using tools, so put in a plain text message
processed_messages.append({
"role": "user",
"content": [{"text": f"Running the function returned: {message['content']}"}],
})
elif message["content"] == "":
# Ignoring empty messages
pass
else:
if expected_role != message["role"] and not (len(processed_messages) == 0 and message["role"] == "system"):
# Inserting the alternating continue message (ignore if it's the first message and a system message)
processed_messages.append(
user_continue_message if expected_role == "user" else assistant_continue_message
)
processed_messages.append({
"role": message["role"],
"content": parse_content_parts(message=message),
})
# We'll replace the last tool_use if there's no tool_result (occurs if we finish the conversation before running the function)
if has_tools and tool_use_messages != tool_result_messages:
processed_messages[last_tool_use_index] = assistant_continue_message
    # "name" is not a valid field on Bedrock messages, so remove it if present
    for message in processed_messages:
        message.pop("name", None)
    # Note: When using reflection_with_llm we may end up with an "assistant" message as the
    # last message, which may cause a blank response. So, if the last role is not "user",
    # add a "user" continue message at the end.
if processed_messages[-1]["role"] != "user":
processed_messages.append(user_continue_message)
return processed_messages
def parse_content_parts(
message: dict[str, Any],
) -> list[dict[str, Any]]:
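    """Convert an OAI message's content into Bedrock Converse content blocks.

    A plain string becomes a single {"text": ...} block. For multimodal content
    lists, "text" parts map to {"text": ...} blocks and "image_url" parts map to
    {"image": ...} blocks; unsupported part types are ignored.
    """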
content: str | list[dict[str, Any]] = message.get("content")
if isinstance(content, str):
return [
{
"text": content,
}
]
content_parts = []
for part in content:
        if "text" in part:
content_parts.append({
"text": part.get("text"),
})
elif "image_url" in part: # part_content:
image_data, content_type = parse_image(part.get("image_url").get("url"))
content_parts.append({
"image": {
"format": content_type[6:], # image/
"source": {"bytes": image_data},
},
})
        else:
            # Ignore unsupported content part types
            continue
return content_parts
def parse_image(image_url: str) -> tuple[bytes, str]:
"""Try to get the raw data from an image url.
Ref: https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ImageSource.html
    Returns a tuple of (image data, content type).
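
    Example (illustrative):
        parse_image("data:image/png;base64,iVBORw0KG...") -> (b"\x89PNG...", "image/png")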
"""
pattern = r"^data:(image/[a-z]*);base64,\s*"
content_type = re.search(pattern, image_url)
# if already base64 encoded.
# Only supports 'image/jpeg', 'image/png', 'image/gif' or 'image/webp'
if content_type:
image_data = re.sub(pattern, "", image_url)
return base64.b64decode(image_data), content_type.group(1)
# Send a request to the image URL
response = requests.get(image_url)
# Check if the request was successful
if response.status_code == 200:
content_type = response.headers.get("Content-Type")
if not content_type.startswith("image"):
content_type = "image/jpeg"
# Get the image content
image_content = response.content
return image_content, content_type
else:
raise RuntimeError("Unable to access the image url")
def format_tools(tools: list[dict[str, Any]]) -> dict[Literal["tools"], list[dict[str, Any]]]:
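    """Convert OAI function tool schemas into Bedrock Converse toolConfig format.

    Example (illustrative):
        {"type": "function",
         "function": {"name": "get_weather", "description": "...",
                      "parameters": {"type": "object", "properties": {...}, "required": [...]}}}
        becomes
        {"toolSpec": {"name": "get_weather", "description": "...",
                      "inputSchema": {"json": {"type": "object", "properties": {...}, "required": [...]}}}}
    """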
converted_schema = {"tools": []}
for tool in tools:
if tool["type"] == "function":
function = tool["function"]
converted_tool = {
"toolSpec": {
"name": function["name"],
"description": function["description"],
"inputSchema": {"json": {"type": "object", "properties": {}, "required": []}},
}
}
for prop_name, prop_details in function["parameters"]["properties"].items():
converted_tool["toolSpec"]["inputSchema"]["json"]["properties"][prop_name] = {
"type": prop_details["type"],
"description": prop_details.get("description", ""),
}
if "enum" in prop_details:
converted_tool["toolSpec"]["inputSchema"]["json"]["properties"][prop_name]["enum"] = prop_details[
"enum"
]
if "default" in prop_details:
converted_tool["toolSpec"]["inputSchema"]["json"]["properties"][prop_name]["default"] = (
prop_details["default"]
)
if "required" in function["parameters"]:
converted_tool["toolSpec"]["inputSchema"]["json"]["required"] = function["parameters"]["required"]
converted_schema["tools"].append(converted_tool)
return converted_schema
def format_tool_calls(content):
"""Converts Converse API response tool calls to AG2 format"""
tool_calls = []
for tool_request in content:
if "toolUse" in tool_request:
tool = tool_request["toolUse"]
tool_calls.append(
ChatCompletionMessageToolCall(
id=tool["toolUseId"],
function={
"name": tool["name"],
"arguments": json.dumps(tool["input"]),
},
type="function",
)
)
return tool_calls
def convert_stop_reason_to_finish_reason(
stop_reason: str,
) -> Literal["stop", "length", "tool_calls", "content_filter"]:
"""Converts Bedrock finish reasons to our finish reasons, according to OpenAI:
- stop: if the model hit a natural stop point or a provided stop sequence,
- length: if the maximum number of tokens specified in the request was reached,
- content_filter: if content was omitted due to a flag from our content filters,
- tool_calls: if the model called a tool
"""
    if not stop_reason:
        return None

    finish_reason_mapping = {
        "tool_use": "tool_calls",
        "finished": "stop",
        "end_turn": "stop",
        "max_tokens": "length",
        "stop_sequence": "stop",
        "complete": "stop",
        "content_filtered": "content_filter",
    }

    finish_reason = finish_reason_mapping.get(stop_reason.lower())
    if finish_reason is None:
        warnings.warn(f"Unsupported stop reason: {stop_reason}", UserWarning)
        return stop_reason.lower()
    return finish_reason
# NOTE: As pricing is quite dynamic, it's expected that the developer will use the "price"
# parameter in their config. These entries may be removed in a future release.
PRICES_PER_K_TOKENS = {
"meta.llama3-8b-instruct-v1:0": (0.0003, 0.0006),
"meta.llama3-70b-instruct-v1:0": (0.00265, 0.0035),
"mistral.mistral-7b-instruct-v0:2": (0.00015, 0.0002),
"mistral.mixtral-8x7b-instruct-v0:1": (0.00045, 0.0007),
"mistral.mistral-large-2402-v1:0": (0.004, 0.012),
"mistral.mistral-small-2402-v1:0": (0.001, 0.003),
}
def calculate_cost(input_tokens: int, output_tokens: int, model_id: str) -> float:
"""Calculate the cost of the completion using the Bedrock pricing."""
if model_id in PRICES_PER_K_TOKENS:
input_cost_per_k, output_cost_per_k = PRICES_PER_K_TOKENS[model_id]
input_cost = (input_tokens / 1000) * input_cost_per_k
output_cost = (output_tokens / 1000) * output_cost_per_k
return input_cost + output_cost
else:
warnings.warn(
f'Cannot get the costs for {model_id}. The cost will be 0. In your config_list, add field {{"price" : [prompt_price_per_1k, completion_token_price_per_1k]}} for customized pricing.',
UserWarning,
)
return 0