
fixed cjs automate client

cjs3
Stephen Lorenz, 3 years ago
commit 6c77e6cd6b
Changed files:
  cjs/__init__.py              0
  cjs/app.py                   4
  cjs/automate.py             76
  cjs/core/automate.py        96
  cjs/fmt/efm.py             217
  cjs/models/__init__.py       0
  cjs/models/automate.py       0
  cjsd/app.py                  3
  cjsd/core/database.py        1
  cjsd/resources/automate.py  17

cjs/__init__.py (0 changes)

cjs/app.py (4 changes)

@@ -7,9 +7,11 @@ import click
 # TODO: replace with plugin manager if possible
 import cjs
 from cjs import database
+from cjs import automate

 COMMAND_LIST = [
-    database.cli
+    database.cli,
+    automate.cli
 ]

 # main-entry point to the entire command-line interface
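
Review note: the hunk above registers the new automate group by appending it to COMMAND_LIST. The loop that attaches those entries to the root CLI is outside this hunk, so the wiring below is an assumption; a minimal, self-contained sketch of the usual Click pattern (names other than COMMAND_LIST are illustrative):

import click

@click.group()
@click.pass_context
def cli(ctx):
    ctx.ensure_object(dict)  # shared state read by subcommands via ctx.obj

@click.group(name='automate')
def automate_cli():
    '''Bulk test systems.'''

@automate_cli.command()
def efm():
    click.echo('running efm')

COMMAND_LIST = [
    automate_cli
]
for command in COMMAND_LIST:
    cli.add_command(command)  # presumed wiring; app.py's actual loop is not shown in this diff

if __name__ == '__main__':
    cli()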

cjs/automate.py (76 changes)

@@ -8,71 +8,64 @@ import signal
 import click
 # local
-import core.automate
-from core.automate import generic_worker
-from core.automate import spawn_workers
-from core.automate import run_process
-from core.automate import send_error
-from core.automate import AutomateStatus
-import core.database
-from core.database import database_location
-from core.database import pending_jobs
-from core.database import current_batch
-from core.database import insert_result
-from core.database import next_job
-from core.database import set_job_status
+import cjs.core.automate
+from cjs.core.automate import generic_worker
+from cjs.core.automate import spawn_workers
+from cjs.core.automate import run_process
+from cjs.core.automate import send_error
 import utils.io
 from utils.io import write_csv
 from utils.io import read_json
 from utils.io import write_json
-import tools.efm
-from tools.efm import reqbt_to_evidence
-from tools.efm import reqbt_to_comparison
-from tools.efm import reqbt_to_settings
+import cjs.fmt.efm
+from cjs.fmt.efm import reqbt_to_evidence
+from cjs.fmt.efm import reqbt_to_comparison
+from cjs.fmt.efm import reqbt_to_settings

 # BEGIN: Command-line interface wrapper

 # Main-entry point to the automate interface
-@click.group()
+@click.group(name='automate')
 @click.option('--num_workers',
               type=click.INT,
               default=1,
               help='Set the number of worker threads.')
-@click.option('--working_dir',
+@click.option('--data_dir',
               type=click.STRING,
-              default='.app',
+              default='data1',
               help='Set the working directory.')
 @click.option('--resource_dir',
               type=click.STRING,
               default='.app',
               help='Set the working directory.')
 @click.pass_context
-def automate(ctx, num_workers, working_dir, resource_dir):
+def cli(ctx, num_workers, data_dir, resource_dir):
     '''Bulk test systems.'''
     ctx.obj['num_workers'] = num_workers
-    ctx.obj['working_dir'] = working_dir
+    ctx.obj['data_dir'] = data_dir
     ctx.obj['resource_dir'] = resource_dir

 # Main-entry point to the EuroForMix subcommand
-@automate.command()
+@cli.command()
 @click.argument('database_name')
 @click.pass_context
 def efm(ctx, database_name):
     '''Run multisampleUsage.R.\f'''
     num_workers = ctx.obj['num_workers']
-    working_dir = ctx.obj['working_dir']
+    data_dir = ctx.obj['data_dir']
     resource_dir = ctx.obj['resource_dir']
+    database = {
+        'host': ctx.obj['host'],
+        'name': database_name
+    }
     try:
         # pull all pending jobs from the database
-        #job_list = pending_jobs(database_name)
-        #print('hi')
         # spawn a size num_workers worker cluster of efm workers
         spawn_workers(efm_qualitative,
-                      working_dir,
+                      database,
+                      data_dir,
                       resource_dir,
                       num_workers)
     except Exception as e:
@@ -82,24 +75,16 @@ def efm(ctx, database_name):
 @generic_worker
 def efm_qualitative(state, job):
     # extract the evidence and comparison table from the job
-    job_id, table_list = job
-    evidence_table, comparison_table = table_list
-    # load job data
-    evidence_data = read_json(evidence_table.path)
-    comparison_data = read_json(comparison_table.path)
+    evidence_data = job['evidence']
+    comparison_data = job['comparison']
     # convert job data to euroformix input format
     evidence_tsv = reqbt_to_evidence(evidence_data)
     comparison_csv = reqbt_to_comparison(comparison_data)
-    # determine where to save the input files
-    working_dir = profile['working_dir']
-    tmp_dir = '%s/tmp' % working_dir
     # determine filenames
-    evidence_file = '%s/%s.tsv' % (tmp_dir, evidence_table.name)
-    comparison_file = '%s/%s.csv' % (tmp_dir, comparison_table.name)
+    evidence_file = '%s/%s.tsv' % (state.input_dir, evidence_data['name'])
+    comparison_file = '%s/%s.csv' % (state.input_dir, comparison_data['name'])
     # write input file to tmp
     write_csv(evidence_file, evidence_tsv, delimiter='\t')
@@ -113,21 +98,22 @@ def efm_qualitative(state, job):
     }
     # determine test filename
-    test_file = '%s/%s-settings.json' % (tmp_dir, comparison_table.name)
+    test_file = '%s/%s-%s.json' % (tmp_dir, evidence_data['name'], comparison_data['name'])
     # write settings file to tmp
     write_json(test_file, test_settings)
-    click.echo('* Starting %s and %s' % (evidence_table.name, comparison_table.name))
+    click.echo('* Starting %s and %s' % (evidence_data['name'], comparison_data['name']))
     try:
-        # start time to calculate average runtime
+        # working_dir = ctx.obj['working_dir'] start time to calculate average runtime
         start_time = time.time()
         out, err = run_process(['../euroformix.headless/R/lrmix.R', test_file], 1800) # 300 seconds = 5 minute timeout
         end_time = time.time()
         elapsed = end_time-start_time
         if not err:
             print('yay')
         else:
             pass
     except Exception as e:
         raise
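
Review note: run_process is called here with a 1800-second limit while the trailing comment still reads '300 seconds = 5 minute timeout'; one of the two is stale. The function itself is cited (in cjs/core/automate.py below) as modified from the subprocess docs; a sketch of that canonical communicate-with-timeout pattern, with the (out, err) return contract inferred from this call site:

import subprocess
from subprocess import TimeoutExpired

def run_process(command, timeout):
    # launch the external program and capture both output streams
    proc = subprocess.Popen(command,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    try:
        out, err = proc.communicate(timeout=timeout)
    except TimeoutExpired:
        proc.kill()  # per the docs: kill the child, then drain its output
        out, err = proc.communicate()
    return out, err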

cjs/core/automate.py (96 changes)

@@ -3,7 +3,7 @@
 # standard library
 import threading
 import queue
-import math
+import json
 import subprocess
 from subprocess import TimeoutExpired
@@ -11,37 +11,31 @@ from subprocess import TimeoutExpired
 # pip
 import click
 # local
-from core.database import fill_job_queue
+import watchdog.observers
+from watchdog.observers import Observer
+import watchdog.events
+from watchdog.events import FileSystemEventHandler
 # local
 import utils.io
 from utils.io import read_json
 import utils.fs
 from utils.fs import create_dir
 import utils.net
 from utils.net import send_email
+import utils.http
+from utils.http import rest_post

 class FileSystemScanner(FileSystemEventHandler):
     def __init__(self):
-        self.tracked_files = []
-    def process(self, event):
-        print('Source Path: %s' % event.src_path) # debug
-        print('Event Type: %s' % event.event_type)
+        self.pending_files = queue.Queue()
     def on_created(self, event):
-        self.tracked_files.append(event.src_path)
-        print(' Tracked: %s' % event.src_path)
-    def on_deleted(self, event):
-        self.tracked_files.remove(event.src_path)
-        print(' Untracked: %s' % event.src_path)
-def scan_dir(path, *args, **kwargs):
-    fss = FileSystemScanner()
-    obs = Observer()
-    obs.schedule(fss, path=path)
-    obs.start()
-    return fss, obs
+        self.put(event.src_path)

 def send_error(subject, message):
     send_email('resources/mailer.json',
@@ -49,17 +43,21 @@ def send_error(subject, message):
                subject, message)

 class AutomateState:
-    def __init__(self, working_dir, resource_dir):
-        self.working_dir = working_dir
+    def __init__(self, input_dir, output_dir, resource_dir):
+        # pg software inputs
+        self.input_dir = input_dir
+        # results files
+        self.output_dir = output_dir
+        # pg software settings and other info
         self.resource_dir = resource_dir
         # initialize a universal lock for all the workers
         self.global_lock = threading.Lock()
-        # queue containing jobs
-        self.in_queue = queue.Queue()
-        # queue containing results
-        self.out_queue = queue.Queue()
+        # a queue containing jobs
+        self.input_queue = queue.Queue()
+        # a queue containing results
+        self.output_queue = queue.Queue()

 def run_process(command, timeout):
     # modified from https://docs.python.org/3/library/subprocess.html#subprocess.Popen.communicate
@@ -75,40 +73,54 @@ def generic_worker(fn):
     # modified from https://docs.python.org/3.7/library/queue.html?highlight=queue#queue.Queue.join
     def wrapper(state, *args, **kwargs):
         while True:
-            job = state.in_queue.get() # get next job
+            job = state.input_queue.get() # get next job
             if job is None: # is this the end of the queue?
-                break
-            result = fn(state, *args, **kwargs) # run routine
-            state.in_queue.task_done() # signal that the job was completed
+                break # if so, be done :)
+            result = fn(state, job, *args, **kwargs) # call the job routine
+            state.input_queue.task_done() # signal that the job was completed
         return result
     return wrapper

-def spawn_workers(target_fn, working_dir, resource_dir, num_threads, *args, **kwargs):
+def spawn_workers(target_fn, database, working_dir, resource_dir, num_workers, **kwargs):
     # modified from https://docs.python.org/3.7/library/queue.html?highlight=queue#queue.Queue.join
-    # initialize thread-safe queue
-    state = AutomateState(working_dir, resource_dir)
+    # retrieve the current batch
+    host = database['host']
+    name = database['name']
+    resp = rest_post('%s/automate/current_batch' % host, {'database': name})
+    batch = json.loads(resp.content.decode('utf-8'))['name']
+    # resolve where to save the data files
+    input_dir = '%s/%s/input' % (working_dir, batch)
+    create_dir(input_dir)
+    output_dir = '%s/%s/output' % (working_dir, batch)
+    create_dir(output_dir)
+    # initialize state object that will act as a shared resource for the workers
+    state = AutomateState(input_dir, output_dir, resource_dir)
     # initialize thread list
     threads = []
-    for i in range(num_threads):
+    for i in range(num_workers):
         t = threading.Thread(target=target_fn,
-                             args=(state, *args),
+                             args=(state,),
                              kwargs={**kwargs})
         t.start() # begin worker thread
         threads.append(t) # store thread object
-    # initialize 'foreman' thread
+    # initialize 'foreman' or supervisor thread
     # the foreman tracks the status and progress of the workers
     # it communicates this information with the CJS host
+    fss = FileSystemScanner()
+    obs = Observer()
+    obs.schedule(fss, path=output_dir)
+    obs.start()
+    while True:
+        pass
     # block until all tasks are done
     state.queue.join()
+    obs.stop()
     # stop workers
     for i in range(num_threads):
-        job_queue.put(None)
+        state.input_queue.put(None)
     for t in threads:
         t.join()
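
Review note: generic_worker and spawn_workers follow the queue.Queue.join recipe the comments cite, but as committed the foreman's bare 'while True: pass' never exits, state.queue is not an attribute AutomateState defines (the queues are input_queue and output_queue), the shutdown loop still reads num_threads after the parameter was renamed num_workers, and FileSystemScanner.on_created calls self.put where self.pending_files.put appears intended. For reference, a compact runnable sketch of the sentinel-based worker pattern this code is built on (generic names, not this repo's API):

import queue
import threading

def worker(input_queue, output_queue):
    while True:
        job = input_queue.get()    # blocks until a job (or sentinel) arrives
        if job is None:            # None marks the end of the queue
            input_queue.task_done()
            break
        output_queue.put(job * 2)  # stand-in for the real job routine
        input_queue.task_done()    # lets input_queue.join() return

input_queue, output_queue = queue.Queue(), queue.Queue()
threads = [threading.Thread(target=worker, args=(input_queue, output_queue))
           for _ in range(4)]
for t in threads:
    t.start()
for job in range(10):
    input_queue.put(job)
input_queue.join()                 # block until every job is marked done
for _ in threads:
    input_queue.put(None)          # one sentinel per worker
for t in threads:
    t.join()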

cjs/fmt/efm.py (217 changes)

@@ -0,0 +1,217 @@
+#!/usr/bin/env python3
+
+# initialize header for a euroformix input file
+# euroformix evidence files are tab (\t) delimited
+evidence_header = [
+    'Sample Name', # human-readable name - e.g. reqbt-1
+    'Marker', # loci - e.g. vWA
+    # 'Allele 1', # value - e.g. 12.3
+    # 'Allele 2',
+    # 'Allele 3',
+    # 'Allele 4',
+    # 'Allele 5',
+    # 'Allele 6',
+    # 'Height 1', # peak height (rfu) - e.g. 200
+    # 'Height 2',
+    # 'Height 3',
+    # 'Height 4',
+    # 'Height 5',
+    # 'Height 6',
+    'ADO', # ?
+    'UD1', # ?
+    '' # tab at the end
+]
+
+reqbt_loci = {
+    'D8S1179',
+    'D21S11',
+    'D7S820',
+    'CSF1PO',
+    'D3S1358',
+    'TH01',
+    'D13S317',
+    'D16S539',
+    'D2S1338',
+    'D19S433',
+    'vWA',
+    'TPOX',
+    'D18S51',
+    'D5S818',
+    'FGA'
+}
+
+# initialize header for euroformix comparison
+# euroformix comparison files are comma (,) delimited
+comparison_header = [
+    'SampleName', # human-readable name - e.g. reqbt-1-1
+    'Marker', # loci - e.g. vWA
+    'Allele1', # value - e.g. 12.3
+    'Allele2'
+]
+
+# initialize loci supported by euroformix's evidence files
+# NOTE: this is in the order they are seen in stain.txt
+# NOTE: because I am unsure if order matters, there is comparison_loci as well
+evidence_loci = [
+    'AMEL',
+    'D3S1358',
+    'TH01',
+    'D21S11',
+    'D18S51',
+    'D10S1248',
+    'D1S1656',
+    'D2S1338',
+    'D16S539',
+    'D22S1045',
+    'vWA',
+    'D8S1179',
+    'FGA',
+    'D2S441',
+    'D12S391',
+    'D19S433',
+    'SE33'
+]
+
+# initialize loci supported by euroformix's comparison files
+# NOTE: this is in the order they are seen in refs.csv
+# NOTE: because I am unsure if order matters, there is evidence_loci as well
+comparison_loci = [
+    'D3S1358',
+    'TH01',
+    'D21S11',
+    'D18S51',
+    'D10S1248',
+    'D1S1656',
+    'D2S1338',
+    'D16S539',
+    'D22S1045',
+    'VWA', # NOTE: caps?
+    'D8S1179',
+    'FGA',
+    'D2S441',
+    'D12S391',
+    'D19S433',
+    'SE33'
+]
+
+def any_bad_values(values):
+    bad_values = ['NEG', 'INC']
+    if any(bad_value in values for bad_value in bad_values):
+        return True
+    else:
+        return False
+
+def reqbt_to_settings(evidence_json, comparison_json):
+    settings = {
+        'test_name': comparison_json['name'],
+        'evidence_name': evidence_json['name'],
+        'comparison_name': comparison_json['name'],
+        'num_contributors': evidence_json['contributors'],
+        'num_replicates': len(evidence_json['replicates'])
+    }
+    return settings
+
+def reqbt_to_evidence(evidence_json):
+    # initialize list to store data in tsv format
+    evidence_tsv = []
+    # initialize some variables to reduce magic numbers
+    max_alleles = 0
+    # initialize name from json data
+    i = 1
+    name = evidence_json['name']
+    for replicate in evidence_json['replicates']:
+        for locus in reqbt_loci: # TODO: merge evidence_loci. this was formerly evidence_loci
+            # initialize temporary row to append to evidence_tsv
+            row = []
+            # add 'Sample Name'
+            row.append('%s_%d' % (name, i))
+            # add 'Marker'
+            row.append(locus)
+            try:
+                # try to see if locus exists in the reqbt data
+                # if not, jump to except KeyError
+                values = replicate[locus]
+                # calculate how many allele headers we will need (for the longest values)
+                if any_bad_values(values):
+                    continue
+                len_values = len(values)
+                if max_alleles < len_values:
+                    max_alleles = len_values
+                # we will zip this loop with a range of 6 to ENSURE
+                # only 6 values are added as the loop will end on 6
+                # iterations
+                row.extend(values)
+            except KeyError: # locus didn't exist, skip it
+                pass
+            except Exception as e: # something went seriously wrong
+                print('Error: An unexpected error occurred: %s' % e)
+                raise
+            # finally, add row to evidence_tsv
+            evidence_tsv.append(row)
+        i += 1
+    # now we must generate our dynamic header
+    allele_section = ['Allele %d' % (i+1) for i in range(max_alleles)]
+    dynamic_header = list(evidence_header)
+    dynamic_header[2:2] = allele_section # insert header elements starting at index 2
+    # we must fill any empty/missing columns with tabs (''):
+    for row in evidence_tsv:
+        while len(row) < len(dynamic_header):
+            row.append('')
+        row[-3] = 'false'
+    # finally, insert the header at the beginning
+    evidence_tsv.insert(0, dynamic_header)
+    return evidence_tsv # TODO: make run more than 1 replicate
+
+def reqbt_to_comparison(comparison_json):
+    # initialize list to store data in csv format
+    comparison_csv = []
+    # initialize some variables to reduce magic numbers
+    max_alleles = 2
+    # initialize name from json data
+    name = comparison_json['name']
+    for locus in reqbt_loci:
+        # initialize temporary row to append to comparison_csv
+        row = []
+        # add 'Sample Name'
+        row.append(name)
+        # add 'Marker'
+        row.append(locus)
+        try:
+            # for some reason vWA is capitalized differently, so..
+            # TODO: is there a better way?
+            if locus == 'VWA':
+                locus = 'vWA' # fix it
+            # try to see if locus exists in the reqbt data
+            # if not, jump to except KeyError
+            values = comparison_json[locus] # NOTE: use uppercase here
+            if any_bad_values(values):
+                continue
+            row.extend(values)
+        except KeyError: # locus didn't exist, skip it
+            pass
+        except Exception as e: # something went seriously wrong
+            print('Error: An unexpected error occurred: %s' % e)
+            raise
+        # we must fill any empty/missing columns with tabs (''):
+        while len(row) < len(comparison_header):
+            row.append('')
+        # finally, add row to comparison_csv
+        comparison_csv.append(row)
+    # finally, insert the header at the beginning
+    comparison_csv.insert(0, comparison_header)
+    return comparison_csv
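
Review note: the dynamic header in reqbt_to_evidence is built with list slice assignment, sized by the widest row seen. A tiny standalone demonstration of that insert and of the padding loop, using the same names as above:

evidence_header = ['Sample Name', 'Marker', 'ADO', 'UD1', '']
max_alleles = 3

# insert the allele columns starting at index 2, as reqbt_to_evidence does
allele_section = ['Allele %d' % (i + 1) for i in range(max_alleles)]
dynamic_header = list(evidence_header)
dynamic_header[2:2] = allele_section
print(dynamic_header)
# ['Sample Name', 'Marker', 'Allele 1', 'Allele 2', 'Allele 3', 'ADO', 'UD1', '']

# pad a short row to the header's width, exactly like the fill loop above
row = ['reqbt-1_1', 'vWA', '14', '15']
while len(row) < len(dynamic_header):
    row.append('')
row[-3] = 'false'  # lands in the ADO column, as set in reqbt_to_evidence
print(row)
# ['reqbt-1_1', 'vWA', '14', '15', '', 'false', '', '']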

cjs/models/__init__.py (0 changes)

cjs/models/automate.py (0 changes)

cjsd/app.py (3 changes)

@@ -32,7 +32,8 @@ ROUTES = {
     '/database/status': database.StatusResource,
     '/database/stage': database.StageResource,
     '/automate/next_job': automate.NextJobResource,
-    '/automate/update_job': automate.UpdateJobResource
+    '/automate/update_job': automate.UpdateJobResource,
+    '/automate/current_batch': automate.CurrentBatchResource
 }

 def create_api():
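
Review note: ROUTES maps URI templates to Falcon resource classes; the body of create_api is outside this hunk, so the wiring below is an assumption, sketched with the add_route call Falcon provides (falcon.API in releases of this era; newer Falcon renames it falcon.App):

import falcon

class PingResource:                      # illustrative resource, not from this repo
    def on_get(self, req, resp):
        resp.media = {'status': 'ok'}

ROUTES = {
    '/ping': PingResource,
}

def create_api():
    api = falcon.API()
    for uri, resource in ROUTES.items():
        api.add_route(uri, resource())   # Falcon routes to an instance, not the class
    return api

api = create_api()                       # WSGI callable, e.g. for gunicorn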

cjsd/core/database.py (1 change)

@@ -382,7 +382,6 @@ def stage_jobs(name, batch_name):
         # TODO: implement fine-grained handling
         raise

 def pending_jobs(name):
-    '''Return a list of all pending jobs.'''
     try:

cjsd/resources/automate.py (17 changes)

@@ -10,6 +10,7 @@ import core.database
 from core.database import next_job
 from core.database import set_job_status
 from core.database import find_job
+from core.database import current_batch

 import utils.io
 from utils.io import read_json
@ -61,3 +62,19 @@ class UpdateJobResource:
'Unable to get next job.')
except Exception as e:
raise falcon.HTTPInternalServerError()
class CurrentBatchResource:
@falcon.before(get_database_context)
def on_post(self, req, resp):
database_name = req.context['database']
try:
batch = current_batch(database_name)
resp.media = {
'name': batch.name
}
except IntegrityError:
raise falcon.HTTPBadRequest('No batches',
'Trying staging the job table.')
except Exception as e:
raise falcon.HTTPInternalServerError()
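
Review note: CurrentBatchResource is the server half of the batch lookup that spawn_workers performs through utils.http.rest_post. Since rest_post's implementation is not in this diff, here is the equivalent request made with the requests library directly (host and database name are illustrative):

import json
import requests

host = 'http://localhost:8000'  # illustrative CJS host
resp = requests.post('%s/automate/current_batch' % host,
                     json={'database': 'example_db'})
batch = json.loads(resp.content.decode('utf-8'))['name']
print('current batch: %s' % batch)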