chiark / gitweb /
wip fixes
[hippotat.git] / client
diff --git a/client b/client
index 6080d4d6ae685ba30d22d62ba890a4bda773c316..f26d46b8eff30e177a11c379bd81100249d6ee1e 100755 (executable)
--- a/client
+++ b/client
@@ -14,7 +14,7 @@ class GeneralResponseConsumer(twisted.internet.protocol.Protocol):
     self._desc = desc
 
   def _log(self, dflag, msg, **kwargs):
-    self.cl.log(dflag, '%s: %s' % (self._desc, msg), idof=self._req, **kwargs)
+    self._cl.log(dflag, '%s: %s' % (self._desc, msg), idof=self._req, **kwargs)
 
   def connectionMade(self):
     self._log(DBG.HTTP_CTRL, 'connectionMade')
@@ -25,6 +25,7 @@ class ResponseConsumer(GeneralResponseConsumer):
     ssddesc = '[%s] %s' % (id(req), self._desc)
     self._ssd = SlipStreamDecoder(ssddesc, cl.queue_inbound)
     self._log(DBG.HTTP_CTRL, '__init__')
+    self._success_reported = False
 
   def dataReceived(self, data):
     self._log(DBG.HTTP, 'dataReceived', d=data)
@@ -41,18 +42,21 @@ class ResponseConsumer(GeneralResponseConsumer):
     try:
       self._log(DBG.HTTP, 'ResponseDone')
       self._ssd.flush()
-      self.cl.req_fin(self._req)
+      self._cl.req_fin(self._req)
     except Exception as e:
       self._handleexception()
+    if not self._success_reported:
+      log.info(cl.desc + 'running OK', dflag=False)
+      self._success_reported = True
 
   def _handleexception(self):
     self._latefailure(traceback.format_exc())
 
   def _latefailure(self, reason):
     self._log(DBG.HTTP_CTRL, '_latefailure ' + str(reason))
-    self.cl.req_err(self._req, reason)
+    self._cl.req_err(self._req, reason)
 
-class ErrorResponseConsumer(twisted.internet.protocol.Protocol):
+class ErrorResponseConsumer(GeneralResponseConsumer):
   def __init__(self, cl, req, resp):
     super().__init__(cl, req, 'ERROR-RC')
     self._resp = resp
@@ -74,7 +78,7 @@ class ErrorResponseConsumer(twisted.internet.protocol.Protocol):
       mbody = repr(self._m)
     if not reason.check(twisted.web.client.ResponseDone):
       mbody += ' || ' + str(reason)
-    self.cl.req_err(self._req,
+    self._cl.req_err(self._req,
             "FAILED %d %s | %s"
             % (self._resp.code, self._phrase, mbody))
 
@@ -83,17 +87,18 @@ class Client():
     cl.c = c
     cl.outstanding = { }
     cl.desc = '[%s %s] ' % (ss,cs)
+    log.info(cl.desc + 'setting up', dflag=False)
 
   def log(cl, dflag, msg, **kwargs):
     log_debug(dflag, cl.desc + msg, **kwargs)
 
   def log_outstanding(cl):
-    cl.log(DBG.CTRL_DUMP, 'OS %s' % outstanding)
+    cl.log(DBG.CTRL_DUMP, 'OS %s' % cl.outstanding)
 
   def start(cl):
-    cl.queue = PacketQueue('up', c.max_queue_time)
+    cl.queue = PacketQueue('up', cl.c.max_queue_time)
     cl.agent = twisted.web.client.Agent(
-      reactor, connectTimeout = c.http_timeout)
+      reactor, connectTimeout = cl.c.http_timeout)
 
   def outbound(cl, packet, saddr, daddr):
     #print('OUT ', saddr, daddr, repr(packet))
@@ -104,13 +109,13 @@ class Client():
     cl.log(DBG.HTTP_CTRL,
             'req_ok %d %s %s' % (resp.code, repr(resp.phrase), str(resp)),
             idof=req)
-  if resp.code == 200:
-    rc = ResponseConsumer(cl, req)
-  else:
-    rc = ErrorResponseConsumer(cl, req, resp)
+    if resp.code == 200:
+      rc = ResponseConsumer(cl, req)
+    else:
+      rc = ErrorResponseConsumer(cl, req, resp)
 
