#! /usr/bin/python3 ### ### Randomized testing of binary trees ### ### (c) 2024 Straylight/Edgeware ### ###----- Licensing notice --------------------------------------------------- ### ### This file is part of Xyla, a library of binary trees. ### ### Xyla is free software: you can redistribute it and/or modify it under ### the terms of the GNU Lesser General Public License as published by the ### Free Software Foundation; either version 3 of the License, or (at your ### option) any later version. ### ### Xyla is distributed in the hope that it will be useful, but WITHOUT ### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or ### FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public ### License for more details. ### ### You should have received a copy of the GNU Lesser General Public ### License along with Xyla. If not, see . ###-------------------------------------------------------------------------- ### External dependencies. import base64 as B64 import errno as E import io as IO import math as M import optparse as OP import os as OS import random as RND import re as RX import sys as SYS import subprocess as SUB import time as T ###-------------------------------------------------------------------------- ### Preliminaries. SEEDSZ = 32 B32 = 1 << 32 PROG = OS.path.basename(SYS.argv[0]) def base64_encode(buf): ## No, you blundering morons, the result of Base64 encoding is text. return B64.b64encode(buf).decode("us-ascii") def binsearch(list, key, keyfn, lessfn): n = len(list) lo, hi = 0, n while lo < hi: mid = (lo + hi)//2 if lessfn(keyfn(list[mid]), key): lo = mid + 1 else: hi = mid if lo < n and not lessfn(key, keyfn(list[lo])): found = list[lo] else: found = None return found, lo class WeightedChoice (object): def __init__(me, choices): me._choices = [] acc = 0 for wt, opt in choices: acc += wt me._choices.append((acc - 1, opt)) me._total = acc def choose(me, rand): i = rand.randrange(me._total) ch, j = binsearch(me._choices, i, lambda pair: pair[0], lambda x, y: x < y) return me._choices[j][1] ###-------------------------------------------------------------------------- ### Super-stupid reference implementation. class Collection (object): def __init__(me, items): me._set = set(items) me._list = None def _ensure_list(me): if me._list is None: me._list = list(me._set); me._list.sort() def _ensure_set(me): if me._set is None: me._set = set(me._list) def __iter__(me): me._ensure_list(); return iter(me._list) def __len__(me): me._ensure_list(); return len(me._list) def __contains__(me, x): me._ensure_set(); return x in me._set def add(me, x): me._ensure_set(); me._set.add(x); me._list = None def remove(me, x): me._ensure_set(); me._set.remove(x); me._list = None def join(me, x, yy): me._ensure_list(); yy._ensure_list() if x is None: assert not me._list or not yy._list or me._list[-1] < yy._list[0] mid = [] else: assert not me._list or me._list[-1] < x; assert not yy._list or x < yy._list[0] mid = [x] return Collection(me._list + mid + yy._list) def split(me, x): me._ensure_list() x, i = binsearch(me._list, x, lambda x: x, lambda x, y: x < y) if x is None: j = i else: j = i + 1 return Collection(me._list[:i]), x, Collection(me._list[j:]) def unisect(me, yy): me._ensure_set(); yy._ensure_set() return Collection(me._set | yy._set), Collection(me._set&yy._set) def diffsect(me, yy): me._ensure_set(); yy._ensure_set() return Collection(me._set - yy._set), Collection(me._set&yy._set) def lower(me): me._ensure_list() if not me._list: return None else: return me._list[0] def upper(me): me._ensure_list() if not me._list: return None else: return me._list[-1] ###-------------------------------------------------------------------------- ### Options processing. class Options (object): def __init__(me): op = OP.OptionParser\ (usage = "%prog [-c STEPS] [-f FILE] [-l LIMIT] " "[-n STEPS] [-s SEED] [-y STEP] PROG [ARGS ...]") op.disable_interspersed_args() for short, long, kw in \ [("-c", "--ckpt-steps", dict(type = "int", metavar = "STEPS", dest = "ckpt_steps", default = 5000, help = "number of steps between checkpoints")), ("-f", "--ckpt-file", dict(type = "string", metavar = "FILE", dest = "ckpt_file", default = "soak.ckpt", help = "file to hold checkpoint information")), ("-l", "--limit", dict(type = "int", metavar = "LIMIT", dest = "limit", default = None, help = "exclusive limit value to store in test trees")), ("-n", "--steps", dict(type = "int", metavar = "STEPS", dest = "nsteps", default = None, help = "number of steps to run before stopping")), ("-s", "--seed", dict(type = "string", metavar = "SEED", dest = "seed", default = None, help = "force initial seed")), ("-y", "--sync", dict(type = "int", metavar = "STEP", dest = "sync", default = None, help = "check and print state from STEP"))]: op.add_option(short, long, **kw) opts, args = op.parse_args() me.limit = opts.limit me.ckpt_file = opts.ckpt_file me.sync = opts.sync me.ckpt_steps = opts.ckpt_steps me.nsteps = opts.nsteps me.seed = opts.seed if len(args) < 1: op.print_usage(SYS.stderr); SYS.exit(2) me.testprog = args ###-------------------------------------------------------------------------- ### Testing state. class Level (object): def __init__(me, kind, base, limit, tree = "_"): xtree, _ = RX.subn(r"0x[0-9a-f]{8}\$", "", tree) me.coll = Collection(map(int, RX.findall(r"\d+", xtree))) me.kind, me.base, me.limit, me.tree = \ kind, int(base), int(limit), tree.strip() me.rlim = int(M.sqrt(me.limit - me.base)) def write(me, file): file.write("%s %d %d %s\n" % (me.kind, me.base, me.limit, me.tree)) @classmethod def read(cls, file): line = file.readline() if line == "": return None kind, base, limit, tree = line.split(None, 3) return cls(kind, base, limit, tree) class State (object): def __init__(me, opts): me._ckpt_file = opts.ckpt_file try: with open(me._ckpt_file, "r") as f: me.seed, = f.readline().split() stack = [] while True: lv = Level.read(f) if lv is None: break stack.append(lv) assert stack me.cur = stack.pop() me.stack = stack if opts.seed is not None and me.seed != opts.seed: raise ValueError("checkpointed seed `%s' /= " "command-line seed `%s'" % (me.seed, opts.seed)) if opts.limit is not None and me.cur.limit != opts.limit: raise ValueError("checkpointed limit %d /= command-line limit %d" % (me.cur.limit, opts.limit)) except IOError as err: if err.errno != E.ENOENT: raise if opts.seed is not None: me.seed = opts.seed else: me.seed = base64_encode(OS.urandom(SEEDSZ)) if opts.limit is not None: me.limit = opts.limit else: me.limit = 4096 me.stack = [] me.cur = Level('base', 0, me.limit) me.write_ckpt(reseed = False) me.rand = RND.Random(me.seed) n, b = 0, me.cur.limit while True: bb = int(M.sqrt(b)) + 4 if bb >= b: break n, b = n + 1, bb me.stklim = n def push(me, lv): me.stack.append(me.cur) me.cur = lv def pop(me): assert me.stack lv = me.cur me.cur = me.stack.pop() return lv def write_ckpt(me, reseed = True): if reseed: me.seed = base64_encode(bytes(me.rand.randrange(256) for _ in range(SEEDSZ))) me.rand.seed(me.seed) new = me._ckpt_file + ".new" with open(new, "w") as f: f.write("%s\n" % me.seed) for lv in me.stack: lv.write(f) me.cur.write(f) try: OS.rename(new, me._ckpt_file) except OSError as err: if err.errno == E.EEXIST: OS.unlink(me._ckpt_file); OS.rename(new, me._ckpt_file) def clear_ckpt(me): try: OS.unlink(me._ckpt_file) except OSError as err: if err.errno == E.ENOENT: pass else: raise ###-------------------------------------------------------------------------- ### Main machinery. def choices(): ch = [(896, "addrm1"), (56, "addrmn"), (56, "lookup")] sp = len(ST.stack) ch += [(ST.stklim - sp, "split"), (ST.stklim - sp, "push")] if ST.cur.kind == "join": ch += [(sp, "join0"), (sp, "join1")] elif ST.cur.kind == "setop": ch += [(sp, "unisect"), (sp, "diffsect")] elif ST.cur.kind == "base": pass else: raise ValueError("unknown level kind `%s'" % ST.cur.kind) return WeightedChoice(ch) OPTS = Options() ST = State(OPTS) KID = SUB.Popen(OPTS.testprog, universal_newlines = True, stdin = SUB.PIPE, stdout = SUB.PIPE) SYNC = False def fail(msg): SYS.stderr.write("%s: FAILED: %s\n" % (PROG, msg)) SYS.stderr.write("%s: step = %d\n" % (PROG, CSTEP)) KID.stdin.close() KID.stdout.close() rc = KID.wait() SYS.stderr.write("%s: exit status = %d\n" % (PROG, rc)) SYS.exit(2) def put(msg, echo = True): try: KID.stdin.write(msg); KID.stdin.flush() except IOError as err: fail("write failed: %s" % err) if SYNC and echo: SYS.stdout.write("$ " + msg); SYS.stdout.flush() def get(echo = True): try: line = KID.stdout.readline() except IOError as err: fail("read failed: %s" % err) if line == "": fail("unexpected end of file") if SYNC and echo: SYS.stdout.write(line) if line[-1] == "\n": return line[:-1] else: return line def dump_tree(): if SYNC: put("D\n:;;END DUMP\n", echo = False) while True: line = get(echo = False) if line == ";;END DUMP": break SYS.stdout.write(line); SYS.stdout.write("\n") def check_tree(): put("K\n", echo = False) line = get(echo = False) if ST.cur.coll: ref = " ".join("%d" % i for i in ST.cur.coll) else: ref = "(empty tree)" if line != ref: fail("iteration mismatch: %s /= %s" % (line, ref)) def snapshot(): put("L\n", echo = False) ST.cur.tree = get(echo = False) STEP = CSTEP = 0 for lv in ST.stack: put("= %s\n" % lv.tree) dump_tree() put("(\n") put("= %s\n" % ST.cur.tree) dump_tree() put("S 0x%08x\n" % ST.rand.randrange(B32)) ch = choices() while OPTS.nsteps is None or STEP < OPTS.nsteps: if OPTS.sync is not None and OPTS.sync == STEP: SYNC = True OPTS.sync = None snapshot() SYS.stdout.write("\n;; initial stack\n") for i, lv in enumerate(ST.stack): SYS.stdout.write(";; %3d = %s\n" % (i, lv.tree)) SYS.stdout.write(";; TOP = %s\n" % ST.cur.tree) check_tree() if SYNC: SYS.stdout.write("\n;; step %d\n" % CSTEP) op = ch.choose(ST.rand) if op == "addrm1": k = ST.rand.randrange(ST.cur.base, ST.cur.limit) if k in ST.cur.coll: ST.cur.coll.remove(k); put("%d-\n" % k) else: ST.cur.coll.add(k); put("%d+\n" % k) elif op == "addrmn": n = ST.rand.randrange(ST.cur.rlim) i = ST.rand.randrange(ST.cur.base, ST.cur.limit - n) m = i + n//2 foundp = m in ST.cur.coll dir = ST.rand.choice([+1, -1]) rr = range(i, i + n) if dir < 0: rr = reversed(rr) firstp = True buf = IO.StringIO() if foundp: for j in rr: if j not in ST.cur.coll: continue if firstp: firstp = False else: buf.write(u" ") ST.cur.coll.remove(j); buf.write(u"%d-" % j) else: for j in rr: if j in ST.cur.coll: continue if firstp: firstp = False else: buf.write(u" ") ST.cur.coll.add(j); buf.write(u"%d+" % j) if not firstp: buf.write(u"\n") put(buf.getvalue()) elif op == "lookup": k = ST.rand.randrange(ST.cur.base, ST.cur.limit) put("%d? n\n" % k) line = get() if line == "(nil)": if k in ST.cur.coll: fail("key %d unexpectedly missing" % k) else: m = RX.match(r"^#", line) if m: kk = int(m.group(1)) if kk != k: fail("search for key %d found %d instead" % (k, kk)) elif k not in ST.cur.coll: fail("key %d unexpectedly found" % k) else: fail("unexpected response to lookup: `%s'" % line) elif op == "split": check_tree() k = ST.rand.randrange(ST.cur.base, ST.cur.limit - ST.cur.rlim - 4) put("%d@ /\n" % k) left, mid, right = ST.cur.coll.split(k) ST.cur.coll = left old_limit, ST.cur.limit = ST.cur.limit, k ST.push(Level("split.mid", k, k + 1, str(mid))) ST.push(Level("split.right", k + 1, old_limit, " ".join("%d" % x for x in right))) dump_tree(); check_tree(); put(")\n"); ST.pop() dump_tree(); check_tree(); put(")\n"); ST.pop() dump_tree(); check_tree(); snapshot() put("(\n") if ST.cur.coll: new_base = ST.cur.coll.upper() + 2 else: new_base = ST.cur.base + 2 ST.push(Level("join", new_base, old_limit, "_")) ch = choices() elif op == "push": check_tree(); snapshot() put("(\n"); ST.push(Level("setop", ST.cur.base, ST.cur.limit, "_")) ST.stack[-1].limit = ST.cur.limit ch = choices() elif op == "join0": lower = ST.stack[-1].coll put("* ~\n") ST.stack[-1].coll = lower.join(None, ST.cur.coll) ST.stack[-1].limit = ST.cur.limit ST.pop() ch = choices() elif op == "join1": lower = ST.stack[-1].coll if lower: base = lower.upper() + 1 else: base = ST.stack[-1].base + 1 if ST.cur.coll: limit = ST.cur.coll.lower() else: limit = ST.cur.limit k = ST.rand.randrange(base, limit) put("%d ~\n" % k) ST.stack[-1].coll = lower.join(k, ST.cur.coll) ST.stack[-1].limit = ST.cur.limit ST.pop() ch = choices() elif op == "unisect": put("|\n") ST.cur.coll, ST.stack[-1].coll = ST.cur.coll.unisect(ST.stack[-1].coll) dump_tree(); check_tree(); put(")\n"); ST.pop() ch = choices() elif op == "diffsect": put("\\\n") diff, ST.cur.coll = ST.cur.coll.diffsect(ST.stack[-1].coll) ST.push(Level("diffsect.diff", ST.cur.base, ST.cur.limit, " ".join("%d" % x for x in diff))) dump_tree(); check_tree(); put(")\n"); ST.pop() dump_tree(); check_tree(); put(")\n"); ST.pop() ch = choices() else: raise ValueError("unexpected operation `%s'" % op) STEP += 1; CSTEP += 1 dump_tree() if SYNC or CSTEP == OPTS.ckpt_steps: check_tree() if CSTEP == OPTS.ckpt_steps: snapshot() ST.write_ckpt() CSTEP = 0; SYNC = False put("S 0x%08x\n" % ST.rand.randrange(B32)) while True: check_tree() if not ST.stack: break put(")\n") ST.pop() dump_tree() check_tree() ST.clear_ckpt() ###----- That's all, folks --------------------------------------------------