# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors # # SPDX-License-Identifier: Apache-2.0 # # Portions derived from https://github.com/microsoft/autogen are under the MIT License. # SPDX-License-Identifier: MIT import json import logging import re from typing import Any, Union import tiktoken from .agentchat.contrib.img_utils import num_tokens_from_gpt_image from .import_utils import optional_import_block # if PIL is not imported, we will redefine num_tokens_from_gpt_image to return 0 tokens for images # Otherwise, it would raise an ImportError with optional_import_block() as result: import PIL # noqa: F401 pil_imported = result.is_successful if not pil_imported: def num_tokens_from_gpt_image(*args, **kwargs): return 0 logger = logging.getLogger(__name__) logger.img_dependency_warned = False # member variable to track if the warning has been logged def get_max_token_limit(model: str = "gpt-3.5-turbo-0613") -> int: # Handle common azure model names/aliases model = re.sub(r"^gpt\-?35", "gpt-3.5", model) model = re.sub(r"^gpt4", "gpt-4", model) max_token_limit = { "gpt-3.5-turbo": 16385, "gpt-3.5-turbo-0125": 16385, "gpt-3.5-turbo-0301": 4096, "gpt-3.5-turbo-0613": 4096, "gpt-3.5-turbo-instruct": 4096, "gpt-3.5-turbo-16k": 16385, "gpt-3.5-turbo-16k-0613": 16385, "gpt-3.5-turbo-1106": 16385, "gpt-4": 8192, "gpt-4-turbo": 128000, "gpt-4-turbo-2024-04-09": 128000, "gpt-4-32k": 32768, "gpt-4-32k-0314": 32768, # deprecate in Sep "gpt-4-0314": 8192, # deprecate in Sep "gpt-4-0613": 8192, "gpt-4-32k-0613": 32768, "gpt-4-1106-preview": 128000, "gpt-4-0125-preview": 128000, "gpt-4-turbo-preview": 128000, "gpt-4-vision-preview": 128000, "gpt-4o": 128000, "gpt-4o-2024-05-13": 128000, "gpt-4o-2024-08-06": 128000, "gpt-4o-2024-11-20": 128000, "gpt-4o-mini": 128000, "gpt-4o-mini-2024-07-18": 128000, } return max_token_limit[model] def percentile_used(input, model="gpt-3.5-turbo-0613"): return count_token(input) / get_max_token_limit(model) def token_left(input: Union[str, list[str], dict[str, Any]], model="gpt-3.5-turbo-0613") -> int: """Count number of tokens left for an OpenAI model. Args: input: (str, list, dict): Input to the model. model: (str): Model name. Returns: int: Number of tokens left that the model can use for completion. """ return get_max_token_limit(model) - count_token(input, model=model) def count_token(input: Union[str, list[str], dict[str, Any]], model: str = "gpt-3.5-turbo-0613") -> int: """Count number of tokens used by an OpenAI model. Args: input: (str, list, dict): Input to the model. model: (str): Model name. Returns: int: Number of tokens from the input. """ if isinstance(input, str): return _num_token_from_text(input, model=model) elif isinstance(input, (list, dict)): return _num_token_from_messages(input, model=model) else: raise ValueError(f"input must be str, list or dict, but we got {type(input)}") def _num_token_from_text(text: str, model: str = "gpt-3.5-turbo-0613"): """Return the number of tokens used by a string.""" try: encoding = tiktoken.encoding_for_model(model) except KeyError: logger.warning(f"Model {model} not found. Using cl100k_base encoding.") encoding = tiktoken.get_encoding("cl100k_base") return len(encoding.encode(text)) def _num_token_from_messages(messages: Union[list[str], dict[str, Any]], model="gpt-3.5-turbo-0613"): """Return the number of tokens used by a list of messages. retrieved from https://github.com/openai/openai-cookbook/blob/main/examples/How_to_count_tokens_with_tiktoken.ipynb/ """ if isinstance(messages, dict): messages = [messages] try: encoding = tiktoken.encoding_for_model(model) except KeyError: logger.warning(f"Model {model} not found. Using cl100k_base encoding.") encoding = tiktoken.get_encoding("cl100k_base") if model in { "gpt-3.5-turbo-0613", "gpt-3.5-turbo-16k-0613", "gpt-4-0314", "gpt-4-32k-0314", "gpt-4-0613", "gpt-4-32k-0613", "gpt-4-turbo-preview", "gpt-4-vision-preview", "gpt-4o", "gpt-4o-2024-05-13", "gpt-4o-2024-08-06", "gpt-4o-2024-11-20", "gpt-4o-mini", "gpt-4o-mini-2024-07-18", }: tokens_per_message = 3 tokens_per_name = 1 elif model == "gpt-3.5-turbo-0301": tokens_per_message = 4 # every message follows <|start|>{role/name}\n{content}<|end|>\n tokens_per_name = -1 # if there's a name, the role is omitted elif "gpt-3.5-turbo" in model: logger.info("gpt-3.5-turbo may update over time. Returning num tokens assuming gpt-3.5-turbo-0613.") return _num_token_from_messages(messages, model="gpt-3.5-turbo-0613") elif "gpt-4" in model: logger.info("gpt-4 may update over time. Returning num tokens assuming gpt-4-0613.") return _num_token_from_messages(messages, model="gpt-4-0613") elif "gemini" in model: logger.info("Gemini is not supported in tiktoken. Returning num tokens assuming gpt-4-0613.") return _num_token_from_messages(messages, model="gpt-4-0613") elif "claude" in model: logger.info("Claude is not supported in tiktoken. Returning num tokens assuming gpt-4-0613.") return _num_token_from_messages(messages, model="gpt-4-0613") elif "mistral-" in model or "mixtral-" in model: logger.info("Mistral.AI models are not supported in tiktoken. Returning num tokens assuming gpt-4-0613.") return _num_token_from_messages(messages, model="gpt-4-0613") elif "deepseek" in model: logger.info("Deepseek models are not supported in tiktoken. Returning num tokens assuming gpt-4-0613.") return _num_token_from_messages(messages, model="gpt-4-0613") else: raise NotImplementedError( f"""_num_token_from_messages() is not implemented for model {model}. See https://github.com/openai/openai-python/blob/main/chatml.md for information on how messages are converted to tokens.""" ) num_tokens = 0 for message in messages: num_tokens += tokens_per_message for key, value in message.items(): if value is None: continue # handle content if images are in GPT-4-vision if key == "content" and isinstance(value, list): for part in value: if not isinstance(part, dict) or "type" not in part: continue if part["type"] == "text": num_tokens += len(encoding.encode(part["text"])) if "image_url" in part: if not pil_imported and not logger.img_dependency_warned: logger.warning( "img_utils or PIL not imported. Skipping image token count." "Please install autogen with [lmm] option.", ) logger.img_dependency_warned = True try: num_tokens += num_tokens_from_gpt_image( image_data=part["image_url"]["url"], model=model ) except ValueError as e: logger.warning(f"Error in num_tokens_from_gpt_image: {e}") continue # function calls if not isinstance(value, str): try: value = json.dumps(value) except TypeError: logger.warning( f"Value {value} is not a string and cannot be converted to json. It is a type: {type(value)} Skipping." ) continue num_tokens += len(encoding.encode(value)) if key == "name": num_tokens += tokens_per_name num_tokens += 3 # every reply is primed with <|start|>assistant<|message|> return num_tokens def num_tokens_from_functions(functions, model="gpt-3.5-turbo-0613") -> int: """Return the number of tokens used by a list of functions. Args: functions: (list): List of function descriptions that will be passed in model. model: (str): Model name. Returns: int: Number of tokens from the function descriptions. """ try: encoding = tiktoken.encoding_for_model(model) except KeyError: logger.warning(f"Model {model} not found. Using cl100k_base encoding.") encoding = tiktoken.get_encoding("cl100k_base") num_tokens = 0 for function in functions: function_tokens = len(encoding.encode(function["name"])) function_tokens += len(encoding.encode(function["description"])) function_tokens -= 2 if "parameters" in function: parameters = function["parameters"] if "properties" in parameters: for properties_key in parameters["properties"]: function_tokens += len(encoding.encode(properties_key)) v = parameters["properties"][properties_key] for field in v: if field == "type": function_tokens += 2 function_tokens += len(encoding.encode(v["type"])) elif field == "description": function_tokens += 2 function_tokens += len(encoding.encode(v["description"])) elif field == "enum": function_tokens -= 3 for o in v["enum"]: function_tokens += 3 function_tokens += len(encoding.encode(o)) else: logger.warning(f"Not supported field {field}") function_tokens += 11 if len(parameters["properties"]) == 0: function_tokens -= 2 num_tokens += function_tokens num_tokens += 12 return num_tokens