From a4cb597ea4fd24e108e306384de3fd2b332ee8e5 Mon Sep 17 00:00:00 2001 From: Simon Tatham Date: Sat, 2 Dec 2023 17:14:16 +0000 Subject: [PATCH] Implemented login. --- client.py | 44 +++++++++++++++++++----- login.py | 91 +++++++++++++++++++++++++++++++++++++++++++++++++ mastodonochrome | 12 +++---- 3 files changed, 133 insertions(+), 14 deletions(-) create mode 100644 login.py diff --git a/client.py b/client.py index ad0a2f6..c3ed0f6 100644 --- a/client.py +++ b/client.py @@ -1,4 +1,5 @@ import calendar +import os import requests import string import time @@ -15,30 +16,57 @@ class HTTPError(Exception): f"returned status {self.response.status_code}") class Client: - def __init__(self, instance): - self.base_url = instance + "/api/v1/" + def __init__(self): + # Ensure the requests module doesn't replace our bearer tokens + # with any random stuff that might be in .netrc + os.environ["NETRC"] = "/dev/null" + self.bearer_token = None self.log_response = lambda *args, **kws: None + def set_instance(self, instance): + self.urls = { + 'auth': instance + "/oauth/", + 'api': instance + "/api/v1/", + } + def enable_debug(self, logfile): - logfh = open(logfile, "w") - pr = lambda *args, **kws: print(*args, file=logfh, **kws) + self.logfh = open(logfile, "w") + pr = lambda *args, **kws: print(*args, file=self.logfh, **kws) def log_response(rsp): - pr("Request: {rsp.request.method} {rsp.request.url}") - pr(" Response status: {rsp.status_code}") + pr(f"Request: {rsp.request.method} {rsp.request.url}") + pr(" Request headers:") + for k, v in rsp.request.headers.items(): + pr(f" {k}: {v}") + pr(f" Response status: {rsp.status_code}") pr(" Response headers:") for k, v in rsp.headers.items(): pr(f" {k}: {v}") + pr(f" Response: {rsp.content!r}") self.log_response = log_response - def get_public(self, path, **params): - rsp = requests.get(self.base_url + path, params=params) + def method(self, method, path, base, params): + headers = {} + if self.bearer_token is not None: + headers['Authorization'] = 'Bearer ' + self.bearer_token + rsp = method(self.urls[base] + path, params=params, headers=headers) self.log_response(rsp) if rsp.status_code != 200: raise HTTPError(rsp) return rsp.json() + def get(self, path, base='api', **params): + return self.method(requests.get, path, base, params) + def post(self, path, base='api', **params): + return self.method(requests.post, path, base, params) + + def get_url(self, path, base='api', **params): + r = requests.Request(method="GET", url=self.urls[base] + path, + params=params) + p = r.prepare() + return p.url + class Status: def __init__(self, data): self.post_id = data['id'] diff --git a/login.py b/login.py new file mode 100644 index 0000000..231d91b --- /dev/null +++ b/login.py @@ -0,0 +1,91 @@ +import client +import json +import os +import xdg + +def config_dir(): + return os.path.join(xdg.XDG_CONFIG_HOME, "mastodonochrome") +def config_file(): + return os.path.join(config_dir(), "auth") + +class LoginUI(client.Client): + def run(self): + redirect_uri = 'urn:ietf:wg:oauth:2.0:oob' + + instance = input("Enter a Mastodon instance name: ") + if "://" not in instance: + instance = "https://" + instance + + self.set_instance(instance) + + app = self.post( + "apps", + client_name='Mastodonochrome', + redirect_uris=redirect_uri, + scopes='read write push', + website='https://www.chiark.greenend.org.uk/~sgtatham/mastodonochrome/', + ) + + client_id = app['client_id'] + client_secret = app['client_secret'] + + app_token = self.post( + "token", base="auth", + client_id=client_id, + client_secret=client_secret, + redirect_uri=redirect_uri, + grant_type='client_credentials', + ) + + self.bearer_token = app_token['access_token'] + self.get("apps/verify_credentials") + self.bearer_token = None + + url = self.get_url( + "authorize", base="auth", + client_id=client_id, + scope='read write push', + redirect_uri=redirect_uri, + response_type='code', + ) + + print("Visit this URL to request an authorisation code:") + print(url) + + auth_code = input("Enter the code from the website: ") + + user_token = self.post( + "token", base="auth", + client_id=client_id, + client_secret=client_secret, + redirect_uri=redirect_uri, + grant_type='authorization_code', + code=auth_code, + scope='read write push', + ) + + self.bearer_token = user_token['access_token'] + account = self.get("accounts/verify_credentials") + self.bearer_token = None + + data = { + 'account_id': account['id'], + 'username': account['username'], + 'instance': instance.split("://", 1)[-1], + 'client_id': client_id, + 'client_secret': client_secret, + 'user_token': user_token['access_token'], + } + + try: + old_umask = os.umask(0o77) + + try: + os.makedirs(config_dir()) + except FileExistsError: + pass + + with open(config_file(), "w") as fh: + json.dump(data, fh, indent=4) + finally: + os.umask(old_umask) diff --git a/mastodonochrome b/mastodonochrome index 35441b3..ebf056a 100755 --- a/mastodonochrome +++ b/mastodonochrome @@ -10,10 +10,12 @@ import unittest import client import cursesclient +import login class PublicTimelineUI(client.Client): def run(self): - for item in self.get_public("timelines/public", limit=10): + self.set_instance("https://hachyderm.io") # FIXME + for item in self.get("timelines/public", limit=10): p = client.Status(item) for thing in p.text(): for line in thing.render(80): @@ -36,6 +38,8 @@ def main(): parser.add_argument("--public", action="store_const", dest="action", const=PublicTimelineUI, help="Temporary mode to fetch " "a public timeline and print it on the terminal.") + parser.add_argument("--login", action="store_const", dest="action", + const=login.LoginUI, help="Log in to a user account.") parser.set_defaults(action=cursesclient.CursesUI) args = parser.parse_args() @@ -43,11 +47,7 @@ def main(): return unittest.main(argv=[sys.argv[0]] + args.test, testLoader=MyTestLoader()) - instance = "hachyderm.io" # FIXME - if "://" not in instance: - instance = "https://" + instance - - thing = args.action(instance) + thing = args.action() if args.log is not None: thing.enable_debug(args.log) thing.run() -- 2.30.2