| 1 | #!/usr/bin/python3 |
| 2 | # |
| 3 | # Hippotat - Asinine IP Over HTTP program |
| 4 | # ./hippotatd - server main program |
| 5 | # |
| 6 | # Copyright 2017 Ian Jackson |
| 7 | # |
| 8 | # AGPLv3+ + CAFv2+ |
| 9 | # |
| 10 | # This program is free software: you can redistribute it and/or |
| 11 | # modify it under the terms of the GNU Affero General Public |
| 12 | # License as published by the Free Software Foundation, either |
| 13 | # version 3 of the License, or (at your option) any later version, |
| 14 | # with the "CAF Login Exception" as published by Ian Jackson |
| 15 | # (version 2, or at your option any later version) as an Additional |
| 16 | # Permission. |
| 17 | # |
| 18 | # This program is distributed in the hope that it will be useful, |
| 19 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 20 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| 21 | # Affero General Public License for more details. |
| 22 | # |
| 23 | # You should have received a copy of the GNU Affero General Public |
| 24 | # License and the CAF Login Exception along with this program, in |
| 25 | # the file AGPLv3+CAFv2. If not, email Ian Jackson |
| 26 | # <ijackson@chiark.greenend.org.uk>. |
| 27 | |
| 28 | #@ import sys; sys.path.append('@PYBUILD_INSTALL_DIR@') |
| 29 | from hippotatlib import * |
| 30 | |
| 31 | import os |
| 32 | import tempfile |
| 33 | import atexit |
| 34 | import shutil |
| 35 | import subprocess |
| 36 | |
| 37 | import twisted.internet |
| 38 | from twisted.web.server import NOT_DONE_YET |
| 39 | |
| 40 | import twisted.web.static |
| 41 | |
| 42 | import hippotatlib.ownsource |
| 43 | from hippotatlib.ownsource import SourceShipmentPreparer |
| 44 | |
| 45 | #import twisted.web.server import Site |
| 46 | #from twisted.web.resource import Resource |
| 47 | |
| 48 | import syslog |
| 49 | |
# Zero-argument callables to run on exit or on a fatal signal.
# start_http() appends to this list; catch_termination() arranges for
# every entry to be called.
cleanups = [ ]

# Client objects keyed by client address.  Client.__init__ registers each
# instance here; route() and process_request() look clients up in it.
clients = { }
| 53 | |
| 54 | #---------- "router" ---------- |
| 55 | |
def route(packet, iface, saddr, daddr):
    """Send one packet to wherever its destination address says it belongs.

    packet -- the packet itself; handed on untouched
    iface  -- description of where the packet arrived from (only used
              when a discard is logged)
    saddr, daddr -- the packet's source and destination addresses

    Destinations are tried in this order:
      * a configured client: queue the packet on that client;
      * the server's own address, or anything outside the virtual
        network: hand the packet to the local ipif;
      * the relay address: discard;
      * anything else (inside the virtual network, but no such client):
        discard.
    """
    def trace(verdict):
        log_debug(DBG.ROUTE,
                  'route: %s -> %s: %s' % (saddr, daddr, verdict),
                  d=packet)

    target = clients.get(daddr)
    if target is not None:
        trace('client')
        target.queue_outbound(packet)
        return

    if daddr == c.vaddr or daddr not in c.vnetwork:
        trace('inbound')
        queue_inbound(ipif, packet)
        return

    # Both remaining outcomes are discards; they differ only in the reason.
    if daddr == c.vrelay:
        verdict, why = 'discard relay', 'relay'
    else:
        verdict, why = 'discard no-client', 'no-client'
    trace(verdict)
    log_discard(packet, iface, saddr, daddr, why)
