Browse Source

broken: refactored cjs automate

cjs3
Stephen Lorenz 3 years ago
parent
commit
ad2557f0a6
  1. 94
      cjs/automate.py
  2. 51
      cjs/core/automate.py

94
cjs/automate.py

@ -38,15 +38,23 @@ from tools.efm import reqbt_to_settings
# Main-entry point to the automate interface
@click.group()
@click.option('--num_workers',
              type=click.INT,
              default=1,
              help='Set the number of worker threads.')
@click.option('--working_dir',
              type=click.STRING,
              default='.app',
              help='Set the working directory.')
@click.option('--resource_dir',
              type=click.STRING,
              default='.app',
              help='Set the resource directory.')
@click.pass_context
def automate(ctx, num_workers, working_dir, resource_dir):
    '''Bulk test systems.'''
    # Stash the run configuration on the click context object so that
    # subcommands (e.g. efm) can read it via ctx.obj.
    ctx.obj['num_workers'] = num_workers
    ctx.obj['working_dir'] = working_dir
    ctx.obj['resource_dir'] = resource_dir
# Main-entry point to the EuroForMix subcommand
# NOTE(review): this span is a unified-diff rendering without +/- markers;
# removed (old profile-based) and added (new state-based) lines are
# interleaved below. Annotations mark which side each group appears to be.
@automate.command()
@ -55,48 +63,28 @@ def automate(ctx, num_workers):
# NOTE(review): the hunk header above hides efm's remaining decorators;
# the (ctx, database_name) signature implies @click.pass_context plus a
# database_name argument/option — confirm against the full file.
def efm(ctx, database_name):
'''Run multisampleUsage.R.\f'''
# initialize a profile that will be used for each worker
# --- removed by this commit: the per-worker profile dict ---
base_profile = {
'resource_dir': ctx.obj['resource_dir'],
'worker_name': None,
'database_name': database_name,
'working_dir': database_location(database_name),
'batch_dir': current_batch(database_name).name,
'global_lock': None,
'global_status': AutomateStatus()
}
# --- added: run configuration now read from the click context ---
num_workers = ctx.obj['num_workers']
working_dir = ctx.obj['working_dir']
resource_dir = ctx.obj['resource_dir']
try:
# pull all pending jobs from the database
#job_list = pending_jobs(database_name)
#print('hi')
# spawn a size num_workers worker cluster of efm workers
# --- removed: old spawn_workers(database, worker_fn, profile, n) call ---
spawn_workers(database_name,
efm_worker,
base_profile,
ctx.obj['num_workers'])
# --- added: new spawn_workers(target_fn, working_dir, resource_dir, n) ---
spawn_workers(efm_qualitative,
working_dir,
resource_dir,
num_workers)
# NOTE(review): `except ... raise` with no handling is a no-op; the unused
# binding `e` could be dropped, or the try/except removed entirely.
except Exception as e:
raise
# TODO: find a better way around passing profile - possibly object decorator?
# NOTE(review): unified-diff rendering; old efm_worker(profile, job) and new
# efm_qualitative(state, job) lines are interleaved, and the two hunk headers
# below mean lines are missing from this view.
@generic_worker
# --- removed: old entry point ---
def efm_worker(profile, job):
# start time to calculate average runtime
start_time = time.time()
# --- added: new entry point, takes the shared AutomateState plus one job ---
# NOTE(review): in this same commit generic_worker's new wrapper calls
# fn(state, *args, **kwargs) WITHOUT the dequeued job, so this two-argument
# signature can never be satisfied — likely the breakage named in the commit
# title ("broken: refactored cjs automate"). Confirm.
def efm_qualitative(state, job):
# extract the evidence and comparison table from the job
job_id, table_list = job
evidence_table, comparison_table = table_list
# --- removed: lock-guarded status update (profile/global_lock are gone) ---
# NOTE(review): later retained lines still reference `lock` and `profile`;
# with these removals those names are undefined in the new function.
lock = profile['global_lock']
lock.acquire()
try:
set_job_status(profile['database_name'], job_id, 2)
except Exception as e:
print(e)
finally:
lock.release()
# load job data
evidence_data = read_json(evidence_table.path)
comparison_data = read_json(comparison_table.path)
@ -118,9 +106,7 @@ def efm_worker(profile, job):
write_csv(comparison_file, comparison_csv, delimiter=',')
test_settings = {
# NOTE(review): `profile` and `working_dir` are not defined in the new
# efm_qualitative — these presumably should come from `state`. Confirm.
'resource_dir': profile['resource_dir'],
'working_dir': working_dir,
'batch_dir': profile['batch_dir'],
'evidence_file': evidence_file,
'comparison_file': comparison_file,
**reqbt_to_settings(evidence_data, comparison_data)
@ -134,41 +120,15 @@ def efm_worker(profile, job):
click.echo('* Starting %s and %s' % (evidence_table.name, comparison_table.name))
try:
# start time to calculate average runtime
start_time = time.time()
out, err = run_process(['../euroformix.headless/R/lrmix.R', test_file], 1800) # 1800 seconds = 30 minute timeout
decoded_output = out[0].decode('utf-8')
#click.echo(decoded_output)
if err:
send_error('%s: R Error' % profile['worker_name'],
'Hi,\n\nAn R error occurred:\n %s\n\nSorry about that.\n\nSincerely,\n%s' % (decoded_output, profile['worker_name']))
lock.acquire()
try:
set_job_status(profile['database_name'], job_id, 4)
finally:
lock.release()
end_time = time.time()
elapsed = end_time-start_time
# NOTE(review): `if not err:` immediately followed by `else:` is malformed
# in this rendering — the diff apparently replaced the old `else:` branch
# head with `if not err:`; the `else:` line below is the removed side.
if not err:
else:
end_time = time.time()
elapsed = end_time-start_time
# TODO: replace this a state database, it is much better and more robust
lock.acquire()
try:
status = profile['global_status']
average_runtime = status.update_status(elapsed)
set_job_status(profile['database_name'], job_id, 3)
click.echo('Average Runtime: %f\n' % average_runtime)
except Exception as e:
print(e)
finally:
lock.release()
except Exception as e:
name = profile['worker_name']
send_error('%s: Exception' % profile['worker_name'],
'Hi,\n\nAn unexpected error occurred:\n %s\n\nSorry about that.\n\nSincerely,\n%s' % (e, profile['worker_name']))
lock.acquire()
try:
set_job_status(profile['database_name'], job_id, 4)
finally:
lock.release()
raise

51
cjs/core/automate.py

@ -43,23 +43,23 @@ def scan_dir(path, *args, **kwargs):
obs.start()
return fss, obs
class AutomateStatus:
    """Accumulates per-job runtimes and exposes a running average.

    Attributes:
        completed_jobs: number of runtimes recorded so far.
        runtimes: list of every runtime passed to update_status.
        runtime: mean of all recorded runtimes (0 before any update).
    """

    def __init__(self):
        self.completed_jobs = 0
        self.runtimes = []
        self.runtime = 0

    def update_status(self, runtime):
        """Record one job's runtime; return the updated average runtime."""
        self.runtimes.append(runtime)
        count = len(self.runtimes)
        self.completed_jobs = count
        # math.fsum keeps the float sum accurate over many samples
        self.runtime = math.fsum(self.runtimes) / count
        return self.runtime
def send_error(subject, message):
    """Email an error report to the CJS maintainer inbox.

    Delegates to send_email using the project's mailer configuration file.
    """
    mailer_config = 'resources/mailer.json'
    recipient = 'cucjsoftware@gmail.com'
    send_email(mailer_config, recipient, subject, message)
class AutomateState:
    """Shared, per-run state handed to every worker thread.

    Bundles the run directories with the synchronization primitives the
    workers use, replacing the old per-worker profile dict.
    """

    def __init__(self, working_dir, resource_dir):
        self.working_dir = working_dir
        self.resource_dir = resource_dir
        # single lock shared by every worker in the cluster
        self.global_lock = threading.Lock()
        # thread-safe queue of pending jobs
        self.in_queue = queue.Queue()
        # thread-safe queue of finished results
        self.out_queue = queue.Queue()
def run_process(command, timeout):
# modified from https://docs.python.org/3/library/subprocess.html#subprocess.Popen.communicate
@ -73,32 +73,27 @@ def run_process(command, timeout):
def generic_worker(fn):
    """Decorator that turns *fn* into a queue-draining worker loop.

    The wrapped function repeatedly pulls jobs from state.in_queue and calls
    fn(state, job, *args, **kwargs) for each one, stopping at the None
    sentinel. Returns the result of the last processed job, or None if the
    sentinel was the first item.

    Fixes vs. the refactored version in this commit:
      * the dequeued job is forwarded to fn again (the refactor dropped it,
        while workers such as efm_qualitative(state, job) still expect it);
      * result is initialized so the sentinel-first case cannot raise
        UnboundLocalError.
    """
    # modified from https://docs.python.org/3.7/library/queue.html?highlight=queue#queue.Queue.join
    def wrapper(state, *args, **kwargs):
        result = None
        while True:
            job = state.in_queue.get()  # get next job
            if job is None:  # is this the end of the queue?
                break
            result = fn(state, job, *args, **kwargs)  # run routine
            state.in_queue.task_done()  # signal that the job was completed
        return result
    return wrapper
# NOTE(review): unified-diff rendering; old profile-based and new state-based
# lines are interleaved, and the function is truncated at the bottom of this
# view (the worker-shutdown loop body is not visible).
# --- removed: old signature ---
def spawn_workers(database_name, target_fn, base_profile, num_threads, *args, **kwargs):
# --- added: new signature ---
def spawn_workers(target_fn, working_dir, resource_dir, num_threads, *args, **kwargs):
# modified from https://docs.python.org/3.7/library/queue.html?highlight=queue#queue.Queue.join
# initialize thread-safe queue
# --- removed: loose queue + lock, now owned by AutomateState ---
job_queue = queue.Queue()
# initialize a universal lock for all the workers
base_profile['global_lock'] = threading.Lock()
# --- added: all shared worker state bundled in one object ---
state = AutomateState(working_dir, resource_dir)
# initialize thread list
threads = []
for i in range(num_threads):
# --- removed: per-worker profile copy and worker name ---
my_profile = dict(base_profile)
my_profile['worker_name'] = 'Worker %d' % (i+1)
t = threading.Thread(target=target_fn,
# --- removed/added: thread args switched from (profile, queue) to (state,) ---
args=(my_profile, job_queue, *args),
args=(state, *args),
kwargs={**kwargs})
t.start() # begin worker thread
threads.append(t) # store thread object
@ -107,10 +102,10 @@ def spawn_workers(database_name, target_fn, base_profile, num_threads, *args, **
# the foreman tracks the status and progress of the workers
# it communicates this information with the CJS host
# NOTE(review): `while True: pass` busy-spins forever with no break, so the
# join/shutdown code below is unreachable — confirm intent.
while True:
pass
# block until all tasks are done
job_queue.join()
# NOTE(review): AutomateState defines in_queue/out_queue, not `queue`;
# state.queue.join() would raise AttributeError — presumably should be
# state.in_queue.join(). Confirm.
state.queue.join()
# stop workers
for i in range(num_threads):

Loading…
Cancel
Save