### -*-python-*- ### ### CGI machinery ### ### (c) 2013 Mark Wooding ### ###----- Licensing notice --------------------------------------------------- ### ### This file is part of Chopwood: a password-changing service. ### ### Chopwood is free software; you can redistribute it and/or modify ### it under the terms of the GNU Affero General Public License as ### published by the Free Software Foundation; either version 3 of the ### License, or (at your option) any later version. ### ### Chopwood is distributed in the hope that it will be useful, ### but WITHOUT ANY WARRANTY; without even the implied warranty of ### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ### GNU Affero General Public License for more details. ### ### You should have received a copy of the GNU Affero General Public ### License along with Chopwood; if not, see ### . from __future__ import with_statement import contextlib as CTX import os as OS; ENV = OS.environ import re as RX import sys as SYS import time as T import traceback as TB from auto import HOME, PACKAGE, VERSION import config as CONF; CFG = CONF.CFG import format as F import output as O; OUT = O.OUT; PRINT = O.PRINT import subcommand as SC import util as U ###-------------------------------------------------------------------------- ### Configuration tweaks. _script_name = ENV.get('SCRIPT_NAME', '/cgi-bin/chpwd') CONF.DEFAULTS.update( ## The URL of this program, when it's run through CGI. SCRIPT_NAME = _script_name, ## A (maybe relative) URL for static content. By default this comes from ## the main script, but we hope that user agents cache it. STATIC = _script_name + '/static') ###-------------------------------------------------------------------------- ### Escaping and encoding. ## Some handy regular expressions. R_URLESC = RX.compile('%([0-9a-fA-F]{2})') R_URLBAD = RX.compile('[^-\\w,.!]') R_HTMLBAD = RX.compile('[&<>\'"]') def urldecode(s): """Decode a single form-url-encoded string S.""" return R_URLESC.sub(lambda m: chr(int(m.group(1), 16)), s.replace('+', ' ')) return s def urlencode(s): """Encode a single string S using form-url-encoding.""" return R_URLBAD.sub(lambda m: '%%%02x' % ord(m.group(0)), s) def htmlescape(s): """Escape a literal string S so that HTML doesn't misinterpret it.""" return R_HTMLBAD.sub(lambda m: '&#x%02x;' % ord(m.group(0)), s) ## Some standard character sequences, and HTML entity names for prettier ## versions. html_quotify = U.StringSubst({ "<": '<', ">": '>', "&": '&', "`": '‘', "'": '’', '"': '"', "``": '“', "''": '”', "--": '–', "---": '—' }) ###-------------------------------------------------------------------------- ### Output machinery. class HTTPOutput (O.FileOutput): """ Output driver providing an automatic HTTP header. The `headerp' attribute is true if we've written a header. The `header' method will print a custom header if this is wanted. """ def __init__(me, *args, **kw): """Constructor: initialize `headerp' flag.""" super(HTTPOutput, me).__init__(*args, **kw) me.headerp = False def write(me, msg): """Output protocol: print a header if we've not written one already.""" if not me.headerp: me.header('text/plain') super(HTTPOutput, me).write(msg) def header(me, content_type = 'text/plain', **kw): """ Print a header, if none has yet been printed. Keyword arguments can be passed to emit HTTP headers: see `http_header' for the formatting rules. """ if me.headerp: return me.headerp = True for h in O.http_headers(content_type = content_type, **kw): me.writeln(h) me.writeln('') def cookie(name, value, **kw): """ Return a HTTP `Set-Cookie' header. The NAME and VALUE give the name and value of the cookie; both are form-url-encoded to prevent misinterpretation (fortunately, `cgiparse' knows to undo this transformation). The KW are other attributes to declare: the names are forced to lower-case and underscores `_' are replaced by hyphens `-'; a `True' value is assumed to indicate that the attribute is boolean, and omitted. """ attr = {} for k, v in kw.iteritems(): k = '-'.join(i.lower() for i in k.split('_')) attr[k] = v try: maxage = int(attr['max-age']) except KeyError: pass else: attr['expires'] = T.strftime('%a, %d %b %Y %H:%M:%S GMT', T.gmtime(U.NOW + maxage)) return '; '.join(['%s=%s' % (urlencode(name), urlencode(value))] + [v is not True and '%s=%s' % (k, v) or k for k, v in attr.iteritems() if v]) def action(*v, **kw): """ Build a URL invoking this script. The positional arguments V are used to construct a path which is appended to the (deduced or configured) script name (and presumably will be read back as `PATH_INFO'). The keyword arguments are (form-url-encoded and) appended as a query string, if present. """ url = '/'.join([CFG.SCRIPT_NAME] + list(v)) if kw: url += '?' + ';'.join('%s=%s' % (urlencode(k), urlencode(kw[k])) for k in sorted(kw)) return htmlescape(url) def static(name): """Build a URL for the static file NAME.""" return htmlescape(CFG.STATIC + '/' + name) def redirect(where, **kw): """ Write a complete redirection to some other URL. """ OUT.header(content_type = 'text/html', status = 302, location = where, **kw) PRINT("""\ No, sorry, it's moved again.