| 73 | |
| 74 | #---------- client ---------- |
| 75 | |
class Client():
    """Server-side state for one configured client.

    Holds the HTTP requests the client currently has open (which are the
    only way to send data down to it) and the packets waiting to be sent.
    Constructing an instance registers it in the module-level `clients`
    table.
    """

    def __init__(self, ip, cc):
        """Create the queues for client `ip` (config `cc`) and register it.

        Raises ValueError if `ip` is not in c.vnetwork, or if a Client for
        `ip` has already been registered.
        """
        self._ip = ip
        self.cc = cc
        # open HTTP requests from this client, oldest at the left
        self._rq = collections.deque()
        # packets waiting to go down to this client
        self._pq = PacketQueue(str(ip), self.cc.max_queue_time)

        if ip not in c.vnetwork:
            raise ValueError('client %s not in vnetwork' % ip)
        if ip in clients:
            raise ValueError('multiple client cfg sections for %s' % ip)

        clients[ip] = self
        self._log(DBG.INIT, 'new')

    def _log(self, dflag, msg, **kwargs):
        """Debug-log `msg`, prefixed with this client's address."""
        prefix = 'client %s: ' % self._ip
        log_debug(dflag, prefix + msg, **kwargs)

    def process_arriving_data(self, d):
        """Route every packet found in the uploaded data `d`.

        `d` is passed through slip.decode; each resulting packet must carry
        this client's address as its source, otherwise ValueError is raised
        (packets already routed before the bad one stay routed).
        """
        self._log(DBG.FLOW, "req data (enc'd)", d=d)
        if len(d) == 0:
            return
        for pkt in slip.decode(d):
            src, dst = packet_addrs(pkt)
            if src != self._ip:
                raise ValueError('wrong source address %s' % src)
            route(pkt, self._ip, src, dst)

    def _req_cancel(self, request):
        """Finish `request` now, ignoring any error from doing so."""
        self._log(DBG.HTTP_CTRL, 'cancel', idof=request)
        try:
            request.finish()
        except Exception:
            # best effort: the request may already be finished or gone
            pass

    def _req_error(self, err, request):
        """Errback for a request's notifyFinish: log, then cancel it."""
        self._log(DBG.HTTP_CTRL, 'error %s' % err, idof=request)
        self._req_cancel(request)

    def queue_outbound(self, packet):
        """Queue `packet` for this client and try to send it right away."""
        self._pq.append(packet)
        self._check_outbound()

    def _req_fin(self, dummy, request, cl):
        """Callback for a finished request: cancel its timeout call `cl`."""
        self._log(DBG.HTTP_CTRL, '_req_fin ' + repr(dummy), idof=request)
        try:
            cl.cancel()
        except twisted.internet.error.AlreadyCalled:
            # the timeout already fired; nothing left to cancel
            pass

    def new_request(self, request):
        """Take ownership of a freshly arrived HTTP request.

        Arms a timeout (cc.http_timeout) that cancels the request, hooks
        its completion notification, adds it to the request queue and
        checks whether there is anything to send through it.
        """
        request.setHeader('Content-Type','application/octet-stream')
        timeout_call = reactor.callLater(self.cc.http_timeout,
                                         self._req_cancel, request)
        finished = request.notifyFinish()
        # errback first, so that _req_fin also runs after an error
        finished.addErrback(self._req_error, request)
        finished.addCallback(self._req_fin, request, timeout_call)
        self._rq.append(request)
        self._check_outbound()

    def _req_write(self, req, d):
        """Write the bytes `d` to request `req`, logging them."""
        self._log(DBG.HTTP, 'req_write ', idof=req, d=d)
        req.write(d)

    def _check_outbound(self):
        """Pair up queued packets with open requests.

        First loop: as long as there is both an unfinished request and at
        least one queued packet, let the packet queue write into the oldest
        request (bounded by cc.max_batch_down) and finish that request.
        Second loop: finish, empty, any requests in excess of
        cc.target_requests_outstanding.
        """
        log_debug(DBG.HTTP_CTRL, 'CHKO')
        while True:
            req = self._rq[0] if self._rq else None

            if req and req.finished:
                self._log(DBG.HTTP_CTRL, 'CHKO req finished, discard',
                          idof=req)
                self._rq.popleft()
                continue

            if not self._pq.nonempty():
                # nothing to send
                self._log(DBG.HTTP_CTRL, 'CHKO no packets, OUT-DONE',
                          idof=req)
                break

            if req is None:
                # nothing to send it with
                self._log(DBG.HTTP_CTRL, 'CHKO no request, OUT-DONE',
                          idof=req)
                break

            # we have a request, and also some non-expired packets
            self._log(DBG.HTTP_CTRL, 'CHKO processing', idof=req)
            self._pq.process(
                (lambda: req.sentLength),
                (lambda d: self._req_write(req, d)),
                self.cc.max_batch_down,
            )

            assert(req.sentLength)
            self._rq.popleft()
            req.finish()
            self._log(DBG.HTTP, 'complete', idof=req)
            # go round again, there may be more to do

        while len(self._rq) > self.cc.target_requests_outstanding:
            req = self._rq.popleft()
            self._log(DBG.HTTP, 'CHKO above target, returning empty',
                      idof=req)
            req.finish()
