/
OS-Worldb968155
# Copyright (c) 2023 - 2025, AG2ai, Inc., AG2ai open-source projects maintainers and core contributors
#
# SPDX-License-Identifier: Apache-2.0
#
# Portions derived from https://github.com/microsoft/autogen are under the MIT License.
# SPDX-License-Identifier: MIT
"""Create an OpenAI-compatible client using Ollama's API.
Example:
```python
llm_config = {"config_list": [{"api_type": "ollama", "model": "mistral:7b-instruct-v0.3-q6_K"}]}
agent = autogen.AssistantAgent("my_agent", llm_config=llm_config)
```
Install Ollama's python library using: pip install --upgrade ollama
Install fix-busted-json library: pip install --upgrade fix-busted-json
Resources:
- https://github.com/ollama/ollama-python
"""
from __future__ import annotations
import copy
import json
import random
import re
import time
import warnings
from typing import Any, Literal, Optional, Union
from pydantic import BaseModel, Field, HttpUrl
from ..import_utils import optional_import_block, require_optional_import
from ..llm_config import LLMConfigEntry, register_llm_config
from .client_utils import FormatterProtocol, should_hide_tools, validate_parameter
from .oai_models import ChatCompletion, ChatCompletionMessage, ChatCompletionMessageToolCall, Choice, CompletionUsage
with optional_import_block():
import ollama
from fix_busted_json import repair_json
from ollama import Client
@register_llm_config
class OllamaLLMConfigEntry(LLMConfigEntry):
api_type: Literal["ollama"] = "ollama"
client_host: Optional[HttpUrl] = None
stream: bool = False
num_predict: int = Field(
default=-1,
description="Maximum number of tokens to predict, note: -1 is infinite (default), -2 is fill context.",
)
num_ctx: int = Field(default=2048)
repeat_penalty: float = Field(default=1.1)
seed: int = Field(default=0)
temperature: float = Field(default=0.8)
top_k: int = Field(default=40)
top_p: float = Field(default=0.9)
hide_tools: Literal["if_all_run", "if_any_run", "never"] = "never"
def create_client(self):
raise NotImplementedError("OllamaLLMConfigEntry.create_client is not implemented.")
class OllamaClient:
"""Client for Ollama's API."""
# Defaults for manual tool calling
# Instruction is added to the first system message and provides directions to follow a two step
# process
# 1. (before tools have been called) Return JSON with the functions to call
# 2. (directly after tools have been called) Return Text describing the results of the function calls in text format
# Override using "manual_tool_call_instruction" config parameter
TOOL_CALL_MANUAL_INSTRUCTION = (
"You are to follow a strict two step process that will occur over "
"a number of interactions, so pay attention to what step you are in based on the full "
"conversation. We will be taking turns so only do one step at a time so don't perform step "
"2 until step 1 is complete and I've told you the result. The first step is to choose one "
"or more functions based on the request given and return only JSON with the functions and "
"arguments to use. The second step is to analyse the given output of the function and summarise "
"it returning only TEXT and not Python or JSON. "
"For argument values, be sure numbers aren't strings, they should not have double quotes around them. "
"In terms of your response format, for step 1 return only JSON and NO OTHER text, "
"for step 2 return only text and NO JSON/Python/Markdown. "
'The format for running a function is [{"name": "function_name1", "arguments":{"argument_name": "argument_value"}},{"name": "function_name2", "arguments":{"argument_name": "argument_value"}}] '
'Make sure the keys "name" and "arguments" are as described. '
"If you don't get the format correct, try again. "
"The following functions are available to you:[FUNCTIONS_LIST]"
)
# Appended to the last user message if no tools have been called
# Override using "manual_tool_call_step1" config parameter
TOOL_CALL_MANUAL_STEP1 = " (proceed with step 1)"
# Appended to the user message after tools have been executed. Will create a 'user' message if one doesn't exist.
# Override using "manual_tool_call_step2" config parameter
TOOL_CALL_MANUAL_STEP2 = " (proceed with step 2)"
def __init__(self, response_format: Optional[Union[BaseModel, dict[str, Any]]] = None, **kwargs):
"""Note that no api_key or environment variable is required for Ollama."""
# Store the response format, if provided (for structured outputs)
self._response_format: Optional[Union[BaseModel, dict[str, Any]]] = response_format
def message_retrieval(self, response) -> list:
"""Retrieve and return a list of strings or a list of Choice.Message from the response.
NOTE: if a list of Choice.Message is returned, it currently needs to contain the fields of OpenAI's ChatCompletion Message object,
since that is expected for function or tool calling in the rest of the codebase at the moment, unless a custom agent is being used.
"""
return [choice.message for choice in response.choices]
def cost(self, response) -> float:
return response.cost
@staticmethod
def get_usage(response) -> dict:
"""Return usage summary of the response using RESPONSE_USAGE_KEYS."""
# ... # pragma: no cover
return {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens,
"cost": response.cost,
"model": response.model,
}
def parse_params(self, params: dict[str, Any]) -> dict[str, Any]:
"""Loads the parameters for Ollama API from the passed in parameters and returns a validated set. Checks types, ranges, and sets defaults"""
ollama_params = {}
# Check that we have what we need to use Ollama's API
# https://github.com/ollama/ollama/blob/main/docs/api.md#generate-a-completion
# The main parameters are model, prompt, stream, and options
# Options is a dictionary of parameters for the model
# There are other, advanced, parameters such as format, system (to override system message), template, raw, etc. - not used
# We won't enforce the available models
ollama_params["model"] = params.get("model")
assert ollama_params["model"], (
"Please specify the 'model' in your config list entry to nominate the Ollama model to use."
)
ollama_params["stream"] = validate_parameter(params, "stream", bool, True, False, None, None)
# Build up the options dictionary
# https://github.com/ollama/ollama/blob/main/docs/modelfile.md#valid-parameters-and-values
options_dict = {}
if "num_predict" in params:
# Maximum number of tokens to predict, note: -1 is infinite, -2 is fill context, 128 is default
options_dict["num_predict"] = validate_parameter(params, "num_predict", int, False, 128, None, None)
if "num_ctx" in params:
# Set size of context window used to generate next token, 2048 is default
options_dict["num_ctx"] = validate_parameter(params, "num_ctx", int, False, 2048, None, None)
if "repeat_penalty" in params:
options_dict["repeat_penalty"] = validate_parameter(
params, "repeat_penalty", (int, float), False, 1.1, None, None
)
if "seed" in params:
options_dict["seed"] = validate_parameter(params, "seed", int, False, 42, None, None)
if "temperature" in params:
options_dict["temperature"] = validate_parameter(
params, "temperature", (int, float), False, 0.8, None, None
)
if "top_k" in params:
options_dict["top_k"] = validate_parameter(params, "top_k", int, False, 40, None, None)
if "top_p" in params:
options_dict["top_p"] = validate_parameter(params, "top_p", (int, float), False, 0.9, None, None)
if self._native_tool_calls and self._tools_in_conversation and not self._should_hide_tools:
ollama_params["tools"] = params["tools"]
# Ollama doesn't support streaming with tools natively
if ollama_params["stream"] and self._native_tool_calls:
warnings.warn(
"Streaming is not supported when using tools and 'Native' tool calling, streaming will be disabled.",
UserWarning,
)
ollama_params["stream"] = False
if not self._native_tool_calls and self._tools_in_conversation:
# For manual tool calling we have injected the available tools into the prompt
# and we don't want to force JSON mode
ollama_params["format"] = "" # Don't force JSON for manual tool calling mode
if len(options_dict) != 0:
ollama_params["options"] = options_dict
# Structured outputs (see https://ollama.com/blog/structured-outputs)
if not self._response_format and params.get("response_format"):
self._response_format = params["response_format"]
if self._response_format:
if isinstance(self._response_format, dict):
ollama_params["format"] = self._response_format
else:
# Keep self._response_format as a Pydantic model for when process the response
ollama_params["format"] = self._response_format.model_json_schema()
return ollama_params
@require_optional_import(["ollama", "fix_busted_json"], "ollama")
def create(self, params: dict) -> ChatCompletion:
messages = params.get("messages", [])
# Are tools involved in this conversation?
self._tools_in_conversation = "tools" in params
# We provide second-level filtering out of tools to avoid LLMs re-calling tools continuously
if self._tools_in_conversation:
hide_tools = validate_parameter(
params, "hide_tools", str, False, "never", None, ["if_all_run", "if_any_run", "never"]
)
self._should_hide_tools = should_hide_tools(messages, params["tools"], hide_tools)
else:
self._should_hide_tools = False
# Are we using native Ollama tool calling, otherwise we're doing manual tool calling
# We allow the user to decide if they want to use Ollama's tool calling
# or for tool calling to be handled manually through text messages
# Default is True = Ollama's tool calling
self._native_tool_calls = validate_parameter(params, "native_tool_calls", bool, False, True, None, None)
if not self._native_tool_calls:
# Load defaults
self._manual_tool_call_instruction = validate_parameter(
params, "manual_tool_call_instruction", str, False, self.TOOL_CALL_MANUAL_INSTRUCTION, None, None
)
self._manual_tool_call_step1 = validate_parameter(
params, "manual_tool_call_step1", str, False, self.TOOL_CALL_MANUAL_STEP1, None, None
)
self._manual_tool_call_step2 = validate_parameter(
params, "manual_tool_call_step2", str, False, self.TOOL_CALL_MANUAL_STEP2, None, None
)
# Convert AG2 messages to Ollama messages
ollama_messages = self.oai_messages_to_ollama_messages(
messages,
(
params["tools"]
if (not self._native_tool_calls and self._tools_in_conversation) and not self._should_hide_tools
else None
),
)
# Parse parameters to the Ollama API's parameters
ollama_params = self.parse_params(params)
ollama_params["messages"] = ollama_messages
# Token counts will be returned
prompt_tokens = 0
completion_tokens = 0
total_tokens = 0
ans = None
if "client_host" in params:
# Convert client_host to string from HttpUrl
client = Client(host=str(params["client_host"]))
response = client.chat(**ollama_params)
else:
response = ollama.chat(**ollama_params)
if ollama_params["stream"]:
# Read in the chunks as they stream, taking in tool_calls which may be across
# multiple chunks if more than one suggested
ans = ""
for chunk in response:
ans = ans + (chunk["message"]["content"] or "")
if "done_reason" in chunk:
prompt_tokens = chunk.get("prompt_eval_count", 0)
completion_tokens = chunk.get("eval_count", 0)
total_tokens = prompt_tokens + completion_tokens
else:
# Non-streaming finished
ans: str = response["message"]["content"]
prompt_tokens = response.get("prompt_eval_count", 0)
completion_tokens = response.get("eval_count", 0)
total_tokens = prompt_tokens + completion_tokens
if response is not None:
# Defaults
ollama_finish = "stop"
tool_calls = None
# Id and streaming text into response
if ollama_params["stream"]:
response_content = ans
response_id = chunk["created_at"]
else:
response_content = response["message"]["content"]
response_id = response["created_at"]
# Process tools in the response
if self._tools_in_conversation:
if self._native_tool_calls:
if not ollama_params["stream"]:
response_content = response["message"]["content"]
# Native tool calling
if "tool_calls" in response["message"]:
ollama_finish = "tool_calls"
tool_calls = []
random_id = random.randint(0, 10000)
for tool_call in response["message"]["tool_calls"]:
tool_calls.append(
ChatCompletionMessageToolCall(
id=f"ollama_func_{random_id}",
function={
"name": tool_call["function"]["name"],
"arguments": json.dumps(tool_call["function"]["arguments"]),
},
type="function",
)
)
random_id += 1
elif not self._native_tool_calls:
# Try to convert the response to a tool call object
response_toolcalls = response_to_tool_call(ans)
# If we can, then we've got tool call(s)
if response_toolcalls is not None:
ollama_finish = "tool_calls"
tool_calls = []
random_id = random.randint(0, 10000)
for json_function in response_toolcalls:
tool_calls.append(
ChatCompletionMessageToolCall(
id=f"ollama_manual_func_{random_id}",
function={
"name": json_function["name"],
"arguments": (
json.dumps(json_function["arguments"])
if "arguments" in json_function
else "{}"
),
},
type="function",
)
)
random_id += 1
# Blank the message content
response_content = ""
if ollama_finish == "stop": # noqa: SIM102
# Not a tool call, so let's check if we need to process structured output
if self._response_format and response_content:
try:
parsed_response = self._convert_json_response(response_content)
response_content = _format_json_response(parsed_response, response_content)
except ValueError as e:
response_content = str(e)
else:
raise RuntimeError("Failed to get response from Ollama.")
# Convert response to AG2 response
message = ChatCompletionMessage(
role="assistant",
content=response_content,
function_call=None,
tool_calls=tool_calls,
)
choices = [Choice(finish_reason=ollama_finish, index=0, message=message)]
response_oai = ChatCompletion(
id=response_id,
model=ollama_params["model"],
created=int(time.time()),
object="chat.completion",
choices=choices,
usage=CompletionUsage(
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens,
),
cost=0, # Local models, FREE!
)
return response_oai
def oai_messages_to_ollama_messages(self, messages: list[dict[str, Any]], tools: list) -> list[dict[str, Any]]:
"""Convert messages from OAI format to Ollama's format.
We correct for any specific role orders and types, and convert tools to messages (as Ollama can't use tool messages)
"""
ollama_messages = copy.deepcopy(messages)
# Remove the name field
for message in ollama_messages:
if "name" in message:
message.pop("name", None)
# Having a 'system' message on the end does not work well with Ollama, so we change it to 'user'
# 'system' messages on the end are typical of the summarisation message: summary_method="reflection_with_llm"
if len(ollama_messages) > 1 and ollama_messages[-1]["role"] == "system":
ollama_messages[-1]["role"] = "user"
# Process messages for tool calling manually
if tools is not None and not self._native_tool_calls:
# 1. We need to append instructions to the starting system message on function calling
# 2. If we have not yet called tools we append "step 1 instruction" to the latest user message
# 3. If we have already called tools we append "step 2 instruction" to the latest user message
have_tool_calls = False
have_tool_results = False
last_tool_result_index = -1
for i, message in enumerate(ollama_messages):
if "tool_calls" in message:
have_tool_calls = True
if "tool_call_id" in message:
have_tool_results = True
last_tool_result_index = i
tool_result_is_last_msg = have_tool_results and last_tool_result_index == len(ollama_messages) - 1
if ollama_messages[0]["role"] == "system":
manual_instruction = self._manual_tool_call_instruction
# Build a string of the functions available
functions_string = ""
for function in tools:
functions_string += f"""\n{function}\n"""
# Replace single quotes with double questions - Not sure why this helps the LLM perform
# better, but it seems to. Monitor and remove if not necessary.
functions_string = functions_string.replace("'", '"')
manual_instruction = manual_instruction.replace("[FUNCTIONS_LIST]", functions_string)
# Update the system message with the instructions and functions
ollama_messages[0]["content"] = ollama_messages[0]["content"] + manual_instruction.rstrip()
# If we are still in the function calling or evaluating process, append the steps instruction
if (not have_tool_calls or tool_result_is_last_msg) and ollama_messages[0]["role"] == "system":
# NOTE: we require a system message to exist for the manual steps texts
# Append the manual step instructions
content_to_append = (
self._manual_tool_call_step1 if not have_tool_results else self._manual_tool_call_step2
)
if content_to_append != "":
# Append the relevant tool call instruction to the latest user message
if ollama_messages[-1]["role"] == "user":
ollama_messages[-1]["content"] = ollama_messages[-1]["content"] + content_to_append
else:
ollama_messages.append({"role": "user", "content": content_to_append})
# Convert tool call and tool result messages to normal text messages for Ollama
for i, message in enumerate(ollama_messages):
if "tool_calls" in message:
# Recommended tool calls
content = "Run the following function(s):"
for tool_call in message["tool_calls"]:
content = content + "\n" + str(tool_call)
ollama_messages[i] = {"role": "assistant", "content": content}
if "tool_call_id" in message:
# Executed tool results
message["result"] = message["content"]
del message["content"]
del message["role"]
content = "The following function was run: " + str(message)
ollama_messages[i] = {"role": "user", "content": content}
# As we are changing messages, let's merge if they have two user messages on the end and the last one is tool call step instructions
if (
len(ollama_messages) >= 2
and not self._native_tool_calls
and ollama_messages[-2]["role"] == "user"
and ollama_messages[-1]["role"] == "user"
and (
ollama_messages[-1]["content"] == self._manual_tool_call_step1
or ollama_messages[-1]["content"] == self._manual_tool_call_step2
)
):
ollama_messages[-2]["content"] = ollama_messages[-2]["content"] + ollama_messages[-1]["content"]
del ollama_messages[-1]
# Ensure the last message is a user / system message, if not, add a user message
if ollama_messages[-1]["role"] != "user" and ollama_messages[-1]["role"] != "system":
ollama_messages.append({"role": "user", "content": "Please continue."})
return ollama_messages
def _convert_json_response(self, response: str) -> Any:
"""Extract and validate JSON response from the output for structured outputs.
Args:
response (str): The response from the API.
Returns:
Any: The parsed JSON response.
"""
if not self._response_format:
return response
try:
# Parse JSON and validate against the Pydantic model if Pydantic model was provided
if isinstance(self._response_format, dict):
return response
else:
return self._response_format.model_validate_json(response)
except Exception as e:
raise ValueError(f"Failed to parse response as valid JSON matching the schema for Structured Output: {e!s}")
def _format_json_response(response: Any, original_answer: str) -> str:
"""Formats the JSON response for structured outputs using the format method if it exists."""
return response.format() if isinstance(response, FormatterProtocol) else original_answer
@require_optional_import("fix_busted_json", "ollama")
def response_to_tool_call(response_string: str) -> Any:
"""Attempts to convert the response to an object, aimed to align with function format `[{},{}]`"""
# We try and detect the list[dict[str, Any]] format:
# Pattern 1 is [{},{}]
# Pattern 2 is {} (without the [], so could be a single function call)
patterns = [r"\[[\s\S]*?\]", r"\{[\s\S]*\}"]
for i, pattern in enumerate(patterns):
# Search for the pattern in the input string
matches = re.findall(pattern, response_string.strip())
for match in matches:
# It has matched, extract it and load it
json_str = match.strip()
data_object = None
try:
# Attempt to convert it as is
data_object = json.loads(json_str)
except Exception:
try:
# If that fails, attempt to repair it
if i == 0:
# Enclose to a JSON object for repairing, which is restored upon fix
fixed_json = repair_json("{'temp':" + json_str + "}")
data_object = json.loads(fixed_json)
data_object = data_object["temp"]
else:
fixed_json = repair_json(json_str)
data_object = json.loads(fixed_json)
except json.JSONDecodeError as e:
if e.msg == "Invalid \\escape":
# Handle Mistral/Mixtral trying to escape underlines with \\
try:
json_str = json_str.replace("\\_", "_")
if i == 0:
fixed_json = repair_json("{'temp':" + json_str + "}")
data_object = json.loads(fixed_json)
data_object = data_object["temp"]
else:
fixed_json = repair_json("{'temp':" + json_str + "}")
data_object = json.loads(fixed_json)
except Exception:
pass
except Exception:
pass
if data_object is not None:
data_object = _object_to_tool_call(data_object)
if data_object is not None:
return data_object
# There's no tool call in the response
return None
def _object_to_tool_call(data_object: Any) -> list[dict[str, Any]]:
"""Attempts to convert an object to a valid tool call object List[Dict] and returns it, if it can, otherwise None"""
# If it's a dictionary and not a list then wrap in a list
if isinstance(data_object, dict):
data_object = [data_object]
# Validate that the data is a list of dictionaries
if isinstance(data_object, list) and all(isinstance(item, dict) for item in data_object):
# Perfect format, a list of dictionaries
# Check that each dictionary has at least 'name', optionally 'arguments' and no other keys
is_invalid = False
for item in data_object:
if not is_valid_tool_call_item(item):
is_invalid = True
break
# All passed, name and (optionally) arguments exist for all entries.
if not is_invalid:
return data_object
elif isinstance(data_object, list):
# If it's a list but the items are not dictionaries, check if they are strings that can be converted to dictionaries
data_copy = data_object.copy()
is_invalid = False
for i, item in enumerate(data_copy):
try:
new_item = eval(item)
if isinstance(new_item, dict):
if is_valid_tool_call_item(new_item):
data_object[i] = new_item
else:
is_invalid = True
break
else:
is_invalid = True
break
except Exception:
is_invalid = True
break
if not is_invalid:
return data_object
return None
def is_valid_tool_call_item(call_item: dict) -> bool:
"""Check that a dictionary item has at least 'name', optionally 'arguments' and no other keys to match a tool call JSON"""
if "name" not in call_item or not isinstance(call_item["name"], str):
return False
if set(call_item.keys()) - {"name", "arguments"}: # noqa: SIM103
return False
return True