I'm over here now. """ % htmlescape(where)) ###-------------------------------------------------------------------------- ### Templates. ## Where we find our templates. TMPLDIR = HOME ## Keyword arguments for templates. STATE = U.Fluid() STATE.kw = {} ## Set some basic keyword arguments. @CONF.hook def set_template_keywords(): STATE.kw.update( package = PACKAGE, version = VERSION, script = CFG.SCRIPT_NAME, static = CFG.STATIC, allowop = CFG.ALLOWOP) class TemplateFinder (object): """ A magical fake dictionary whose keys are templates. """ def __init__(me, dir): me._cache = {} me._dir = dir def __getitem__(me, key): try: return me._cache[key] except KeyError: pass with open(OS.path.join(me._dir, key)) as f: tmpl = f.read() me._cache[key] = tmpl return tmpl STATE.kw['TMPL'] = TMPL = TemplateFinder(TMPLDIR) @CTX.contextmanager def tmplkw(**kw): """ Context manager: execute the body with additional keyword arguments """ d = dict() d.update(STATE.kw) d.update(kw) with STATE.bind(kw = d): yield FORMATOPS = {} class FormatHTML (F.SimpleFormatOperation): """ ~H: escape output suitable for inclusion in HTML. With `:', additionally apply quotification. """ def _convert(me, arg): if me.colonp: return html_quotify(arg) else: return htmlescape(arg) FORMATOPS['H'] = FormatHTML class FormatWrap (F.BaseFormatOperation): """ ~<...~@>: wrap enclosed material in another formatting control string. The argument is a formatting control. The enclosed material is split into pieces separated by `~;' markers. The formatting control is performed, and passed the list of pieces (as compiled formatting operations) in the keyword argument `wrapped'. """ def __init__(me, *args): super(FormatWrap, me).__init__(*args) pieces = [] while True: piece, delim = F.collect_subformat('>;') pieces.append(piece) if delim.char == '>': break me.pieces = pieces def _format(me, atp, colonp): op = F.compile(me.getarg.get()) with F.FORMAT.bind(argmap = dict(F.FORMAT.argmap, wrapped = me.pieces)): op.format() FORMATOPS['<'] = FormatWrap def format_tmpl(control, **kw): with F.COMPILE.bind(opmaps = [FORMATOPS, F.BASEOPS]): with tmplkw(**kw): F.format(OUT, control, **STATE.kw) def page(template, header = {}, title = 'Chopwood', **kw): header = dict(header, content_type = 'text/html') OUT.header(**header) format_tmpl(TMPL['wrapper.fhtml'], title = title, payload = TMPL[template], **kw) ###-------------------------------------------------------------------------- ### Error reporting. @CTX.contextmanager def cgi_errors(hook = None): """ Context manager: report errors in the body as useful HTML. If HOOK is given, then call it before reporting errors. It may have set up useful stuff. """ try: yield None except Exception, e: if hook: hook() if isinstance(e, U.ExpectedError) and not OUT.headerp: page('error.fhtml', header = dict(status = e.code), title = 'Chopwood: error', error = e) else: exty, exval, extb = SYS.exc_info() with tmplkw(exception = TB.format_exception_only(exty, exval), traceback = TB.extract_tb(extb), PARAM = sorted(PARAM), COOKIE = sorted(COOKIE.items()), PATH = PATH, ENV = sorted(ENV.items())): if OUT.headerp: format_tmpl(TMPL['exception.fhtml'], toplevel = False) else: page('exception.fhtml', header = dict(status = 500), title = 'Chopwood: internal error', toplevel = True) ###-------------------------------------------------------------------------- ### CGI input. ## Lots of global variables to be filled in by `cgiparse'. COOKIE = {} SPECIAL = {} PARAM = [] PARAMDICT = {} PATH = [] SSLP = False ## Regular expressions for splitting apart query and cookie strings. R_QSPLIT = RX.compile('[;&]') R_CSPLIT = RX.compile(';') def split_keyvalue(string, delim, default): """ Split a STRING, and generate the resulting KEY=VALUE pairs. The string is split at DELIM; the components are parsed into KEY[=VALUE] pairs. The KEYs and VALUEs are stripped of leading and trailing whitespace, and form-url-decoded. If the VALUE is omitted, then the DEFAULT is used unless the DEFAULT is `None' in which case the component is simply ignored. """ for kv in delim.split(string): try: k, v = kv.split('=', 1) except ValueError: if default is None: continue else: k, v = kv, default k, v = k.strip(), v.strip() if not k: continue k, v = urldecode(k), urldecode(v) yield k, v def cgiparse(): """ Process all of the various exciting CGI environment variables. We read environment variables and populate some tables left in global variables: it's all rather old-school. Variables set are as follows. `COOKIE' A dictionary mapping cookie names to the values provided by the user agent. `SPECIAL' A dictionary holding some special query parameters which are of interest at a global level, and should not be passed to a subcommand handler. No new entries will be added to this dictionary, though values will be modified to reflect the query parameters discovered. Conventionally, such parameters have names beginning with `%'. `PARAM' The query parameters as a list of (KEY, VALUE) pairs. Special parameters are omitted. `PARAMDICT' The query parameters as a dictionary. Special parameters, and parameters which appear more than once, are omitted. `PATH' The trailing `PATH_INFO' path, split at `/' markers, with any trailing empty component removed. `SSLP' True if the client connection is carried over SSL or TLS. """ global SSLP def getenv(var): try: return ENV[var] except KeyError: raise U.ExpectedError, (500, "No `%s' supplied" % var) ## Yes, we want the request method. method = getenv('REQUEST_METHOD') ## Acquire the query string. if method == 'GET': q = getenv('QUERY_STRING') elif method == 'POST': ## We must read the query string from stdin. n = getenv('CONTENT_LENGTH') if not n.isdigit(): raise U.ExpectedError, (500, "Invalid CONTENT_LENGTH") n = int(n, 10) if getenv('CONTENT_TYPE') != 'application/x-www-form-urlencoded': raise U.ExpectedError, (500, "Unexpected content type `%s'" % ct) q = SYS.stdin.read(n) if len(q) != n: raise U.ExpectedError, (500, "Failed to read correct length") else: raise U.ExpectedError, (500, "Unexpected request method `%s'" % method) ## Populate the `SPECIAL', `PARAM' and `PARAMDICT' tables. seen = set() for k, v in split_keyvalue(q, R_QSPLIT, 't'): if k in SPECIAL: SPECIAL[k] = v else: PARAM.append((k, v)) if k in seen: del PARAMDICT[k] else: PARAMDICT[k] = v seen.add(k) ## Parse out the cookies, if any. try: c = ENV['HTTP_COOKIE'] except KeyError: pass else: for k, v in split_keyvalue(c, R_CSPLIT, None): COOKIE[k] = v ## Set up the `PATH'. try: p = ENV['PATH_INFO'] except KeyError: pass else: pp = p.lstrip('/').split('/') if pp and not pp[-1]: pp.pop() PATH[:] = pp ## Check the crypto for the connection. if ENV.get('SSL_PROTOCOL'): SSLP = True ###-------------------------------------------------------------------------- ### CGI subcommands. class Subcommand (SC.Subcommand): """ A CGI subcommand object. As for `subcommand.Subcommand', but with additional protocol for processing CGI parameters. """ def cgi(me, param, path): """ Invoke the subcommand given a collection of CGI parameters. PARAM is a list of (KEY, VALUE) pairs from the CGI query. The CGI query parameters are checked against the subcommand's parameters (making sure that mandatory parameters are supplied, that any switches are given boolean values, and that only the `rest' parameter, if any, is duplicated). PATH is a list of trailing path components. They are used to satisfy the `rest' parameter if there is one and there are no query parameters which satisfy the `rest' parameter; otherwise, an `ExpectedError' is raised if the list of path elements is non-empty. """ ## We're going to make a pass over the supplied parameters, and we'll ## check them off against the formal parameters as we go; so we'll need ## to be able to look them up. We'll also keep track of the ones we've ## seen so that we can make sure that all of the mandatory parameters ## were actually supplied. ## ## To that end: `want' is a dictionary mapping parameter names to ## functions which will do something useful with the value; `seen' is a ## set of the parameters which have been assigned; and `kw' is going to ## be the keyword-argument dictionary we pass to the handler function. want = {} kw = {} def set_value(k, v): """Set a simple value: we shouldn't see multiple values.""" if k in kw: raise U.ExpectedError, (400, "Repeated parameter `%s'" % k) kw[k] = v def set_bool(k, v): """Set a simple boolean value: for switches.""" set_value(k, v.lower() in ['true', 't', 'yes', 'y']) def set_list(k, v): """Append the value to a list: for the `rest' parameter.""" kw.setdefault(k, []).append(v) ## Set up the `want' map. for o in me.opts: if o.argname: want[o.name] = set_value else: want[o.name] = set_bool for p in me.params: want[p.name] = set_value for p in me.oparams: want[p.name] = set_value if me.rparam: want[me.rparam.name] = set_list ## Work through the list of supplied parameters. for k, v in param: try: f = want[k] except KeyError: if v: raise U.ExpectedError, (400, "Unexpected parameter `%s'" % k) else: f(k, v) ## Deal with a path, if there is one. if path: if me.rparam and me.rparam.name not in kw: kw[me.rparam.name] = path else: raise U.ExpectedError, (404, "Superfluous path elements") ## Make sure we saw all of the mandatory parameters. for p in me.params: if p.name not in kw: raise U.ExpectedError, (400, "Missing parameter `%s'" % p.name) ## Invoke the subcommand. me.func(**kw) def subcommand(name, contexts, desc, cls = Subcommand, *args, **kw): """Decorator for defining CGI subcommands.""" return SC.subcommand(name, contexts, desc, cls = cls, *args, **kw) ###----- That's all, folks --------------------------------------------------