| 172 | |
def process_request(request, desca):
    """Validate an incoming POST and hand it over to the matching Client.

    request -- the HTTP request; its form field b'm' carries the metadata
               and the optional field b'd' carries uploaded packet data
    desca   -- dict that is filled in, step by step, with a description of
               the request for logging (so that a failure part-way through
               still leaves a useful description for the caller)

    The metadata is split on CRLF; its first four items are the client
    address, the password, `tro` and `cto`.  `tro` must equal the client's
    configured target_requests_outstanding and `cto` must be at least the
    configured http_timeout.

    Raises an exception (KeyError for a missing field or unknown client,
    ValueError for malformed or unacceptable values, including a wrong
    password) if the request is not acceptable; the caller turns that into
    an error response.  On success the request has been passed to
    Client.new_request and is completed later.
    """
    # Function-scope import: hmac is needed only here, for the
    # constant-time password comparison below.
    import hmac

    # find client, update config, etc.
    metadata = request.args[b'm'][0]
    metadata = metadata.split(b'\r\n')
    (ci_s, pw, tro, cto) = metadata[0:4]
    # deliberately leaves the password out of the logged description
    desca['m[0,2:3]'] = [ci_s, tro, cto]
    ci_s = ci_s.decode('utf-8')
    tro = int(tro); desca['tro']= tro
    cto = int(cto); desca['cto']= cto
    ci = ipaddr(ci_s)
    desca['ci'] = ci
    cl = clients[ci]

    # The password comes from the network, so compare it in constant time
    # rather than with !=, which returns as soon as a byte differs.
    try:
        pw_ok = hmac.compare_digest(pw, cl.cc.password)
    except TypeError:
        # operands of different types can never match
        pw_ok = False
    if not pw_ok: raise ValueError('bad password')
    desca['pwok']=True

    if tro != cl.cc.target_requests_outstanding:
        raise ValueError('tro must be %d' % cl.cc.target_requests_outstanding)

    if cto < cl.cc.http_timeout:
        raise ValueError('cto must be >= %d' % cl.cc.http_timeout)

    try:
        d = request.args[b'd'][0]
        desca['d'] = d
        desca['dlen'] = len(d)
    except KeyError:
        # no data field: the request only polls for downbound packets
        d = b''
        desca['dlen'] = None

    log_http(desca, 'processing', idof=id(request), d=d)

    d = mime_translate(d)

    cl.process_arriving_data(d)
    cl.new_request(request)
| 208 | |
def log_http(desca, msg, **kwargs):
    """Debug-log `msg` followed by repr(desca), at DBG.HTTP.

    If `desca` has a 'd' entry it is taken out of `desca` (so it does not
    appear in the repr) and passed to log_debug as the d= keyword instead,
    replacing any d= the caller supplied.  Note that this modifies the
    caller's dict.
    """
    if 'd' in desca:
        kwargs['d'] = desca.pop('d')
    log_debug(DBG.HTTP, msg + repr(desca), **kwargs)
| 216 | |
class NotStupidResource(twisted.web.resource.Resource):
    """Resource whose empty child path segment resolves to itself.

    Any other child name is looked up by the base class in the usual way.
    """
    # why this is not the default is a mystery!
    def getChild(self, name, request):
        """Return self for the empty segment b'', else defer to Resource."""
        if name == b'':
            return self
        # Go through super() so that `self` is passed along; calling
        # Resource.getChild(name, request) directly on the class would
        # bind `name` as self and fail with a TypeError.
        return super().getChild(name, request)
