/
Vimgolf1e84e8a
"""
Terminal executor
Code source: [terminal_asciicast_record_executor.py](https://github.com/James4Ever0/agi_computer_control/blob/master/web_gui_terminal_recorder/executor_and_replayer/terminal_asciicast_record_executor.py) (modified)
"""
# pending pypi project name: termexec
# If you want to release this code as a PyPI package, you must first create a thorough unit test suite covering terminal launch, Docker containers, network requests, etc. Focus on the effectiveness of clean-up and the ability to handle exceptions.
import atexit
import codecs
import os
import signal
import threading
import time
from typing import Protocol, cast

import agg_python_bindings
import ptyprocess
from pydantic import BaseModel
class Cursor(BaseModel):
    """Snapshot of where the terminal cursor is and whether it is shown.

    Attributes:
        x (int): Column of the cursor.
        y (int): Row (line) of the cursor.
        hidden (bool): ``True`` when the cursor is hidden, ``False`` when it
            is visible.
    """

    x: int
    y: int
    hidden: bool
def decode_bytes_to_utf8_safe(_bytes: bytes) -> str:
    """Decode ``_bytes`` as UTF-8 without ever raising on malformed input.

    Args:
        _bytes (bytes): Raw bytes to decode.

    Returns:
        str: The decoded text; every undecodable byte sequence is replaced
        with the Unicode replacement character (U+FFFD).
    """
    return _bytes.decode(encoding="utf-8", errors="replace")
# screen init params: width, height
# screen traits: write_bytes, display, screenshot
class _TerminalScreenProtocol(Protocol):
    """Structural interface of a terminal screen backend.

    ``TerminalProcess`` casts its screen object to this protocol, so any
    backend is expected to offer the members below.
    """

    def write_bytes(self, _bytes: bytes):
        """Feed raw terminal output bytes into the screen."""
        ...

    @property
    def display(self) -> str:
        """Text content of the screen."""
        ...

    def screenshot(self, png_output_path: str):
        """Write an image of the screen to ``png_output_path``."""
        ...

    def close(self):
        """Release the resources held by the screen."""
        ...

    @property
    def cursor(self) -> Cursor:
        """Cursor position and visibility."""
        ...
class AvtScreen:
    """Terminal screen backed by ``agg_python_bindings.TerminalEmulator`` (avt).

    The screen can be used as a context manager; leaving the ``with`` block
    calls `close`.
    """

    def __init__(self, width: int, height: int):
        """
        Initialize an AvtScreen object.

        Args:
            width (int): The width of the virtual terminal emulator.
            height (int): The height of the virtual terminal emulator.
        """
        # terminal emulator provided by avt
        self.vt = agg_python_bindings.TerminalEmulator(width, height)
        self._closing = False
        # Stateful decoder: output arrives in arbitrary byte chunks, so a
        # multi-byte UTF-8 character may be split across two write_bytes()
        # calls. The incremental decoder buffers the incomplete tail instead
        # of turning both halves into replacement characters.
        self._decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

    def write_bytes(self, _bytes: bytes):
        """
        Write the given bytes to the virtual terminal emulator.

        The bytes are decoded as UTF-8. Invalid sequences are replaced with
        the replacement character (U+FFFD); an incomplete multi-byte sequence
        at the end of a chunk is held back and completed by the next call.

        Args:
            _bytes (bytes): The bytes to be written to the virtual terminal emulator.
        """
        text = self._decoder.decode(_bytes)
        # Nothing to feed when the chunk only contained a partial character.
        if text:
            self.vt.feed_str(text)

    @property
    def cursor(self) -> "Cursor":
        """
        Get the current cursor state as a `Cursor` object.

        Returns:
            Cursor: ``x``/``y`` hold the column and row reported by the
            emulator; ``hidden`` is the negation of the emulator's
            "visible" flag.
        """
        col, row, visible = self.vt.get_cursor()
        return Cursor(x=col, y=row, hidden=not visible)

    @property
    def display(self) -> str:
        """
        Get the current display of the terminal emulator as a string.

        Returns:
            str: The emulator's raw text lines joined with newlines.
        """
        return "\n".join(self.vt.text_raw())

    def screenshot(self, png_output_path: str):
        """
        Save the current state of the terminal emulator to a PNG image file.

        Args:
            png_output_path: The path to write the PNG image to.
        """
        self.vt.screenshot(png_output_path)

    def close(self):
        """
        Release the terminal emulator held by this screen.

        Safe to call more than once; only the first call has an effect.
        The original author notes this is necessary to avoid crashes when
        creating multiple instances of this class. After closing, the other
        methods can no longer be used because the emulator is gone.
        """
        if self._closing:
            return
        self._closing = True
        del self.vt

    def __enter__(self):
        return self

    def __exit__(self, exc, value, tb):
        self.close()
