mirrored 6 minutes ago
0
Shiqian SuUpdate aguvis_agent.py (#141) Fix Aguvis prompt bugc4d818c
import base64
import json
import logging
import os
import re
import tempfile
import time
from http import HTTPStatus
from io import BytesIO
from typing import Dict, List, Tuple

import backoff
import openai
import requests
from PIL import Image
from google.api_core.exceptions import InvalidArgument, ResourceExhausted, InternalServerError, BadRequest
from requests.exceptions import SSLError
from mm_agents.prompts import (
    AGUVIS_PLANNER_SYS_PROMPT,
    AGUVIS_SYS_PROMPT,
    AGUVIS_PLANNING_PROMPT,
    AGUVIS_INNER_MONOLOGUE_APPEND_PROMPT,
    AGUVIS_GROUNDING_PROMPT,
    AGUVIS_GROUNDING_APPEND_PROMPT
)

logger = None


# Function to encode the image
def encode_image(image_content):
    return base64.b64encode(image_content).decode('utf-8')


def encoded_img_to_pil_img(data_str):
    base64_str = data_str.replace("data:image/png;base64,", "")
    image_data = base64.b64decode(base64_str)
    image = Image.open(BytesIO(image_data))

    return image


def save_to_tmp_img_file(data_str):
    base64_str = data_str.replace("data:image/png;base64,", "")
    image_data = base64.b64decode(base64_str)
    image = Image.open(BytesIO(image_data))

    tmp_img_path = os.path.join(tempfile.mkdtemp(), "tmp_img.png")
    image.save(tmp_img_path)

    return tmp_img_path


# FIXME: hardcoded screen size and planner system message
SCREEN_LOGIC_SIZE = (1280, 720)


def parse_code_from_planner_response(input_string: str) -> List[str]:
    """Parse the planner's response containing executable pyautogui code"""

    input_string = "\n".join([line.strip() for line in input_string.split(';') if line.strip()])
    if input_string.strip() in ['WAIT', 'DONE', 'FAIL']:
        return [input_string.strip()]

    # This regular expression will match both ```code``` and ```python code```
    # and capture the `code` part. It uses a non-greedy match for the content inside.
    pattern = r"```(?:\w+\s+)?(.*?)```"
    # Find all non-overlapping matches in the string
    matches = re.findall(pattern, input_string, re.DOTALL)

    # The regex above captures the content inside the triple backticks.
    # The `re.DOTALL` flag allows the dot `.` to match newline characters as well,
    # so the code inside backticks can span multiple lines.

    # matches now contains all the captured code snippets
    codes = []

    for match in matches:
        match = match.strip()
        commands = ['WAIT', 'DONE', 'FAIL']

        if match in commands:
            codes.append(match.strip())
        elif match.split('\n')[-1] in commands:
            if len(match.split('\n')) > 1:
                codes.append("\n".join(match.split('\n')[:-1]))
            codes.append(match.split('\n')[-1])
        else:
            codes.append(match)

    return codes


def parse_aguvis_response(input_string, screen_logic_size=SCREEN_LOGIC_SIZE) -> Tuple[str, List[str]]:
    if input_string.lower().startswith("wait"):
        return "WAIT", "WAIT"
    elif input_string.lower().startswith("done"):
        return "DONE", "DONE"
    elif input_string.lower().startswith("fail"):
        return "FAIL", "FAIL"

    try:
        lines = input_string.strip().split("\n")
        lines = [line for line in lines if line.strip() != ""]
        low_level_instruction = lines[0]

        pyautogui_index = -1

        for i, line in enumerate(lines):
            if line.strip() == "assistantos" or line.strip().startswith("pyautogui"):
                pyautogui_index = i
                break

        if pyautogui_index == -1:
            print(f"Error: Could not parse response {input_string}")
            return None, None

        pyautogui_code_relative_coordinates = "\n".join(lines[pyautogui_index:])
        pyautogui_code_relative_coordinates = pyautogui_code_relative_coordinates.replace("assistantos", "").strip()
        corrected_code = correct_pyautogui_arguments(pyautogui_code_relative_coordinates)

        parsed_action = _pyautogui_code_to_absolute_coordinates(corrected_code, screen_logic_size)
        return low_level_instruction, parsed_action
    except Exception as e:
        print(f"Error: Could not parse response {input_string}")
        return None, None

def correct_pyautogui_arguments(code: str) -> str:
    function_corrections = {
        'write': {
            'incorrect_args': ['text'],
            'correct_args': [],
            'keyword_arg': 'message'
        },
        'press': {
            'incorrect_args': ['key', 'button'],
            'correct_args': [],
            'keyword_arg': None
        },
        'hotkey': {
            'incorrect_args': ['key1', 'key2', 'keys'],
            'correct_args': [],
            'keyword_arg': None
        },
    }

    lines = code.strip().split('\n')
    corrected_lines = []

    for line in lines:
        line = line.strip()
        match = re.match(r'(pyautogui\.(\w+))\((.*)\)', line)
        if match:
            full_func_call = match.group(1)
            func_name = match.group(2)
            args_str = match.group(3)

            if func_name in function_corrections:
                func_info = function_corrections[func_name]
                args = split_args(args_str)
                corrected_args = []

                for arg in args:
                    arg = arg.strip()
                    kwarg_match = re.match(r'(\w+)\s*=\s*(.*)', arg)
                    if kwarg_match:
                        arg_name = kwarg_match.group(1)
                        arg_value = kwarg_match.group(2)

                        if arg_name in func_info['incorrect_args']:
                            if func_info['keyword_arg']:
                                corrected_args.append(f"{func_info['keyword_arg']}={arg_value}")
                            else:
                                corrected_args.append(arg_value)
                        else:
                            corrected_args.append(f'{arg_name}={arg_value}')
                    else:
                        corrected_args.append(arg)

                corrected_args_str = ', '.join(corrected_args)
                corrected_line = f'{full_func_call}({corrected_args_str})'
                corrected_lines.append(corrected_line)
            else:
                corrected_lines.append(line)
        else:
            corrected_lines.append(line)

    corrected_code = '\n'.join(corrected_lines)
    return corrected_code

def split_args(args_str: str) -> List[str]:
    args = []
    current_arg = ''
    within_string = False
    string_char = ''
    prev_char = ''
    for char in args_str:
        if char in ['"', "'"]:
            if not within_string:
                within_string = True
                string_char = char
            elif within_string and prev_char != '\\' and char == string_char:
                within_string = False
        if char == ',' and not within_string:
            args.append(current_arg)
            current_arg = ''
        else:
            current_arg += char
        prev_char = char
    if current_arg:
        args.append(current_arg)
    return args

def extract_coordinates(text, logical_screen_size=SCREEN_LOGIC_SIZE) -> Tuple[int, int] | None:
    # Pattern to match (x=0.1, y=0.2) or (0.1, 0.2) format
    text = text.strip()
    logger.info(f"Extracting coordinates from: {text}")
    pattern = r'\((?:x=)?([-+]?\d*\.\d+|\d+)(?:,\s*(?:y=)?([-+]?\d*\.\d+|\d+))?\)'

    match = re.search(pattern, text)
    if match:
        x = int(float(match.group(1)) * logical_screen_size[0])
        y = int(float(match.group(2)) * logical_screen_size[1]) if match.group(2) else None

        if y is not None:
            return (x, y)

    logger.info(f"Error: No coordinates found in: {text}")
    return None


def _pyautogui_code_to_absolute_coordinates(pyautogui_code_relative_coordinates, logical_screen_size=SCREEN_LOGIC_SIZE):
    """
    Convert the relative coordinates in the pyautogui code to absolute coordinates based on the logical screen size.
    """
    import re
    import ast

    width, height = logical_screen_size

    pattern = r'(pyautogui\.\w+\([^\)]*\))'

    matches = re.findall(pattern, pyautogui_code_relative_coordinates)

    new_code = pyautogui_code_relative_coordinates

    for full_call in matches:
        func_name_pattern = r'(pyautogui\.\w+)\((.*)\)'
        func_match = re.match(func_name_pattern, full_call, re.DOTALL)
        if not func_match:
            continue

        func_name = func_match.group(1)
        args_str = func_match.group(2)

        try:
            parsed = ast.parse(f"func({args_str})").body[0].value
            parsed_args = parsed.args
            parsed_keywords = parsed.keywords
        except SyntaxError:
            continue

        function_parameters = {
            'click': ['x', 'y', 'clicks', 'interval', 'button', 'duration', 'pause'],
            'moveTo': ['x', 'y', 'duration', 'tween', 'pause'],
            'moveRel': ['xOffset', 'yOffset', 'duration', 'tween', 'pause'],
            'dragTo': ['x', 'y', 'duration', 'button', 'mouseDownUp', 'pause'],
            'dragRel': ['xOffset', 'yOffset', 'duration', 'button', 'mouseDownUp', 'pause'],
            'doubleClick': ['x', 'y', 'interval', 'button', 'duration', 'pause'],
        }

        func_base_name = func_name.split('.')[-1]

        param_names = function_parameters.get(func_base_name, [])

        args = {}
        for idx, arg in enumerate(parsed_args):
            if idx < len(param_names):
                param_name = param_names[idx]
                arg_value = ast.literal_eval(arg)
                args[param_name] = arg_value

        for kw in parsed_keywords:
            param_name = kw.arg
            arg_value = ast.literal_eval(kw.value)
            args[param_name] = arg_value

        updated = False
        if 'x' in args:
            try:
                x_rel = float(args['x'])
                x_abs = int(round(x_rel * width))
                args['x'] = x_abs
                updated = True
            except ValueError:
                pass
        if 'y' in args:
            try:
                y_rel = float(args['y'])
                y_abs = int(round(y_rel * height))
                args['y'] = y_abs
                updated = True
            except ValueError:
                pass
        if 'xOffset' in args:
            try:
                x_rel = float(args['xOffset'])
                x_abs = int(round(x_rel * width))
                args['xOffset'] = x_abs
                updated = True
            except ValueError:
                pass
        if 'yOffset' in args:
            try:
                y_rel = float(args['yOffset'])
                y_abs = int(round(y_rel * height))
                args['yOffset'] = y_abs
                updated = True
            except ValueError:
                pass

        if updated:
            reconstructed_args = []
            for idx, param_name in enumerate(param_names):
                if param_name in args:
                    arg_value = args[param_name]
                    if isinstance(arg_value, str):
                        arg_repr = f"'{arg_value}'"
                    else:
                        arg_repr = str(arg_value)
                    reconstructed_args.append(arg_repr)
                else:
                    break

            used_params = set(param_names[:len(reconstructed_args)])
            for kw in parsed_keywords:
                if kw.arg not in used_params:
                    arg_value = args[kw.arg]
                    if isinstance(arg_value, str):
                        arg_repr = f"{kw.arg}='{arg_value}'"
                    else:
                        arg_repr = f"{kw.arg}={arg_value}"
                    reconstructed_args.append(arg_repr)

            new_args_str = ', '.join(reconstructed_args)
            new_full_call = f"{func_name}({new_args_str})"
            new_code = new_code.replace(full_call, new_full_call)

    return new_code


class AguvisAgent:
    def __init__(
            self,
            platform="ubuntu",
            planner_model="gpt-4o",
            executor_model="qwen-aguvis-7b",
            max_tokens=1500,
            top_p=0.9,
            temperature=0.5,
            action_space="pyautogui",
            observation_type="screenshot",
    ):
        self.platform = platform
        self.planner_model = planner_model
        self.executor_model = executor_model
        assert self.executor_model is not None, "Executor model cannot be None"
        self.max_tokens = max_tokens
        self.top_p = top_p
        self.temperature = temperature
        self.action_space = action_space
        self.observation_type = observation_type
        assert action_space in ["pyautogui"], "Invalid action space"
        assert observation_type in ["screenshot"], "Invalid observation type"
        self.thoughts = []
        self.actions = []
        self.observations = []

    def predict(self, instruction: str, obs: Dict) -> List:
        """
        Predict the next action(s) based on the current observation.
        """
        previous_actions = "\n".join([f"Step {i+1}: {action}" for i, action in enumerate(self.actions)]) if self.actions else "None"

        if self.planner_model is None:
            aguvis_messages = []
            aguvis_messages.append({
                "role": "system",
                "content": [{"type": "text", "text": AGUVIS_SYS_PROMPT}]
            })
            aguvis_messages.append({
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": AGUVIS_PLANNING_PROMPT.format(
                            instruction=instruction,
                            previous_actions=previous_actions,
                        )
                    },
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/png;base64,{encode_image(obs['screenshot'])}"}
                    }
                ],
            })
            aguvis_messages.append({
                "role": "assistant",
                "content": [
                    {"type": "text", "text": AGUVIS_INNER_MONOLOGUE_APPEND_PROMPT}
                ]
            })
            aguvis_response = self.call_llm({
                "model": self.executor_model,
                "messages": aguvis_messages,
                "max_tokens": self.max_tokens,
                "top_p": self.top_p,
                "temperature": self.temperature
            }, self.executor_model)
            logger.info(f"Aguvis Output: {aguvis_response}")
            low_level_instruction, pyautogui_actions = parse_aguvis_response(aguvis_response)

            self.actions.append(low_level_instruction)
            return aguvis_response, [pyautogui_actions]
        else:
            # FIXME [junli]:
            # Using an external planner (GPT-4o) requires relying on more
            # detailed prompt to provide Aguvis with low level instructions.
            # So we temporarily separate the planner prompt and aguvis prompt.

            planner_messages = []
            planner_system_message = AGUVIS_PLANNER_SYS_PROMPT
            planner_messages.append({
                "role": "system",
                "content": [{"type": "text", "text": planner_system_message}]
            })
            planner_messages.append(
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": f"You are asked to complete the following task: {instruction}"
                        },
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/png;base64,{encode_image(obs['screenshot'])}",
                                "detail": "high"
                            }
                        }
                    ]
                }
            )
            planner_response = self.call_llm({
                "model": self.planner_model,
                "messages": planner_messages,
                "max_tokens": self.max_tokens,
                "top_p": self.top_p,
                "temperature": self.temperature
            }, self.planner_model)
            logger.info(f"Planner output: {planner_response}")
            code = parse_code_from_planner_response(planner_response)
            pyautogui_actions = []
            for line in code:
                code = self.convert_action_to_grounding_model_instruction(
                    line,
                    obs,
                    instruction,
                )
                pyautogui_actions.append(code)

            return "", pyautogui_actions

    def convert_action_to_grounding_model_instruction(
        self, line: str, obs: Dict, instruction: str
    ) -> str:
        pattern = r'(#.*?)\n(pyautogui\.(moveTo|click|rightClick)\((?:x=)?(\d+)(?:,\s*|\s*,\s*y=)(\d+)(?:,\s*duration=[\d.]+)?\))'
        matches = re.findall(pattern, line, re.DOTALL)
        if not matches:
            return line
        new_instruction = line
        for match in matches:
            comment = match[0].split("#")[1].strip()
            original_action = match[1]
            func_name = match[2].strip()

            if "click()" in original_action.lower():
                continue  # Skip click() without coordinates
            
            aguvis_messages = []
            aguvis_messages.append({
                "role": "system",
                "content": [{"type": "text", "text": AGUVIS_SYS_PROMPT}]
            })
            aguvis_messages.append(
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/png;base64,{encode_image(obs['screenshot'])}",
                                "detail": "high",
                            },
                        },
                        {
                            "type": "text",
                            "text": '\n' + comment,
                        },
                    ],
                }
            )
            aguvis_messages.append(
                {
                    "role": "assistant",
                    "content": [
                        {"type": "text", "text": AGUVIS_GROUNDING_APPEND_PROMPT.format(function_name=func_name)}
                    ],
                }
            )
            grounding_response = self.call_llm({
                "model": self.executor_model,
                "messages": aguvis_messages,
                "max_tokens": self.max_tokens,
                "top_p": self.top_p,
                "temperature": self.temperature
            }, self.executor_model)
            coordinates = extract_coordinates(grounding_response, SCREEN_LOGIC_SIZE)
            # FIXME [junli]: Use ast to reconstruct the action with coordinates
            action_parts = original_action.split('(')
            new_action = f"{action_parts[0]}({coordinates[0]}, {coordinates[1]}"
            if len(action_parts) > 1 and 'duration' in action_parts[1]:
                duration_part = action_parts[1].split(',')[-1]
                new_action += f", {duration_part}"
            elif len(action_parts) > 1 and 'button' in action_parts[1]:
                button_part = action_parts[1].split(',')[-1]
                new_action += f", {button_part}"
            else:
                new_action += ")"
            logger.info(new_action)
            new_instruction = new_instruction.replace(original_action, new_action)

        return new_instruction

    @backoff.on_exception(
        backoff.constant,
        # here you should add more model exceptions as you want,
        # but you are forbidden to add "Exception", that is, a common type of exception
        # because we want to catch this kind of Exception in the outside to ensure
        # each example won't exceed the time limit
        (
                # General exceptions
                SSLError,

                # OpenAI exceptions
                openai.RateLimitError,
                openai.BadRequestError,
                openai.InternalServerError,

                # Google exceptions
                InvalidArgument,
                ResourceExhausted,
                InternalServerError,
                BadRequest,

                # Groq exceptions
                # todo: check
        ),
        interval=30,
        max_tries=10
    )
    def call_llm(self, payload, model):
        if model.startswith("gpt"):
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}"
                # "Authorization": f"Bearer {os.environ['MIT_SPIDER_TOKEN']}"
            }
            logger.info("Generating content with GPT model: %s", model)
            response = requests.post(
                "https://api.openai.com/v1/chat/completions",
                headers=headers,
                json=payload
            )

            if response.status_code != 200:
                logger.error("Failed to call LLM: " + response.text)
                time.sleep(5)
                return ""
            else:
                return response.json()['choices'][0]['message']['content']
        elif "aguvis" in model:
            headers = {
                "Content-Type": "application/json",
            }
            logger.info("Generating content with Aguvis model: %s", model)

            if "7b" in model:
                response = requests.post(
                    "http://101.132.136.195:7908/v1/chat/completions",
                    headers=headers,
                    json=payload
                )
            elif "72b" in model:
                response = requests.post(
                    "http://123.57.10.166:7908/v1/chat/completions",
                    headers=headers,
                    json=payload
                )
            else:
                raise Exception("Unsupported Aguvis model version")

            if response.status_code != 200:
                logger.error("Failed to call LLM: " + response.text)
                time.sleep(5)
                return ""
            else:
                return response.json()['choices'][0]['message']['content']

    def reset(self, _logger=None):
        global logger
        logger = _logger if _logger is not None else logging.getLogger("desktopenv.aguvis_agent")

        self.thoughts = []
        self.action_descriptions = []
        self.actions = []
        self.observations = []