mirrored a few seconds ago
0
Linxin SongCoACT initialize (#292) b968155
# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors
#
# SPDX-License-Identifier: Apache-2.0
#
# Portions derived from  https://github.com/microsoft/autogen are under the MIT License.
# SPDX-License-Identifier: MIT
import base64
import copy
import os
import re
from io import BytesIO
from math import ceil
from typing import Any, Union

import requests

from ...import_utils import optional_import_block, require_optional_import
from .. import utils

with optional_import_block():
    from PIL import Image


# Parameters for token counting for images for different models
MODEL_PARAMS = {
    "gpt-4-vision": {
        "max_edge": 2048,
        "min_edge": 768,
        "tile_size": 512,
        "base_token_count": 85,
        "token_multiplier": 170,
    },
    "gpt-4o-mini": {
        "max_edge": 2048,
        "min_edge": 768,
        "tile_size": 512,
        "base_token_count": 2833,
        "token_multiplier": 5667,
    },
    "gpt-4o": {"max_edge": 2048, "min_edge": 768, "tile_size": 512, "base_token_count": 85, "token_multiplier": 170},
}


@require_optional_import("PIL", "unknown")
def get_pil_image(image_file: Union[str, "Image.Image"]) -> "Image.Image":
    """Loads an image from a file and returns a PIL Image object.

    Parameters:
        image_file (str, or Image): The filename, URL, URI, or base64 string of the image file.

    Returns:
        Image.Image: The PIL Image object.
    """
    if isinstance(image_file, Image.Image):
        # Already a PIL Image object
        return image_file

    # Remove quotes if existed
    if image_file.startswith('"') and image_file.endswith('"'):
        image_file = image_file[1:-1]
    if image_file.startswith("'") and image_file.endswith("'"):
        image_file = image_file[1:-1]

    if image_file.startswith("http://") or image_file.startswith("https://"):
        # A URL file
        response = requests.get(image_file)
        content = BytesIO(response.content)
        image = Image.open(content)
    # Match base64-encoded image URIs for supported formats: jpg, jpeg, png, gif, bmp, webp
    elif re.match(r"data:image/(?:jpg|jpeg|png|gif|bmp|webp);base64,", image_file):
        # A URI. Remove the prefix and decode the base64 string.
        base64_data = re.sub(r"data:image/(?:jpg|jpeg|png|gif|bmp|webp);base64,", "", image_file)
        image = _to_pil(base64_data)
    elif os.path.exists(image_file):
        # A local file
        image = Image.open(image_file)
    else:
        # base64 encoded string
        image = _to_pil(image_file)

    return image.convert("RGB")


@require_optional_import("PIL", "unknown")
def get_image_data(image_file: Union[str, "Image.Image"], use_b64=True) -> bytes:
    """Loads an image and returns its data either as raw bytes or in base64-encoded format.

    This function first loads an image from the specified file, URL, or base64 string using
    the `get_pil_image` function. It then saves this image in memory in PNG format and
    retrieves its binary content. Depending on the `use_b64` flag, this binary content is
    either returned directly or as a base64-encoded string.

    Parameters:
        image_file (str, or Image): The path to the image file, a URL to an image, or a base64-encoded
                          string of the image.
        use_b64 (bool): If True, the function returns a base64-encoded string of the image data.
                        If False, it returns the raw byte data of the image. Defaults to True.

    Returns:
        bytes: The image data in raw bytes if `use_b64` is False, or a base64-encoded string
               if `use_b64` is True.
    """
    image = get_pil_image(image_file)

    buffered = BytesIO()
    image.save(buffered, format="PNG")
    content = buffered.getvalue()

    if use_b64:
        return base64.b64encode(content).decode("utf-8")
    else:
        return content


@require_optional_import("PIL", "unknown")
def llava_formatter(prompt: str, order_image_tokens: bool = False) -> tuple[str, list[str]]:
    """Formats the input prompt by replacing image tags and returns the new prompt along with image locations.

    Parameters:
        - prompt (str): The input string that may contain image tags like `<img ...>`.
        - order_image_tokens (bool, optional): Whether to order the image tokens with numbers.
            It will be useful for GPT-4V. Defaults to False.

    Returns:
        - Tuple[str, List[str]]: A tuple containing the formatted string and a list of images (loaded in b64 format).
    """
    # Initialize variables
    new_prompt = prompt
    image_locations = []
    images = []
    image_count = 0

    # Regular expression pattern for matching <img ...> tags
    img_tag_pattern = re.compile(r"<img ([^>]+)>")

    # Find all image tags
    for match in img_tag_pattern.finditer(prompt):
        image_location = match.group(1)

        try:
            img_data = get_image_data(image_location)
        except Exception as e:
            # Remove the token
            print(f"Warning! Unable to load image from {image_location}, because of {e}")
            new_prompt = new_prompt.replace(match.group(0), "", 1)
            continue

        image_locations.append(image_location)
        images.append(img_data)

        # Increment the image count and replace the tag in the prompt
        new_token = f"<image {image_count}>" if order_image_tokens else "<image>"

        new_prompt = new_prompt.replace(match.group(0), new_token, 1)
        image_count += 1

    return new_prompt, images


@require_optional_import("PIL", "unknown")
def pil_to_data_uri(image: "Image.Image") -> str:
    """Converts a PIL Image object to a data URI.

    Parameters:
        image (Image.Image): The PIL Image object.

    Returns:
        str: The data URI string.
    """
    buffered = BytesIO()
    image.save(buffered, format="PNG")
    content = buffered.getvalue()
    return convert_base64_to_data_uri(base64.b64encode(content).decode("utf-8"))


def convert_base64_to_data_uri(base64_image):
    def _get_mime_type_from_data_uri(base64_image):
        # Decode the base64 string
        image_data = base64.b64decode(base64_image)
        # Check the first few bytes for known signatures
        if image_data.startswith(b"\xff\xd8\xff"):
            return "image/jpeg"
        elif image_data.startswith(b"\x89PNG\r\n\x1a\n"):
            return "image/png"
        elif image_data.startswith(b"GIF87a") or image_data.startswith(b"GIF89a"):
            return "image/gif"
        elif image_data.startswith(b"RIFF") and image_data[8:12] == b"WEBP":
            return "image/webp"
        return "image/jpeg"  # use jpeg for unknown formats, best guess.

    mime_type = _get_mime_type_from_data_uri(base64_image)
    data_uri = f"data:{mime_type};base64,{base64_image}"
    return data_uri


@require_optional_import("PIL", "unknown")
def gpt4v_formatter(prompt: str, img_format: str = "uri") -> list[Union[str, dict[str, Any]]]:
    """Formats the input prompt by replacing image tags and returns a list of text and images.

    Args:
        prompt (str): The input string that may contain image tags like `<img ...>`.
        img_format (str): what image format should be used. One of "uri", "url", "pil".

    Returns:
        List[Union[str, dict[str, Any]]]: A list of alternating text and image dictionary items.
    """
    assert img_format in ["uri", "url", "pil"]

    output = []
    last_index = 0
    image_count = 0

    # Find all image tags
    for parsed_tag in utils.parse_tags_from_content("img", prompt):
        image_location = parsed_tag["attr"]["src"]
        try:
            if img_format == "pil":
                img_data = get_pil_image(image_location)
            elif img_format == "uri":
                img_data = get_image_data(image_location)
                img_data = convert_base64_to_data_uri(img_data)
            elif img_format == "url":
                img_data = image_location
            else:
                raise ValueError(f"Unknown image format {img_format}")
        except Exception as e:
            # Warning and skip this token
            print(f"Warning! Unable to load image from {image_location}, because {e}")
            continue

        # Add text before this image tag to output list
        output.append({"type": "text", "text": prompt[last_index : parsed_tag["match"].start()]})

        # Add image data to output list
        output.append({"type": "image_url", "image_url": {"url": img_data}})

        last_index = parsed_tag["match"].end()
        image_count += 1

    # Add remaining text to output list
    if last_index < len(prompt):
        output.append({"type": "text", "text": prompt[last_index:]})
    return output


def extract_img_paths(paragraph: str) -> list:
    """Extract image paths (URLs or local paths) from a text paragraph.

    Parameters:
        paragraph (str): The input text paragraph.

    Returns:
        list: A list of extracted image paths.
    """
    # Regular expression to match image URLs and file paths.
    # This regex detects URLs and file paths with common image extensions, including support for the webp format.
    img_path_pattern = re.compile(
        r"\b(?:http[s]?://\S+\.(?:jpg|jpeg|png|gif|bmp|webp)|\S+\.(?:jpg|jpeg|png|gif|bmp|webp))\b", re.IGNORECASE
    )

    # Find all matches in the paragraph
    img_paths = re.findall(img_path_pattern, paragraph)
    return img_paths


@require_optional_import("PIL", "unknown")
def _to_pil(data: str) -> "Image.Image":
    """Converts a base64 encoded image data string to a PIL Image object.

    This function first decodes the base64 encoded string to bytes, then creates a BytesIO object from the bytes,
    and finally creates and returns a PIL Image object from the BytesIO object.

    Parameters:
        data (str): The encoded image data string.

    Returns:
        Image.Image: The PIL Image object created from the input data.
    """
    return Image.open(BytesIO(base64.b64decode(data)))


@require_optional_import("PIL", "unknown")
def message_formatter_pil_to_b64(messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Converts the PIL image URLs in the messages to base64 encoded data URIs.

    This function iterates over a list of message dictionaries. For each message,
    if it contains a 'content' key with a list of items, it looks for items
    with an 'image_url' key. The function then converts the PIL image URL
    (pointed to by 'image_url') to a base64 encoded data URI.

    Parameters:
        messages (List[Dict]): A list of message dictionaries. Each dictionary
                               may contain a 'content' key with a list of items,
                               some of which might be image URLs.

    Returns:
        List[Dict]: A new list of message dictionaries with PIL image URLs in the
                    'image_url' key converted to base64 encoded data URIs.

    Example Input:
        example 1:
        ```python
        [
            {'content': [{'type': 'text', 'text': 'You are a helpful AI assistant.'}], 'role': 'system'},
            {'content': [
                {'type': 'text', 'text': "What's the breed of this dog here?"},
                {'type': 'image_url', 'image_url': {'url': a PIL.Image.Image}},
                {'type': 'text', 'text': '.'}],
            'role': 'user'}
        ]
        ```

    Example Output:
        example 1:
        ```python
        [
            {'content': [{'type': 'text', 'text': 'You are a helpful AI assistant.'}], 'role': 'system'},
            {'content': [
                {'type': 'text', 'text': "What's the breed of this dog here?"},
                {'type': 'image_url', 'image_url': {'url': a B64 Image}},
                {'type': 'text', 'text': '.'}],
            'role': 'user'}
        ]
        ```
    """
    new_messages = []
    for message in messages:
        # deepcopy to avoid modifying the original message.
        message = copy.deepcopy(message)
        if isinstance(message, dict) and "content" in message:
            # First, if the content is a string, parse it into a list of parts.
            # This is for tool output that contains images.
            if isinstance(message["content"], str):
                message["content"] = gpt4v_formatter(message["content"], img_format="pil")

            # Second, if the content is a list, process any image parts.
            if isinstance(message["content"], list):
                for item in message["content"]:
                    if (
                        isinstance(item, dict)
                        and "image_url" in item
                        and isinstance(item["image_url"]["url"], Image.Image)
                    ):
                        item["image_url"]["url"] = pil_to_data_uri(item["image_url"]["url"])

        new_messages.append(message)

    return new_messages


@require_optional_import("PIL", "unknown")
def num_tokens_from_gpt_image(
    image_data: Union[str, "Image.Image"], model: str = "gpt-4-vision", low_quality: bool = False
) -> int:
    """Calculate the number of tokens required to process an image based on its dimensions
    after scaling for different GPT models. Supports "gpt-4-vision", "gpt-4o", and "gpt-4o-mini".
    This function scales the image so that its longest edge is at most 2048 pixels and its shortest
    edge is at most 768 pixels (for "gpt-4-vision"). It then calculates the number of 512x512 tiles
    needed to cover the scaled image and computes the total tokens based on the number of these tiles.

    Reference: https://openai.com/api/pricing/

    Args:
        image_data : Union[str, Image.Image]: The image data which can either be a base64 encoded string, a URL, a file path, or a PIL Image object.
        model: str: The model being used for image processing. Can be "gpt-4-vision", "gpt-4o", or "gpt-4o-mini".
        low_quality: bool: Whether to use low-quality processing. Defaults to False.

    Returns:
        int: The total number of tokens required for processing the image.

    Examples:
    --------
    >>> from PIL import Image
    >>> img = Image.new("RGB", (2500, 2500), color="red")
    >>> num_tokens_from_gpt_image(img, model="gpt-4-vision")
    765
    """
    image = get_pil_image(image_data)  # PIL Image
    width, height = image.size

    # Determine model parameters
    if "gpt-4-vision" in model or "gpt-4-turbo" in model or "gpt-4v" in model or "gpt-4-v" in model:
        params = MODEL_PARAMS["gpt-4-vision"]
    elif "gpt-4o-mini" in model:
        params = MODEL_PARAMS["gpt-4o-mini"]
    elif "gpt-4o" in model:
        params = MODEL_PARAMS["gpt-4o"]
    else:
        raise ValueError(
            f"Model {model} is not supported. Choose 'gpt-4-vision', 'gpt-4-turbo', 'gpt-4v', 'gpt-4-v', 'gpt-4o', or 'gpt-4o-mini'."
        )

    if low_quality:
        return params["base_token_count"]

    # 1. Constrain the longest edge
    if max(width, height) > params["max_edge"]:
        scale_factor = params["max_edge"] / max(width, height)
        width, height = int(width * scale_factor), int(height * scale_factor)

    # 2. Further constrain the shortest edge
    if min(width, height) > params["min_edge"]:
        scale_factor = params["min_edge"] / min(width, height)
        width, height = int(width * scale_factor), int(height * scale_factor)

    # 3. Count how many tiles are needed to cover the image
    tiles_width = ceil(width / params["tile_size"])
    tiles_height = ceil(height / params["tile_size"])
    total_tokens = params["base_token_count"] + params["token_multiplier"] * (tiles_width * tiles_height)

    return total_tokens