From 3b6027d2bb19c580b71e961aea88361bbe7e19af Mon Sep 17 00:00:00 2001 From: Simon Tatham Date: Fri, 5 Jan 2024 09:58:28 +0000 Subject: [PATCH] Ceremonially delete the Python prototype! We've reached feature parity, although I haven't been running the Rust version live for more than a few hours so far, so there's still a possibility of finding a bug the old version lacked. But it's now actually awkward to have it in the same directory - the two clients look similar enough that I can accidentally run './mastodonochrome' in place of './target/debug/mastodonochrome' during testing, and then be puzzled when it doesn't behave _quite_ the same. So, if I need it in an emergency, I can always recover it from the git log. --- .gitignore | 1 - client.py | 679 ------------------- cursesclient.py | 1666 ----------------------------------------------- login.py | 111 ---- mastodonochrome | 121 ---- scan_re.py | 102 --- text.py | 742 --------------------- util.py | 49 -- 8 files changed, 3471 deletions(-) delete mode 100644 client.py delete mode 100644 cursesclient.py delete mode 100644 login.py delete mode 100755 mastodonochrome delete mode 100644 scan_re.py delete mode 100644 text.py delete mode 100644 util.py diff --git a/.gitignore b/.gitignore index a0be326..4fffb2f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,2 @@ -/__pycache__/ /target /Cargo.lock diff --git a/client.py b/client.py deleted file mode 100644 index ea1619a..0000000 --- a/client.py +++ /dev/null @@ -1,679 +0,0 @@ -import calendar -import io -import json -import os -import re -import requests -import string -import sys -import time - -import text - -class HTTPError(Exception): - def __init__(self, response): - self.response = response - - def __str__(self): - return (f"{self.response.request.method} request " - f"for {self.response.request.url} " - f"returned status {self.response.status_code}") - -class Client: - def __init__(self): - # Ensure the requests module doesn't replace our bearer tokens - # with any random stuff that might be in .netrc - os.environ["NETRC"] = "/dev/null" - self.bearer_token = None - self.log_response = lambda *args, **kws: None - - # A global cache across all feeds etc of statuses by id, so - # that we can look up the text of one to show in a 'Re:' header - self.status_cache = {} - - self.readonly = False - - def set_instance_url(self, instance_url): - self.instance_url = instance_url - self.urls = { - None: '', - 'auth': instance_url + "/oauth/", - 'api': instance_url + "/api/v1/", - } - - def set_username(self, username, instance_domain, account_id): - self.username = username - self.instance_domain = instance_domain - self.fq_username = f"{self.username}@{self.instance_domain}" - self.account_id = account_id - - def enable_debug(self, logfile): - self.logfh = open(logfile, "w") - pr = lambda *args, **kws: print(*args, file=self.logfh, **kws) - - def log_response(rsp, content): - pr(f"Request: {rsp.request.method} {rsp.request.url}") - pr(" Request headers:") - for k, v in rsp.request.headers.items(): - pr(f" {k}: {v}") - pr(f" Response status: {rsp.status_code}") - pr(" Response headers:") - for k, v in rsp.headers.items(): - pr(f" {k}: {v}") - if content: - if 'application/json' not in rsp.headers.get('content-type'): - pr(f" Response: {rsp.content!r}") - else: - pr(" Response JSON:") - j = rsp.json() - for line in json.dumps(j, indent=4).splitlines(): - pr(" " + line) - self.logfh.flush() - - self.log_response = log_response - - def set_readonly(self): - self.readonly = True - - def method_start(self, method, path, base, params, stream, links={}): - def transform(value): - if isinstance(value, bool): - return {False:"false", True:"true"}[value] - return value - params = {key: transform(value) for key, value in params.items()} - headers = {} - if self.bearer_token is not None: - headers['Authorization'] = 'Bearer ' + self.bearer_token - rsp = method(self.urls[base] + path, params=params, headers=headers, - stream=stream) - self.log_response(rsp, content=not stream) - if rsp.status_code != 200: - raise HTTPError(rsp) - linkhdr = rsp.headers.get('Link', '') - while len(linkhdr) > 0: - m = re.match(r'<([^>]+)>\s*;\s*rel="([^"]+)"(?:,\s*)?', linkhdr) - if m is None: - break - links[m.group(2)] = m.group(1) - linkhdr = linkhdr[m.end():] - return rsp - - def method(self, method, path, base, params, links={}): - return self.method_start(method, path, base, params, - False, links).json() - - def get(self, path, base='api', **params): - return self.method(requests.get, path, base, params) - def post(self, path, base='api', **params): - assert not self.readonly, "HTTP POST got through in readonly mode" - return self.method(requests.post, path, base, params) - - def get_incremental(self, path, base='api', **params): - params.setdefault('limit', 32) - while True: - links = {} - try: - data = self.method(requests.get, path, base, params, links) - except HTTPError as e: - if e.response.status_code == 429: - # Blocked for too many requests, oops - break - yield from data - if 'next' not in links: - break - base, path = None, links['next'] - - def get_incremental_start(self, path, base='api', **params): - params.setdefault('limit', 32) - links = {} - data = self.method(requests.get, path, base, params, links) - return data, links - - def get_incremental_cont(self, link): - links = {} - data = self.method(requests.get, link, None, {}, links) - return data, links - - def get_streaming_lines(self, path, base='api', **params): - rsp = self.method_start(requests.get, path, base, params, True, {}) - if rsp.status_code != 200: - raise HTTPError(rsp) - - it = rsp.iter_content(None) - fh = io.BytesIO() - for chunk in it: - while b'\n' in chunk: - pos = chunk.index(b'\n') - fh.write(chunk[:pos]) - chunk = chunk[pos+1:] - - yield fh.getvalue().decode('utf-8', errors='replace') - fh = io.BytesIO() - - fh.write(chunk) - - def get_url(self, path, base='api', **params): - r = requests.Request(method="GET", url=self.urls[base] + path, - params=params) - p = r.prepare() - return p.url - - def fq(self, account_name): - return (account_name if '@' in account_name - else account_name + '@' + self.instance_domain) - - def home_timeline_feed(self): - return HomeTimelineFeed(self) - - def mentions_feed(self): - return MentionsFeed(self) - - def ego_feed(self): - return EgoFeed(self) - - def thread_feed(self, id, full): - return ThreadFeed(self, id, full) - - def cache_status(self, status): - self.status_cache[status['id']] = status - - def get_status_by_id(self, id): - if id in self.status_cache: - return self.status_cache[id] - st = self.get(f"statuses/{id}") - self.cache_status(st) - return st - -class Feed: - """Base class that encapsulates some kind of collection of _things_ we - can get from the server, with both the ability to go backwards in - time (into existing history) and forwards in time (waiting for - updates that maybe haven't yet been posted). - """ - def __init__(self, client): - self.client = client - def can_extend(self): - return False - def extend_past(self): - return False - def extend_future(self): - return False - -class IncrementalServerFeed(Feed): - """A Feed that fetches something from the server via get_incremental.""" - - def __init__(self, client, url, params, get=lambda item: item): - super().__init__(client) - self.url = url - self.params = params - self.get = get - self.started = False - - def start(self): - data, links = self.client.get_incremental_start( - self.url, **self.params) - self.data = list(self.get(d) for d in reversed(data)) - self.origin = len(self.data) - self.prev_link = links.get('prev') - self.next_link = links.get('next') - self.started = True - - def min_index(self): - return -self.origin - def max_index(self): - return len(self.data) - self.origin - def __getitem__(self, n): - return self.data[n + self.origin] - - def can_extend(self): - return True - def extend_past(self): - if not self.started: - return None - if self.next_link is None: - return False - data, links = self.client.get_incremental_cont(self.next_link) - if len(data) == 0: - return False - self.data[0:0] = list(self.get(d) for d in reversed(data)) - self.origin += len(data) - self.next_link = links.get('next') - return len(data) > 0 - - def extend_future(self): - if not self.started: - return None - if self.prev_link is None: - return False - data, links = self.client.get_incremental_cont(self.prev_link) - if len(data) == 0: - return False - self.data.extend(self.get(d) for d in reversed(data)) - self.prev_link = links.get('prev') - return len(data) > 0 - -class HomeTimelineFeed(IncrementalServerFeed): - def __init__(self, client): - super().__init__(client, "timelines/home", {}) - -class PublicTimelineFeed(IncrementalServerFeed): - def __init__(self, client, local): - super().__init__(client, "timelines/public", { - 'local': local - }) - -class MentionsFeed(IncrementalServerFeed): - def __init__(self, client): - super().__init__(client, "notifications", {"types[]":['mention']}, - get=lambda item: item['status']) - -class UserStatusesFeed(IncrementalServerFeed): - def __init__(self, client, account_id, include_boosts=True, - include_replies=True): - super().__init__(client, f"accounts/{account_id}/statuses", { - 'exclude_replies': not include_replies, - 'exclude_reblogs': not include_boosts, - }) - -class HashtagStatusesFeed(IncrementalServerFeed): - def __init__(self, client, hashtag): - super().__init__(client, f"timelines/tag/{hashtag}", {}) - -class UserListFeed(IncrementalServerFeed): - def __init__(self, client, url): - super().__init__(client, url, {}) - -class ThreadFeed(Feed): - def __init__(self, client, post_id, full): - super().__init__(client) - self.post_id = post_id - self.want_full = full - - def start(self): - status = cstatus = self.client.get_status_by_id(self.post_id) - context = self.client.get(f"statuses/{self.post_id}/context") - if self.want_full and len(context["ancestors"]) > 0: - cstatus = context["ancestors"][0] - cid = cstatus["id"] - context = self.client.get(f"statuses/{cid}/context") - self.data = context["ancestors"] + [cstatus] + context["descendants"] - self.is_full = len(context["ancestors"]) == 0 - - def min_index(self): - return 0 - def max_index(self): - return len(self.data) - def __getitem__(self, n): - return self.data[n] - -class StatusInfoFeed(Feed): - def __init__(self, client, status): - self.client = client - self.status = status - self.data = [Status(status, client, full_info=True)] - - def start(self): - pass - - def min_index(self): - return 0 - def max_index(self): - return len(self.data) - def __getitem__(self, n): - return self.data[n] - -class UserInfoFeed(Feed): - def __init__(self, client, account): - self.client = client - self.account = account - self.data = [UserInfo(account, client)] - - def start(self): - pass - - def min_index(self): - return 0 - def max_index(self): - return len(self.data) - def __getitem__(self, n): - return self.data[n] - -class EgoFeed(IncrementalServerFeed): - def __init__(self, client): - super().__init__(client, "notifications", { - "types[]":['reblog','follow','favourite']}) - -def parse_creation_time(created_at): - date, suffix = created_at.split(".", 1) - if suffix.lstrip(string.digits) != "Z": - raise ValueError(f"{self.post_id}: bad creation date {date!r}") - tm = time.strptime(date, "%Y-%m-%dT%H:%M:%S") - return calendar.timegm(tm) - -def noneify(s, include_empty=False, colour=' '): - if include_empty and s is not None and len(s) == 0: - s = None # empty string counts as None - return (text.ColouredString("none", '0') if s is None - else text.ColouredString(s, colour)) - -class Status: - def __init__(self, data, client, full_info=False): - rb = data.get('reblog') - if rb is not None: - self.booster = data['account'] - data = rb - else: - self.booster = None - - client.cache_status(data) - - self.data = data - self.post_id = data['id'] - - self.datestamp = parse_creation_time(data['created_at']) - - self.account = data['account'] - - hp = text.HTMLParser() - hp.feed(data['content']) - hp.done() - self.content = hp.paras - - self.media = data.get('media_attachments', []) - - self.reply_id = data.get('in_reply_to_id') - - self.mentions = data.get('mentions', []) - - self.client = client - - self.update_fave_boost(data) - - self.full_info = full_info - - def update_fave_boost(self, data): - self.favourited = data.get('favourited', False) - self.boosted = data.get('reblogged', False) - - def text(self): - yield text.SeparatorLine(self.datestamp, - self.favourited, self.boosted) - yield text.FromLine(self.client.fq(self.account['acct']), - self.account['display_name']) - if self.booster is not None: - yield text.BoosterLine(self.client.fq(self.booster['acct']), - self.booster['display_name']) - if self.reply_id is not None: - hp = text.HTMLParser() - try: - reply_status = self.client.get_status_by_id(self.reply_id) - hp.feed(reply_status['content']) - except HTTPError as ex: - hp.feed(f'[unavailable: {ex.response.status_code}]') - except KeyError: # returned 200 with an empty JSON object - hp.feed(f'[unavailable]') - hp.done() - yield text.InReplyToLine(hp.paras) - yield text.BlankLine() - yield from self.content - if len(self.content) > 0: - yield text.BlankLine() - for media in self.media: - yield text.Media(media['url'], media.get('description')) - - if self.full_info: - yield text.SeparatorLine() - yield text.BlankLine() - yield text.Paragraph(f"Post id: " + self.post_id) - url = self.data['url'] - yield text.Paragraph("On the web: " + - text.ColouredString(url, 'u')) - yield text.BlankLine() - - created = noneify(self.data['created_at']) - yield text.Paragraph(f"Creation time: " + created) - edited = noneify(self.data['edited_at']) - yield text.Paragraph(f"Last edit time: " + edited) - reply_id = noneify(self.data.get('in_reply_to_id')) - yield text.Paragraph("Reply to post: " + reply_id) - reply_acct = noneify(self.data.get('in_reply_to_account_id')) - yield text.Paragraph("Reply to account: " + reply_acct) - yield text.BlankLine() - - lang = noneify(self.data['language']) - yield text.Paragraph("Language: " + lang) - vis = self.data['visibility'] - yield text.Paragraph("Visibility: " + vis) - sensitive = "yes" if self.data['sensitive'] else "no" - yield text.Paragraph("Sensitive: " + sensitive) - spoiler = noneify(self.data['spoiler_text'], include_empty=True) - yield text.IndentedParagraph(0, 2, "Spoiler text: " + spoiler) - yield text.BlankLine() - - replies = str(self.data['replies_count']) - yield text.Paragraph("Replies: " + replies) - boosts = str(self.data['reblogs_count']) - yield text.Paragraph("Boosts: " + boosts) - faves = str(self.data['favourites_count']) - yield text.Paragraph("Favourites: " + faves) - yield text.BlankLine() - - if len(self.mentions) > 0: - yield text.Paragraph("Mentioned user:") - for mention in self.mentions: - yield text.IndentedParagraph(2, 4, text.ColouredString( - self.client.fq(mention['acct']), 'f')) - yield text.BlankLine() - - app_subdict = self.data.get('application') - if app_subdict is None: - app_subdict = {} - client = noneify(app_subdict.get('name')) - yield text.Paragraph("Client name: " + client) - client_url = noneify(app_subdict.get('website'), colour='u') - yield text.Paragraph("Client website: " + client_url) - yield text.BlankLine() - - def get_account_id(self): - return self.account['id'] - def get_reply_recipients(self): - yield self.client.fq(self.account['acct']) - for mention in self.mentions: - yield self.client.fq(mention['acct']) - def get_reply_id(self): - return self.post_id - -class Notification: - def __init__(self, data, client): - self.ntype = data.get('type') - self.account = data['account'] - self.post_id = data['id'] - self.datestamp = parse_creation_time(data['created_at']) - st = data.get('status') - if st is not None: - client.cache_status(st) - hp = text.HTMLParser() - hp.feed(st['content']) - hp.done() - self.content = hp.paras - else: - self.content = [] - self.client = client - - def text(self): - yield text.NotificationLog( - self.datestamp, self.client.fq(self.account['acct']), - self.account['display_name'], self.ntype, self.content) - - def get_account_id(self): - return self.account['id'] - def get_reply_recipients(self): - yield self.client.fq(self.account['acct']) - def get_reply_id(self): - return None - -class UserInfo: - def __init__(self, data, client): - self.account = data - self.client = client - - self.relationship = None - try: - for rel in self.client.get("accounts/relationships", id=data['id']): - if rel['id'] == data['id']: - self.relationship = rel - except HTTPError: - pass - - hp = text.HTMLParser() - hp.feed(data['note']) - hp.done() - self.bio = [] - for p in hp.paras: - ip = text.IndentedParagraph(2, 2) - ip.add_para(p) - self.bio.append(ip) - - self.info = [] - for field in self.account['fields']: - hp = text.HTMLParser() - hp.feed(field['value']) - hp.done() - ip = text.IndentedParagraph(2, 4) - name = field['name'] - if not name.endswith(":"): - name += ":" - ip.add(text.ColouredString( - name, 'f' if field['verified_at'] else ' ')) - it = iter(hp.paras) - try: - ip.add_para(next(it)) - except StopIteration: - pass - self.info.append(ip) - - for p in it: - ip = text.IndentedParagraph(4, 4) - ip.add_para(p) - self.bio.append(ip) - - def text(self): - yield text.Paragraph("Account name: " + text.ColouredString( - self.client.fq(self.account['acct']), 'f')) - url = self.account['url'] - yield text.Paragraph("On the web: " + text.ColouredString(url, 'u')) - yield text.BlankLine() - yield text.Paragraph("Display name: " + self.account['display_name']) - yield text.Paragraph("Bio:") - yield from self.bio - yield text.BlankLine() - if len(self.info) > 0: - yield text.Paragraph("Information:") - yield from self.info - yield text.BlankLine() - flags = list(self.flags_text()) - if len(flags) > 0: - yield from flags - yield text.BlankLine() - rel = list(self.relationship_text()) - if len(rel) > 0: - yield text.Paragraph("Relationships to this user:") - yield from rel - yield text.BlankLine() - aid = self.account['id'] - yield text.Paragraph(f"Account id: {aid}") - created = noneify(self.account['created_at']) - yield text.Paragraph(f"Account created: " + created) - posted = noneify(self.account['last_status_at']) - yield text.Paragraph(f"Latest post: " + created) - n = self.account['statuses_count'] - yield text.Paragraph(f"Number of posts: {n}") - yield text.BlankLine() - n = self.account['followers_count'] - yield text.Paragraph(f"Number of followers: {n}") - n = self.account['following_count'] - yield text.Paragraph(f"Number of users followed: {n}") - yield text.BlankLine() - - def flags_text(self): - if self.account['locked']: - yield text.Paragraph( - "This account is " + text.ColouredString("locked", 'r') + - " (you can't follow it without its permission).") - if self.account.get('suspended'): - yield text.Paragraph(text.ColouredString( - "This account is suspended.", 'r')) - if self.account.get('limited'): - yield text.Paragraph(text.ColouredString( - "This account is silenced.", 'r')) - if self.account['bot']: - yield text.Paragraph("This account identifies as a bot.") - if self.account['group']: - yield text.Paragraph("This account identifies as a group.") - moved_to = self.account.get('moved') - if moved_to is not None: - yield text.Paragraph( - text.ColouredString("This account has moved to:", 'r') + " " + - text.ColouredString(self.client.fq(moved_to['acct']), 'f')) - - def relationship_text(self): - if self.relationship is None: - return - if self.account['id'] == self.client.account_id: - yield text.IndentedParagraph(2, 4, text.ColouredString( - "You are this user!", - " ___ ")) - if self.relationship['following']: - if self.relationship['showing_reblogs']: - yield text.IndentedParagraph(2, 4, text.ColouredString( - "You follow this user.", 'f')) - else: - yield text.IndentedParagraph(2, 4, text.ColouredString( - "You follow this user (but without boosts).", 'f')) - if self.relationship['followed_by']: - yield text.IndentedParagraph(2, 4, text.ColouredString( - "This user follows you.", 'f')) - if self.relationship['requested']: - yield text.IndentedParagraph(2, 4, text.ColouredString( - "This user has requested to follow you!", 'F')) - if self.relationship['notifying']: - yield text.IndentedParagraph( - 2, 4, "You have enabled notifications for this user.") - if self.relationship['blocking']: - yield text.IndentedParagraph(2, 4, text.ColouredString( - "You have blocked this user.", 'r')) - if self.relationship['blocked_by']: - yield text.IndentedParagraph(2, 4, text.ColouredString( - "This user has blocked you.", 'r')) - if self.relationship['muting']: - yield text.IndentedParagraph(2, 4, text.ColouredString( - "You have muted this user.", 'r')) - if self.relationship['muting_notifications']: - yield text.IndentedParagraph(2, 4, text.ColouredString( - "You have muted notifications from this user.", 'r')) - if self.relationship['domain_blocking']: - yield text.IndentedParagraph( - 2, 4, "You have blocked this user's domain.") - - def get_account_id(self): - return self.account['id'] - def get_reply_recipients(self): - yield self.client.fq(self.account['acct']) - def get_reply_id(self): - return None - -class UserListEntry: - def __init__(self, data, client): - self.account = data - self.client = client - - def text(self): - yield text.UserListEntry(self.client.fq(self.account['acct']), - self.account['display_name']) - - def get_account_id(self): - return self.account['id'] - def get_reply_recipients(self): - yield self.client.fq(self.account['acct']) - def get_reply_id(self): - return None diff --git a/cursesclient.py b/cursesclient.py deleted file mode 100644 index b970312..0000000 --- a/cursesclient.py +++ /dev/null @@ -1,1666 +0,0 @@ -import collections -import curses -import itertools -import os -import re -import select -import signal -import sys -import threading -import time -import unittest - -import client -import scan_re -import text -import util - -from util import ctrl - -class CursesUI(client.Client): - def __init__(self): - super().__init__() - self.selfpipes = [] - self.resized = False - - def curses_setup(self): - self.scr = curses.initscr() - if hasattr(curses, 'start_color'): - curses.start_color() - has_colour = True - if hasattr(curses, 'use_default_colors'): - curses.use_default_colors() - default_fg = default_bg = -1 - else: - default_fg, default_bg = 7, 0 - - colourmap = { - 0: curses.COLOR_BLACK, - 1: curses.COLOR_RED, - 2: curses.COLOR_GREEN, - 3: curses.COLOR_YELLOW, - 4: curses.COLOR_BLUE, - 5: curses.COLOR_MAGENTA, - 6: curses.COLOR_CYAN, - 7: curses.COLOR_WHITE, - -1: -1, - } - else: - has_colour = False - default_fg = default_bg = None - - self.scr.keypad(1) - self.scr.scrollok(0) - self.scr.nodelay(1) # so we can use get_wch() via a select loop - curses.noecho() - self.scr_h, self.scr_w = self.scr.getmaxyx() - - self.attrs = {} - pairs = {} # (fg, bg) -> curses pair - indexgen = itertools.count(1) - for colour, sgr_codes in text.colourmap.items(): - attr, fg, bg = 0, default_fg, default_bg - for code in sgr_codes: - if code == 0: - attr, fg, bg = 0, default_fg, default_bg - elif code == 1: - attr |= curses.A_BOLD - elif code == 4: - attr |= curses.A_UNDERLINE - elif code == 7: - attr |= curses.A_REVERSE - elif 30 <= code <= 37: - fg = code - 30 - elif 40 <= code <= 47: - bg = code - 40 - elif code == 39: - fg = default_fg - elif code == 49: - bg = default_bg - - if has_colour and (fg, bg) != (-1, -1): - if (fg, bg) not in pairs: - pairindex = next(indexgen) - curses.init_pair(pairindex, colourmap[fg], colourmap[bg]) - pairs[fg, bg] = pairindex - attr |= curses.color_pair(pairs[fg, bg]) - - self.attrs[colour] = attr - - def curses_shutdown(self): - self.scr.erase() - self.scr.refresh() - curses.endwin() - - def print_at(self, y, x, s): - for frag, colour, w in s.frags(): - self.scr.addstr(y, x, frag, self.attrs[colour]) - x += w - - def move_cursor_to(self, y, x): - self.scr.move(y, x) - - def clear_line(self, y): - try: - self.print_at(y, 0, text.ColouredString(' ' * self.scr_w)) - except curses.error: - pass # this can happen if clearing the bottom right screen corner - - def clear(self): - for y in range(self.scr_h): - self.clear_line(y) - - def get_wch(self): - try: - ch = self.scr.get_wch() - except curses.error: - return curses.ERR - return ch - - def get_input(self): - # There might be keystrokes still in curses's buffer from a - # previous read from stdin, so deal with those before we go - # back to the select loop - ch = self.get_wch() - if ch != curses.ERR: - return ch - - rfds_in = [0] - for (sp, handler, _) in self.selfpipes: - rfds_in.append(sp.rfd) - rfds_out, _, _ = select.select(rfds_in, [], []) - rfds_out = set(rfds_out) - activity = False - for (sp, handler, _) in self.selfpipes: - if sp.rfd in rfds_out and sp.check(): - if handler(): - activity = True - if activity: - return None - if 0 in rfds_out: - return self.get_wch() - else: - return None - - def add_streaming_selfpipe(self, url, handler): - sp = util.SelfPipe() - gen = self.get_streaming_lines(url) - def threadfn(): - for line in gen: - # ignore heartbeat lines - if line.startswith("event"): - sp.signal() - th = threading.Thread(target=threadfn, daemon=True) - th.start() - self.selfpipes.append((sp, handler, th)) - - def add_sigwinch_selfpipe(self): - sp = util.SelfPipe() - signal.signal(signal.SIGWINCH, lambda *args: sp.signal()) - def handler(): - size = os.get_terminal_size() - curses.resizeterm(size.lines, size.columns) - # Signal to the main loop that we've been resized. If - # curses were doing this itself, we'd see KEY_RESIZE in - # our input stream, but apparently passing that to - # curses.unget_wch does the wrong thing, and instead we - # receive U+019A from get_wch. (0x19A is the numerical - # value of curses.KEY_RESIZE, so it seems reasonably - # obvious what manner of confusion has happened there.) - # - # So instead, we set a flag of our own and return True, - # which causes get_input() to return None, which causes - # the main loop to check the resize flag. - self.resized = True - return True - self.selfpipes.append((sp, handler, None)) - - def get_composer(self): - if self.composer is None: - self.composer = Composer(self) - return self.composer - - def new_composer(self, text, reply_header, reply_id): - self.composer = Composer(self, text, reply_header, reply_id) - return self.composer - - def run(self): - home_feed = self.home_timeline_feed() - mentions_feed = self.mentions_feed() - ego_feed = self.ego_feed() - - def extend_both(): - home_feed.extend_future() - ego_feed.extend_future() - if mentions_feed.extend_future(): - curses.beep() - # FIXME: should we try to replicate Mono's ~1s delay - # before throwing you into messages? - - # FIXME: this is also the point to check if the - # topmost activity is uninterruptible, e.g. an editor - if not any(a is self.mentions_feed - for a in self.activity_stack): - self.activity_stack.append(self.mentions_timeline) - - self.add_streaming_selfpipe("streaming/user", extend_both) - - self.home_timeline = StatusFile( - self, home_feed, - text.ColouredString("Home timeline ", - "HHHHHHHHHHHHHHHHHKH")) - self.mentions_timeline = StatusFile( - self, mentions_feed, - text.ColouredString("Mentions [ESC][R]", - "HHHHHHHHHHHHKKKHHKH")) - self.ego_timeline = NotificationsFile( - self, ego_feed, - text.ColouredString("Ego Log [ESC][L][L][E]", - "HHHHHHHHHHHKKKHHKHHKHHKH")) - - self.add_sigwinch_selfpipe() - - self.main_menu = MainMenu(self) - self.escape_menu = EscMenu(self) - self.log_menu = LogMenu(self) - self.log_menu_2 = LogMenu2(self) - self.exit_menu = ExitMenu(self) - - self.composer = None # these are ephemeral, when we're writing a post - - self.activity_stack = [self.main_menu] - - try: - self.curses_setup() - - while True: - self.clear() - self.activity_stack[-1].render() - self.scr.refresh() - - ch = self.get_input() - if ch is None and self.resized: - self.resized = False - ch = curses.KEY_RESIZE - - if ch == ctrl('['): - if self.activity_stack[-1] is not self.escape_menu: - self.escape_menu.activation = time.monotonic() - self.activity_stack.append(self.escape_menu) - elif ch == curses.KEY_RESIZE: - self.scr_h, self.scr_w = self.scr.getmaxyx() - elif ch is not None: - result = self.activity_stack[-1].handle_key(ch) - if result == 'quit': - if len(self.activity_stack) > 1: - self.activity_stack.pop() - elif result == 'exit': - return - - finally: - self.curses_shutdown() - -class Activity: - def chain_to(self, activity): - assert self.cc.activity_stack[-1] is self - if (len(self.cc.activity_stack) > 1 and - self.cc.activity_stack[-2] == activity): - self.cc.activity_stack.pop() - else: - self.cc.activity_stack[-1] = activity - - def push_to(self, activity): - self.cc.activity_stack.append(activity) - - def optionally_chain_to(self, activity): - if self.cc.activity_stack[-1] is self: - self.chain_to(activity) - else: - self.push_to(activity) - -class Menu(Activity): - status_extra_text = None - - def __init__(self, cc): - self.cc = cc - self.items = [] - self.normalised = False - - def normalise(self): - if self.normalised: - return - - maxw = 0 - for _ in range(2): - for item in self.items: - if isinstance(item, text.MenuKeypressLine): - maxw = item.expand_key_width(maxw) - - self.normalised = True - - def renormalise(self): - self.normalised = False - self.normalise() - - def render(self): - self.normalise() - y = 0 - header = text.FileHeader(self.title) - - for line in header.render(self.cc.scr_w): - self.cc.print_at(y, 0, line) - y += 1 - - # FIXME: handle menus too large for the screen, so that you - # have to add > and < keypresses to scroll them - y += 1 - for item in self.items: - for line in item.render(self.cc.scr_w): - self.cc.print_at(y, 0, line) - y += 1 - - sl = text.FileStatusLine(self.status_extra_text) - self.add_keys(sl) - sl_rendered = util.exactly_one(sl.render(self.cc.scr_w)) - self.cc.print_at(self.cc.scr_h - 1, 0, sl_rendered) - - def add_keys(self, sl): - sl.keys.append(('RET', 'Back')) - - def handle_key(self, ch): - if ch in {'q', 'Q', '\n', '\r'}: - return 'quit' - -class MainMenu(Menu): - status_extra_text = text.ColouredString("Select an option") - - def __init__(self, cc): - super().__init__(cc) - self.title = text.ColouredString( - "Mastodonochrome Main Menu", 'H') - self.items.append(text.MenuKeypressLine( - 'H', text.ColouredString("Home timeline", - "K "))) - self.items.append(text.BlankLine()) - self.items.append(text.MenuKeypressLine( - 'P', text.ColouredString("Public timeline (all servers)", - "K "))) - self.items.append(text.MenuKeypressLine( - 'L', text.ColouredString("Local public timeline (this server)", - "K "))) - self.items.append(text.MenuKeypressLine( - '#', text.ColouredString("Timeline for a #hashtag", - " K "))) - self.items.append(text.BlankLine()) - self.items.append(text.MenuKeypressLine( - 'I', text.ColouredString("View a post by its ID", - " K "))) - self.items.append(text.BlankLine()) - self.items.append(text.MenuKeypressLine( - 'C', text.ColouredString("Compose a post", - "K "))) - self.items.append(text.BlankLine()) - self.items.append(text.MenuKeypressLine( - 'ESC', text.ColouredString("Utilities and Exit"))) - - def add_keys(self, sl): - # intentionally don't call the base class, because [Q] doesn't - # do anything on this menu - pass - - def handle_key(self, ch): - if ch in {'h', 'H'}: - self.push_to(self.cc.home_timeline) - elif ch in {'c', 'C'}: - self.push_to(self.cc.get_composer()) - elif ch in {'p', 'P'}: - feed = client.PublicTimelineFeed(self.cc, local=False) - title = text.ColouredString("Public timeline

