mirrored 12 minutes ago
0
adlsdztonyrefactor&fix: update README and main.py for improved configuration and task status handling 7d25f90
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import json
import time
from datetime import datetime
from pathlib import Path
from flask import Flask, render_template_string, jsonify, send_file, request, render_template
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# {task_type}_{task_id}: (status_dict, timestamp)
# For "Done" status, we need to verify it for a period to ensure it doesn't change to "Error"
TASK_STATUS_CACHE = {}
# Time in seconds to consider "Done" status as stable (default: 30s)
DONE_STABILITY_PERIOD = int(os.getenv("DONE_STABILITY_PERIOD", "30"))

app = Flask(__name__)

MONITOR_IN_DOCKER = os.getenv("MONITOR_IN_DOCKER", "false").lower() == "true"

if MONITOR_IN_DOCKER:
    # If running in Docker, use default paths
    TASK_CONFIG_PATH = "/app/evaluation_examples/test.json"
    EXAMPLES_BASE_PATH = "/app/evaluation_examples/examples"
    RESULTS_BASE_PATH = "/app/results"
else:
    # Load configuration from environment variables
    TASK_CONFIG_PATH = os.getenv("TASK_CONFIG_PATH", "../evaluation_examples/test.json")
    EXAMPLES_BASE_PATH = os.getenv("EXAMPLES_BASE_PATH", "../evaluation_examples/examples")
    RESULTS_BASE_PATH = os.getenv("RESULTS_BASE_PATH", "../results")

ACTION_SPACE=os.getenv("ACTION_SPACE", "pyautogui")
OBSERVATION_TYPE=os.getenv("OBSERVATION_TYPE", "screenshot")
MODEL_NAME=os.getenv("MODEL_NAME", "computer-use-preview")
MAX_STEPS = int(os.getenv("MAX_STEPS", "150"))

RESULTS_PATH = os.path.join(RESULTS_BASE_PATH, ACTION_SPACE, OBSERVATION_TYPE, MODEL_NAME)

def load_task_list():
    with open(TASK_CONFIG_PATH, 'r') as f:
        return json.load(f)

def get_task_info(task_type, task_id):
    task_file = os.path.join(EXAMPLES_BASE_PATH, task_type, f"{task_id}.json")
    if os.path.exists(task_file):
        with open(task_file, 'r') as f:
            return json.load(f)
    return None

def get_task_status(task_type, task_id):
    result_dir = os.path.join(RESULTS_PATH, task_type, task_id)
    
    if not os.path.exists(result_dir):
        return {
            "status": "Not Started",
            "progress": 0,
            "total_steps": 0,
            "last_update": None
        }
    
    traj_file = os.path.join(result_dir, "traj.jsonl")
    log_file = os.path.join(result_dir, "runtime.log")
    result_file = os.path.join(result_dir, "result.txt")
    
    if not os.path.exists(traj_file):
        return {
            "status": "Preparing",
            "progress": 0,
            "total_steps": 0,
            "last_update": datetime.fromtimestamp(os.path.getmtime(result_dir)).strftime("%Y-%m-%d %H:%M:%S")
        }
    
    # read trajectory file
    steps = []
    with open(traj_file, 'r') as f:
        for line in f:
            if line.strip():
                steps.append(json.loads(line))
    
    if not steps:
        return {
            "status": "Initializing",
            "progress": 0,
            "total_steps": 0,
            "last_update": datetime.fromtimestamp(os.path.getmtime(traj_file)).strftime("%Y-%m-%d %H:%M:%S")
        }
    
    last_step = steps[-1]
    
    # Check the log file for agent responses and exit conditions
    log_data = {
        "agent_responses": [],
        "exit_condition": None,
        "last_message": None
    }
    
    if os.path.exists(log_file):
        try:
            with open(log_file, 'r') as f:
                log_content = f.readlines()
                last_response = None
                
                for i, line in enumerate(log_content):
                    # Extract agent responses for each step
                    if "Responses: [" in line:
                        response_text = line.split("Responses: [")[1].strip()
                        if response_text.endswith("]"):
                            response_text = response_text[:-1]  # Remove closing bracket
                        
                        # Clean up the response text - remove quotes
                        if response_text.startswith("'") and response_text.endswith("'"):
                            response_text = response_text[1:-1]  # Remove surrounding quotes
                        elif response_text == '"]':  # Empty response
                            response_text = ""
                        
                        # Handle list of responses
                        if response_text and "', '" in response_text:
                            responses = [r.strip("'") for r in response_text.split("', '")]
                            log_data["agent_responses"].append(responses[0])  # Use first response
                            last_response = responses[0]  # Keep track of the last response
                        elif response_text:
                            log_data["agent_responses"].append(response_text)
                            last_response = response_text  # Keep track of the last response
                    
                    # Check for exit conditions near the end of the log
                    if "The state of the agent is not correct" in line or "Exit condition met" in line:
                        log_data["exit_condition"] = line.strip()
                        # If this is a message exit, save the last response as the last message
                        if "message_exit: True" in line and last_response:
                            log_data["last_message"] = last_response
        except Exception as e:
            log_data["error"] = f"Error parsing log file: {str(e)}"
    
    # check if the task is done based on both trajectory and log
    if last_step.get("done", False):
        status = "Done"
    elif last_step.get("Error", False):
        status = "Error"
    elif log_data.get("exit_condition") and "message_exit: True" in log_data.get("exit_condition", ""):
        status = "Done (Message Exit)"
    elif log_data.get("exit_condition") and "thought_exit: True" in log_data.get("exit_condition", ""):
        status = "Done (Thought Exit)"
    elif len(steps) >= MAX_STEPS:
        status = "Done (Max Steps)"
    else:
        status = "Running"
    
    # get last action timestamp
    try:
        last_update = datetime.strptime(last_step["action_timestamp"], "%Y%m%d@%H%M%S").strftime("%Y-%m-%d %H:%M:%S")
    except KeyError:
        last_update = "None"
    
    result_content = "Task not completed"
    if status.startswith("Done"):
        if os.path.exists(result_file):
            with open(result_file, 'r') as f:
                result_content = f.read().strip()
        else:
            result_content = "Result file not found"
    
    return {
        "status": status,
        "progress": len(steps),
        "max_steps": MAX_STEPS,
        "last_update": last_update,
        "steps": steps,
        "log_data": log_data,
        "result": result_content
    }

