mirrored 18 minutes ago
0
yuanmengqi — feat: refactor run_multienv_qwen25vl.py and qwen25vl_agent.py for improved logging and task management - Introduced signal handling for graceful shutdown of environments and processes. - Enhanced logging configuration to support dynamic log levels and structured output. - Updated argument parsing to include new parameters for model selection and task execution. - Refactored task distribution logic to streamline environment task management. - Improved error handling during task execution and environment cleanup. - Adjusted Qwen25VLAgent initialization to support new model and thought prefix options. - Reduced max tries for LLM calls to optimize performance. 82c3cdd
import base64
import json
import logging
import time
import os
from io import BytesIO
from typing import Dict, List, Tuple

import backoff
import openai
from PIL import Image
from requests.exceptions import SSLError
from google.api_core.exceptions import (
    InvalidArgument,
    ResourceExhausted,
    InternalServerError,
    BadRequest,
)
from mm_agents.utils.qwen_vl_utils import smart_resize



# Module-wide logger used by every method in this file. It starts out as the
# default named logger so that logging calls work even before reset() has
# run; reset() rebinds it when the caller supplies its own logger.
logger = logging.getLogger("desktopenv.qwen25vl_agent")

# Number of attempts call_llm makes before giving up and returning "".
MAX_RETRY_TIMES = 5

def encode_image(image_content):
    """Return the base64 encoding of ``image_content`` (bytes) as a UTF-8 string."""
    encoded = base64.b64encode(image_content)
    return str(encoded, "utf-8")


def process_image(image_bytes):
    """
    Resize a screenshot for Qwen VL models and return it base64 encoded.

    The target size is whatever ``smart_resize`` returns for the image's own
    height and width; the resized image is re-encoded as PNG.

    Args:
        image_bytes: Raw image bytes (any format PIL can open)

    Returns:
        Base64 encoded string of the resized PNG image
    """
    source = Image.open(BytesIO(image_bytes))
    src_width, src_height = source.size

    # smart_resize hands back (height, width); PIL's resize wants (width, height).
    target_height, target_width = smart_resize(height=src_height, width=src_width)
    resized = source.resize((target_width, target_height))

    # Serialise to PNG in memory, then base64 encode the bytes.
    png_stream = BytesIO()
    resized.save(png_stream, format="PNG")
    return base64.b64encode(png_stream.getvalue()).decode('utf-8')


