"""
This code is adapted from AgentS2 (https://github.com/simular-ai/Agent-S)
with modifications to suit specific requirements.
"""
import logging
import textwrap
from typing import Dict, List, Tuple
from aworld.config.conf import AgentConfig
from aworld.agents.llm_agent import Agent
from aworld.core.common import Observation
from aworld.core.task import Task
from aworld.core.context.base import Context
from aworld.core.event.base import Message
from aworld.models.llm import get_llm_model
from aworld.utils.common import sync_exec
from mm_agents.aworldguiagent.grounding import ACI
from mm_agents.aworldguiagent.prompt import GENERATOR_SYS_PROMPT, REFLECTION_SYS_PROMPT
from mm_agents.aworldguiagent.utils import encode_image, extract_first_agent_function, parse_single_code_from_string, sanitize_code
from mm_agents.aworldguiagent.utils import prune_image_messages, reps_action_result
# Module-level logger shared by this file; records are emitted under the
# "desktopenv.agent" logger name.
logger = logging.getLogger("desktopenv.agent")
class Worker:
    """Flat (non-hierarchical) worker that maps a task and screen observations to actions.

    On every call to ``generate_next_action`` the worker optionally asks a
    reflection agent to comment on the previous step, asks a generator agent
    for the next plan, and evaluates the plan's grounded action against the
    grounding agent.
    """

    # Expression evaluated whenever no plan is available or a plan cannot be
    # parsed. It refers to a local variable named ``agent``.
    _FALLBACK_PLAN_CODE = "agent.wait(1.0)"

    def __init__(
        self,
        engine_params: Dict,
        grounding_agent: ACI,
        platform: str = "ubuntu",
        max_trajectory_length: int = 16,
        enable_reflection: bool = True,
    ):
        """
        Worker receives the main task and generates actions, without the need of hierarchical planning
        Args:
            engine_params: Dict
                Parameters for the multimodal engine. The keys read here are
                "engine_type", "model", "temperature", "base_url" and "api_key";
                each has a default when missing.
            grounding_agent: ACI
                The grounding agent to use
            platform: str
                OS platform the agent runs on (darwin, linux, windows).
                Accepted for interface compatibility; this class does not read it.
            max_trajectory_length: int
                The amount of images turns to keep
            enable_reflection: bool
                Whether to enable reflection
        """
        self.grounding_agent = grounding_agent
        self.max_trajectory_length = max_trajectory_length
        self.enable_reflection = enable_reflection
        # True only for the listed model name; not read elsewhere in this class.
        self.use_thinking = engine_params.get("model", "") in [
            "claude-3-7-sonnet-20250219"
        ]
        # The same config is shared by the generator and the reflection agent.
        self.generator_agent_config = AgentConfig(
            llm_provider=engine_params.get("engine_type", "openai"),
            llm_model_name=engine_params.get("model", "openai/o3"),
            llm_temperature=engine_params.get("temperature", 1.0),
            llm_base_url=engine_params.get("base_url", "https://openrouter.ai/api/v1"),
            llm_api_key=engine_params.get("api_key", ""),
        )
        self.reset()

    def reset(self):
        """Recreate both agents and clear all per-episode state."""
        self.generator_agent = Agent(
            name="generator_agent",
            conf=self.generator_agent_config,
            system_prompt=GENERATOR_SYS_PROMPT,
            resp_parse_func=reps_action_result
        )
        self.reflection_agent = Agent(
            name="reflection_agent",
            conf=self.generator_agent_config,
            system_prompt=REFLECTION_SYS_PROMPT,
            resp_parse_func=reps_action_result
        )
        self.turn_count = 0
        self.worker_history = []
        self.reflections = []
        self.cost_this_turn = 0
        self.screenshot_inputs = []
        # Placeholder task/context/message objects handed to the agents' policy calls.
        self.dummy_task = Task()
        self.dummy_context = Context()
        self.dummy_context.set_task(self.dummy_task)
        self.dummy_message = Message(headers={'context': self.dummy_context})
        self.planning_model = get_llm_model(self.generator_agent_config)
        # When True, generate_next_action skips plan generation and returns the
        # fallback wait action. Nothing in this class sets it to True.
        self.first_done = False
        self.first_image = None

    def generate_next_action(
        self,
        instruction: str,
        obs: Dict,
    ) -> Tuple[Dict, List]:
        """
        Predict the next action(s) based on the current observation.

        Args:
            instruction: str
                Task description. On the first turn it is substituted for
                "TASK_DESCRIPTION" in the generator system prompt.
            obs: Dict
                Current observation. ``obs["screenshot"]`` is always read;
                ``obs["action_response"]`` is read on later turns when
                reflection is enabled.
        Returns:
            A tuple ``(executor_info, [exec_code])``. ``executor_info`` holds the
            plan, its thoughts, the grounded code string and the reflection
            output; entries that were not produced on this turn are ``None``.
            ``exec_code`` is the result of evaluating the grounded code string.
        """
        generator_message = (
            ""
            if self.turn_count > 0
            else "The initial screen is provided. No action has been taken yet."
        )
        # Load the task into the system prompt
        if self.turn_count == 0:
            self.generator_agent.system_prompt = self.generator_agent.system_prompt.replace(
                "TASK_DESCRIPTION", instruction)

        # Get the per-step reflection
        reflection = None
        reflection_thoughts = None
        if self.enable_reflection:
            if self.turn_count == 0:
                self._prime_reflection_agent(instruction, obs)
            else:
                reflection, reflection_thoughts = self._reflect_on_last_action(obs)
                generator_message += f"Here is your function execute result: {obs['action_response']}.\n"
                generator_message += f"REFLECTION: You may use this reflection on the previous action and overall trajectory:\n{reflection}\n"
                logger.info("REFLECTION: %s", reflection)

        # Bind these up front so they exist even when plan generation is
        # skipped; previously that path raised UnboundLocalError while
        # building executor_info.
        plan = None
        plan_thoughts = None
        if not self.first_done:
            plan, plan_thoughts = self._generate_plan(generator_message, obs)

        plan_code, exec_code = self._ground_plan(plan, obs)

        executor_info = {
            "full_plan": plan,
            "executor_plan": plan,
            "plan_thoughts": plan_thoughts,
            "plan_code": plan_code,
            "reflection": reflection,
            "reflection_thoughts": reflection_thoughts,
        }
        self.turn_count += 1
        self.screenshot_inputs.append(obs["screenshot"])
        return executor_info, [exec_code]

    def _prime_reflection_agent(self, instruction: str, obs: Dict) -> None:
        """Append the task to the reflection system prompt and store the initial screenshot in its memory."""
        text_content = textwrap.dedent(
            f"""
            Task Description: {instruction}
            Current Trajectory below:
            """
        )
        self.reflection_agent.system_prompt = (
            self.reflection_agent.system_prompt + "\n" + text_content
        )
        image_content = [
            {
                "type": "text",
                "text": "The initial screen is provided. No action has been taken yet."
            },
            {
                "type": "image_url",
                "image_url": {
                    "url": "data:image/png;base64," + encode_image(obs["screenshot"])
                }
            }
        ]
        self.reflection_agent._init_context(context=self.dummy_context)
        sync_exec(
            self.reflection_agent._add_human_input_to_memory,
            image_content,
            self.dummy_context,
            "message"
        )

    def _reflect_on_last_action(self, obs: Dict) -> Tuple:
        """Ask the reflection agent about the latest plan and return ``(reflection, thoughts)``."""
        image = "data:image/png;base64," + encode_image(obs["screenshot"])
        reflection_message = self.worker_history[-1] + "\n" + f"Here is function execute result: {obs['action_response']}.\n"
        reflection_observation = Observation(content=reflection_message, image=image)
        self.reflection_agent._init_context(context=self.dummy_context)
        reflection_actions = self.reflection_agent.policy(reflection_observation, message=self.dummy_message)
        reflection = reflection_actions[0].action_name
        reflection_thoughts = reflection_actions[0].policy_info
        self.reflections.append(reflection)
        return reflection, reflection_thoughts

    def _generate_plan(self, generator_message: str, obs: Dict) -> Tuple:
        """Query the generator agent with the current screenshot and return ``(plan, thoughts)``."""
        # Add finalized message to conversation
        generator_message += f"\nCurrent Text Buffer = [{','.join(self.grounding_agent.notes)}]\n"
        image = "data:image/png;base64," + encode_image(obs["screenshot"])
        generator_observation = Observation(content=generator_message, image=image)
        self.generator_agent._init_context(context=self.dummy_context)
        generator_actions = self.generator_agent.policy(generator_observation, message=self.dummy_message)
        plan = generator_actions[0].action_name
        plan_thoughts = generator_actions[0].policy_info
        # Honour the configured budget instead of a hard-coded 16 (the default
        # value of max_trajectory_length, so default behaviour is unchanged).
        prune_image_messages(self.generator_agent.memory.memory_store, self.max_trajectory_length)
        prune_image_messages(self.reflection_agent.memory.memory_store, self.max_trajectory_length)
        self.worker_history.append(plan)
        logger.info("FULL PLAN:\n %s", plan)
        return plan, plan_thoughts

    def _ground_plan(self, plan, obs: Dict) -> Tuple:
        """Turn a plan into ``(plan_code, exec_code)``, falling back to a wait action on any failure."""
        # The local must be called ``agent``: the evaluated code strings
        # (including the fallback) reference that name.
        agent = self.grounding_agent
        if plan is None:
            logger.warning("No plan was generated this turn; falling back to %s", self._FALLBACK_PLAN_CODE)
        else:
            try:
                # Use the grounding agent to convert agent_action("desc") into agent_action([x, y])
                agent.assign_coordinates(plan, obs)
                plan_code = parse_single_code_from_string(plan.split("Grounded Action")[-1])
                plan_code = sanitize_code(plan_code)
                plan_code = extract_first_agent_function(plan_code)
                # NOTE(security): plan_code is derived from model output and is
                # evaluated as Python. Only run this inside a sandboxed
                # environment, or replace eval with an explicit dispatcher.
                return plan_code, eval(plan_code)
            except Exception as e:
                # Deliberate best-effort: any parsing/grounding failure degrades to a wait.
                logger.error("Error in parsing plan code: %s", e)
        plan_code = self._FALLBACK_PLAN_CODE
        return plan_code, eval(plan_code)