#! /usr/bin/python3
###
### Randomized testing of binary trees
###
### (c) 2024 Straylight/Edgeware
###
###----- Licensing notice ---------------------------------------------------
###
### This file is part of Xyla, a library of binary trees.
###
### Xyla is free software: you can redistribute it and/or modify it under
### the terms of the GNU Lesser General Public License as published by the
### Free Software Foundation; either version 3 of the License, or (at your
### option) any later version.
###
### Xyla is distributed in the hope that it will be useful, but WITHOUT
### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
### FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
### License for more details.
###
### You should have received a copy of the GNU Lesser General Public
### License along with Xyla. If not, see <https://www.gnu.org/licenses/>.
###--------------------------------------------------------------------------
### External dependencies.
import base64 as B64
import errno as E
import io as IO
import math as M
import optparse as OP
import os as OS
import random as RND
import re as RX
import sys as SYS
import subprocess as SUB
import time as T
###--------------------------------------------------------------------------
### Preliminaries.
## Number of random bytes gathered to make up a fresh seed.
SEEDSZ = 32

## Exclusive upper bound for the 32-bit values sent in `S' commands.
B32 = 1 << 32

## Name of this program, for use in diagnostics.
PROG = OS.path.basename(SYS.argv[0])
def base64_encode(buf):
    """Return the Base64 encoding of the bytes BUF as a text string."""
    ## `b64encode' hands back bytes, but the encoding is pure ASCII text,
    ## so convert it before returning.
    return str(B64.b64encode(buf), "us-ascii")
def binsearch(list, key, keyfn, lessfn):
    """
    Search the sorted sequence LIST for an item matching KEY.

    KEYFN extracts the search key from an item; LESSFN(A, B) reports
    whether key A sorts strictly before key B.

    Return a pair (FOUND, INDEX).  INDEX is the position of the first item
    whose key is not less than KEY (or the length of LIST if there is
    none); FOUND is the item at INDEX if its key is equivalent to KEY, and
    `None' otherwise.
    """
    count = len(list)
    left, right = 0, count

    ## Narrow the half-open interval [left, right) down to the lower bound.
    while left < right:
        probe = (left + right) >> 1
        if lessfn(keyfn(list[probe]), key):
            left = probe + 1
        else:
            right = probe

    ## The item at the lower bound matches only if KEY isn't less than it.
    if left >= count or lessfn(key, keyfn(list[left])):
        return None, left
    return list[left], left
class WeightedChoice (object):
    """Choose among options at random, in proportion to integer weights."""

    def __init__(me, choices):
        """
        Build the selection table from CHOICES, an iterable of
        (WEIGHT, OPTION) pairs.
        """
        ## Each table entry holds the largest ticket number (counting from
        ## zero) which selects its option.
        table = []
        running = 0
        for weight, option in choices:
            running += weight
            table.append((running - 1, option))
        me._choices = table
        me._total = running

    def choose(me, rand):
        """
        Return one of the options, using RAND (which must provide
        `randrange') as the source of randomness.
        """
        ticket = rand.randrange(me._total)
        _, slot = binsearch(me._choices, ticket,
                            lambda entry: entry[0],
                            lambda a, b: a < b)
        return me._choices[slot][1]
