Source code for dpdispatcher.run

import os
import re
import sys
from glob import glob
from hashlib import sha1

from dpdispatcher.machine import Machine
from dpdispatcher.submission import Resources, Submission, Task

if sys.version_info >= (3, 11):
    import tomllib
else:
    import tomli as tomllib
from typing import List, Optional

from dargs import Argument

from dpdispatcher.arginfo import machine_dargs, resources_dargs, task_dargs

REGEX = r"(?m)^# /// (?P<type>[a-zA-Z0-9-]+)$\s(?P<content>(^#(| .*)$\s)+)^# ///$"


[docs] def read_pep723(script: str) -> Optional[dict]: """Read a PEP 723 script metadata from a script string. Parameters ---------- script : str Script content. Returns ------- dict PEP 723 metadata. """ name = "script" matches = list( filter(lambda m: m.group("type") == name, re.finditer(REGEX, script)) ) if len(matches) > 1: # TODO: Add tests for scenarios where multiple script blocks are found raise ValueError(f"Multiple {name} blocks found") elif len(matches) == 1: content = "".join( line[2:] if line.startswith("# ") else line[1:] for line in matches[0].group("content").splitlines(keepends=True) ) return tomllib.loads(content) else: # TODO: Add tests for scenarios where no metadata is found return None
[docs] def pep723_args() -> Argument: """Return the argument parser for PEP 723 metadata.""" machine_args = machine_dargs() machine_args.fold_subdoc = True machine_args.doc = "Machine configuration. See related documentation for details." resources_args = resources_dargs(detail_kwargs=False) resources_args.fold_subdoc = True resources_args.doc = ( "Resources configuration. See related documentation for details." ) task_args = task_dargs() command_arg = task_args["command"] command_arg.doc = ( "Python interpreter or launcher. No need to contain the Python script filename." ) command_arg.default = "python" command_arg.optional = True task_args["task_work_path"].doc += " Can be a glob pattern." task_args.name = "task_list" task_args.doc = "List of tasks to execute." task_args.repeat = True task_args.dtype = (list,) return Argument( "pep723", dtype=dict, doc="PEP 723 metadata", sub_fields=[ Argument( "work_base", dtype=str, optional=True, default="./", doc="Base directory for the work", ), Argument( "forward_common_files", dtype=List[str], optional=True, default=[], doc="Common files to forward to the remote machine", ), Argument( "backward_common_files", dtype=List[str], optional=True, default=[], doc="Common files to backward from the remote machine", ), machine_args, resources_args, task_args, ], )
[docs] def create_submission(metadata: dict, hash: str) -> Submission: """Create a Submission instance from a PEP 723 metadata. Parameters ---------- metadata : dict PEP 723 metadata. hash : str Submission hash. Returns ------- Submission Submission instance. """ base = pep723_args() metadata = base.normalize_value(metadata, trim_pattern="_*") base.check_value(metadata, strict=False) tasks = [] for task in metadata["task_list"]: task = task.copy() task["command"] += f" $REMOTE_ROOT/script_{hash}.py" task_work_path = os.path.join( metadata["machine"]["local_root"], metadata["work_base"], task["task_work_path"], ) if os.path.isdir(task_work_path): tasks.append(Task.load_from_dict(task)) elif glob(task_work_path): for file in glob(task_work_path): tasks.append(Task.load_from_dict({**task, "task_work_path": file})) # TODO: Add tests for scenarios where the task work path is a glob pattern else: # TODO: Add tests for scenarios where the task work path is not found raise FileNotFoundError(f"Task work path {task_work_path} not found.") return Submission( work_base=metadata["work_base"], forward_common_files=metadata["forward_common_files"], backward_common_files=metadata["backward_common_files"], machine=Machine.load_from_dict(metadata["machine"]), resources=Resources.load_from_dict(metadata["resources"]), task_list=tasks, )
[docs] def run_pep723(script: str): """Run a PEP 723 script. Parameters ---------- script : str Script content. """ metadata = read_pep723(script) if metadata is None: raise ValueError("No PEP 723 metadata found.") dpdispatcher_metadata = metadata["tool"]["dpdispatcher"] script_hash = sha1(script.encode("utf-8")).hexdigest() submission = create_submission(dpdispatcher_metadata, script_hash) submission.machine.context.write_file(f"script_{script_hash}.py", script) # write script submission.run_submission()