chiark / gitweb /
Implemented login.
authorSimon Tatham <anakin@pobox.com>
Sat, 2 Dec 2023 17:14:16 +0000 (17:14 +0000)
committerSimon Tatham <anakin@pobox.com>
Sat, 2 Dec 2023 17:14:16 +0000 (17:14 +0000)
client.py
login.py [new file with mode: 0644]
mastodonochrome

index ad0a2f6ad551eff78ae0dccc371d0de22d886ba7..c3ed0f692b14b627a2016db03413a69144c716c5 100644 (file)
--- a/client.py
+++ b/client.py
@@ -1,4 +1,5 @@
 import calendar
+import os
 import requests
 import string
 import time
@@ -15,30 +16,57 @@ class HTTPError(Exception):
                 f"returned status {self.response.status_code}")
 
 class Client:
-    def __init__(self, instance):
-        self.base_url = instance + "/api/v1/"
+    def __init__(self):
+        # Ensure the requests module doesn't replace our bearer tokens
+        # with any random stuff that might be in .netrc
+        os.environ["NETRC"] = "/dev/null"
+        self.bearer_token = None
         self.log_response = lambda *args, **kws: None
 
+    def set_instance(self, instance):
+        self.urls = {
+            'auth': instance + "/oauth/",
+            'api': instance + "/api/v1/",
+        }
+
     def enable_debug(self, logfile):
-        logfh = open(logfile, "w")
-        pr = lambda *args, **kws: print(*args, file=logfh, **kws)
+        self.logfh = open(logfile, "w")
+        pr = lambda *args, **kws: print(*args, file=self.logfh, **kws)
 
         def log_response(rsp):
-            pr("Request: {rsp.request.method} {rsp.request.url}")
-            pr("  Response status: {rsp.status_code}")
+            pr(f"Request: {rsp.request.method} {rsp.request.url}")
+            pr("  Request headers:")
+            for k, v in rsp.request.headers.items():
+                pr(f"    {k}: {v}")
+            pr(f"  Response status: {rsp.status_code}")
             pr("  Response headers:")
             for k, v in rsp.headers.items():
                 pr(f"    {k}: {v}")
+            pr(f"  Response: {rsp.content!r}")
 
         self.log_response = log_response
 
-    def get_public(self, path, **params):
-        rsp = requests.get(self.base_url + path, params=params)
+    def method(self, method, path, base, params):
+        headers = {}
+        if self.bearer_token is not None:
+            headers['Authorization'] = 'Bearer ' + self.bearer_token
+        rsp = method(self.urls[base] + path, params=params, headers=headers)
         self.log_response(rsp)
         if rsp.status_code != 200:
             raise HTTPError(rsp)
         return rsp.json()
 
+    def get(self, path, base='api', **params):
+        return self.method(requests.get, path, base, params)
+    def post(self, path, base='api', **params):
+        return self.method(requests.post, path, base, params)
+
+    def get_url(self, path, base='api', **params):
+        r = requests.Request(method="GET", url=self.urls[base] + path,
+                             params=params)
+        p = r.prepare()
+        return p.url
+
 class Status:
     def __init__(self, data):
         self.post_id = data['id']
diff --git a/login.py b/login.py
new file mode 100644 (file)
index 0000000..231d91b
--- /dev/null
+++ b/login.py
@@ -0,0 +1,91 @@
+import client
+import json
+import os
+import xdg
+
+def config_dir():
+    return os.path.join(xdg.XDG_CONFIG_HOME, "mastodonochrome")
+def config_file():
+    return os.path.join(config_dir(), "auth")
+
+class LoginUI(client.Client):
+    def run(self):
+        redirect_uri = 'urn:ietf:wg:oauth:2.0:oob'
+
+        instance = input("Enter a Mastodon instance name: ")
+        if "://" not in instance:
+            instance = "https://" + instance
+
+        self.set_instance(instance)
+
+        app = self.post(
+            "apps",
+            client_name='Mastodonochrome',
+            redirect_uris=redirect_uri,
+            scopes='read write push',
+            website='https://www.chiark.greenend.org.uk/~sgtatham/mastodonochrome/',
+        )
+
+        client_id = app['client_id']
+        client_secret = app['client_secret']
+
+        app_token = self.post(
+            "token", base="auth",
+            client_id=client_id,
+            client_secret=client_secret,
+            redirect_uri=redirect_uri,
+            grant_type='client_credentials',
+        )
+
+        self.bearer_token = app_token['access_token']
+        self.get("apps/verify_credentials")
+        self.bearer_token = None
+
+        url = self.get_url(
+            "authorize", base="auth",
+            client_id=client_id,
+            scope='read write push',
+            redirect_uri=redirect_uri,
+            response_type='code',
+        )
+
+        print("Visit this URL to request an authorisation code:")
+        print(url)
+
+        auth_code = input("Enter the code from the website: ")
+
+        user_token = self.post(
+            "token", base="auth",
+            client_id=client_id,
+            client_secret=client_secret,
+            redirect_uri=redirect_uri,
+           grant_type='authorization_code',
+            code=auth_code,
+           scope='read write push',
+        )
+
+        self.bearer_token = user_token['access_token']
+        account = self.get("accounts/verify_credentials")
+        self.bearer_token = None
+
+        data = {
+            'account_id': account['id'],
+            'username': account['username'],
+            'instance': instance.split("://", 1)[-1],
+            'client_id': client_id,
+            'client_secret': client_secret,
+            'user_token': user_token['access_token'],
+        }
+
+        try:
+            old_umask = os.umask(0o77)
+
+            try:
+                os.makedirs(config_dir())
+            except FileExistsError:
+                pass
+
+            with open(config_file(), "w") as fh:
+                json.dump(data, fh, indent=4)
+        finally:
+            os.umask(old_umask)
index 35441b3bf8ee696ec225ae051d84c1d96407d03b..ebf056a28d5e36d72ee429cbb74d5de1181e05d2 100755 (executable)
@@ -10,10 +10,12 @@ import unittest
 
 import client
 import cursesclient
+import login
 
 class PublicTimelineUI(client.Client):
     def run(self):
-        for item in self.get_public("timelines/public", limit=10):
+        self.set_instance("https://hachyderm.io") # FIXME
+        for item in self.get("timelines/public", limit=10):
             p = client.Status(item)
             for thing in p.text():
                 for line in thing.render(80):
@@ -36,6 +38,8 @@ def main():
     parser.add_argument("--public", action="store_const", dest="action",
                         const=PublicTimelineUI, help="Temporary mode to fetch "
                         "a public timeline and print it on the terminal.")
+    parser.add_argument("--login", action="store_const", dest="action",
+                        const=login.LoginUI, help="Log in to a user account.")
     parser.set_defaults(action=cursesclient.CursesUI)
     args = parser.parse_args()
 
@@ -43,11 +47,7 @@ def main():
         return unittest.main(argv=[sys.argv[0]] + args.test,
                              testLoader=MyTestLoader())
 
-    instance = "hachyderm.io" # FIXME
-    if "://" not in instance:
-        instance = "https://" + instance
-
-    thing = args.action(instance)
+    thing = args.action()
     if args.log is not None:
         thing.enable_debug(args.log)
     thing.run()