/
OS-World7d25f90
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import json
import time
from datetime import datetime
from pathlib import Path
from flask import Flask, render_template_string, jsonify, send_file, request, render_template
from dotenv import load_dotenv
# Populate os.environ from a .env file (if one exists) before any setting below is read.
load_dotenv()

# Per-task status cache: "{task_type}_{task_id}" -> (status_dict, timestamp of first caching).
# Only "Done*" and "Error" statuses are stored. A "Done*" entry keeps being re-verified until
# it has been stable for DONE_STABILITY_PERIOD seconds, in case it later turns into "Error".
TASK_STATUS_CACHE = {}

# Seconds a "Done*" status must stay unchanged before it is served straight from the cache (default 30).
DONE_STABILITY_PERIOD = int(os.getenv("DONE_STABILITY_PERIOD", "30"))

app = Flask(__name__)

MONITOR_IN_DOCKER = os.getenv("MONITOR_IN_DOCKER", "false").lower() == "true"

if not MONITOR_IN_DOCKER:
    # Running on the host: every path is configurable, defaulting to the repository layout.
    TASK_CONFIG_PATH = os.getenv("TASK_CONFIG_PATH", "../evaluation_examples/test.json")
    EXAMPLES_BASE_PATH = os.getenv("EXAMPLES_BASE_PATH", "../evaluation_examples/examples")
    RESULTS_BASE_PATH = os.getenv("RESULTS_BASE_PATH", "../results")
else:
    # Running in Docker: fixed paths under /app; the path environment variables are ignored.
    TASK_CONFIG_PATH = "/app/evaluation_examples/test.json"
    EXAMPLES_BASE_PATH = "/app/evaluation_examples/examples"
    RESULTS_BASE_PATH = "/app/results"

# Which experiment to monitor; results live under <results>/<action>/<observation>/<model>.
ACTION_SPACE = os.getenv("ACTION_SPACE", "pyautogui")
OBSERVATION_TYPE = os.getenv("OBSERVATION_TYPE", "screenshot")
MODEL_NAME = os.getenv("MODEL_NAME", "computer-use-preview")
MAX_STEPS = int(os.getenv("MAX_STEPS", "150"))

RESULTS_PATH = os.path.join(RESULTS_BASE_PATH, ACTION_SPACE, OBSERVATION_TYPE, MODEL_NAME)
def load_task_list():
    """Read the task manifest stored at TASK_CONFIG_PATH.

    Returns:
        The decoded JSON document. Callers iterate it with ``.items()`` as a
        mapping of task_type -> list of task ids.
    """
    with open(TASK_CONFIG_PATH, 'r') as config_file:
        task_list = json.load(config_file)
    return task_list
def get_task_info(task_type, task_id):
    """Load the example definition of one task.

    Args:
        task_type: Sub-directory of EXAMPLES_BASE_PATH containing the example.
        task_id: Example id; the file read is "<task_id>.json".

    Returns:
        The decoded JSON of the example file, or None when the file does not exist.
    """
    example_path = os.path.join(EXAMPLES_BASE_PATH, task_type, f"{task_id}.json")
    if not os.path.exists(example_path):
        return None
    with open(example_path, 'r') as example_file:
        return json.load(example_file)
def _mtime_str(path):
    """Return *path*'s modification time as 'YYYY-MM-DD HH:MM:SS' (local time)."""
    return datetime.fromtimestamp(os.path.getmtime(path)).strftime("%Y-%m-%d %H:%M:%S")


def _load_trajectory(traj_file):
    """Parse traj.jsonl into a list of step dicts.

    Blank lines are skipped. Lines that are not valid JSON are skipped too: the
    file may be read while its task is still running, so the final line can be
    only partially written. Without this, one torn line would crash the request.
    """
    steps = []
    with open(traj_file, 'r', encoding='utf-8', errors='replace') as f:
        for line in f:
            if not line.strip():
                continue
            try:
                steps.append(json.loads(line))
            except ValueError:
                continue
    return steps


