import base64

import numpy as np

from .engine import (
    LMMEngineAnthropic,
    LMMEngineAzureOpenAI,
    LMMEngineHuggingFace,
    LMMEngineOpenAI,
    LMMEngineLybic,
    LMMEngineOpenRouter,
    LMMEnginevLLM,
    LMMEngineGemini,
    LMMEngineQwen,
    LMMEngineDoubao,
    LMMEngineDeepSeek,
    LMMEngineZhipu,
    LMMEngineGroq,
    LMMEngineSiliconflow,
    LMMEngineMonica,
    LMMEngineAWSBedrock,
    OpenAIEmbeddingEngine,
    GeminiEmbeddingEngine,
    AzureOpenAIEmbeddingEngine,
    DashScopeEmbeddingEngine,
    DoubaoEmbeddingEngine,
    JinaEmbeddingEngine,
    BochaAISearchEngine,
    ExaResearchEngine,
)

class CostManager:
    """Cost manager, responsible for adding currency symbols based on engine type"""
    
    # Chinese engines use CNY
    CNY_ENGINES = {
        LMMEngineQwen, LMMEngineDoubao, LMMEngineDeepSeek, LMMEngineZhipu, 
        LMMEngineSiliconflow, DashScopeEmbeddingEngine, DoubaoEmbeddingEngine
    }
    # Other engines use USD
    USD_ENGINES = {
        LMMEngineOpenAI, LMMEngineLybic, LMMEngineAnthropic, LMMEngineAzureOpenAI, LMMEngineGemini,
        LMMEngineOpenRouter, LMMEnginevLLM, LMMEngineHuggingFace, LMMEngineGroq,
        LMMEngineMonica, LMMEngineAWSBedrock, OpenAIEmbeddingEngine, 
        GeminiEmbeddingEngine, AzureOpenAIEmbeddingEngine, JinaEmbeddingEngine
    }
    
    @classmethod
    def get_currency_symbol(cls, engine) -> str:
        """Return the currency symbol for the engine: "¥" for CNY engines, "$" otherwise."""
        engine_type = type(engine)

        if engine_type in cls.CNY_ENGINES:
            return "¥"
        elif engine_type in cls.USD_ENGINES:
            return "$"
        else:
            # Default to dollars for engines not listed in either set
            return "$"
    
    @classmethod
    def format_cost(cls, cost: float, engine) -> str:
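        """Format a numeric cost with the engine's currency symbol appended,
        e.g. 0.0012345 with a USD engine -> "0.0012345$"."""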
        currency = cls.get_currency_symbol(engine)
        return f"{cost:.7f}{currency}"
    
    @classmethod
    def add_costs(cls, cost1, cost2) -> str:
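        """Add two cost values, which may be numbers or strings carrying a currency symbol.

        The values are parsed, summed, and re-formatted with the first operand's
        currency symbol appended, e.g. add_costs("0.000123$", "0.000456$") -> "0.000579$".
        """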
        currency_symbols = ["$", "¥", "￥", "€", "£"]
        currency1 = currency2 = "$"
        value1 = value2 = 0.0
        
        if isinstance(cost1, (int, float)):
            value1 = float(cost1)
            currency1 = "$"
        else:
            cost1_str = str(cost1)
            for symbol in currency_symbols:
                if symbol in cost1_str:
                    value1 = float(cost1_str.replace(symbol, "").strip())
                    currency1 = symbol
                    break
            else:
                try:
                    value1 = float(cost1_str)
                    currency1 = "$"
                except (TypeError, ValueError):
                    value1 = 0.0
        
        if isinstance(cost2, (int, float)):
            value2 = float(cost2)
            currency2 = "$"
        else:
            cost2_str = str(cost2)
            for symbol in currency_symbols:
                if symbol in cost2_str:
                    value2 = float(cost2_str.replace(symbol, "").strip())
                    currency2 = symbol
                    break
            else:
                try:
                    value2 = float(cost2_str)
                    currency2 = "$"
                except (TypeError, ValueError):
                    value2 = 0.0
        
        if currency1 != currency2:
            print(f"Warning: Different currencies in cost accumulation: {currency1} and {currency2}")
        # Keep the first operand's currency for the combined total
        currency = currency1
        
        total_value = value1 + value2
        return f"{total_value:.6f}{currency}"

class LLMAgent:
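    """Multimodal chat agent that wraps a supported LMM engine and keeps the conversation history."""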
    def __init__(self, engine_params=None, system_prompt=None, engine=None):
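        """Build the engine from engine_params["engine_type"], or use a pre-built engine instance if given."""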
        if engine is None:
            if engine_params is not None:
                engine_type = engine_params.get("engine_type")
                if engine_type == "openai":
                    self.engine = LMMEngineOpenAI(**engine_params)
                elif engine_type == "lybic":
                    self.engine = LMMEngineLybic(**engine_params)
                elif engine_type == "anthropic":
                    self.engine = LMMEngineAnthropic(**engine_params)
                elif engine_type == "azure":
                    self.engine = LMMEngineAzureOpenAI(**engine_params)
                elif engine_type == "vllm":
                    self.engine = LMMEnginevLLM(**engine_params)
                elif engine_type == "huggingface":
                    self.engine = LMMEngineHuggingFace(**engine_params)
                elif engine_type == "gemini":
                    self.engine = LMMEngineGemini(**engine_params)
                elif engine_type == "openrouter":
                    self.engine = LMMEngineOpenRouter(**engine_params)
                elif engine_type == "dashscope":
                    self.engine = LMMEngineQwen(**engine_params)
                elif engine_type == "doubao":
                    self.engine = LMMEngineDoubao(**engine_params)
                elif engine_type == "deepseek":
                    self.engine = LMMEngineDeepSeek(**engine_params)
                elif engine_type == "zhipu":
                    self.engine = LMMEngineZhipu(**engine_params)
                elif engine_type == "groq":
                    self.engine = LMMEngineGroq(**engine_params)
                elif engine_type == "siliconflow":
                    self.engine = LMMEngineSiliconflow(**engine_params)
                elif engine_type == "monica":
                    self.engine = LMMEngineMonica(**engine_params)
                elif engine_type == "aws_bedrock":
                    self.engine = LMMEngineAWSBedrock(**engine_params)
                else:
                    raise ValueError("engine_type is not supported")
            else:
                raise ValueError("engine_params must be provided")
        else:
            self.engine = engine

        self.messages = []  # Empty messages

        if system_prompt:
            self.add_system_prompt(system_prompt)
        else:
            self.add_system_prompt("You are a helpful assistant.")

    def encode_image(self, image_content):
        """Base64-encode an image given either a file path (str) or a bytes-like object."""
        if isinstance(image_content, str):
            with open(image_content, "rb") as image_file:
                return base64.b64encode(image_file.read()).decode("utf-8")
        else:
            return base64.b64encode(image_content).decode("utf-8")

    def reset(self):
        """Reset the conversation history to just the system prompt."""
        self.messages = [
            {
                "role": "system",
                "content": [{"type": "text", "text": self.system_prompt}],
            }
        ]

    def add_system_prompt(self, system_prompt):
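        """Set the system prompt, replacing the existing system message if one is already present."""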
        self.system_prompt = system_prompt
        if len(self.messages) > 0:
            self.messages[0] = {
                "role": "system",
                "content": [{"type": "text", "text": self.system_prompt}],
            }
        else:
            self.messages.append(
                {
                    "role": "system",
                    "content": [{"type": "text", "text": self.system_prompt}],
                }
            )

    def remove_message_at(self, index):
        """Remove a message at a given index"""
        if index < len(self.messages):
            self.messages.pop(index)

    def replace_message_at(
        self, index, text_content, image_content=None, image_detail="high"
    ):
        """Replace a message at a given index"""
        if index < len(self.messages):
            self.messages[index] = {
                "role": self.messages[index]["role"],
                "content": [{"type": "text", "text": text_content}],
            }
            if image_content:
                base64_image = self.encode_image(image_content)
                self.messages[index]["content"].append(
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/png;base64,{base64_image}",
                            "detail": image_detail,
                        },
                    }
                )

    def add_message(
        self,
        text_content,
        image_content=None,
        role=None,
        image_detail="high",
        put_text_last=False,
    ):
        """Add a new message to the list of messages"""

        # API-style inference from OpenAI and similar services
        if isinstance(
            self.engine,
            (
                LMMEngineAnthropic,
                LMMEngineAzureOpenAI,
                LMMEngineHuggingFace,
                LMMEngineOpenAI,
                LMMEngineLybic,
                LMMEngineOpenRouter,
                LMMEnginevLLM,
                LMMEngineGemini,
                LMMEngineQwen,
                LMMEngineDoubao,
                LMMEngineDeepSeek,
                LMMEngineZhipu,
                LMMEngineGroq,
                LMMEngineSiliconflow,
                LMMEngineMonica,
                LMMEngineAWSBedrock,
            ),
        ):
            # infer role from previous message
            if role != "user":
                if self.messages[-1]["role"] == "system":
                    role = "user"
                elif self.messages[-1]["role"] == "user":
                    role = "assistant"
                elif self.messages[-1]["role"] == "assistant":
                    role = "user"

            message = {
                "role": role,
                "content": [{"type": "text", "text": text_content}],
            }

            if isinstance(image_content, np.ndarray) or image_content:
                # Check if image_content is a list or a single image
                if isinstance(image_content, list):
                    # If image_content is a list of images, loop through each image
                    for image in image_content:
                        base64_image = self.encode_image(image)
                        message["content"].append(
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"data:image/png;base64,{base64_image}",
                                    "detail": image_detail,
                                },
                            }
                        )
                else:
                    # If image_content is a single image, handle it directly
                    base64_image = self.encode_image(image_content)
                    message["content"].append(
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/png;base64,{base64_image}",
                                "detail": image_detail,
                            },
                        }
                    )

            # Rotate text to be the last message if desired
            if put_text_last:
                text_content = message["content"].pop(0)
                message["content"].append(text_content)

            self.messages.append(message)

        # Native Anthropic-style image payloads. Note: LMMEngineAnthropic and
        # LMMEngineAWSBedrock are also listed in the OpenAI-style branch above,
        # so that branch matches them first.
        elif isinstance(self.engine, (LMMEngineAnthropic, LMMEngineAWSBedrock)):
            # infer role from previous message
            if role != "user":
                if self.messages[-1]["role"] == "system":
                    role = "user"
                elif self.messages[-1]["role"] == "user":
                    role = "assistant"
                elif self.messages[-1]["role"] == "assistant":
                    role = "user"

            message = {
                "role": role,
                "content": [{"type": "text", "text": text_content}],
            }

            if image_content:
                # Check if image_content is a list or a single image
                if isinstance(image_content, list):
                    # If image_content is a list of images, loop through each image
                    for image in image_content:
                        base64_image = self.encode_image(image)
                        message["content"].append(
                            {
                                "type": "image",
                                "source": {
                                    "type": "base64",
                                    "media_type": "image/png",
                                    "data": base64_image,
                                },
                            }
                        )
                else:
                    # If image_content is a single image, handle it directly
                    base64_image = self.encode_image(image_content)
                    message["content"].append(
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": "image/png",
                                "data": base64_image,
                            },
                        }
                    )
            self.messages.append(message)

        # Locally hosted vLLM model inference
        elif isinstance(self.engine, LMMEnginevLLM):
            # infer role from previous message
            if role != "user":
                if self.messages[-1]["role"] == "system":
                    role = "user"
                elif self.messages[-1]["role"] == "user":
                    role = "assistant"
                elif self.messages[-1]["role"] == "assistant":
                    role = "user"

            message = {
                "role": role,
                "content": [{"type": "text", "text": text_content}],
            }

            if image_content:
                # Check if image_content is a list or a single image
                if isinstance(image_content, list):
                    # If image_content is a list of images, loop through each image
                    for image in image_content:
                        base64_image = self.encode_image(image)
                        message["content"].append(
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"data:image;base64,{base64_image}"
                                },
                            }
                        )
                else:
                    # If image_content is a single image, handle it directly
                    base64_image = self.encode_image(image_content)
                    message["content"].append(
                        {
                            "type": "image_url",
                            "image_url": {"url": f"data:image;base64,{base64_image}"},
                        }
                    )

            self.messages.append(message)
        else:
            raise ValueError("engine_type is not supported")

    def get_response(
        self,
        user_message=None,
        messages=None,
        temperature=0.0,
        max_new_tokens=None,
        **kwargs,
    ):
        """Generate the next response based on previous messages"""
        if messages is None:
            messages = self.messages
        if user_message:
            messages.append(
                {"role": "user", "content": [{"type": "text", "text": user_message}]}
            )
    
        # The Lybic engine is called without a temperature argument; other engines receive it.
        if isinstance(self.engine, LMMEngineLybic):
            content, total_tokens, cost = self.engine.generate(
                messages,
                max_new_tokens=max_new_tokens,  # type: ignore
                **kwargs,
            )
        else:
            content, total_tokens, cost = self.engine.generate(
                messages,
                temperature=temperature,
                max_new_tokens=max_new_tokens,  # type: ignore
                **kwargs,
            )
        
        cost_string = CostManager.format_cost(cost, self.engine)
        
        return content, total_tokens, cost_string

class EmbeddingAgent:
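    """Agent that wraps a supported embedding engine for text embeddings and similarity computations."""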
    def __init__(self, engine_params=None, engine=None):
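        """Build the embedding engine from engine_params["engine_type"], or use a pre-built engine instance if given."""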
        if engine is None:
            if engine_params is not None:
                engine_type = engine_params.get("engine_type")
                if engine_type == "openai":
                    self.engine = OpenAIEmbeddingEngine(**engine_params)
                elif engine_type == "gemini":
                    self.engine = GeminiEmbeddingEngine(**engine_params)
                elif engine_type == "azure":
                    self.engine = AzureOpenAIEmbeddingEngine(**engine_params)
                elif engine_type == "dashscope":
                    self.engine = DashScopeEmbeddingEngine(**engine_params)
                elif engine_type == "doubao":
                    self.engine = DoubaoEmbeddingEngine(**engine_params)
                elif engine_type == "jina":
                    self.engine = JinaEmbeddingEngine(**engine_params)
                else:
                    raise ValueError(f"Embedding engine type '{engine_type}' is not supported")
            else:
                raise ValueError("engine_params must be provided")
        else:
            self.engine = engine

    def get_embeddings(self, text):
        """Get embeddings for the given text
        
        Args:
            text (str): The text to get embeddings for
            
        Returns:
            numpy.ndarray: The embeddings for the text
        """
        embeddings, total_tokens, cost = self.engine.get_embeddings(text)
        cost_string = CostManager.format_cost(cost, self.engine)
        return embeddings, total_tokens, cost_string

    
    def get_similarity(self, text1, text2):
        """Calculate the cosine similarity between two texts
        
        Args:
            text1 (str): First text
            text2 (str): Second text
            
        Returns:
            float: Cosine similarity score between the two texts
        """
        embeddings1, tokens1, cost1 = self.get_embeddings(text1)
        embeddings2, tokens2, cost2 = self.get_embeddings(text2)
        
        # Calculate cosine similarity
        dot_product = np.dot(embeddings1, embeddings2)
        norm1 = np.linalg.norm(embeddings1)
        norm2 = np.linalg.norm(embeddings2)
        
        similarity = dot_product / (norm1 * norm2)
        total_tokens = tokens1 + tokens2
        total_cost = CostManager.add_costs(cost1, cost2)
        
        return similarity, total_tokens, total_cost
    
    def batch_get_embeddings(self, texts):
        """Get embeddings for multiple texts
        
        Args:
            texts (List[str]): List of texts to get embeddings for
            
        Returns:
            List[numpy.ndarray]: List of embeddings for each text
        """
        embeddings = []
        total_tokens = [0, 0, 0]
        if texts:
            first_embedding, first_tokens, first_cost = self.get_embeddings(texts[0])
            embeddings.append(first_embedding)
            total_tokens[0] += first_tokens[0]
            total_tokens[1] += first_tokens[1]
            total_tokens[2] += first_tokens[2]
            total_cost = first_cost
            
            for text in texts[1:]:
                embedding, tokens, cost = self.get_embeddings(text)
                embeddings.append(embedding)
                total_tokens[0] += tokens[0]
                total_tokens[1] += tokens[1]
                total_tokens[2] += tokens[2]
                total_cost = CostManager.add_costs(total_cost, cost)
        else:
            currency = CostManager.get_currency_symbol(self.engine)
            total_cost = f"0.0{currency}"
        
        return embeddings, total_tokens, total_cost


class WebSearchAgent:
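    """Agent that wraps a supported web search engine to answer queries."""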
    def __init__(self, engine_params=None, engine=None):
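        """Build the search engine from engine_params["engine_type"], or use a pre-built engine instance if given."""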
        if engine is None:
            if engine_params is not None:
                self.engine_type = engine_params.get("engine_type")
                if self.engine_type == "bocha":
                    self.engine = BochaAISearchEngine(**engine_params)
                elif self.engine_type == "exa":
                    self.engine = ExaResearchEngine(**engine_params)
                else:
                    raise ValueError(f"Web search engine type '{self.engine_type}' is not supported")
            else:
                raise ValueError("engine_params must be provided")
        else:
            self.engine = engine
            # Record the engine type so the error message in get_answer does not fail
            self.engine_type = type(engine).__name__
    
    def get_answer(self, query, **kwargs):
        """Get a direct answer for the query
        
        Args:
            query (str): The search query
            **kwargs: Additional arguments to pass to the search engine
            
        Returns:
            str: The answer text
        """
        if isinstance(self.engine, BochaAISearchEngine):
            answer, tokens, cost = self.engine.get_answer(query, **kwargs)
            return answer, tokens, str(cost)

        elif isinstance(self.engine, ExaResearchEngine):
            # For Exa, we'll use the chat_research method which returns a complete answer
            # results, tokens, cost = self.engine.search(query, **kwargs)
            results, tokens, cost = self.engine.chat_research(query, **kwargs)
            if isinstance(results, dict) and "messages" in results:
                for message in results.get("messages", []):
                    if message.get("type") == "answer":
                        return message.get("content", ""), tokens, str(cost)
            return str(results), tokens, str(cost)

        else:
            raise ValueError(f"Web search engine type '{self.engine_type}' is not supported")