| 222 | |
class IphttpResource(NotStupidResource):
    """The server's root HTTP resource.

    POST carries the IP-over-HTTP traffic; GET returns a small HTML page
    that links to the program's source code when that is being offered.
    """

    # instantiator should set
    #   self.hippotat_sources = (source_names[0], source_names[1])
    def __init__(self):
        # [own source name, dependency packages name]; None = not offered
        self.hippotat_sources = [None, None]
        super().__init__()

    def render_POST(self, request):
        """Process a client's POST.

        Returns NOT_DONE_YET when the request was accepted (it is finished
        later by the Client).  If processing raises, responds at once with
        status 400 and a text/plain body made of the traceback and the
        request description.
        """
        log_debug(DBG.HTTP_FULL,
                  'req recv: ' + repr(request) + ' ' + repr(request.args),
                  idof=id(request))
        desca = {'d': None}
        try:
            process_request(request, desca)
        except Exception:
            emsg = traceback.format_exc()
            log_http(desca, 'RETURNING EXCEPTION ' + emsg)
            request.setHeader('Content-Type','text/plain; charset="utf-8"')
            request.setResponseCode(400)
            body = emsg + ' # ' + repr(desca) + '\r\n'
            return body.encode('utf-8')
        log_debug(DBG.HTTP_CTRL, '...', idof=id(request))
        return NOT_DONE_YET

    def render_GET(self, request):
        """Return the HTML page, with source links if sources are offered."""
        log_debug(DBG.HTTP, 'GET request')
        (own_src, deps_src) = self.hippotat_sources
        pieces = ['<html><body>hippotat\n']
        if own_src:
            pieces.append('<p><a href="%s">source</a>\n' % own_src)
            if deps_src:
                pieces.append(
                    '(and that of dependency <a href="%s">packages</a>)\n'
                    % deps_src)
            pieces.append('available')
        else:
            pieces.append('TESTING')
        pieces.append('</body></html>')
        return ''.join(pieces).encode('utf-8')
| 258 | |
def start_http():
    """Start the HTTP listeners and publish the source-code downloads.

    Listens on every address in c.saddrs, prepares the source shipment in
    a fresh temporary directory (removed again by a registered cleanup),
    attaches whichever shipment files were produced as children of the
    root resource, and schedules the 'hippotatd started' log message.
    """
    root = IphttpResource()
    site = twisted.web.server.Site(root)

    for sa in c.saddrs:
        endpoint = sa.make_endpoint()
        crash_on_defer(endpoint.listen(site))
        log_debug(DBG.INIT, 'listening on %s' % sa)

    workdir = tempfile.mkdtemp()

    def cleanup():
        # tolerate the directory having been removed already
        try:
            shutil.rmtree(workdir)
        except FileNotFoundError:
            pass

    cleanups.append(cleanup)

    ssp = SourceShipmentPreparer(workdir)
    ssp.logger = partial(log_debug, DBG.OWNSOURCE)
    if DBG.OWNSOURCE in debug_set:
        ssp.stream_debug = sys.stdout
    # --ownsource level: 0 = nothing, 1 = local source only, 2 = everything
    ssp.download_packages = opts.ownsource >= 2
    if opts.ownsource >= 1:
        ssp.generate()

    for ix in (0, 1):
        name = ssp.output_names[ix]
        path = ssp.output_paths[ix]
        if op_missing(path) if False else path is None:
            continue
        root.hippotat_sources[ix] = name
        root.putChild(name.encode('utf-8'), twisted.web.static.File(path))

    def announce():
        log.info('hippotatd started', dflag=False)

    reactor.callLater(0.1, announce)
| 291 | |
| 292 | #---------- config and setup ---------- |
| 293 | |
def process_cfg(_opts, putative_servers, putative_clients):
    """Build the server configuration and create the Client objects.

    _opts            -- parsed command-line options; stored in the global
                        `opts`
    putative_servers -- not used by this function
    putative_clients -- mapping whose items are (client address, client
                        section name) pairs, as passed on to
                        cfg_process_client_common

    Sets the globals `opts` and `c`.  If --print-config was given, prints
    the requested option's value (if it is set) and exits with status 0.
    """
    global opts, c
    opts = _opts
    c = ConfigResults()

    try:
        c.server = cfg1get('SERVER','server')
    except NoOptionError:
        c.server = 'SERVER'

    for setup in (cfg_process_general,
                  cfg_process_saddrs,
                  cfg_process_vnetwork,
                  cfg_process_vaddr):
        setup(c, c.server)

    for (ci, cs) in putative_clients.items():
        cc = ConfigResults()
        sections = cfg_process_client_common(cc, c.server, cs, ci)
        if not sections:
            continue
        for limited in ('max_batch_down', 'max_queue_time'):
            cfg_process_client_limited(cc, c.server, sections, limited)
        # the constructor registers the new client in `clients`
        Client(ci, cc)

    try:
        c.vrelay = cfg1get(c.server, 'vrelay')
    except NoOptionError:
        # default: the first host address in the network other than ours
        for candidate in c.vnetwork.hosts():
            if candidate != c.vaddr:
                c.vrelay = candidate
                break

    try:
        c.ifname = cfg1get(c.server, 'ifname_server', raw=True)
    except NoOptionError:
        pass

    ipif_substitutions = (
        ('local','vaddr'),
        ('peer', 'vrelay'),
        ('rnets','vnetwork'),
    )
    cfg_process_ipif(c, [c.server, 'DEFAULT'], ipif_substitutions)

    if opts.printconfig is not None:
        try:
            val = cfg1get(c.server, opts.printconfig)
        except NoOptionError:
            pass
        else:
            print(val)
        sys.exit(0)
