/
OS-Worldf9e9273import re
import base64
from loguru import logger
from typing import List, Optional
from PIL import Image
from io import BytesIO
import tempfile
import os
import math
def encode_image(image_content):
    return base64.b64encode(image_content).decode("utf-8")
def smart_resize(
    height: int,
    width: int,
    factor: int = 28,
    min_pixels: int = 56 * 56,
    max_pixels: int = 14 * 14 * 4 * 1280,
    max_aspect_ratio_allowed: Optional[float] = None,
    size_can_be_smaller_than_factor: bool = False,
):
    """Rescales the image so that the following conditions are met:
    1. Both dimensions (height and width) are divisible by 'factor'.
    2. The total number of pixels is within the range ['min_pixels', 'max_pixels'].
    3. The aspect ratio of the image is maintained as closely as possible.
    """
    if not size_can_be_smaller_than_factor and (height < factor or width < factor):
        raise ValueError(
            f"height:{height} or width:{width} must be larger than factor:{factor} "
            f"(when size_can_be_smaller_than_factor is False)"
        )
    elif (
        max_aspect_ratio_allowed is not None
        and max(height, width) / min(height, width) > max_aspect_ratio_allowed
    ):
        raise ValueError(
            f"absolute aspect ratio must be smaller than {max_aspect_ratio_allowed}, "
            f"got {max(height, width) / min(height, width)}"
            f"(when max_aspect_ratio_allowed is not None)"
        )
    h_bar = max(1, round(height / factor)) * factor
    w_bar = max(1, round(width / factor)) * factor
    if h_bar * w_bar > max_pixels:
        beta = math.sqrt((height * width) / max_pixels)
        h_bar = max(1, math.floor(height / beta / factor)) * factor
        w_bar = max(1, math.floor(width / beta / factor)) * factor
    elif h_bar * w_bar < min_pixels:
        beta = math.sqrt(min_pixels / (height * width))
        h_bar = math.ceil(height * beta / factor) * factor
        w_bar = math.ceil(width * beta / factor) * factor
    return h_bar, w_bar
    
def call_openai_naive(model, payload, address_hint=None):
    """
    Naive OpenAI API call using requests.
    """
    # Extract fields from payload
    model = payload.get("model")
    payload["model"] = model.model_id if hasattr(model, "model_id") else "None"
    # address_hint not used here
    base_url = model.base_url
    # logger.warning(f"Base URL: {base_url}, Payload model: {payload['model']}")
    url = f"{base_url}/chat/completions"
    headers = {
        "Content-Type": "application/json",
    }
    data = {
        **payload,
        "n": 1,
    }
    max_retry = 5
    chat_completions = None
    success = False
    while success is False and max_retry > 0:
        try:
            json_data = json.dumps(data)
            response = requests.post(
                url, headers=headers, data=json_data, timeout=120, verify=False
            )
            if response.status_code == 200:
                chat_completions = response.json()
                try:
                    finish_reason = chat_completions["choices"][0].get("finish_reason")
                    if (
                        finish_reason is not None and finish_reason == "stop"
                    ):  # for most of the time, length will not exceed max_tokens
                        success = True
                    else:
                        time.sleep(5)
                        max_retry -= 1
                except Exception as e:
                    logger.error(f"Error in processing chat completion: {e}")
                    time.sleep(5)
                    max_retry -= 1
            else:
                logger.error(f"Failed to call OpenAI API: {response.text}")
                time.sleep(5)
                max_retry -= 1
        except requests.exceptions.ReadTimeout:
            # timeout is normal, don't print trace
            max_retry -= 1
            logger.warning(f"Timeout in OpenAI API call, left retries: {max_retry}")
            time.sleep(5)
        except Exception as e:
            max_retry -= 1
            logger.exception(f"Failed to call OpenAI API: {e}")
            time.sleep(5)
    if chat_completions is None:
        raise RuntimeError("Failed to call OpenAI API, max_retry used up")
    try:
        infos = {}
        if "choices" in chat_completions:
            infos["finish_reason"] = chat_completions["choices"][0].get("finish_reason")
            infos["n"] = len(chat_completions["choices"])
            if "tool_calls" in chat_completions["choices"][0]["message"]:
                infos["tool_calls"] = chat_completions["choices"][0]["message"][
                    "tool_calls"
                ]
            infos["choices"] = chat_completions["choices"]  # for the case of n > 1
        if "usage" in chat_completions:
            infos["usage"] = chat_completions["usage"]
        return chat_completions["choices"][0]["message"]["content"], infos
    except Exception as e:
        logger.error(f"Error in processing chat completion {e}")
        return "", {"n": 1, "usage": 0, "finish_reason": f"error {e}"}
