chiark / gitweb /
@@@ lbuf needs test
[mLib-python] / test.py
1 #! /usr/bin/python
2 ### -*- coding: utf-8 -*-
3
4 import contextlib as CTX
5 import fcntl as FC
6 import os as OS
7 import socket as S
8 import sys as SYS
9 import mLib as M
10
11 if SYS.version_info >= (3,):
12   def _bin(text): return text.encode()
13   def _text(bin): return bin.decode()
14   xrange = range
15 else:
16   def _bin(text): return text
17   def _text(bin): return bin
18
19 def must_equal(x, y):
20   if x == y: pass
21   else: raise AssertionError("%r != %r" % (x, y))
22
23 ## Done!
24 print(";; test begins (Python %s)" % SYS.version)
25
26 ## crc32
27 print(";; test crc32...")
28 must_equal(M.crc32(_bin("abc")), 0x352441c2)
29 must_equal(M.CRC32().chunk(_bin("a")).chunk(_bin("bc")).done(), 0x352441c2)
30
31 ## unihash
32 print(";; test unihash...")
33 assert M.Unihash().key is None
34 must_equal(M.Unihash.hash(_bin("abc")), 0xbf71f6a2)
35 must_equal(M.Unihash().chunk(_bin("a")).chunk(_bin("bc")).done(), 0xbf71f6a2)
36 key = M.UnihashKey(0x8498a262)
37 assert M.Unihash(key).key is key
38 must_equal(M.Unihash.hash(_bin("abc"), key), 0xecd1e2a2)
39 must_equal(key.hash(_bin("abc")), 0xecd1e2a2)
40 must_equal(M.Unihash(key).chunk(_bin("a")).chunk(_bin("bc")).done(), 0xecd1e2a2)
41 M.setglobalkey(0x8498a262)
42 must_equal(M.Unihash.hash(_bin("abc")), 0xecd1e2a2)
43
44 ## atom
45 print(";; test atom...")
46 foo = M.Atom("foo")
47 bar = M.Atom("bär")
48 assert foo != bar
49 assert foo is M.Atom("foo")
50 assert bar is M.Atom("bär")
51 assert foo.internedp
52 must_equal(foo.name, "foo")
53 must_equal(bar.name, "bär")
54 gen = M.Atom()
55 assert gen is not M.Atom()
56 assert not gen.internedp
57 all = set(M.atoms())
58 assert foo in all
59 assert bar in all
60
61 ## assoc, sym
62 def test_mapping(mapcls, keya, keyz):
63   tab = mapcls()
64   tab[keya] = 69; must_equal(tab[keya], 69)
65   tab[keya] = 42; must_equal(tab[keya], 42)
66   tab[keyz] = 'zing'; must_equal(tab[keyz], 'zing')
67   del tab[keyz]
68   try: tab[keyz]
69   except KeyError: pass
70   else: assert False
71   must_equal(list(tab.keys()), [keya])
72   must_equal(list(tab.values()), [42])
73   must_equal(list(tab.items()), [(keya, 42)])
74 print(";; test assoc...")
75 test_mapping(M.AssocTable, foo, bar)
76 print(";; test sym...")
77 test_mapping(M.SymTable, 'foo', 'bar')
78
79 ## str
80 print(";; test str...")
81 must_equal(M.word(" foo  bar baz"), ("foo", "bar baz"))
82 must_equal(M.word("  'foo \\' bar'  baz ", quotep = True),
83            ("foo ' bar", "baz "))
84 must_equal(M.split(' one "two \\" three" four five six', 3, quotep = True),
85            (["one", 'two " three', "four"], "five six"))
86 assert M.match("foo*bar", "food is served at the bar")
87 must_equal(M.sanitize("some awful string!", 12), "some_awful_s")
88 assert M.versioncmp("1.2.5~pre99", "1.2.5") < 0
89 assert M.versioncmp("1.2.5pre99", "1.2.5") > 0
90
91 ## codec
92 ref = _bin("just >some ?string to encode")
93 def test_codecclass(enccls, deccls, encref):
94   enc = enccls("!", 8, M.CDCF_LOWERC)
95   e = enc.encode(ref[:10])
96   e += enc.encode(ref[10:])
97   e += enc.done()
98   must_equal(e, encref)
99   encref = encref.replace("!", "")
100   dec = deccls(M.CDCF_IGNCASE)
101   d = dec.decode(encref[:15])
102   d += dec.decode(encref[15:])
103   d += dec.done()
104   must_equal(d, ref)
105   try: deccls().decode("???")
106   except M.CodecError as e:
107     if e.err == M.CDCERR_INVCH: pass
108     else: raise
109   else: assert False
110 def test_oldcodec(enccls, deccls, encref):
111   enc = enccls()
112   enc.indent = "!"
113   enc.maxline = 8
114   e = enc.encode(ref[:10])
115   e += enc.encode(ref[10:])
116   e += enc.done()
117   must_equal(e, encref)
118   encref = encref.replace("!", "")
119   dec = deccls()
120   d = dec.decode(encref[:15])
121   d += dec.decode(encref[15:])
122   d += dec.done()
123   must_equal(d, ref)
124
125 print(";; test base64...")
126 test_codecclass(M.Base64Encoder, M.Base64Decoder,
127                 "anVzdCA+!c29tZSA/!c3RyaW5n!IHRvIGVu!Y29kZQ==")
128 test_codecclass(M.File64Encoder, M.File64Decoder,
129                 "anVzdCA+!c29tZSA%!c3RyaW5n!IHRvIGVu!Y29kZQ==")
130 test_codecclass(M.Base64URLEncoder, M.Base64URLDecoder,
131                 "anVzdCA-!c29tZSA_!c3RyaW5n!IHRvIGVu!Y29kZQ==")
132 test_oldcodec(M.Base64Encode, M.Base64Decode,
133               "anVzdCA+!c29tZSA/!c3RyaW5n!IHRvIGVu!Y29kZQ==")
134
135 print(";; test base32...")
136 test_codecclass(M.Base32Encoder, M.Base32Decoder,
137                 "nj2xg5ba!hzzw63lf!ea7xg5ds!nfxgoidu!n4qgk3td!n5sgk===")
138 test_codecclass(M.Base32HexEncoder, M.Base32HexDecoder,
139                 "d9qn6t10!7ppmurb5!40vn6t3i!d5n6e83k!dsg6arj3!dti6a===")
140 test_oldcodec(M.Base32Encode, M.Base32Decode,
141               "NJ2XG5BA!HZZW63LF!EA7XG5DS!NFXGOIDU!N4QGK3TD!N5SGK===")
142
143 print(";; test hex...")
144 test_codecclass(M.HexEncoder, M.HexDecoder,
145                 "6a757374!203e736f!6d65203f!73747269!6e672074!6f20656e!636f6465")
146 test_oldcodec(M.HexEncode, M.HexDecode,
147               "6a757374!203e736f!6d65203f!73747269!6e672074!6f20656e!636f6465")
148
149 ## url
150 print(";; test url...")
151 uenc = M.URLEncode()
152 uenc.encode("one", "some/string!")
153 uenc.strictp = True
154 uenc.encode("two", "some other/string!")
155 uenc.laxp = True
156 uenc.encode("three", "another!string")
157 r = uenc.result
158 must_equal(r, "one=some/string%21&two=some+other%2fstring%21&three=another!string")
159 must_equal(list(M.URLDecode(r)),
160            [("one", "some/string!"),
161             ("two", "some other/string!"),
162             ("three", "another!string")])
163
164 ## report
165 @CTX.contextmanager
166 def stderr_test(want_out):
167   pin, pout = OS.pipe()
168   fin = OS.fdopen(pin, 'r')
169   olderr = OS.dup(2)
170   OS.dup2(pout, 2)
171   OS.close(pout)
172   try:
173     yield
174     OS.close(2)
175     out = fin.read()
176   finally:
177     fin.close()
178     OS.dup2(olderr, 2)
179   must_equal(out, want_out)
180 print(";; test report...")
181 M.ego("my/path/name")
182 must_equal(M.quis, "name")
183 with stderr_test("name: test moaning\n"):
184   M.moan("test moaning")
185 with stderr_test("name: test death\n"):
186   try: M.die("test death")
187   except SystemExit as e: assert e.code == 126
188   else: assert False
189
190 ## fwatch
191 print(";; test fwatch...")
192 fd = OS.open(",test-stamp", OS.O_WRONLY | OS.O_CREAT, 0o666)
193 fw = M.FWatch(fd)
194 assert not fw.update()
195 OS.write(fd, _bin("stuff\n"))
196 assert fw.update()
197 assert fw.file is fd
198 fd2 = OS.open(",test-stamp.new", OS.O_WRONLY | OS.O_CREAT, 0o666)
199 OS.rename(",test-stamp.new", ",test-stamp")
200 assert not fw.update()
201 fw.file = ",test-stamp"
202 assert fw.update()
203 OS.close(fd)
204 OS.close(fd2)
205 OS.unlink(",test-stamp")
206
207 ## fdflags
208 print(";; test fdflags...")
209 pin, pout = OS.pipe()
210 ofl = FC.fcntl(pin, FC.F_GETFL)
211 ofd = FC.fcntl(pout, FC.F_GETFD)
212 fout = OS.fdopen(pout, 'wb')
213 M.fdflags(pin, fbic = OS.O_NONBLOCK, fxor = OS.O_NONBLOCK)
214 assert FC.fcntl(pin, FC.F_GETFL) == ofl | OS.O_NONBLOCK
215 M.fdflags(pin, fbic = OS.O_NONBLOCK, fxor = 0)
216 assert FC.fcntl(pin, FC.F_GETFL) == ofl&~OS.O_NONBLOCK
217 M.fdflags(fout, fdbic = FC.FD_CLOEXEC, fdxor = FC.FD_CLOEXEC)
218 assert FC.fcntl(pout, FC.F_GETFD) == ofd | FC.FD_CLOEXEC
219 M.fdflags(fout, fdbic = FC.FD_CLOEXEC, fdxor = 0)
220 assert FC.fcntl(pout, FC.F_GETFD) == ofd&~FC.FD_CLOEXEC
221 OS.close(pin)
222 fout.close()
223
224 ## fdpass
225 print(";; test fdpass...")
226 pin, pout = OS.pipe()
227 fin = OS.fdopen(pin, 'r')
228 OS.write(pout, _bin("Hello, world!"))
229 OS.close(pout)
230 sk0, sk1 = S.socketpair(S.AF_UNIX, S.SOCK_STREAM)
231 M.fdsend(sk0, fin, _bin("Have a pipe!"))
232 fin.close()
233 sk0.close()
234 fd, msg = M.fdrecv(sk1, 16)
235 sk1.close()
236 must_equal(msg, _bin("Have a pipe!"))
237 data = OS.read(fd, 16)
238 OS.close(fd)
239 must_equal(data, _bin("Hello, world!"))
240
241 ## mdup
242 print(";; test mdup...")
243 def mkfd():
244   fd = OS.open(",delete-me", OS.O_WRONLY | OS.O_CREAT, 0o666)
245   OS.unlink(",delete-me")
246   return fd
247 def fid(fd):
248   st = OS.fstat(fd)
249   return st.st_dev, st.st_ino
250 initial = [mkfd() for i in xrange(5)]
251 ref = [fid(fd) for fd in initial]
252 op = [(initial[0], initial[1]),
253       (initial[1], initial[2]),
254       (initial[2], initial[0]),
255       (initial[0], initial[3]),
256       (initial[3], -1),
257       (initial[0], initial[4])]
258 M.mdup(op)
259 for have, want in op:
260   if want != -1: must_equal(have, want)
261 must_equal(op[0][0], initial[1]); must_equal(fid(op[0][0]), ref[0])
262 must_equal(op[1][0], initial[2]); must_equal(fid(op[1][0]), ref[1])
263 must_equal(op[2][0], initial[0]); must_equal(fid(op[2][0]), ref[2])
264 must_equal(op[3][0], initial[3]); must_equal(fid(op[3][0]), ref[0])
265 pass;                             must_equal(fid(op[4][0]), ref[3])
266 must_equal(op[5][0], initial[4]); must_equal(fid(op[5][0]), ref[0])
267 for fd, _ in op: OS.close(fd)
268
269 ## (not testing detachtty and daemonize)
270
271
272
273 ## Done!
274 print(";; test completed OK")