| 338 | |
def catch_termination():
    """Arrange for the registered cleanups to run when the process ends.

    The cleanups run at normal interpreter exit (via atexit) and also on
    SIGINT and SIGTERM.  After a signal the handler restores the default
    disposition and re-sends the signal to this process, so the process
    still dies from that signal.
    """
    def run_cleanups():
        for action in cleanups:
            action()

    def signal_handler(name, sig, *args):
        signal.signal(sig, signal.SIG_DFL)
        print('exiting due to %s' % name, file=sys.stderr)
        run_cleanups()
        os.kill(os.getpid(), sig)
        # only reached if the re-sent signal failed to kill us
        raise RuntimeError('did not die due to signal %s !' % name)

    atexit.register(run_cleanups)

    for sig in (signal.SIGINT, signal.SIGTERM):
        if hasattr(sig, 'name'):
            signame = sig.name
        else:
            signame = "signal %d" % sig
        signal.signal(sig, partial(signal_handler, signame))
| 357 | |
def daemonise():
    """Switch logging to syslog and/or detach into the background.

    Driven by the options --daemon, --syslog-facility and --pidfile:

      * If a syslog facility is set (or implied by --daemon), a Twisted
        log observer that forwards events to syslog is installed, and at
        the end hippotatlib.file_log_observer is removed.
      * With --daemon, the process forks twice.  The original process
        waits on a pipe for a one-byte status from the final (grandchild)
        process and exits with that byte's value, or with 127 if the pipe
        closes without a byte.  The intermediate process records the
        grandchild's pid in the pidfile (if one was requested) and exits.
        The grandchild points stdin/stdout at /dev/null and stderr at a
        `logger` subprocess, then reports success through the pipe and
        carries on as the daemon.
    """
    # NOTE(review): nothing in this function assigns `syslogfacility`;
    # the value actually used is opts.syslogfacility.
    global syslogfacility
    # --daemon without an explicit facility means: log to 'daemon'
    if opts.daemon and opts.syslogfacility is None:
        opts.syslogfacility = 'daemon'

    if opts.syslogfacility is not None:
        # look up e.g. syslog.LOG_DAEMON; KeyError for an unknown facility
        facilnum = syslog.__dict__['LOG_' + opts.syslogfacility.upper()]
        syslog.openlog('hippotatd',
                       facility=facilnum,
                       logoption=syslog.LOG_PID)
        def emit(event):
            # Twisted log observer: forward one log event to syslog
            if logevent_is_boringtwisted(event): return
            m = twisted.logger.formatEvent(event)
            #print(repr(event), m, file=org_stderr)
            level = event.get('log_level')
            # events carrying a (non-None) dflag are debug output
            if event.get('dflag',None) is not None: sl = syslog.LOG_DEBUG
            elif level == LogLevel.critical : sl = syslog.LOG_CRIT
            elif level == LogLevel.error : sl = syslog.LOG_ERR
            elif level == LogLevel.warn : sl = syslog.LOG_WARNING
            else : sl = syslog.LOG_INFO
            syslog.syslog(sl,m)
            failure = event.get('log_failure')
            if failure is not None:
                # one syslog call per traceback line
                for l in failure.getTraceback().split('\n'):
                    syslog.syslog(sl,l)
        glp = twisted.logger.globalLogPublisher
        glp.addObserver(emit)
        log_debug(DBG.INIT, 'starting to log to syslog')

    #log.crit('daemonic hippotatd crashed', dflag=False)
    if opts.daemon:
        daemonic_reactor = (twisted.internet.interfaces.IReactorDaemonize
                            .providedBy(reactor))
        if daemonic_reactor: reactor.beforeDaemonize()
        # open the pidfile before forking; the intermediate child writes it
        if opts.pidfile is not None:
            pidfile_h = open(opts.pidfile, 'w')
        # pipe on which the daemon reports its startup status to the parent
        rfd, wfd = os.pipe()
        childpid = os.fork()
        if childpid:
            # we are the parent
            os.close(wfd)
            st = os.read(rfd, 1)
            try:
                # the status byte becomes our exit status
                st = st[0]
            except IndexError:
                # EOF without a status byte
                st = 127
                log.critical('daemonic hippotatd crashed', dflag=False)
            os._exit(st)
        os.close(rfd)
        os.setsid()
        grandchildpid = os.fork()
        if grandchildpid:
            # we are the intermediate child
            if opts.pidfile is not None:
                print(grandchildpid, file=pidfile_h)
                pidfile_h.close()
            os._exit(0)

        # from here on we are the grandchild, i.e. the daemon proper
        if opts.pidfile is not None:
            pidfile_h.close()

        # whatever gets written to stderr is passed to syslog by `logger`
        logger = subprocess.Popen(['logger','-d',
                                   '-t','hippotat[%d](stderr)' % os.getpid(),
                                   '-p',opts.syslogfacility + '.err'],
                                  stdin=subprocess.PIPE,
                                  stdout=subprocess.DEVNULL,
                                  stderr=subprocess.DEVNULL,
                                  restore_signals=True)

        nullfd = os.open('/dev/null', os.O_RDWR)
        os.dup2(nullfd, 0)
        os.dup2(nullfd, 1)
        os.dup2(logger.stdin.fileno(), 2)
        os.close(nullfd)
        if daemonic_reactor: reactor.afterDaemonize()
        log_debug(DBG.INIT, 'daemonised')
        # tell the waiting parent that startup succeeded (exit status 0)
        os.write(wfd, b'\0')
        os.close(wfd)

    # same condition as the block above that defined glp
    if opts.syslogfacility is not None:
        glp.removeObserver(hippotatlib.file_log_observer)