def preprocess_for_naive_openai(self, payload):
    if isinstance(payload["model"], str):
        payload["model"] = getattr(self, "openai_client", None)
    return payload
def encoded_img_to_pil_img(data_str):
    base64_str = data_str.replace("data:image/png;base64,", "")
    image_data = base64.b64decode(base64_str)
    return Image.open(BytesIO(image_data))
def save_to_tmp_img_file(data_str):
    base64_str = data_str.replace("data:image/png;base64,", "")
    image_data = base64.b64decode(base64_str)
    image = Image.open(BytesIO(image_data))
    tmp_img_path = os.path.join(tempfile.mkdtemp(), "tmp_img.png")
    image.save(tmp_img_path)
    return tmp_img_path
def bbox_to_center_1000(bbox: str) -> tuple[int, int]:
    regex_list = [
        r"<\|box_start\|>\((\d+),(\d+)\),\((\d+),(\d+)\)<\|box_end\|>",  # '<|box_start|>(576,12),(592,42)<|box_end|>'
        r"<\|box_start\|>\[\[(\d+),\s*(\d+),\s*(\d+),\s*(\d+)\]\]<\|box_end\|>",  # '<|box_start|>[[576, 12, 592, 42]]<|box_end|>'
        r"<\|box_start\|>\[\[(\d+),\s*(\d+),\s*(\d+),\s*(\d+)\]<\|box_end\|>",  # '<|box_start|>[[576, 12, 592, 42]<|box_end|>', this is actually wrong format, but we parse it anyway
        r"<\|box_start\|>\((\d+),\s*(\d+),\s*(\d+),\s*(\d+)\)<\|box_end\|>",  # '<|box_start|>(576, 12, 592, 42)<|box_end|>', this is actually wrong format, but we parse it anyway
        r"\((\d+),(\d+)\),\((\d+),(\d+)\)",  # Versions without the 'bbox' special tokens
        r"\[\[(\d+),\s*(\d+),\s*(\d+),\s*(\d+)\]\]",
        r"\[\[(\d+),\s*(\d+),\s*(\d+),\s*(\d+)\]",
        r"\((\d+),\s*(\d+),\s*(\d+),\s*(\d+)\)",
    ]
    for regex in regex_list:
        match = re.search(regex, bbox)
        if match:
            break
    if not match:
        raise ValueError(
            f"Bounding box coordinates not found in the input string: {bbox}"
        )
    x_top_left, y_top_left, x_bottom_right, y_bottom_right = map(int, match.groups())
    x_center = (x_top_left + x_bottom_right) // 2
    y_center = (y_top_left + y_bottom_right) // 2
    return x_center, y_center
def bbox_to_center_1(bbox: str) -> tuple[int, int]:
    regex_list = [
        r"\[\s*(-?\d+\.\d+)\s*,\s*(-?\d+\.\d+)\s*,\s*(-?\d+\.\d+)\s*,\s*(-?\d+\.\d+)\s*\]",
    ]
    for regex in regex_list:
        match = re.search(regex, bbox)
        if match:
            break
    if not match:
        raise ValueError(
            f"Bounding box coordinates not found in the input string: {bbox}"
        )
    coordinates = tuple(map(float, match.groups()))
    coordinates = [int(coord * 1000) for coord in coordinates]
    x_center = (coordinates[0] + coordinates[2]) // 2
    y_center = (coordinates[1] + coordinates[3]) // 2
    return x_center, y_center