def _extract_first_response(line):
    """Extract the first agent response from a line containing 'Responses: ['.

    The responses appear to be logged as a Python list repr. Each item is wrapped
    in single quotes, or in double quotes when the text itself contains a single
    quote, so both quote styles are stripped.

    Returns:
        The first response text, or None when there is nothing to record.
    """
    text = line.split("Responses: [", 1)[1].strip()
    if text.endswith("]"):
        text = text[:-1]  # drop the closing bracket of the list
    quote = text[:1]
    if quote in ("'", '"') and text.endswith(quote):
        text = text[1:-1]  # drop the surrounding quotes
    if not text:
        return None
    if "', '" in text:
        # Several responses were logged: keep only the first one.
        return text.split("', '")[0].strip("'")
    return text


def _parse_runtime_log(log_file):
    """Scan runtime.log for agent responses and the last exit-condition line.

    Returns:
        dict with "agent_responses" (first response per 'Responses:' line),
        "exit_condition" (last matching exit line, stripped, or None) and
        "last_message" (last response seen before a 'message_exit: True' exit
        line, or None). If the log cannot be read or parsed, an "error" key
        describes the problem and the fields hold whatever was collected so far.
    """
    log_data = {
        "agent_responses": [],
        "exit_condition": None,
        "last_message": None
    }
    if not os.path.exists(log_file):
        return log_data
    try:
        with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
            last_response = None
            for line in f:
                if "Responses: [" in line:
                    response = _extract_first_response(line)
                    if response is not None:
                        log_data["agent_responses"].append(response)
                        last_response = response
                if "The state of the agent is not correct" in line or "Exit condition met" in line:
                    log_data["exit_condition"] = line.strip()
                    # On a message exit, the agent's final answer is its last response.
                    if "message_exit: True" in line and last_response:
                        log_data["last_message"] = last_response
    except Exception as e:
        # Best effort: a broken log must not break the status page, so record the error instead.
        log_data["error"] = f"Error parsing log file: {str(e)}"
    return log_data


def _format_action_timestamp(step):
    """Convert a step's 'action_timestamp' (YYYYmmdd@HHMMSS) for display.

    Returns 'None' (the string) when the field is missing or malformed.
    """
    try:
        return datetime.strptime(step["action_timestamp"], "%Y%m%d@%H%M%S").strftime("%Y-%m-%d %H:%M:%S")
    except (KeyError, TypeError, ValueError):
        return "None"


def get_task_status(task_type, task_id):
    """
    Get the full status of one task, including every trajectory step and parsed log data.

    Args:
        task_type: Sub-directory of RESULTS_PATH holding the task's results.
        task_id: Task id; results are read from RESULTS_PATH/<task_type>/<task_id>.

    Returns:
        Before any step exists, a dict with "status" ("Not Started", "Preparing" or
        "Initializing"), "progress", "total_steps", "max_steps" and "last_update".
        Otherwise, a dict with "status", "progress" (number of steps), "max_steps",
        "last_update", "steps", "log_data" and "result".
    """
    result_dir = os.path.join(RESULTS_PATH, task_type, task_id)
    if not os.path.exists(result_dir):
        return {
            "status": "Not Started",
            "progress": 0,
            "total_steps": 0,
            "max_steps": MAX_STEPS,
            "last_update": None
        }
    traj_file = os.path.join(result_dir, "traj.jsonl")
    log_file = os.path.join(result_dir, "runtime.log")
    result_file = os.path.join(result_dir, "result.txt")
    if not os.path.exists(traj_file):
        return {
            "status": "Preparing",
            "progress": 0,
            "total_steps": 0,
            "max_steps": MAX_STEPS,
            "last_update": _mtime_str(result_dir)
        }
    steps = _load_trajectory(traj_file)
    if not steps:
        return {
            "status": "Initializing",
            "progress": 0,
            "total_steps": 0,
            "max_steps": MAX_STEPS,
            "last_update": _mtime_str(traj_file)
        }
    last_step = steps[-1]
    log_data = _parse_runtime_log(log_file)
    exit_condition = log_data.get("exit_condition") or ""

    # Precedence: explicit done flag, then error flag, then log exit markers, then step limit.
    if last_step.get("done", False):
        status = "Done"
    elif last_step.get("Error", False):
        status = "Error"
    elif "message_exit: True" in exit_condition:
        status = "Done (Message Exit)"
    elif "thought_exit: True" in exit_condition:
        status = "Done (Thought Exit)"
    elif len(steps) >= MAX_STEPS:
        status = "Done (Max Steps)"
    else:
        status = "Running"

    if not status.startswith("Done"):
        result_content = "Task not completed"
    elif os.path.exists(result_file):
        with open(result_file, 'r', encoding='utf-8', errors='replace') as f:
            result_content = f.read().strip()
    else:
        result_content = "Result file not found"

    return {
        "status": status,
        "progress": len(steps),
        "max_steps": MAX_STEPS,
        "last_update": _format_action_timestamp(last_step),
        "steps": steps,
        "log_data": log_data,
        "result": result_content
    }
