From: Ian Jackson Date: Sun, 8 Apr 2018 20:43:12 +0000 (+0100) Subject: curveopt: limit no. of concurrent findcurves X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ian/git?a=commitdiff_plain;h=e7b9a1bdf45e0842e97e6b8bc5e4d19138f377da;p=moebius3.git curveopt: limit no. of concurrent findcurves Signed-off-by: Ian Jackson --- diff --git a/curveopt.py b/curveopt.py index 3367021..5b8f2de 100644 --- a/curveopt.py +++ b/curveopt.py @@ -12,6 +12,7 @@ import math from moedebug import * from moenp import * from moebez import * +from schequeue import ScheduledTask from math import atan2, atan, sqrt @@ -21,11 +22,12 @@ class OptimisedCurve(): counter = 0 def _dbg(oc, s): - dbg('OC#%04d %s' % (oc._counter, s)) + dbg('%s %s' % (oc._desc, s)) def __init__(oc, cp, nt): - oc._counter = OptimisedCurve.counter + oc._desc = 'OC#%04d' % OptimisedCurve.counter OptimisedCurve.counter += 1 + oc._dbg('cp= ' + ' '.join(map(vec2dbg, cp))) db = DiscreteBezier(cp, nt, bezier_constructor=BezierSegment) @@ -50,6 +52,9 @@ class OptimisedCurve(): findcurve_epsilon = 0.01 + oc.sched = ScheduledTask(oc._await_subproc, oc._desc) + oc.sched.pre_spawn() + cl = ['./findcurve', '%d' % (nt+1), '%.18g' % findcurve_epsilon] oc._dbg('STARTING FINDCURVE %s' % cl) subproc = subprocess.Popen( @@ -76,6 +81,7 @@ class OptimisedCurve(): oc.subproc = subproc oc.nt = nt + oc.sched.post_spawn() #oc._await_subproc() def _await_subproc(oc): @@ -110,6 +116,8 @@ class OptimisedCurve(): assert(subproc.returncode == 0) oc.subproc = None + oc.sched.post_reap() + oc._result = np.reshape(findcurve_result, (-1,3), 'C') oc._dbg(repr(oc._result)) diff --git a/schequeue.py b/schequeue.py new file mode 100644 index 0000000..0a7047c --- /dev/null +++ b/schequeue.py @@ -0,0 +1,58 @@ + +from __future__ import print_function + +import multiprocessing + +from moedebug import * + +class ScheduledTask(): + avail = multiprocessing.cpu_count() + running = [ ] + factor = 0.74 + + @staticmethod + def tidy_queue(): + ScheduledTask.running = [ + st + for st in ScheduledTask.running + if st.running + ] + + @staticmethod + def want(weight): + while True: + ScheduledTask.tidy_queue() + running_weight = sum(map((lambda st: st.weight), ScheduledTask.running)) + if ScheduledTask.avail - running_weight >= weight: + return + if not ScheduledTask.running: + return + other = ScheduledTask.running[0] + other._dbg('run_now!') + other.run_now() + assert(not other.running) + + def _dbg(st, s): + dbg('ST %s %s' % (st.desc, s)) + + def __init__(st, run_now, desc, weight=1.): + st.run_now = run_now + st.weight = weight * ScheduledTask.factor + st.running = False + st.desc = desc + st._dbg('create') + + def pre_spawn(st): + st._dbg('pre_spawn') + assert(not st.running) + ScheduledTask.want(st.weight) + + def post_spawn(st): + st._dbg('post_spawn') + st.running = True + ScheduledTask.running.append(st) + ScheduledTask.want(0) + + def post_reap(st): + st._dbg('post_reap') + st.running = False