chiark / gitweb /
Test of streaming
authorSimon Tatham <anakin@pobox.com>
Mon, 4 Dec 2023 20:06:04 +0000 (20:06 +0000)
committerSimon Tatham <anakin@pobox.com>
Mon, 4 Dec 2023 21:28:29 +0000 (21:28 +0000)
client.py
mastodonochrome

index 7c44d249b0679b90a63a220a39c9245668688d0c..ebd27f2f4d1cc13954376bed3b5f4a2ca8284a51 100644 (file)
--- a/client.py
+++ b/client.py
@@ -1,4 +1,5 @@
 import calendar
+import io
 import json
 import os
 import re
@@ -62,7 +63,7 @@ class Client:
 
         self.log_response = log_response
 
-    def method(self, method, path, base, params, links={}):
+    def method_start(self, method, path, base, params, links={}):
         headers = {}
         if self.bearer_token is not None:
             headers['Authorization'] = 'Bearer ' + self.bearer_token
@@ -77,7 +78,10 @@ class Client:
                 break
             links[m.group(2)] = m.group(1)
             linkhdr = linkhdr[m.end():]
-        return rsp.json()
+        return rsp
+
+    def method(self, method, path, base, params, links={}):
+        return self.method_start(method, path, base, params, links).json()
 
     def get(self, path, base='api', **params):
         return self.method(requests.get, path, base, params)
@@ -108,6 +112,26 @@ class Client:
         data = self.method(requests.get, link, None, {}, links)
         return data, links
 
+    def get_streaming_lines(self, path, base='api', **params):
+        reqgetstream = lambda *args, **kws: requests.get(
+            *args, stream=True, **kws)
+        rsp = self.method_start(reqgetstream, path, base, params, {})
+        if rsp.status_code != 200:
+            raise HTTPError(rsp)
+
+        it = rsp.iter_content(None)
+        fh = io.BytesIO()
+        for chunk in it:
+            while b'\n' in chunk:
+                pos = chunk.index(b'\n')
+                fh.write(chunk[:pos])
+                chunk = chunk[pos+1:]
+
+                yield fh.getvalue().decode('utf-8', errors='replace')
+                fh = io.BytesIO()
+
+            fh.write(chunk)
+
     def get_url(self, path, base='api', **params):
         r = requests.Request(method="GET", url=self.urls[base] + path,
                              params=params)
index b097e932906773300f1ab736814b182909194251..2b8973b6cce9511bb5f835b538eff599b241c22b 100755 (executable)
@@ -57,6 +57,12 @@ class CombinedUI(client.Client):
                 for line in thing.render(80):
                     print(line.ecma48())
 
+class StreamUI(client.Client):
+    def run(self):
+        import time
+        for chunk in self.get_streaming_lines("streaming/user"):
+            print(time.strftime("%Y-%m-%d %H:%M:%S"), repr(chunk))
+
 class MyTestLoader(unittest.TestLoader):
     def loadTestsFromModule(self, module):
         suite = super().loadTestsFromModule(module)
@@ -76,6 +82,9 @@ def main():
                         const=CombinedUI, help="Temporary mode to fetch "
                         "the user's timeline and mentions, interleave them, "
                         "and print the result on the terminal.")
+    parser.add_argument("--stream", action="store_const", dest="action",
+                        const=StreamUI, help="Test mode for streaming "
+                        "HTTP retrievals.")
     parser.add_argument("--login", action="store_const", dest="action",
                         const=login.LoginUI, help="Log in to a user account.")
     parser.set_defaults(action=cursesclient.CursesUI)