from __future__ import print_function
import os, sys
import subprocess
import socket # to get the host name
from .models import Base, Job, ArrayJob, Status, times
from .tools import logger
import sqlalchemy
from distutils.version import LooseVersion
# Defines an equivalent `which` function to dig-out the location of excutables
import shutil
if sys.version_info[:2] >= (3, 3):
which = shutil.which
else: #define our own
[docs] def which(cmd, mode=os.F_OK|os.X_OK, path=None):
from distutils.spawn import find_executable
candidate = find_executable(cmd, path)
st = os.stat(candidate)
if bool(st.st_mode & mode):
return candidate
return None
[docs]class JobManager:
"""This job manager defines the basic interface for handling jobs in the SQL database."""
def __init__(self, database = 'submitted.sql3', wrapper_script = None, debug = False):
self._database = os.path.realpath(database)
self._engine = sqlalchemy.create_engine("sqlite:///"+self._database, connect_args={'timeout': 600}, echo=debug)
self._session_maker = sqlalchemy.orm.sessionmaker(bind=self._engine)
# store the command that this job manager was called with
if wrapper_script is None: wrapper_script = 'jman'
if not os.path.exists(wrapper_script):
bindir = os.path.join(os.path.realpath(os.curdir), 'bin')
wrapper_script = which(wrapper_script, path=os.pathsep.join((bindir,
os.environ['PATH'])))
if wrapper_script is None:
raise IOError("Could not find the installation path of gridtk. Please specify it in the wrapper_script parameter of the JobManager.")
if not os.path.exists(wrapper_script):
raise IOError("Your wrapper_script cannot be found. Jobs will not be executable.")
self.wrapper_script = wrapper_script
def __del__(self):
# remove the database if it is empty
if os.path.isfile(self._database):
# in errornous cases, the session might still be active, so don't create a deadlock here!
if not hasattr(self, 'session'):
self.lock()
job_count = len(self.get_jobs())
self.unlock()
if not job_count:
logger.debug("Removed database file '%s' since database is empty" % self._database)
os.remove(self._database)
[docs] def lock(self):
"""Generates (and returns) a blocking session object to the database."""
if hasattr(self, 'session'):
raise RuntimeError('Dead lock detected. Please do not try to lock the session when it is already locked!')
if LooseVersion(sqlalchemy.__version__) < LooseVersion('0.7.8'):
# for old sqlalchemy versions, in some cases it is required to re-generate the engine for each session
self._engine = sqlalchemy.create_engine("sqlite:///"+self._database)
self._session_maker = sqlalchemy.orm.sessionmaker(bind=self._engine)
# create the database if it does not exist yet
if not os.path.exists(self._database):
self._create()
# now, create a session
self.session = self._session_maker()
logger.debug("Created new database session to '%s'" % self._database)
return self.session
[docs] def unlock(self):
"""Closes the session to the database."""
if not hasattr(self, 'session'):
raise RuntimeError('Error detected! The session that you want to close does not exist any more!')
logger.debug("Closed database session of '%s'" % self._database)
self.session.close()
del self.session
def _create(self):
"""Creates a new and empty database."""
from .tools import makedirs_safe
# create directory for sql database
makedirs_safe(os.path.dirname(self._database))
# create all the tables
Base.metadata.create_all(self._engine)
logger.debug("Created new empty database '%s'" % self._database)
[docs] def get_jobs(self, job_ids = None):
"""Returns a list of jobs that are stored in the database."""
if job_ids is not None and len(job_ids) == 0:
return []
q = self.session.query(Job)
if job_ids is not None:
q = q.filter(Job.unique.in_(job_ids))
return sorted(list(q), key=lambda job: job.unique)
def _job_and_array(self, job_id, array_id = None):
# get the job (and the array job) with the given id(s)
job = self.get_jobs((job_id,))
if len(job) > 1:
logger.error("%d jobs with the same ID '%d' were detected in the database"%(len(job), job_id))
elif not len(job):
logger.warn("Job with ID '%d' was not found in the database."%job_id)
return (None, None)
job = job[0]
unique_id = job.unique
if array_id is not None:
array_job = list(self.session.query(ArrayJob).filter(ArrayJob.job_id == unique_id).filter(ArrayJob.id == array_id))
assert (len(array_job) == 1)
return (job, array_job[0])
else:
return (job, None)
[docs] def run_job(self, job_id, array_id = None):
"""This function is called to run a job (e.g. in the grid) with the given id and the given array index if applicable."""
# set the job's status in the database
try:
# get the job from the database
self.lock()
jobs = self.get_jobs((job_id,))
if not len(jobs):
# it seems that the job has been deleted in the meanwhile
return
job = jobs[0]
# get the machine name we are executing on; this might only work at idiap
machine_name = socket.gethostname()
# set the 'executing' status to the job
job.execute(array_id, machine_name)
self.session.commit()
except Exception as e:
logger.error("Caught exception '%s'", e)
pass
finally:
self.unlock()
# get the command line of the job from the database; does not need write access
self.lock()
job = self.get_jobs((job_id,))[0]
command_line = job.get_command_line()
exec_dir = job.get_exec_dir()
self.unlock()
logger.info("Starting job %d: %s", job_id, " ".join(command_line))
# execute the command line of the job, and wait until it has finished
try:
result = subprocess.call(command_line, cwd=exec_dir)
logger.info("Job %d finished with result %s", job_id, str(result))
except Exception as e:
logger.error("The job with id '%d' could not be executed: %s", job_id, e)
result = 69 # ASCII: 'E'
# set a new status and the results of the job
try:
self.lock()
jobs = self.get_jobs((job_id,))
if not len(jobs):
# it seems that the job has been deleted in the meanwhile
logger.error("The job with id '%d' could not be found in the database!", job_id)
self.unlock()
return
job = jobs[0]
job.finish(result, array_id)
self.session.commit()
# This might not be working properly, so use with care!
if job.stop_on_failure and job.status == 'failure':
# the job has failed
# stop this and all dependent jobs from execution
dependent_jobs = job.get_jobs_waiting_for_us()
dependent_job_ids = set([dep.unique for dep in dependent_jobs] + [job.unique])
while len(dependent_jobs):
dep = dependent_jobs.pop(0)
new = dep.get_jobs_waiting_for_us()
dependent_jobs += new
dependent_job_ids.update([dep.unique for dep in new])
self.unlock()
deps = sorted(list(dependent_job_ids))
self.stop_jobs(deps)
logger.warn ("Stopped dependent jobs '%s' since this job failed.", str(deps))
except Exception as e:
logger.error("Caught exception '%s'", e)
pass
finally:
if hasattr(self, 'session'):
self.unlock()
[docs] def list(self, job_ids, print_array_jobs = False, print_dependencies = False, long = False, print_times = False, status=Status, names=None, ids_only=False):
"""Lists the jobs currently added to the database."""
# configuration for jobs
if print_dependencies:
fields = ("job-id", "grid-id", "queue", "status", "job-name", "dependencies", "submitted command line")
lengths = (8, 20, 14, 14, 20, 30, 43)
format = "{0:^%d} {1:^%d} {2:^%d} {3:^%d} {4:^%d} {5:^%d} {6:<%d}" % lengths
dependency_length = lengths[4]
else:
fields = ("job-id", "grid-id", "queue", "status", "job-name", "submitted command line")
lengths = (8, 20, 14, 14, 20, 43)
format = "{0:^%d} {1:^%d} {2:^%d} {3:^%d} {4:^%d} {5:<%d}" % lengths
dependency_length = 0
# if ids_only:
# self.lock()
# for job in self.get_jobs():
# print(job.unique, end=" ")
# self.unlock()
# return
array_format = "{0:^%d} {1:>%d} {2:^%d} {3:^%d}" % lengths[:4]
delimiter = format.format(*['='*k for k in lengths])
array_delimiter = array_format.format(*["-"*k for k in lengths[:4]])
header = [fields[k].center(lengths[k]) for k in range(len(lengths))]
# print header
if not ids_only:
print(' '.join(header))
print(delimiter)
self.lock()
for job in self.get_jobs(job_ids):
job.refresh()
if job.status in status and (names is None or job.name in names):
if ids_only:
print(job.unique, end=" ")
else:
print(job.format(format, dependency_length, None if long else 43))
if print_times:
print(times(job))
if (not ids_only) and print_array_jobs and job.array:
print(array_delimiter)
for array_job in job.array:
if array_job.status in status:
print(array_job.format(array_format))
if print_times:
print(times(array_job))
print(array_delimiter)
self.unlock()
[docs] def report(self, job_ids=None, array_ids=None, output=True, error=True, status=Status, name=None):
"""Iterates through the output and error files and write the results to command line."""
def _write_contents(job):
# Writes the contents of the output and error files to command line
out_file, err_file = job.std_out_file(), job.std_err_file()
logger.info("Contents of output file: '%s'" % out_file)
if output and out_file is not None and os.path.exists(out_file) and os.stat(out_file).st_size > 0:
print(open(out_file).read().rstrip())
print("-"*20)
if error and err_file is not None and os.path.exists(err_file) and os.stat(err_file).st_size > 0:
logger.info("Contents of error file: '%s'" % err_file)
print(open(err_file).read().rstrip())
print("-"*40)
def _write_array_jobs(array_jobs):
for array_job in array_jobs:
print("Array Job", str(array_job.id), ("(%s) :"%array_job.machine_name if array_job.machine_name is not None else ":"))
_write_contents(array_job)
self.lock()
# check if an array job should be reported
if array_ids:
if len(job_ids) != 1: logger.error("If array ids are specified exactly one job id must be given.")
array_jobs = list(self.session.query(ArrayJob).join(Job).filter(Job.unique.in_(job_ids)).filter(Job.unique == ArrayJob.job_id).filter(ArrayJob.id.in_(array_ids)))
if array_jobs: print(array_jobs[0].job)
_write_array_jobs(array_jobs)
else:
# iterate over all jobs
jobs = self.get_jobs(job_ids)
for job in jobs:
if name is not None and job.name != name:
continue
if job.status not in status:
continue
if job.array:
print(job)
_write_array_jobs(job.array)
else:
print(job)
_write_contents(job)
if job.log_dir is not None:
print("-"*60)
self.unlock()
[docs] def delete(self, job_ids, array_ids = None, delete_logs = True, delete_log_dir = False, status = Status, delete_jobs = True):
"""Deletes the jobs with the given ids from the database."""
def _delete_dir_if_empty(log_dir):
if log_dir and delete_log_dir and os.path.isdir(log_dir) and not os.listdir(log_dir):
os.rmdir(log_dir)
logger.info("Removed empty log directory '%s'" % log_dir)
def _delete(job, try_to_delete_dir=False):
# delete the job from the database
if delete_logs:
out_file, err_file = job.std_out_file(), job.std_err_file()
if out_file and os.path.exists(out_file):
os.remove(out_file)
logger.debug("Removed output log file '%s'" % out_file)
if err_file and os.path.exists(err_file):
os.remove(err_file)
logger.debug("Removed error log file '%s'" % err_file)
if try_to_delete_dir:
_delete_dir_if_empty(job.log_dir)
if delete_jobs:
self.session.delete(job)
self.lock()
# check if array ids are specified
if array_ids:
if len(job_ids) != 1: logger.error("If array ids are specified exactly one job id must be given.")
array_jobs = list(self.session.query(ArrayJob).join(Job).filter(Job.unique.in_(job_ids)).filter(Job.unique == ArrayJob.job_id).filter(ArrayJob.id.in_(array_ids)))
if array_jobs:
job = array_jobs[0].job
for array_job in array_jobs:
if array_job.status in status:
if delete_jobs:
logger.debug("Deleting array job '%d' of job '%d' from the database." % (array_job.id, job.unique))
_delete(array_job)
if not job.array:
if job.status in status:
if delete_jobs:
logger.info("Deleting job '%d' from the database." % job.unique)
_delete(job, delete_jobs)
else:
# iterate over all jobs
jobs = self.get_jobs(job_ids)
for job in jobs:
# delete all array jobs
if job.array:
for array_job in job.array:
if array_job.status in status:
if delete_jobs:
logger.debug("Deleting array job '%d' of job '%d' from the database." % (array_job.id, job.unique))
_delete(array_job)
# delete this job
if job.status in status:
if delete_jobs:
logger.info("Deleting job '%d' from the database." % job.unique)
_delete(job, delete_jobs)
self.session.commit()
self.unlock()