", - "HHHHHHHHHHHHHHHHHHHKH") - self.push_to(StatusFile(self.cc, feed, title)) - elif ch in {'l', 'L'}: - feed = client.PublicTimelineFeed(self.cc, local=True) - title = text.ColouredString("Local public timeline ", - "HHHHHHHHHHHHHHHHHHHHHHHHHKH") - self.push_to(StatusFile(self.cc, feed, title)) - elif ch in {'#'}: - self.push_to(BottomLinePrompt( - self.cc, self.got_hashtag_to_view, - "View feed for hashtag: ")) - elif ch in {'i', 'I'}: - self.push_to(BottomLinePrompt( - self.cc, self.got_post_id_to_view, - "View post with id: ")) - else: - return super().handle_key(ch) - - def got_hashtag_to_view(self, tag): - tag = tag.strip().lstrip("@") - if tag == "": - return - - feed = client.HashtagStatusesFeed(self.cc, tag) - title = text.ColouredString( - f"Posts mentioning hashtag #{tag}", 'H') - self.push_to(StatusFile(self.cc, feed, title)) - - def got_post_id_to_view(self, post_id): - post_id = post_id.strip() - if post_id == "": - return - - try: - self.push_to(StatusInfoFile(self.cc, post_id)) - except client.HTTPError: - curses.beep() - -class EscMenu(Menu): - def __init__(self, cc): - super().__init__(cc) - self.title = text.ColouredString( - "Utilities [ESC]", - "HHHHHHHHHHHKKKH") - self.items.append(text.MenuKeypressLine( - 'E', text.ColouredString("Examine User", - "K "))) - self.items.append(text.MenuKeypressLine( - 'Y', text.ColouredString("Examine Yourself", - " K "))) - self.items.append(text.BlankLine()) - self.items.append(text.MenuKeypressLine( - 'L', text.ColouredString("Logs menu", - "K "))) - self.items.append(text.BlankLine()) - self.items.append(text.MenuKeypressLine( - 'X', text.ColouredString("EXit Mastodonochrome", - " K "))) - - self.activation = None - - def recently_activated(self): - return (self.activation is not None and - time.monotonic() - self.activation < 0.5) - - def prompt_to(self, prompt): - if self.recently_activated(): - self.chain_to(prompt) - else: - self.push_to(prompt) - - def handle_key(self, ch): - if ch in {'r', 'R'}: - self.chain_to(self.cc.mentions_timeline) - elif ch in {'l', 'L'}: - self.chain_to(self.cc.log_menu) - elif ch in {'x', 'X'}: - self.chain_to(self.cc.exit_menu) - elif ch in {'g', 'G'}: - self.cc.activity_stack[:] = [self.cc.main_menu] - elif ch in {'e', 'E'}: - self.prompt_to(BottomLinePrompt( - self.cc, self.got_user_to_examine, - "Examine User: ")) - elif ch in {'y', 'Y'}: - self.chain_to(UserInfoFile.by_id(self.cc, self.cc.account_id)) - else: - return super().handle_key(ch) - - def got_user_to_examine(self, text): - text = text.strip().lstrip("@") - if text == "": - return - - try: - acct = self.cc.get("accounts/lookup", acct=text) - except client.HTTPError: - curses.beep() # FIXME: better error report? - return - if "id" not in acct: - curses.beep() # FIXME: better error report? - return - self.optionally_chain_to(UserInfoFile(self.cc, acct)) - -class LogMenu(Menu): - def __init__(self, cc): - super().__init__(cc) - self.title = text.ColouredString( - "Client Logs [ESC][L]", - "HHHHHHHHHHHHHKKKHHKH") - self.items.append(text.MenuKeypressLine( - 'L', text.ColouredString("Server Logs", - " K "))) - - def handle_key(self, ch): - if ch in {'l', 'L'}: - self.chain_to(self.cc.log_menu_2) - else: - return super().handle_key(ch) - -class LogMenu2(Menu): - def __init__(self, cc): - super().__init__(cc) - self.title = text.ColouredString( - "Server Logs [ESC][L][L]", - "HHHHHHHHHHHHHKKKHHKHHKH") - self.items.append(text.MenuKeypressLine( - 'E', text.ColouredString("Ego Log (Boosts, Follows and Faves)", - "K "))) - - def handle_key(self, ch): - if ch in {'e', 'E'}: - self.chain_to(self.cc.ego_timeline) - else: - return super().handle_key(ch) - -class ExitMenu(Menu): - def __init__(self, cc): - super().__init__(cc) - self.title = text.ColouredString( - "Exit Mastodonochrome [ESC][X]", - "HHHHHHHHHHHHHHHHHHHHHHKKKHHKH") - self.items.append(text.MenuKeypressLine( - 'X', text.ColouredString("Confirm eXit", - " K "))) - - def handle_key(self, ch): - if ch in {'x', 'X'}: - return 'exit' - else: - return super().handle_key(ch) - -class File(Activity): - # Base class for anything where you page up and down. - def __init__(self, cc): - self.cc = cc - self.mode = 'normal' - self.search_direction = None - self.search_re = None - - def handle_key(self, ch): - backward_keys = {'-', 'b', 'B', curses.KEY_PPAGE, curses.KEY_LEFT} - - if self.mode == 'normal': - if ch in {' ', curses.KEY_NPAGE, curses.KEY_RIGHT}: - self.down_screen() - elif ch in backward_keys: - self.up_screen() - elif ch == curses.KEY_DOWN: - self.down_line() - elif ch in {'\n', '\r'}: - if not self.down_line(): - return 'quit' - elif ch in {curses.KEY_UP}: - self.up_line() - elif ch in {'0', curses.KEY_HOME}: - self.goto_top() - elif ch in {'z', 'Z', curses.KEY_END}: - self.goto_bottom() - elif ch in {'q', 'Q'}: - return 'quit' - elif ch in {'s', 'S'}: - self.send_mode() - elif ch in {'f', 'F'}: - self.favourite_mode() - elif ch in {ctrl('b')}: - self.boost_mode() - elif ch in {'t', 'T'}: - self.thread_mode() - elif ch in {'i', 'I'}: - self.info_mode() - elif ch in {'e', 'E'}: - self.examine_mode() - elif ch in {'\\'}: - self.search_direction = -1 - self.push_to(BottomLinePrompt( - self.cc, self.got_search_text, - "Search back (blank = last): ")) - elif ch in {'/'}: - self.search_direction = +1 - self.push_to(BottomLinePrompt( - self.cc, self.got_search_text, - "Search (blank = last): ")) - elif ch in {'n', 'N'}: - if (self.search_direction is not None and - self.search_re is not None): - self.search(self.search_direction, self.search_re) - elif self.mode == 'select': - if ch in {'q', 'Q'}: - self.mode = 'normal' - elif ch in {'-', 'b', 'B', curses.KEY_UP}: - self.prev_select_target() - elif ch in {'+', curses.KEY_DOWN}: - self.next_select_target() - elif (self.select_type == 'send' and - ch in {' ', 'i', 'I', 'a', 'A', 'l', 'L'}): - self.send_complete() - elif (self.select_type == 'favourite' and - ch in {'e', 'E', ' '}): - self.favourite_complete(+1) - elif (self.select_type == 'favourite' and - ch in {'d', 'D'}): - self.favourite_complete(-1) - elif (self.select_type == 'boost' and - ch in {'e', 'E', ' '}): - self.boost_complete(+1) - elif (self.select_type == 'boost' and - ch in {'d', 'D'}): - self.boost_complete(-1) - elif (self.select_type == 'thread' and - ch in {' '}): - self.thread_complete(False) - elif (self.select_type == 'thread' and - ch in {'f', 'F'}): - self.thread_complete(True) - elif (self.select_type == 'info' and - ch in {' '}): - self.info_complete(False) - elif (self.select_type == 'examine' and - ch in {' '}): - self.examine_complete(False) - - def got_search_text(self, text): - if len(text) != 0: - try: - self.search_re = re.compile(text, flags=re.IGNORECASE) - except re.error: - curses.beep() - # FIXME: real Mono reports an error in the status line - # (but still also beeps); waits for you to press Return; - # then returns to paging through the file - return - - if (self.search_direction is not None and - self.search_re is not None): - self.search(self.search_direction, self.search_re) - - def send_mode(self): - pass # not supported - def favourite_mode(self): - pass # not supported - def boost_mode(self): - pass # not supported - def thread_mode(self): - pass # not supported - def info_mode(self): - pass # not supported - def examine_mode(self): - pass # not supported - -class ObjectFile(File): - def __init__(self, cc, constructor, feed, title): - super().__init__(cc) - self.feed = feed - self.feed.start() - - self.header = text.FileHeader(title) - self.constructor = constructor - - self.history_closed = not self.feed.can_extend() - self.minpos = self.feed.min_index() - self.maxpos = self.feed.max_index() - self.statuses = {i: self.constructor(self.feed[i], cc) - for i in range(self.minpos, self.maxpos)} - self.itempos = self.maxpos - 1 - - self.lines = None - self.linepos = None - self.width = None - self.select_target = None - self.old_display_state = None - self.index_by_line = [] - - def top_line_pos(self): - return max(0, self.linepos - (self.cc.scr_h - 1)) - - def primed_to_extend(self): - if self.linepos is None: - return False # we haven't set up at all yet - if self.history_closed: - return False # we can't be primed to extend if we can't extend - return self.top_line_pos() == 0 - - def iter_text_indexed(self): - yield self.header, None - if not self.history_closed: - yield text.ExtendableIndicator(self.primed_to_extend()), None - for i in range(self.minpos, self.maxpos): - for thing in self.statuses[i].text(): - yield thing, i # FIXME: maybe just yield the last? - - def fetch_new(self): - got_any = False - - new_minpos = self.feed.min_index() - while self.minpos > new_minpos: - self.minpos -= 1 - self.statuses[self.minpos] = self.constructor( - self.feed[self.minpos], self.cc) - got_any = True - - new_maxpos = self.feed.max_index() - while self.maxpos < new_maxpos: - self.statuses[self.maxpos] = self.constructor( - self.feed[self.maxpos], self.cc) - self.maxpos += 1 - got_any = True - - return got_any - - def regenerate_lines(self, width): - # We need to recompute our line position in the buffer if the - # width has changed (so everything was rewrapped) or new stuff - # has arrived. - got_new = self.fetch_new() - recompute_line = got_new or self.width != width - - # If not that, and also nothing _else_ has changed, we don't - # need to do anything at all. - display_state = (self.mode, self.select_target, - self.primed_to_extend()) - if not recompute_line and display_state == self.old_display_state: - return - - # If we're recomputing our line position but the width - # _hasn't_ changed, we should be able to keep our exact - # location within the current item. - pos_within_item = 0 - if self.width == width: - line_index = self.linepos - while (line_index < len(self.lines) and - self.index_by_line[line_index] == self.itempos): - line_index += 1 - pos_within_item = line_index - self.linepos - - self.old_display_state = display_state - self.lines = [] - self.index_by_line = [] - pos = 0 - last_itemindex = None - curr_itemtop = 0 - for thing, itemindex in self.iter_text_indexed(): - params = {} - if (self.mode == 'select' and itemindex == self.select_target and - hasattr(thing, 'can_highlight_as_target')): - params['target'] = True - if itemindex != last_itemindex: - curr_itemtop = len(self.lines) - last_itemindex = itemindex - for line in thing.render(width, **params): - for s in line.split(width): - self.lines.append(s) - self.index_by_line.append(itemindex) - if itemindex == self.itempos: - itemheight = len(self.lines) - curr_itemtop - pos = len(self.lines) - min(pos_within_item, itemheight) - - self.width = width - if recompute_line: - self.move_to(pos) - - def render(self): - self.regenerate_lines(self.cc.scr_w) - topline = self.top_line_pos() - for y, line in enumerate(self.lines[topline:topline+self.cc.scr_h-1]): - self.cc.print_at(y, 0, line) - - sl = text.FileStatusLine() - if self.mode == 'select': - if self.select_type == 'send': - sl.keys.append(('SPACE', 'Reply')) - elif self.select_type == 'favourite': - if not self.statuses[self.select_target].favourited: - sl.keys.append(('SPACE', 'Fave')) - if self.statuses[self.select_target].favourited: - sl.keys.append(('D', 'Unfave')) - elif self.select_type == 'boost': - if not self.statuses[self.select_target].boosted: - sl.keys.append(('SPACE', 'Boost')) - if self.statuses[self.select_target].boosted: - sl.keys.append(('D', 'Unboost')) - elif self.select_type == 'thread': - sl.keys.append(('SPACE', 'Show Thread Context')) - sl.keys.append(('F', 'Show Full Thread')) - elif self.select_type == 'examine': - sl.keys.append(('SPACE', 'Examine')) - elif self.select_type == 'info': - sl.keys.append(('SPACE', 'Show Post Info')) - sl.keys.append(('-', None)) - sl.keys.append(('+', None)) - sl.keys.append(('Q', 'Quit')) - elif self.mode == 'list_users': - if isinstance(self, UserInfoFile): - sl.keys.append(('I', 'List Followers')) - sl.keys.append(('O', 'List Followed')) - if isinstance(self, StatusInfoFile): - sl.keys.append(('F', 'List Favouriters')) - sl.keys.append(('B', 'List Boosters')) - sl.keys.append(('Q', 'Quit')) - elif self.mode == 'list_posts': - if isinstance(self, UserInfoFile): - sl.keys.append(('A', 'All')) - sl.keys.append(('O', 'Original')) - sl.keys.append(('T', 'Top-level')) - sl.keys.append(('Q', 'Quit')) - else: - if self.linepos >= len(self.lines): - sl.keys.append(('-', 'Up')) - else: - sl.keys.append(('SPACE', 'More')) - if self.items_are_statuses: - sl.keys.append(('S', 'Reply')) - elif self.items_have_authors: - sl.keys.append(('S', 'Send')) - # FIXME: for when we can auto-shrink bottom line - # sl.keys.append(('E', 'Examine')) - if isinstance(self, UserInfoFile): - sl.keys.append(('P', 'Posts')) - sl.keys.append(('L', 'List')) - if isinstance(self, StatusInfoFile): - sl.keys.append(('L', 'List')) - if self.items_are_statuses: - sl.keys.append(('F', 'Fave')) - sl.keys.append(('^B', 'Boost')) - # FIXME: for when we can auto-shrink bottom line - # sl.keys.append(('T', 'Thread')) - # sl.keys.append(('I', 'Info')) - sl.keys.append(('Q', 'Exit')) - sl.proportion = self.linepos / len(self.lines) - sl_rendered = util.exactly_one(sl.render(self.cc.scr_w)) - self.cc.print_at(self.cc.scr_h - 1, 0, sl_rendered) - - def last_index(self): - return self.index_by_line[min(self.linepos-1, len(self.lines)-1)] - def send_mode(self): - if self.items_have_authors: - self.mode = 'select' - self.select_type = 'send' - self.select_target = self.last_index() - def favourite_mode(self): - if self.items_are_statuses: - self.mode = 'select' - self.select_type = 'favourite' - self.select_target = self.last_index() - def boost_mode(self): - if self.items_are_statuses: - self.mode = 'select' - self.select_type = 'boost' - self.select_target = self.last_index() - def thread_mode(self): - if self.items_are_statuses: - self.mode = 'select' - self.select_type = 'thread' - self.select_target = self.last_index() - def info_mode(self): - if self.items_are_statuses: - self.mode = 'select' - self.select_type = 'info' - self.select_target = self.last_index() - def examine_mode(self): - if self.items_have_authors: - self.mode = 'select' - self.select_type = 'examine' - self.select_target = self.last_index() - def prev_select_target(self): - self.select_target = max(self.minpos, self.select_target-1) - def next_select_target(self): - self.select_target = min(self.maxpos-1, self.select_target+1) - def send_complete(self): - self.mode = 'normal' - - recipients = collections.OrderedDict() - for r in self.statuses[self.select_target].get_reply_recipients(): - if r == self.cc.fq_username or r in recipients: - continue - recipients[r] = 1 - initial_content = "".join(f"@{r} " for r in recipients) - - reply_id = self.statuses[self.select_target].get_reply_id() - if reply_id is not None: - hp = text.HTMLParser() - try: - reply_status = self.cc.get_status_by_id(reply_id) - hp.feed(reply_status['content']) - except client.HTTPError as ex: - hp.feed(f'[unavailable: {ex.response.status_code}]') - except KeyError: # returned 200 with an empty JSON object - hp.feed(f'[unavailable]') - hp.done() - reply_header = text.InReplyToLine(hp.paras) - else: - reply_header = None - - self.push_to(self.cc.new_composer( - initial_content, reply_header, reply_id)) - - def favourite_complete(self, direction): - self.mode = 'normal' - target = self.statuses[self.select_target] - reply_id = target.get_reply_id() - verb = "favourite" if direction > 0 else "unfavourite" - if self.cc.readonly: - print(verb, reply_id, file=sys.stderr) - return - data = self.cc.post(f"statuses/{reply_id}/{verb}") - target.update_fave_boost(data) - - def boost_complete(self, direction): - self.mode = 'normal' - target = self.statuses[self.select_target] - reply_id = target.get_reply_id() - verb = "reblog" if direction > 0 else "unreblog" - if self.cc.readonly: - print(verb, reply_id, file=sys.stderr) - return - data = self.cc.post(f"statuses/{reply_id}/{verb}") - target.update_fave_boost(data) - - def thread_complete(self, full): - self.mode = 'normal' - target = self.statuses[self.select_target] - reply_id = target.get_reply_id() - feed = self.cc.thread_feed(reply_id, full) - feed.start() - if feed.is_full: - title = "Full thread of post " + reply_id - else: - title = "Thread context of post " + reply_id - self.push_to(StatusFile( - self.cc, feed, text.ColouredString(title, "H"))) - - def info_complete(self, full): - self.mode = 'normal' - target = self.statuses[self.select_target] - reply_id = target.get_reply_id() - self.push_to(StatusInfoFile(self.cc, reply_id)) - - def examine_complete(self, full): - self.mode = 'normal' - target = self.statuses[self.select_target] - account_id = target.get_account_id() - self.push_to(UserInfoFile.by_id(self.cc, account_id)) - - def move_to(self, pos): - old_linepos = self.linepos - self.linepos = pos - self.linepos = max(self.linepos, self.cc.scr_h - 1) - self.linepos = min(self.linepos, len(self.lines)) - self.itempos = self.index_by_line[self.linepos - 1] - return self.linepos != old_linepos - - def move_by(self, delta): - return self.move_to(self.linepos + delta) - - def down_screen(self): return self.move_by(max(1, self.cc.scr_h - 3)) - def up_screen(self): self.move_by(-max(1, self.cc.scr_h - 3)) - def down_line(self): return self.move_by(+1) - def up_line(self): return self.move_by(-1) - def goto_top(self): - if self.primed_to_extend(): - if self.feed.extend_past() is False: # None does not count - self.history_closed = True - return self.move_to(0) - def goto_bottom(self): return self.move_to(len(self.lines)) - - def search(self, direction, re): - pos = self.linepos - 1 - while True: - pos += direction - if not 0 <= pos < len(self.lines): - curses.beep() - return False - if re.search(str(self.lines[pos])): - return self.move_to(pos + 1) - -class StatusFile(ObjectFile): - items_are_statuses = True - items_have_authors = True - def __init__(self, cc, feed, title): - super().__init__(cc, client.Status, feed, title) - -class NotificationsFile(ObjectFile): - items_are_statuses = False - items_have_authors = True - def __init__(self, cc, feed, title): - super().__init__(cc, client.Notification, feed, title) - -class UserListFile(ObjectFile): - items_are_statuses = False - items_have_authors = True - def __init__(self, cc, feed, title): - super().__init__(cc, client.UserListEntry, feed, title) - -class StatusInfoFile(ObjectFile): - items_are_statuses = True - items_have_authors = True - def __init__(self, cc, postid): - title = text.ColouredString(f"Information about post {postid}", 'H') - self.data = cc.get_status_by_id(postid) - self.postid = postid - super().__init__( - cc, lambda x,cc:x, client.StatusInfoFeed(cc, self.data), title) - - def handle_key(self, ch): - if self.mode == 'normal' and ch in {'l', 'L'}: - self.mode = 'list_users' - elif self.mode == 'list_users' and ch in {'f', 'F'}: - feed = client.UserListFeed(self.cc, - f"statuses/{self.postid}/favourited_by") - title = text.ColouredString( - f"Users who favourited post {self.postid}", 'H') - self.chain_to(UserListFile(self.cc, feed, title)) - elif self.mode == 'list_users' and ch in {'b', 'B'}: - feed = client.UserListFeed(self.cc, - f"statuses/{self.postid}/reblogged_by") - title = text.ColouredString( - f"Users who boosted post {self.postid}", 'H') - self.chain_to(UserListFile(self.cc, feed, title)) - elif self.mode == 'list_users': - self.mode = 'normal' - else: - return super().handle_key(ch) - -class UserInfoFile(ObjectFile): - items_are_statuses = False - items_have_authors = True - def __init__(self, cc, account): - self.account = account - self.account_id = account['id'] - name = cc.fq(account['acct']) - title = text.ColouredString(f"Information about user {name}", 'H') - super().__init__( - cc, lambda x,cc:x, client.UserInfoFeed(cc, self.account), title) - - @classmethod - def by_id(cls, cc, account_id): - account = cc.get("accounts/" + account_id) - return cls(cc, account) - - def handle_key(self, ch): - if self.mode == 'normal' and ch in {'p', 'P'}: - self.mode = 'list_posts' - elif self.mode == 'list_posts' and ch in {'a', 'A'}: - feed = client.UserStatusesFeed( - self.cc, self.account_id, - include_boosts=True, include_replies=True) - name = self.cc.fq(self.account['acct']) - title = text.ColouredString(f"All posts from user {name}", 'H') - self.chain_to(StatusFile(self.cc, feed, title)) - elif self.mode == 'list_posts' and ch in {'o', 'O'}: - feed = client.UserStatusesFeed( - self.cc, self.account_id, - include_boosts=False, include_replies=True) - name = self.cc.fq(self.account['acct']) - title = text.ColouredString(f"Original posts from user {name}", 'H') - self.chain_to(StatusFile(self.cc, feed, title)) - elif self.mode == 'list_posts' and ch in {'t', 'T'}: - feed = client.UserStatusesFeed( - self.cc, self.account_id, - include_boosts=False, include_replies=False) - name = self.cc.fq(self.account['acct']) - title = text.ColouredString( - f"Top-level posts from user {name}", 'H') - self.chain_to(StatusFile(self.cc, feed, title)) - elif self.mode == 'list_posts': - self.mode = 'normal' - - elif self.mode == 'normal' and ch in {'l', 'L'}: - self.mode = 'list_users' - elif self.mode == 'list_users' and ch in {'i', 'I'}: - feed = client.UserListFeed(self.cc, - f"accounts/{self.account_id}/followers") - name = self.cc.fq(self.account['acct']) - title = text.ColouredString(f"Users who follow {name}", 'H') - self.chain_to(UserListFile(self.cc, feed, title)) - elif self.mode == 'list_users' and ch in {'o', 'O'}: - feed = client.UserListFeed(self.cc, - f"accounts/{self.account_id}/following") - name = self.cc.fq(self.account['acct']) - title = text.ColouredString(f"Users who {name} follows", 'H') - self.chain_to(UserListFile(self.cc, feed, title)) - elif self.mode == 'list_users': - self.mode = 'normal' - - else: - return super().handle_key(ch) - -class EditorCommon: - # Common editing operations between the post editor and the line editor. - # Expects self.text to be the editor buffer, and self.point to be - # the cursor position within it, from 0 to len(self.text) _inclusive_. - - def move_to(self, pos): - self.point = max(0, min(len(self.text), pos)) - - def handle_common_editing_keys(self, ch): - if ch in {ctrl('b'), curses.KEY_LEFT}: - self.move_to(self.point - 1) - elif ch in {ctrl('f'), curses.KEY_RIGHT}: - self.move_to(self.point + 1) - elif ch in {ctrl('h'), '\x7F', curses.KEY_BACKSPACE}: - if self.point > 0: - self.text = (self.text[:self.point - 1] + - self.text[self.point:]) - self.point -= 1 - elif ch in {ctrl('d'), curses.KEY_DC}: - if self.point < len(self.text): - self.text = (self.text[:self.point] + - self.text[self.point + 1:]) - elif ch in {ctrl('w')}: - if self.point > 0: - while True: - self.point -= 1 - if self.word_boundary(self.point): - break - elif ch in {ctrl('t')}: - if self.point < len(self.text): - while True: - self.point += 1 - if self.word_boundary(self.point): - break - elif ch in {ctrl('y')}: - self.text = ( - self.text[:self.point] + self.ctrl_k_paste_buffer + - self.text[self.point:]) - self.point += len(self.ctrl_k_paste_buffer) - elif isinstance(ch, str) and (' ' <= ch < '\x7F' or '\xA0' <= ch): - # TODO: overwrite mode - self.text = (self.text[:self.point] + ch + - self.text[self.point:]) - self.point += 1 - -class BottomLinePrompt(Activity, EditorCommon): - def __init__(self, cc, callback, prompt, text=""): - self.cc = cc - self.prompt = prompt - self.text = text - self.point = len(self.text) - self.callback = callback - - @property - def parent_activity(self): - assert len(self.cc.activity_stack) >= 2 - assert self.cc.activity_stack[-1] is self - return self.cc.activity_stack[-2] - - def render(self): - self.parent_activity.render() - - y = self.cc.scr_h-1 - self.cc.clear_line(y) - - prompt_string = text.ColouredString(self.prompt) - text_string = text.ColouredString(self.text) - # FIXME: prevent overflow - self.cc.print_at(y, 0, prompt_string + text_string) - self.cc.move_cursor_to( - y, prompt_string.width + text_string[:self.point].width) - - def handle_key(self, ch): - if ch in {ctrl('a'), curses.KEY_HOME}: - self.move_to(0) - elif ch in {ctrl('e'), curses.KEY_END}: - self.move_to(len(self.text)) - elif ch in {ctrl('k')}: - if self.point < len(self.text): - self.ctrl_k_paste_buffer = self.text[self.point:] - self.text = (self.text[:self.point]) - elif ch in {'\r', '\n'}: - self.chain_to(self.parent_activity) - self.callback(self.text) - else: - self.handle_common_editing_keys(ch) - -class Composer(Activity, EditorCommon): - class DisplayText: - def __init__(self, text): - self.text = text - self.regions = [] - - types = [('#', scan_re.hashtag), - ('@', scan_re.mention), - ('u', scan_re.url)] - - pos = 0 - while pos < len(text): - mstart = mend = len(text) - gotdesc = None - for desc, re in types: - match = re.search(self.text, pos=pos) - if match is not None and match.start() < mstart: - mstart, mend = match.span() - gotdesc = desc - - if pos < mstart: - self.regions.append((pos, mstart, ' ')) - if mstart < mend: - assert gotdesc is not None - self.regions.append((mstart, mend, gotdesc)) - - pos = mend - - def colourise(self, char_limit, url_cost): - self.cs = text.ColouredString("") - nchars = 0 - for start, end, desc in self.regions: - region_len = end - start - if desc == 'u': - # URLs have a fixed length - region_len = url_cost - elif desc == '@': - # Fully qualified username mentions @foo@instance.domain - # only count for their local-part - mention = str(self.text)[start:end] - try: - mention = mention[:mention.index('@', 1)] - except ValueError: - pass - region_len = len(mention) - - if nchars > char_limit: - self.cs += text.ColouredString(self.text[start:end], '!') - elif nchars + region_len > char_limit: - nbad_chars = nchars + region_len - char_limit - nok_chars = max(0, end - start - nbad_chars) - self.cs += text.ColouredString( - self.text[start:start+nok_chars], desc) - self.cs += text.ColouredString( - self.text[start+nok_chars:end], '!') - else: - self.cs += text.ColouredString(self.text[start:end], desc) - - nchars += region_len - return self.cs - - def layout_para(self, width, para, startpos, y): - pos = 0 - while pos < len(para): - soft_wrap_point = hard_wrap_point = None - x = 0 - i = pos - while i <= len(para): - self.yx[startpos + i] = (y, x) - if i == len(para): - hard_wrap_point = soft_wrap_point = i - break - else: - x += para[i].width - if x > width: - break - - i += 1 - hard_wrap_point = i - if (str(para[i-1]) == ' ' and - (i+1 >= len(para) or str(para[i]) != ' ')): - soft_wrap_point = i - - assert hard_wrap_point is not None - wrap_point = (soft_wrap_point if soft_wrap_point is not None - else hard_wrap_point) - self.lines.append(para[pos:wrap_point]) - pos = wrap_point - y += 1 - return y - - def layout(self, width, char_limit, url_cost): - self.colourise(char_limit, url_cost) - self.yx = [None] * (len(self.text) + 1) - self.yx[0] = 0, 0 # in case there's no text at all - self.lines = [] - y = 0 - pos = 0 - csstr = str(self.cs) - while pos < len(self.cs): - try: - next_nl = csstr.index('\n', pos) - except ValueError: - next_nl = len(self.cs) - - yold = y - y = self.layout_para(width, self.cs[pos:next_nl], pos, y) - if y == yold: - # An empty paragraph should still show up - self.yx[pos] = y, 0 - self.lines.append(text.ColouredString("")) - y += 1 - pos = next_nl + 1 - if pos == len(self.cs): - self.yx[pos] = y, 0 - - def __init__(self, cc, initial_text="", reply_header=None, reply_id=None): - self.cc = cc - self.reply_header = reply_header - self.reply_id = reply_id - if self.reply_header is None: - assert self.reply_id is None - self.header = text.FileHeader("Compose a post") - else: - assert self.reply_id is not None - self.header = text.FileHeader("Compose a reply") - self.text = initial_text - self.point = len(self.text) - self.goal_column = None - self.ctrl_k_paste_buffer = "" - self.mode = 'normal' - - instance_data = self.cc.get("instance") - try: - self.char_limit = instance_data[ - "configuration"]["statuses"]["max_characters"] - except KeyError: - self.char_limit = 500 - try: - self.url_cost = instance_data[ - "configuration"]["statuses"]["characters_reserved_per_url"] - except KeyError: - self.url_cost = 23 - - self.language = "en" # FIXME: find a better default from somewhere - self.content_warning = "" - self.visibility = "public" - if self.reply_id is not None: - reply_status = self.cc.get_status_by_id(reply_id) - self.visibility = reply_status['visibility'] - - def layout(self, wrapwidth): - # Layout during rendering, abstracted into its own function - # for standalone testing. - self.dtext = self.DisplayText(self.text) - self.dtext.layout(wrapwidth, self.char_limit, self.url_cost) - self.cy, self.cx = self.dtext.yx[self.point] - - def render(self): - y = 0 - - for line in self.header.render(self.cc.scr_w): - self.cc.print_at(y, 0, line) - y += 1 - - if self.reply_header is not None: - for line in self.reply_header.render(self.cc.scr_w): - self.cc.print_at(y, 0, line) - y += 1 - - # FIXME: here the real Mono editor has some keypress help of the form - # [F1]:Options (or [^O]) [F3]:Mark Block [F6]:Goto [F8]:Read File - # [F2]:Finish [F7]:Find [F9]:Write File - - # FIXME: then, if there's a sendheader/file header, a whole - # row of ~~~~ followed by that - - for line in text.EditorHeaderSeparator().render(self.cc.scr_w): - self.cc.print_at(y, 0, line) - y += 1 - - self.layout(self.cc.scr_w - 1) - - ytop = y - for line in self.dtext.lines: - self.cc.print_at(y, 0, line) - y += 1 - - self.cc.move_cursor_to(self.cy + ytop, self.cx) - - def word_boundary(self, pos): - if pos == 0 or pos == len(self.text): - return True - if self.text[pos-1] == '\n' or self.text[pos] == '\n': - return True - if self.text[pos-1] == ' ' and self.text[pos] != ' ': - return True - return False - - def handle_key(self, ch): - is_updown = ch in {ctrl('n'), ctrl('p'), - curses.KEY_DOWN, curses.KEY_UP} - if self.goal_column is None or not is_updown: - self.goal_column = self.cx - - if self.mode == 'normal': - # TODO: - # - # ^V and ^Z for page up/down. (Even in short posts this is - # useful because my reflexes want ^V to go to the bottom) - # - # ^O ^N and ^O ^P to go to bottom/top. (Any synonyms, like - # ^O up?) - # - # Might still want to change ^K so that it stops at end of - # current screen line, instead of end of paragraph. - # - # Maybe an extra copy/paste approach with explicit - # selection start/end points? Probably don't want to - # replicate Mono's underpowered [^O][B] business. - - if ch in {ctrl('n'), curses.KEY_DOWN}: - try: - new_point = util.last( - (i for i, yx in enumerate(self.dtext.yx[self.point:], - self.point) - if yx <= (self.cy + 1, self.goal_column))) - except ValueError: - new_point = len(self.text) - - self.move_to(new_point) - if self.dtext.yx[self.point][0] != self.cy + 1: - # Failed to go down; probably went to the end; reset - # the goal column. - self.goal_column = None - elif ch in {ctrl('p'), curses.KEY_UP}: - try: - new_point = util.last( - (i for i, yx in enumerate(self.dtext.yx[:self.point+1]) - if yx <= (self.cy - 1, self.goal_column))) - except ValueError: - new_point = 0 - - self.move_to(new_point) - if self.dtext.yx[self.point][0] != self.cy - 1: - # Failed to go down; probably went to the start; reset - # the goal column. - self.goal_column = None - elif ch in {ctrl('a'), curses.KEY_HOME}: - new_point = next( - i for i, yx in enumerate(self.dtext.yx[:self.point+1]) - if yx[0] == self.cy) - self.move_to(new_point) - elif ch in {ctrl('e'), curses.KEY_END}: - new_point = util.last( - i for i, yx in enumerate(self.dtext.yx[self.point:], - self.point) - if yx[0] == self.cy) - self.move_to(new_point) - elif ch in {ctrl('k')}: - if self.point < len(self.text): - line_end = util.last( - i for i, yx in enumerate(self.dtext.yx[self.point:], - self.point) - if yx[0] == self.cy) - end_of_para = self.text[line_end:line_end+1] in {'', '\n'} - if not end_of_para and line_end < len(self.text): - line_end += 1 - - if line_end == self.point: - if end_of_para: - self.text = (self.text[:self.point] + - self.text[self.point+1:]) - else: - self.ctrl_k_paste_buffer = self.text[ - self.point:line_end] - self.text = (self.text[:self.point] + - ("\n" if not end_of_para else "") + - self.text[line_end:]) - elif ch in {'\r', '\n'}: - self.text = (self.text[:self.point] + '\n' + - self.text[self.point:]) - self.point += 1 - if self.text[self.point-3:self.point] == '\n.\n': - self.text = (self.text[:self.point-2] + - self.text[self.point:]) - self.point -= 2 - self.chain_to(PostMenu(self)) - elif ch in {ctrl('o')}: - self.mode = 'ctrlo' - else: - self.handle_common_editing_keys(ch) - elif self.mode == 'ctrlo': - if ch == ' ': - self.chain_to(PostMenu(self)) - elif ch == 'q': - self.cc.composer = None - return 'quit' - else: - self.mode = 'normal' - - if not is_updown: - self.goal_column = None - - def post(self): - params = { - "status": self.text.rstrip("\n"), - "visibility": self.visibility, - "language": self.language, - } - if self.reply_id is not None: - params["in_reply_to_id"] = self.reply_id - if self.content_warning != "": - params["sensitive"] = True - params["spoiler_text"] = self.content_warning - if self.cc.readonly: - print("post", file=sys.stderr) - for key, value in params.items(): - print(f" {key}: {value!r}", file=sys.stderr) - return - self.cc.post("statuses", **params) - -class PostMenu(Menu): - def __init__(self, composer): - super().__init__(composer.cc) - self.composer = composer - self.title = text.ColouredString("Post a status", "H") - self.refresh_items() - - def refresh_items(self): - self.items.clear() - self.items.append(text.MenuKeypressLine( - 'SPACE', text.ColouredString("Post"))) - self.items.append(text.BlankLine()) - self.items.append(text.MenuKeypressLine( - 'Q', text.ColouredString("Cancel post"))) - self.items.append(text.MenuKeypressLine( - 'A', text.ColouredString("Re-edit post"))) - self.items.append(text.BlankLine()) - self.items.append(text.MenuKeypressLine( - 'V', text.ColouredString("Visibility: " + - self.composer.visibility))) - # FIXME: including a help string here would be nice - plus a - # warning if you haven't actually mentioned any users and are - # selecting 'direct'? - self.items.append(text.BlankLine()) - cw = ('none' if self.composer.content_warning == '' - else f"'{self.composer.content_warning}'") - self.items.append(text.MenuKeypressLine( - 'W', text.ColouredString("Content warning: " + cw))) - self.items.append(text.MenuKeypressLine( - 'L', text.ColouredString(f"Language: '{self.composer.language}'"))) - self.renormalise() - - def set_language(self, lang): - self.composer.language = lang - self.refresh_items() - - def set_content_warning(self, cw): - self.composer.content_warning = cw - self.refresh_items() - - def handle_key(self, ch): - if ch in {'q', 'Q'}: - self.cc.composer = None - return 'quit' # FIXME: maybe a confirmation, like real Mono? - elif ch in {'a', 'A'}: - self.chain_to(self.composer) - elif ch in {'l', 'L'}: - self.push_to(BottomLinePrompt( - self.cc, self.set_language, "Language tag for post: ", - self.composer.language)) - elif ch in {'w', 'W'}: - self.push_to(BottomLinePrompt( - self.cc, self.set_content_warning, "Content warning: ", - self.composer.content_warning)) - elif ch in {'v', 'V'}: - visibilities = ['public', 'unlisted', 'private', 'direct'] - nextvis = {visibilities[i-1]: visibilities[i] - for i in range(len(visibilities))} - self.composer.visibility = nextvis[self.composer.visibility] - self.refresh_items() - elif ch in {' '}: - self.composer.post() - self.cc.composer = None - self.cc.activity_stack.pop() - else: - return super().handle_key(ch) - -class testComposerLayout(unittest.TestCase): - def testLayout(self): - t = Composer.DisplayText("abc") - t.layout(10, 500, 23) - self.assertEqual(t.lines, [text.ColouredString("abc")]) - self.assertEqual(t.yx, [(0,i) for i in range(4)]) - - t.layout(3, 500, 23) - self.assertEqual(t.lines, [text.ColouredString("abc")]) - self.assertEqual(t.yx, [(0,i) for i in range(4)]) - - t = Composer.DisplayText("abc def ghi jkl") - t.layout(10, 500, 23) - self.assertEqual(t.lines, [text.ColouredString("abc def "), - text.ColouredString("ghi jkl")]) - self.assertEqual(t.yx, ([(0,i) for i in range(8)] + - [(1,i) for i in range(8)])) - - t = Composer.DisplayText("abcxdefxghixjkl") - t.layout(10, 500, 23) - self.assertEqual(t.lines, [text.ColouredString("abcxdefxgh"), - text.ColouredString("ixjkl")]) - self.assertEqual(t.yx, ([(0,i) for i in range(10)] + - [(1,i) for i in range(6)])) - - t = Composer.DisplayText("") - t.layout(10, 500, 23) - self.assertEqual(t.lines, []) - self.assertEqual(t.yx, [(0,0)]) - - t = Composer.DisplayText("\n") - t.layout(10, 500, 23) - self.assertEqual(t.lines, [text.ColouredString("")]) - self.assertEqual(t.yx, [(0,0),(1,0)]) - - t = Composer.DisplayText("abc def ") - t.layout(8, 500, 23) - self.assertEqual(t.lines, [text.ColouredString("abc def ")]) - self.assertEqual(t.yx, ([(0,i) for i in range(9)])) - - t = Composer.DisplayText("abc def gh") - t.layout(8, 500, 23) - self.assertEqual(t.lines, [text.ColouredString("abc def "), - text.ColouredString("gh")]) - self.assertEqual(t.yx, ([(0,i) for i in range(8)] + - [(1,i) for i in range(3)])) - - t = Composer.DisplayText("abc def g") - t.layout(8, 500, 23) - self.assertEqual(t.lines, [text.ColouredString("abc def "), - text.ColouredString("g")]) - self.assertEqual(t.yx, ([(0,i) for i in range(8)] + - [(1,i) for i in range(2)])) - - def testCtrlY(self): - def setup(point): - class FakeCC: - def get(self, str): return {} - - t = Composer(FakeCC(), "abc def ghi jkl\nmno pqr stu vwx") - t.point = point - t.layout(10) - self.assertEqual(t.dtext.lines, [text.ColouredString("abc def "), - text.ColouredString("ghi jkl"), - text.ColouredString("mno pqr "), - text.ColouredString("stu vwx")]) - return t - - # On the last line of a paragraph: delete up to the next - # newline, but not including it - t = setup(12) - t.handle_key(ctrl('k')) - self.assertEqual(t.text, "abc def ghi \nmno pqr stu vwx") - - # Same, but the paragraph in question is at the end of the - # buffer and has no trailing newline. Do the same, without - # tripping over the newline - t = setup(28) - t.handle_key(ctrl('k')) - self.assertEqual(t.text, "abc def ghi jkl\nmno pqr stu ") - - # Sitting on a newline: delete just the newline itself - t = setup(15) - t.handle_key(ctrl('k')) - self.assertEqual(t.text, "abc def ghi jklmno pqr stu vwx") - - # On a non-final line of a paragraph: delete up to the next - # line break, and _insert_ a newline. (Logically weird but - # intuitive in use, given that 'me' would leave a miswrapped - # paragraph) - t = setup(4) - t.handle_key(ctrl('k')) - self.assertEqual(t.text, "abc \nghi jkl\nmno pqr stu vwx") - - # If you're at the very end of the non-final line, this - # translates to _just_ replacing a space with a newline. - t = setup(7) - t.handle_key(ctrl('k')) - self.assertEqual(t.text, "abc def\nghi jkl\nmno pqr stu vwx") - - # At the very end of the buffer: do nothing, including not - # crashing - t = setup(31) - t.handle_key(ctrl('k')) - self.assertEqual(t.text, "abc def ghi jkl\nmno pqr stu vwx") diff --git a/login.py b/login.py deleted file mode 100644 index 48a2212..0000000 --- a/login.py +++ /dev/null @@ -1,111 +0,0 @@ -import client -import json -import os -import xdg - -override_config_dir = None -def config_dir(): - if override_config_dir is not None: - return override_config_dir - try: - config_home = xdg.XDG_CONFIG_HOME - except AttributeError: - config_home = os.path.join(os.environ["HOME"], ".config") - return os.path.join(config_home, "mastodonochrome") -def config_file(): - return os.path.join(config_dir(), "auth") - -class LoginUI(client.Client): - def run(self): - redirect_uri = 'urn:ietf:wg:oauth:2.0:oob' - - instance_url = input("Enter a Mastodon instance URL: ") - if "://" not in instance_url: - instance_url = "https://" + instance_url - - self.set_instance_url(instance_url) - - app = self.post( - "apps", - client_name='Mastodonochrome', - redirect_uris=redirect_uri, - scopes='read write push', - website='https://www.chiark.greenend.org.uk/~sgtatham/mastodonochrome/', - ) - - client_id = app['client_id'] - client_secret = app['client_secret'] - - app_token = self.post( - "token", base="auth", - client_id=client_id, - client_secret=client_secret, - redirect_uri=redirect_uri, - grant_type='client_credentials', - ) - - self.bearer_token = app_token['access_token'] - self.get("apps/verify_credentials") - self.bearer_token = None - - url = self.get_url( - "authorize", base="auth", - client_id=client_id, - scope='read write push', - redirect_uri=redirect_uri, - response_type='code', - ) - - print("Visit this URL to request an authorisation code:") - print(url) - - auth_code = input("Enter the code from the website: ") - - user_token = self.post( - "token", base="auth", - client_id=client_id, - client_secret=client_secret, - redirect_uri=redirect_uri, - grant_type='authorization_code', - code=auth_code, - scope='read write push', - ) - - self.bearer_token = user_token['access_token'] - account = self.get("accounts/verify_credentials") - instance = self.get("instance") - self.bearer_token = None - - data = { - 'account_id': account['id'], - 'username': account['username'], - 'instance_url': instance_url, - 'instance_domain': instance['uri'], - 'client_id': client_id, - 'client_secret': client_secret, - 'user_token': user_token['access_token'], - } - - try: - old_umask = os.umask(0o77) - - try: - os.makedirs(config_dir()) - except FileExistsError: - pass - - with open(config_file(), "w") as fh: - json.dump(data, fh, indent=4) - finally: - os.umask(old_umask) - -def setup_client(cl): - if isinstance(cl, LoginUI): - return # that would be silly - - with open(config_file()) as fh: - data = json.load(fh) - cl.set_instance_url(data['instance_url']) - cl.set_username(data['username'], data['instance_domain'], - data['account_id']) - cl.bearer_token = data['user_token'] diff --git a/mastodonochrome b/mastodonochrome deleted file mode 100755 index 301b4bc..0000000 --- a/mastodonochrome +++ /dev/null @@ -1,121 +0,0 @@ -#!/usr/bin/env python3 - -''' -Textual Mastodon client with a UI inspired by Monochrome BBS. -''' - -import argparse -import itertools -import sys -import unittest - -import client -import cursesclient -import login - -class CombinedUI(client.Client): - def combined_feed(self): - feeds = [ - ((item['created_at'], item) - for item in self.get_incremental("timelines/home")), - ((item['created_at'], item['status']) - for item in self.get_incremental("notifications") - if item['type'] == 'mention'), - ] - - items = [] - - nexts = [None for _ in feeds] - - while True: - next_item = None - - for i in range(len(feeds)): - if feeds[i] is not None and nexts[i] is None: - try: - nexts[i] = next(feeds[i]) - except StopIteration: - feeds[i] = None - if nexts[i] is not None: - if next_item is None: - next_item = nexts[i] - elif next_item[0] < nexts[i][0]: - next_item = nexts[i] - if next_item is None: - break - yield next_item[1] - for i in range(len(feeds)): - if (nexts[i] is not None and - nexts[i][1]['id'] == next_item[1]['id']): - nexts[i] = None - - def run(self): - items = list(itertools.islice(self.combined_feed(), 0, 100)) - for item in reversed(items): - p = client.Status(item, self) - for thing in p.text(): - for line in thing.render(80): - print(line.ecma48()) - -class StreamUI(client.Client): - def run(self): - import time - for chunk in self.get_streaming_lines("streaming/user"): - print(time.strftime("%Y-%m-%d %H:%M:%S"), repr(chunk)) - -class MyTestLoader(unittest.TestLoader): - def loadTestsFromModule(self, module): - suite = super().loadTestsFromModule(module) - - if module.__name__ == '__main__': - import text - suite.addTests(super().loadTestsFromModule(text)) - - import scan_re - suite.addTests(super().loadTestsFromModule(scan_re)) - - import cursesclient - suite.addTests(super().loadTestsFromModule(cursesclient)) - - return suite - -def main(): - parser = argparse.ArgumentParser( - description=__doc__, - formatter_class=argparse.RawDescriptionHelpFormatter) - parser.add_argument("--log", help="File to log debug information to.") - parser.add_argument("--test", nargs=argparse.REMAINDER, - help="Run unit tests.") - parser.add_argument("--readonly", action="store_true", - help="Disable write operations (posting, favouriting " - "etc), and just log on stderr what would have been " - "done.") - parser.add_argument("--combined", action="store_const", dest="action", - const=CombinedUI, help="Temporary mode to fetch " - "the user's timeline and mentions, interleave them, " - "and print the result on the terminal.") - parser.add_argument("--stream", action="store_const", dest="action", - const=StreamUI, help="Test mode for streaming " - "HTTP retrievals.") - parser.add_argument("--login", action="store_const", dest="action", - const=login.LoginUI, help="Log in to a user account.") - parser.add_argument("--config", help="Alternative config directory.") - parser.set_defaults(action=cursesclient.CursesUI) - args = parser.parse_args() - - login.override_config_dir = args.config - - if args.test is not None: - return unittest.main(argv=[sys.argv[0]] + args.test, - testLoader=MyTestLoader()) - - client = args.action() - login.setup_client(client) - if args.log is not None: - client.enable_debug(args.log) - if args.readonly: - client.set_readonly() - client.run() - -if __name__ == '__main__': - main() diff --git a/scan_re.py b/scan_re.py deleted file mode 100644 index e0bceb9..0000000 --- a/scan_re.py +++ /dev/null @@ -1,102 +0,0 @@ -# Word characters - -import re -import unittest - -word = r'0-9A-Z_a-z\xaa\xb5\xba\xc0-\xd6\xd8-\xf6\xf8-\u02c1\u02c6-\u02d1\u02e0-\u02e4\u02ec\u02ee\u0300-\u0374\u0376-\u0377\u037a-\u037d\u037f\u0386\u0388-\u038a\u038c\u038e-\u03a1\u03a3-\u03f5\u03f7-\u0481\u0483-\u052f\u0531-\u0556\u0559\u0560-\u0588\u0591-\u05bd\u05bf\u05c1-\u05c2\u05c4-\u05c5\u05c7\u05d0-\u05ea\u05ef-\u05f2\u0610-\u061a\u0620-\u0669\u066e-\u06d3\u06d5-\u06dc\u06df-\u06e8\u06ea-\u06fc\u06ff\u0710-\u074a\u074d-\u07b1\u07c0-\u07f5\u07fa\u07fd\u0800-\u082d\u0840-\u085b\u0860-\u086a\u08a0-\u08b4\u08b6-\u08bd\u08d3-\u08e1\u08e3-\u0963\u0966-\u096f\u0971-\u0983\u0985-\u098c\u098f-\u0990\u0993-\u09a8\u09aa-\u09b0\u09b2\u09b6-\u09b9\u09bc-\u09c4\u09c7-\u09c8\u09cb-\u09ce\u09d7\u09dc-\u09dd\u09df-\u09e3\u09e6-\u09f1\u09fc\u09fe\u0a01-\u0a03\u0a05-\u0a0a\u0a0f-\u0a10\u0a13-\u0a28\u0a2a-\u0a30\u0a32-\u0a33\u0a35-\u0a36\u0a38-\u0a39\u0a3c\u0a3e-\u0a42\u0a47-\u0a48\u0a4b-\u0a4d\u0a51\u0a59-\u0a5c\u0a5e\u0a66-\u0a75\u0a81-\u0a83\u0a85-\u0a8d\u0a8f-\u0a91\u0a93-\u0aa8\u0aaa-\u0ab0\u0ab2-\u0ab3\u0ab5-\u0ab9\u0abc-\u0ac5\u0ac7-\u0ac9\u0acb-\u0acd\u0ad0\u0ae0-\u0ae3\u0ae6-\u0aef\u0af9-\u0aff\u0b01-\u0b03\u0b05-\u0b0c\u0b0f-\u0b10\u0b13-\u0b28\u0b2a-\u0b30\u0b32-\u0b33\u0b35-\u0b39\u0b3c-\u0b44\u0b47-\u0b48\u0b4b-\u0b4d\u0b56-\u0b57\u0b5c-\u0b5d\u0b5f-\u0b63\u0b66-\u0b6f\u0b71\u0b82-\u0b83\u0b85-\u0b8a\u0b8e-\u0b90\u0b92-\u0b95\u0b99-\u0b9a\u0b9c\u0b9e-\u0b9f\u0ba3-\u0ba4\u0ba8-\u0baa\u0bae-\u0bb9\u0bbe-\u0bc2\u0bc6-\u0bc8\u0bca-\u0bcd\u0bd0\u0bd7\u0be6-\u0bef\u0c00-\u0c0c\u0c0e-\u0c10\u0c12-\u0c28\u0c2a-\u0c39\u0c3d-\u0c44\u0c46-\u0c48\u0c4a-\u0c4d\u0c55-\u0c56\u0c58-\u0c5a\u0c60-\u0c63\u0c66-\u0c6f\u0c80-\u0c83\u0c85-\u0c8c\u0c8e-\u0c90\u0c92-\u0ca8\u0caa-\u0cb3\u0cb5-\u0cb9\u0cbc-\u0cc4\u0cc6-\u0cc8\u0cca-\u0ccd\u0cd5-\u0cd6\u0cde\u0ce0-\u0ce3\u0ce6-\u0cef\u0cf1-\u0cf2\u0d00-\u0d03\u0d05-\u0d0c\u0d0e-\u0d10\u0d12-\u0d44\u0d46-\u0d48\u0d4a-\u0d4e\u0d54-\u0d57\u0d5f-\u0d63\u0d66-\u0d6f\u0d7a-\u0d7f\u0d82-\u0d83\u0d85-\u0d96\u0d9a-\u0db1\u0db3-\u0dbb\u0dbd\u0dc0-\u0dc6\u0dca\u0dcf-\u0dd4\u0dd6\u0dd8-\u0ddf\u0de6-\u0def\u0df2-\u0df3\u0e01-\u0e3a\u0e40-\u0e4e\u0e50-\u0e59\u0e81-\u0e82\u0e84\u0e86-\u0e8a\u0e8c-\u0ea3\u0ea5\u0ea7-\u0ebd\u0ec0-\u0ec4\u0ec6\u0ec8-\u0ecd\u0ed0-\u0ed9\u0edc-\u0edf\u0f00\u0f18-\u0f19\u0f20-\u0f29\u0f35\u0f37\u0f39\u0f3e-\u0f47\u0f49-\u0f6c\u0f71-\u0f84\u0f86-\u0f97\u0f99-\u0fbc\u0fc6\u1000-\u1049\u1050-\u109d\u10a0-\u10c5\u10c7\u10cd\u10d0-\u10fa\u10fc-\u1248\u124a-\u124d\u1250-\u1256\u1258\u125a-\u125d\u1260-\u1288\u128a-\u128d\u1290-\u12b0\u12b2-\u12b5\u12b8-\u12be\u12c0\u12c2-\u12c5\u12c8-\u12d6\u12d8-\u1310\u1312-\u1315\u1318-\u135a\u135d-\u135f\u1380-\u138f\u13a0-\u13f5\u13f8-\u13fd\u1401-\u166c\u166f-\u167f\u1681-\u169a\u16a0-\u16ea\u16ee-\u16f8\u1700-\u170c\u170e-\u1714\u1720-\u1734\u1740-\u1753\u1760-\u176c\u176e-\u1770\u1772-\u1773\u1780-\u17d3\u17d7\u17dc-\u17dd\u17e0-\u17e9\u180b-\u180d\u1810-\u1819\u1820-\u1878\u1880-\u18aa\u18b0-\u18f5\u1900-\u191e\u1920-\u192b\u1930-\u193b\u1946-\u196d\u1970-\u1974\u1980-\u19ab\u19b0-\u19c9\u19d0-\u19d9\u1a00-\u1a1b\u1a20-\u1a5e\u1a60-\u1a7c\u1a7f-\u1a89\u1a90-\u1a99\u1aa7\u1ab0-\u1abe\u1b00-\u1b4b\u1b50-\u1b59\u1b6b-\u1b73\u1b80-\u1bf3\u1c00-\u1c37\u1c40-\u1c49\u1c4d-\u1c7d\u1c80-\u1c88\u1c90-\u1cba\u1cbd-\u1cbf\u1cd0-\u1cd2\u1cd4-\u1cfa\u1d00-\u1df9\u1dfb-\u1f15\u1f18-\u1f1d\u1f20-\u1f45\u1f48-\u1f4d\u1f50-\u1f57\u1f59\u1f5b\u1f5d\u1f5f-\u1f7d\u1f80-\u1fb4\u1fb6-\u1fbc\u1fbe\u1fc2-\u1fc4\u1fc6-\u1fcc\u1fd0-\u1fd3\u1fd6-\u1fdb\u1fe0-\u1fec\u1ff2-\u1ff4\u1ff6-\u1ffc\u203f-\u2040\u2054\u2071\u207f\u2090-\u209c\u20d0-\u20f0\u2102\u2107\u210a-\u2113\u2115\u2119-\u211d\u2124\u2126\u2128\u212a-\u212d\u212f-\u2139\u213c-\u213f\u2145-\u2149\u214e\u2160-\u2188\u24b6-\u24e9\u2c00-\u2c2e\u2c30-\u2c5e\u2c60-\u2ce4\u2ceb-\u2cf3\u2d00-\u2d25\u2d27\u2d2d\u2d30-\u2d67\u2d6f\u2d7f-\u2d96\u2da0-\u2da6\u2da8-\u2dae\u2db0-\u2db6\u2db8-\u2dbe\u2dc0-\u2dc6\u2dc8-\u2dce\u2dd0-\u2dd6\u2dd8-\u2dde\u2de0-\u2dff\u2e2f\u3005-\u3007\u3021-\u302f\u3031-\u3035\u3038-\u303c\u3041-\u3096\u3099-\u309a\u309d-\u309f\u30a1-\u30fa\u30fc-\u30ff\u3105-\u312f\u3131-\u318e\u31a0-\u31ba\u31f0-\u31ff\u3400-\u4db5\u4e00-\u9fef\ua000-\ua48c\ua4d0-\ua4fd\ua500-\ua60c\ua610-\ua62b\ua640-\ua672\ua674-\ua67d\ua67f-\ua6f1\ua717-\ua71f\ua722-\ua788\ua78b-\ua7bf\ua7c2-\ua7c6\ua7f7-\ua827\ua840-\ua873\ua880-\ua8c5\ua8d0-\ua8d9\ua8e0-\ua8f7\ua8fb\ua8fd-\ua92d\ua930-\ua953\ua960-\ua97c\ua980-\ua9c0\ua9cf-\ua9d9\ua9e0-\ua9fe\uaa00-\uaa36\uaa40-\uaa4d\uaa50-\uaa59\uaa60-\uaa76\uaa7a-\uaac2\uaadb-\uaadd\uaae0-\uaaef\uaaf2-\uaaf6\uab01-\uab06\uab09-\uab0e\uab11-\uab16\uab20-\uab26\uab28-\uab2e\uab30-\uab5a\uab5c-\uab67\uab70-\uabea\uabec-\uabed\uabf0-\uabf9\uac00-\ud7a3\ud7b0-\ud7c6\ud7cb-\ud7fb\uf900-\ufa6d\ufa70-\ufad9\ufb00-\ufb06\ufb13-\ufb17\ufb1d-\ufb28\ufb2a-\ufb36\ufb38-\ufb3c\ufb3e\ufb40-\ufb41\ufb43-\ufb44\ufb46-\ufbb1\ufbd3-\ufd3d\ufd50-\ufd8f\ufd92-\ufdc7\ufdf0-\ufdfb\ufe00-\ufe0f\ufe20-\ufe2f\ufe33-\ufe34\ufe4d-\ufe4f\ufe70-\ufe74\ufe76-\ufefc\uff10-\uff19\uff21-\uff3a\uff3f\uff41-\uff5a\uff66-\uffbe\uffc2-\uffc7\uffca-\uffcf\uffd2-\uffd7\uffda-\uffdc\U00010000-\U0001000b\U0001000d-\U00010026\U00010028-\U0001003a\U0001003c-\U0001003d\U0001003f-\U0001004d\U00010050-\U0001005d\U00010080-\U000100fa\U00010140-\U00010174\U000101fd\U00010280-\U0001029c\U000102a0-\U000102d0\U000102e0\U00010300-\U0001031f\U0001032d-\U0001034a\U00010350-\U0001037a\U00010380-\U0001039d\U000103a0-\U000103c3\U000103c8-\U000103cf\U000103d1-\U000103d5\U00010400-\U0001049d\U000104a0-\U000104a9\U000104b0-\U000104d3\U000104d8-\U000104fb\U00010500-\U00010527\U00010530-\U00010563\U00010600-\U00010736\U00010740-\U00010755\U00010760-\U00010767\U00010800-\U00010805\U00010808\U0001080a-\U00010835\U00010837-\U00010838\U0001083c\U0001083f-\U00010855\U00010860-\U00010876\U00010880-\U0001089e\U000108e0-\U000108f2\U000108f4-\U000108f5\U00010900-\U00010915\U00010920-\U00010939\U00010980-\U000109b7\U000109be-\U000109bf\U00010a00-\U00010a03\U00010a05-\U00010a06\U00010a0c-\U00010a13\U00010a15-\U00010a17\U00010a19-\U00010a35\U00010a38-\U00010a3a\U00010a3f\U00010a60-\U00010a7c\U00010a80-\U00010a9c\U00010ac0-\U00010ac7\U00010ac9-\U00010ae6\U00010b00-\U00010b35\U00010b40-\U00010b55\U00010b60-\U00010b72\U00010b80-\U00010b91\U00010c00-\U00010c48\U00010c80-\U00010cb2\U00010cc0-\U00010cf2\U00010d00-\U00010d27\U00010d30-\U00010d39\U00010f00-\U00010f1c\U00010f27\U00010f30-\U00010f50\U00010fe0-\U00010ff6\U00011000-\U00011046\U00011066-\U0001106f\U0001107f-\U000110ba\U000110d0-\U000110e8\U000110f0-\U000110f9\U00011100-\U00011134\U00011136-\U0001113f\U00011144-\U00011146\U00011150-\U00011173\U00011176\U00011180-\U000111c4\U000111c9-\U000111cc\U000111d0-\U000111da\U000111dc\U00011200-\U00011211\U00011213-\U00011237\U0001123e\U00011280-\U00011286\U00011288\U0001128a-\U0001128d\U0001128f-\U0001129d\U0001129f-\U000112a8\U000112b0-\U000112ea\U000112f0-\U000112f9\U00011300-\U00011303\U00011305-\U0001130c\U0001130f-\U00011310\U00011313-\U00011328\U0001132a-\U00011330\U00011332-\U00011333\U00011335-\U00011339\U0001133b-\U00011344\U00011347-\U00011348\U0001134b-\U0001134d\U00011350\U00011357\U0001135d-\U00011363\U00011366-\U0001136c\U00011370-\U00011374\U00011400-\U0001144a\U00011450-\U00011459\U0001145e-\U0001145f\U00011480-\U000114c5\U000114c7\U000114d0-\U000114d9\U00011580-\U000115b5\U000115b8-\U000115c0\U000115d8-\U000115dd\U00011600-\U00011640\U00011644\U00011650-\U00011659\U00011680-\U000116b8\U000116c0-\U000116c9\U00011700-\U0001171a\U0001171d-\U0001172b\U00011730-\U00011739\U00011800-\U0001183a\U000118a0-\U000118e9\U000118ff\U000119a0-\U000119a7\U000119aa-\U000119d7\U000119da-\U000119e1\U000119e3-\U000119e4\U00011a00-\U00011a3e\U00011a47\U00011a50-\U00011a99\U00011a9d\U00011ac0-\U00011af8\U00011c00-\U00011c08\U00011c0a-\U00011c36\U00011c38-\U00011c40\U00011c50-\U00011c59\U00011c72-\U00011c8f\U00011c92-\U00011ca7\U00011ca9-\U00011cb6\U00011d00-\U00011d06\U00011d08-\U00011d09\U00011d0b-\U00011d36\U00011d3a\U00011d3c-\U00011d3d\U00011d3f-\U00011d47\U00011d50-\U00011d59\U00011d60-\U00011d65\U00011d67-\U00011d68\U00011d6a-\U00011d8e\U00011d90-\U00011d91\U00011d93-\U00011d98\U00011da0-\U00011da9\U00011ee0-\U00011ef6\U00012000-\U00012399\U00012400-\U0001246e\U00012480-\U00012543\U00013000-\U0001342e\U00014400-\U00014646\U00016800-\U00016a38\U00016a40-\U00016a5e\U00016a60-\U00016a69\U00016ad0-\U00016aed\U00016af0-\U00016af4\U00016b00-\U00016b36\U00016b40-\U00016b43\U00016b50-\U00016b59\U00016b63-\U00016b77\U00016b7d-\U00016b8f\U00016e40-\U00016e7f\U00016f00-\U00016f4a\U00016f4f-\U00016f87\U00016f8f-\U00016f9f\U00016fe0-\U00016fe1\U00016fe3\U00017000-\U000187f7\U00018800-\U00018af2\U0001b000-\U0001b11e\U0001b150-\U0001b152\U0001b164-\U0001b167\U0001b170-\U0001b2fb\U0001bc00-\U0001bc6a\U0001bc70-\U0001bc7c\U0001bc80-\U0001bc88\U0001bc90-\U0001bc99\U0001bc9d-\U0001bc9e\U0001d165-\U0001d169\U0001d16d-\U0001d172\U0001d17b-\U0001d182\U0001d185-\U0001d18b\U0001d1aa-\U0001d1ad\U0001d242-\U0001d244\U0001d400-\U0001d454\U0001d456-\U0001d49c\U0001d49e-\U0001d49f\U0001d4a2\U0001d4a5-\U0001d4a6\U0001d4a9-\U0001d4ac\U0001d4ae-\U0001d4b9\U0001d4bb\U0001d4bd-\U0001d4c3\U0001d4c5-\U0001d505\U0001d507-\U0001d50a\U0001d50d-\U0001d514\U0001d516-\U0001d51c\U0001d51e-\U0001d539\U0001d53b-\U0001d53e\U0001d540-\U0001d544\U0001d546\U0001d54a-\U0001d550\U0001d552-\U0001d6a5\U0001d6a8-\U0001d6c0\U0001d6c2-\U0001d6da\U0001d6dc-\U0001d6fa\U0001d6fc-\U0001d714\U0001d716-\U0001d734\U0001d736-\U0001d74e\U0001d750-\U0001d76e\U0001d770-\U0001d788\U0001d78a-\U0001d7a8\U0001d7aa-\U0001d7c2\U0001d7c4-\U0001d7cb\U0001d7ce-\U0001d7ff\U0001da00-\U0001da36\U0001da3b-\U0001da6c\U0001da75\U0001da84\U0001da9b-\U0001da9f\U0001daa1-\U0001daaf\U0001e000-\U0001e006\U0001e008-\U0001e018\U0001e01b-\U0001e021\U0001e023-\U0001e024\U0001e026-\U0001e02a\U0001e100-\U0001e12c\U0001e130-\U0001e13d\U0001e140-\U0001e149\U0001e14e\U0001e2c0-\U0001e2f9\U0001e800-\U0001e8c4\U0001e8d0-\U0001e8d6\U0001e900-\U0001e94b\U0001e950-\U0001e959\U0001ee00-\U0001ee03\U0001ee05-\U0001ee1f\U0001ee21-\U0001ee22\U0001ee24\U0001ee27\U0001ee29-\U0001ee32\U0001ee34-\U0001ee37\U0001ee39\U0001ee3b\U0001ee42\U0001ee47\U0001ee49\U0001ee4b\U0001ee4d-\U0001ee4f\U0001ee51-\U0001ee52\U0001ee54\U0001ee57\U0001ee59\U0001ee5b\U0001ee5d\U0001ee5f\U0001ee61-\U0001ee62\U0001ee64\U0001ee67-\U0001ee6a\U0001ee6c-\U0001ee72\U0001ee74-\U0001ee77\U0001ee79-\U0001ee7c\U0001ee7e\U0001ee80-\U0001ee89\U0001ee8b-\U0001ee9b\U0001eea1-\U0001eea3\U0001eea5-\U0001eea9\U0001eeab-\U0001eebb\U0001f130-\U0001f149\U0001f150-\U0001f169\U0001f170-\U0001f189\U00020000-\U0002a6d6\U0002a700-\U0002b734\U0002b740-\U0002b81d\U0002b820-\U0002cea1\U0002ceb0-\U0002ebe0\U0002f800-\U0002fa1d\U000e0100-\U000e01ef' - -alpha = 'A-Za-z\xaa\xb5\xba\xc0-\xd6\xd8-\xf6\xf8-\u02c1\u02c6-\u02d1\u02e0-\u02e4\u02ec\u02ee\u0345\u0370-\u0374\u0376-\u0377\u037a-\u037d\u037f\u0386\u0388-\u038a\u038c\u038e-\u03a1\u03a3-\u03f5\u03f7-\u0481\u048a-\u052f\u0531-\u0556\u0559\u0560-\u0588\u05b0-\u05bd\u05bf\u05c1-\u05c2\u05c4-\u05c5\u05c7\u05d0-\u05ea\u05ef-\u05f2\u0610-\u061a\u0620-\u0657\u0659-\u065f\u066e-\u06d3\u06d5-\u06dc\u06e1-\u06e8\u06ed-\u06ef\u06fa-\u06fc\u06ff\u0710-\u073f\u074d-\u07b1\u07ca-\u07ea\u07f4-\u07f5\u07fa\u0800-\u0817\u081a-\u082c\u0840-\u0858\u0860-\u086a\u08a0-\u08b4\u08b6-\u08bd\u08d4-\u08df\u08e3-\u08e9\u08f0-\u093b\u093d-\u094c\u094e-\u0950\u0955-\u0963\u0971-\u0983\u0985-\u098c\u098f-\u0990\u0993-\u09a8\u09aa-\u09b0\u09b2\u09b6-\u09b9\u09bd-\u09c4\u09c7-\u09c8\u09cb-\u09cc\u09ce\u09d7\u09dc-\u09dd\u09df-\u09e3\u09f0-\u09f1\u09fc\u0a01-\u0a03\u0a05-\u0a0a\u0a0f-\u0a10\u0a13-\u0a28\u0a2a-\u0a30\u0a32-\u0a33\u0a35-\u0a36\u0a38-\u0a39\u0a3e-\u0a42\u0a47-\u0a48\u0a4b-\u0a4c\u0a51\u0a59-\u0a5c\u0a5e\u0a70-\u0a75\u0a81-\u0a83\u0a85-\u0a8d\u0a8f-\u0a91\u0a93-\u0aa8\u0aaa-\u0ab0\u0ab2-\u0ab3\u0ab5-\u0ab9\u0abd-\u0ac5\u0ac7-\u0ac9\u0acb-\u0acc\u0ad0\u0ae0-\u0ae3\u0af9-\u0afc\u0b01-\u0b03\u0b05-\u0b0c\u0b0f-\u0b10\u0b13-\u0b28\u0b2a-\u0b30\u0b32-\u0b33\u0b35-\u0b39\u0b3d-\u0b44\u0b47-\u0b48\u0b4b-\u0b4c\u0b56-\u0b57\u0b5c-\u0b5d\u0b5f-\u0b63\u0b71\u0b82-\u0b83\u0b85-\u0b8a\u0b8e-\u0b90\u0b92-\u0b95\u0b99-\u0b9a\u0b9c\u0b9e-\u0b9f\u0ba3-\u0ba4\u0ba8-\u0baa\u0bae-\u0bb9\u0bbe-\u0bc2\u0bc6-\u0bc8\u0bca-\u0bcc\u0bd0\u0bd7\u0c00-\u0c03\u0c05-\u0c0c\u0c0e-\u0c10\u0c12-\u0c28\u0c2a-\u0c39\u0c3d-\u0c44\u0c46-\u0c48\u0c4a-\u0c4c\u0c55-\u0c56\u0c58-\u0c5a\u0c60-\u0c63\u0c80-\u0c83\u0c85-\u0c8c\u0c8e-\u0c90\u0c92-\u0ca8\u0caa-\u0cb3\u0cb5-\u0cb9\u0cbd-\u0cc4\u0cc6-\u0cc8\u0cca-\u0ccc\u0cd5-\u0cd6\u0cde\u0ce0-\u0ce3\u0cf1-\u0cf2\u0d00-\u0d03\u0d05-\u0d0c\u0d0e-\u0d10\u0d12-\u0d3a\u0d3d-\u0d44\u0d46-\u0d48\u0d4a-\u0d4c\u0d4e\u0d54-\u0d57\u0d5f-\u0d63\u0d7a-\u0d7f\u0d82-\u0d83\u0d85-\u0d96\u0d9a-\u0db1\u0db3-\u0dbb\u0dbd\u0dc0-\u0dc6\u0dcf-\u0dd4\u0dd6\u0dd8-\u0ddf\u0df2-\u0df3\u0e01-\u0e3a\u0e40-\u0e46\u0e4d\u0e81-\u0e82\u0e84\u0e86-\u0e8a\u0e8c-\u0ea3\u0ea5\u0ea7-\u0eb9\u0ebb-\u0ebd\u0ec0-\u0ec4\u0ec6\u0ecd\u0edc-\u0edf\u0f00\u0f40-\u0f47\u0f49-\u0f6c\u0f71-\u0f81\u0f88-\u0f97\u0f99-\u0fbc\u1000-\u1036\u1038\u103b-\u103f\u1050-\u108f\u109a-\u109d\u10a0-\u10c5\u10c7\u10cd\u10d0-\u10fa\u10fc-\u1248\u124a-\u124d\u1250-\u1256\u1258\u125a-\u125d\u1260-\u1288\u128a-\u128d\u1290-\u12b0\u12b2-\u12b5\u12b8-\u12be\u12c0\u12c2-\u12c5\u12c8-\u12d6\u12d8-\u1310\u1312-\u1315\u1318-\u135a\u1380-\u138f\u13a0-\u13f5\u13f8-\u13fd\u1401-\u166c\u166f-\u167f\u1681-\u169a\u16a0-\u16ea\u16ee-\u16f8\u1700-\u170c\u170e-\u1713\u1720-\u1733\u1740-\u1753\u1760-\u176c\u176e-\u1770\u1772-\u1773\u1780-\u17b3\u17b6-\u17c8\u17d7\u17dc\u1820-\u1878\u1880-\u18aa\u18b0-\u18f5\u1900-\u191e\u1920-\u192b\u1930-\u1938\u1950-\u196d\u1970-\u1974\u1980-\u19ab\u19b0-\u19c9\u1a00-\u1a1b\u1a20-\u1a5e\u1a61-\u1a74\u1aa7\u1b00-\u1b33\u1b35-\u1b43\u1b45-\u1b4b\u1b80-\u1ba9\u1bac-\u1baf\u1bba-\u1be5\u1be7-\u1bf1\u1c00-\u1c36\u1c4d-\u1c4f\u1c5a-\u1c7d\u1c80-\u1c88\u1c90-\u1cba\u1cbd-\u1cbf\u1ce9-\u1cec\u1cee-\u1cf3\u1cf5-\u1cf6\u1cfa\u1d00-\u1dbf\u1de7-\u1df4\u1e00-\u1f15\u1f18-\u1f1d\u1f20-\u1f45\u1f48-\u1f4d\u1f50-\u1f57\u1f59\u1f5b\u1f5d\u1f5f-\u1f7d\u1f80-\u1fb4\u1fb6-\u1fbc\u1fbe\u1fc2-\u1fc4\u1fc6-\u1fcc\u1fd0-\u1fd3\u1fd6-\u1fdb\u1fe0-\u1fec\u1ff2-\u1ff4\u1ff6-\u1ffc\u2071\u207f\u2090-\u209c\u2102\u2107\u210a-\u2113\u2115\u2119-\u211d\u2124\u2126\u2128\u212a-\u212d\u212f-\u2139\u213c-\u213f\u2145-\u2149\u214e\u2160-\u2188\u24b6-\u24e9\u2c00-\u2c2e\u2c30-\u2c5e\u2c60-\u2ce4\u2ceb-\u2cee\u2cf2-\u2cf3\u2d00-\u2d25\u2d27\u2d2d\u2d30-\u2d67\u2d6f\u2d80-\u2d96\u2da0-\u2da6\u2da8-\u2dae\u2db0-\u2db6\u2db8-\u2dbe\u2dc0-\u2dc6\u2dc8-\u2dce\u2dd0-\u2dd6\u2dd8-\u2dde\u2de0-\u2dff\u2e2f\u3005-\u3007\u3021-\u3029\u3031-\u3035\u3038-\u303c\u3041-\u3096\u309d-\u309f\u30a1-\u30fa\u30fc-\u30ff\u3105-\u312f\u3131-\u318e\u31a0-\u31ba\u31f0-\u31ff\u3400-\u4db5\u4e00-\u9fef\ua000-\ua48c\ua4d0-\ua4fd\ua500-\ua60c\ua610-\ua61f\ua62a-\ua62b\ua640-\ua66e\ua674-\ua67b\ua67f-\ua6ef\ua717-\ua71f\ua722-\ua788\ua78b-\ua7bf\ua7c2-\ua7c6\ua7f7-\ua805\ua807-\ua827\ua840-\ua873\ua880-\ua8c3\ua8c5\ua8f2-\ua8f7\ua8fb\ua8fd-\ua8ff\ua90a-\ua92a\ua930-\ua952\ua960-\ua97c\ua980-\ua9b2\ua9b4-\ua9bf\ua9cf\ua9e0-\ua9ef\ua9fa-\ua9fe\uaa00-\uaa36\uaa40-\uaa4d\uaa60-\uaa76\uaa7a-\uaabe\uaac0\uaac2\uaadb-\uaadd\uaae0-\uaaef\uaaf2-\uaaf5\uab01-\uab06\uab09-\uab0e\uab11-\uab16\uab20-\uab26\uab28-\uab2e\uab30-\uab5a\uab5c-\uab67\uab70-\uabea\uac00-\ud7a3\ud7b0-\ud7c6\ud7cb-\ud7fb\uf900-\ufa6d\ufa70-\ufad9\ufb00-\ufb06\ufb13-\ufb17\ufb1d-\ufb28\ufb2a-\ufb36\ufb38-\ufb3c\ufb3e\ufb40-\ufb41\ufb43-\ufb44\ufb46-\ufbb1\ufbd3-\ufd3d\ufd50-\ufd8f\ufd92-\ufdc7\ufdf0-\ufdfb\ufe70-\ufe74\ufe76-\ufefc\uff21-\uff3a\uff41-\uff5a\uff66-\uffbe\uffc2-\uffc7\uffca-\uffcf\uffd2-\uffd7\uffda-\uffdc\U00010000-\U0001000b\U0001000d-\U00010026\U00010028-\U0001003a\U0001003c-\U0001003d\U0001003f-\U0001004d\U00010050-\U0001005d\U00010080-\U000100fa\U00010140-\U00010174\U00010280-\U0001029c\U000102a0-\U000102d0\U00010300-\U0001031f\U0001032d-\U0001034a\U00010350-\U0001037a\U00010380-\U0001039d\U000103a0-\U000103c3\U000103c8-\U000103cf\U000103d1-\U000103d5\U00010400-\U0001049d\U000104b0-\U000104d3\U000104d8-\U000104fb\U00010500-\U00010527\U00010530-\U00010563\U00010600-\U00010736\U00010740-\U00010755\U00010760-\U00010767\U00010800-\U00010805\U00010808\U0001080a-\U00010835\U00010837-\U00010838\U0001083c\U0001083f-\U00010855\U00010860-\U00010876\U00010880-\U0001089e\U000108e0-\U000108f2\U000108f4-\U000108f5\U00010900-\U00010915\U00010920-\U00010939\U00010980-\U000109b7\U000109be-\U000109bf\U00010a00-\U00010a03\U00010a05-\U00010a06\U00010a0c-\U00010a13\U00010a15-\U00010a17\U00010a19-\U00010a35\U00010a60-\U00010a7c\U00010a80-\U00010a9c\U00010ac0-\U00010ac7\U00010ac9-\U00010ae4\U00010b00-\U00010b35\U00010b40-\U00010b55\U00010b60-\U00010b72\U00010b80-\U00010b91\U00010c00-\U00010c48\U00010c80-\U00010cb2\U00010cc0-\U00010cf2\U00010d00-\U00010d27\U00010f00-\U00010f1c\U00010f27\U00010f30-\U00010f45\U00010fe0-\U00010ff6\U00011000-\U00011045\U00011082-\U000110b8\U000110d0-\U000110e8\U00011100-\U00011132\U00011144-\U00011146\U00011150-\U00011172\U00011176\U00011180-\U000111bf\U000111c1-\U000111c4\U000111da\U000111dc\U00011200-\U00011211\U00011213-\U00011234\U00011237\U0001123e\U00011280-\U00011286\U00011288\U0001128a-\U0001128d\U0001128f-\U0001129d\U0001129f-\U000112a8\U000112b0-\U000112e8\U00011300-\U00011303\U00011305-\U0001130c\U0001130f-\U00011310\U00011313-\U00011328\U0001132a-\U00011330\U00011332-\U00011333\U00011335-\U00011339\U0001133d-\U00011344\U00011347-\U00011348\U0001134b-\U0001134c\U00011350\U00011357\U0001135d-\U00011363\U00011400-\U00011441\U00011443-\U00011445\U00011447-\U0001144a\U0001145f\U00011480-\U000114c1\U000114c4-\U000114c5\U000114c7\U00011580-\U000115b5\U000115b8-\U000115be\U000115d8-\U000115dd\U00011600-\U0001163e\U00011640\U00011644\U00011680-\U000116b5\U000116b8\U00011700-\U0001171a\U0001171d-\U0001172a\U00011800-\U00011838\U000118a0-\U000118df\U000118ff\U000119a0-\U000119a7\U000119aa-\U000119d7\U000119da-\U000119df\U000119e1\U000119e3-\U000119e4\U00011a00-\U00011a32\U00011a35-\U00011a3e\U00011a50-\U00011a97\U00011a9d\U00011ac0-\U00011af8\U00011c00-\U00011c08\U00011c0a-\U00011c36\U00011c38-\U00011c3e\U00011c40\U00011c72-\U00011c8f\U00011c92-\U00011ca7\U00011ca9-\U00011cb6\U00011d00-\U00011d06\U00011d08-\U00011d09\U00011d0b-\U00011d36\U00011d3a\U00011d3c-\U00011d3d\U00011d3f-\U00011d41\U00011d43\U00011d46-\U00011d47\U00011d60-\U00011d65\U00011d67-\U00011d68\U00011d6a-\U00011d8e\U00011d90-\U00011d91\U00011d93-\U00011d96\U00011d98\U00011ee0-\U00011ef6\U00012000-\U00012399\U00012400-\U0001246e\U00012480-\U00012543\U00013000-\U0001342e\U00014400-\U00014646\U00016800-\U00016a38\U00016a40-\U00016a5e\U00016ad0-\U00016aed\U00016b00-\U00016b2f\U00016b40-\U00016b43\U00016b63-\U00016b77\U00016b7d-\U00016b8f\U00016e40-\U00016e7f\U00016f00-\U00016f4a\U00016f4f-\U00016f87\U00016f8f-\U00016f9f\U00016fe0-\U00016fe1\U00016fe3\U00017000-\U000187f7\U00018800-\U00018af2\U0001b000-\U0001b11e\U0001b150-\U0001b152\U0001b164-\U0001b167\U0001b170-\U0001b2fb\U0001bc00-\U0001bc6a\U0001bc70-\U0001bc7c\U0001bc80-\U0001bc88\U0001bc90-\U0001bc99\U0001bc9e\U0001d400-\U0001d454\U0001d456-\U0001d49c\U0001d49e-\U0001d49f\U0001d4a2\U0001d4a5-\U0001d4a6\U0001d4a9-\U0001d4ac\U0001d4ae-\U0001d4b9\U0001d4bb\U0001d4bd-\U0001d4c3\U0001d4c5-\U0001d505\U0001d507-\U0001d50a\U0001d50d-\U0001d514\U0001d516-\U0001d51c\U0001d51e-\U0001d539\U0001d53b-\U0001d53e\U0001d540-\U0001d544\U0001d546\U0001d54a-\U0001d550\U0001d552-\U0001d6a5\U0001d6a8-\U0001d6c0\U0001d6c2-\U0001d6da\U0001d6dc-\U0001d6fa\U0001d6fc-\U0001d714\U0001d716-\U0001d734\U0001d736-\U0001d74e\U0001d750-\U0001d76e\U0001d770-\U0001d788\U0001d78a-\U0001d7a8\U0001d7aa-\U0001d7c2\U0001d7c4-\U0001d7cb\U0001e000-\U0001e006\U0001e008-\U0001e018\U0001e01b-\U0001e021\U0001e023-\U0001e024\U0001e026-\U0001e02a\U0001e100-\U0001e12c\U0001e137-\U0001e13d\U0001e14e\U0001e2c0-\U0001e2eb\U0001e800-\U0001e8c4\U0001e900-\U0001e943\U0001e947\U0001e94b\U0001ee00-\U0001ee03\U0001ee05-\U0001ee1f\U0001ee21-\U0001ee22\U0001ee24\U0001ee27\U0001ee29-\U0001ee32\U0001ee34-\U0001ee37\U0001ee39\U0001ee3b\U0001ee42\U0001ee47\U0001ee49\U0001ee4b\U0001ee4d-\U0001ee4f\U0001ee51-\U0001ee52\U0001ee54\U0001ee57\U0001ee59\U0001ee5b\U0001ee5d\U0001ee5f\U0001ee61-\U0001ee62\U0001ee64\U0001ee67-\U0001ee6a\U0001ee6c-\U0001ee72\U0001ee74-\U0001ee77\U0001ee79-\U0001ee7c\U0001ee7e\U0001ee80-\U0001ee89\U0001ee8b-\U0001ee9b\U0001eea1-\U0001eea3\U0001eea5-\U0001eea9\U0001eeab-\U0001eebb\U0001f130-\U0001f149\U0001f150-\U0001f169\U0001f170-\U0001f189\U00020000-\U0002a6d6\U0002a700-\U0002b734\U0002b740-\U0002b81d\U0002b820-\U0002cea1\U0002ceb0-\U0002ebe0\U0002f800-\U0002fa1d' - -cyrillic = '\u0400-\u0484\u0487-\u052f\u1c80-\u1c88\u1d2b\u1d78\u2de0-\u2dff\ua640-\ua69f\ufe2e-\ufe2f' -accented = '\xC0-\xD6\xD8-\xF6\xF8-\xFF\u0100-\u024F\u0253-\u0254\u0256-\u0257\u0259\u025B\u0263\u0268\u026F\u0272\u0289\u028B\u02BB\u0300-\u036F\u1E00-\u1EFF' -Pd = r'\-\u058A\u05BE\u1400\u1806\u2010-\u2015\u2E17\u2E1A\u2E3A\u2E3B\u2E40\u2E5D\u301C\u3030\u30A0\uFE31\uFE32\uFE58\uFE63\uFF0D\U00010EAD' - -directional = '\u061C\u200E\u200F\u202A\u202B\u202C\u202D\u202E\u2066\u2067\u2068\u2069' -ctrl = '\x00-\x1F\x7F' -space = '\x09-\x0D\x20\x85\xA0\u1680\u180E\u2000-\u200A\u2028\u2029\u202F\u205F\u3000' - -username = r'(?i:[a-z0-9_]+([a-z0-9_.-]+[a-z0-9_]+)?)' -mention = re.compile(r'(?i:(??@\[\]^\`{|}~' -domain_invalid_end_chars = domain_invalid_middle_chars + '_-' -domain_component = (r'[^' + domain_invalid_end_chars + ']' + - r'(?:[^' + domain_invalid_middle_chars + ']*' + - r'[^' + domain_invalid_end_chars + '])?') -# This is not quite the way the server does it, because the server has -# a huge list of valid TLDs! I can't face that. And I think it's only -# there so that it can match URLs _without_ an http[s] prefix and -# avoid too many false positives. So my compromise is to trust the -# user, when composing a toot, to only enter URLs with sensible -# domains, otherwise we'll mis-highlight them and get the character -# counts wrong. -domain = domain_component + r'(?:\.' + domain_component + r')*' - -path_end_chars = r'a-z' + cyrillic + accented + r'0-9=_#/\+\-' -path_mid_chars = path_end_chars + r'!\*\';:\,\.\$\%\[\]~&\|@' + Pd - -path_bracketed_once = r'\([' + path_mid_chars + ']*\)' -path_char_or_bracketed_once = r'(?:[' + path_mid_chars + r']|' + path_bracketed_once + r')' -path_bracketed = r'\(' + path_char_or_bracketed_once + '*\)' - -path = ( - r'(?:[' + path_mid_chars + ']|' + path_bracketed + r')*' + - r'(?:[' + path_end_chars + ']|' + path_bracketed + r')') - -query_end_chars = r'a-z0-9_&=#/\-' -query_mid_chars = query_end_chars + r'!?\*\'\(\);:\+\$%\[\]\.,~|@' - -url = re.compile( - r'(?i:' + - r'(? tags - '#': [0, 36], # #hashtags - '@': [0, 32], # @mentions of a user - '_': [0, 4], # tags - 's': [0, 1], # tags - 'u': [0, 1, 4, 34], # URL - 'M': [0, 1, 4, 35], # media URL - 'm': [0, 35], # media description - '!': [0, 1, 7, 43, 31], # error report - 'J': [0, 1, 7, 47, 34], # Mastodonochrome logo in file headers - '~': [0, 34], # ~~~~~ underline in file headers - 'H': [0, 36], # actual header text in file headers - 'K': [0, 1, 36], # keypress / keypath names in file headers - 'k': [0, 1], # keypresses in file status lines - '-': [0, 7, 40, 36], # separator line between editor header and content - '0': [0, 34], # something really boring, like 'none' in place of data - 'r': [0, 31], # red nastinesses like blocking/muting in Examine User - '>': [0, 7], # reverse-video > indicating a truncated too-long line -} - -wcswidth_cache = {} -def cached_wcswidth(s): - if s not in wcswidth_cache: - wcswidth_cache[s] = wcwidth.wcswidth(s) - return wcswidth_cache[s] - -class ColouredString: - def __init__(self, string, colour=' '): - if isinstance(string, ColouredString): - self.s, self.c = string.s, string.c - else: - if len(colour) != len(string): - assert len(colour) == 1, "Colour ids are single characters" - colour = colour * len(string) - self.s, self.c = string, colour - self.width = cached_wcswidth(self.s) - - def __add__(self, rhs): - rhs = type(self)(rhs) - return type(self)(self.s + rhs.s, self.c + rhs.c) - - def __radd__(self, lhs): - lhs = type(self)(lhs) - return type(self)(lhs.s + self.s, lhs.c + self.c) - - def __mul__(self, rhs): - return type(self)(self.s * rhs, self.c * rhs) - - def __len__(self): - return len(self.s) - - def __str__(self): - return self.s - - def __repr__(self): - if self.c.rstrip(" ") == "": - return f"ColouredString({self.s!r})" - else: - return f"ColouredString({self.s!r}, {self.c!r})" - - def __iter__(self): - return (ColouredString(sc, cc) for sc, cc in zip(self.s, self.c)) - - def __getitem__(self, n): - return ColouredString(self.s[n], self.c[n]) - - def __eq__(self, rhs): - rhs = type(self)(rhs) - return (self.s, self.c) == (rhs.s, rhs.c) - def __ne__(self, rhs): - return not (self == rhs) - - def ecma48(self): - buf = io.StringIO() - colour = ' ' - for sc, cc in itertools.chain(zip(self.s, self.c), [('',' ')]): - if cc != colour: - buf.write("\033[{}m".format(";".join(map(str, colourmap[cc])))) - colour = cc - buf.write(sc) - return buf.getvalue() - - def frags(self): - # Return maximal substrings with the same attribute. - pos = 0 - while pos < len(self.c): - colour = self.c[pos] - fraglen = len(self.c) - pos - len(self.c[pos:].lstrip(colour)) - frag = self.s[pos:pos+fraglen] - yield frag, colour, cached_wcswidth(frag) - pos += fraglen - - def split(self, width): - # Split on to multiple physical lines. - line = ColouredString("") - for c in self: - if line.width + c.width > width: - yield line - line = ColouredString("") - line += c - yield line - - def is_colour(self, c): - return self.c == c * len(self.c) - -class BlankLine: - def render(self, width): - yield ColouredString("") - -class SeparatorLine: - def __init__(self, timestamp=None, favourited=False, boosted=False): - self.timestamp = timestamp - self.favourited = favourited - self.boosted = boosted - - def render(self, width): - suffix = ColouredString("") - if self.timestamp is not None: - date = time.strftime("%a %b %e %H:%M:%S %Y", - time.localtime(self.timestamp)) - suffix = (ColouredString("[", 'S') + - ColouredString(date, 'D') + - ColouredString("]--", 'S')) + suffix - if self.boosted: - suffix = (ColouredString("[", 'S') + - ColouredString('B', 'D') + - ColouredString("]--", 'S')) + suffix - if self.favourited: - suffix = (ColouredString("[", 'S') + - ColouredString('F', 'D') + - ColouredString("]--", 'S')) + suffix - yield ColouredString("-", 'S') * (width - 1 - suffix.width) + suffix - -class EditorHeaderSeparator: - def render(self, width): - yield ColouredString("-" * (width - 2) + "|", '-') - -class UsernameHeader: - def __init__(self, account, nameline): - self.account = account - self.nameline = nameline - - def render(self, width, target=False): - # FIXME: truncate - yield (ColouredString(self.header + ": ", 'f' if target else ' ') + - ColouredString(f"{self.nameline} ({self.account})", - self.colour)) - -class FromLine(UsernameHeader): - can_highlight_as_target = True - header = "From" - colour = "F" -class BoosterLine(UsernameHeader): - header = "Via" - colour = "f" - -class InReplyToLine: - def __init__(self, cparas): - self.para = Paragraph() - self.para.add(ColouredString("Re:")) - self.para.end_word() - - currlen = len(self.para) - for cpara in cparas: - self.para.add_para(cpara) - self.para.delete_mention_words_from(currlen) - - def render(self, width): - it = self.para.render(width-3) - line = next(it) - try: - next(it) - - if line.width < width-3: - line += ColouredString(" ") - line += ColouredString("...") - except StopIteration: - pass - yield line - -class Media: - def __init__(self, url, description): - self.url = url - self.description = [] - if description is not None: - for line in description.splitlines(): - desc = Paragraph() - desc.add(ColouredString(line, 'm')) - desc.end_word() - self.description.append(desc) - - def render(self, width): - yield ColouredString(self.url, "M") - if self.description is not None: - for para in self.description: - for line in para.render(width-4): - yield ColouredString(" ") + line - yield ColouredString("") - -class FileHeader: - def __init__(self, text): - if not isinstance(text, ColouredString): - text = ColouredString(text, "H") - self.text = text - - def render(self, width): - logo = [ColouredString('(o)', 'J'), ColouredString('/J\\', 'J')] - logowidth = logo[0].width - assert(all(s.width == logowidth for s in logo)) - - # FIXME: truncate - headertext = self.text - - midspace = width - 1 - 2 * logowidth - 2 - space = midspace - headertext.width - lspace = space // 2 - rspace = space - lspace - - yield (logo[0] + ColouredString(" " * (lspace+1)) + - headertext + ColouredString(" " * (rspace+1)) + logo[0]) - yield (logo[1] + ColouredString(" ") + - ColouredString("~" * midspace, '~') + ColouredString(" ") + - logo[1]) - -class ExtendableIndicator: - def __init__(self, primed): - self.primed = primed - - def render(self, width): - if self.primed: - message = ColouredString("Press [0] to extend", - "HHHHHHHKHHHHHHHHHHH") - else: - message = ColouredString("Press [0] twice to extend", - "HHHHHHHKHHHHHHHHHHHHHHHHH") - space = width - message.width - lspace = space // 2 + 1 - rspace = space - lspace + 1 - - yield ColouredString("") - yield ColouredString(" " * lspace) + message - yield ColouredString("") - -class FileStatusLine: - def __init__(self, text=None): - self.keys = [] - self.proportion = None - self.text = text - - def render(self, width): - message = ColouredString('') - sep = ColouredString('') - sep2 = ColouredString(' ') - if self.text is not None: - message += sep + self.text - sep = sep2 - for key, action in self.keys: - message += ( - sep + ColouredString('[') + ColouredString(key, 'k') + - ColouredString(']')) - if action is not None: - message += ColouredString(':' + action) - sep = sep2 - if self.proportion is not None: - message += sep + ColouredString('({:d}%)'.format( - int(self.proportion * 100))) - sep = sep2 - elif message.width != 0: - message += ColouredString('.') - - space = width - message.width - lspace = space // 2 + 1 - rspace = space - lspace + 1 - - yield (ColouredString(" " * lspace) + message) - -class MenuKeypressLine: - def __init__(self, key, description): - self.key = ColouredString(key) - self.description = ColouredString(description) - self.max_key_width = self.key.width - - def expand_key_width(self, new_max): - self.max_key_width = max(self.max_key_width, new_max) - return self.max_key_width - - def render(self, width): - equalpos = (width - 1) // 2 - 1 - lspace = equalpos - self.max_key_width - 3 - kspace = self.max_key_width - self.key.width - klspace = kspace // 2 - krspace = kspace - klspace - yield (ColouredString(" " * lspace) + - ColouredString(" " * klspace) + - ColouredString("[") + - ColouredString(self.key, 'k') + - ColouredString("]") + - ColouredString(" " * krspace) + - ColouredString(" = ") + - self.description) - -class Paragraph: - def __init__(self, text=None): - self.words = [] - self.space_colours = [] - self.unfinished_word = ColouredString('') - self.wrap = True - - if text is not None: - self.add(text) - self.end_word() - - def set_wrap(self, wrap): - self.wrap = wrap - - def render(self, width, laterwidth=None): - if laterwidth is None: - laterwidth = width - - if not self.wrap: - line, space = ColouredString(''), ColouredString('') - for word, space_colour in zip(self.words, self.space_colours): - oldlen = len(line) - line += space + word - space = ColouredString(' ', space_colour) - if line.width >= width: - line = next(line.split(width-2)) - while line.width < width-2: - line += ' ' - line += ColouredString(">", ">") - break - yield line - return - - # For the moment, greedy algorithm. We can worry about cleverness later - line, space = ColouredString(''), ColouredString('') - for word, space_colour in zip(self.words, self.space_colours): - if line != "" and (line + space + word).width >= width: - yield line - line, space = ColouredString(''), ColouredString('') - width = laterwidth - - line += space + word - space = ColouredString(' ', space_colour) - - if line.width >= width: - # FIXME: wrap explicitly? - yield line - line, space = ColouredString(''), ColouredString('') - width = laterwidth - - if len(line) != 0 or len(self.words) == 0: - yield line - - def empty(self): - return len(self.words) == 0 - - def end_word(self, space_colour=' '): - if len(self.unfinished_word) > 0: - self.words.append(self.unfinished_word) - self.space_colours.append(space_colour) - self.unfinished_word = ColouredString('') - - def add(self, text): - for c in text: - if str(c) == ' ': - self.end_word(ColouredString(c).c) - else: - self.unfinished_word += c - - def add_para(self, para): - self.end_word() - self.words.extend(para.words) - self.space_colours.extend(para.space_colours) - - def delete_mention_words_from(self, pos): - while pos < len(self.words) and self.words[pos].is_colour('@'): - self.words[pos:pos+1] = [] - self.space_colours[pos:pos+1] = [] - - def __len__(self): - return len(self.words) - - def __repr__(self): - return f"Paragraph({self.words!r}, unfinished={self.unfinished_word!r})" - -class IndentedParagraph(Paragraph): - def __init__(self, firstindent, laterindent, text=None): - super().__init__(text) - self.firstindent = firstindent - self.laterindent = laterindent - - def render(self, width): - it = super().render(width - self.firstindent, width - self.laterindent) - try: - yield " " * self.firstindent + next(it) - except StopIteration: - return - for line in it: - yield " " * self.laterindent + line - -class NotificationLog: - can_highlight_as_target = True - - def __init__(self, timestamp, account, nameline, ntype, cparas): - self.timestamp = timestamp - self.account = account - self.nameline = nameline - - self.date = ColouredString( - time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(self.timestamp))) - self.account_desc = f"{self.nameline} ({self.account})" - - self.para = Paragraph() - want_content = False - if ntype == 'reblog': - self.para.add('boosted:') - want_content = True - elif ntype == 'favourite': - self.para.add('favourited:') - want_content = True - elif ntype == 'follow': - self.para.add('followed you') - self.para.end_word() - - if want_content: - currlen = len(self.para) - for cpara in cparas: - self.para.add_para(cpara) - self.para.delete_mention_words_from(currlen) - - def render(self, width, target=False): - full_para = Paragraph() - full_para.add(ColouredString(self.date + " ")) - full_para.add(ColouredString( - self.account_desc, "f" if target else " ")) - full_para.add_para(self.para) - full_para.end_word() - - it = full_para.render(width, max(0, width-2)) - yield next(it) - try: - line = next(it) - except StopIteration: - return - line = ColouredString(" ") + line - try: - next(it) - except StopIteration: - yield line - return - line = next(line.split(width-3)) - if line.width < width-3: - line += ColouredString(" ") - line += ColouredString("...") - yield line - -class UserListEntry: - can_highlight_as_target = True - - def __init__(self, account, nameline): - self.account = account - self.nameline = nameline - self.account_desc = f"{self.nameline} ({self.account})" - - def render(self, width, target=False): - para = IndentedParagraph(0, 2) - para.add(ColouredString( - self.account_desc, "f" if target else " ")) - para.end_word() - yield from para.render(width) - -class HTMLParser(html.parser.HTMLParser): - def __init__(self): - super().__init__() - self.paras = [Paragraph()] - self.colourstack = [' '] - self.bad_tags = set() - self.indent = 0 - self.pre_tag = 0 - - def new_para(self): - return (Paragraph() if self.indent == 0 - else IndentedParagraph(self.indent, self.indent)) - - def handle_starttag(self, tag, attrs): - attrdict = dict(attrs) - - if tag == "a": - classes = set(attrdict.get("class", "").split()) - colour = ("#" if "hashtag" in classes else - "@" if "mention" in classes else - "u" if "href" in attrdict else " ") - self.colourstack.append(colour) - return - - if tag == "span": - return - - if tag == "p": - if not self.paras[-1].empty(): - self.paras.append(Paragraph()) - self.paras.append(self.new_para()) - return - - if tag == "pre": - if not self.paras[-1].empty(): - self.paras.append(Paragraph()) - self.paras.append(self.new_para()) - self.pre_tag += 1 - self.colourstack.append('c') - return - - if tag == "br": - self.paras.append(self.new_para()) - return - - if tag == "blockquote": - self.indent += 2 - self.paras.append(self.new_para()) - return - - if tag == "code": - self.colourstack.append('c') - return - - if tag == "strong": - self.colourstack.append('s') - return - - if tag in {"em", "i"}: - self.colourstack.append('_') - return - - # FIXME: need