###--------------------------------------------------------------------------
### Super-stupid reference implementation.
class Collection (object):
    """
    A deliberately simple ordered set, used as the reference model.

    The contents are held as a set; a sorted list is derived from it on
    demand and discarded whenever the set is modified.
    """

    def __init__(me, items):
        """Initialize the collection from the iterable ITEMS."""
        me._list = None
        me._set = set(items)

    def _ensure_list(me):
        """Make sure the sorted list is available."""
        if me._list is None:
            me._list = sorted(me._set)

    def _ensure_set(me):
        """Make sure the set is available."""
        if me._set is None:
            me._set = set(me._list)

    def __iter__(me):
        """Iterate over the items in ascending order."""
        me._ensure_list()
        return iter(me._list)

    def __len__(me):
        """Return the number of items."""
        me._ensure_list()
        return len(me._list)

    def __contains__(me, x):
        """Return whether X is an item of the collection."""
        me._ensure_set()
        return x in me._set

    def add(me, x):
        """Insert X; the sorted list is invalidated."""
        me._ensure_set()
        me._set.add(x)
        me._list = None

    def remove(me, x):
        """Remove X, raising `KeyError' if it is absent."""
        me._ensure_set()
        me._set.remove(x)
        me._list = None

    def join(me, x, yy):
        """
        Return a new collection: my items, then X (unless it is `None'),
        then the items of YY.

        The pieces must already be in strictly ascending order; this is
        checked by assertion.
        """
        me._ensure_list()
        yy._ensure_list()
        left, right = me._list, yy._list
        if x is None:
            assert not left or not right or left[-1] < right[0]
            middle = []
        else:
            assert not left or left[-1] < x
            assert not right or x < right[0]
            middle = [x]
        return Collection(left + middle + right)

    def split(me, x):
        """
        Split the collection at X.

        Return a triple (LEFT, MID, RIGHT): LEFT holds the items less than
        X, RIGHT those greater, and MID is X if it was present or `None'
        if not.
        """
        me._ensure_list()
        hit, at = binsearch(me._list, x, lambda v: v, lambda a, b: a < b)
        after = at if hit is None else at + 1
        return Collection(me._list[:at]), hit, Collection(me._list[after:])

    def unisect(me, yy):
        """Return the pair (UNION, INTERSECTION) of me and YY."""
        me._ensure_set()
        yy._ensure_set()
        mine, theirs = me._set, yy._set
        return Collection(mine | theirs), Collection(mine & theirs)

    def diffsect(me, yy):
        """Return the pair (DIFFERENCE, INTERSECTION) of me and YY."""
        me._ensure_set()
        yy._ensure_set()
        mine, theirs = me._set, yy._set
        return Collection(mine - theirs), Collection(mine & theirs)

    def lower(me):
        """Return the smallest item, or `None' if the collection is empty."""
        me._ensure_list()
        return me._list[0] if me._list else None

    def upper(me):
        """Return the largest item, or `None' if the collection is empty."""
        me._ensure_list()
        return me._list[-1] if me._list else None
###--------------------------------------------------------------------------
### Options processing.
class Options (object):
    """
    Command-line options, parsed from `sys.argv'.

    Attributes:
      ckpt_steps  number of steps between checkpoints (default 5000)
      ckpt_file   name of the checkpoint file (default `soak.ckpt')
      limit       exclusive limit on stored values, or `None'
      nsteps      number of steps to run, or `None'
      seed        forced initial seed, or `None'
      sync        step from which to check and print state, or `None'
      testprog    the test program and its arguments, as a list
    """

    def __init__(me):
        """
        Parse the command line.

        If no test program is named, print a usage message to stderr and
        exit with status 2.
        """
        op = OP.OptionParser(
            usage="%prog [-c STEPS] [-f FILE] [-l LIMIT] "
                  "[-n STEPS] [-s SEED] [-y STEP] PROG [ARGS ...]")

        ## Everything after the program name belongs to the test program.
        op.disable_interspersed_args()

        op.add_option("-c", "--ckpt-steps",
                      type="int", metavar="STEPS",
                      dest="ckpt_steps", default=5000,
                      help="number of steps between checkpoints")
        op.add_option("-f", "--ckpt-file",
                      type="string", metavar="FILE",
                      dest="ckpt_file", default="soak.ckpt",
                      help="file to hold checkpoint information")
        op.add_option("-l", "--limit",
                      type="int", metavar="LIMIT",
                      dest="limit", default=None,
                      help="exclusive limit value to store in test trees")
        op.add_option("-n", "--steps",
                      type="int", metavar="STEPS",
                      dest="nsteps", default=None,
                      help="number of steps to run before stopping")
        op.add_option("-s", "--seed",
                      type="string", metavar="SEED",
                      dest="seed", default=None,
                      help="force initial seed")
        op.add_option("-y", "--sync",
                      type="int", metavar="STEP",
                      dest="sync", default=None,
                      help="check and print state from STEP")

        opts, args = op.parse_args()
        for attr in ("limit", "ckpt_file", "sync",
                     "ckpt_steps", "nsteps", "seed"):
            setattr(me, attr, getattr(opts, attr))

        if not args:
            op.print_usage(SYS.stderr)
            SYS.exit(2)
        me.testprog = args
