
add --bin_dir option, move the foreman loop from batch.py into automate.py, rename the EuroForMix entry script to efm.R, and flesh out cjsd/setup.py

cjs3
Stephen Lorenz, 3 years ago
commit 09374b7dc7
8 changed files:
1. bin/efm.R (0 lines changed)
2. cjs/cjs/automate.py (142 lines changed)
3. cjs/cjs/core/batch.py (86 lines changed)
4. cjs/cjs/core/interface.py (2 lines changed)
5. cjs/cjs/database.py (4 lines changed)
6. cjs/cjs/main.py (4 lines changed)
7. cjsd/setup.py (29 lines changed)
8. run_cjsd (5 lines changed)

bin/euroformix_quantitative.R → bin/efm.R (0 lines changed)

cjs/cjs/automate.py (142 lines changed)

@@ -22,7 +22,7 @@ from . import PKG_DIR
from .core.batch import generic_worker
from .core.batch import spawn_workers
from .core.batch import run_process
from .core.batch import send_error
from .core.batch import AutomateState
from .core.interface import AutomateInterface
@@ -48,8 +48,17 @@ from .fmt.efm import reqbt_to_settings
default=PKG_DIR/'.resources', # NOTE: see __init__ for PKG_DIR
show_default=True,
help='Set the working directory.')
@click.option(
'--bin_dir',
type=click.Path(
file_okay=False,
resolve_path=True
),
default=PKG_DIR/'../../bin', # NOTE: see __init__ for PKG_DIR
show_default=True,
help='Set the software directory.')
@click.pass_context
def cli(ctx, num_workers, resource_dir):
def cli(ctx, num_workers, resource_dir, bin_dir):
'''Bulk test systems.'''
# add additional arguments and options to click's context object
@@ -57,7 +66,8 @@ def cli(ctx, num_workers, resource_dir):
ctx.obj = {
**ctx.obj, # include previous key: value pairs
'num_workers': num_workers,
'resource_dir': resource_dir
'resource_dir': resource_dir,
'bin_dir': bin_dir
}
# Main-entry point to the EuroForMix subcommand
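The pattern at work in this hunk: the group-level cli() stashes its options in click's context object so subcommands can read them later. A minimal, self-contained sketch of that pattern (the --bin_dir name mirrors the diff; the default and subcommand body are illustrative):

import click

@click.group()
@click.option('--bin_dir',
              type=click.Path(file_okay=False, resolve_path=True),
              default='bin', show_default=True,
              help='Set the software directory.')
@click.pass_context
def cli(ctx, bin_dir):
    ctx.ensure_object(dict)       # make sure ctx.obj exists
    ctx.obj['bin_dir'] = bin_dir  # stash options for subcommands

@cli.command()
@click.pass_context
def efm(ctx):
    # the subcommand reads what the group stored
    click.echo('bin_dir is %s' % ctx.obj['bin_dir'])

if __name__ == '__main__':
    cli(obj={})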
@@ -68,23 +78,87 @@ def efm(ctx, database_name):
'''Run multisampleUsage.R.\f'''
# unpack the click context object
# alias the context dictionary to a new variable
# cjs daemon variables
address = ctx.obj['address']
port = ctx.obj['port']
database = ctx.obj['database']
interface = AutomateInterface(address, port)
try:
# spawn a size num_workers worker cluster of efm workers
spawn_workers(
efm_qualitative,
database,
data_dir,
resource_dir,
num_workers)
except Exception as e:
raise
# cjs state variables
working_dir = ctx.obj['working_dir']
resource_dir = ctx.obj['resource_dir']
bin_dir = ctx.obj['bin_dir']
# cjs worker variables
num_workers = ctx.obj['num_workers']
# initialize an AutomateInterface object to communicate with
# the cjs daemon at the given address:port
server = AutomateInterface(address, port)
# obtain the latest batch name from the cjs daemon
batch = server.current_batch(database)
# resolve the input and output directory paths
# ensure input and output directories exist
input_dir = '%s/%s/input' % (working_dir, batch)
create_dir(input_dir)
output_dir = '%s/%s/output' % (working_dir, batch)
create_dir(output_dir)
# initialize an AutomateState object to share information among
# the worker threads and the 'foreman' thread
state = AutomateState(
input_dir, # where input files will be stored
output_dir, # where result files will be stored
resource_dir, # where pg software settings files are stored
bin_dir, # where the pg software executables live (required by the new __init__)
num_workers # max size of queue == number of threads
)
# begin worker threads
# they will initially wait for the job queue to fill up
threads = spawn_workers(efm_qualitative, state, num_workers)
# initialize 'foreman' or supervisor thread
# the foreman monitors the status and progress of the workers
# it communicates this information with the CJS host
fss = FileSystemScanner()
obs = Observer()
obs.schedule(fss, path=output_dir)
obs.start()
while True:
# request the next job from the cjs daemon
resp = rest_post('%s/automate/next_job' % database['host'], {'database': database['name']})
job = json.loads(resp.content.decode('utf-8'))
# add job to queue
state.input_queue.put(job)
# update job status
try:
while True:
status = state.output_queue.get(block=False)
print(status)
state.output_queue.task_done()
except queue.Empty as e:
print('Foreman: output_queue is empty. Moving on...')
# upload result file
try:
while True:
file_ = fss.pending_files.get(block=False)
print(file_)
fss.pending_files.task_done()
except queue.Empty as e:
print('Foreman: pending_files is empty. Moving on...')
# stop watching the output directory
obs.stop()
# stop workers
for i in range(num_workers):
state.input_queue.put(None)
for t in threads:
t.join()
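FileSystemScanner itself is not shown in this commit; judging from how it is used above, it is a watchdog event handler that queues newly created result files for upload. A guess at its shape (only the pending_files attribute is implied by the code above; the class body is an assumption):

import queue
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

class FileSystemScanner(FileSystemEventHandler):
    '''Hypothetical reconstruction: queue files as they appear.'''
    def __init__(self):
        super().__init__()
        self.pending_files = queue.Queue()

    def on_created(self, event):
        if not event.is_directory:
            self.pending_files.put(event.src_path)

# usage mirrors the efm command above:
# obs = Observer()
# obs.schedule(FileSystemScanner(), path=output_dir)
# obs.start()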
# TODO: find a better way around passing profile - possibly object decorator?
@generic_worker
@@ -98,35 +172,49 @@ def efm_qualitative(state, job):
comparison_csv = reqbt_to_comparison(comparison_data)
# determine filenames
evidence_file = '%s/%s.tsv' % (state.input_dir, evidence_data['name'])
comparison_file = '%s/%s.csv' % (state.input_dir, comparison_data['name'])
evidence_file = '%s/%s.tsv' % (
state.input_dir,
evidence_data['name']
)
comparison_file = '%s/%s.csv' % (
state.input_dir,
comparison_data['name']
)
# write input file to tmp
write_csv(evidence_file, evidence_tsv, delimiter='\t')
write_csv(comparison_file, comparison_csv, delimiter=',')
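write_csv and write_json are project helpers not shown in this diff; presumably thin wrappers over the standard library, along these lines (names match the calls above, bodies are assumptions):

import csv
import json

def write_csv(path, rows, delimiter=','):
    # assumed helper: dump a list of rows to a delimited file
    with open(path, 'w', newline='') as f:
        csv.writer(f, delimiter=delimiter).writerows(rows)

def write_json(path, obj):
    # assumed helper: pretty-print a dict to a JSON file
    with open(path, 'w') as f:
        json.dump(obj, f, indent=2)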
test_settings = {
'working_dir': '.',
'output_dir': state.output_dir,
'working_dir': state.working_dir,
'resource_dir': state.resource_dir,
'evidence_file': evidence_file,
'comparison_file': comparison_file,
'resource_dir': '/home/csguest/Desktop/cjs3/cjs/data',
**reqbt_to_settings(evidence_data, comparison_data)
**reqbt_to_settings(
evidence_data,
comparison_data
)
}
# determine test filename
test_file = '%s/%s-%s.json' % (state.output_dir,
evidence_data['name'],
comparison_data['name'])
test_file = '%s/%s-%s.json' % (
state.output_dir,
evidence_data['name'],
comparison_data['name']
)
# write settings file to tmp
write_json(test_file, test_settings)
click.echo('* Starting %s and %s' % (evidence_data['name'], comparison_data['name']))
try:
# working_dir = ctx.obj['working_dir'] start time to calculate average runtime
# initialize start time to calculate the average runtime
start_time = time.time()
run_process(['euroformix.headless/R/lrmix.R', test_file])
euroformix_bin = '%s/efm.R' % state.bin_dir
# run euroformix in a separate process and wait for it to finish
run_process([euroformix_bin, test_file])
end_time = time.time()
elapsed = end_time-start_time
except subprocess.CalledProcessError as e:
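run_process is the project's helper and its implementation is not part of this diff; a hedged sketch of the timing-plus-subprocess idiom the new code uses (the function name and the Rscript invocation in the usage comment are assumptions):

import subprocess
import time

def timed_run(cmd):
    '''Run a command, raise on failure, return elapsed seconds.'''
    start_time = time.time()
    # check=True raises CalledProcessError on a non-zero exit code,
    # which the caller above catches
    subprocess.run(cmd, check=True)
    return time.time() - start_time

# e.g. elapsed = timed_run(['Rscript', 'bin/efm.R', test_file])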

cjs/cjs/core/batch.py (86 lines changed)

@@ -45,7 +45,7 @@ def send_error(subject, message):
subject, message)
class AutomateState:
def __init__(self, input_dir, output_dir, resource_dir, queue_size):
def __init__(self, input_dir, output_dir, resource_dir, bin_dir, queue_size):
# pg software inputs
self.input_dir = input_dir
# results files
@@ -53,6 +53,8 @@ class AutomateState:
# pg software settings and other info
self.resource_dir = resource_dir
self.bin_dir = bin_dir
# initialize a universal lock for all the workers
self.global_lock = threading.Lock()
@@ -84,84 +86,26 @@ def poll_queue(queue_obj, item_func):
except Exception as e:
raise
def generic_worker(fn):
def generic_worker(func):
# modified from https://docs.python.org/3.7/library/queue.html?highlight=queue#queue.Queue.join
def wrapper(state, *args, **kwargs):
def wrapper(State, *args, **kwargs):
while True:
job = state.input_queue.get() # get next job
job = State.input_queue.get() # get next job
if job is None: # is this the end of the queue?
break # if so, be done :)
result = fn(state, job, *args, **kwargs) # call the job routine
state.input_queue.task_done() # signal that the job was completed
result = func(State, job, *args, **kwargs) # call the job routine
State.input_queue.task_done() # signal that the job was completed
return result
return wrapper
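The decorator turns a one-job function into a long-lived worker loop that exits on a None sentinel. A standalone toy showing the same contract (the job function and queue contents are made up):

import queue
import threading

def handle(job):
    # stand-in for a decorated job routine such as efm_qualitative
    print('processing', job)

def worker(jobs):
    while True:
        job = jobs.get()
        if job is None:        # sentinel: shut this worker down
            break
        handle(job)
        jobs.task_done()

jobs = queue.Queue()
threads = [threading.Thread(target=worker, args=(jobs,)) for _ in range(2)]
for t in threads:
    t.start()
for i in range(5):
    jobs.put(i)
for _ in threads:
    jobs.put(None)             # one sentinel per worker
for t in threads:
    t.join()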
def spawn_workers(target_fn, database, working_dir, resource_dir, num_workers, **kwargs):
def spawn_workers(target_func, State, num_workers, *args, **kwargs):
# modified from https://docs.python.org/3.7/library/queue.html?highlight=queue#queue.Queue.join
# retrieve the current batch
host = database['host']
name = database['name']
resp = rest_post('%s/automate/current_batch' % host, {'database': name})
batch = json.loads(resp.content.decode('utf-8'))['name']
# resolve where to save the data files
input_dir = '%s/%s/input' % (working_dir, batch)
create_dir(input_dir)
print('input_dir: %s' % input_dir)
output_dir = '%s/%s/output' % (working_dir, batch)
create_dir(output_dir)
print('output_dir: %s' % output_dir)
# initialize state object that will act as a shared resource for the workers
state = AutomateState(input_dir, output_dir, resource_dir, num_workers)
# initialize thread list
threads = []
thread_list = []
for i in range(num_workers):
t = threading.Thread(target=target_fn,
args=(state,),
t = threading.Thread(target=target_func,
args=(State, *args),
kwargs={**kwargs})
t.start() # begin worker thread
threads.append(t) # store thread object
# initialize 'foreman' or supervisor thread
# the foreman monitors the status and progress of the workers
# it communicates this information with the CJS host
fss = FileSystemScanner()
obs = Observer()
obs.schedule(fss, path=output_dir)
obs.start()
while True:
# get and add job queue
resp = rest_post('%s/automate/next_job' % host, {'database': name})
job = json.loads(resp.content.decode('utf-8'))
# add job to queue
state.input_queue.put(job)
# update job status
try:
while True:
status = state.output_queue.get(block=False)
print(status)
state.output_queue.task_done()
except queue.Empty as e:
print('Foreman: output_queue is empty. Moving on...')
t.start() # start the worker thread
thread_list.append(t) # store thread object
# upload result file
try:
while True:
file_ = fss.pending_files.get(block=False)
print(file_)
fss.pending_files.task_done()
except queue.Empty as e:
print('Foreman: pending_file is empty. Moving on...')
# stop watching the output directory
obs.stop()
# stop workers
for i in range(num_threads):
state.input_queue.put(None)
for t in threads:
t.join()
return thread_list

cjs/cjs/core/interface.py (2 lines changed)

@@ -4,7 +4,7 @@ import json
import requests
class WebInterface:
def __init__(self, address, port, database):
def __init__(self, address, port):
# TODO: add hostname and port validation
# TODO: find a better way to format them
self.address = 'http://%s:%s' % (address, port)
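With the database argument gone, WebInterface is just an address formatter plus HTTP calls; a sketch of how such a wrapper might issue the rest_post-style requests seen in this commit (the post method is a hypothetical helper, not shown in the diff):

import json
import requests

class WebInterface:
    def __init__(self, address, port):
        self.address = 'http://%s:%s' % (address, port)

    def post(self, route, payload):
        # hypothetical helper mirroring the rest_post calls above
        resp = requests.post(self.address + route, json=payload)
        return json.loads(resp.content.decode('utf-8'))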

cjs/cjs/database.py (4 lines changed)

@@ -8,10 +8,6 @@ import click
import utils.io
from utils.io import read_json
import utils.http
from utils.http import rest_get
from utils.http import rest_post
@click.group(name='database')
@click.pass_context
def cli(ctx):

cjs/cjs/main.py (4 lines changed)

@@ -6,7 +6,7 @@ import click
# local modules
# TODO: implement a plugin manager for simplicity's sake
#from . import database
from . import database
from . import automate
# main-entry point to the entire command-line interface
@@ -56,7 +56,7 @@ def cli(ctx, address, port, working_dir):
}
COMMAND_LIST = [
# database.cli,
database.cli,
automate.cli
]

cjsd/setup.py (29 lines changed)

@@ -2,24 +2,31 @@
import setuptools
from setuptools import setup
from setuptools import find_packages
with open('README.md', 'r') as fh:
long_description = fh.read()
setup(
name='cjs',
version='3.0.0',
packages=setuptools.find_packages(),
name='cjsd',
version='1.0.0',
author='Stephen Lorenz',
author_email='lorenzsj@clarkson.edu',
description='A backend server to the cjs client.',
long_description=long_description,
long_description_content_type='text/markdown',
url='https://github.com/lorenzsj/cjs',
packages=find_packages(),
install_requires=[
'click',
'marshmallow',
'sqlalchemy',
'matplotlib',
'watchdog',
'falcon',
'requests',
'gunicorn'
],
entry_points={
'console_scripts': [
'cjs = cjs.main:cli',
]
},
classifiers=[
'Programming Language :: Python :: 3',
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent'
]
)
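For reference, the console_scripts entry point above makes setuptools generate a cjs executable that imports and invokes cjs.main:cli (note it points at the cjs package even though this is cjsd's setup.py). The generated launcher is roughly equivalent to:

# roughly what the generated 'cjs' console script does
import sys
from cjs.main import cli

if __name__ == '__main__':
    sys.exit(cli())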

run_cjsd (5 lines changed)

@@ -1,4 +1,5 @@
#!/usr/bin/env bash
cd cjsd
../.venv/bin/gunicorn app:api --reload
cd cjsd/cjsd
../../.env/bin/gunicorn main:api --reload