class TerminalProcess:
    """Run a command in a pseudo terminal and mirror its output on a virtual screen.

    A daemon thread keeps reading the pty output and feeds it to the screen
    backend. The instance can be used as a context manager; `close` is also
    registered with ``atexit`` so the child is terminated at interpreter exit.
    """

    def __init__(self, command: list[str], width: int, height: int, backend="avt"):
        """
        Initializes the terminal emulator with a command to execute.

        Args:
            command (list[str]): List of command strings to execute in the terminal
            width (int): Width of the terminal emulator
            height (int): Height of the terminal emulator
            backend (str, optional): Backend to use for terminal emulator. Defaults to "avt".

        Raises:
            ValueError: If ``backend`` is not a supported backend name.
        """
        # Validate before spawning anything, so a bad backend name cannot
        # leave an orphaned child process behind.
        if backend != "avt":
            raise ValueError(
                "Unknown terminal emulator backend '%s' (known ones: avt)" % backend
            )
        self._closing = False
        rows, cols = height, width
        # Build the screen first: if that fails, no child has been started yet.
        screen = AvtScreen(width=width, height=height)
        try:
            # a process executing command in a pseudo terminal
            self.pty_process: ptyprocess.PtyProcess = cast(
                ptyprocess.PtyProcess,
                ptyprocess.PtyProcess.spawn(command, dimensions=(rows, cols)),
            )
        except BaseException:
            # Spawning failed: release the emulator before propagating.
            screen.close()
            raise
        # virtual terminal screen
        self.vt_screen = cast(_TerminalScreenProtocol, screen)
        self.__pty_process_reading_thread = threading.Thread(
            target=self.__read_and_update_screen, daemon=True
        )
        self.__start_ptyprocess_reading_thread()
        atexit.register(self.close)

    def __start_ptyprocess_reading_thread(self):
        """Starts the thread that reads output from the terminal process and updates the screen"""
        self.__pty_process_reading_thread.start()

    def __send_signal(self, signum: int):
        """Send ``signum`` to the child, ignoring the case where it is already gone."""
        try:
            os.kill(self.pty_process.pid, signum)
        except ProcessLookupError:
            # The child has already exited and been reaped; nothing to signal.
            pass

    def write(self, data: bytes):
        """Writes input data to the terminal process"""
        self.pty_process.write(data)

    def close(self):
        """Terminates the terminal process, stops the reading thread and closes the screen.

        Sends SIGTERM, waits up to 0.5 seconds for the child to exit, and
        sends SIGKILL only if it is still alive. Safe to call more than once.
        """
        if self._closing:
            return
        self._closing = True
        self.__send_signal(signal.SIGTERM)
        # Give the child up to half a second to exit gracefully, but stop
        # waiting as soon as it is gone.
        deadline = time.monotonic() + 0.5
        while self.pty_process.isalive() and time.monotonic() < deadline:
            time.sleep(0.05)
        if self.pty_process.isalive():
            self.__send_signal(signal.SIGKILL)
        # Stop the reader before releasing the screen, so it does not write
        # to an emulator that has already been destroyed.
        self.__pty_process_reading_thread.join(timeout=0.5)
        self.vt_screen.close()

    def __enter__(self):
        return self

    def __exit__(self, exc, value, tb):
        self.close()

    def __read_and_update_screen(self, poll_interval=0.01):
        """Reads available output from terminal and updates the screen

        Runs until the terminal reports end-of-file, the pty becomes
        unreadable, or the interpreter is shutting the thread down.

        Args:
            poll_interval (float, optional): Pause in seconds between two reads. Defaults to 0.01.
        """
        while True:
            try:
                # ptyprocess.read is blocking. only pexpect has read_nonblocking
                process_output_bytes = self.pty_process.read(1024)
            except (KeyboardInterrupt, SystemExit, SystemError, EOFError):
                # interrupted, interpreter exit, python error, or terminal died
                break
            except (OSError, ValueError):
                # The pty can no longer be read (e.g. its file object was
                # closed); retrying would only spin forever.
                break
            try:
                self.vt_screen.write_bytes(process_output_bytes)
            except (KeyboardInterrupt, SystemExit, SystemError):
                break
            except Exception:
                # Rendering is best effort: one bad chunk must not stop the
                # reader, unless the screen is going away anyway.
                if self._closing:
                    break
            time.sleep(poll_interval)