-  resp.deliverBody(rc)
-  # now rc is responsible for calling req_fin
+    resp.deliverBody(rc)
+    # now rc is responsible for calling req_fin
 
   def req_err(cl, req, err):
     # called when the Deferred fails, or (if it completes),
@@ -120,17 +125,18 @@ class Client():
       if isinstance(err, twisted.python.failure.Failure):
         err = err.getTraceback()
       print('[%#x] %s' % (id(req), err), file=sys.stderr)
-      if not isinstance(outstanding[req], int):
-        raise RuntimeError('[%#x] previously %s' % (id(req), outstanding[req]))
+      if not isinstance(cl.outstanding[req], int):
+        raise RuntimeError('[%#x] previously %s' %
+                           (id(req), cl.outstanding[req]))
       cl.outstanding[req] = err
       cl.log_outstanding()
-      reactor.callLater(c.http_retry, partial(cl.req_fin, req))
+      reactor.callLater(cl.c.http_retry, partial(cl.req_fin, req))
     except Exception as e:
       crash(traceback.format_exc() + '\n----- handling -----\n' + err)
 
   def req_fin(cl, req):
     del cl.outstanding[req]
-    cl.log(DBG.HTTP_CTRL, 'req_fin OS=%d' % len(outstanding), idof=req)
+    cl.log(DBG.HTTP_CTRL, 'req_fin OS=%d' % len(cl.outstanding), idof=req)
     cl.check_outbound()
 
   def check_outbound(cl):
@@ -138,13 +144,13 @@ class Client():
       if len(cl.outstanding) >= cl.c.max_outstanding:
         break
 
-      if (not queue.nonempty() and
-          len(cl.outstanding) >= cl.c.target_outstanding):
+      if (not cl.queue.nonempty() and
+          len(cl.outstanding) >= cl.c.target_requests_outstanding):
         break
 
       d = b''
       def moredata(s): nonlocal d; d += s
-      queue.process((lambda: len(d)),
+      cl.queue.process((lambda: len(d)),
                     moredata,
                     cl.c.max_batch_up)
 
@@ -157,7 +163,8 @@ class Client():
               b'Content-Disposition: form-data; name="m"'   + crlf + crlf +
               str(cl.c.client)            .encode('ascii')  + crlf +
               cl.c.password                                 + crlf +
-              str(cl.c.target_outstanding).encode('ascii')  + crlf +
+              str(cl.c.target_requests_outstanding)
+                                          .encode('ascii')  + crlf +
               str(cl.c.http_timeout)      .encode('ascii')  + crlf +
             ((
               b'--b'                                        + crlf +
@@ -181,7 +188,7 @@ class Client():
       bytesreader = io.BytesIO(mime)
       producer = twisted.web.client.FileBodyProducer(bytesreader)
 
-      req = agent.request(b'POST',
+      req = cl.agent.request(b'POST',
                           cl.c.url,
                           twisted.web.client.Headers(hh),
                           producer)
@@ -205,7 +212,7 @@ def process_cfg(putative_servers, putative_clients):
     for (ci,cs) in putative_clients.items():
       c = ConfigResults()
 
-      sections = process_cfg_client_common(c,ss,cs,ci):
+      sections = cfg_process_client_common(c,ss,cs,ci)
       if not sections: continue
 
       def srch(getter,key): return cfg_search(getter,key,sections)
@@ -214,15 +221,19 @@ def process_cfg(putative_servers, putative_clients):
       c.max_outstanding = srch(cfg.getint, 'max_requests_outstanding')
       c.max_batch_up    = srch(cfg.getint, 'max_batch_up')
       c.http_retry      = srch(cfg.getint, 'http_retry')
+      c.max_queue_time  = srch(cfg.getint, 'max_queue_time')
       c.vroutes         = srch(cfg.get,    'vroutes')
 
-      process_cfg_common(c,ss)
       try: c.url = srch(cfg.get,'url')
       except NoOptionError:
-        process_cfg_saddrs()
+        cfg_process_saddrs(c, ss)
         c.url = c.saddrs[0].url()
 
-      process_cfg_ipif(cc,
+      c.client = ci
+
+      cfg_process_vaddr(c,ss)
+
+      cfg_process_ipif(c,
                        sections,
                        (('local','client'),
                         ('peer', 'vaddr'),