def get_task_status_brief(task_type, task_id):
    """
    Get brief status info for a task, without detailed step data, for fast homepage loading.
    """
    # Generate cache key based on task type and ID
    cache_key = f"{task_type}_{task_id}"
    
    # Check if the status is already cached
    current_time = time.time()
    last_cache_time = None
    if cache_key in TASK_STATUS_CACHE:
        cached_status, cached_time = TASK_STATUS_CACHE[cache_key]
        last_cache_time = cached_time
        # If cached status is "Done", check if it's within the stability period
        if cached_status["status"].startswith("Done"):
            # If within stability period, recalculate status to ensure it's correct
            if current_time - cached_time < DONE_STABILITY_PERIOD:
                # Status is still in verification period, refresh it
                pass
            else:
                # Status is stable, return from cache
                return cached_status
        else:
            # For non-Done status (like Error), just return from cache
            return cached_status
    
    result_dir = os.path.join(RESULTS_PATH, task_type, task_id)
    
    if not os.path.exists(result_dir):
        return {
            "status": "Not Started",
            "progress": 0,
            "max_steps": MAX_STEPS,
            "last_update": None
        }
    
    traj_file = os.path.join(result_dir, "traj.jsonl")
    log_file = os.path.join(result_dir, "runtime.log")
    result_file = os.path.join(result_dir, "result.txt")
    
    if not os.path.exists(traj_file):
        return {
            "status": "Preparing",
            "progress": 0,
            "max_steps": MAX_STEPS,
            "last_update": datetime.fromtimestamp(os.path.getmtime(result_dir)).strftime("%Y-%m-%d %H:%M:%S")
        }
    
    # Get file line count and last line without reading the whole file
    import subprocess
    
    # Use wc -l to get line count
    try:
        result = subprocess.run(['wc', '-l', traj_file], capture_output=True, text=True)
        if result.returncode == 0:
            step_count = int(result.stdout.strip().split()[0])
        else:
            step_count = 0
    except:
        step_count = 0
    
    # Use tail -n 1 to get last line
    last_step_data = None
    if step_count > 0:
        try:
            result = subprocess.run(['tail', '-n', '1', traj_file], capture_output=True, text=True)
            if result.returncode == 0 and result.stdout.strip():
                last_step_data = json.loads(result.stdout.strip())
        except:
            pass
    
    if step_count == 0:
        return {
            "status": "Initializing",
            "progress": 0,
            "max_steps": MAX_STEPS,
            "last_update": datetime.fromtimestamp(os.path.getmtime(traj_file)).strftime("%Y-%m-%d %H:%M:%S")
        }
    
    # Set default status to "Running"
    status = "Running"
    
    # Determine status from last step data
    if last_step_data:
        if last_step_data.get("done", False):
            status = "Done"
        elif last_step_data.get("Error", False):
            status = "Error"
    
    # If step count reaches max, consider as done
    if step_count >= MAX_STEPS:
        status = "Done (Max Steps)"
    
    # Quickly check exit condition in log file (only last few lines)
    if os.path.exists(log_file) and status == "Running":
        try:
            # Use tail to read last 2 lines of log file
            result = subprocess.run(['tail', '-n', '2', log_file], capture_output=True, text=True)
            if result.returncode == 0:
                log_tail = result.stdout
                if "message_exit: True" in log_tail:
                    status = "Done (Message Exit)"
                elif "thought_exit: True" in log_tail:
                    status = "Done (Thought Exit)"
        except:
            pass
    
    # If step count reaches max again (double check)
    if step_count >= MAX_STEPS:
        status = "Done (Max Steps)"
    
    # Get last update time
    last_update = "None"
    if last_step_data and "action_timestamp" in last_step_data:
        try:
            last_update = datetime.strptime(last_step_data["action_timestamp"], "%Y%m%d@%H%M%S").strftime("%Y-%m-%d %H:%M:%S")
        except:
            pass
    
    # Get result content if finished
    result_content = None
    if status.startswith("Done") and os.path.exists(result_file):
        try:
            with open(result_file, 'r') as f:
                result_content = f.read().strip()
        except:
            result_content = "Result file not found"
    
    status_dict = {
        "status": status,
        "progress": step_count,
        "max_steps": MAX_STEPS,
        "last_update": last_update,
        "result": result_content
    }
    
    # Cache the status if it is done or error
    if status.startswith("Done") or status == "Error":
        current_time = last_cache_time if last_cache_time else current_time
        TASK_STATUS_CACHE[cache_key] = (status_dict, current_time)
    
    return status_dict

