/
Vimgolf4117bba
"""
Library of vimgolf-gym
"""
# TODO: implement a openai-gym style interface
# reference link: https://github.com/Farama-Foundation/Gymnasium
# TODO: implement a gradual scoring system by comparing the buffer with the target output, extracting the vim edit buffer in the middle of execution
# Vim API reference:
# https://github.com/LachlanGray/vim-agent
# https://github.com/nsbradford/VimGPT
# TODO: dockerize the executor part, push the image, offer an option or environment variable to use dockerized executor
import atexit
import os
import pathlib
import shutil
import sys
import tempfile
import typing
import time
import zipfile
import PIL.Image
import requests
import uuid
import vimgolf.vimgolf as vimgolf
import vimgolf_gym.dataclasses as dataclasses
import vimgolf_gym.log_parser as log_parser
import vimgolf_gym.terminal_executor as terminal_executor
from vimgolf_gym._vimrc import (
_prepare_cybergod_vimrc_with_buffer_file,
_CYBERGOD_VIMGOLF_VIMRC_FILEPATH,
)
import subprocess
_HOMEDIR = os.path.expanduser("~")
_CYBERGOD_VIMGOLF_DATASET_BASEDIR = os.path.join(
_HOMEDIR, ".cache", "cybergod-vimgolf-dataset"
)
_CYBERGOD_VIMGOLF_GYM_DATASET_DOWNLOADED = os.path.join(
_CYBERGOD_VIMGOLF_DATASET_BASEDIR, "DATASET_DOWNLOADED"
) # a flag to indicate whether the dataset has been downloaded
os.makedirs(_CYBERGOD_VIMGOLF_DATASET_BASEDIR, exist_ok=True)
__all__ = [
"make",
"list_local_challenge_ids",
"get_local_challenge_definition",
"get_local_challenge_metadata",
"get_local_challenge_worst_solution",
"get_local_challenge_worst_solution_header",
"format_vimgolf_string",
"VimGolfEnv",
]
class DatasetInitError(Exception):
pass
def format_vimgolf_string(string:str):
"""dos2unix and add newline to end if missing."""
return vimgolf.format_(string)
def assert_challenge_id_length(challenge_id: str):
"""Assert the challenge_id length to be 24"""
assert len(challenge_id) == 24
def make(
env_name: str,
custom_challenge: typing.Optional[dataclasses.VimGolfCustomChallenge] = None,
use_docker: bool = False,
log_buffer: bool = False,
) -> "VimGolfEnv":
"""
Create a VimGolf environment.
The env_name can be one of the following:
- `vimgolf-test`: A simple test environment.
- `vimgolf-local-<challenge_id>`: A local environment for a specific challenge.
- `vimgolf-online-<challenge_id>`: An online environment for a specific challenge.
- `vimgolf-custom`: A custom environment. The `custom_challenge` parameter will be used.
Args:
env_name (str): The name of the environment to create.
use_docker (bool, optional): Whether to use a dockerized executor. Defaults to False.
log_buffer (bool, optional): Whether to log the editor buffer or not. Defaults to False.
custom_challenge (Optional[VimGolfCustomChallenge], optional): The custom challenge to use. Defaults to None.
Raises:
NotImplementedError: If the environment name is not recognized.
Returns:
VimGolfEnv: The created environment
"""
if use_docker:
os.environ["VIMGOLF_GYM_USE_DOCKER"] = "1"
else:
os.environ["VIMGOLF_GYM_USE_DOCKER"] = "0"
if log_buffer:
os.environ["VIMGOLF_GYM_LOG_BUFFER"] = "1"
else:
os.environ["VIMGOLF_GYM_LOG_BUFFER"] = "0"
if env_name == "vimgolf-test":
env = make_test()
elif env_name == "vimgolf-custom":
assert (
custom_challenge
), "custom_challenge must be provided for vimgolf-custom environment"
env = make_env_with_text(
input_text=custom_challenge.input, output_text=custom_challenge.output
)
elif env_name.startswith("vimgolf-local-"):
challenge_id = env_name[len("vimgolf-local-") :]
assert_challenge_id_length(challenge_id)
env = make_offline_with_cybergod_dataset(challenge_id)
elif env_name.startswith("vimgolf-online-"):
challenge_id = env_name[len("vimgolf-online-") :]
assert_challenge_id_length(challenge_id)
env = make_online(challenge_id)
else:
raise NotImplementedError
return env
def make_test() -> "VimGolfEnv":
"""
Create an environment for a simple test challenge.
The test challenge is about typing "hello world" into the buffer and then saving and quitting vim.
The expected solution is "hello world\nhello world\n".
"""
input_text = ""
output_text = "hello world\nhello world\n"
return make_env_with_text(input_text=input_text, output_text=output_text)
def make_env_with_text(input_text: str, output_text: str) -> "VimGolfEnv":
"""
Create a VimGolfEnv with given input and output text.
Creates a temporary directory and two files, one for the input and one for the output.
Then it calls make_offline with the paths of the two files.
Args:
input_text (str): The starting code.
output_text (str): The expected solution code.
Returns:
VimGolfEnv: The environment object.
"""
tempdir = tempfile.TemporaryDirectory()
atexit.register(tempdir.cleanup)
input_file = os.path.join(tempdir.name, "input.txt")
output_file = os.path.join(tempdir.name, "output.txt")
with open(input_file, "w") as f:
f.write(input_text)
with open(output_file, "w") as f:
f.write(output_text)
return make_offline(input_file, output_file)
def make_offline(input_file: str, output_file: str) -> "VimGolfEnv":
"""
Create an environment from a VimGolf challenge given as a local file.
Args:
input_file (str): Path to the file containing the starting code.
output_file (str): Path to the file containing the expected solution code.
Returns:
VimGolfEnv: An environment for the given challenge.
"""
use_docker = os.environ.get("VIMGOLF_GYM_USE_DOCKER", None) == "1"
log_buffer = os.environ.get("VIMGOLF_GYM_LOG_BUFFER", None) == "1"
return VimGolfEnv(
input_file=input_file,
output_file=output_file,
use_docker=use_docker,
log_buffer=log_buffer,
)
def make_online(challenge_id: str) -> "VimGolfEnv":
"""
Create an environment from a VimGolf challenge online.
Given a challenge_id, obtain the challenge definition from the VimGolf website
and create an environment out of it.
Args:
challenge_id (str): Unique identifier for the challenge.
Returns:
VimGolfEnv: An environment for the given challenge.
"""
challenge_url = vimgolf.get_challenge_url(challenge_id)
challenge_data = requests.get(challenge_url).content
challenge = dataclasses.VimGolfChallengeDefinition.parse_raw(challenge_data)
return make_env_with_challenge(challenge)
def make_env_with_challenge(
challenge: dataclasses.VimGolfChallengeDefinition,
) -> "VimGolfEnv":
"""
Create an environment from a VimGolfChallengeDefinition object.
This function simply passes the input and output text to make_env_with_text,
which will create a VimGolfEnv with the given text.
Args:
challenge: VimGolfChallengeDefinition object
Returns:
VimGolfEnv: An environment for the given challenge
"""
return make_env_with_text(
input_text=challenge.input.data, output_text=challenge.output.data
)
def init_cybergod_vimgolf_dataset() -> None:
"""
Initialize the local dataset by downloading it if it does not exist yet.
After this function is called, the local dataset should be downloaded and
ready to use.
This function is called by `list_local_challenge_ids` and `make_offline_with_cybergod_dataset`.
"""
if not os.path.exists(_CYBERGOD_VIMGOLF_GYM_DATASET_DOWNLOADED):
download_cybergod_vimgolf_dataset()
def list_local_challenge_ids() -> list[str]:
"""
List all challenge ids in the local dataset.
This function will download the local dataset if it does not exist yet.
Returns:
list[str]: a list of all challenge ids in the local dataset.
"""
init_cybergod_vimgolf_dataset()
challenges_dir = os.path.join(
_CYBERGOD_VIMGOLF_DATASET_BASEDIR,
"challenges",
)
challenge_ids = os.listdir(challenges_dir)
return challenge_ids
def make_offline_with_cybergod_dataset(challenge_id: str) -> "VimGolfEnv":
"""
Load a VimGolf challenge from the local dataset and make an environment
out of it.
Given a challenge_id, find the corresponding challenge definition in the
dataset, parse it into a VimGolfChallengeDefinition object, and create a
VimGolfEnv out of it.
Args:
challenge_id (str): Unique identifier for the challenge.
Returns:
VimGolfEnv: An environment for the given challenge.
"""
init_cybergod_vimgolf_dataset()
challenge = get_local_challenge_definition(challenge_id)
return make_env_with_challenge(challenge)
def get_local_challenge_metadata(challenge_id: str):
"""
Load a VimGolf challenge's metadata from the local dataset.
Given a challenge_id, find the corresponding JSON file in the dataset
and parse it into a VimGolfChallengeMetadata object.
Args:
challenge_id (str): Unique identifier for the challenge.
Returns:
VimGolfChallengeMetadata: Parsed challenge metadata.
Raises:
AssertionError: If the metadata file does not exist.
"""
metadata_file = os.path.join(
_CYBERGOD_VIMGOLF_DATASET_BASEDIR, "challenges", challenge_id, "metadata.json"
)
assert os.path.exists(metadata_file), (
"Metadata file '%s' does not exist" % metadata_file
)
with open(metadata_file, "r") as f:
metadata = dataclasses.VimGolfChallengeMetadata.parse_raw(f.read())
return metadata
def get_local_challenge_worst_solution(challenge_id: str):
"""
Load the worst solution for a challenge from the local dataset.
Given a challenge_id, find the corresponding JSON file in the dataset
and parse it into a VimGolfPublicSolution object.
Args:
challenge_id (str): Unique identifier for the challenge.
Returns:
VimGolfPublicSolution: Parsed worst solution.
Raises:
AssertionError: If the worst solution file does not exist.
"""
metadata_file = os.path.join(
_CYBERGOD_VIMGOLF_DATASET_BASEDIR,
"challenges",
challenge_id,
"worst_solution.json",
)
assert os.path.exists(metadata_file), (
"Worst solution file '%s' does not exist" % metadata_file
)
with open(metadata_file, "r") as f:
Worst_solution = dataclasses.VimGolfPublicSolution.parse_raw(f.read())
return Worst_solution
def get_local_challenge_worst_solution_header(challenge_id: str):
"""
Parse the worst solution's header string from the local dataset.
Given a challenge_id, find the corresponding worst solution file in the dataset
and parse its header string into a VimGolfParsedPublicSolutionHeader object.
Args:
challenge_id (str): Unique identifier for the challenge.
Returns:
VimGolfParsedPublicSolutionHeader: Parsed header string.
Raises:
AssertionError: If the worst solution file does not exist.
"""
solution = get_local_challenge_worst_solution(challenge_id)
header = solution.header
ret = dataclasses.parse_public_solution_header(header)
return ret
def get_local_challenge_definition(challenge_id: str):
"""
Load a VimGolf challenge definition from the local dataset.
Given a challenge_id, find the corresponding JSON file in the dataset
and parse it into a VimGolfChallengeDefinition object.
Args:
challenge_id (str): Unique identifier for the challenge.
Returns:
VimGolfChallengeDefinition: Parsed challenge definition.
Raises:
AssertionError: If the challenge file does not exist.
"""
challenge_file = os.path.join(
_CYBERGOD_VIMGOLF_DATASET_BASEDIR, "challenges", challenge_id, "challenge.json"
)
assert os.path.exists(challenge_file), (
"Challenge file '%s' does not exist" % challenge_file
)
with open(challenge_file, "r") as f:
challenge = dataclasses.VimGolfChallengeDefinition.parse_raw(f.read())
return challenge
def download_cybergod_vimgolf_dataset():
"""
Download the CyberGod VimGolf dataset from various sources.
This function is called when the dataset is not initialized yet. It downloads the dataset
from various sources (Kaggle, Hugging Face, GitHub Releases, GitHub Mirror) and extracts it
to the dataset directory. After the download is finished, it touches the flag file
_CYBERGOD_VIMGOLF_GYM_DATASET_DOWNLOADED to indicate that the dataset is initialized.
If the download fails, it raises an exception and cleans up the dataset directory.
:raises DatasetInitError: If the dataset download fails.
"""
print(
"Initializing CyberGod VimGolf dataset at:", _CYBERGOD_VIMGOLF_DATASET_BASEDIR
)
try:
# TODO: add huggingface, hf-mirror.com, github releases and github mirror links
download_urls = [
"https://www.kaggle.com/api/v1/datasets/download/jessysisca/vimgolf-challenges-and-solutions",
"https://hf-mirror.com/datasets/James4Ever0/vimgolf_challenges_and_solutions/resolve/main/challenges.zip?download=true",
"https://huggingface.co/datasets/James4Ever0/vimgolf_challenges_and_solutions/resolve/main/challenges.zip?download=true",
"https://github.com/James4Ever0/vimgolf-gym/releases/download/dataset-release/challenges.zip",
"https://bgithub.xyz/James4Ever0/vimgolf-gym/releases/download/dataset-release/challenges.zip",
]
with tempfile.TemporaryDirectory() as tempdir:
zip_file_path = os.path.join(
tempdir, "vimgolf-challenges-and-solutions.zip"
)
with open(zip_file_path, "wb") as f:
content = None
for url in download_urls:
try:
print("Downloading:", url)
content = requests.get(
url, allow_redirects=True, timeout=10
).content
break
except requests.Timeout:
print("Timeout, trying next URL")
if content:
f.write(content)
else:
raise DatasetInitError("Failed to download the dataset")
with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
# extract to CYBERGOD_VIMGOLF_GYM_DATASET_DIR
zip_ref.extractall(_CYBERGOD_VIMGOLF_DATASET_BASEDIR)
# after all, touch the flag _CYBERGOD_VIMGOLF_GYM_DATASET_DOWNLOADED
pathlib.Path(_CYBERGOD_VIMGOLF_GYM_DATASET_DOWNLOADED).touch()
finally:
if not os.path.exists(_CYBERGOD_VIMGOLF_GYM_DATASET_DOWNLOADED):
# cleanup the dataset basedir, if the dataset is not downloaded successfully
shutil.rmtree(_CYBERGOD_VIMGOLF_DATASET_BASEDIR)
class VimGolfEnv:
def __init__(
self,
input_file: str,
output_file: str,
width: int = 80,
height: int = 24,
init_keys: str = "",
use_docker: bool = False,
log_buffer: bool = False,
):
"""Initialize the environment with the given input and output files.
Args:
input_file (str): the input file path
output_file (str): the output file path
width (int): the width of the terminal
height (int): the height of the terminal
use_docker (bool): whether use dockerized executor or local (requiring vim installed)
log_buffer (bool): whether to log the editor buffer or not
init_keys (str): initial input keys in Vimgolf solution style
"""
self.use_docker = use_docker
"""whether use dockerized executor or local (requiring vim installed)"""
self.input_file = input_file
"""the input file path"""
self.output_file = output_file
"""the output file path"""
self.log_buffer = log_buffer
"""whether to log the editor buffer or not"""
self.init_keys = init_keys
"""initial input keys in Vimgolf solution style"""
assert os.path.isfile(
self.input_file
), f"Input file {self.input_file} does not exist."
assert os.path.isfile(
self.output_file
), f"Output file {self.output_file} does not exist."
# check if the content of the input file is different from the output file
with open(self.input_file, "rb") as f:
_input_content = f.read()
with open(self.output_file, "rb") as f:
_output_content = f.read()
assert _input_content != _output_content, "Input file and output file cannot be the same."
# TODO: run a modified version of vimgolf local python script writing progress to a jsonl file, which embeds in this script, for easy state inspection and data collection (we can create a temporary directory for cleanup)
self.log_directory = tempfile.TemporaryDirectory()
"""the log directory, where tempfiles stored"""
self.log_file = os.path.join(self.log_directory.name, "vimgolf.log")
"""the log file path, used to retrieve progress info of the vimgolf process"""
self.buffer_file = os.path.join(
self.log_directory.name, "vimgolf_editor_buffer"
)
"""the editor buffer, used to track granual progress"""
self._container_name = str(uuid.uuid4())
if self.use_docker:
mountpoint = "/vimgolf_gym_workdir"
docker_output_file = os.path.join(mountpoint, "out")
docker_input_file = os.path.join(mountpoint, "in")
docker_buffer_file = os.path.join(mountpoint, "vimgolf_editor_buffer")
docker_log_file = os.path.join(mountpoint, "vimgolf.log")
shutil.copy(self.input_file, os.path.join(self.log_directory.name, "in"))
shutil.copy(self.output_file, os.path.join(self.log_directory.name, "out"))
extra_docker_run_params = []
if self.log_buffer:
_prepare_cybergod_vimrc_with_buffer_file(docker_buffer_file)
extra_docker_run_params += [
"-v",
"%s:%s:ro"
% (
_CYBERGOD_VIMGOLF_VIMRC_FILEPATH,
"/usr/local/lib/python3.10/dist-packages/vimgolf/vimgolf.vimrc",
),
]
self.command = [
"docker",
"run",
"--rm",
"-it",
"--name",
self._container_name,
"-v",
"%s:%s" % (self.log_directory.name, mountpoint),
*extra_docker_run_params,
"--network=none",
"--entrypoint",
"python3",
"agile4im/cybergod_vimgolf_gym",
"-m",
"vimgolf_gym.vimgolf",
"--input_file",
docker_input_file,
"--output_file",
docker_output_file,
"--log_file",
docker_log_file,
]
if self.init_keys:
self.command += ["--init_keys", self.init_keys]
else:
extra_flags = []
if self.log_buffer:
extra_flags += ["--buffer_file", self.buffer_file]
if self.init_keys:
extra_flags += ["--init_keys", self.init_keys]
self.command = [
sys.executable,
"-m",
"vimgolf_gym.vimgolf",
"--input_file",
self.input_file,
"--output_file",
self.output_file,
"--log_file",
self.log_file,
*extra_flags,
]
self._closing = False
self.command: list[str]
"""the command passed to underlying terminal executor"""
self.width = width
"""terminal width"""
self.height = height
"""terminal height"""
self.create_executor_and_log_watcher()
self.executor: terminal_executor.TerminalExecutor
"""terminal executor for running vimgolf process"""
self.log_watcher: log_parser.VimGolfLogWatcher
"""log watcher for tracking vimgolf log output"""
atexit.register(self.log_directory.cleanup)
atexit.register(self._kill_docker_container)
@property
def buffer(self) -> typing.Optional[bytes]:
"""The editor buffer"""
if not self.log_buffer:
return None
else:
if os.path.isfile(self.buffer_file):
with open(self.buffer_file, "rb") as f:
return f.read()
def act(self, action: str):
"""Take an action
Args:
action (str): the action to take
"""
self.executor.input(action)
@property
def success(self):
"""Check if the vimgolf challenge has been solved successfully"""
return self.log_watcher.success
def get_best_success_result(self):
"""
Return the best success result in the log watcher.
Returns:
Optional[VimGolfEnvResult]: The best success result.
"""
return self.log_watcher.get_best_success_result()
def get_last_success_result(self):
"""
Return the last success result in the log watcher.
Returns:
Optional[VimGolfEnvResult]: The last success result
"""
return self.log_watcher.get_last_success_result()
@property
def results(self):
"""The results of the vimgolf challenge environment
Returns:
list[VimGolfEnvResult]: The results of the vimgolf challenge environment
"""
return self.log_watcher.results
def __enter__(self):
return self
def __exit__(self, exc, value, tb):
self.close()
@property
def success_results(self):
"""The success results of the vimgolf challenge environment
Returns:
list[VimGolfEnvResult]: The success results of the vimgolf challenge environment
"""
return self.log_watcher.success_results
def create_executor_and_log_watcher(self):
"""Create the executor and log watcher"""
self.executor = terminal_executor.TerminalExecutor(
command=self.command, width=self.width, height=self.height
)
if not hasattr(self, "log_watcher"):
self.log_watcher = log_parser.VimGolfLogWatcher(self.log_file)
# shall we wait the executor be ready
# we wait for the 'play(...)' indicator to appear in the log file.
while True:
if os.path.exists(self.log_file):
with open(self.log_file, "r") as f:
log_content = f.read()
if "play(...)" in log_content:
break
time.sleep(0.5)
def reset(self):
"""Reset the environment"""
self.close()
self._closing = False
self.create_executor_and_log_watcher()
def render(self):
"""Render the environment"""
screenshot = self.screenshot()
# display the screenshot
screenshot.show()
def screenshot(self):
"""Take a screenshot of the environment
Returns:
PIL.Image.Image: The screenshot
"""
with tempfile.TemporaryDirectory() as tmpdir:
png_tmpfile_path = os.path.join(tmpdir, "screenshot.png")
self.executor.screenshot(png_tmpfile_path)
image = PIL.Image.open(png_tmpfile_path)
return image
def _kill_docker_container(self):
if self.use_docker:
subprocess.run(
["docker", "kill", self._container_name], capture_output=True
)
def verify_keys(self, keys: str):
"""Verify a solution by its keys
Args:
keys (str): the keys to verify, in Vimgolf style
"""
assert keys, "Keys cannot be empty"
with VimGolfEnv(
input_file=self.input_file,
output_file=self.output_file,
init_keys=keys,
use_docker=self.use_docker,
log_buffer=True,
) as env:
for _ in range(3):
success = env.success
if success:
break
time.sleep(1)
if not success:
buffer = env.buffer
with open(self.output_file, "rb") as f:
expected_output = f.read()
buffer = vimgolf.format_(buffer)
expected_output = vimgolf.format_(expected_output)
success = buffer == expected_output
return success
def close(self):
"""Close the environment"""
if not self._closing:
self._closing = True
self.executor.close()
self._kill_docker_container()
del self.executor
if os.path.exists(self.log_file):
os.remove(self.log_file)
setattr(self, "executor", None)
def calculate_relative_inverse_score(self, score:int, worst_score:typing.Optional[int] =None):
"""Calculate the relative inverse score of the given score
Args:
score (int): The score to calculate the relative inverse score of.
worst_score (int, optional): The worst score to use. Defaults to None.
Returns:
float: The relative inverse score.
"""
assert score >= 0, "Score must be non-negative"
if worst_score is None:
with open(self.output_file, "r") as f:
worst_score = len(f.read()) + 10
ret = worst_score / score
return ret