def _tail_text(path, line_count, chunk_size=8192):
    """Return the last *line_count* lines of a text file, joined with newlines.

    Reads backwards from the end in *chunk_size* blocks, so a large log is not
    read in full. This is a portable replacement for ``tail -n``.
    """
    with open(path, 'rb') as f:
        f.seek(0, os.SEEK_END)
        position = f.tell()
        data = b""
        # line_count + 1 newlines guarantee that the earliest kept line is complete.
        while position > 0 and data.count(b"\n") <= line_count:
            read_size = min(chunk_size, position)
            position -= read_size
            f.seek(position)
            data = f.read(read_size) + data
    text = data.decode('utf-8', errors='replace')
    if text.endswith("\n"):
        text = text[:-1]
    return "\n".join(text.split("\n")[-line_count:])


def _scan_trajectory_brief(traj_file):
    """Count the steps in traj.jsonl and decode the last complete one.

    A final line that is not valid JSON is treated as still being written. It
    is not counted, and the step before it is decoded instead.

    Returns:
        (step_count, last_step), where last_step is the decoded JSON value or None.
    """
    step_count = 0
    previous_line = last_line = None
    with open(traj_file, 'r', encoding='utf-8', errors='replace') as f:
        for line in f:
            if line.strip():
                step_count += 1
                previous_line, last_line = last_line, line
    if last_line is None:
        return 0, None
    try:
        return step_count, json.loads(last_line)
    except ValueError:
        pass
    step_count -= 1
    if previous_line is None:
        return step_count, None
    try:
        return step_count, json.loads(previous_line)
    except ValueError:
        return step_count, None


def _compute_task_status_brief(task_type, task_id):
    """Derive the brief status dict for one task from its results directory (no caching)."""
    result_dir = os.path.join(RESULTS_PATH, task_type, task_id)
    if not os.path.exists(result_dir):
        return {
            "status": "Not Started",
            "progress": 0,
            "max_steps": MAX_STEPS,
            "last_update": None
        }
    traj_file = os.path.join(result_dir, "traj.jsonl")
    log_file = os.path.join(result_dir, "runtime.log")
    result_file = os.path.join(result_dir, "result.txt")
    if not os.path.exists(traj_file):
        return {
            "status": "Preparing",
            "progress": 0,
            "max_steps": MAX_STEPS,
            "last_update": datetime.fromtimestamp(os.path.getmtime(result_dir)).strftime("%Y-%m-%d %H:%M:%S")
        }

    try:
        step_count, last_step_data = _scan_trajectory_brief(traj_file)
    except OSError:
        step_count, last_step_data = 0, None
    if not isinstance(last_step_data, dict):
        last_step_data = None

    if step_count == 0:
        return {
            "status": "Initializing",
            "progress": 0,
            "max_steps": MAX_STEPS,
            "last_update": datetime.fromtimestamp(os.path.getmtime(traj_file)).strftime("%Y-%m-%d %H:%M:%S")
        }

    # Same precedence as get_task_status: done flag, Error flag, log exit markers, step limit.
    # Reaching MAX_STEPS must not mask an explicit "Done" or "Error" on the last step.
    status = "Running"
    if last_step_data and last_step_data.get("done", False):
        status = "Done"
    elif last_step_data and last_step_data.get("Error", False):
        status = "Error"
    else:
        log_tail = ""
        if os.path.exists(log_file):
            try:
                # Only the last two lines are checked, so this stays cheap on large logs.
                log_tail = _tail_text(log_file, 2)
            except OSError:
                log_tail = ""
        if "message_exit: True" in log_tail:
            status = "Done (Message Exit)"
        elif "thought_exit: True" in log_tail:
            status = "Done (Thought Exit)"
        elif step_count >= MAX_STEPS:
            status = "Done (Max Steps)"

    last_update = "None"
    if last_step_data and "action_timestamp" in last_step_data:
        try:
            last_update = datetime.strptime(last_step_data["action_timestamp"], "%Y%m%d@%H%M%S").strftime("%Y-%m-%d %H:%M:%S")
        except (TypeError, ValueError):
            pass

    result_content = None
    if status.startswith("Done") and os.path.exists(result_file):
        try:
            with open(result_file, 'r', encoding='utf-8', errors='replace') as f:
                result_content = f.read().strip()
        except OSError:
            result_content = "Result file not found"

    return {
        "status": status,
        "progress": step_count,
        "max_steps": MAX_STEPS,
        "last_update": last_update,
        "result": result_content
    }