###--------------------------------------------------------------------------
### Testing state.
class Level (object):
    """
    One level of the test stack.

    Attributes:
      kind   a string naming how the level came about
      base   inclusive lower bound on keys for this level
      limit  exclusive upper bound on keys for this level
      tree   textual form of the tree, with surrounding space trimmed
      coll   reference `Collection' of the keys found in `tree'
      rlim   integer part of the square root of `limit - base'
    """

    def __init__(me, kind, base, limit, tree = "_"):
        """
        Initialize a level; BASE and LIMIT may be integers or numerals.

        The keys are the decimal integers appearing in TREE, once any
        tokens of the form `0x' + eight hex digits + `$' are discarded.
        """
        stripped = RX.sub(r"0x[0-9a-f]{8}\$", "", tree)
        me.coll = Collection([int(tok) for tok in RX.findall(r"\d+", stripped)])
        me.kind = kind
        me.base = int(base)
        me.limit = int(limit)
        me.tree = tree.strip()
        me.rlim = int(M.sqrt(me.limit - me.base))

    def write(me, file):
        """Write the level to FILE as a single checkpoint line."""
        record = (me.kind, me.base, me.limit, me.tree)
        file.write("%s %d %d %s\n" % record)

    @classmethod
    def read(cls, file):
        """
        Read one checkpoint line from FILE and return the level it
        describes, or `None' at end of file.
        """
        line = file.readline()
        if not line:
            return None
        kind, base, limit, tree = line.split(None, 3)
        return cls(kind, base, limit, tree)
class State (object):
    """
    Overall state of the soak test, with checkpointing.

    The checkpoint file holds the seed on its first line, followed by one
    line per level (see `Level.write'), bottom of the stack first and the
    current level last.

    Attributes:
      seed    text seed from which `rand' was most recently seeded
      rand    the `random.Random' generator driving the test
      stack   list of suspended levels, bottom first
      cur     the level currently being operated on
      limit   exclusive limit on keys
      stklim  number of times the limit can be replaced by
              `int(sqrt(limit)) + 4' before it stops shrinking
    """

    def __init__(me, opts):
        """
        Restore the state from the checkpoint file named by OPTS, or start
        afresh (writing an initial checkpoint) if there isn't one.

        Raise `ValueError' if a seed or limit given in OPTS disagrees with
        the checkpoint.
        """
        me._ckpt_file = opts.ckpt_file
        try:
            with open(me._ckpt_file, "r") as f:
                me.seed, = f.readline().split()
                stack = []
                while True:
                    lv = Level.read(f)
                    if lv is None: break
                    stack.append(lv)
            assert stack
            me.cur = stack.pop()
            me.stack = stack
            if opts.seed is not None and me.seed != opts.seed:
                raise ValueError("checkpointed seed `%s' /= "
                                 "command-line seed `%s'" %
                                 (me.seed, opts.seed))
            if opts.limit is not None and me.cur.limit != opts.limit:
                raise ValueError("checkpointed limit %d /= "
                                 "command-line limit %d" %
                                 (me.cur.limit, opts.limit))

            ## Keep `limit' defined on this path too, so that the object
            ## has the same attributes however it was made.
            me.limit = me.cur.limit
        except IOError as err:
            ## Only a missing checkpoint file means `start from scratch'.
            if err.errno != E.ENOENT: raise
            if opts.seed is not None: me.seed = opts.seed
            else: me.seed = base64_encode(OS.urandom(SEEDSZ))
            if opts.limit is not None: me.limit = opts.limit
            else: me.limit = 4096
            me.stack = []
            me.cur = Level('base', 0, me.limit)
            me.write_ckpt(reseed = False)
        me.rand = RND.Random(me.seed)

        ## Count how many times the limit can be cut down to roughly its
        ## square root before it stops getting smaller.
        n, b = 0, me.cur.limit
        while True:
            bb = int(M.sqrt(b)) + 4
            if bb >= b: break
            n, b = n + 1, bb
        me.stklim = n

    def push(me, lv):
        """Suspend the current level and make LV current."""
        me.stack.append(me.cur)
        me.cur = lv

    def pop(me):
        """Resume the most recently suspended level; return the old one."""
        assert me.stack
        lv = me.cur
        me.cur = me.stack.pop()
        return lv

    def write_ckpt(me, reseed = True):
        """
        Write the current state to the checkpoint file.

        If RESEED is true, first draw a fresh seed from the generator and
        reseed the generator from it, so that a run resumed from the
        checkpoint continues with the same random sequence.

        The checkpoint is written to a temporary file and renamed into
        place.  Raise `OSError' if this fails.
        """
        if reseed:
            me.seed = base64_encode(bytes(me.rand.randrange(256)
                                          for _ in range(SEEDSZ)))
            me.rand.seed(me.seed)
        new = me._ckpt_file + ".new"
        with open(new, "w") as f:
            f.write("%s\n" % me.seed)
            for lv in me.stack: lv.write(f)
            me.cur.write(f)
        try: OS.rename(new, me._ckpt_file)
        except OSError as err:
            ## Some systems refuse to rename over an existing file: remove
            ## the old one and try again.  Any other failure means that
            ## the checkpoint wasn't written, which mustn't go unnoticed.
            if err.errno == E.EEXIST:
                OS.unlink(me._ckpt_file); OS.rename(new, me._ckpt_file)
            else:
                raise

    def clear_ckpt(me):
        """Delete the checkpoint file; it's fine if it's already gone."""
        try: OS.unlink(me._ckpt_file)
        except OSError as err:
            if err.errno == E.ENOENT: pass
            else: raise