, e.g. in
-        # https://neuromatch.social/@mstimberg/111375114784712346
-        # and _perhaps_ that ought to generate paragraphs with a
-        # 'truncate, don't wrap' attribute?
-
-        self.bad_tags.add(tag)
-
-    def handle_endtag(self, tag):
-        if tag == "span":
-            return
-
-        if tag == "p":
-            if not self.paras[-1].empty():
-                self.paras.append(self.new_para())
-            return
-
-        if tag == "pre":
-            self.pre_tag -= 1
-            self.colourstack.pop()
-            if not self.paras[-1].empty():
-                self.paras.append(self.new_para())
-            return
-
-        if tag == "blockquote":
-            if not self.paras[-1].empty():
-                self.paras.append(Paragraph())
-            self.indent -= 2
-            self.paras.append(self.new_para())
-            return
-
-        if tag in {"a", "code", "strong", "em", "i"}:
-            self.colourstack.pop()
-            return
-
-    def handle_data(self, data):
-        if self.pre_tag > 0:
-            def add_pre_text(data):
-                self.paras[-1].set_wrap(False)
-                self.paras[-1].add(ColouredString(data, self.colourstack[-1]))
-            lines = list(data.split('\n'))
-            for i, line in enumerate(lines):
-                add_pre_text(line)
-                if i + 1 < len(lines):
-                    self.paras.append(self.new_para())
-        else:
-            data = data.replace('\n', ' ')
-            self.paras[-1].add(ColouredString(data, self.colourstack[-1]))
-
-    def done(self):
-        for para in self.paras:
-            para.end_word()
-        while len(self.paras) > 0 and self.paras[0].empty():
-            self.paras.pop(0)
-        while len(self.paras) > 0 and self.paras[-1].empty():
-            self.paras.pop()
-
-        if len(self.bad_tags) > 0:
-            error_para = Paragraph()
-            text = "Unsupported markup tags: " + " ".join(
-                f"<{tag}>" for tag in sorted(self.bad_tags))
-            error_para.add(ColouredString(text, '!'))
-            error_para.end_word()
-            self.paras[0:0] = [error_para, Paragraph()]
-
-class RenderTests(unittest.TestCase):
-    def testBlank(self):
-        bl = BlankLine()
-        self.assertEqual(list(bl.render(80)), [ColouredString('')])
-
-    def testSeparator(self):
-        sl = SeparatorLine(time.mktime((2023,12,2,13,14,15,-1,-1,-1)))
-        self.assertEqual(list(sl.render(40)), [
-            ColouredString('-----------[Sat Dec  2 13:14:15 2023]--',
-                           'SSSSSSSSSSSSDDDDDDDDDDDDDDDDDDDDDDDDSSS'),
-        ])
-
-    def testFrom(self):
-        fl = FromLine("@a@b.c", "abc abc")
-        self.assertEqual(list(fl.render(80)), [
-            ColouredString('From: abc abc (@a@b.c)',
-                           '      FFFFFFFFFFFFFFFF'),
-        ])
-
-    def parse_html(self, html, width=50):
-        pp = HTMLParser()
-        pp.feed(html)
-        pp.done()
-        return list(itertools.chain(*[para.render(width)
-                                      for para in pp.paras]))
-
-    def testParagraphs(self):
-        html = "