def get_all_tasks_status():
    task_list = load_task_list()
    result = {}
    
    for task_type, task_ids in task_list.items():
        result[task_type] = []
        for task_id in task_ids:
            task_info = get_task_info(task_type, task_id)
            task_status = get_task_status(task_type, task_id)
            
            if task_info:
                result[task_type].append({
                    "id": task_id,
                    "instruction": task_info.get("instruction", "No instruction provided"),
                    "status": task_status
                })
            else:
                result[task_type].append({
                    "id": task_id,
                    "instruction": "No task info available",
                    "status": task_status
                })
    
    return result

def get_all_tasks_status_brief():
    """
    Get brief status info for all tasks, without detailed step data, for fast homepage loading.
    """
    task_list = load_task_list()
    result = {}
    
    for task_type, task_ids in task_list.items():
        result[task_type] = []
        for task_id in task_ids:
            task_info = get_task_info(task_type, task_id)
            task_status = get_task_status_brief(task_type, task_id)
            
            if task_info:
                result[task_type].append({
                    "id": task_id,
                    "instruction": task_info.get("instruction", "No instruction provided"),
                    "status": task_status
                })
            else:
                result[task_type].append({
                    "id": task_id,
                    "instruction": "No task info available",
                    "status": task_status
                })
    
    return result

@app.route('/')
def index():
    return render_template("index.html")

@app.route('/task/<task_type>/<task_id>')
def task_detail(task_type, task_id):
    task_info = get_task_info(task_type, task_id)
    task_status = get_task_status(task_type, task_id)
    
    if not task_info:
        return "Task not found", 404
    
    return render_template("task_detail.html", 
                            task_id=task_id, 
                            task_type=task_type, 
                            task_info=task_info, 
                            task_status=task_status)

@app.route('/api/tasks')
def api_tasks():
    """Task status API"""
    return jsonify(get_all_tasks_status())

@app.route('/api/tasks/brief')
def api_tasks_brief():
    """Return brief status info for all tasks, without detailed step data, for fast homepage loading."""
    return jsonify(get_all_tasks_status_brief())

@app.route('/task/<task_type>/<task_id>/screenshot/<path:filename>')
def task_screenshot(task_type, task_id, filename):
    """Get task screenshot"""
    screenshot_path = os.path.join(RESULTS_PATH, task_type, task_id, filename)
    if os.path.exists(screenshot_path):
        return send_file(screenshot_path, mimetype='image/png')
    else:
        return "Screenshot does not exist", 404

@app.route('/task/<task_type>/<task_id>/recording')
def task_recording(task_type, task_id):
    """Get task recording video"""
    recording_path = os.path.join(RESULTS_PATH, task_type, task_id, "recording.mp4")
    if os.path.exists(recording_path):
        response = send_file(recording_path, mimetype='video/mp4')
        # Add headers to improve mobile compatibility
        response.headers['Accept-Ranges'] = 'bytes'
        response.headers['Cache-Control'] = 'public, max-age=3600'
        response.headers['X-Content-Type-Options'] = 'nosniff'
        return response
    else:
        return "Recording does not exist", 404

@app.route('/api/task/<task_type>/<task_id>')
def api_task_detail(task_type, task_id):
    """Task detail API"""
    task_info = get_task_info(task_type, task_id)
    task_status = get_task_status(task_type, task_id)
    
    if not task_info:
        return jsonify({"error": "Task does not exist"}), 404
    
    return jsonify({
        "info": task_info,
        "status": task_status
    })

if __name__ == '__main__':
    # Check if necessary directories exist
    if not os.path.exists(TASK_CONFIG_PATH):
        print(f"Warning: Task config file does not exist: {TASK_CONFIG_PATH}")
    
    if not os.path.exists(EXAMPLES_BASE_PATH):
        print(f"Warning: Task examples directory does not exist: {EXAMPLES_BASE_PATH}")
    
    # Start web service
    host = os.getenv("FLASK_HOST", "0.0.0.0")
    port = 8080
    debug = os.getenv("FLASK_DEBUG", "false").lower() == "true"
    
    app.run(host=host, port=port, debug=debug)