###--------------------------------------------------------------------------
### Main machinery.
def choices():
    """
    Return a `WeightedChoice' over the operations which make sense in the
    current state.

    Operations which deepen the stack are weighted by the room remaining
    below the stack limit; operations which finish the current level are
    weighted by the current depth.  Raise `ValueError' if the current
    level's kind isn't recognized.
    """
    depth = len(ST.stack)
    headroom = ST.stklim - depth

    ## The operations which finish a level depend on what kind it is.
    finishers = {"join": ("join0", "join1"),
                 "setop": ("unisect", "diffsect"),
                 "base": ()}.get(ST.cur.kind)
    if finishers is None:
        raise ValueError("unknown level kind `%s'" % ST.cur.kind)

    table = [(896, "addrm1"),
             (56, "addrmn"),
             (56, "lookup"),
             (headroom, "split"),
             (headroom, "push")]
    table.extend((depth, name) for name in finishers)
    return WeightedChoice(table)
## Parsed command-line options.
OPTS = Options()

## Testing state, restored from the checkpoint file if there is one.
ST = State(OPTS)

## The program under test, run as a child process.  We write commands to
## its stdin and read replies from its stdout, both as text.
KID = SUB.Popen(OPTS.testprog, universal_newlines = True,
                stdin = SUB.PIPE, stdout = SUB.PIPE)

## When true, `put' and `get' echo the conversation to stdout and
## `dump_tree' prints tree dumps.
SYNC = False
def fail(msg):
    """
    Report the failure MSG and the current checkpoint-relative step to
    stderr, shut down the test program, report its exit status, and exit
    with status 2.
    """
    SYS.stderr.write("%s: FAILED: %s\n" % (PROG, msg))
    SYS.stderr.write("%s: step = %d\n" % (PROG, CSTEP))

    ## Closing a pipe flushes anything still buffered, so if the test
    ## program has already died this can fail in its own right.  We're
    ## already reporting a failure: press on and collect the exit status.
    for pipe in (KID.stdin, KID.stdout):
        try: pipe.close()
        except IOError: pass

    rc = KID.wait()
    SYS.stderr.write("%s: exit status = %d\n" % (PROG, rc))
    SYS.exit(2)
def put(msg, echo = True):
    """
    Send the text MSG to the test program.

    If synchronization is active and ECHO is true, also print MSG to
    stdout, marked with a `$ ' prefix.  A failed write is reported through
    `fail'.
    """
    try:
        KID.stdin.write(msg)
        KID.stdin.flush()
    except IOError as err:
        fail("write failed: %s" % err)
    if not (SYNC and echo):
        return
    SYS.stdout.write("$ " + msg)
    SYS.stdout.flush()
def get(echo = True):
    """
    Read one line from the test program and return it without its
    trailing newline.

    If synchronization is active and ECHO is true, also copy the line to
    stdout.  A failed read or an end-of-file is reported through `fail'.
    """
    try:
        line = KID.stdout.readline()
    except IOError as err:
        fail("read failed: %s" % err)
    if not line:
        fail("unexpected end of file")
    if SYNC and echo:
        SYS.stdout.write(line)
    return line[:-1] if line.endswith("\n") else line
def dump_tree():
    """
    If synchronization is active, ask the test program for a dump and copy
    it to stdout.

    The request is followed by a line which makes the test program emit
    `;;END DUMP', so that we can tell where the dump finishes.
    """
    if not SYNC:
        return
    put("D\n:;;END DUMP\n", echo = False)
    for line in iter(lambda: get(echo = False), ";;END DUMP"):
        SYS.stdout.write(line)
        SYS.stdout.write("\n")