class TerminalExecutor:
    """Convenience wrapper that drives a `TerminalProcess` with text input.

    Usable as a context manager; leaving the ``with`` block calls `close`.
    """

    def __init__(self, command: list[str], width: int, height: int):
        """
        Start ``command`` in a terminal emulator, using the default (avt) backend.

        Args:
            command (list[str]): List of command strings to execute
            width (int): Width of the terminal emulator
            height (int): Height of the terminal emulator
        """
        # a terminal process, running command in pty screen
        self.terminal = TerminalProcess(command=command, width=width, height=height)
        self._closing = False

    def input(self, text: str):
        """
        Send ``text`` to the terminal process, then pause briefly.

        Args:
            text (str): The input text to send
        """
        payload = text.encode()
        self.terminal.write(payload)
        # Give the process a moment to produce its output.
        time.sleep(0.1)

    @property
    def display(self) -> str:
        """
        The current display of the terminal emulator as a string.
        """
        screen = self.terminal.vt_screen
        return screen.display

    def screenshot(self, png_save_path: str):
        """
        Save the current display of the terminal emulator as a .png file.

        Args:
            png_save_path (str): The path to save the screenshot to
        """
        screen = self.terminal.vt_screen
        screen.screenshot(png_save_path)

    def close(self):
        """
        Close the underlying terminal process; repeated calls do nothing.
        """
        if self._closing:
            return
        self._closing = True
        self.terminal.close()

    def __enter__(self):
        return self

    def __exit__(self, exc, value, tb):
        self.close()
def test_harmless_command_locally_with_bash():
    """
    Tests the TerminalExecutor class with a harmless command by running a docker alpine container
    and executing a series of input events, then taking a screenshot and dumping the terminal
    display to a file.

    Writes ``./terminal_executor_text_dump.txt`` and
    ``./terminal_executor_screenshot.png`` in the current directory.
    """
    SLEEP_INTERVAL = 0.5
    command = ["docker", "run", "--rm", "-it", "alpine"]
    input_events = ['echo "Hello World!"', "\n"]
    # The context manager guarantees the container process is terminated
    # even when one of the steps below raises.
    with TerminalExecutor(command=command, width=80, height=24) as executor:
        # Let the container start before typing into it.
        time.sleep(1)
        for event in input_events:
            executor.input(event)
            time.sleep(SLEEP_INTERVAL)
        # check for screenshot, text dump
        text_dump = executor.display
        print("Dumping terminal display to ./terminal_executor_text_dump.txt")
        # Explicit UTF-8: the display may contain non-ASCII characters that
        # the platform default encoding cannot represent.
        with open("./terminal_executor_text_dump.txt", "w", encoding="utf-8") as f:
            f.write(text_dump)
        print("Taking terminal screenshot at ./terminal_executor_screenshot.png")
        executor.screenshot("./terminal_executor_screenshot.png")
    print("Done")
def test():
    """
    Manual smoke test for the TerminalExecutor class.

    Delegates to `test_harmless_command_locally_with_bash`, which runs a
    harmless command, takes a screenshot and dumps the terminal display to
    a file. Useful for checking that TerminalExecutor works in a real
    environment.
    """
    test_harmless_command_locally_with_bash()
# Run the manual smoke test when this file is executed as a script.
if __name__ == "__main__":
    test()