def get_task_status_brief(task_type, task_id):
    """
    Get brief status info for a task, without detailed step data, for fast homepage loading.

    "Done*" and "Error" results are cached in TASK_STATUS_CACHE.
    - An "Error" entry is served from the cache from then on.
    - A "Done*" entry is recomputed on every call until DONE_STABILITY_PERIOD
      seconds have passed since it was first cached, so a late flip to "Error"
      is still picked up. After that, the cached value is served.
    - If a recomputation yields any other status (e.g. the results were removed
      and the task restarted), the stale entry is dropped.

    Returns:
        dict with "status", "progress", "max_steps", "last_update" and, once the
        trajectory has steps, "result".
    """
    cache_key = f"{task_type}_{task_id}"
    current_time = time.time()
    cached = TASK_STATUS_CACHE.get(cache_key)
    if cached is not None:
        cached_status, cached_time = cached
        still_verifying = (cached_status["status"].startswith("Done")
                           and current_time - cached_time < DONE_STABILITY_PERIOD)
        if not still_verifying:
            return cached_status

    status_dict = _compute_task_status_brief(task_type, task_id)
    status = status_dict["status"]
    if status.startswith("Done") or status == "Error":
        # Keep the first caching time so the stability period is measured from then.
        first_cached = cached[1] if cached is not None else current_time
        TASK_STATUS_CACHE[cache_key] = (status_dict, first_cached)
    else:
        TASK_STATUS_CACHE.pop(cache_key, None)
    return status_dict
def get_all_tasks_status():
    """Collect the full status of every task listed in the task manifest.

    Returns:
        dict mapping each task_type to a list of {"id", "instruction", "status"}
        dicts in manifest order; "status" is the get_task_status() result.
    """
    overview = {}
    for task_type, task_ids in load_task_list().items():
        entries = overview[task_type] = []
        for task_id in task_ids:
            task_info = get_task_info(task_type, task_id)
            task_status = get_task_status(task_type, task_id)
            instruction = (
                task_info.get("instruction", "No instruction provided")
                if task_info
                else "No task info available"
            )
            entries.append({"id": task_id, "instruction": instruction, "status": task_status})
    return overview
def get_all_tasks_status_brief():
    """
    Get brief status info for all tasks, without detailed step data, for fast homepage loading.

    Returns:
        dict mapping each task_type in the manifest to a list of
        {"id", "instruction", "status"} dicts, where "status" is the
        get_task_status_brief() result.
    """
    def describe(task_type, task_id):
        task_info = get_task_info(task_type, task_id)
        task_status = get_task_status_brief(task_type, task_id)
        if task_info:
            instruction = task_info.get("instruction", "No instruction provided")
        else:
            instruction = "No task info available"
        return {"id": task_id, "instruction": instruction, "status": task_status}

    return {
        task_type: [describe(task_type, task_id) for task_id in task_ids]
        for task_type, task_ids in load_task_list().items()
    }
@app.route('/')
def index():
    """Render the monitor's landing page from the index.html template."""
    template_name = "index.html"
    return render_template(template_name)
