import json

from mm_agents.uipath.types_utils import (
    ComputerUseAction,
    ComputerUseStep,
    SupportedActions,
    PlanActionType,
    PlanAction,
    key_maps,
    ExecutionState,
    State,
)
import mm_agents.uipath.utils as utils
from mm_agents.uipath.action_planner import ActionPlanner, PlannerOutput
from mm_agents.uipath.grounder_client import GrounderClient


class UiPathComputerUseV1(object):
    """Computer-use agent that plans an action with ActionPlanner and grounds it
    to screen coordinates with GrounderClient."""

    def __init__(self):
        self.planner = ActionPlanner()
        self.executor = GrounderClient()

    async def predict_request(
        self, request_body: dict, model_name: str
    ) -> dict:
        """Build the planner state from a raw request body and run one prediction step."""
        state = State(
            task=request_body["userTask"],
            image_base64=request_body["image"],
            previous_steps=request_body.get("previousSteps", []),
        )
        execution_state = ExecutionState(model_name=model_name, execution_info={})
        output = await self.predict(state, execution_state)
        return output

    def process_grounding(
        self,
        plan_action: PlanAction,
        grounding_result: utils.GroundingOutput,
        x: int,
        y: int,
    ):
        """Convert a grounded plan action and its screen coordinates into a ComputerUseAction."""
        match plan_action.action_type:
            case PlanActionType.Scroll:
                # Guess the scroll direction if it is missing from the plan output.
                if "direction" not in plan_action.parameters:
                    if "scroll up" in plan_action.description.lower():
                        scroll_direction = "up"
                    else:
                        scroll_direction = "down"
                else:
                    scroll_direction = plan_action.parameters["direction"]
                action = ComputerUseAction(
                    name=SupportedActions.Scroll,
                    description=plan_action.description,
                    parameters={"position": [x, y], "direction": scroll_direction},
                )
                if "distance" in plan_action.parameters:
                    match scroll_direction:
                        case "up":
                            action.parameters["offset"] = [
                                0,
                                plan_action.parameters["distance"],
                            ]
                        case "down":
                            action.parameters["offset"] = [
                                0,
                                -plan_action.parameters["distance"],
                            ]
                        case "left":
                            action.parameters["offset"] = [
                                plan_action.parameters["distance"],
                                0,
                            ]
                        case "right":
                            action.parameters["offset"] = [
                                -plan_action.parameters["distance"],
                                0,
                            ]
            case PlanActionType.Drag:
                assert grounding_result.end_position is not None, (
                    "End position must be provided for drag action"
                )
                x_end, y_end = grounding_result.end_position
                action = ComputerUseAction(
                    name=SupportedActions.Drag,
                    description=plan_action.description,
                    parameters={
                        "path": [
                            {"x": x, "y": y},
                            {"x": x_end, "y": y_end},
                        ]
                    },
                )
            case _:
                action_name = plan_action.action_type
                parameters = {"position": [x, y]}
                if plan_action.action_type == PlanActionType.DoubleClick:
                    action_name = SupportedActions.Click
                    parameters["click_type"] = "double"
                elif plan_action.action_type == PlanActionType.RightClick:
                    action_name = SupportedActions.Click
                    parameters["button"] = "right"
                elif plan_action.action_type == PlanActionType.MouseMove:
                    action_name = SupportedActions.MouseMove  # different names
                assert action_name in [
                    SupportedActions.Click,
                    SupportedActions.MouseMove,
                ]
                action = ComputerUseAction(
                    name=action_name,
                    description=plan_action.description,
                    parameters=parameters,
                )
        return action

    async def predict(
        self, state: State, execution_state: ExecutionState
    ) -> dict:
        """Run the planner on the current state and turn its plan action into a step response."""
        planner_output: PlannerOutput = self.planner.predict(state, execution_state)
        plan_action = planner_output.plan_action
        action: ComputerUseAction | None = None
        step: ComputerUseStep | None = None

        match plan_action.action_type:
            case PlanActionType.KeyPress:
                keys = plan_action.parameters["key"].split(" ")
                keys = [key.strip() for key in keys]
                keys = [key_maps.get(key, key) for key in keys]
                action = ComputerUseAction(
                    name=SupportedActions.KeyPress,
                    description=plan_action.description,
                    parameters={"keys": keys},
                )
            case PlanActionType.Wait:
                action = ComputerUseAction(
                    name=SupportedActions.Wait,
                    description=plan_action.description,
                    parameters={},
                )
            case PlanActionType.ExtractData:
                # Return a step with no action, just to store the extracted data.
                step = ComputerUseStep(
                    description=plan_action.description,
                    actions=[],
                    additional_parameters={
                        "extracted_data": plan_action.parameters,
                    },
                    thought=planner_output.thought,
                )
            case PlanActionType.Finish:
                action = ComputerUseAction(
                    name=SupportedActions.Finish,
                    description=plan_action.description,
                    parameters=plan_action.parameters,
                )
            case (
                PlanActionType.Click
                | PlanActionType.MouseMove
                | PlanActionType.Scroll
                | PlanActionType.Drag
                | PlanActionType.DoubleClick
                | PlanActionType.RightClick
            ):
                if plan_action.action_type != PlanActionType.Drag:
                    grounding_result = await self.executor.predict(
                        state.image_base64,
                        plan_action.description,
                        action=plan_action.action_type,
                    )
                else:
                    # Drag needs two grounding calls: one for the start and one for the end position.
                    grounding_result = await self.executor.predict(
                        state.image_base64,
                        plan_action.parameters["start_description"],
                        action=plan_action.action_type,
                    )
                    grounding_result_end = await self.executor.predict(
                        state.image_base64,
                        plan_action.parameters["end_description"],
                        action=plan_action.action_type,
                    )
                    grounding_result.end_position = grounding_result_end.position
                x, y = grounding_result.position
                action = self.process_grounding(plan_action, grounding_result, x, y)
            case PlanActionType.Type:
                action = ComputerUseAction(
                    name=SupportedActions.TypeInto,
                    description=plan_action.description,
                    parameters={"value": plan_action.parameters["text"]},
                )

        if step is None:
            assert action is not None
            step = ComputerUseStep(
                description=plan_action.description,
                actions=[action],
                additional_parameters={},
                thought=planner_output.thought,
            )

        # Save additional data for the history.
        assert step.additional_parameters is not None
        step.additional_parameters["thought"] = planner_output.thought
        step.additional_parameters["review"] = planner_output.review
        step.additional_parameters.update(planner_output.additional_sections)
        step.additional_parameters["plan_action"] = json.dumps(plan_action.to_dict())

        history_image = state.image_base64
        previous_steps_parameters = {
            "max_chat_history_messages": 1000,
            "max_chat_history_images": self.planner.number_history_steps_with_images,
            "image": history_image,
        }
        agent_response = {
            "step": step.to_response_dict(),
            "previous_steps_parameters": previous_steps_parameters,
        }
        return agent_response
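

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module). It shows the
# request-body shape that predict_request expects, using the "userTask",
# "image", and "previousSteps" keys read above. The task text, the screenshot
# bytes, and the model name are placeholder values; a real call needs a valid
# base64-encoded screenshot and whatever model name the planner backend accepts.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio
    import base64

    async def _demo():
        agent = UiPathComputerUseV1()
        request_body = {
            "userTask": "Open the Downloads folder",  # placeholder task
            "image": base64.b64encode(b"...screenshot bytes...").decode("ascii"),  # placeholder image
            "previousSteps": [],
        }
        response = await agent.predict_request(request_body, model_name="example-model")
        print(json.dumps(response, indent=2, default=str))

    asyncio.run(_demo())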