def _coordinate_projection(x, y, screen_width, screen_height, coordinate_type):
    if coordinate_type == "relative":
        return int(round(x * screen_width)), int(round(y * screen_height))
    elif coordinate_type == "absolute":
        return x, y
    elif coordinate_type == "qwen25":
        height, width = smart_resize(
            height=screen_height,
            width=screen_width,
            factor=28,
            min_pixels=3136,
            max_pixels=12845056,
        )
        return int(x / width * screen_width), int(y / height * screen_height)
    elif coordinate_type == "relative1000":
        if screen_width == 0 or screen_height == 0:
            raise ValueError(
                "Screen width and height must be greater than zero for relative1000 coordinates."
            )
        x_abs = int(round(x * screen_width / 1000))
        y_abs = int(round(y * screen_height / 1000))
        return x_abs, y_abs
    else:
        raise ValueError(f"Unsupported coordinate type: {coordinate_type}")
def rescale_coord(
    coord: tuple[int, int],
    original_width: int,
    original_height: int,
    scaled_width=1000,
    scaled_height=1000,
) -> tuple[int, int]:
    # According to https://huggingface.co/spaces/maxiw/OS-ATLAS/blob/398c3256a4fec409a074e0e4b5ac1d1d5bf7c240/app.py#L36
    # It seems that OS-ATLAS model are rescaled to output 1000x1000 images
    # So we need to rescale the coordinates back to the original image size
    x_scale = original_width / scaled_width
    y_scale = original_height / scaled_height
    return int(coord[0] * x_scale), int(coord[1] * y_scale)
def _pyautogui_code_to_absolute_coordinates(
    pyautogui_code_relative_coordinates,
    logical_screen_size,
    coordinate_type="relative",
    model_input_size=None,
):
    """
    Convert the relative coordinates in the pyautogui code to absolute coordinates based on the logical screen size.
    """
    import re
    import ast
    if coordinate_type not in ["relative", "relative1000", "absolute", "qwen25"]:
        raise ValueError(
            f"Invalid coordinate type: {coordinate_type}. Expected one of ['relative', 'relative1000', 'absolute', 'qwen25']."
        )
    screen_width, screen_height = logical_screen_size
    if model_input_size is not None:
        model_width, model_height = model_input_size
        width_scale, height_scale = (
            screen_width / model_width,
            screen_height / model_height,
        )
    else:
        width_scale, height_scale = 1, 1
    pattern = r"(pyautogui\.\w+\([^\)]*\))"
    matches = re.findall(pattern, pyautogui_code_relative_coordinates)
    new_code = pyautogui_code_relative_coordinates
    for full_call in matches:
        func_name_pattern = r"(pyautogui\.\w+)\((.*)\)"
        func_match = re.match(func_name_pattern, full_call, re.DOTALL)
        if not func_match:
            continue
        func_name = func_match.group(1)
        args_str = func_match.group(2)
        try:
            parsed = ast.parse(f"func({args_str})").body[0].value
            parsed_args = parsed.args
            parsed_keywords = parsed.keywords
        except SyntaxError:
            return pyautogui_code_relative_coordinates
        function_parameters = {
            "click": ["x", "y", "clicks", "interval", "button", "duration", "pause"],
            "moveTo": ["x", "y", "duration", "tween", "pause"],
            "moveRel": ["xOffset", "yOffset", "duration", "tween", "pause"],
            "dragTo": ["x", "y", "duration", "button", "mouseDownUp", "pause"],
            "dragRel": [
                "xOffset",
                "yOffset",
                "duration",
                "button",
                "mouseDownUp",
                "pause",
            ],
            "doubleClick": ["x", "y", "interval", "button", "duration", "pause"],
        }
        func_base_name = func_name.split(".")[-1]
        param_names = function_parameters.get(func_base_name, [])
        args = {}
        for idx, arg in enumerate(parsed_args):
            if idx < len(param_names):
                param_name = param_names[idx]
                arg_value = ast.literal_eval(arg)
                args[param_name] = arg_value
        try:
            for kw in parsed_keywords:
                param_name = kw.arg
                arg_value = ast.literal_eval(kw.value)
                args[param_name] = arg_value
        except Exception as e:
            logger.error(f"Error parsing keyword arguments: {e}")
            return pyautogui_code_relative_coordinates
        updated = False
        if "x" in args and "y" in args:
            try:
                x_rel = float(args["x"])
                y_rel = float(args["y"])
                x_abs, y_abs = _coordinate_projection(
                    x_rel, y_rel, screen_width, screen_height, coordinate_type
                )
                # logger.warning(f"Projecting coordinates: ({x_rel}, {y_rel}) to ({x_abs}, {y_abs}) using {coordinate_type} projection.")
                args["x"] = x_abs * width_scale
                args["y"] = y_abs * height_scale
                updated = True
            except ValueError:
                pass
        if "xOffset" in args and "yOffset" in args:
            try:
                x_rel = float(args["xOffset"])
                y_rel = float(args["yOffset"])
                x_abs, y_abs = _coordinate_projection(
                    x_rel, y_rel, screen_width, screen_height, coordinate_type
                )
                args["xOffset"] = x_abs * width_scale
                args["yOffset"] = y_abs * height_scale
                updated = True
            except ValueError:
                pass
        if updated:
            reconstructed_args = []
            for idx, param_name in enumerate(param_names):
                if param_name in args:
                    arg_value = args[param_name]
                    if isinstance(arg_value, str):
                        arg_repr = f"'{arg_value}'"
                    else:
                        arg_repr = str(arg_value)
                    reconstructed_args.append(arg_repr)
                else:
                    break
            used_params = set(param_names[: len(reconstructed_args)])
            for kw in parsed_keywords:
                if kw.arg not in used_params:
                    arg_value = args[kw.arg]
                    if isinstance(arg_value, str):
                        arg_repr = f"{kw.arg}='{arg_value}'"
                    else:
                        arg_repr = f"{kw.arg}={arg_value}"
                    reconstructed_args.append(arg_repr)
            new_args_str = ", ".join(reconstructed_args)
            new_full_call = f"{func_name}({new_args_str})"
            new_code = new_code.replace(full_call, new_full_call)
    return new_code
