Source code for gridtk.manager


from __future__ import print_function

import os, sys
import subprocess
import socket # to get the host name
from .models import Base, Job, ArrayJob, Status, times
from .tools import logger


import sqlalchemy
from distutils.version import LooseVersion


# Defines an equivalent `which` function to dig-out the location of excutables
import shutil
if sys.version_info[:2] >= (3, 3):

  which = shutil.which

else: #define our own

[docs] def which(cmd, mode=os.F_OK|os.X_OK, path=None): from distutils.spawn import find_executable candidate = find_executable(cmd, path) st = os.stat(candidate) if bool(st.st_mode & mode): return candidate return None
[docs]class JobManager: """This job manager defines the basic interface for handling jobs in the SQL database.""" def __init__(self, database = 'submitted.sql3', wrapper_script = None, debug = False): self._database = os.path.realpath(database) self._engine = sqlalchemy.create_engine("sqlite:///"+self._database, connect_args={'timeout': 600}, echo=debug) self._session_maker = sqlalchemy.orm.sessionmaker(bind=self._engine) # store the command that this job manager was called with if wrapper_script is None: wrapper_script = 'jman' if not os.path.exists(wrapper_script): bindir = os.path.join(os.path.realpath(os.curdir), 'bin') wrapper_script = which(wrapper_script, path=os.pathsep.join((bindir, os.environ['PATH']))) if wrapper_script is None: raise IOError("Could not find the installation path of gridtk. Please specify it in the wrapper_script parameter of the JobManager.") if not os.path.exists(wrapper_script): raise IOError("Your wrapper_script cannot be found. Jobs will not be executable.") self.wrapper_script = wrapper_script def __del__(self): # remove the database if it is empty if os.path.isfile(self._database): # in errornous cases, the session might still be active, so don't create a deadlock here! if not hasattr(self, 'session'): self.lock() job_count = len(self.get_jobs()) self.unlock() if not job_count: logger.debug("Removed database file '%s' since database is empty" % self._database) os.remove(self._database)
[docs] def lock(self): """Generates (and returns) a blocking session object to the database.""" if hasattr(self, 'session'): raise RuntimeError('Dead lock detected. Please do not try to lock the session when it is already locked!') if LooseVersion(sqlalchemy.__version__) < LooseVersion('0.7.8'): # for old sqlalchemy versions, in some cases it is required to re-generate the engine for each session self._engine = sqlalchemy.create_engine("sqlite:///"+self._database) self._session_maker = sqlalchemy.orm.sessionmaker(bind=self._engine) # create the database if it does not exist yet if not os.path.exists(self._database): self._create() # now, create a session self.session = self._session_maker() logger.debug("Created new database session to '%s'" % self._database) return self.session
[docs] def unlock(self): """Closes the session to the database.""" if not hasattr(self, 'session'): raise RuntimeError('Error detected! The session that you want to close does not exist any more!') logger.debug("Closed database session of '%s'" % self._database) self.session.close() del self.session
def _create(self): """Creates a new and empty database.""" from .tools import makedirs_safe # create directory for sql database makedirs_safe(os.path.dirname(self._database)) # create all the tables Base.metadata.create_all(self._engine) logger.debug("Created new empty database '%s'" % self._database)
[docs] def get_jobs(self, job_ids = None): """Returns a list of jobs that are stored in the database.""" if job_ids is not None and len(job_ids) == 0: return [] q = self.session.query(Job) if job_ids is not None: q = q.filter(Job.unique.in_(job_ids)) return sorted(list(q), key=lambda job: job.unique)
def _job_and_array(self, job_id, array_id = None): # get the job (and the array job) with the given id(s) job = self.get_jobs((job_id,)) if len(job) > 1: logger.error("%d jobs with the same ID '%d' were detected in the database"%(len(job), job_id)) elif not len(job): logger.warn("Job with ID '%d' was not found in the database."%job_id) return (None, None) job = job[0] unique_id = job.unique if array_id is not None: array_job = list(self.session.query(ArrayJob).filter(ArrayJob.job_id == unique_id).filter(ArrayJob.id == array_id)) assert (len(array_job) == 1) return (job, array_job[0]) else: return (job, None)
[docs] def run_job(self, job_id, array_id = None): """This function is called to run a job (e.g. in the grid) with the given id and the given array index if applicable.""" # set the job's status in the database try: # get the job from the database self.lock() jobs = self.get_jobs((job_id,)) if not len(jobs): # it seems that the job has been deleted in the meanwhile return job = jobs[0] # get the machine name we are executing on; this might only work at idiap machine_name = socket.gethostname() # set the 'executing' status to the job job.execute(array_id, machine_name) self.session.commit() except Exception as e: logger.error("Caught exception '%s'", e) pass finally: self.unlock() # get the command line of the job from the database; does not need write access self.lock() job = self.get_jobs((job_id,))[0] command_line = job.get_command_line() exec_dir = job.get_exec_dir() self.unlock() logger.info("Starting job %d: %s", job_id, " ".join(command_line)) # execute the command line of the job, and wait until it has finished try: result = subprocess.call(command_line, cwd=exec_dir) logger.info("Job %d finished with result %s", job_id, str(result)) except Exception as e: logger.error("The job with id '%d' could not be executed: %s", job_id, e) result = 69 # ASCII: 'E' # set a new status and the results of the job try: self.lock() jobs = self.get_jobs((job_id,)) if not len(jobs): # it seems that the job has been deleted in the meanwhile logger.error("The job with id '%d' could not be found in the database!", job_id) self.unlock() return job = jobs[0] job.finish(result, array_id) self.session.commit() # This might not be working properly, so use with care! if job.stop_on_failure and job.status == 'failure': # the job has failed # stop this and all dependent jobs from execution dependent_jobs = job.get_jobs_waiting_for_us() dependent_job_ids = set([dep.unique for dep in dependent_jobs] + [job.unique]) while len(dependent_jobs): dep = dependent_jobs.pop(0) new = dep.get_jobs_waiting_for_us() dependent_jobs += new dependent_job_ids.update([dep.unique for dep in new]) self.unlock() deps = sorted(list(dependent_job_ids)) self.stop_jobs(deps) logger.warn ("Stopped dependent jobs '%s' since this job failed.", str(deps)) except Exception as e: logger.error("Caught exception '%s'", e) pass finally: if hasattr(self, 'session'): self.unlock()
[docs] def list(self, job_ids, print_array_jobs = False, print_dependencies = False, long = False, print_times = False, status=Status, names=None, ids_only=False): """Lists the jobs currently added to the database.""" # configuration for jobs if print_dependencies: fields = ("job-id", "grid-id", "queue", "status", "job-name", "dependencies", "submitted command line") lengths = (8, 20, 14, 14, 20, 30, 43) format = "{0:^%d} {1:^%d} {2:^%d} {3:^%d} {4:^%d} {5:^%d} {6:<%d}" % lengths dependency_length = lengths[4] else: fields = ("job-id", "grid-id", "queue", "status", "job-name", "submitted command line") lengths = (8, 20, 14, 14, 20, 43) format = "{0:^%d} {1:^%d} {2:^%d} {3:^%d} {4:^%d} {5:<%d}" % lengths dependency_length = 0 # if ids_only: # self.lock() # for job in self.get_jobs(): # print(job.unique, end=" ") # self.unlock() # return array_format = "{0:^%d} {1:>%d} {2:^%d} {3:^%d}" % lengths[:4] delimiter = format.format(*['='*k for k in lengths]) array_delimiter = array_format.format(*["-"*k for k in lengths[:4]]) header = [fields[k].center(lengths[k]) for k in range(len(lengths))] # print header if not ids_only: print(' '.join(header)) print(delimiter) self.lock() for job in self.get_jobs(job_ids): job.refresh() if job.status in status and (names is None or job.name in names): if ids_only: print(job.unique, end=" ") else: print(job.format(format, dependency_length, None if long else 43)) if print_times: print(times(job)) if (not ids_only) and print_array_jobs and job.array: print(array_delimiter) for array_job in job.array: if array_job.status in status: print(array_job.format(array_format)) if print_times: print(times(array_job)) print(array_delimiter) self.unlock()
[docs] def report(self, job_ids=None, array_ids=None, output=True, error=True, status=Status, name=None): """Iterates through the output and error files and write the results to command line.""" def _write_contents(job): # Writes the contents of the output and error files to command line out_file, err_file = job.std_out_file(), job.std_err_file() logger.info("Contents of output file: '%s'" % out_file) if output and out_file is not None and os.path.exists(out_file) and os.stat(out_file).st_size > 0: print(open(out_file).read().rstrip()) print("-"*20) if error and err_file is not None and os.path.exists(err_file) and os.stat(err_file).st_size > 0: logger.info("Contents of error file: '%s'" % err_file) print(open(err_file).read().rstrip()) print("-"*40) def _write_array_jobs(array_jobs): for array_job in array_jobs: print("Array Job", str(array_job.id), ("(%s) :"%array_job.machine_name if array_job.machine_name is not None else ":")) _write_contents(array_job) self.lock() # check if an array job should be reported if array_ids: if len(job_ids) != 1: logger.error("If array ids are specified exactly one job id must be given.") array_jobs = list(self.session.query(ArrayJob).join(Job).filter(Job.unique.in_(job_ids)).filter(Job.unique == ArrayJob.job_id).filter(ArrayJob.id.in_(array_ids))) if array_jobs: print(array_jobs[0].job) _write_array_jobs(array_jobs) else: # iterate over all jobs jobs = self.get_jobs(job_ids) for job in jobs: if name is not None and job.name != name: continue if job.status not in status: continue if job.array: print(job) _write_array_jobs(job.array) else: print(job) _write_contents(job) if job.log_dir is not None: print("-"*60) self.unlock()
[docs] def delete(self, job_ids, array_ids = None, delete_logs = True, delete_log_dir = False, status = Status, delete_jobs = True): """Deletes the jobs with the given ids from the database.""" def _delete_dir_if_empty(log_dir): if log_dir and delete_log_dir and os.path.isdir(log_dir) and not os.listdir(log_dir): os.rmdir(log_dir) logger.info("Removed empty log directory '%s'" % log_dir) def _delete(job, try_to_delete_dir=False): # delete the job from the database if delete_logs: out_file, err_file = job.std_out_file(), job.std_err_file() if out_file and os.path.exists(out_file): os.remove(out_file) logger.debug("Removed output log file '%s'" % out_file) if err_file and os.path.exists(err_file): os.remove(err_file) logger.debug("Removed error log file '%s'" % err_file) if try_to_delete_dir: _delete_dir_if_empty(job.log_dir) if delete_jobs: self.session.delete(job) self.lock() # check if array ids are specified if array_ids: if len(job_ids) != 1: logger.error("If array ids are specified exactly one job id must be given.") array_jobs = list(self.session.query(ArrayJob).join(Job).filter(Job.unique.in_(job_ids)).filter(Job.unique == ArrayJob.job_id).filter(ArrayJob.id.in_(array_ids))) if array_jobs: job = array_jobs[0].job for array_job in array_jobs: if array_job.status in status: if delete_jobs: logger.debug("Deleting array job '%d' of job '%d' from the database." % (array_job.id, job.unique)) _delete(array_job) if not job.array: if job.status in status: if delete_jobs: logger.info("Deleting job '%d' from the database." % job.unique) _delete(job, delete_jobs) else: # iterate over all jobs jobs = self.get_jobs(job_ids) for job in jobs: # delete all array jobs if job.array: for array_job in job.array: if array_job.status in status: if delete_jobs: logger.debug("Deleting array job '%d' of job '%d' from the database." % (array_job.id, job.unique)) _delete(array_job) # delete this job if job.status in status: if delete_jobs: logger.info("Deleting job '%d' from the database." % job.unique) _delete(job, delete_jobs) self.session.commit() self.unlock()