Testing, testing, 1, 2, 3

" - self.assertEqual(self.parse_html(html), [ - ColouredString('Testing, testing, 1, 2, 3'), - ]) - - html = "

First para

Second para

" - self.assertEqual(self.parse_html(html), [ - ColouredString('First para'), - ColouredString(''), - ColouredString('Second para'), - ]) - - html = "

First line
Second line

" - self.assertEqual(self.parse_html(html), [ - ColouredString('First line'), - ColouredString('Second line'), - ]) - - def testWrapping(self): - html = ("

Pease porridge hot, pease porridge cold, pease porridge " - "in the pot, nine days old

") - self.assertEqual(self.parse_html(html), [ - ColouredString('Pease porridge hot, pease porridge cold, pease'), - ColouredString('porridge in the pot, nine days old'), - ]) - - def testMarkup(self): - html = "

Test of some literal code

" - self.assertEqual(self.parse_html(html), [ - ColouredString('Test of some literal code', - ' cccccccccccc'), - ]) - - html = "

Test of some strong text

" - self.assertEqual(self.parse_html(html), [ - ColouredString('Test of some strong text', - ' sssssssssss'), - ]) - - html = """

Test of a #hashtag

""" - self.assertEqual(self.parse_html(html), [ - ColouredString('Test of a #hashtag', - ' ########'), - ]) - - html = """

