chiark / gitweb /
@@@ remove debugging print
[mLib-python] / t / t-sys.py
1 ### -*-python-*-
2 ###
3 ### Test system-specific functionality
4 ###
5 ### (c) 2019 Straylight/Edgeware
6 ###
7
8 ###----- Licensing notice ---------------------------------------------------
9 ###
10 ### This file is part of the Python interface to mLib.
11 ###
12 ### mLib/Python is free software: you can redistribute it and/or modify it
13 ### under the terms of the GNU General Public License as published by the
14 ### Free Software Foundation; either version 2 of the License, or (at your
15 ### option) any later version.
16 ###
17 ### mLib/Python is distributed in the hope that it will be useful, but
18 ### WITHOUT ANY WARRANTY; without even the implied warranty of
19 ### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20 ### General Public License for more details.
21 ###
22 ### You should have received a copy of the GNU General Public License
23 ### along with mLib/Python.  If not, write to the Free Software
24 ### Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
25 ### USA.
26
27 import mLib as M
28 import fcntl as FC
29 import os as OS
30 import socket as SK
31 import sys as SYS
32 import testutils as T
33 import unittest as U
34
35 ## Cygwin leaves cruft in the top bits of inode numbers for some reason.
36 if SYS.platform.startswith('cygwin'):
37   def _hack_inode(ino): return ino&0x0000ffffffffffff
38 else:
39   def _hack_inode(ino): return ino
40
41 ###--------------------------------------------------------------------------
42 class TestSys (U.TestCase):
43
44   def _fdid(me, fd):
45     st = OS.fstat(fd)
46     return st.st_dev, _hack_inode(st.st_ino)
47
48   def _check_same_file(me, fd0, fd1):
49     me.assertEqual(me._fdid(fd0), me._fdid(fd1))
50
51   def _make_fd(me):
52     fd, other = OS.pipe()
53     OS.close(other)
54     return fd
55
56   def test_fdflags(me):
57     fd = me._make_fd()
58     f = OS.fdopen(fd, "r")
59     try:
60       of, ofd = FC.fcntl(fd, FC.F_GETFL), FC.fcntl(fd, FC.F_GETFD)
61       M.fdflags(fd, fbic = OS.O_NONBLOCK, fxor = OS.O_NONBLOCK)
62       me.assertEqual(FC.fcntl(fd, FC.F_GETFL), of | OS.O_NONBLOCK)
63       M.fdflags(f, fbic = OS.O_NONBLOCK)
64       me.assertEqual(FC.fcntl(fd, FC.F_GETFL), of&~OS.O_NONBLOCK)
65       M.fdflags(fd, fdbic = FC.FD_CLOEXEC, fdxor = FC.FD_CLOEXEC)
66       me.assertEqual(FC.fcntl(fd, FC.F_GETFD), ofd | FC.FD_CLOEXEC)
67       M.fdflags(f, fdbic = FC.FD_CLOEXEC)
68       me.assertEqual(FC.fcntl(fd, FC.F_GETFD), ofd&~FC.FD_CLOEXEC)
69       me.assertRaises(AttributeError, M.fdflags, "bzzt")
70       me.assertRaises(OSError, M.fdflags, -1, 1, 1)
71     finally:
72       f.close()
73
74   def test_fdpass(me):
75     sk0, sk1 = SK.socketpair(SK.AF_UNIX, SK.SOCK_STREAM)
76     fd = me._make_fd()
77     nfd = -1
78     try:
79       msg = T.bin("some unnecessary message")
80       n = M.fdsend(sk0, fd, msg)
81       me.assertEqual(n, len(msg))
82       nfd, buf = M.fdrecv(sk1, 32)
83       me.assertEqual(buf, msg)
84       me._check_same_file(fd, nfd)
85       OS.close(nfd); nfd = -1
86     finally:
87       sk0.close(); sk1.close()
88       OS.close(fd)
89       if nfd != -1: OS.close(nfd)
90
91   def test_mdup(me):
92     NFD = 5
93     fds = [me._make_fd() for i in T.range(NFD)]
94     v = [(fds[i], fds[(i + 1)%5]) for i in T.range(NFD)]
95     try:
96       id = [me._fdid(fds[i]) for i in T.range(NFD)]
97       vv = M.mdup(v)
98       me.assertTrue(vv is v)
99       for i in T.range(NFD):
100         cur, want = v[i]
101         me.assertEqual(cur, want)
102         me.assertEqual(cur, fds[(i + 1)%NFD])
103         me.assertEqual(me._fdid(cur), id[i])
104     finally:
105       for fd, _ in v: OS.close(fd)
106
107 ###----- That's all, folks --------------------------------------------------
108
109 if __name__ == "__main__": U.main()