chiark / gitweb /
curveopt: limit no. of concurrent findcurves
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Sun, 8 Apr 2018 20:43:12 +0000 (21:43 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Sun, 8 Apr 2018 20:43:12 +0000 (21:43 +0100)
Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
curveopt.py
schequeue.py [new file with mode: 0644]

index 33670210ca5554851c5ac4e4144a4fd015bf0d2c..5b8f2de155ded9edf4639c66290e8493f856f3bd 100644 (file)
@@ -12,6 +12,7 @@ import math
 from moedebug import *
 from moenp import *
 from moebez import *
+from schequeue import ScheduledTask
 
 from math import atan2, atan, sqrt
 
@@ -21,11 +22,12 @@ class OptimisedCurve():
   counter = 0
 
   def _dbg(oc, s):
-    dbg('OC#%04d %s' % (oc._counter, s))
+    dbg('%s %s' % (oc._desc, s))
 
   def __init__(oc, cp, nt):
-    oc._counter = OptimisedCurve.counter
+    oc._desc = 'OC#%04d' % OptimisedCurve.counter
     OptimisedCurve.counter += 1
+
     oc._dbg('cp= ' + ' '.join(map(vec2dbg, cp)))
 
     db = DiscreteBezier(cp, nt, bezier_constructor=BezierSegment)
@@ -50,6 +52,9 @@ class OptimisedCurve():
 
     findcurve_epsilon = 0.01
 
+    oc.sched = ScheduledTask(oc._await_subproc, oc._desc)
+    oc.sched.pre_spawn()
+
     cl = ['./findcurve', '%d' % (nt+1), '%.18g' % findcurve_epsilon]
     oc._dbg('STARTING FINDCURVE %s' % cl)
     subproc = subprocess.Popen(
@@ -76,6 +81,7 @@ class OptimisedCurve():
     oc.subproc = subproc
     oc.nt = nt
 
+    oc.sched.post_spawn()
     #oc._await_subproc()
 
   def _await_subproc(oc):
@@ -110,6 +116,8 @@ class OptimisedCurve():
     assert(subproc.returncode == 0)
     oc.subproc = None
 
+    oc.sched.post_reap()
+
     oc._result = np.reshape(findcurve_result, (-1,3), 'C')
     oc._dbg(repr(oc._result))
 
diff --git a/schequeue.py b/schequeue.py
new file mode 100644 (file)
index 0000000..0a7047c
--- /dev/null
@@ -0,0 +1,58 @@
+
+from __future__ import print_function
+
+import multiprocessing
+
+from moedebug import *
+
+class ScheduledTask():
+  avail = multiprocessing.cpu_count()
+  running = [ ]
+  factor = 0.74
+
+  @staticmethod
+  def tidy_queue():
+    ScheduledTask.running = [
+      st
+      for st in ScheduledTask.running
+      if st.running
+    ]
+
+  @staticmethod
+  def want(weight):
+    while True:
+      ScheduledTask.tidy_queue()
+      running_weight = sum(map((lambda st: st.weight), ScheduledTask.running))
+      if ScheduledTask.avail - running_weight >= weight:
+        return
+      if not ScheduledTask.running:
+        return
+      other = ScheduledTask.running[0]
+      other._dbg('run_now!')
+      other.run_now()
+      assert(not other.running)
+
+  def _dbg(st, s):
+    dbg('ST %s %s' % (st.desc, s))
+
+  def __init__(st, run_now, desc, weight=1.):
+    st.run_now = run_now
+    st.weight = weight * ScheduledTask.factor
+    st.running = False
+    st.desc = desc
+    st._dbg('create')
+
+  def pre_spawn(st):
+    st._dbg('pre_spawn')
+    assert(not st.running)
+    ScheduledTask.want(st.weight)
+
+  def post_spawn(st):
+    st._dbg('post_spawn')
+    st.running = True
+    ScheduledTask.running.append(st)
+    ScheduledTask.want(0)
+
+  def post_reap(st):
+    st._dbg('post_reap')
+    st.running = False