Test of a @username

""" - self.assertEqual(self.parse_html(html), [ - ColouredString('Test of a @username', - ' @@@@@@@@@'), - ]) - - def testError(self): - html = """

Test of some unsupported HTML tags

""" - self.assertEqual(self.parse_html(html), [ - ColouredString('Unsupported markup tags: ', - '!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!'), - ColouredString(''), - ColouredString('Test of some unsupported HTML tags'), - ]) - - def testMedia(self): - ma = Media('https://a.b/c', 'foo foo foo foo foo foo foo') - self.assertEqual(list(ma.render(16)), [ - ColouredString('https://a.b/c', - 'MMMMMMMMMMMMM'), - ColouredString(' foo foo foo', - ' mmmmmmmmmmm'), - ColouredString(' foo foo foo', - ' mmmmmmmmmmm'), - ColouredString(' foo', - ' mmm'), - ColouredString(''), - ]) - self.assertEqual(list(ma.render(15)), [ - ColouredString('https://a.b/c', - 'MMMMMMMMMMMMM'), - ColouredString(' foo foo', - ' mmmmmmm'), - ColouredString(' foo foo', - ' mmmmmmm'), - ColouredString(' foo foo', - ' mmmmmmm'), - ColouredString(' foo', - ' mmm'), - ColouredString(''), - ]) - - ma = Media('https://a.b/c', 'foo\nbar') - self.assertEqual(list(ma.render(40)), [ - ColouredString('https://a.b/c', - 'MMMMMMMMMMMMM'), - ColouredString(' foo', - ' mmm'), - ColouredString(' bar', - ' mmm'), - ColouredString(''), - ]) diff --git a/util.py b/util.py deleted file mode 100644 index 59c908d..0000000 --- a/util.py +++ /dev/null @@ -1,49 +0,0 @@ -import os - -def ctrl(ch): - return chr(0x1F & ord(ch)) - -class SelfPipe: - def __init__(self): - self.rfd, self.wfd = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC) - - def nonblocking_read(self, size=4096): - try: - return os.read(self.rfd, size) - except BlockingIOError: - return b'' - - def nonblocking_write(self, data): - try: - os.write(self.wfd, data) - except BlockingIOError: - pass - - def signal(self): - self.nonblocking_write(b'x') - - def check(self): - if len(self.nonblocking_read()) == 0: - return False - while len(self.nonblocking_read()) != 0: - pass - return True - -def exactly_one(stuff): - it = iter(stuff) - toret = next(it) - try: - next(it) - except StopIteration: - return toret - raise ValueError("exactly_one got more than one") - -def last(stuff): - it = iter(stuff) - try: - toret = next(it) - except StopIteration: - raise ValueError("last of no things") - for thing in it: - toret = thing - return toret -- 2.30.2