DPDispatcher API¶
- dpdispatcher.info()¶
- class dpdispatcher.JobStatus.JobStatus(value)¶
An enumeration.
- completing = 6¶
- finished = 5¶
- running = 3¶
- terminated = 4¶
- unknown = 100¶
- unsubmitted = 1¶
- waiting = 2¶
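The states above can be mirrored with a standard `IntEnum`. The following is a minimal stand-in reproducing the documented values for illustration, not the library class itself:

```python
from enum import IntEnum

class JobStatus(IntEnum):
    # Values mirror the dpdispatcher.JobStatus enumeration documented above.
    unsubmitted = 1
    waiting = 2
    running = 3
    terminated = 4
    finished = 5
    completing = 6
    unknown = 100

# A job is fully done only once it reaches the finished state.
def is_finished(status: JobStatus) -> bool:
    return status == JobStatus.finished
```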
- class dpdispatcher.base_context.BaseContext¶
- bind_submission(submission)¶
- check_finish(proc)¶
- clean()¶
- download(submission, check_exists=False, mark_failure=True, back_error=False)¶
- kill(proc)¶
- classmethod load_from_dict(context_dict)¶
- read_file(fname)¶
- subclasses_dict = {'LazyLocalContext': <class 'dpdispatcher.lazy_local_context.LazyLocalContext'>, 'LocalContext': <class 'dpdispatcher.local_context.LocalContext'>, 'SSHContext': <class 'dpdispatcher.ssh_context.SSHContext'>}¶
- upload(submission)¶
- write_file(fname, write_str)¶
- dpdispatcher.dpdisp.main()¶
- class dpdispatcher.lazy_local_context.LazyLocalContext(local_root, remote_root=None, remote_profile={})¶
- bind_submission(submission)¶
- block_call(cmd)¶
- block_checkcall(cmd)¶
- call(cmd)¶
- check_file_exists(fname)¶
- check_finish(proc)¶
- clean()¶
- download(jobs, check_exists=False, mark_failure=True, back_error=False)¶
- get_job_root()¶
- get_return(proc)¶
- kill(proc)¶
- classmethod load_from_dict(context_dict)¶
- read_file(fname)¶
- upload(jobs, dereference=True)¶
- write_file(fname, write_str)¶
- class dpdispatcher.local_context.LocalContext(local_root, remote_root, remote_profile={})¶
- bind_submission(submission)¶
- block_call(cmd)¶
- block_checkcall(cmd)¶
- call(cmd)¶
- check_file_exists(fname)¶
- check_finish(proc)¶
- clean()¶
- download(submission, check_exists=False, mark_failure=True, back_error=False)¶
- download_(job_dirs, remote_down_files, check_exists=False, mark_failure=True, back_error=False)¶
- get_job_root()¶
- get_return(proc)¶
- kill(proc)¶
- classmethod load_from_dict(context_dict)¶
- read_file(fname)¶
- upload(submission)¶
- upload_(job_dirs, local_up_files, dereference=True)¶
- write_file(fname, write_str)¶
- class dpdispatcher.lsf.LSF(context)¶
LSF batch
- check_finish_tag(job)¶
- check_status(job)¶
- default_resources(resources)¶
- do_submit(job)¶
submit a single job, assuming that no job is running there.
- gen_script(job)¶
- class dpdispatcher.machine.Machine(context)¶
A machine is used to handle the connection with remote machines.
- context: SubClass derived from BaseContext
The context is used to maintain the connection with the remote machine.
- static arginfo()¶
- check_finish_tag(**kwargs)¶
- check_if_recover(submission)¶
- check_status(job)¶
- default_resources(res)¶
- do_submit(job)¶
submit a single job, assuming that no job is running there.
- gen_command_env_cuda_devices(resources)¶
- gen_script(job)¶
- gen_script_command(job)¶
- gen_script_custom_flags_lines(job)¶
- gen_script_end(job)¶
- gen_script_env(job)¶
- gen_script_header(job)¶
- gen_script_wait(resources)¶
- classmethod load_from_dict(machine_dict)¶
- classmethod load_from_json(json_path)¶
- sub_script_cmd(res)¶
- sub_script_head(res)¶
- subclasses_dict = {'LSF': <class 'dpdispatcher.lsf.LSF'>, 'PBS': <class 'dpdispatcher.pbs.PBS'>, 'Shell': <class 'dpdispatcher.shell.Shell'>, 'Slurm': <class 'dpdispatcher.slurm.Slurm'>}¶
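A `machine_dict` consumed by `Machine.load_from_dict` might look like the sketch below. The key names (`batch_type`, `context_type`, `local_root`, `remote_root`, `remote_profile`) are assumptions based on typical dpdispatcher configurations and may differ between versions; check your installed release before relying on them.

```python
import json

# Illustrative machine_dict; key names are assumed, not taken from this page.
machine_dict = {
    "batch_type": "Slurm",         # a key of Machine.subclasses_dict
    "context_type": "SSHContext",  # a key of BaseContext.subclasses_dict
    "local_root": "./work",
    "remote_root": "/scratch/work",
    "remote_profile": {
        "hostname": "hpc.example.com",
        "username": "user",
        "port": 22,
    },
}

print(json.dumps(machine_dict, indent=2))
```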
- class dpdispatcher.pbs.PBS(context)¶
- check_finish_tag(job)¶
- check_status(job)¶
- default_resources(resources)¶
- do_submit(job)¶
submit a single job, assuming that no job is running there.
- gen_script(job)¶
- gen_script_header(job)¶
- class dpdispatcher.shell.Shell(context)¶
- check_finish_tag(job)¶
- check_status(job)¶
- check_status_(job)¶
- default_resources(resources)¶
- do_submit(job)¶
submit a single job, assuming that no job is running there.
- gen_script(job)¶
- gen_script_header(job)¶
- class dpdispatcher.slurm.Slurm(context)¶
- check_finish_tag(job)¶
- check_status(job, retry=0, max_retry=3)¶
- default_resources(resources)¶
- do_submit(job, retry=0, max_retry=3)¶
submit a single job, assuming that no job is running there.
- gen_script(job)¶
- gen_script_header(job)¶
- class dpdispatcher.ssh_context.SSHContext(local_root, remote_root, remote_profile, clean_asynchronously=False)¶
- bind_submission(submission)¶
- block_call(cmd)¶
- block_checkcall(cmd, asynchronously=False, stderr_whitelist=None)¶
Run command with arguments. Wait for command to complete. If the return code was zero then return, otherwise raise RuntimeError.
- cmd: str
The command to run.
- asynchronously: bool, optional, default=False
Run command asynchronously. If True, nohup will be used to run the command.
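The contract described here (wait for completion, return on a zero exit code, raise `RuntimeError` otherwise) can be sketched locally with `subprocess`. This illustrates the documented behavior only; it is not the SSH implementation:

```python
import subprocess

def block_checkcall(cmd: str) -> subprocess.CompletedProcess:
    """Run cmd, wait for it to complete, and raise RuntimeError on a
    nonzero exit code, mirroring the documented contract."""
    proc = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(
            f"command {cmd!r} failed with exit code {proc.returncode}"
        )
    return proc
```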
- call(cmd)¶
- check_file_exists(fname)¶
- check_finish(cmd_pipes)¶
- clean()¶
- close()¶
- download(submission, check_exists=False, mark_failure=True, back_error=False)¶
- get_job_root()¶
- get_return(cmd_pipes)¶
- kill(cmd_pipes)¶
- classmethod load_from_dict(context_dict)¶
- read_file(fname)¶
- property sftp¶
- property ssh¶
- upload(submission, dereference=True)¶
- write_file(fname, write_str)¶
- class dpdispatcher.ssh_context.SSHSession(hostname, username, password=None, port=22, key_filename=None, passphrase=None, timeout=10)¶
- static arginfo()¶
- close()¶
- ensure_alive(max_check=10, sleep_time=10)¶
- exec_command(cmd, retry=0)¶
Call self.ssh.exec_command, with an additional exception check.

- get_ssh_client()¶
- property sftp¶
Return the sftp connection; open a new one if it does not exist.
- class dpdispatcher.submission.Job(job_task_list, *, resources, machine=None)¶
A Job is generated automatically by a Submission. A job usually contains many tasks and may request computing resources from the job scheduler system. Each Job can generate a script file to be submitted to the job scheduler system or executed locally.
- job_task_list: list of Task
the tasks belonging to the job
- resources: Resources
the machine resources. Passed from Submission when it constructs jobs.
- machine: Machine
the machine object used to execute the job. Passed from Submission when it constructs jobs.
- classmethod deserialize(job_dict, machine=None)¶
Convert the job_dict to a Job class object.
- job_dict: dict
the dictionary which contains the job information
- job: Job
the Job class instance converted from the job_dict
- get_hash()¶
- get_job_state()¶
Get the job state. Usually, this method queries the database of the slurm or pbs job scheduler system to get the result.
This method will not submit or resubmit the job if the job is unsubmitted.
- handle_unexpected_job_state()¶
- job_to_json()¶
- register_job_id(job_id)¶
- serialize(if_static=False)¶
Convert the Job class instance to a dictionary.
- if_static: bool
whether to dump the job runtime information (job_id, job_state, fail_count, job_uuid, etc.) to the dictionary.
- job_dict: dict
the dictionary converted from the Job class instance
- submit_job()¶
- class dpdispatcher.submission.Resources(number_node, cpu_per_node, gpu_per_node, queue_name, group_size, *, custom_flags=[], strategy={'if_cuda_multi_devices': False}, para_deg=1, source_list=[], **kwargs)¶
Resources is used to describe the machine resources we need to do calculations.
- number_node: int
The number of nodes needed for each job.
- cpu_per_node: int
The number of CPUs on each node.
- gpu_per_node: int
The number of GPUs on each node.
- queue_name: str
The queue name of the batch job scheduler system.
- group_size: int
The number of tasks in a job.
- custom_flags: list of str
Extra lines passed to the header of the job-submitting script.
- strategy: dict
The strategies used to generate job-submitting scripts. if_cuda_multi_devices: bool
If there are multiple NVIDIA GPUs on the node and the tasks should be assigned to different GPUs, set this to True; dpdispatcher will then export the environment variable CUDA_VISIBLE_DEVICES with a different value for each task. Usually, this option is used together with the Task.task_need_resources variable.
- para_deg: int
How many tasks will be run in parallel. Usually used together with strategy['if_cuda_multi_devices'].
- source_list: list of Path
The environment files to be sourced before the command executes.
- static arginfo()¶
- classmethod deserialize(resources_dict)¶
- classmethod load_from_dict(resources_dict)¶
- classmethod load_from_json(json_file)¶
- serialize()¶
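The constructor signature above maps naturally onto a dictionary such as `Resources.load_from_dict` might consume. The exact accepted keys are an assumption based on the parameter names, and the values here are purely illustrative:

```python
# Hypothetical resources_dict; keys follow the constructor parameters above.
resources_dict = {
    "number_node": 1,
    "cpu_per_node": 4,
    "gpu_per_node": 1,
    "queue_name": "gpu",
    "group_size": 5,
    "custom_flags": ["#SBATCH --exclusive"],
    "strategy": {"if_cuda_multi_devices": True},
    "para_deg": 1,
    "source_list": ["./env.sh"],
}

# Total CPUs requested by one job of this shape.
total_cpus = resources_dict["number_node"] * resources_dict["cpu_per_node"]
```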
- class dpdispatcher.submission.Submission(work_base, machine=None, resources=None, forward_common_files=[], backward_common_files=[], *, task_list=[])¶
A Submission represents a collection of tasks. These tasks usually reside in a common directory and may share common files to be uploaded and downloaded.
- work_base: Path
path-like, the base directory of the local tasks
- machine: Machine
machine class object (for example, PBS, Slurm, Shell) to execute the jobs. The machine can still be bound after instantiation with the bind_submission method.
- resources: Resources
the machine resources (cpu or gpu) used to generate the slurm/pbs script
- forward_common_files: list
the common files to be uploaded to other computers before the jobs begin
- backward_common_files: list
the common files to be downloaded from other computers after the jobs finish
- task_list: list of Task
a list of tasks to be run.
- bind_machine(machine)¶
Bind this submission to a machine, and update the machine context's remote_root and local_root.
- machine: Machine
the machine to bind with
- check_all_finished()¶
Check whether all the jobs in the submission have finished.
This method will not handle unexpected job states in the submission.
- clean_jobs()¶
- classmethod deserialize(submission_dict, machine=None)¶
Convert the submission_dict to a Submission class object.
- submission_dict: dict
the dictionary which contains the submission information
- submission: Submission
the Submission class instance converted from the submission_dict
- download_jobs()¶
- generate_jobs()¶
After the tasks are registered to self.belonging_tasks, this method generates the jobs and adds them to self.belonging_jobs. The tasks are shuffled randomly and grouped into jobs of self.resources.group_size tasks each; the random shuffle is done for load balancing. The random seed is a constant (to be concrete, 42), which ensures that the generated jobs are identical when the program is re-run.
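The shuffle-and-group behavior described above can be sketched as follows. Assume `tasks` is any list and `group_size` comes from Resources; the fixed seed 42 matches the docstring, while the function itself is an illustrative stand-in, not the library code:

```python
import random

def group_tasks(tasks, group_size):
    """Shuffle tasks with a constant seed (42) for load balancing, then
    split them into chunks of at most group_size tasks per job. The
    constant seed makes the grouping reproducible across re-runs."""
    tasks = list(tasks)
    random.seed(42)
    random.shuffle(tasks)
    return [tasks[i:i + group_size] for i in range(0, len(tasks), group_size)]
```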
- get_hash()¶
- get_submission_state()¶
Check the states of all the jobs in the submission.
This method will not handle unexpected job states (like resubmitting terminated jobs) in the submission.
- handle_unexpected_submission_state()¶
Handle unexpected job states in the submission. If a job state is unsubmitted, submit the job. If a job state is terminated (killed unexpectedly), resubmit the job. If a job state is unknown, raise an error.
- register_task(task)¶
- register_task_list(task_list)¶
- run_submission(*, exit_on_submit=False, clean=True)¶
The main method to execute the submission. First, check whether an old Submission exists on the remote machine, and try to recover from it. Second, upload the local files to the remote machine where the tasks are to be executed. Third, run the submission defined previously. Fourth, wait until the tasks in the submission have finished and download the result files to the local directory. If exit_on_submit is True, the submission will exit after submitting.
- serialize(if_static=False)¶
Convert the Submission class instance to a dictionary.
- if_static: bool
whether to dump the job runtime information (like job_id, job_state, fail_count) to the dictionary.
- submission_dict: dict
the dictionary converted from the Submission class instance
- classmethod submission_from_json(json_file_name='submission.json')¶
- submission_to_json()¶
- submit_submission()¶
Submit the jobs belonging to the submission.
- try_recover_from_json()¶
- upload_jobs()¶
- class dpdispatcher.submission.Task(command, task_work_path, forward_files=[], backward_files=[], outlog='log', errlog='err')¶
A Task is a sequential command to be executed, together with the files it depends on, which are transmitted forward (before execution) and backward (after it finishes).
- command: str
the command to be executed.
- task_work_path: Path
the working directory of the task, where the dependent files are located.
- forward_files: list of Path
the files to be transmitted to the remote machine before the command executes.
- backward_files: list of Path
the files to be transmitted from the remote machine after the command finishes.
- outlog: str
the filename to which the command redirects stdout
- errlog: str
the filename to which the command redirects stderr
- classmethod deserialize(task_dict)¶
Convert the task_dict to a Task class object.
- task_dict: dict
the dictionary which contains the task information
- task: Task
the Task class instance converted from the task_dict
- get_hash()¶
- serialize()¶
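A Task's serialize/get_hash pair can be imitated with stdlib tools. This stand-in uses the documented constructor fields; the hashing scheme (SHA-1 over a key-sorted JSON dump) is an assumption for illustration, not necessarily what dpdispatcher uses:

```python
import hashlib
import json

def serialize_task(command, task_work_path, forward_files=(), backward_files=(),
                   outlog="log", errlog="err"):
    """Build a task_dict from the documented Task constructor fields."""
    return {
        "command": command,
        "task_work_path": task_work_path,
        "forward_files": list(forward_files),
        "backward_files": list(backward_files),
        "outlog": outlog,
        "errlog": errlog,
    }

def task_hash(task_dict):
    """Hash the serialized dict; stable because the keys are sorted first."""
    payload = json.dumps(task_dict, sort_keys=True)
    return hashlib.sha1(payload.encode()).hexdigest()
```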
- class dpdispatcher.dpcloudserver.retcode.RETCODE¶
- DATAERR = '2002'¶
- DBERR = '2000'¶
- IOERR = '2003'¶
- LOGINERR = '2100'¶
- NODATA = '2300'¶
- OK = '0000'¶
- PARAMERR = '2101'¶
- PWDERR = '2104'¶
- REQERR = '2200'¶
- ROLEERR = '2103'¶
- THIRDERR = '2001'¶
- UNDERDEBUG = '2301'¶
- UNKOWNERR = '2400'¶
- USERERR = '2102'¶
- VERIFYERR = '2105'¶
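For logging and error reporting, the codes above can be inverted into a name lookup. A small helper using only the values documented here (including the `UNKOWNERR` spelling as it appears in the library):

```python
# Return codes documented for dpdispatcher.dpcloudserver.retcode.RETCODE.
RETCODE = {
    "OK": "0000",
    "DBERR": "2000",
    "THIRDERR": "2001",
    "DATAERR": "2002",
    "IOERR": "2003",
    "LOGINERR": "2100",
    "PARAMERR": "2101",
    "USERERR": "2102",
    "ROLEERR": "2103",
    "PWDERR": "2104",
    "VERIFYERR": "2105",
    "REQERR": "2200",
    "NODATA": "2300",
    "UNDERDEBUG": "2301",
    "UNKOWNERR": "2400",
}

# Reverse map: code string -> symbolic name, handy when decoding responses.
CODE_TO_NAME = {v: k for k, v in RETCODE.items()}
```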
- dpdispatcher.dpcloudserver.zip_file.is_selected(arcname, selected)¶
- dpdispatcher.dpcloudserver.zip_file.unzip_file(zip_file, out_dir='./')¶
- dpdispatcher.dpcloudserver.zip_file.zip_file_list(root_path, zip_filename, file_list=[])¶
- dpdispatcher.dpcloudserver.zip_file.zip_files(root_path, out_file, selected=[])¶
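The zip helpers above can be approximated with the stdlib zipfile module. This sketch mirrors the documented signatures of zip_file_list (root_path, zip_filename, file_list) and unzip_file, but it is an illustration, not the library code:

```python
import os
import zipfile

def zip_file_list(root_path, zip_filename, file_list=()):
    """Archive file_list (paths relative to root_path) into zip_filename,
    storing each entry under its path relative to root_path."""
    with zipfile.ZipFile(zip_filename, "w", zipfile.ZIP_DEFLATED) as zf:
        for fname in file_list:
            zf.write(os.path.join(root_path, fname), arcname=fname)

def unzip_file(zip_file, out_dir="./"):
    """Extract every entry of zip_file into out_dir."""
    with zipfile.ZipFile(zip_file) as zf:
        zf.extractall(out_dir)
```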