def split_args(args_str: str) -> List[str]:
    args = []
    current_arg = ""
    within_string = False
    string_char = ""
    prev_char = ""
    for char in args_str:
        if char in ['"', "'"]:
            if not within_string:
                within_string = True
                string_char = char
            elif within_string and prev_char != "\\" and char == string_char:
                within_string = False
        if char == "," and not within_string:
            args.append(current_arg)
            current_arg = ""
        else:
            current_arg += char
        prev_char = char
    if current_arg:
        args.append(current_arg)
    return args
def correct_pyautogui_arguments(code: str) -> str:
    function_corrections = {
        "write": {
            "incorrect_args": ["text", "content"],
            "correct_args": [],
            "keyword_arg": "message",
        },
        "press": {
            "incorrect_args": ["key", "button"],
            "correct_args": [],
            "keyword_arg": None,
        },
        "hotkey": {
            "incorrect_args": ["key1", "key2", "keys"],
            "correct_args": [],
            "keyword_arg": None,
        },
    }
    lines = code.strip().split("\n")
    corrected_lines = []
    for line in lines:
        line = line.strip()
        match = re.match(r"(pyautogui\.(\w+))\((.*)\)", line)
        if match:
            full_func_call = match.group(1)
            func_name = match.group(2)
            args_str = match.group(3)
            if func_name in function_corrections:
                func_info = function_corrections[func_name]
                args = split_args(args_str)
                corrected_args = []
                for arg in args:
                    arg = arg.strip()
                    kwarg_match = re.match(r"(\w+)\s*=\s*(.*)", arg)
                    if kwarg_match:
                        arg_name = kwarg_match.group(1)
                        arg_value = kwarg_match.group(2)
                        if arg_name in func_info["incorrect_args"]:
                            if func_info["keyword_arg"]:
                                corrected_args.append(
                                    f"{func_info['keyword_arg']}={arg_value}"
                                )
                            else:
                                corrected_args.append(arg_value)
                        else:
                            corrected_args.append(f"{arg_name}={arg_value}")
                    else:
                        corrected_args.append(arg)
                corrected_args_str = ", ".join(corrected_args)
                corrected_line = f"{full_func_call}({corrected_args_str})"
                corrected_lines.append(corrected_line)
            else:
                corrected_lines.append(line)
        else:
            corrected_lines.append(line)
    corrected_code = "\n".join(corrected_lines)
    return corrected_code
def image_message_from_obs(obs, for_training=False):
    if not for_training:
        return {
            "type": "image_url",
            "image_url": {
                "url": f"data:image/png;base64,{encode_image(obs['screenshot'])}",
                "detail": "high",
            },
        }
    else:
        return {"type": "image_url", "image_url": {"url": obs["screenshot_path"]}}