import shlex
from dpdispatcher import dlog
from dpdispatcher.JobStatus import JobStatus
from dpdispatcher.machine import Machine
# Header text for generated PBS job scripts. It carries two ``str.format``-style
# placeholders: ``{select_node_line}`` and ``{queue_name_line}``.
# NOTE(review): the text begins with a newline, so ``#!/bin/bash -l`` ends up on
# the second line of the rendered header -- confirm this is intended.
pbs_script_header_template = (
    "\n"
    "#!/bin/bash -l\n"
    "{select_node_line}\n"
    "#PBS -j oe\n"
    "{queue_name_line}\n"
)
class PBS(Machine):
    """Machine backend that submits jobs with ``qsub`` and polls them with ``qstat -x``."""

    def gen_script(self, job):
        """Return the submission script text for *job*.

        This simply delegates to the parent class implementation.
        """
        return super().gen_script(job)

    def do_submit(self, job):
        """Write the job script through the context and submit it with ``qsub``.

        The script is written under ``job.script_file_name`` and ``qsub`` is
        run from ``self.context.remote_root``. The job id is taken as the
        first whitespace-separated token of the first line printed by
        ``qsub``; it is also stored in a file named ``<job_hash>_job_id``.

        Returns
        -------
        The job id parsed from the ``qsub`` output.

        Raises
        ------
        RuntimeError
            If ``qsub`` printed nothing from which a job id can be parsed.
        """
        script_file_name = job.script_file_name
        script_str = self.gen_script(job)
        job_id_name = job.job_hash + "_job_id"
        self.context.write_file(fname=script_file_name, write_str=script_str)

        script_file_dir = self.context.remote_root
        # Both paths are quoted so that spaces or shell metacharacters in
        # them cannot alter the command.
        command = (
            f"cd {shlex.quote(script_file_dir)} && "
            f"qsub {shlex.quote(script_file_name)}"
        )
        stdin, stdout, stderr = self.context.block_checkcall(command)

        subret = stdout.readlines()
        first_line_fields = subret[0].split() if subret else []
        if not first_line_fields:
            # Fail with a clear message instead of an opaque IndexError.
            raise RuntimeError(
                "qsub did not print a job id for script %s" % script_file_name
            )
        job_id = first_line_fields[0]
        self.context.write_file(job_id_name, job_id)
        return job_id

    def default_resources(self, resources):
        """Placeholder; performs no action and returns ``None``."""
        pass

    def check_status(self, job):
        """Query ``qstat -x`` for *job* and map the result to a ``JobStatus``.

        Returns
        -------
        JobStatus
            ``unsubmitted`` when ``job.job_id`` is the empty string;
            ``waiting`` for status ``Q``/``H``; ``running`` for ``R``;
            ``finished`` or ``terminated`` for ``C``/``E``/``K``/``F`` (and
            when ``qstat`` reports an unknown or finished job), depending on
            whether the finish tag file exists; ``unknown`` otherwise.

        Raises
        ------
        RuntimeError
            If ``qstat`` exits non-zero for any other reason, or if its
            output contains no parsable status line.
        """
        job_id = job.job_id
        if job_id == "":
            return JobStatus.unsubmitted

        # Quote the id: array-style ids may contain characters such as
        # ``[]`` that the shell would otherwise interpret.
        ret, stdin, stdout, stderr = self.context.block_call(
            "qstat -x " + shlex.quote(job_id)
        )
        err_str = stderr.read().decode("utf-8")
        if ret != 0:
            job_is_gone = (
                "qstat: Unknown Job Id" in err_str or "Job has finished" in err_str
            )
            if not job_is_gone:
                raise RuntimeError(
                    "status command qstat fails to execute. erro info: %s return code %d"
                    % (err_str, ret)
                )
            # The scheduler no longer tracks the job; the tag file decides
            # whether it completed or was cut short.
            if self.check_finish_tag(job=job):
                return JobStatus.finished
            return JobStatus.terminated

        # The status letter is the second-to-last column of the last
        # non-blank output line. Filtering blank lines keeps this correct
        # whether or not the output ends with a trailing newline.
        out_str = stdout.read().decode("utf-8")
        non_blank_lines = [line for line in out_str.splitlines() if line.strip()]
        fields = non_blank_lines[-1].split() if non_blank_lines else []
        if len(fields) < 2:
            raise RuntimeError(
                "cannot parse qstat output for job %s: %r" % (job_id, out_str)
            )
        status_word = fields[-2]

        if status_word in ("Q", "H"):
            return JobStatus.waiting
        if status_word == "R":
            return JobStatus.running
        if status_word in ("C", "E", "K", "F"):
            if self.check_finish_tag(job):
                dlog.info(f"job: {job.job_hash} {job.job_id} finished")
                return JobStatus.finished
            return JobStatus.terminated
        return JobStatus.unknown

    def check_finish_tag(self, job):
        """Return the context's answer on whether ``<job_hash>_job_tag_finished`` exists."""
        job_tag_finished = job.job_hash + "_job_tag_finished"
        return self.context.check_file_exists(job_tag_finished)
class Torque(PBS):
    """PBS variant whose status query is issued as ``qstat -l``."""

    def check_status(self, job):
        """Query ``qstat -l`` for *job* and map the result to a ``JobStatus``.

        Returns
        -------
        JobStatus
            ``unsubmitted`` when ``job.job_id`` is the empty string;
            ``waiting`` for status ``Q``/``H``; ``running`` for ``R``;
            ``finished`` or ``terminated`` for ``C``/``E``/``K``/``F`` (and
            when ``qstat`` reports an unknown or finished job), depending on
            whether the finish tag file exists; ``unknown`` otherwise.

        Raises
        ------
        RuntimeError
            If ``qstat`` exits non-zero for any other reason, or if its
            output contains no parsable status line.
        """
        job_id = job.job_id
        if job_id == "":
            return JobStatus.unsubmitted

        # Quote the id: array-style ids may contain characters such as
        # ``[]`` that the shell would otherwise interpret.
        ret, stdin, stdout, stderr = self.context.block_call(
            "qstat -l " + shlex.quote(job_id)
        )
        err_str = stderr.read().decode("utf-8")
        if ret != 0:
            job_is_gone = (
                "qstat: Unknown Job Id" in err_str or "Job has finished" in err_str
            )
            if not job_is_gone:
                raise RuntimeError(
                    "status command qstat fails to execute. erro info: %s return code %d"
                    % (err_str, ret)
                )
            # The scheduler no longer tracks the job; the tag file decides
            # whether it completed or was cut short.
            if self.check_finish_tag(job=job):
                return JobStatus.finished
            return JobStatus.terminated

        # The status letter is the second-to-last column of the last
        # non-blank output line. Filtering blank lines keeps this correct
        # whether or not the output ends with a trailing newline.
        out_str = stdout.read().decode("utf-8")
        non_blank_lines = [line for line in out_str.splitlines() if line.strip()]
        fields = non_blank_lines[-1].split() if non_blank_lines else []
        if len(fields) < 2:
            raise RuntimeError(
                "cannot parse qstat output for job %s: %r" % (job_id, out_str)
            )
        status_word = fields[-2]

        if status_word in ("Q", "H"):
            return JobStatus.waiting
        if status_word == "R":
            return JobStatus.running
        if status_word in ("C", "E", "K", "F"):
            if self.check_finish_tag(job):
                dlog.info(f"job: {job.job_hash} {job.job_id} finished")
                return JobStatus.finished
            return JobStatus.terminated
        return JobStatus.unknown