Browse Source

working spawn_workers and generic_worker decorator

automate2
Stephen Lorenz 4 years ago
parent
commit
89e704d8dc
  1. 10
      cjs/automate.py
  2. 1
      cjs/cjs.py
  3. 50
      cjs/core/automate.py
  4. 1
      cjs/core/database.py
  5. 1
      cjs/database.py
  6. 64
      cjs/debug.py

10
cjs/automate.py

@@ -1,23 +1,17 @@
#!/usr/bin/env python3
import shlex
import sys
import signal
def signal_handler(sig, frame):
    """SIGINT handler: announce shutdown and terminate with exit code 0."""
    print('Exiting automate.')
    sys.exit(0)


# Install the handler so Ctrl-C exits cleanly instead of dumping a traceback.
signal.signal(signal.SIGINT, signal_handler)
# from https://www.endpoint.com/blog/2015/01/28/getting-realtime-output-using-python
def run_command(command):
    """Run *command* (an argv list) to completion and return its exit status."""
    exit_code = subprocess.call(command)
    return exit_code
@@ -31,8 +25,6 @@ def automate(ctx, workers):
'''Bulk test systems.'''
ctx.obj['workers'] = workers
def _efm(q):
from core.vault import next_job, all_jobs
# load the custom headless euroformix package

1
cjs/cjs.py

@@ -9,6 +9,7 @@ from reqbt import reqbt
from graphs import graphs
_commands = [
database,
automate,

50
cjs/core/automate.py

@@ -1,11 +1,16 @@
#!/usr/bin/env python3
# standard library
import threading
import queue
import subprocess
from subprocess import TimeoutExpired
# pip
import click
# local
import utils.io
from utils.io import read_json
from utils.io import write_csv
@@ -13,43 +18,62 @@ from utils.io import write_csv
import utils.net
from utils.net import send_email
def send_error(subject, message):
    """Email an error report to the project inbox using the configured mailer."""
    recipient = 'cucjsoftware@gmail.com'
    send_email('resources/mailer.json', recipient, subject, message)
def run_process(command, timeout):
    """Run *command* with a *timeout* (seconds), killing it on expiry.

    Returns the (stdout, stderr) pair from Popen.communicate(); both are
    None here because no pipes are attached to the child process.
    """
    # modified from https://docs.python.org/3/library/subprocess.html#subprocess.Popen.communicate
    proc = subprocess.Popen(command)
    try:
        return proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        # Deadline passed: kill the child, then reap it to avoid a zombie.
        proc.kill()
        return proc.communicate()
def generic_worker(fn):
    """Decorator that turns *fn* into a queue-draining worker routine.

    The returned wrapper takes a queue.Queue as its first argument and
    loops: it blocks on get(), stops when it sees the None sentinel, and
    otherwise calls fn(job, *args, **kwargs) and marks the job done so
    Queue.join() can unblock. Returns the result of the last job run,
    or None if no job was processed before the sentinel.
    """
    # modified from https://docs.python.org/3.7/library/queue.html?highlight=queue#queue.Queue.join
    def wrapper(job_queue, *args, **kwargs):
        result = None  # defined up front so an immediate sentinel doesn't raise NameError
        while True:
            job = job_queue.get()  # get next job (blocks until one arrives)
            if job is None:  # is this the end of the queue?
                break
            result = fn(job, *args, **kwargs)  # run routine
            job_queue.task_done()  # signal that the job was completed
        return result
    return wrapper
def spawn_workers(job_list, target_fn, num_threads, *args, **kwargs):
    """Process *job_list* on *num_threads* threads running *target_fn*.

    target_fn must follow the generic_worker protocol: accept the job
    queue as its first argument, call task_done() once per job, and exit
    when it dequeues the None sentinel. Extra *args/**kwargs are
    forwarded to every worker thread. Blocks until all jobs are done and
    all workers have exited.
    """
    # modified from https://docs.python.org/3.7/library/queue.html?highlight=queue#queue.Queue.join
    # initialize thread-safe queue
    job_queue = queue.Queue()
    # initialize thread list
    threads = []
    for _ in range(num_threads):
        t = threading.Thread(target=target_fn,
                             args=(job_queue, *args),
                             kwargs={**kwargs})
        t.start()  # begin worker thread
        threads.append(t)  # store thread object
    # insert jobs into queue
    for job in job_list:
        job_queue.put(job)
    # block until every job has been marked task_done()
    job_queue.join()
    # stop workers: one sentinel per thread
    for _ in range(num_threads):
        job_queue.put(None)
    for t in threads:
        t.join()
@generic_worker
def debug(job):
    """Trivial worker routine used to smoke-test the threading helpers."""
    print('hi')

1
cjs/core/database.py

@@ -30,6 +30,7 @@ from models.database import BatchTable
from models.database import ResultTable
from models.database import EventTable
# BEGIN: Generic Database API
# modified from https://stackoverflow.com/a/36944992

1
cjs/database.py

@@ -48,6 +48,7 @@ from models.database import BatchTable
import models.reqbt
from models.reqbt import EvidenceSchema
def database_skeleton(root):
try:
create_dir(root)

64
cjs/debug.py

@@ -0,0 +1,64 @@
#!/usr/bin/env python3
import threading
import queue
import subprocess
import click
import utils.io
from utils.io import read_json
from utils.io import write_csv
import utils.net
from utils.net import send_email
def send_error(subject, message):
    """Send an error notification email through the mailer configuration."""
    target_address = 'cucjsoftware@gmail.com'
    send_email('resources/mailer.json', target_address, subject, message)
def generic_worker(fn):
    """Wrap *fn* so it drains jobs from a queue until a None sentinel.

    The wrapper's first parameter is a queue.Queue; each dequeued job is
    passed to fn(job, *args, **kwargs) and acknowledged with task_done()
    so Queue.join() can unblock. Returns the last job's result, or None
    when the sentinel arrives before any job.
    """
    def wrapper(job_queue, *args, **kwargs):
        # Initialize so hitting the sentinel first doesn't raise NameError
        # on the final `return result`.
        result = None
        while True:
            job = job_queue.get()
            if job is None:
                break
            result = fn(job, *args, **kwargs)
            job_queue.task_done()
        return result
    return wrapper
def spawn_workers(job_list, target_fn, num_threads, *args, **kwargs):
    """Fan *job_list* out to *num_threads* threads executing *target_fn*.

    target_fn is expected to follow the generic_worker protocol: take
    the job queue as its first argument, acknowledge each job via
    task_done(), and stop on a None sentinel. Remaining *args/**kwargs
    are handed to every worker. Blocks until all work is finished.
    """
    # modified from https://docs.python.org/3.7/library/queue.html?highlight=queue#queue.Queue.join
    job_queue = queue.Queue()

    # Launch the worker pool before enqueueing any work.
    workers = []
    for _ in range(num_threads):
        worker = threading.Thread(target=target_fn,
                                  args=(job_queue, *args),
                                  kwargs={**kwargs})
        worker.start()
        workers.append(worker)

    # Feed the pool, then wait for every job to be acknowledged.
    for job in job_list:
        job_queue.put(job)
    job_queue.join()

    # One sentinel per worker shuts the pool down; then reap the threads.
    for _ in range(num_threads):
        job_queue.put(None)
    for worker in workers:
        worker.join()
@generic_worker
def debug(job):
    """Toy worker routine: ignore the job payload and emit a greeting."""
    print('hi')


# Smoke test: run three jobs through a single worker thread.
spawn_workers([1, 2, 3], debug, 1)
Loading…
Cancel
Save