--- /dev/null
+import calendar
+import requests
+import string
+import time
+
+import text
+
+class HTTPError(Exception):
+ def __init__(self, response):
+ self.response = response
+
+ def __str__(self):
+ return (f"{self.response.request.method} request "
+ f"for {self.response.request.url} "
+ f"returned status {self.response.status_code}")
+
+class Client:
+ def __init__(self, instance):
+ self.base_url = instance + "/api/v1/"
+ self.log_response = lambda *args, **kws: None
+
+ def enable_debug(self, logfile):
+ logfh = open(logfile, "w")
+ pr = lambda *args, **kws: print(*args, file=logfh, **kws)
+
+ def log_response(rsp):
+ pr("Request: {rsp.request.method} {rsp.request.url}")
+ pr(" Response status: {rsp.status_code}")
+ pr(" Response headers:")
+ for k, v in rsp.headers.items():
+ pr(f" {k}: {v}")
+
+ self.log_response = log_response
+
+ def get_public(self, path, **params):
+ rsp = requests.get(self.base_url + path, params=params)
+ self.log_response(rsp)
+ if rsp.status_code != 200:
+ raise HTTPError(rsp)
+ return rsp.json()
+
+class Status:
+ def __init__(self, data):
+ self.post_id = data['id']
+
+ date, suffix = data['created_at'].split(".", 1)
+ if suffix.lstrip(string.digits) != "Z":
+ raise ValueError(f"{self.post_id}: bad creation date {date!r}")
+ tm = time.strptime(date, "%Y-%m-%dT%H:%M:%S")
+ self.datestamp = calendar.timegm(tm)
+
+ self.account = data['account']
+
+ hp = text.HTMLParser()
+ hp.feed(data['content'])
+ hp.done()
+ self.content = hp.paras
+
+ def text(self):
+ yield text.SeparatorLine(self.datestamp)
+ yield text.FromLine('@' + self.account['acct'],
+ self.account['display_name'])
+ yield text.BlankLine()
+ yield from self.content
+ yield text.BlankLine()
--- /dev/null
+import curses
+import itertools
+import sys
+
+import client
+import text
+
+class CursesUI(client.Client):
+ def curses_setup(self):
+ self.scr = curses.initscr()
+ if hasattr(curses, 'start_color'):
+ curses.start_color()
+ has_colour = True
+ if hasattr(curses, 'use_default_colors'):
+ curses.use_default_colors()
+ default_fg = default_bg = -1
+ else:
+ default_fg, default_bg = 7, 0
+
+ colourmap = {
+ 0: curses.COLOR_BLACK,
+ 1: curses.COLOR_RED,
+ 2: curses.COLOR_GREEN,
+ 3: curses.COLOR_YELLOW,
+ 4: curses.COLOR_BLUE,
+ 5: curses.COLOR_MAGENTA,
+ 6: curses.COLOR_CYAN,
+ 7: curses.COLOR_WHITE,
+ -1: -1,
+ }
+ else:
+ has_colour = False
+ default_fg = default_bg = None
+
+ self.scr.keypad(1)
+ self.scr.scrollok(1)
+ curses.noecho()
+ self.scr_h, self.scr_w = self.scr.getmaxyx()
+
+ self.attrs = {}
+ pairs = {} # (fg, bg) -> curses pair
+ indexgen = itertools.count(1)
+ for colour, sgr_codes in text.colourmap.items():
+ attr, fg, bg = 0, default_fg, default_bg
+ for code in sgr_codes:
+ if code == 0:
+ attr, fg, bg = 0, default_fg, default_bg
+ elif code == 1:
+ attr |= curses.A_BOLD
+ elif code == 7:
+ attr |= curses.A_REVERSE
+ elif 30 <= code <= 37:
+ fg = code - 30
+ elif 40 <= code <= 47:
+ bg = code - 40
+ elif code == 39:
+ fg = default_fg
+ elif code == 49:
+ bg = default_bg
+
+ if has_colour and (fg, bg) != (-1, -1):
+ if (fg, bg) not in pairs:
+ pairindex = next(indexgen)
+ curses.init_pair(pairindex, colourmap[fg], colourmap[bg])
+ pairs[fg, bg] = pairindex
+ attr |= curses.color_pair(pairs[fg, bg])
+
+ self.attrs[colour] = attr
+
+ def curses_shutdown(self):
+ self.scr.erase()
+ self.scr.refresh()
+ curses.endwin()
+
+ def print_at(self, y, x, s):
+ for frag, colour, w in s.frags():
+ self.scr.addstr(y, x, frag, self.attrs[colour])
+ x += w
+
+ def run(self):
+ try:
+ self.curses_setup()
+ self.print_at(5, 3, text.ColouredString(
+ 'testing testing tésting t\uFF45sting one two three',
+ ' SSSSSSS DDDDDDD FFFFFFF ccc ### @@@@@'))
+ self.scr.refresh()
+ self.scr.getch()
+ finally:
+ self.curses_shutdown()
'''
import argparse
-import calendar
-import requests
-import string
import sys
-import time
import unittest
-import text
+import client
+import cursesclient
-class HTTPError(Exception):
- def __init__(self, response):
- self.response = response
-
- def __str__(self):
- return (f"{self.response.request.method} request "
- f"for {self.response.request.url} "
- f"returned status {self.response.status_code}")
-
-class Client:
- def __init__(self, instance):
- self.base_url = instance + "/api/v1/"
- self.log_response = lambda *args, **kws: None
-
- def enable_debug(self, logfile):
- logfh = open(logfile, "w")
- pr = lambda *args, **kws: print(*args, file=logfh, **kws)
-
- def log_response(rsp):
- pr("Request: {rsp.request.method} {rsp.request.url}")
- pr(" Response status: {rsp.status_code}")
- pr(" Response headers:")
- for k, v in rsp.headers.items():
- pr(f" {k}: {v}")
-
- self.log_response = log_response
-
- def get_public(self, path, **params):
- rsp = requests.get(self.base_url + path, params=params)
- self.log_response(rsp)
- if rsp.status_code != 200:
- raise HTTPError(rsp)
- return rsp.json()
-
-class Post:
- def __init__(self, data):
- self.post_id = data['id']
-
- date, suffix = data['created_at'].split(".", 1)
- if suffix.lstrip(string.digits) != "Z":
- raise ValueError(f"{self.post_id}: bad creation date {date!r}")
- tm = time.strptime(date, "%Y-%m-%dT%H:%M:%S")
- self.datestamp = calendar.timegm(tm)
-
- self.account = data['account']
-
- hp = text.HTMLParser()
- hp.feed(data['content'])
- hp.done()
- self.content = hp.paras
-
- def text(self):
- yield text.SeparatorLine(self.datestamp)
- yield text.FromLine('@' + self.account['acct'],
- self.account['display_name'])
- yield text.BlankLine()
- yield from self.content
- yield text.BlankLine()
-
-class MainUI(Client):
+class PublicTimelineUI(client.Client):
def run(self):
for item in self.get_public("timelines/public", limit=10):
- p = Post(item)
+ p = client.Status(item)
for thing in p.text():
for line in thing.render(80):
print(line.ecma48())
parser.add_argument("--log", help="File to log debug information to.")
parser.add_argument("--test", nargs=argparse.REMAINDER,
help="Run unit tests.")
- parser.set_defaults(action=MainUI)
+ parser.add_argument("--public", action="store_const", dest="action",
+ const=PublicTimelineUI, help="Temporary mode to fetch "
+ "a public timeline and print it on the terminal.")
+ parser.set_defaults(action=cursesclient.CursesUI)
args = parser.parse_args()
if args.test is not None:
# Represent colourised terminal text in a width-independent form.
+import collections
import html.parser
import io
import itertools
import unittest
import wcwidth
+# Colour ids in this module are single characters. Here we provide the
+# data that converts each one into a list of integers that go in an
+# ECMA-48 style SGR control sequence.
+colourmap = {
+ ' ': [],
+ 'S': [0, 1, 7, 44, 37],
+ 'D': [0, 7, 44, 37],
+ 'F': [0, 1, 32],
+ 'c': [0, 33],
+ '#': [0, 36],
+ '@': [0, 32],
+}
+
class ColouredString:
def __init__(self, string, colour=' '):
if isinstance(string, ColouredString):
colour = ' '
for sc, cc in itertools.chain(zip(self.s, self.c), [('',' ')]):
if cc != colour:
- buf.write({
- ' ': '\033[m',
- 'S': '\033[0;1;7;44;37m',
- 'D': '\033[0;7;44;37m',
- 'F': '\033[0;1;32m',
- 'c': '\033[0;33m',
- '#': '\033[0;36m',
- '@': '\033[0;32m',
- }[cc])
+ buf.write("\033[{}m".format(";".join(map(str, colourmap[cc]))))
colour = cc
buf.write(sc)
return buf.getvalue()
+ def frags(self):
+ # Return maximal substrings with the same attribute.
+ pos = 0
+ while pos < len(self.c):
+ colour = self.c[pos]
+ fraglen = len(self.c) - pos - len(self.c[pos:].lstrip(colour))
+ frag = self.s[pos:pos+fraglen]
+ yield frag, colour, wcwidth.wcswidth(frag)
+ pos += fraglen
+
class BlankLine:
def render(self, width):
yield ColouredString("")