removed files

cjs3
Stephen Lorenz, 3 years ago
parent commit 9abf77393f
  1. 0    cjs/convert.py (renamed)
  2. 174  cjsd/automate.py (removed)
  3. 48   cjsd/client.py (removed)
  4. 98   cjsd/core/automate.py (removed)

0  cjsd/convert.py → cjs/convert.py

174  cjsd/automate.py

@@ -1,174 +0,0 @@
#!/usr/bin/env python3
# standard library
import time
import signal

# pip
import click

# local
import core.automate
from core.automate import generic_worker
from core.automate import spawn_workers
from core.automate import run_process
from core.automate import send_error
from core.automate import AutomateStatus
import core.database
from core.database import database_location
from core.database import pending_jobs
from core.database import current_batch
from core.database import insert_result
from core.database import next_job
from core.database import set_job_status
import utils.io
from utils.io import write_csv
from utils.io import read_json
from utils.io import write_json
import tools.efm
from tools.efm import reqbt_to_evidence
from tools.efm import reqbt_to_comparison
from tools.efm import reqbt_to_settings


# BEGIN: Command-line interface wrapper

# Main entry point to the automate interface
@click.group()
@click.option('--num_workers',
              type=int, default=1,
              help='Set the number of worker threads.')
@click.pass_context
def automate(ctx, num_workers):
    '''Bulk test systems.'''
    ctx.obj['num_workers'] = num_workers


# Main entry point to the EuroForMix subcommand
@automate.command()
@click.argument('database_name')
@click.pass_context
def efm(ctx, database_name):
    '''Run multisampleUsage.R.\f'''
    # initialize a profile that will be copied for each worker
    base_profile = {
        'resource_dir': ctx.obj['resource_dir'],
        'worker_name': None,
        'database_name': database_name,
        'working_dir': database_location(database_name),
        'batch_dir': current_batch(database_name).name,
        'global_lock': None,
        'global_status': AutomateStatus()
    }
    # spawn a cluster of num_workers efm workers
    spawn_workers(database_name,
                  efm_worker,
                  base_profile,
                  ctx.obj['num_workers'])


# TODO: find a better way around passing profile - possibly object decorator?
@generic_worker
def efm_worker(profile, job):
    # start time to calculate average runtime
    start_time = time.time()
    # extract the evidence and comparison tables from the job
    job_id, table_list = job
    evidence_table, comparison_table = table_list
    lock = profile['global_lock']
    # mark the job as in progress (status 2)
    lock.acquire()
    try:
        set_job_status(profile['database_name'], job_id, 2)
    except Exception as e:
        print(e)
    finally:
        lock.release()
    # load job data
    evidence_data = read_json(evidence_table.path)
    comparison_data = read_json(comparison_table.path)
    # convert job data to the euroformix input format
    evidence_tsv = reqbt_to_evidence(evidence_data)
    comparison_csv = reqbt_to_comparison(comparison_data)
    # determine where to save the input files
    working_dir = profile['working_dir']
    tmp_dir = '%s/tmp' % working_dir
    # determine filenames
    evidence_file = '%s/%s.tsv' % (tmp_dir, evidence_table.name)
    comparison_file = '%s/%s.csv' % (tmp_dir, comparison_table.name)
    # write the input files to tmp
    write_csv(evidence_file, evidence_tsv, delimiter='\t')
    write_csv(comparison_file, comparison_csv, delimiter=',')
    test_settings = {
        'resource_dir': profile['resource_dir'],
        'working_dir': working_dir,
        'batch_dir': profile['batch_dir'],
        'evidence_file': evidence_file,
        'comparison_file': comparison_file,
        **reqbt_to_settings(evidence_data, comparison_data)
    }
    # determine the settings filename
    test_file = '%s/%s-settings.json' % (tmp_dir, comparison_table.name)
    # write the settings file to tmp
    write_json(test_file, test_settings)
    click.echo('* Starting %s and %s' % (evidence_table.name, comparison_table.name))
    try:
        # 1800 seconds = 30 minute timeout
        out, returncode = run_process(['../euroformix.headless/R/lrmix.R', test_file], 1800)
        decoded_output = out[0].decode('utf-8')
        if returncode:
            # nonzero exit status: report the R failure and mark the job failed (status 4)
            send_error('%s: R Error' % profile['worker_name'],
                       'Hi,\n\nAn R error occurred:\n %s\n\nSorry about that.\n\nSincerely,\n%s'
                       % (decoded_output, profile['worker_name']))
            lock.acquire()
            try:
                set_job_status(profile['database_name'], job_id, 4)
            finally:
                lock.release()
        else:
            end_time = time.time()
            elapsed = end_time - start_time
            # TODO: replace this with a state database; it is much more robust
            lock.acquire()
            try:
                # mark the job as completed (status 3) and report the running average
                status = profile['global_status']
                average_runtime = status.update_status(elapsed)
                set_job_status(profile['database_name'], job_id, 3)
                click.echo('Average Runtime: %f\n' % average_runtime)
            except Exception as e:
                print(e)
            finally:
                lock.release()
    except Exception as e:
        # any other failure: report it and mark the job failed (status 4)
        send_error('%s: Exception' % profile['worker_name'],
                   'Hi,\n\nAn unexpected error occurred:\n %s\n\nSorry about that.\n\nSincerely,\n%s'
                   % (e, profile['worker_name']))
        lock.acquire()
        try:
            set_job_status(profile['database_name'], job_id, 4)
        finally:
            lock.release()
        raise
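
For reference, `automate` is a click group, so it needed a caller to seed `ctx.obj` before `efm` could read `resource_dir` from it. A minimal sketch of such an entry point follows; the filename and the 'resources' value are assumptions, since only the `ctx.obj['resource_dir']` lookup above appears in the diff:

#!/usr/bin/env python3
# run_automate.py - hypothetical entry point for the removed CLI
from automate import automate

if __name__ == '__main__':
    # click passes obj through as the initial ctx.obj;
    # 'resources' is a placeholder value for resource_dir
    # usage: ./run_automate.py --num_workers 4 efm my_database
    automate(obj={'resource_dir': 'resources'})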

48  cjsd/client.py

@@ -1,48 +0,0 @@
#!/usr/bin/env python3
import click
import json
import requests

import utils.io
from utils.io import read_json

headers = {
    'Content-Type': 'application/json'
}

# upload each evidence record to the local cjsd server
payload = {
    'database': 'steve',
    'type': 'evidence',
    'data': None
}
data = read_json('cjs/data/reqbt-fps.json')
with click.progressbar(data.items()) as bar:
    for key, values in bar:
        payload['data'] = values
        response = requests.post('http://127.0.0.1:8000/database/upload',
                                 headers=headers,
                                 json=payload)
        #print(response.content.decode('utf-8'))

# upload each comparison record; the source file nests its records,
# so flatten them into a single list before posting
payload = {
    'database': 'steve',
    'type': 'comparison',
    'data': None
}
data = read_json('cjs/data/jfs2003id.json')
comparisons = []
with click.progressbar(data.items()) as bar:
    for key, values in bar:
        for value in values:
            comparisons.append(value)
with click.progressbar(comparisons) as bar:
    for comparison in bar:
        payload['data'] = comparison
        response = requests.post('http://127.0.0.1:8000/database/upload',
                                 headers=headers,
                                 json=payload)
        #print(response.content.decode('utf-8'))
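
Every upload this script performs reduces to the same JSON POST. A standalone sketch of one request, for anyone reproducing the protocol: the record in `data` is a placeholder, while the endpoint, headers, and payload shape come from the script itself:

#!/usr/bin/env python3
# one-off upload sketch matching the payload shape used above
import requests

payload = {
    'database': 'steve',        # target database name
    'type': 'evidence',         # or 'comparison'
    'data': {'sample': 'A1'},   # placeholder record; the real script reads JSON files
}
response = requests.post('http://127.0.0.1:8000/database/upload',
                         headers={'Content-Type': 'application/json'},
                         json=payload)
print(response.status_code)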

98  cjsd/core/automate.py

@@ -1,98 +0,0 @@
#!/usr/bin/env python3
# standard library
import threading
import queue
import math
import subprocess
from subprocess import TimeoutExpired

# pip
import click

# local
from core.database import fill_job_queue
import utils.io
from utils.io import read_json
import utils.net
from utils.net import send_email


class AutomateStatus:
    '''Tracks completed jobs and their average runtime.'''
    def __init__(self):
        self.completed_jobs = 0
        self.runtimes = []
        self.runtime = 0

    def update_status(self, runtime):
        # record the latest runtime and return the running average
        self.runtimes.append(runtime)
        self.completed_jobs = len(self.runtimes)
        self.runtime = math.fsum(self.runtimes) / self.completed_jobs
        return self.runtime


def send_error(subject, message):
    send_email('resources/mailer.json',
               'cucjsoftware@gmail.com',
               subject, message)


def run_process(command, timeout):
    # modified from https://docs.python.org/3/library/subprocess.html#subprocess.Popen.communicate
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    try:
        out = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        # the process ran too long: kill it and collect whatever output remains
        proc.kill()
        out = proc.communicate()
    return out, proc.returncode


def generic_worker(fn):
    # modified from https://docs.python.org/3.7/library/queue.html?highlight=queue#queue.Queue.join
    def wrapper(my_profile, job_queue, *args, **kwargs):
        result = None
        while True:
            job = job_queue.get()   # get next job
            if job is None:         # is this the end of the queue?
                break
            result = fn(my_profile, job, *args, **kwargs)  # run routine
            job_queue.task_done()   # signal that the job was completed
        return result
    return wrapper


def spawn_workers(database_name, target_fn, base_profile, num_threads, *args, **kwargs):
    # modified from https://docs.python.org/3.7/library/queue.html?highlight=queue#queue.Queue.join
    # initialize thread-safe queue
    job_queue = queue.Queue()
    # initialize a universal lock shared by all the workers
    base_profile['global_lock'] = threading.Lock()
    # initialize thread list
    threads = []
    for i in range(num_threads):
        # give each worker its own copy of the profile
        my_profile = dict(base_profile)
        my_profile['worker_name'] = 'Worker %d' % (i + 1)
        t = threading.Thread(target=target_fn,
                             args=(my_profile, job_queue, *args),
                             kwargs={**kwargs})
        t.start()          # begin worker thread
        threads.append(t)  # store thread object
    # insert jobs into the queue
    fill_job_queue(database_name, job_queue)
    # block until all tasks are done
    job_queue.join()
    # stop workers with one sentinel per thread
    for i in range(num_threads):
        job_queue.put(None)
    for t in threads:
        t.join()
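
The decorator-plus-sentinel pattern above is easiest to see in isolation. Below is a self-contained sketch of the same technique, with a toy job source standing in for core.database.fill_job_queue; everything here except the pattern itself is illustrative:

#!/usr/bin/env python3
# toy demonstration of the generic_worker / spawn_workers queue pattern
import queue
import threading

def generic_worker(fn):
    def wrapper(profile, job_queue):
        while True:
            job = job_queue.get()
            if job is None:       # sentinel: no more work for this thread
                break
            fn(profile, job)
            job_queue.task_done()
    return wrapper

@generic_worker
def echo_worker(profile, job):
    print('%s handled job %r' % (profile['worker_name'], job))

job_queue = queue.Queue()
threads = []
for i in range(2):
    t = threading.Thread(target=echo_worker,
                         args=({'worker_name': 'Worker %d' % (i + 1)}, job_queue))
    t.start()
    threads.append(t)

for job in range(5):              # toy jobs instead of database rows
    job_queue.put(job)
job_queue.join()                  # returns once every job is task_done
for _ in threads:
    job_queue.put(None)           # one sentinel per worker
for t in threads:
    t.join()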