#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Flask dashboard for monitoring the progress of evaluation tasks.

Task definitions are read from TASK_CONFIG_PATH / EXAMPLES_BASE_PATH; per-task
run artifacts (traj.jsonl, runtime.log, result.txt, screenshots and
recording.mp4) are read from RESULTS_PATH/<task_type>/<task_id>.
"""
import os
import json
import time
from datetime import datetime
from pathlib import Path

from flask import Flask, render_template_string, jsonify, send_file, request, render_template
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# {task_type}_{task_id}: (status_dict, timestamp of first "final" observation)
# For "Done" status, we need to verify it for a period to ensure it doesn't change to "Error"
TASK_STATUS_CACHE = {}
# Time in seconds to consider "Done" status as stable (default: 30s)
DONE_STABILITY_PERIOD = int(os.getenv("DONE_STABILITY_PERIOD", "30"))

app = Flask(__name__)

MONITOR_IN_DOCKER = os.getenv("MONITOR_IN_DOCKER", "false").lower() == "true"

if MONITOR_IN_DOCKER:
    # If running in Docker, use default paths
    TASK_CONFIG_PATH = "/app/evaluation_examples/test.json"
    EXAMPLES_BASE_PATH = "/app/evaluation_examples/examples"
    RESULTS_BASE_PATH = "/app/results"
else:
    # Load configuration from environment variables
    TASK_CONFIG_PATH = os.getenv("TASK_CONFIG_PATH", "../evaluation_examples/test.json")
    EXAMPLES_BASE_PATH = os.getenv("EXAMPLES_BASE_PATH", "../evaluation_examples/examples")
    RESULTS_BASE_PATH = os.getenv("RESULTS_BASE_PATH", "../results")

ACTION_SPACE = os.getenv("ACTION_SPACE", "pyautogui")
OBSERVATION_TYPE = os.getenv("OBSERVATION_TYPE", "screenshot")
MODEL_NAME = os.getenv("MODEL_NAME", "computer-use-preview")
MAX_STEPS = int(os.getenv("MAX_STEPS", "150"))

RESULTS_PATH = os.path.join(RESULTS_BASE_PATH, ACTION_SPACE, OBSERVATION_TYPE, MODEL_NAME)

# Format of the "action_timestamp" field stored in trajectory steps.
_ACTION_TIMESTAMP_FORMAT = "%Y%m%d@%H%M%S"
# Format used for every timestamp shown by the dashboard.
_DISPLAY_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def load_task_list():
    """Load the task index: a mapping of task type -> iterable of task IDs."""
    with open(TASK_CONFIG_PATH, "r", encoding="utf-8") as f:
        return json.load(f)


def get_task_info(task_type, task_id):
    """Return the parsed task definition JSON, or None if its file does not exist."""
    task_file = os.path.join(EXAMPLES_BASE_PATH, task_type, f"{task_id}.json")
    if os.path.exists(task_file):
        with open(task_file, "r", encoding="utf-8") as f:
            return json.load(f)
    return None


# ---------------------------------------------------------------------------
# Helpers shared by the detailed and brief status functions
# ---------------------------------------------------------------------------

def _format_mtime(path):
    """Return the modification time of *path* in display format."""
    return datetime.fromtimestamp(os.path.getmtime(path)).strftime(_DISPLAY_TIMESTAMP_FORMAT)


def _format_action_timestamp(step):
    """Return a step's ``action_timestamp`` in display format, or the string "None".

    A missing or malformed timestamp must not turn the whole page into a 500.
    """
    try:
        parsed = datetime.strptime(step["action_timestamp"], _ACTION_TIMESTAMP_FORMAT)
    except (KeyError, TypeError, ValueError):
        return "None"
    return parsed.strftime(_DISPLAY_TIMESTAMP_FORMAT)


def _result_files(result_dir):
    """Return the (trajectory, runtime log, result) file paths of a task result dir."""
    return (
        os.path.join(result_dir, "traj.jsonl"),
        os.path.join(result_dir, "runtime.log"),
        os.path.join(result_dir, "result.txt"),
    )


def _load_trajectory(traj_file):
    """Parse every non-blank JSON line of *traj_file* into a list of steps.

    Lines that are not valid JSON are skipped: a line can be incomplete when the
    file is read while it is still being written (presumably by the running
    agent), and that should not break the page.
    """
    steps = []
    with open(traj_file, "r", encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            try:
                steps.append(json.loads(line))
            except ValueError:
                continue
    return steps


def _scan_trajectory(traj_file):
    """Return ``(non_blank_line_count, last_step_dict_or_None)`` for *traj_file*.

    Streams the file once instead of parsing every step, so the homepage stays
    cheap. Returns ``(0, None)`` if the file cannot be read.
    """
    count = 0
    last_line = None
    try:
        with open(traj_file, "rb") as f:
            for raw in f:
                if raw.strip():
                    count += 1
                    last_line = raw
    except OSError:
        return 0, None
    if last_line is None:
        return count, None
    try:
        last_step = json.loads(last_line)
    except ValueError:  # includes JSONDecodeError and UnicodeDecodeError
        return count, None
    return count, (last_step if isinstance(last_step, dict) else None)


def _read_log_tail(log_file, max_lines, chunk_size=8192):
    """Return the last *max_lines* lines of *log_file* joined by newlines.

    Reads backwards from the end of the file in chunks, so large logs are not
    read in full. Returns "" if the file is missing or unreadable.
    """
    if not os.path.exists(log_file):
        return ""
    try:
        with open(log_file, "rb") as f:
            f.seek(0, os.SEEK_END)
            position = f.tell()
            data = b""
            # Once the buffer holds more newlines than requested lines, the
            # last max_lines lines are guaranteed to be complete.
            while position > 0 and data.count(b"\n") <= max_lines:
                size = min(chunk_size, position)
                position -= size
                f.seek(position)
                data = f.read(size) + data
    except OSError:
        return ""
    tail = data.splitlines()[-max_lines:]
    return "\n".join(line.decode("utf-8", errors="replace") for line in tail)


def _extract_first_response(line):
    """Return the first agent response from a runtime-log "Responses: [...]" line."""
    text = line.split("Responses: [")[1].strip()
    if text.endswith("]"):
        text = text[:-1]  # Remove closing bracket
    # Clean up the response text - remove quotes
    if text.startswith("'") and text.endswith("'"):
        text = text[1:-1]  # Remove surrounding quotes
    elif text == '"]':  # Empty response
        text = ""
    # Handle list of responses: only the first one is kept
    if text and "', '" in text:
        return text.split("', '")[0].strip("'")
    return text


def _parse_runtime_log(log_file):
    """Collect agent responses and the last exit-condition line from *log_file*.

    Returns a dict with ``agent_responses`` (list of str), ``exit_condition``
    (last matching log line or None), ``last_message`` (last response seen before
    a message exit, or None) and, if the log could not be read, ``error``.
    """
    log_data = {
        "agent_responses": [],
        "exit_condition": None,
        "last_message": None,
    }
    if not os.path.exists(log_file):
        return log_data

    try:
        with open(log_file, "r", encoding="utf-8", errors="replace") as f:
            log_lines = f.readlines()
    except OSError as e:
        log_data["error"] = f"Error parsing log file: {str(e)}"
        return log_data

    last_response = None
    for line in log_lines:
        # Extract agent responses for each step
        if "Responses: [" in line:
            response = _extract_first_response(line)
            if response:
                log_data["agent_responses"].append(response)
                last_response = response  # Keep track of the last response

        # Check for exit conditions; the last one found wins
        if "The state of the agent is not correct" in line or "Exit condition met" in line:
            log_data["exit_condition"] = line.strip()
            # If this is a message exit, save the last response as the last message
            if "message_exit: True" in line and last_response:
                log_data["last_message"] = last_response
    return log_data


def _classify_status(last_step, step_count, get_exit_text):
    """Derive the display status of a task that has recorded at least one step.

    Precedence: the step's ``done`` flag, its ``Error`` flag, a message/thought
    exit found in the runtime log, the MAX_STEPS budget being used up, and
    otherwise "Running". *get_exit_text* is a zero-argument callable returning
    log text (or None); it is only called when the trajectory alone does not
    decide, so callers can defer log I/O.
    """
    if last_step.get("done", False):
        return "Done"
    if last_step.get("Error", False):
        return "Error"
    exit_text = get_exit_text() or ""
    if "message_exit: True" in exit_text:
        return "Done (Message Exit)"
    if "thought_exit: True" in exit_text:
        return "Done (Thought Exit)"
    if step_count >= MAX_STEPS:
        return "Done (Max Steps)"
    return "Running"


def _is_final_status(status):
    """Return True for statuses that are worth caching ("Done*" or "Error")."""
    return status.startswith("Done") or status == "Error"


# ---------------------------------------------------------------------------
# Task status
# ---------------------------------------------------------------------------

def get_task_status(task_type, task_id):
    """Return the full status of one task, including all steps and parsed log data."""
    result_dir = os.path.join(RESULTS_PATH, task_type, task_id)
    if not os.path.exists(result_dir):
        return {
            "status": "Not Started",
            "progress": 0,
            "total_steps": 0,
            "max_steps": MAX_STEPS,
            "last_update": None,
        }

    traj_file, log_file, result_file = _result_files(result_dir)

    if not os.path.exists(traj_file):
        return {
            "status": "Preparing",
            "progress": 0,
            "total_steps": 0,
            "max_steps": MAX_STEPS,
            "last_update": _format_mtime(result_dir),
        }

    steps = _load_trajectory(traj_file)
    if not steps:
        return {
            "status": "Initializing",
            "progress": 0,
            "total_steps": 0,
            "max_steps": MAX_STEPS,
            "last_update": _format_mtime(traj_file),
        }

    last_step = steps[-1]
    # Check the log file for agent responses and exit conditions
    log_data = _parse_runtime_log(log_file)
    status = _classify_status(last_step, len(steps), lambda: log_data.get("exit_condition"))

    result_content = "Task not completed"
    if status.startswith("Done"):
        if os.path.exists(result_file):
            with open(result_file, "r", encoding="utf-8", errors="replace") as f:
                result_content = f.read().strip()
        else:
            result_content = "Result file not found"

    return {
        "status": status,
        "progress": len(steps),
        "max_steps": MAX_STEPS,
        "last_update": _format_action_timestamp(last_step),
        "steps": steps,
        "log_data": log_data,
        "result": result_content,
    }


def _compute_task_status_brief(task_type, task_id):
    """Compute (without caching) the brief status dict of one task."""
    result_dir = os.path.join(RESULTS_PATH, task_type, task_id)
    if not os.path.exists(result_dir):
        return {
            "status": "Not Started",
            "progress": 0,
            "max_steps": MAX_STEPS,
            "last_update": None,
        }

    traj_file, log_file, result_file = _result_files(result_dir)

    if not os.path.exists(traj_file):
        return {
            "status": "Preparing",
            "progress": 0,
            "max_steps": MAX_STEPS,
            "last_update": _format_mtime(result_dir),
        }

    step_count, last_step = _scan_trajectory(traj_file)
    if step_count == 0:
        return {
            "status": "Initializing",
            "progress": 0,
            "max_steps": MAX_STEPS,
            "last_update": _format_mtime(traj_file),
        }

    last_step = last_step or {}
    # Only the last 2 log lines are inspected, to keep the homepage fast.
    status = _classify_status(last_step, step_count, lambda: _read_log_tail(log_file, 2))

    # Get result content if finished
    result_content = None
    if status.startswith("Done") and os.path.exists(result_file):
        try:
            with open(result_file, "r", encoding="utf-8", errors="replace") as f:
                result_content = f.read().strip()
        except OSError:
            result_content = "Result file not found"

    return {
        "status": status,
        "progress": step_count,
        "max_steps": MAX_STEPS,
        "last_update": _format_action_timestamp(last_step),
        "result": result_content,
    }


def get_task_status_brief(task_type, task_id):
    """
    Get brief status info for a task, without detailed step data, for fast homepage loading.

    "Error" results are cached permanently. "Done*" results are re-verified on
    every call until they have been observed for DONE_STABILITY_PERIOD seconds,
    and are served from cache afterwards. Any non-final result evicts the cache
    entry, so a task that leaves a final state is not reported from stale cache.
    """
    cache_key = f"{task_type}_{task_id}"
    current_time = time.time()

    cached = TASK_STATUS_CACHE.get(cache_key)
    if cached is not None:
        cached_status, cached_time = cached
        # Non-Done cached statuses (i.e. Error) are final.
        if not cached_status["status"].startswith("Done"):
            return cached_status
        # A Done status that survived the verification period is trusted.
        if current_time - cached_time >= DONE_STABILITY_PERIOD:
            return cached_status

    status_dict = _compute_task_status_brief(task_type, task_id)

    if _is_final_status(status_dict["status"]):
        # Keep the first-observation time so refreshes don't restart the window.
        first_seen = cached[1] if cached is not None else current_time
        TASK_STATUS_CACHE[cache_key] = (status_dict, first_seen)
    else:
        TASK_STATUS_CACHE.pop(cache_key, None)

    return status_dict


def _collect_tasks(status_fn):
    """Return {task_type: [{"id", "instruction", "status"}, ...]} using *status_fn*."""
    result = {}
    for task_type, task_ids in load_task_list().items():
        entries = []
        for task_id in task_ids:
            task_info = get_task_info(task_type, task_id)
            if task_info:
                instruction = task_info.get("instruction", "No instruction provided")
            else:
                instruction = "No task info available"
            entries.append({
                "id": task_id,
                "instruction": instruction,
                "status": status_fn(task_type, task_id),
            })
        result[task_type] = entries
    return result


def get_all_tasks_status():
    """Return the full status of every task listed in the task config."""
    return _collect_tasks(get_task_status)


def get_all_tasks_status_brief():
    """
    Get brief status info for all tasks, without detailed step data, for fast homepage loading.
    """
    return _collect_tasks(get_task_status_brief)


def _safe_join(base, *parts):
    """Join *parts* onto *base*; return None if the result escapes *base*.

    URL segments are untrusted, so ``..`` components or absolute parts must not
    let a request read files outside the results directory.
    """
    base_abs = os.path.abspath(base)
    candidate = os.path.abspath(os.path.join(base_abs, *parts))
    try:
        if os.path.commonpath([base_abs, candidate]) != base_abs:
            return None
    except ValueError:  # e.g. different drives on Windows
        return None
    return candidate


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------

@app.route('/')
def index():
    return render_template("index.html")


@app.route('/task/<task_type>/<task_id>')
def task_detail(task_type, task_id):
    task_info = get_task_info(task_type, task_id)
    if not task_info:
        return "Task not found", 404
    task_status = get_task_status(task_type, task_id)
    return render_template(
        "task_detail.html",
        task_id=task_id,
        task_type=task_type,
        task_info=task_info,
        task_status=task_status,
    )


@app.route('/api/tasks')
def api_tasks():
    """Task status API"""
    return jsonify(get_all_tasks_status())


@app.route('/api/tasks/brief')
def api_tasks_brief():
    """Return brief status info for all tasks, without detailed step data, for fast homepage loading."""
    return jsonify(get_all_tasks_status_brief())


@app.route('/task/<task_type>/<task_id>/screenshot/<path:filename>')
def task_screenshot(task_type, task_id, filename):
    """Get task screenshot"""
    task_dir = _safe_join(RESULTS_PATH, task_type, task_id)
    screenshot_path = _safe_join(task_dir, filename) if task_dir else None
    if screenshot_path and os.path.isfile(screenshot_path):
        return send_file(screenshot_path, mimetype='image/png')
    return "Screenshot does not exist", 404


@app.route('/task/<task_type>/<task_id>/recording')
def task_recording(task_type, task_id):
    """Get task recording video"""
    task_dir = _safe_join(RESULTS_PATH, task_type, task_id)
    recording_path = os.path.join(task_dir, "recording.mp4") if task_dir else None
    if recording_path and os.path.isfile(recording_path):
        response = send_file(recording_path, mimetype='video/mp4')
        # Add headers to improve mobile compatibility
        response.headers['Accept-Ranges'] = 'bytes'
        response.headers['Cache-Control'] = 'public, max-age=3600'
        response.headers['X-Content-Type-Options'] = 'nosniff'
        return response
    return "Recording does not exist", 404


@app.route('/api/task/<task_type>/<task_id>')
def api_task_detail(task_type, task_id):
    """Task detail API"""
    task_info = get_task_info(task_type, task_id)
    if not task_info:
        return jsonify({"error": "Task does not exist"}), 404
    task_status = get_task_status(task_type, task_id)
    return jsonify({
        "info": task_info,
        "status": task_status,
    })


if __name__ == '__main__':
    # Check if necessary directories exist
    if not os.path.exists(TASK_CONFIG_PATH):
        print(f"Warning: Task config file does not exist: {TASK_CONFIG_PATH}")

    if not os.path.exists(EXAMPLES_BASE_PATH):
        print(f"Warning: Task examples directory does not exist: {EXAMPLES_BASE_PATH}")

    # Start web service
    host = os.getenv("FLASK_HOST", "0.0.0.0")
    port = int(os.getenv("FLASK_PORT", "8080"))
    debug = os.getenv("FLASK_DEBUG", "false").lower() == "true"

    app.run(host=host, port=port, debug=debug)