mirrored 5 minutes ago
0
Linxin SongCoACT initialize (#292) b968155
# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors
#
# SPDX-License-Identifier: Apache-2.0
#
# Portions derived from https://github.com/microsoft/autogen are under the MIT License.
# SPDX-License-Identifier: MIT
"""Create an OpenAI-compatible client using Ollama's API.

Example:
    ```python
    llm_config = {"config_list": [{"api_type": "ollama", "model": "mistral:7b-instruct-v0.3-q6_K"}]}

    agent = autogen.AssistantAgent("my_agent", llm_config=llm_config)
    ```

Install Ollama's python library using: pip install --upgrade ollama
Install fix-busted-json library: pip install --upgrade fix-busted-json

Resources:
- https://github.com/ollama/ollama-python
"""

from __future__ import annotations

import copy
import json
import random
import re
import time
import warnings
from typing import Any, Literal, Optional, Union

from pydantic import BaseModel, Field, HttpUrl

from ..import_utils import optional_import_block, require_optional_import
from ..llm_config import LLMConfigEntry, register_llm_config
from .client_utils import FormatterProtocol, should_hide_tools, validate_parameter
from .oai_models import ChatCompletion, ChatCompletionMessage, ChatCompletionMessageToolCall, Choice, CompletionUsage

with optional_import_block():
    import ollama
    from fix_busted_json import repair_json
    from ollama import Client


@register_llm_config
class OllamaLLMConfigEntry(LLMConfigEntry):
    api_type: Literal["ollama"] = "ollama"
    client_host: Optional[HttpUrl] = None
    stream: bool = False
    num_predict: int = Field(
        default=-1,
        description="Maximum number of tokens to predict, note: -1 is infinite (default), -2 is fill context.",
    )
    num_ctx: int = Field(default=2048)
    repeat_penalty: float = Field(default=1.1)
    seed: int = Field(default=0)
    temperature: float = Field(default=0.8)
    top_k: int = Field(default=40)
    top_p: float = Field(default=0.9)
    hide_tools: Literal["if_all_run", "if_any_run", "never"] = "never"

    def create_client(self):
        raise NotImplementedError("OllamaLLMConfigEntry.create_client is not implemented.")


class OllamaClient:
    """Client for Ollama's API."""

    # Defaults for manual tool calling
    # Instruction is added to the first system message and provides directions to follow a two step
    # process
    # 1. (before tools have been called) Return JSON with the functions to call
    # 2. (directly after tools have been called) Return Text describing the results of the function calls in text format

    # Override using "manual_tool_call_instruction" config parameter
    TOOL_CALL_MANUAL_INSTRUCTION = (
        "You are to follow a strict two step process that will occur over "
        "a number of interactions, so pay attention to what step you are in based on the full "
        "conversation. We will be taking turns so only do one step at a time so don't perform step "
        "2 until step 1 is complete and I've told you the result. The first step is to choose one "
        "or more functions based on the request given and return only JSON with the functions and "
        "arguments to use. The second step is to analyse the given output of the function and summarise "
        "it returning only TEXT and not Python or JSON. "
        "For argument values, be sure numbers aren't strings, they should not have double quotes around them. "
        "In terms of your response format, for step 1 return only JSON and NO OTHER text, "
        "for step 2 return only text and NO JSON/Python/Markdown. "
        'The format for running a function is [{"name": "function_name1", "arguments":{"argument_name": "argument_value"}},{"name": "function_name2", "arguments":{"argument_name": "argument_value"}}] '
        'Make sure the keys "name" and "arguments" are as described. '
        "If you don't get the format correct, try again. "
        "The following functions are available to you:[FUNCTIONS_LIST]"
    )

    # Appended to the last user message if no tools have been called
    # Override using "manual_tool_call_step1" config parameter
    TOOL_CALL_MANUAL_STEP1 = " (proceed with step 1)"

    # Appended to the user message after tools have been executed. Will create a 'user' message if one doesn't exist.
    # Override using "manual_tool_call_step2" config parameter
    TOOL_CALL_MANUAL_STEP2 = " (proceed with step 2)"

    def __init__(self, response_format: Optional[Union[BaseModel, dict[str, Any]]] = None, **kwargs):
        """Note that no api_key or environment variable is required for Ollama."""

        # Store the response format, if provided (for structured outputs)
        self._response_format: Optional[Union[BaseModel, dict[str, Any]]] = response_format

    def message_retrieval(self, response) -> list:
        """Retrieve and return a list of strings or a list of Choice.Message from the response.

        NOTE: if a list of Choice.Message is returned, it currently needs to contain the fields of OpenAI's ChatCompletion Message object,
        since that is expected for function or tool calling in the rest of the codebase at the moment, unless a custom agent is being used.
        """
        return [choice.message for choice in response.choices]

    def cost(self, response) -> float:
        return response.cost

    @staticmethod
    def get_usage(response) -> dict:
        """Return usage summary of the response using RESPONSE_USAGE_KEYS."""
        # ...  # pragma: no cover
        return {
            "prompt_tokens": response.usage.prompt_tokens,
            "completion_tokens": response.usage.completion_tokens,
            "total_tokens": response.usage.total_tokens,
            "cost": response.cost,
            "model": response.model,
        }

    def parse_params(self, params: dict[str, Any]) -> dict[str, Any]:
        """Loads the parameters for Ollama API from the passed in parameters and returns a validated set. Checks types, ranges, and sets defaults"""
        ollama_params = {}

        # Check that we have what we need to use Ollama's API
        # https://github.com/ollama/ollama/blob/main/docs/api.md#generate-a-completion

        # The main parameters are model, prompt, stream, and options
        # Options is a dictionary of parameters for the model
        # There are other, advanced, parameters such as format, system (to override system message), template, raw, etc. - not used

        # We won't enforce the available models
        ollama_params["model"] = params.get("model")
        assert ollama_params["model"], (
            "Please specify the 'model' in your config list entry to nominate the Ollama model to use."
        )

        ollama_params["stream"] = validate_parameter(params, "stream", bool, True, False, None, None)

        # Build up the options dictionary
        # https://github.com/ollama/ollama/blob/main/docs/modelfile.md#valid-parameters-and-values
        options_dict = {}

        if "num_predict" in params:
            # Maximum number of tokens to predict, note: -1 is infinite, -2 is fill context, 128 is default
            options_dict["num_predict"] = validate_parameter(params, "num_predict", int, False, 128, None, None)

        if "num_ctx" in params:
            # Set size of context window used to generate next token, 2048 is default
            options_dict["num_ctx"] = validate_parameter(params, "num_ctx", int, False, 2048, None, None)

        if "repeat_penalty" in params:
            options_dict["repeat_penalty"] = validate_parameter(
                params, "repeat_penalty", (int, float), False, 1.1, None, None
            )

        if "seed" in params:
            options_dict["seed"] = validate_parameter(params, "seed", int, False, 42, None, None)

        if "temperature" in params:
            options_dict["temperature"] = validate_parameter(
                params, "temperature", (int, float), False, 0.8, None, None
            )

        if "top_k" in params:
            options_dict["top_k"] = validate_parameter(params, "top_k", int, False, 40, None, None)

        if "top_p" in params:
            options_dict["top_p"] = validate_parameter(params, "top_p", (int, float), False, 0.9, None, None)

        if self._native_tool_calls and self._tools_in_conversation and not self._should_hide_tools:
            ollama_params["tools"] = params["tools"]

            # Ollama doesn't support streaming with tools natively
            if ollama_params["stream"] and self._native_tool_calls:
                warnings.warn(
                    "Streaming is not supported when using tools and 'Native' tool calling, streaming will be disabled.",
                    UserWarning,
                )

                ollama_params["stream"] = False

        if not self._native_tool_calls and self._tools_in_conversation:
            # For manual tool calling we have injected the available tools into the prompt
            # and we don't want to force JSON mode
            ollama_params["format"] = ""  # Don't force JSON for manual tool calling mode

        if len(options_dict) != 0:
            ollama_params["options"] = options_dict

        # Structured outputs (see https://ollama.com/blog/structured-outputs)
        if not self._response_format and params.get("response_format"):
            self._response_format = params["response_format"]

        if self._response_format:
            if isinstance(self._response_format, dict):
                ollama_params["format"] = self._response_format
            else:
                # Keep self._response_format as a Pydantic model for when process the response
                ollama_params["format"] = self._response_format.model_json_schema()

        return ollama_params

    @require_optional_import(["ollama", "fix_busted_json"], "ollama")
    def create(self, params: dict) -> ChatCompletion:
        messages = params.get("messages", [])

        # Are tools involved in this conversation?
        self._tools_in_conversation = "tools" in params

        # We provide second-level filtering out of tools to avoid LLMs re-calling tools continuously
        if self._tools_in_conversation:
            hide_tools = validate_parameter(
                params, "hide_tools", str, False, "never", None, ["if_all_run", "if_any_run", "never"]
            )
            self._should_hide_tools = should_hide_tools(messages, params["tools"], hide_tools)
        else:
            self._should_hide_tools = False

        # Are we using native Ollama tool calling, otherwise we're doing manual tool calling
        # We allow the user to decide if they want to use Ollama's tool calling
        # or for tool calling to be handled manually through text messages
        # Default is True = Ollama's tool calling
        self._native_tool_calls = validate_parameter(params, "native_tool_calls", bool, False, True, None, None)

        if not self._native_tool_calls:
            # Load defaults
            self._manual_tool_call_instruction = validate_parameter(
                params, "manual_tool_call_instruction", str, False, self.TOOL_CALL_MANUAL_INSTRUCTION, None, None
            )
            self._manual_tool_call_step1 = validate_parameter(
                params, "manual_tool_call_step1", str, False, self.TOOL_CALL_MANUAL_STEP1, None, None
            )
            self._manual_tool_call_step2 = validate_parameter(
                params, "manual_tool_call_step2", str, False, self.TOOL_CALL_MANUAL_STEP2, None, None
            )

        # Convert AG2 messages to Ollama messages
        ollama_messages = self.oai_messages_to_ollama_messages(
            messages,
            (
                params["tools"]
                if (not self._native_tool_calls and self._tools_in_conversation) and not self._should_hide_tools
                else None
            ),
        )

        # Parse parameters to the Ollama API's parameters
        ollama_params = self.parse_params(params)

        ollama_params["messages"] = ollama_messages

        # Token counts will be returned
        prompt_tokens = 0
        completion_tokens = 0
        total_tokens = 0

        ans = None
        if "client_host" in params:
            # Convert client_host to string from HttpUrl
            client = Client(host=str(params["client_host"]))
            response = client.chat(**ollama_params)
        else:
            response = ollama.chat(**ollama_params)

        if ollama_params["stream"]:
            # Read in the chunks as they stream, taking in tool_calls which may be across
            # multiple chunks if more than one suggested
            ans = ""
            for chunk in response:
                ans = ans + (chunk["message"]["content"] or "")

                if "done_reason" in chunk:
                    prompt_tokens = chunk.get("prompt_eval_count", 0)
                    completion_tokens = chunk.get("eval_count", 0)
                    total_tokens = prompt_tokens + completion_tokens
        else:
            # Non-streaming finished
            ans: str = response["message"]["content"]

            prompt_tokens = response.get("prompt_eval_count", 0)
            completion_tokens = response.get("eval_count", 0)
            total_tokens = prompt_tokens + completion_tokens

        if response is not None:
            # Defaults
            ollama_finish = "stop"
            tool_calls = None

            # Id and streaming text into response
            if ollama_params["stream"]:
                response_content = ans
                response_id = chunk["created_at"]
            else:
                response_content = response["message"]["content"]
                response_id = response["created_at"]

            # Process tools in the response
            if self._tools_in_conversation:
                if self._native_tool_calls:
                    if not ollama_params["stream"]:
                        response_content = response["message"]["content"]

                        # Native tool calling
                        if "tool_calls" in response["message"]:
                            ollama_finish = "tool_calls"
                            tool_calls = []
                            random_id = random.randint(0, 10000)
                            for tool_call in response["message"]["tool_calls"]:
                                tool_calls.append(
                                    ChatCompletionMessageToolCall(
                                        id=f"ollama_func_{random_id}",
                                        function={
                                            "name": tool_call["function"]["name"],
                                            "arguments": json.dumps(tool_call["function"]["arguments"]),
                                        },
                                        type="function",
                                    )
                                )

                                random_id += 1

                elif not self._native_tool_calls:
                    # Try to convert the response to a tool call object
                    response_toolcalls = response_to_tool_call(ans)

                    # If we can, then we've got tool call(s)
                    if response_toolcalls is not None:
                        ollama_finish = "tool_calls"
                        tool_calls = []
                        random_id = random.randint(0, 10000)

                        for json_function in response_toolcalls:
                            tool_calls.append(
                                ChatCompletionMessageToolCall(
                                    id=f"ollama_manual_func_{random_id}",
                                    function={
                                        "name": json_function["name"],
                                        "arguments": (
                                            json.dumps(json_function["arguments"])
                                            if "arguments" in json_function
                                            else "{}"
                                        ),
                                    },
                                    type="function",
                                )
                            )

                            random_id += 1

                        # Blank the message content
                        response_content = ""

            if ollama_finish == "stop":  # noqa: SIM102
                # Not a tool call, so let's check if we need to process structured output
                if self._response_format and response_content:
                    try:
                        parsed_response = self._convert_json_response(response_content)
                        response_content = _format_json_response(parsed_response, response_content)
                    except ValueError as e:
                        response_content = str(e)
        else:
            raise RuntimeError("Failed to get response from Ollama.")

        # Convert response to AG2 response
        message = ChatCompletionMessage(
            role="assistant",
            content=response_content,
            function_call=None,
            tool_calls=tool_calls,
        )
        choices = [Choice(finish_reason=ollama_finish, index=0, message=message)]

        response_oai = ChatCompletion(
            id=response_id,
            model=ollama_params["model"],
            created=int(time.time()),
            object="chat.completion",
            choices=choices,
            usage=CompletionUsage(
                prompt_tokens=prompt_tokens,
                completion_tokens=completion_tokens,
                total_tokens=total_tokens,
            ),
            cost=0,  # Local models, FREE!
        )

        return response_oai

    def oai_messages_to_ollama_messages(self, messages: list[dict[str, Any]], tools: list) -> list[dict[str, Any]]:
        """Convert messages from OAI format to Ollama's format.
        We correct for any specific role orders and types, and convert tools to messages (as Ollama can't use tool messages)
        """
        ollama_messages = copy.deepcopy(messages)

        # Remove the name field
        for message in ollama_messages:
            if "name" in message:
                message.pop("name", None)

        # Having a 'system' message on the end does not work well with Ollama, so we change it to 'user'
        # 'system' messages on the end are typical of the summarisation message: summary_method="reflection_with_llm"
        if len(ollama_messages) > 1 and ollama_messages[-1]["role"] == "system":
            ollama_messages[-1]["role"] = "user"

        # Process messages for tool calling manually
        if tools is not None and not self._native_tool_calls:
            # 1. We need to append instructions to the starting system message on function calling
            # 2. If we have not yet called tools we append "step 1 instruction" to the latest user message
            # 3. If we have already called tools we append "step 2 instruction" to the latest user message

            have_tool_calls = False
            have_tool_results = False
            last_tool_result_index = -1

            for i, message in enumerate(ollama_messages):
                if "tool_calls" in message:
                    have_tool_calls = True
                if "tool_call_id" in message:
                    have_tool_results = True
                    last_tool_result_index = i

            tool_result_is_last_msg = have_tool_results and last_tool_result_index == len(ollama_messages) - 1

            if ollama_messages[0]["role"] == "system":
                manual_instruction = self._manual_tool_call_instruction

                # Build a string of the functions available
                functions_string = ""
                for function in tools:
                    functions_string += f"""\n{function}\n"""

                # Replace single quotes with double questions - Not sure why this helps the LLM perform
                # better, but it seems to. Monitor and remove if not necessary.
                functions_string = functions_string.replace("'", '"')

                manual_instruction = manual_instruction.replace("[FUNCTIONS_LIST]", functions_string)

                # Update the system message with the instructions and functions
                ollama_messages[0]["content"] = ollama_messages[0]["content"] + manual_instruction.rstrip()

            # If we are still in the function calling or evaluating process, append the steps instruction
            if (not have_tool_calls or tool_result_is_last_msg) and ollama_messages[0]["role"] == "system":
                # NOTE: we require a system message to exist for the manual steps texts
                # Append the manual step instructions
                content_to_append = (
                    self._manual_tool_call_step1 if not have_tool_results else self._manual_tool_call_step2
                )

                if content_to_append != "":
                    # Append the relevant tool call instruction to the latest user message
                    if ollama_messages[-1]["role"] == "user":
                        ollama_messages[-1]["content"] = ollama_messages[-1]["content"] + content_to_append
                    else:
                        ollama_messages.append({"role": "user", "content": content_to_append})

        # Convert tool call and tool result messages to normal text messages for Ollama
        for i, message in enumerate(ollama_messages):
            if "tool_calls" in message:
                # Recommended tool calls
                content = "Run the following function(s):"
                for tool_call in message["tool_calls"]:
                    content = content + "\n" + str(tool_call)
                ollama_messages[i] = {"role": "assistant", "content": content}
            if "tool_call_id" in message:
                # Executed tool results
                message["result"] = message["content"]
                del message["content"]
                del message["role"]
                content = "The following function was run: " + str(message)
                ollama_messages[i] = {"role": "user", "content": content}

        # As we are changing messages, let's merge if they have two user messages on the end and the last one is tool call step instructions
        if (
            len(ollama_messages) >= 2
            and not self._native_tool_calls
            and ollama_messages[-2]["role"] == "user"
            and ollama_messages[-1]["role"] == "user"
            and (
                ollama_messages[-1]["content"] == self._manual_tool_call_step1
                or ollama_messages[-1]["content"] == self._manual_tool_call_step2
            )
        ):
            ollama_messages[-2]["content"] = ollama_messages[-2]["content"] + ollama_messages[-1]["content"]
            del ollama_messages[-1]

        # Ensure the last message is a user / system message, if not, add a user message
        if ollama_messages[-1]["role"] != "user" and ollama_messages[-1]["role"] != "system":
            ollama_messages.append({"role": "user", "content": "Please continue."})

        return ollama_messages

    def _convert_json_response(self, response: str) -> Any:
        """Extract and validate JSON response from the output for structured outputs.

        Args:
            response (str): The response from the API.

        Returns:
            Any: The parsed JSON response.
        """
        if not self._response_format:
            return response

        try:
            # Parse JSON and validate against the Pydantic model if Pydantic model was provided
            if isinstance(self._response_format, dict):
                return response
            else:
                return self._response_format.model_validate_json(response)
        except Exception as e:
            raise ValueError(f"Failed to parse response as valid JSON matching the schema for Structured Output: {e!s}")


def _format_json_response(response: Any, original_answer: str) -> str:
    """Formats the JSON response for structured outputs using the format method if it exists."""
    return response.format() if isinstance(response, FormatterProtocol) else original_answer


@require_optional_import("fix_busted_json", "ollama")
def response_to_tool_call(response_string: str) -> Any:
    """Attempts to convert the response to an object, aimed to align with function format `[{},{}]`"""
    # We try and detect the list[dict[str, Any]] format:
    # Pattern 1 is [{},{}]
    # Pattern 2 is {} (without the [], so could be a single function call)
    patterns = [r"\[[\s\S]*?\]", r"\{[\s\S]*\}"]

    for i, pattern in enumerate(patterns):
        # Search for the pattern in the input string
        matches = re.findall(pattern, response_string.strip())

        for match in matches:
            # It has matched, extract it and load it
            json_str = match.strip()
            data_object = None

            try:
                # Attempt to convert it as is
                data_object = json.loads(json_str)
            except Exception:
                try:
                    # If that fails, attempt to repair it

                    if i == 0:
                        # Enclose to a JSON object for repairing, which is restored upon fix
                        fixed_json = repair_json("{'temp':" + json_str + "}")
                        data_object = json.loads(fixed_json)
                        data_object = data_object["temp"]
                    else:
                        fixed_json = repair_json(json_str)
                        data_object = json.loads(fixed_json)
                except json.JSONDecodeError as e:
                    if e.msg == "Invalid \\escape":
                        # Handle Mistral/Mixtral trying to escape underlines with \\
                        try:
                            json_str = json_str.replace("\\_", "_")
                            if i == 0:
                                fixed_json = repair_json("{'temp':" + json_str + "}")
                                data_object = json.loads(fixed_json)
                                data_object = data_object["temp"]
                            else:
                                fixed_json = repair_json("{'temp':" + json_str + "}")
                                data_object = json.loads(fixed_json)
                        except Exception:
                            pass
                except Exception:
                    pass

            if data_object is not None:
                data_object = _object_to_tool_call(data_object)

                if data_object is not None:
                    return data_object

    # There's no tool call in the response
    return None


def _object_to_tool_call(data_object: Any) -> list[dict[str, Any]]:
    """Attempts to convert an object to a valid tool call object List[Dict] and returns it, if it can, otherwise None"""
    # If it's a dictionary and not a list then wrap in a list
    if isinstance(data_object, dict):
        data_object = [data_object]

    # Validate that the data is a list of dictionaries
    if isinstance(data_object, list) and all(isinstance(item, dict) for item in data_object):
        # Perfect format, a list of dictionaries

        # Check that each dictionary has at least 'name', optionally 'arguments' and no other keys
        is_invalid = False
        for item in data_object:
            if not is_valid_tool_call_item(item):
                is_invalid = True
                break

        # All passed, name and (optionally) arguments exist for all entries.
        if not is_invalid:
            return data_object
    elif isinstance(data_object, list):
        # If it's a list but the items are not dictionaries, check if they are strings that can be converted to dictionaries
        data_copy = data_object.copy()
        is_invalid = False
        for i, item in enumerate(data_copy):
            try:
                new_item = eval(item)
                if isinstance(new_item, dict):
                    if is_valid_tool_call_item(new_item):
                        data_object[i] = new_item
                    else:
                        is_invalid = True
                        break
                else:
                    is_invalid = True
                    break
            except Exception:
                is_invalid = True
                break

        if not is_invalid:
            return data_object

    return None


def is_valid_tool_call_item(call_item: dict) -> bool:
    """Check that a dictionary item has at least 'name', optionally 'arguments' and no other keys to match a tool call JSON"""
    if "name" not in call_item or not isinstance(call_item["name"], str):
        return False

    if set(call_item.keys()) - {"name", "arguments"}:  # noqa: SIM103
        return False

    return True