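"""Provider wrappers for LLM generation, text embeddings, and web search.

Each engine exposes a small interface (``generate``, ``get_embeddings``, or
``search``) and returns the produced content together with token usage
``[input, output, total]`` and an estimated cost derived from
``model_pricing.json``.
"""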
import os
import json
import logging
from typing import List, Dict, Any, Optional, Union, Tuple

import backoff
import boto3
import numpy as np
import requests
from anthropic import Anthropic
from openai import (
    AzureOpenAI,
    APIConnectionError,
    APIError,
    OpenAI,
    RateLimitError,
)
from google import genai
from google.genai import types
from zhipuai import ZhipuAI
from groq import Groq

logger = logging.getLogger()
doubao_logger = logging.getLogger("doubao_api")
class ModelPricing:
def __init__(self, pricing_file: str = "model_pricing.json"):
self.pricing_file = pricing_file
self.pricing_data = self._load_pricing()
def _load_pricing(self) -> Dict:
if os.path.exists(self.pricing_file):
try:
with open(self.pricing_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"Warning: Failed to load pricing file {self.pricing_file}: {e}")
return {
"default": {"input": 0, "output": 0}
}
def get_price(self, model: str) -> Dict[str, float]:
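        """Look up per-token pricing for ``model``.

        Handles the nested ``llm_models`` / ``embedding_models`` layout of the
        pricing file, first by exact model name and then by substring matching.
        Falls back to ``{"input": 0, "output": 0}`` when no entry matches.
        """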
# Handle nested pricing data structure
if "llm_models" in self.pricing_data:
# Iterate through all LLM model categories
for category, models in self.pricing_data["llm_models"].items():
# Direct model name matching
if model in models:
pricing = models[model]
return self._parse_pricing(pricing)
# Fuzzy matching for model names
for model_name in models:
if model_name in model or model in model_name:
pricing = models[model_name]
return self._parse_pricing(pricing)
# Handle embedding models
if "embedding_models" in self.pricing_data:
for category, models in self.pricing_data["embedding_models"].items():
if model in models:
pricing = models[model]
return self._parse_pricing(pricing)
for model_name in models:
if model_name in model or model in model_name:
pricing = models[model_name]
return self._parse_pricing(pricing)
# Default pricing
return {"input": 0, "output": 0}
def _parse_pricing(self, pricing: Dict[str, str]) -> Dict[str, float]:
"""Parse pricing strings and convert to numeric values"""
result = {}
for key, value in pricing.items():
if isinstance(value, str):
# Remove currency symbols and units, convert to float
clean_value = value.replace('$', '').replace('¥', '').replace(',', '')
try:
result[key] = float(clean_value)
except ValueError:
result[key] = 0.0
else:
result[key] = float(value) if value else 0.0
return result
def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
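        """Estimate the cost of a request; pricing values are treated as cost
        per one million tokens, matching the division by 1,000,000 below."""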
pricing = self.get_price(model)
input_cost = (input_tokens / 1000000) * pricing["input"]
output_cost = (output_tokens / 1000000) * pricing["output"]
return input_cost + output_cost
# Initialize pricing manager with correct pricing file path
pricing_file = os.path.join(os.path.dirname(__file__), 'model_pricing.json')
pricing_manager = ModelPricing(pricing_file)
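# Example (a sketch; "gpt-4o" is illustrative and must have an entry in
# model_pricing.json to yield a non-zero cost):
#   cost = pricing_manager.calculate_cost("gpt-4o", input_tokens=1200, output_tokens=300)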
def extract_token_usage(response, provider: str) -> Tuple[int, int]:
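    """Return ``(input_tokens, output_tokens)`` read from a provider response.

    ``provider`` strings look like ``"llm-openai"`` or ``"embedding-jina"``;
    a bare vendor name is treated as an LLM provider. Unknown providers or
    responses without usage data yield ``(0, 0)``.
    """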
if "-" in provider:
api_type, vendor = provider.split("-", 1)
else:
api_type, vendor = "llm", provider
if api_type == "llm":
if vendor in ["openai", "qwen", "deepseek", "doubao", "siliconflow", "monica", "vllm", "groq", "zhipu", "gemini", "openrouter", "azureopenai", "huggingface", "exa", "lybic"]:
if hasattr(response, 'usage') and response.usage:
return response.usage.prompt_tokens, response.usage.completion_tokens
elif vendor == "anthropic":
if hasattr(response, 'usage') and response.usage:
return response.usage.input_tokens, response.usage.output_tokens
elif vendor == "bedrock":
if isinstance(response, dict) and "usage" in response:
usage = response["usage"]
return usage.get("input_tokens", 0), usage.get("output_tokens", 0)
elif api_type == "embedding":
if vendor in ["openai", "azureopenai", "qwen", "doubao"]:
if hasattr(response, 'usage') and response.usage:
return response.usage.prompt_tokens, 0
elif vendor == "jina":
if isinstance(response, dict) and "usage" in response:
total_tokens = response["usage"].get("total_tokens", 0)
return total_tokens, 0
elif vendor == "gemini":
if hasattr(response, 'usage') and response.usage:
return response.usage.prompt_tokens, 0
return 0, 0
def calculate_tokens_and_cost(response, provider: str, model: str) -> Tuple[List[int], float]:
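    """Return ``([input_tokens, output_tokens, total_tokens], estimated_cost)``."""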
input_tokens, output_tokens = extract_token_usage(response, provider)
total_tokens = input_tokens + output_tokens
cost = pricing_manager.calculate_cost(model, input_tokens, output_tokens)
return [input_tokens, output_tokens, total_tokens], cost
class LMMEngine:
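    """Marker base class shared by the LLM and embedding engines below."""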
pass
# ==================== LLM ====================
class LMMEngineOpenAI(LMMEngine):
def __init__(
self, base_url=None, api_key=None, model=None, rate_limit=-1, **kwargs
):
assert model is not None, "model must be provided"
self.model = model
self.provider = "llm-openai"
api_key = api_key or os.getenv("OPENAI_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named OPENAI_API_KEY"
)
self.base_url = base_url
self.api_key = api_key
self.request_interval = 0 if rate_limit == -1 else 60.0 / rate_limit
if not self.base_url:
self.llm_client = OpenAI(api_key=self.api_key)
else:
self.llm_client = OpenAI(base_url=self.base_url, api_key=self.api_key)
@backoff.on_exception(
backoff.expo, (APIConnectionError, APIError, RateLimitError), max_time=60
)
def generate(self, messages, temperature=0.0, max_new_tokens=None, **kwargs):
"""Generate the next message based on previous messages"""
response = self.llm_client.chat.completions.create(
model=self.model,
messages=messages,
max_completion_tokens=max_new_tokens if max_new_tokens else 8192,
**({} if self.model in ["o3", "o3-pro"] else {"temperature": temperature}),
**kwargs,
)
content = response.choices[0].message.content
total_tokens, cost = calculate_tokens_and_cost(response, self.provider, self.model)
return content, total_tokens, cost
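# Example usage (a sketch; assumes OPENAI_API_KEY is set and the model name is
# available on your account):
#   engine = LMMEngineOpenAI(model="gpt-4o")
#   content, tokens, cost = engine.generate(
#       [{"role": "user", "content": "Hello"}], max_new_tokens=256
#   )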
class LMMEngineLybic(LMMEngine):
def __init__(
self, base_url=None, api_key=None, model=None, rate_limit=-1, **kwargs
):
assert model is not None, "model must be provided"
self.model = model
self.provider = "llm-lybic"
api_key = api_key or os.getenv("LYBIC_LLM_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named LYBIC_LLM_API_KEY"
)
self.base_url = base_url or "https://aigw.lybicai.com/v1"
self.api_key = api_key
self.request_interval = 0 if rate_limit == -1 else 60.0 / rate_limit
self.llm_client = OpenAI(base_url=self.base_url, api_key=self.api_key)
@backoff.on_exception(
backoff.expo, (APIConnectionError, APIError, RateLimitError), max_time=60
)
def generate(self, messages, temperature=1, max_new_tokens=None, **kwargs):
"""Generate the next message based on previous messages"""
response = self.llm_client.chat.completions.create(
model=self.model,
messages=messages,
max_completion_tokens=max_new_tokens if max_new_tokens else 8192,
# temperature=temperature,
**kwargs,
)
content = response.choices[0].message.content
total_tokens, cost = calculate_tokens_and_cost(response, self.provider, self.model)
return content, total_tokens, cost
class LMMEngineQwen(LMMEngine):
def __init__(
self, base_url=None, api_key=None, model=None, rate_limit=-1, enable_thinking=False, **kwargs
):
assert model is not None, "model must be provided"
self.model = model
self.enable_thinking = enable_thinking
self.provider = "llm-qwen"
api_key = api_key or os.getenv("DASHSCOPE_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named DASHSCOPE_API_KEY"
)
self.base_url = base_url or "https://dashscope.aliyuncs.com/compatible-mode/v1"
self.api_key = api_key
self.request_interval = 0 if rate_limit == -1 else 60.0 / rate_limit
self.llm_client = OpenAI(base_url=self.base_url, api_key=self.api_key)
@backoff.on_exception(
backoff.expo, (APIConnectionError, APIError, RateLimitError), max_time=60
)
def generate(self, messages, temperature=0.0, max_new_tokens=None, **kwargs):
"""Generate the next message based on previous messages"""
        # For Qwen3 models, disable thinking mode unless explicitly requested.
        # DashScope's OpenAI-compatible endpoint expects this flag in the request
        # body, so pass it via extra_body rather than as a keyword argument.
        extra_body = {}
        if self.model.startswith("qwen3") and not self.enable_thinking:
            extra_body["enable_thinking"] = False
        response = self.llm_client.chat.completions.create(
            model=self.model,
            messages=messages,
            max_completion_tokens=max_new_tokens if max_new_tokens else 8192,
            temperature=temperature,
            extra_body=extra_body or None,
            **kwargs,
        )
content = response.choices[0].message.content
total_tokens, cost = calculate_tokens_and_cost(response, self.provider, self.model)
return content, total_tokens, cost
class LMMEngineDoubao(LMMEngine):
def __init__(
self, base_url=None, api_key=None, model=None, rate_limit=-1, **kwargs
):
assert model is not None, "model must be provided"
self.model = model
self.provider = "llm-doubao"
api_key = api_key or os.getenv("ARK_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named ARK_API_KEY"
)
self.base_url = base_url or "https://ark.cn-beijing.volces.com/api/v3"
self.api_key = api_key
self.request_interval = 0 if rate_limit == -1 else 60.0 / rate_limit
self.llm_client = OpenAI(base_url=self.base_url, api_key=self.api_key)
@backoff.on_exception(
backoff.expo, (APIConnectionError, APIError, RateLimitError), max_time=60
)
def generate(self, messages, temperature=0.0, max_new_tokens=None, **kwargs):
"""Generate the next message based on previous messages"""
# doubao_logger.info(f"Doubao API Call - Model: {self.model}, Temperature: {temperature}, Max Tokens: {max_new_tokens}")
# doubao_logger.info(f"Doubao API Input - Messages count: {len(messages)}")
# doubao_logger.info(f"Doubao API Input - messages: {messages}")
response = self.llm_client.chat.completions.create(
model=self.model,
messages=messages,
max_completion_tokens=max_new_tokens if max_new_tokens else 8192,
temperature=temperature,
extra_body={
"thinking": {
"type": "disabled",
# "type": "enabled",
# "type": "auto",
}
},
**kwargs,
)
content = response.choices[0].message.content
total_tokens, cost = calculate_tokens_and_cost(response, self.provider, self.model)
# doubao_logger.info(f"Doubao API Response - Content length: {len(content) if content else 0}, Tokens: {total_tokens}, Cost: {cost}")
# doubao_logger.info(f"Doubao API Response - Content: {content}")
return content, total_tokens, cost
class LMMEngineAnthropic(LMMEngine):
def __init__(
self, base_url=None, api_key=None, model=None, thinking=False, **kwargs
):
assert model is not None, "model must be provided"
self.model = model
self.thinking = thinking
self.provider = "llm-anthropic"
api_key = api_key or os.getenv("ANTHROPIC_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named ANTHROPIC_API_KEY"
)
self.api_key = api_key
self.llm_client = Anthropic(api_key=self.api_key)
@backoff.on_exception(
backoff.expo, (APIConnectionError, APIError, RateLimitError), max_time=60
)
def generate(self, messages, temperature=0.0, max_new_tokens=None, **kwargs):
"""Generate the next message based on previous messages"""
if self.thinking:
response = self.llm_client.messages.create(
system=messages[0]["content"][0]["text"],
model=self.model,
messages=messages[1:],
max_tokens=8192,
thinking={"type": "enabled", "budget_tokens": 4096},
**kwargs,
)
thoughts = response.content[0].thinking
print("CLAUDE 3.7 THOUGHTS:", thoughts)
content = response.content[1].text
else:
response = self.llm_client.messages.create(
system=messages[0]["content"][0]["text"],
model=self.model,
messages=messages[1:],
max_tokens=max_new_tokens if max_new_tokens else 8192,
temperature=temperature,
**kwargs,
)
content = response.content[0].text
total_tokens, cost = calculate_tokens_and_cost(response, self.provider, self.model)
return content, total_tokens, cost
class LMMEngineGemini(LMMEngine):
def __init__(
self, base_url=None, api_key=None, model=None, rate_limit=-1, **kwargs
):
assert model is not None, "model must be provided"
self.model = model
self.provider = "llm-gemini"
api_key = api_key or os.getenv("GEMINI_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named GEMINI_API_KEY"
)
self.base_url = base_url or os.getenv("GEMINI_ENDPOINT_URL")
if self.base_url is None:
raise ValueError(
"An endpoint URL needs to be provided in either the endpoint_url parameter or as an environment variable named GEMINI_ENDPOINT_URL"
)
self.api_key = api_key
self.request_interval = 0 if rate_limit == -1 else 60.0 / rate_limit
self.llm_client = OpenAI(base_url=self.base_url, api_key=self.api_key)
@backoff.on_exception(
backoff.expo, (APIConnectionError, APIError, RateLimitError), max_time=60
)
def generate(self, messages, temperature=0.0, max_new_tokens=None, **kwargs):
"""Generate the next message based on previous messages"""
response = self.llm_client.chat.completions.create(
model=self.model,
messages=messages,
max_completion_tokens=max_new_tokens if max_new_tokens else 8192,
temperature=temperature,
# reasoning_effort="low",
extra_body={
'extra_body': {
"google": {
"thinking_config": {
"thinking_budget": 128,
"include_thoughts": True
}
}
}
},
**kwargs,
)
content = response.choices[0].message.content
total_tokens, cost = calculate_tokens_and_cost(response, self.provider, self.model)
return content, total_tokens, cost
class LMMEngineOpenRouter(LMMEngine):
def __init__(
self, base_url=None, api_key=None, model=None, rate_limit=-1, **kwargs
):
assert model is not None, "model must be provided"
self.model = model
self.provider = "llm-openrouter"
api_key = api_key or os.getenv("OPENROUTER_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named OPENROUTER_API_KEY"
)
self.base_url = base_url or os.getenv("OPEN_ROUTER_ENDPOINT_URL")
if self.base_url is None:
raise ValueError(
"An endpoint URL needs to be provided in either the endpoint_url parameter or as an environment variable named OPEN_ROUTER_ENDPOINT_URL"
)
self.api_key = api_key
self.request_interval = 0 if rate_limit == -1 else 60.0 / rate_limit
self.llm_client = OpenAI(base_url=self.base_url, api_key=self.api_key)
@backoff.on_exception(
backoff.expo, (APIConnectionError, APIError, RateLimitError), max_time=60
)
def generate(self, messages, temperature=0.0, max_new_tokens=None, **kwargs):
"""Generate the next message based on previous messages"""
response = self.llm_client.chat.completions.create(
model=self.model,
messages=messages,
max_completion_tokens=max_new_tokens if max_new_tokens else 8192,
temperature=temperature,
**kwargs,
)
content = response.choices[0].message.content
total_tokens, cost = calculate_tokens_and_cost(response, self.provider, self.model)
return content, total_tokens, cost
class LMMEngineAzureOpenAI(LMMEngine):
def __init__(
self,
base_url=None,
api_key=None,
azure_endpoint=None,
model=None,
api_version=None,
rate_limit=-1,
**kwargs
):
assert model is not None, "model must be provided"
self.model = model
self.provider = "llm-azureopenai"
assert api_version is not None, "api_version must be provided"
self.api_version = api_version
api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named AZURE_OPENAI_API_KEY"
)
self.api_key = api_key
azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT")
if azure_endpoint is None:
raise ValueError(
"An Azure API endpoint needs to be provided in either the azure_endpoint parameter or as an environment variable named AZURE_OPENAI_ENDPOINT"
)
self.azure_endpoint = azure_endpoint
self.request_interval = 0 if rate_limit == -1 else 60.0 / rate_limit
self.llm_client = AzureOpenAI(
azure_endpoint=self.azure_endpoint,
api_key=self.api_key,
api_version=self.api_version,
)
self.cost = 0.0
# @backoff.on_exception(backoff.expo, (APIConnectionError, APIError, RateLimitError), max_tries=10)
def generate(self, messages, temperature=0.0, max_new_tokens=None, **kwargs):
"""Generate the next message based on previous messages"""
response = self.llm_client.chat.completions.create(
model=self.model,
messages=messages,
max_completion_tokens=max_new_tokens if max_new_tokens else 8192,
temperature=temperature,
**kwargs,
)
content = response.choices[0].message.content
total_tokens, cost = calculate_tokens_and_cost(response, self.provider, self.model)
return content, total_tokens, cost
class LMMEnginevLLM(LMMEngine):
def __init__(
self, base_url=None, api_key=None, model=None, rate_limit=-1, **kwargs
):
assert model is not None, "model must be provided"
self.model = model
self.api_key = api_key
self.provider = "llm-vllm"
self.base_url = base_url or os.getenv("vLLM_ENDPOINT_URL")
if self.base_url is None:
raise ValueError(
"An endpoint URL needs to be provided in either the endpoint_url parameter or as an environment variable named vLLM_ENDPOINT_URL"
)
self.request_interval = 0 if rate_limit == -1 else 60.0 / rate_limit
self.llm_client = OpenAI(base_url=self.base_url, api_key=self.api_key)
# @backoff.on_exception(backoff.expo, (APIConnectionError, APIError, RateLimitError), max_tries=10)
# TODO: Default params chosen for the Qwen model
def generate(
self,
messages,
temperature=0.0,
top_p=0.8,
repetition_penalty=1.05,
max_new_tokens=512,
**kwargs
):
"""Generate the next message based on previous messages"""
response = self.llm_client.chat.completions.create(
model=self.model,
messages=messages,
max_completion_tokens=max_new_tokens if max_new_tokens else 8192,
temperature=temperature,
top_p=top_p,
extra_body={"repetition_penalty": repetition_penalty},
)
content = response.choices[0].message.content
total_tokens, cost = calculate_tokens_and_cost(response, self.provider, self.model)
return content, total_tokens, cost
class LMMEngineHuggingFace(LMMEngine):
def __init__(self, base_url=None, api_key=None, rate_limit=-1, **kwargs):
assert base_url is not None, "HuggingFace endpoint must be provided"
self.base_url = base_url
self.model = base_url.split('/')[-1] if base_url else "huggingface-tgi"
self.provider = "llm-huggingface"
api_key = api_key or os.getenv("HF_TOKEN")
if api_key is None:
raise ValueError(
"A HuggingFace token needs to be provided in either the api_key parameter or as an environment variable named HF_TOKEN"
)
self.api_key = api_key
self.request_interval = 0 if rate_limit == -1 else 60.0 / rate_limit
self.llm_client = OpenAI(base_url=self.base_url, api_key=self.api_key)
@backoff.on_exception(
backoff.expo, (APIConnectionError, APIError, RateLimitError), max_time=60
)
def generate(self, messages, temperature=0.0, max_new_tokens=None, **kwargs):
"""Generate the next message based on previous messages"""
response = self.llm_client.chat.completions.create(
model="tgi",
messages=messages,
max_completion_tokens=max_new_tokens if max_new_tokens else 8192,
temperature=temperature,
**kwargs,
)
content = response.choices[0].message.content
total_tokens, cost = calculate_tokens_and_cost(response, self.provider, self.model)
return content, total_tokens, cost
class LMMEngineDeepSeek(LMMEngine):
def __init__(
self, base_url=None, api_key=None, model=None, rate_limit=-1, **kwargs
):
assert model is not None, "model must be provided"
self.model = model
self.provider = "llm-deepseek"
api_key = api_key or os.getenv("DEEPSEEK_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named DEEPSEEK_API_KEY"
)
self.base_url = base_url or "https://api.deepseek.com"
self.api_key = api_key
self.request_interval = 0 if rate_limit == -1 else 60.0 / rate_limit
self.llm_client = OpenAI(base_url=self.base_url, api_key=self.api_key)
@backoff.on_exception(
backoff.expo, (APIConnectionError, APIError, RateLimitError), max_time=60
)
def generate(self, messages, temperature=0.0, max_new_tokens=None, **kwargs):
"""Generate the next message based on previous messages"""
response = self.llm_client.chat.completions.create(
model=self.model,
messages=messages,
max_completion_tokens=max_new_tokens if max_new_tokens else 8192,
temperature=temperature,
**kwargs,
)
content = response.choices[0].message.content
total_tokens, cost = calculate_tokens_and_cost(response, self.provider, self.model)
return content, total_tokens, cost
class LMMEngineZhipu(LMMEngine):
def __init__(
self, base_url=None, api_key=None, model=None, rate_limit=-1, **kwargs
):
assert model is not None, "model must be provided"
self.model = model
self.provider = "llm-zhipu"
api_key = api_key or os.getenv("ZHIPU_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named ZHIPU_API_KEY"
)
self.api_key = api_key
self.request_interval = 0 if rate_limit == -1 else 60.0 / rate_limit
# Use ZhipuAI client directly instead of OpenAI compatibility layer
self.llm_client = ZhipuAI(api_key=self.api_key)
@backoff.on_exception(
backoff.expo, (APIConnectionError, APIError, RateLimitError), max_time=60
)
def generate(self, messages, temperature=0.0, max_new_tokens=None, **kwargs):
"""Generate the next message based on previous messages"""
response = self.llm_client.chat.completions.create(
model=self.model,
messages=messages,
max_tokens=max_new_tokens if max_new_tokens else 8192,
temperature=temperature,
**kwargs,
)
content = response.choices[0].message.content # type: ignore
total_tokens, cost = calculate_tokens_and_cost(response, self.provider, self.model)
return content, total_tokens, cost
class LMMEngineGroq(LMMEngine):
def __init__(
self, base_url=None, api_key=None, model=None, rate_limit=-1, **kwargs
):
assert model is not None, "model must be provided"
self.model = model
self.provider = "llm-groq"
api_key = api_key or os.getenv("GROQ_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named GROQ_API_KEY"
)
self.api_key = api_key
self.request_interval = 0 if rate_limit == -1 else 60.0 / rate_limit
# Use Groq client directly
self.llm_client = Groq(api_key=self.api_key)
@backoff.on_exception(
backoff.expo, (APIConnectionError, APIError, RateLimitError), max_time=60
)
def generate(self, messages, temperature=0.0, max_new_tokens=None, **kwargs):
"""Generate the next message based on previous messages"""
response = self.llm_client.chat.completions.create(
model=self.model,
messages=messages,
max_completion_tokens=max_new_tokens if max_new_tokens else 8192,
temperature=temperature,
**kwargs,
)
content = response.choices[0].message.content
total_tokens, cost = calculate_tokens_and_cost(response, self.provider, self.model)
return content, total_tokens, cost
class LMMEngineSiliconflow(LMMEngine):
def __init__(
self, base_url=None, api_key=None, model=None, rate_limit=-1, **kwargs
):
assert model is not None, "model must be provided"
self.model = model
self.provider = "llm-siliconflow"
api_key = api_key or os.getenv("SILICONFLOW_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named SILICONFLOW_API_KEY"
)
self.base_url = base_url or "https://api.siliconflow.cn/v1"
self.api_key = api_key
self.request_interval = 0 if rate_limit == -1 else 60.0 / rate_limit
self.llm_client = OpenAI(base_url=self.base_url, api_key=self.api_key)
@backoff.on_exception(
backoff.expo, (APIConnectionError, APIError, RateLimitError), max_time=60
)
def generate(self, messages, temperature=0.0, max_new_tokens=None, **kwargs):
"""Generate the next message based on previous messages"""
response = self.llm_client.chat.completions.create(
model=self.model,
messages=messages,
max_completion_tokens=max_new_tokens if max_new_tokens else 8192,
temperature=temperature,
**kwargs,
)
content = response.choices[0].message.content
total_tokens, cost = calculate_tokens_and_cost(response, self.provider, self.model)
return content, total_tokens, cost
class LMMEngineMonica(LMMEngine):
def __init__(
self, base_url=None, api_key=None, model=None, rate_limit=-1, **kwargs
):
assert model is not None, "model must be provided"
self.model = model
self.provider = "llm-monica"
api_key = api_key or os.getenv("MONICA_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named MONICA_API_KEY"
)
self.base_url = base_url or "https://openapi.monica.im/v1"
self.api_key = api_key
self.request_interval = 0 if rate_limit == -1 else 60.0 / rate_limit
self.llm_client = OpenAI(base_url=self.base_url, api_key=self.api_key)
@backoff.on_exception(
backoff.expo, (APIConnectionError, APIError, RateLimitError), max_time=60
)
def generate(self, messages, temperature=0.0, max_new_tokens=None, **kwargs):
"""Generate the next message based on previous messages"""
response = self.llm_client.chat.completions.create(
model=self.model,
messages=messages,
max_completion_tokens=max_new_tokens if max_new_tokens else 8192,
temperature=temperature,
**kwargs,
)
content = response.choices[0].message.content
total_tokens, cost = calculate_tokens_and_cost(response, self.provider, self.model)
return content, total_tokens, cost
class LMMEngineAWSBedrock(LMMEngine):
def __init__(
self,
aws_access_key=None,
aws_secret_key=None,
aws_region=None,
model=None,
rate_limit=-1,
**kwargs
):
assert model is not None, "model must be provided"
self.model = model
self.provider = "llm-bedrock"
# Claude model mapping for AWS Bedrock
self.claude_model_map = {
"claude-opus-4": "anthropic.claude-opus-4-20250514-v1:0",
"claude-sonnet-4": "anthropic.claude-sonnet-4-20250514-v1:0",
"claude-3-7-sonnet": "anthropic.claude-3-7-sonnet-20250219-v1:0",
"claude-3-5-sonnet": "anthropic.claude-3-5-sonnet-20241022-v2:0",
"claude-3-5-sonnet-20241022": "anthropic.claude-3-5-sonnet-20241022-v2:0",
"claude-3-5-sonnet-20240620": "anthropic.claude-3-5-sonnet-20240620-v1:0",
"claude-3-5-haiku": "anthropic.claude-3-5-haiku-20241022-v1:0",
"claude-3-haiku": "anthropic.claude-3-haiku-20240307-v1:0",
"claude-3-sonnet": "anthropic.claude-3-sonnet-20240229-v1:0",
"claude-3-opus": "anthropic.claude-3-opus-20240229-v1:0",
}
# Get the actual Bedrock model ID
self.bedrock_model_id = self.claude_model_map.get(model, model)
# AWS credentials
aws_access_key = aws_access_key or os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_key = aws_secret_key or os.getenv("AWS_SECRET_ACCESS_KEY")
aws_region = aws_region or os.getenv("AWS_DEFAULT_REGION") or "us-west-2"
if aws_access_key is None:
raise ValueError(
"AWS Access Key needs to be provided in either the aws_access_key parameter or as an environment variable named AWS_ACCESS_KEY_ID"
)
if aws_secret_key is None:
raise ValueError(
"AWS Secret Key needs to be provided in either the aws_secret_key parameter or as an environment variable named AWS_SECRET_ACCESS_KEY"
)
self.aws_region = aws_region
self.request_interval = 0 if rate_limit == -1 else 60.0 / rate_limit
# Initialize Bedrock client
self.bedrock_client = boto3.client(
service_name="bedrock-runtime",
region_name=aws_region,
aws_access_key_id=aws_access_key,
aws_secret_access_key=aws_secret_key
)
@backoff.on_exception(
backoff.expo, (APIConnectionError, APIError, RateLimitError), max_time=60
)
def generate(self, messages, temperature=0.0, max_new_tokens=None, **kwargs):
"""Generate the next message based on previous messages"""
# Convert messages to Bedrock format
# Extract system message if present
system_message = None
user_messages = []
for message in messages:
if message["role"] == "system":
if isinstance(message["content"], list):
system_message = message["content"][0]["text"]
else:
system_message = message["content"]
else:
# Handle both list and string content formats
if isinstance(message["content"], list):
content = message["content"][0]["text"] if message["content"] else ""
else:
content = message["content"]
user_messages.append({
"role": message["role"],
"content": content
})
        # Prepare the request body for the Bedrock Anthropic Messages API
        body = {
            "max_tokens": max_new_tokens if max_new_tokens else 8192,
            "messages": user_messages,
            "anthropic_version": "bedrock-2023-05-31"
        }
if temperature > 0:
body["temperature"] = temperature
if system_message:
body["system"] = system_message
try:
response = self.bedrock_client.invoke_model(
body=json.dumps(body),
modelId=self.bedrock_model_id
)
response_body = json.loads(response.get("body").read())
if "content" in response_body and response_body["content"]:
content = response_body["content"][0]["text"]
else:
raise ValueError("No content in response")
total_tokens, cost = calculate_tokens_and_cost(response_body, self.provider, self.model)
return content, total_tokens, cost
except Exception as e:
print(f"AWS Bedrock error: {e}")
raise
# ==================== Embedding ====================
class OpenAIEmbeddingEngine(LMMEngine):
def __init__(
self,
embedding_model: str = "text-embedding-3-small",
api_key=None,
**kwargs
):
"""Init an OpenAI Embedding engine
Args:
embedding_model (str, optional): Model name. Defaults to "text-embedding-3-small".
api_key (_type_, optional): Auth key from OpenAI. Defaults to None.
"""
self.model = embedding_model
self.provider = "embedding-openai"
api_key = api_key or os.getenv("OPENAI_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named OPENAI_API_KEY"
)
self.api_key = api_key
@backoff.on_exception(
backoff.expo,
(
APIError,
RateLimitError,
APIConnectionError,
),
)
def get_embeddings(self, text: str) -> Tuple[np.ndarray, List[int], float]:
client = OpenAI(api_key=self.api_key)
response = client.embeddings.create(model=self.model, input=text)
embeddings = np.array([data.embedding for data in response.data])
total_tokens, cost = calculate_tokens_and_cost(response, self.provider, self.model)
return embeddings, total_tokens, cost
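# Example usage (a sketch; assumes OPENAI_API_KEY is set):
#   embedder = OpenAIEmbeddingEngine()
#   vectors, tokens, cost = embedder.get_embeddings("query text")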
class GeminiEmbeddingEngine(LMMEngine):
def __init__(
self,
embedding_model: str = "text-embedding-004",
api_key=None,
**kwargs
):
"""Init an Gemini Embedding engine
Args:
embedding_model (str, optional): Model name. Defaults to "text-embedding-004".
api_key (_type_, optional): Auth key from Gemini. Defaults to None.
"""
self.model = embedding_model
self.provider = "embedding-gemini"
api_key = api_key or os.getenv("GEMINI_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named GEMINI_API_KEY"
)
self.api_key = api_key
@backoff.on_exception(
backoff.expo,
(
APIError,
RateLimitError,
APIConnectionError,
),
)
def get_embeddings(self, text: str) -> Tuple[np.ndarray, List[int], float]:
client = genai.Client(api_key=self.api_key)
result = client.models.embed_content(
model=self.model,
contents=text,
config=types.EmbedContentConfig(task_type="SEMANTIC_SIMILARITY"),
)
embeddings = np.array([i.values for i in result.embeddings]) # type: ignore
total_tokens, cost = calculate_tokens_and_cost(result, self.provider, self.model)
return embeddings, total_tokens, cost
class AzureOpenAIEmbeddingEngine(LMMEngine):
def __init__(
self,
embedding_model: str = "text-embedding-3-small",
api_key=None,
api_version=None,
endpoint_url=None,
**kwargs
):
"""Init an Azure OpenAI Embedding engine
Args:
embedding_model (str, optional): Model name. Defaults to "text-embedding-3-small".
api_key (_type_, optional): Auth key from Azure OpenAI. Defaults to None.
api_version (_type_, optional): API version. Defaults to None.
endpoint_url (_type_, optional): Endpoint URL. Defaults to None.
"""
self.model = embedding_model
self.provider = "embedding-azureopenai"
api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named AZURE_OPENAI_API_KEY"
)
self.api_key = api_key
api_version = api_version or os.getenv("OPENAI_API_VERSION")
if api_version is None:
raise ValueError(
"An API Version needs to be provided in either the api_version parameter or as an environment variable named OPENAI_API_VERSION"
)
self.api_version = api_version
endpoint_url = endpoint_url or os.getenv("AZURE_OPENAI_ENDPOINT")
if endpoint_url is None:
raise ValueError(
"An Endpoint URL needs to be provided in either the endpoint_url parameter or as an environment variable named AZURE_OPENAI_ENDPOINT"
)
self.endpoint_url = endpoint_url
@backoff.on_exception(
backoff.expo,
(
APIError,
RateLimitError,
APIConnectionError,
),
)
def get_embeddings(self, text: str) -> Tuple[np.ndarray, List[int], float]:
client = AzureOpenAI(
api_key=self.api_key,
api_version=self.api_version,
azure_endpoint=self.endpoint_url,
)
response = client.embeddings.create(input=text, model=self.model)
embeddings = np.array([data.embedding for data in response.data])
total_tokens, cost = calculate_tokens_and_cost(response, self.provider, self.model)
return embeddings, total_tokens, cost
class DashScopeEmbeddingEngine(LMMEngine):
def __init__(
self,
embedding_model: str = "text-embedding-v4",
api_key=None,
dimensions: int = 1024,
**kwargs
):
"""Init a DashScope Embedding engine
Args:
embedding_model (str, optional): Model name. Defaults to "text-embedding-v4".
api_key (_type_, optional): Auth key from DashScope. Defaults to None.
dimensions (int, optional): Embedding dimensions. Defaults to 1024.
"""
self.model = embedding_model
self.dimensions = dimensions
self.provider = "embedding-qwen"
api_key = api_key or os.getenv("DASHSCOPE_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named DASHSCOPE_API_KEY"
)
self.api_key = api_key
# Initialize OpenAI client with DashScope base URL
self.client = OpenAI(
api_key=self.api_key,
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
@backoff.on_exception(
backoff.expo,
(
APIError,
RateLimitError,
APIConnectionError,
),
)
def get_embeddings(self, text: str) -> Tuple[np.ndarray, List[int], float]:
response = self.client.embeddings.create(
model=self.model,
input=text,
dimensions=self.dimensions,
encoding_format="float"
)
embeddings = np.array([data.embedding for data in response.data])
total_tokens, cost = calculate_tokens_and_cost(response, self.provider, self.model)
return embeddings, total_tokens, cost
class DoubaoEmbeddingEngine(LMMEngine):
def __init__(
self,
embedding_model: str = "doubao-embedding-256",
api_key=None,
**kwargs
):
"""Init a Doubao Embedding engine
Args:
embedding_model (str, optional): Model name. Defaults to "doubao-embedding-256".
api_key (_type_, optional): Auth key from Doubao. Defaults to None.
"""
self.model = embedding_model
self.provider = "embedding-doubao"
api_key = api_key or os.getenv("ARK_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named ARK_API_KEY"
)
self.api_key = api_key
self.base_url = "https://ark.cn-beijing.volces.com/api/v3"
# Use OpenAI-compatible client for text embeddings
self.client = OpenAI(
api_key=self.api_key,
base_url=self.base_url
)
@backoff.on_exception(
backoff.expo,
(
APIError,
RateLimitError,
APIConnectionError,
),
)
def get_embeddings(self, text: str) -> Tuple[np.ndarray, List[int], float]:
# Log embedding request
logger.info(f"Doubao Embedding API Call - Model: {self.model}, Text length: {len(text)}")
doubao_logger.info(f"Doubao Embedding API Call - Model: {self.model}, Text length: {len(text)}")
response = self.client.embeddings.create(
model=self.model,
input=text,
encoding_format="float"
)
embeddings = np.array([data.embedding for data in response.data])
total_tokens, cost = calculate_tokens_and_cost(response, self.provider, self.model)
# Log embedding response
logger.info(f"Doubao Embedding API Response - Embedding dimension: {embeddings.shape}, Tokens: {total_tokens}, Cost: {cost}")
doubao_logger.info(f"Doubao Embedding API Response - Embedding dimension: {embeddings.shape}, Tokens: {total_tokens}, Cost: {cost}")
return embeddings, total_tokens, cost
class JinaEmbeddingEngine(LMMEngine):
def __init__(
self,
embedding_model: str = "jina-embeddings-v4",
api_key=None,
task: str = "retrieval.query",
**kwargs
):
"""Init a Jina AI Embedding engine
Args:
embedding_model (str, optional): Model name. Defaults to "jina-embeddings-v4".
api_key (_type_, optional): Auth key from Jina AI. Defaults to None.
task (str, optional): Task type. Options: "retrieval.query", "retrieval.passage", "text-matching". Defaults to "retrieval.query".
"""
self.model = embedding_model
self.task = task
self.provider = "embedding-jina"
api_key = api_key or os.getenv("JINA_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named JINA_API_KEY"
)
self.api_key = api_key
self.base_url = "https://api.jina.ai/v1"
@backoff.on_exception(
backoff.expo,
(
APIError,
RateLimitError,
APIConnectionError,
),
)
def get_embeddings(self, text: str) -> Tuple[np.ndarray, List[int], float]:
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}"
}
data = {
"model": self.model,
"task": self.task,
"input": [
{
"text": text
}
]
}
response = requests.post(
f"{self.base_url}/embeddings",
headers=headers,
json=data
)
if response.status_code != 200:
raise Exception(f"Jina AI API error: {response.text}")
result = response.json()
embeddings = np.array([data["embedding"] for data in result["data"]])
total_tokens, cost = calculate_tokens_and_cost(result, self.provider, self.model)
return embeddings, total_tokens, cost
# ==================== webSearch ====================
class SearchEngine:
"""Base class for search engines"""
pass
class BochaAISearchEngine(SearchEngine):
def __init__(
self,
api_key: str|None = None,
base_url: str = "https://api.bochaai.com/v1",
rate_limit: int = -1,
**kwargs
):
"""Init a Bocha AI Search engine
Args:
api_key (str, optional): Auth key from Bocha AI. Defaults to None.
base_url (str, optional): Base URL for the API. Defaults to "https://api.bochaai.com/v1".
rate_limit (int, optional): Rate limit per minute. Defaults to -1 (no limit).
"""
api_key = api_key or os.getenv("BOCHA_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named BOCHA_API_KEY"
)
self.api_key = api_key
self.base_url = base_url
self.endpoint = f"{base_url}/ai-search"
self.request_interval = 0 if rate_limit == -1 else 60.0 / rate_limit
@backoff.on_exception(
backoff.expo,
(
APIConnectionError,
APIError,
RateLimitError,
requests.exceptions.RequestException,
),
max_time=60
)
def search(
self,
query: str,
freshness: str = "noLimit",
answer: bool = True,
stream: bool = False,
**kwargs
) -> Union[Dict[str, Any], Any]:
"""Search with AI and return intelligent answer
Args:
query (str): Search query
freshness (str, optional): Freshness filter. Defaults to "noLimit".
answer (bool, optional): Whether to return answer. Defaults to True.
stream (bool, optional): Whether to stream response. Defaults to False.
Returns:
Union[Dict[str, Any], Any]: AI search results with sources and answer
"""
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
payload = {
"query": query,
"freshness": freshness,
"answer": answer,
"stream": stream,
**kwargs
}
if stream:
result = self._stream_search(headers, payload)
return result, [0, 0, 0], 0.06
else:
result = self._regular_search(headers, payload)
return result, [0, 0, 0], 0.06
def _regular_search(self, headers: Dict[str, str], payload: Dict[str, Any]) -> Dict[str, Any]:
"""Regular non-streaming search"""
response = requests.post(
self.endpoint,
headers=headers,
json=payload
)
if response.status_code != 200:
raise APIError(f"Bocha AI Search API error: {response.text}") # type: ignore
return response.json()
def _stream_search(self, headers: Dict[str, str], payload: Dict[str, Any]):
"""Streaming search response"""
response = requests.post(
self.endpoint,
headers=headers,
json=payload,
stream=True
)
if response.status_code != 200:
raise APIError(f"Bocha AI Search API error: {response.text}") # type: ignore
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
if line.startswith('data:'):
data = line[5:].strip()
if data and data != '{"event":"done"}':
try:
yield json.loads(data)
except json.JSONDecodeError:
continue
    def get_answer(self, query: str, **kwargs) -> Tuple[str, List[int], float]:
"""Get AI generated answer only"""
result, _, remaining_balance = self.search(query, answer=True, **kwargs)
# Extract answer from messages
messages = result.get("messages", []) # type: ignore
answer = ""
for message in messages:
if message.get("type") == "answer":
answer = message.get("content", "")
break
return answer, [0,0,0], remaining_balance # type: ignore
    def get_sources(self, query: str, **kwargs) -> Tuple[List[Dict[str, Any]], int, float]:
"""Get source materials only"""
result, _, remaining_balance = self.search(query, **kwargs)
# Extract sources from messages
sources = []
messages = result.get("messages", []) # type: ignore
for message in messages:
if message.get("type") == "source":
content_type = message.get("content_type", "")
if content_type in ["webpage", "image", "video", "baike_pro", "medical_common"]:
sources.append({
"type": content_type,
"content": json.loads(message.get("content", "{}"))
})
return sources, 0, remaining_balance # type: ignore
    def get_follow_up_questions(self, query: str, **kwargs) -> Tuple[List[str], int, float]:
"""Get follow-up questions"""
result, _, remaining_balance = self.search(query, **kwargs)
# Extract follow-up questions from messages
follow_ups = []
messages = result.get("messages", []) # type: ignore
for message in messages:
if message.get("type") == "follow_up":
follow_ups.append(message.get("content", ""))
return follow_ups, 0, remaining_balance # type: ignore
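# Example usage (a sketch; assumes BOCHA_API_KEY is set; the third return value
# is the fixed per-call cost used above):
#   bocha = BochaAISearchEngine()
#   answer, tokens, cost = bocha.get_answer("latest Python release")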
class ExaResearchEngine(SearchEngine):
def __init__(
self,
api_key: str|None = None,
base_url: str = "https://api.exa.ai",
rate_limit: int = -1,
**kwargs
):
"""Init an Exa Research engine
Args:
api_key (str, optional): Auth key from Exa AI. Defaults to None.
base_url (str, optional): Base URL for the API. Defaults to "https://api.exa.ai".
rate_limit (int, optional): Rate limit per minute. Defaults to -1 (no limit).
"""
api_key = api_key or os.getenv("EXA_API_KEY")
if api_key is None:
raise ValueError(
"An API Key needs to be provided in either the api_key parameter or as an environment variable named EXA_API_KEY"
)
self.api_key = api_key
self.base_url = base_url
self.request_interval = 0 if rate_limit == -1 else 60.0 / rate_limit
# Initialize OpenAI-compatible client for chat completions
self.chat_client = OpenAI(
base_url=base_url,
api_key=api_key
)
# Initialize Exa client for research tasks
try:
from exa_py import Exa
self.exa_client = Exa(api_key=api_key)
except ImportError:
self.exa_client = None
print("Warning: exa_py not installed. Research tasks will not be available.")
@backoff.on_exception(
backoff.expo,
(
APIConnectionError,
APIError,
RateLimitError,
),
max_time=60
)
def search(self, query: str, **kwargs):
"""Standard Exa search with direct cost from API
Args:
query (str): Search query
**kwargs: Additional search parameters
Returns:
tuple: (result, tokens, cost) where cost is actual API cost
"""
headers = {
'x-api-key': self.api_key,
'Content-Type': 'application/json'
}
payload = {
"query": query,
**kwargs
}
response = requests.post(
f"{self.base_url}/search",
headers=headers,
json=payload
)
if response.status_code != 200:
raise APIError(f"Exa Search API error: {response.text}") # type: ignore
result = response.json()
cost = 0.0
if "costDollars" in result:
cost = result["costDollars"].get("total", 0.0)
return result, [0, 0, 0], cost
def chat_research(
self,
query: str,
model: str = "exa",
stream: bool = False,
**kwargs
) -> Union[str, Any]:
"""Research using chat completions interface
Args:
query (str): Research query
model (str, optional): Model name. Defaults to "exa".
stream (bool, optional): Whether to stream response. Defaults to False.
Returns:
Union[str, Any]: Research result or stream
"""
messages = [
{"role": "user", "content": query}
]
if stream:
completion = self.chat_client.chat.completions.create(
model=model,
messages=messages, # type: ignore
stream=True,
**kwargs
)
return completion
else:
completion = self.chat_client.chat.completions.create(
model=model,
messages=messages, # type: ignore
**kwargs
)
result = completion.choices[0].message.content # type: ignore
            return result, [0, 0, 0], 0.005
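# Example usage (a sketch; assumes EXA_API_KEY is set):
#   exa = ExaResearchEngine()
#   result, tokens, cost = exa.search("open-source GUI agents")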