| 439 | |
# ----- server-specific command line options -----

# The three ownsource flags all store a level into opts.ownsource:
# 2 = own source and dependency packages, 1 = local source only, 0 = none.
optparser.add_option(
    '--ownsource',
    dest='ownsource', action='store_const', const=2, default=2,
    help='source download fully enabled (default)',
)

optparser.add_option(
    '--ownsource-local',
    dest='ownsource', action='store_const', const=1,
    help='source download is local source code only',
)

optparser.add_option(
    '--no-ownsource',
    dest='ownsource', action='store_const', const=0,
    help='source download disabled (for testing only)',
)

optparser.add_option(
    '--daemon',
    dest='daemon', action='store_true', default=False,
    help='daemonize (and log to syslog)',
)

optparser.add_option(
    '--pidfile',
    dest='pidfile', action='store', type='string', nargs=1, default=None,
    help='write pid to this file',
)

optparser.add_option(
    '--syslog-facility',
    dest='syslogfacility', action='store', type='string', nargs=1,
    metavar='FACILITY', default=None,
    help='log to syslog, with specified facility',
)

optparser.add_option(
    '--print-config',
    dest='printconfig', action='store', type='string', nargs=1,
    metavar='OPTION', default=None,
    help='print one config option value and exit',
)
| 472 | |
# ----- main program -----
# The steps below read the globals `opts` and `c`, which process_cfg sets.

# process_cfg is handed over as the configuration callback.
# NOTE(review): presumably common_startup also parses the command line
# and reads the config files before calling it; confirm in hippotatlib.
common_startup(process_cfg)
# Install the exit/signal cleanup hooks before start_http() registers the
# cleanup for its temporary directory.
catch_termination()
start_http()
daemonise()
# `ipif` is read as a module global by route().  Packets arriving from the
# interface are routed with "[ipif]" as the description of their origin.
ipif = start_ipif(c.ipif_command, (lambda p,s,d: route(p,"[ipif]",s,d)))
common_run()