def check_tree():
    """
    Ask the test program to list its current tree, and compare the answer
    against the reference collection for the current level.  A mismatch is
    reported through `fail'.
    """
    put("K\n", echo = False)
    got = get(echo = False)
    coll = ST.cur.coll
    want = " ".join("%d" % i for i in coll) if coll else "(empty tree)"
    if got != want:
        fail("iteration mismatch: %s /= %s" % (got, want))
def snapshot():
    """
    Ask the test program for the textual form of its current tree, and
    record it as the current level's `tree'.
    """
    put("L\n", echo = False)
    listing = get(echo = False)
    ST.cur.tree = listing
## STEP counts operations over the whole run; CSTEP counts them since the
## most recent checkpoint, and is what `fail' reports.
STEP = CSTEP = 0

## Bring the test program's stack into line with our state: send each
## suspended level's tree and open a new level on top of it, then send the
## current level's tree.
for lv in ST.stack:
    put("= %s\n" % lv.tree)
    dump_tree()
    put("(\n")
put("= %s\n" % ST.cur.tree)
dump_tree()

## Hand the test program a fresh 32-bit value drawn from our generator.
put("S 0x%08x\n" % ST.rand.randrange(B32))
ch = choices()

while OPTS.nsteps is None or STEP < OPTS.nsteps:

    ## Switch synchronization on when we reach the requested step, and
    ## describe the stack as it stands.
    if OPTS.sync is not None and OPTS.sync == STEP:
        SYNC = True
        OPTS.sync = None
        snapshot()
        SYS.stdout.write("\n;; initial stack\n")
        for i, lv in enumerate(ST.stack):
            SYS.stdout.write(";; %3d = %s\n" % (i, lv.tree))
        SYS.stdout.write(";; TOP = %s\n" % ST.cur.tree)
        check_tree()
    if SYNC: SYS.stdout.write("\n;; step %d\n" % CSTEP)

    op = ch.choose(ST.rand)

    if op == "addrm1":
        ## Toggle the presence of a single random key.
        k = ST.rand.randrange(ST.cur.base, ST.cur.limit)
        if k in ST.cur.coll: ST.cur.coll.remove(k); put("%d-\n" % k)
        else: ST.cur.coll.add(k); put("%d+\n" % k)

    elif op == "addrmn":
        ## Pick a random run of keys.  If its midpoint is present then
        ## remove every key in the run which is present; otherwise add
        ## every key which is absent.  Work in a random direction.
        n = ST.rand.randrange(ST.cur.rlim)
        i = ST.rand.randrange(ST.cur.base, ST.cur.limit - n)
        m = i + n//2
        foundp = m in ST.cur.coll
        direction = ST.rand.choice([+1, -1])
        rr = range(i, i + n)
        if direction < 0: rr = reversed(rr)
        firstp = True
        buf = IO.StringIO()
        if foundp:
            for j in rr:
                if j not in ST.cur.coll: continue
                if firstp: firstp = False
                else: buf.write(u" ")
                ST.cur.coll.remove(j); buf.write(u"%d-" % j)
        else:
            for j in rr:
                if j in ST.cur.coll: continue
                if firstp: firstp = False
                else: buf.write(u" ")
                ST.cur.coll.add(j); buf.write(u"%d+" % j)
        if not firstp: buf.write(u"\n")
        put(buf.getvalue())

    elif op == "lookup":
        ## Look up a random key and check the answer against the
        ## reference collection.
        k = ST.rand.randrange(ST.cur.base, ST.cur.limit)
        put("%d? n\n" % k)
        line = get()
        if line == "(nil)":
            if k in ST.cur.coll: fail("key %d unexpectedly missing" % k)
        else:
            ## A successful reply begins with `#'.  The pattern used to
            ## have no capture group at all, so `m.group(1)' below always
            ## raised `IndexError'.  Take the key to be the first
            ## free-standing decimal integer in the reply; digits inside
            ## tokens such as `0x12ab34cd' are not free-standing.
            ## NOTE(review): the test program's exact reply format isn't
            ## visible from here -- confirm that this picks out the key.
            m = RX.match(r"^#.*?\b(\d+)\b", line)
            if m:
                kk = int(m.group(1))
                if kk != k:
                    fail("search for key %d found %d instead" % (k, kk))
                elif k not in ST.cur.coll:
                    fail("key %d unexpectedly found" % k)
            else:
                fail("unexpected response to lookup: `%s'" % line)

    elif op == "split":
        ## Split the current tree at a random key.  Check the three
        ## pieces against the reference model, discarding the middle and
        ## right-hand ones, and then open a `join' level above what's
        ## left so that it can be joined back together later.
        check_tree()
        k = ST.rand.randrange(ST.cur.base, ST.cur.limit - ST.cur.rlim - 4)
        put("%d@ /\n" % k)
        left, mid, right = ST.cur.coll.split(k)
        ST.cur.coll = left
        old_limit, ST.cur.limit = ST.cur.limit, k
        ST.push(Level("split.mid", k, k + 1, str(mid)))
        ST.push(Level("split.right", k + 1, old_limit,
                      " ".join("%d" % x for x in right)))
        dump_tree(); check_tree(); put(")\n"); ST.pop()
        dump_tree(); check_tree(); put(")\n"); ST.pop()
        dump_tree(); check_tree(); snapshot()
        put("(\n")

        ## Leave a gap below the new level's keys, so that `join1' can
        ## always find a key to put in between.
        if ST.cur.coll: new_base = ST.cur.coll.upper() + 2
        else: new_base = ST.cur.base + 2
        ST.push(Level("join", new_base, old_limit, "_"))
        ch = choices()

    elif op == "push":
        ## Open an empty `setop' level covering the same key range.
        check_tree(); snapshot()
        put("(\n"); ST.push(Level("setop", ST.cur.base, ST.cur.limit, "_"))
        ST.stack[-1].limit = ST.cur.limit
        ch = choices()

    elif op == "join0":
        ## Join the current level onto the one below, with no key between.
        lower = ST.stack[-1].coll
        put("* ~\n")
        ST.stack[-1].coll = lower.join(None, ST.cur.coll)
        ST.stack[-1].limit = ST.cur.limit
        ST.pop()
        ch = choices()

    elif op == "join1":
        ## Join the current level onto the one below, with a random key
        ## from the gap between them in the middle.
        lower = ST.stack[-1].coll
        if lower: base = lower.upper() + 1
        else: base = ST.stack[-1].base + 1
        if ST.cur.coll: limit = ST.cur.coll.lower()
        else: limit = ST.cur.limit
        k = ST.rand.randrange(base, limit)
        put("%d ~\n" % k)
        ST.stack[-1].coll = lower.join(k, ST.cur.coll)
        ST.stack[-1].limit = ST.cur.limit
        ST.pop()
        ch = choices()

    elif op == "unisect":
        ## Union and intersection of the top two levels: check and
        ## discard the top result, leaving the other as the current level.
        put("|\n")
        ST.cur.coll, ST.stack[-1].coll = \
            ST.cur.coll.unisect(ST.stack[-1].coll)
        dump_tree(); check_tree(); put(")\n"); ST.pop()
        ch = choices()

    elif op == "diffsect":
        ## Difference and intersection of the top two levels: check and
        ## discard both results.
        put("\\\n")
        diff, ST.cur.coll = ST.cur.coll.diffsect(ST.stack[-1].coll)
        ST.push(Level("diffsect.diff", ST.cur.base, ST.cur.limit,
                      " ".join("%d" % x for x in diff)))
        dump_tree(); check_tree(); put(")\n"); ST.pop()
        dump_tree(); check_tree(); put(")\n"); ST.pop()
        ch = choices()

    else:
        raise ValueError("unexpected operation `%s'" % op)

    STEP += 1; CSTEP += 1
    dump_tree()
    if SYNC or CSTEP == OPTS.ckpt_steps: check_tree()

    ## At a checkpoint: record the current tree, save the state (which
    ## reseeds our generator), and hand the test program a fresh value.
    if CSTEP == OPTS.ckpt_steps:
        snapshot()
        ST.write_ckpt()
        CSTEP = 0; SYNC = False
        put("S 0x%08x\n" % ST.rand.randrange(B32))

## All done: unwind the stack, checking each level as it is exposed.
while True:
    check_tree()
    if not ST.stack: break
    put(")\n")
    ST.pop()
    dump_tree()
check_tree()

## The run completed, so the checkpoint is no longer wanted.
ST.clear_ckpt()
###----- That's all, folks --------------------------------------------------