class Qwen25VLAgent:

    def __init__(
        self,
        platform="ubuntu",
        model="qwen2.5-vl-72b-instruct",
        max_tokens=1500,
        top_p=0.9,
        temperature=0.5,
        action_space="pyautogui",
        observation_type="screenshot",
        history_n=4,  # Number of previous interactions to include in full detail
        add_thought_prefix=False,
    ):
        self.platform = platform
        self.model = model
        self.max_tokens = max_tokens
        self.top_p = top_p
        self.temperature = temperature
        self.action_space = action_space
        self.observation_type = observation_type
        self.history_n = history_n  # Control how many previous interactions to include
        self.add_thought_prefix = add_thought_prefix
        assert action_space in ["pyautogui"], "Invalid action space"
        assert observation_type in ["screenshot"], "Invalid observation type"
        self.thoughts = []
        self.actions = []
        self.observations = []
        self.responses = []  # Store model responses
        self.screenshots = []  # Store processed screenshots

    def predict(self, instruction: str, obs: Dict) -> List:
        """
        Predict the next action(s) based on the current observation.
        """
        # Process the screenshot image
        screenshot_bytes = obs["screenshot"]
        
        # Display original dimensions
        image = Image.open(BytesIO(screenshot_bytes))
        width, height = image.size
        print(f"Original screen resolution: {width}x{height}")
        
        # Process the image
        processed_image = process_image(screenshot_bytes)
        processed_img = Image.open(BytesIO(base64.b64decode(processed_image)))
        processed_width, processed_height = processed_img.size
        print(f"Processed image resolution: {processed_width}x{processed_height}")
        
        # Save the current screenshot to history
        self.screenshots.append(processed_image)
        
        # Calculate history window start index
        current_step = len(self.actions)
        history_start_idx = max(0, current_step - self.history_n)
        
        # Build previous actions string - only include actions outside the history window
        previous_actions = []
        for i in range(history_start_idx):
            if i < len(self.actions):
                previous_actions.append(f"Step {i+1}: {self.actions[i]}")
        previous_actions_str = "\n".join(previous_actions) if previous_actions else "None"

        # System prompt with tool definition
        tools_def = {
            "type": "function", 
            "function": {
                "name_for_human": "computer_use", 
                "name": "computer_use", 
                "description": "Use a mouse and keyboard to interact with a computer, and take screenshots.",
                "parameters": {
                    "properties": {
                        "action": {
                            "description": "The action to perform.",
                            "enum": ["key", "type", "mouse_move", "left_click", "left_click_drag", 
                                     "right_click", "middle_click", "double_click", "scroll", "wait", "terminate"], 
                            "type": "string"
                        },
                        "keys": {"description": "Required only by `action=key`.", "type": "array"}, 
                        "text": {"description": "Required only by `action=type`.", "type": "string"}, 
                        "coordinate": {"description": "The x,y coordinates for mouse actions.", "type": "array"}, 
                        "pixels": {"description": "The amount of scrolling.", "type": "number"}, 
                        "time": {"description": "The seconds to wait.", "type": "number"}, 
                        "status": {
                            "description": "The status of the task.", 
                            "type": "string", 
                            "enum": ["success", "failure"]
                        }
                    }, 
                    "required": ["action"], 
                    "type": "object"
                }, 
                "args_format": "Format the arguments as a JSON object."
            }
        }
        
        system_prompt = """You are a helpful assistant

# Tools

You may call one or more functions to assist with the user query.

You are provided with function signatures within <tools></tools> XML tags:
<tools>
""" + json.dumps(tools_def) + """
</tools>

For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags:
<tool_call>
{"name": <function-name>, "arguments": <args-json-object>}
</tool_call>"""

        # Create instruction prompt
        instruction_prompt = f"""
Please generate the next move according to the UI screenshot, instruction and previous actions.

Instruction: {instruction}

Previous actions:
{previous_actions_str}"""

        # Initialize messages with system prompt
        messages = [
            {
                "role": "system",
                "content": [{
                    "type": "text",
                    "text": system_prompt
                }]
            }
        ]
        
        # Add history responses and images within the history window
        history_len = min(self.history_n, len(self.responses))
        if history_len > 0:
            # Only include the most recent history_n steps
            history_responses = self.responses[-history_len:]
            history_screenshots = self.screenshots[-history_len-1:-1]  # Include one more for the previous screenshot
            
            # Add history in conversation format
            for idx in range(history_len):
                # Add the screenshot (user message)
                if idx < len(history_screenshots):
                    screenshot_b64 = history_screenshots[idx]
                    
                    # If this is the first history item, include instruction prompt
                    if idx == 0:
                        messages.append({
                            "role": "user",
                            "content": [
                                {
                                    "type": "image_url",
                                    "image_url": {
                                        "url": f"data:image/png;base64,{screenshot_b64}"
                                    }
                                },
                                {
                                    "type": "text",
                                    "text": instruction_prompt
                                }
                            ]
                        })
                    else:
                        messages.append({
                            "role": "user",
                            "content": [
                                {
                                    "type": "image_url",
                                    "image_url": {
                                        "url": f"data:image/png;base64,{screenshot_b64}"
                                    }
                                }
                            ]
                        })
                
                # Add the action and response (assistant message)
                
                messages.append({
                    "role": "assistant",
                    "content": [
                        {"type": "text", "text": history_responses[idx]}
                    ]
                })
            
            # Add the current screenshot without instruction (since we already have history)
            messages.append({
                "role": "user",
                "content": [
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/png;base64,{processed_image}"
                        }
                    }
                ]
            })
        else:
            # If no history, just add current screenshot with instruction prompt
            messages.append({
                "role": "user",
                "content": [
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/png;base64,{processed_image}"
                        }
                    },
                    {
                        "type": "text",
                        "text": instruction_prompt
                    }
                ]
            })

        # append_text = f"""Step {current_step+1}: Thought:"""
        if self.add_thought_prefix:
            append_text = f"""Thought:"""
            messages.append({"role": "assistant", "content": [{"type": "text", "text": append_text}]})

        # Call the LLM
        response = self.call_llm(
            {
                "model": self.model,
                "messages": messages,
                "max_tokens": self.max_tokens,
                "top_p": self.top_p,
                "temperature": self.temperature,
            },
            self.model,
        )

        logger.info(f"Qwen25VL Output: {response}")
        
        # Save response to history
        self.responses.append(response)

        # Parse response and extract pyautogui code
        low_level_instruction, pyautogui_code = self.parse_response(
            response, 
            width, 
            height, 
            processed_width, 
            processed_height
        )

        logger.info(f"Low level instruction: {low_level_instruction}")
        logger.info(f"Pyautogui code: {pyautogui_code}")

        # Add the action to history
        self.actions.append(low_level_instruction)

        return response, pyautogui_code

    def parse_response(
        self,
        response: str,
        original_width: int = None,
        original_height: int = None,
        processed_width: int = None,
        processed_height: int = None,
    ) -> Tuple[str, List[str]]:
        """
        Parse LLM response and convert it to low level action and pyautogui code.

        Tool calls are read from ``<tool_call>...</tool_call>`` sections (tags
        may share a line with the JSON), from sections delimited by the
        stand-in marker, and from bare single-line JSON objects that carry both
        "name" and "arguments". Only calls named "computer_use" produce code.

        Args:
            response: Raw response string from the model
            original_width: Width of the original screenshot
            original_height: Height of the original screenshot
            processed_width: Width of the processed image
            processed_height: Height of the processed image

        Returns:
            Tuple of (low_level_instruction, list of pyautogui_commands).
            Both are empty for a missing or blank response. The special
            commands "WAIT" and "DONE" stand for the wait and terminate actions.
        """
        low_level_instruction = ""
        pyautogui_code = []

        if response is None or not response.strip():
            return low_level_instruction, pyautogui_code

        open_tag, close_tag = "<tool_call>", "</tool_call>"
        # Stand-in delimiter produced by a data-processing bug; the same glyph
        # is used to open and to close, so it has to be handled as a toggle.
        toggle_marker = "📐"
        instruction_prefixes = ("action:", "step", "i will", "i'll", "now i")
        click_functions = {
            "left_click": "click",
            "right_click": "rightClick",
            "middle_click": "middleClick",
            "double_click": "doubleClick",
        }

        def adjust_coordinates(x: float, y: float) -> Tuple[int, int]:
            """
            Adjust coordinates from processed image dimensions to original image dimensions.
            """
            if all([original_width, original_height, processed_width, processed_height]):
                x_scale = original_width / processed_width
                y_scale = original_height / processed_height
                return int(x * x_scale), int(y * y_scale)
            # If any dimension is missing, return the coordinates unscaled
            return int(x), int(y)

        def clean_key(key):
            """Strip list/quote debris (``keys=[``, brackets, quotes) from one key name."""
            if not isinstance(key, str):
                return key
            cleaned = key.strip()
            if cleaned.startswith("keys="):
                cleaned = cleaned[len("keys="):]
            cleaned = cleaned.strip("[]'\" ")
            # Keys such as "]" or "'" are legitimate; do not clean them away.
            return cleaned or key

        def emit_action(tool_call) -> None:
            """Append the pyautogui command for one decoded tool call."""
            if tool_call.get("name") != "computer_use":
                return
            args = tool_call["arguments"]
            if isinstance(args, str):
                # Arguments are sometimes delivered as a JSON-encoded string.
                args = json.loads(args)
            action = args["action"]

            if action in click_functions:
                func = click_functions[action]
                if "coordinate" in args:
                    x, y = args["coordinate"]
                    adj_x, adj_y = adjust_coordinates(x, y)
                    pyautogui_code.append(f"pyautogui.{func}({adj_x}, {adj_y})")
                else:
                    pyautogui_code.append(f"pyautogui.{func}()")

            elif action == "type":
                text = args.get("text", "")
                # repr() escapes quotes, backslashes and newlines so the
                # generated statement stays a single valid string literal.
                pyautogui_code.append(f"pyautogui.typewrite({str(text)!r})")

            elif action == "key":
                keys = args.get("keys", [])
                if isinstance(keys, str):
                    # A lone key name must not be iterated character by character.
                    keys = [keys]
                elif not isinstance(keys, (list, tuple)):
                    keys = []
                keys = [clean_key(key) for key in keys]
                if not keys:
                    logger.warning("Ignoring key action without any keys")
                    return
                keys_str = ", ".join(repr(str(key)) for key in keys)
                if len(keys) > 1:
                    pyautogui_code.append(f"pyautogui.hotkey({keys_str})")
                else:
                    pyautogui_code.append(f"pyautogui.press({keys_str})")

            elif action == "scroll":
                pixels = args.get("pixels", 0)
                pyautogui_code.append(f"pyautogui.scroll({pixels})")

            elif action == "wait":
                pyautogui_code.append("WAIT")  # Special code for wait action

            elif action == "terminate":
                pyautogui_code.append("DONE")  # Special code for termination

            elif action == "mouse_move":
                if "coordinate" in args:
                    x, y = args["coordinate"]
                    adj_x, adj_y = adjust_coordinates(x, y)
                    pyautogui_code.append(f"pyautogui.moveTo({adj_x}, {adj_y})")
                else:
                    pyautogui_code.append("pyautogui.moveTo(0, 0)")

            elif action == "left_click_drag":
                if "coordinate" in args:
                    x, y = args["coordinate"]
                    adj_x, adj_y = adjust_coordinates(x, y)
                    duration = args.get("duration", 0.5)
                    pyautogui_code.append(f"pyautogui.dragTo({adj_x}, {adj_y}, duration={duration})")
                else:
                    pyautogui_code.append("pyautogui.dragTo(0, 0)")

        def process_tool_call(json_str: str) -> None:
            """Process a single tool call JSON string."""
            try:
                emit_action(json.loads(json_str))
            except (json.JSONDecodeError, KeyError, TypeError, ValueError, AttributeError) as e:
                # Malformed calls (bad JSON, missing fields, wrong shapes) are
                # logged and skipped so one bad call cannot abort the step.
                logger.error(f"Failed to parse tool call: {e}")

        pending_lines = []

        def flush_pending() -> None:
            """Process and clear whatever tool-call body has been collected."""
            if pending_lines:
                process_tool_call("\n".join(pending_lines))
                del pending_lines[:]

        # Put every delimiter on a line of its own so that tags sharing a line
        # with the JSON payload are handled the same as multi-line sections.
        normalized = response
        for token in (open_tag, close_tag, toggle_marker):
            normalized = normalized.replace(token, f"\n{token}\n")

        inside_tool_call = False
        for raw_line in normalized.split('\n'):
            line = raw_line.strip()
            if not line:
                continue

            # Markers are compared as whole lines; an empty-string prefix test
            # would match every line and swallow the whole response.
            if line == open_tag:
                flush_pending()
                inside_tool_call = True
                continue
            if line == close_tag:
                flush_pending()
                inside_tool_call = False
                continue
            if line == toggle_marker:
                if inside_tool_call:
                    flush_pending()
                inside_tool_call = not inside_tool_call
                continue

            if inside_tool_call:
                pending_lines.append(line)
                continue

            # Extract low-level instruction from lines starting with "Action:" or similar
            if line.lower().startswith(instruction_prefixes):
                if not low_level_instruction:
                    # Only the first action description is kept
                    low_level_instruction = line
                continue

            # Try to parse individual lines as JSON
            if line.startswith("{") and line.endswith("}"):
                try:
                    json_obj = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if "name" in json_obj and "arguments" in json_obj:
                    process_tool_call(line)

        # Process any remaining (unterminated) tool call content
        flush_pending()

        # If we still don't have a low-level instruction, generate a default one
        if not low_level_instruction and pyautogui_code:
            first_command = pyautogui_code[0]
            if "." in first_command:
                action_type = first_command.split(".", 1)[1].split("(", 1)[0]
            else:
                # Special codes such as "WAIT" / "DONE" have no "pyautogui." prefix.
                action_type = first_command
            low_level_instruction = f"Performing {action_type} action"

        return low_level_instruction, pyautogui_code

    @backoff.on_exception(
        backoff.constant,
        # here you should add more model exceptions as you want,
        # but you are forbidden to add "Exception", that is, a common type of exception
        # because we want to catch this kind of Exception in the outside to ensure
        # each example won't exceed the time limit
        (
            # General exceptions
            SSLError,
            # OpenAI exceptions
            openai.RateLimitError,
            openai.BadRequestError,
            openai.InternalServerError,
            # Google exceptions
            InvalidArgument,
            ResourceExhausted,
            InternalServerError,
            BadRequest,
            # Groq exceptions
            # todo: check
        ),
        interval=30,
        max_tries=5,
    )
    def call_llm(self, payload, model):
        """
        Send a chat-completion request and return the reply text.

        The endpoint and key come from the ``DASHSCOPE_BASE_URL`` and
        ``DASHSCOPE_API_KEY`` environment variables (with built-in fallbacks).
        The request is attempted up to ``MAX_RETRY_TIMES`` times, pausing five
        seconds between attempts.

        Args:
            payload: Mapping with "messages" and, optionally, "max_tokens",
                "temperature" and "top_p"; missing sampling settings fall back
                to the agent's own attributes.
            model: Model name to request.

        Returns:
            The first choice's message content, or "" when the content is
            empty or every attempt failed.
        """
        messages = payload["messages"]
        # Honour the sampling settings the caller put in the payload instead of
        # silently ignoring them; the agent's attributes remain the default.
        max_tokens = payload.get("max_tokens", self.max_tokens)
        temperature = payload.get("temperature", self.temperature)
        top_p = payload.get("top_p", self.top_p)

        base_url = os.getenv('DASHSCOPE_BASE_URL', "https://dashscope.aliyuncs.com/compatible-mode/v1")
        api_key = os.getenv('DASHSCOPE_API_KEY', "sk-123")

        client = openai.OpenAI(
            base_url=base_url,
            api_key=api_key
        )

        # NOTE(review): the broad except below handles every error raised by
        # the request, so those errors never reach the backoff decorator above.
        for attempt in range(1, MAX_RETRY_TIMES + 1):
            logger.info("Generating content with Qwen model: %s", model)
            try:
                response = client.chat.completions.create(
                    model=model,
                    messages=messages,
                    max_tokens=max_tokens,
                    temperature=temperature,
                    top_p=top_p
                )
                # Content can be None; callers expect a string.
                return response.choices[0].message.content or ""
            except Exception as e:
                logger.error(f"Error calling Qwen model: {e}")
                # No point in waiting after the final attempt.
                if attempt < MAX_RETRY_TIMES:
                    time.sleep(5)
        return ""

    def reset(self, _logger=None):
        global logger
        logger = (_logger if _logger is not None else
                  logging.getLogger("desktopenv.qwen25vl_agent"))

        self.thoughts = []
        self.action_descriptions = []
        self.actions = []
        self.observations = []
        self.responses = []  # Reset responses
        self.screenshots = []  # Reset screenshots