@app.route('/task/<task_type>/<task_id>')
def task_detail(task_type, task_id):
    """Render the detail page of one task, or return 404 when it has no example definition."""
    task_info = get_task_info(task_type, task_id)
    if not task_info:
        return "Task not found", 404
    # Reading the whole trajectory and log is only worth doing for tasks that exist.
    task_status = get_task_status(task_type, task_id)
    return render_template("task_detail.html",
                           task_id=task_id,
                           task_type=task_type,
                           task_info=task_info,
                           task_status=task_status)
@app.route('/api/tasks')
def api_tasks():
    """Task status API: the full status of every task in the manifest, as JSON."""
    all_statuses = get_all_tasks_status()
    return jsonify(all_statuses)
@app.route('/api/tasks/brief')
def api_tasks_brief():
    """Return brief status info for all tasks, without detailed step data, for fast homepage loading."""
    brief_statuses = get_all_tasks_status_brief()
    return jsonify(brief_statuses)
def _path_is_within(parent, child):
    """Return True if absolute path *child* equals *parent* or lies underneath it."""
    try:
        return os.path.commonpath([parent, child]) == parent
    except ValueError:
        # e.g. paths on different drives (Windows) share no common prefix.
        return False


@app.route('/task/<task_type>/<task_id>/screenshot/<path:filename>')
def task_screenshot(task_type, task_id, filename):
    """Get task screenshot.

    ``filename`` comes straight from the URL, and the ``path`` converter accepts
    slashes. The resolved path must therefore stay inside the task's results
    directory, which in turn must stay inside RESULTS_PATH. Anything that escapes
    (e.g. ``../../etc/passwd``) gets the same 404 as a missing file, instead of
    being handed to ``send_file``.
    """
    results_root = os.path.abspath(RESULTS_PATH)
    task_dir = os.path.abspath(os.path.join(results_root, task_type, task_id))
    screenshot_path = os.path.abspath(os.path.join(task_dir, filename))
    if (_path_is_within(results_root, task_dir)
            and _path_is_within(task_dir, screenshot_path)
            and os.path.isfile(screenshot_path)):
        return send_file(screenshot_path, mimetype='image/png')
    return "Screenshot does not exist", 404
@app.route('/task/<task_type>/<task_id>/recording')
def task_recording(task_type, task_id):
    """Serve a task's recording.mp4 from its results directory, or 404 if it is missing."""
    recording_path = os.path.join(RESULTS_PATH, task_type, task_id, "recording.mp4")
    if not os.path.exists(recording_path):
        return "Recording does not exist", 404
    response = send_file(recording_path, mimetype='video/mp4')
    # Extra headers intended to improve playback compatibility on mobile browsers.
    extra_headers = (
        ('Accept-Ranges', 'bytes'),
        ('Cache-Control', 'public, max-age=3600'),
        ('X-Content-Type-Options', 'nosniff'),
    )
    for header_name, header_value in extra_headers:
        response.headers[header_name] = header_value
    return response
@app.route('/api/task/<task_type>/<task_id>')
def api_task_detail(task_type, task_id):
    """Task detail API: the example definition and full status of one task, or a JSON 404."""
    task_info = get_task_info(task_type, task_id)
    if not task_info:
        return jsonify({"error": "Task does not exist"}), 404
    # Only compute the full status (whole trajectory and log) for tasks that exist.
    return jsonify({
        "info": task_info,
        "status": get_task_status(task_type, task_id)
    })
if __name__ == '__main__':
    # Warn about missing inputs up front; the server is started regardless.
    if not os.path.exists(TASK_CONFIG_PATH):
        print(f"Warning: Task config file does not exist: {TASK_CONFIG_PATH}")
    if not os.path.exists(EXAMPLES_BASE_PATH):
        print(f"Warning: Task examples directory does not exist: {EXAMPLES_BASE_PATH}")
    # Start web service; host, port and debug mode are all overridable from the environment.
    host = os.getenv("FLASK_HOST", "0.0.0.0")
    port = int(os.getenv("FLASK_PORT", "8080"))
    debug = os.getenv("FLASK_DEBUG", "false").lower() == "true"
    app.run(host=host, port=port, debug=debug)