X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~mdw/git/catacomb-python/blobdiff_plain/e91150b266bbd768905258166601fdf333d3d987..553d59fec08fd9102b21fd0dc83ba708862a6090:/t/t-buffer.py diff --git a/t/t-buffer.py b/t/t-buffer.py new file mode 100644 index 0000000..24823c6 --- /dev/null +++ b/t/t-buffer.py @@ -0,0 +1,251 @@ +### -*- mode: python, coding: utf-8 -*- +### +### Test read and write buffers +### +### (c) 2019 Straylight/Edgeware +### + +###----- Licensing notice --------------------------------------------------- +### +### This file is part of the Python interface to Catacomb. +### +### Catacomb/Python is free software: you can redistribute it and/or +### modify it under the terms of the GNU General Public License as +### published by the Free Software Foundation; either version 2 of the +### License, or (at your option) any later version. +### +### Catacomb/Python is distributed in the hope that it will be useful, but +### WITHOUT ANY WARRANTY; without even the implied warranty of +### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +### General Public License for more details. +### +### You should have received a copy of the GNU General Public License +### along with Catacomb/Python. If not, write to the Free Software +### Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, +### USA. + +###-------------------------------------------------------------------------- +### Imported modules. + +import catacomb as C +import unittest as U +import testutils as T + +###-------------------------------------------------------------------------- +class TestReadBuffer (U.TestCase): + + def check_getn(me, w, bigendp, getfn): + """Check that `getuN' works.""" + buf = C.ReadBuffer(T.span(w + 2)) + me.assertEqual(buf.getu8(), 0x00) + me.assertEqual(getfn(buf), T.bytes_as_int(w, bigendp)) + me.assertEqual(buf.getu8(), w + 1) + me.assertTrue(buf.endp) + me.assertRaises(C.BufferError, getfn, C.ReadBuffer(T.span(w - 1))) + me.assertEqual(getfn(C.ReadBuffer(w*C.bytes("00"))), 0) + me.assertEqual(getfn(C.ReadBuffer(w*C.bytes("ff"))), (1 << 8*w) - 1) + + def check_getbufn(me, w, bigendp, blkfn, buffn): + """Check that `getblkN' and `getbufN' work.""" + + ## Run tests for several different data sizes. + for n in [0, 1, 7, 8, 19, 255, 12345, 65535, 123456]: + + ## Make a sequence to parse. If it's too large, then skip. + if n >= 1 << 8*w: continue + seq = T.prep_lenseq(w, n, bigendp, True) + + ## Check `getblkN'. + buf = C.ReadBuffer(seq) + me.assertEqual(buf.getu8(), 0) + me.assertEqual(blkfn(buf), T.span(n)) + me.assertEqual(buf.getu8(), 0xff) + me.assertTrue(buf.endp) + + ## Check `getbufN'. Delete the outside buffer early, to make sure that + ## the subbuffer keeps it alive. + buf = C.ReadBuffer(seq) + me.assertEqual(buf.getu8(), 0) + b = buffn(buf) + me.assertEqual(buf.getu8(), 0xff) + me.assertTrue(buf.endp) + del buf + me.assertEqual(b.offset, 0) + me.assertEqual(b.size, n) + if n > 0: + me.assertEqual(b.getu8(), 0x00) + b.offset = n - 1 + me.assertEqual(b.getu8(), (n - 1)&0xff) + me.assertTrue(b.endp) + + ## Test invalid lengths. This is going to work by setting the top bit + ## of the length, so if it's already set, then that won't be any good. + if n >= 1 << 8*w - 1: continue + seq = T.prep_lenseq(w, n, bigendp, False) + + ## Check `getblkN'. + me.assertRaises(C.BufferError, blkfn, C.ReadBuffer(T.span(w - 1))) + b = C.ReadBuffer(seq) + me.assertEqual(b.getu8(), 0) + me.assertRaises(C.BufferError, blkfn, b) + + ## Check `getbufN'. + me.assertRaises(C.BufferError, buffn, C.ReadBuffer(T.span(w - 1))) + b = C.ReadBuffer(seq) + me.assertEqual(b.getu8(), 0) + me.assertRaises(C.BufferError, buffn, b) + + def test_readbuffer(me): + + ## Check `getuN'. + me.check_getn(1, True, lambda buf: buf.getu8()) + me.check_getn(2, True, lambda buf: buf.getu16()) + me.check_getn(2, True, lambda buf: buf.getu16b()) + me.check_getn(2, False, lambda buf: buf.getu16l()) + me.check_getn(3, True, lambda buf: buf.getu24()) + me.check_getn(3, True, lambda buf: buf.getu24b()) + me.check_getn(3, False, lambda buf: buf.getu24l()) + me.check_getn(4, True, lambda buf: buf.getu32()) + me.check_getn(4, True, lambda buf: buf.getu32b()) + me.check_getn(4, False, lambda buf: buf.getu32l()) + if hasattr(C.ReadBuffer, "getu64"): + me.check_getn(8, True, lambda buf: buf.getu64()) + me.check_getn(8, True, lambda buf: buf.getu64b()) + me.check_getn(8, False, lambda buf: buf.getu64l()) + + ## Check `getblkN' and `getbufN'. + me.check_getbufn(1, True, + lambda buf: buf.getblk8(), + lambda buf: buf.getbuf8()) + me.check_getbufn(2, True, + lambda buf: buf.getblk16(), + lambda buf: buf.getbuf16()) + me.check_getbufn(2, True, + lambda buf: buf.getblk16b(), + lambda buf: buf.getbuf16b()) + me.check_getbufn(2, False, + lambda buf: buf.getblk16l(), + lambda buf: buf.getbuf16l()) + me.check_getbufn(3, True, + lambda buf: buf.getblk24(), + lambda buf: buf.getbuf24()) + me.check_getbufn(3, True, + lambda buf: buf.getblk24b(), + lambda buf: buf.getbuf24b()) + me.check_getbufn(3, False, + lambda buf: buf.getblk24l(), + lambda buf: buf.getbuf24l()) + me.check_getbufn(4, True, + lambda buf: buf.getblk32(), + lambda buf: buf.getbuf32()) + me.check_getbufn(4, True, + lambda buf: buf.getblk32b(), + lambda buf: buf.getbuf32b()) + me.check_getbufn(4, False, + lambda buf: buf.getblk32l(), + lambda buf: buf.getbuf32l()) + if hasattr(C.ReadBuffer, "getu64"): + me.check_getbufn(8, True, + lambda buf: buf.getblk64(), + lambda buf: buf.getbuf64()) + me.check_getbufn(8, True, + lambda buf: buf.getblk64b(), + lambda buf: buf.getbuf64b()) + me.check_getbufn(8, False, + lambda buf: buf.getblk64l(), + lambda buf: buf.getbuf64l()) + + ## Check other `ReadBuffer' methods and properties. + buf = C.ReadBuffer(T.span(256)) + me.assertEqual(buf.size, 256) + me.assertEqual(buf.left, 256) + me.assertEqual(buf.offset, 0) + buf.offset = 52 + me.assertEqual(buf.left, 204) + buf.skip(7) + me.assertEqual(buf.offset, 59) + me.assertEqual(buf.left, 197) + me.assertRaises(C.BufferError, C.ReadBuffer(T.span(6)).skip, 7) + me.assertEqual(buf.get(5), C.bytes("3b3c3d3e3f")) + me.assertRaises(C.BufferError, C.ReadBuffer(T.span(4)).get, 5) + +###-------------------------------------------------------------------------- +class TestWriteBuffer (U.TestCase): + + def check_putn(me, w, bigendp, putfn): + """Check `putuN'.""" + + ## Check encoding an integer. + buf = C.WriteBuffer() + buf.putu8(0x00) + putfn(buf, T.bytes_as_int(w, bigendp)) + buf.putu8(w + 1) + me.assertEqual(C.ByteString(buf), T.span(w + 2)) + me.assertEqual(C.ByteString(putfn(C.WriteBuffer(), (1 << 8*w) - 1)), + w*C.bytes("ff")) + me.assertEqual(C.ByteString(putfn(C.WriteBuffer(), C.MP(0))), + w*C.bytes("00")) + + ## Check overflow detection. + me.assertRaises((OverflowError, ValueError), + putfn, C.WriteBuffer(), 1 << 8*w) + + def check_putbufn(me, w, bigendp, putfn): + """Check `putblkN'.""" + + ## Go through a number of different sizes. + for n in [0, 1, 7, 8, 19, 255, 12345, 65535, 123456]: + if n >= 1 << 8*w: continue + me.assertEqual(C.ByteString(putfn(C.WriteBuffer().putu8(0x00), + T.span(n)).putu8(0xff)), + T.prep_lenseq(w, n, bigendp, True)) + + ## Check blocks which are too large for the length prefix. + if w <= 3: + me.assertRaises(ValueError, putfn, + C.WriteBuffer(), C.ByteString.zero(1 << 8*w)) + + def test_writebuffer(me): + + ## Check `putuN'. + me.check_putn(1, True, lambda buf, n: buf.putu8(n)) + me.check_putn(2, True, lambda buf, n: buf.putu16(n)) + me.check_putn(2, True, lambda buf, n: buf.putu16b(n)) + me.check_putn(2, False, lambda buf, n: buf.putu16l(n)) + me.check_putn(3, True, lambda buf, n: buf.putu24(n)) + me.check_putn(3, True, lambda buf, n: buf.putu24b(n)) + me.check_putn(3, False, lambda buf, n: buf.putu24l(n)) + me.check_putn(4, True, lambda buf, n: buf.putu32(n)) + me.check_putn(4, True, lambda buf, n: buf.putu32b(n)) + me.check_putn(4, False, lambda buf, n: buf.putu32l(n)) + if hasattr(C.WriteBuffer, "putu64"): + me.check_putn(8, True, lambda buf, n: buf.putu64(n)) + me.check_putn(8, True, lambda buf, n: buf.putu64b(n)) + me.check_putn(8, False, lambda buf, n: buf.putu64l(n)) + + ## Check `putblkN". + me.check_putbufn(1, True, lambda buf, x: buf.putblk8(x)) + me.check_putbufn(2, True, lambda buf, x: buf.putblk16(x)) + me.check_putbufn(2, True, lambda buf, x: buf.putblk16b(x)) + me.check_putbufn(2, False, lambda buf, x: buf.putblk16l(x)) + me.check_putbufn(3, True, lambda buf, x: buf.putblk24(x)) + me.check_putbufn(3, True, lambda buf, x: buf.putblk24b(x)) + me.check_putbufn(3, False, lambda buf, x: buf.putblk24l(x)) + me.check_putbufn(4, True, lambda buf, x: buf.putblk32(x)) + me.check_putbufn(4, True, lambda buf, x: buf.putblk32b(x)) + me.check_putbufn(4, False, lambda buf, x: buf.putblk32l(x)) + if hasattr(C.WriteBuffer, "putu64"): + me.check_putbufn(8, True, lambda buf, x: buf.putblk64(x)) + me.check_putbufn(8, True, lambda buf, x: buf.putblk64b(x)) + me.check_putbufn(8, False, lambda buf, x: buf.putblk64l(x)) + + ## Check other methods and properties. + buf = C.WriteBuffer() + buf.zero(17) + buf.put(T.span(23)) + me.assertEqual(buf.size, 40) + me.assertEqual(C.ByteString(buf), C.ByteString.zero(17) + T.span(23)) + +###----- That's all, folks -------------------------------------------------- + +if __name__ == "__main__": U.main()