Source code for oauth_dropins.mastodon

"""Mastodon OAuth drop-in.

Mastodon is an ActivityPub implementation, but it also has a REST + OAuth 2 API
independent of AP.

API docs: https://docs.joinmastodon.org/api/

Interestingly: as usual w/OAuth, they require registering apps beforehand...but
since AP and Mastodon are decentralized, there's no single place to register an
app. So they have an API for registering apps, per instance:
https://docs.joinmastodon.org/api/authentication/
Surprising, and unusual, but makes sense.
"""
import logging
from urllib.parse import quote_plus, unquote, urlencode, urljoin, urlparse, urlunparse

from flask import request
from google.cloud import ndb
import requests

from . import views
from .models import BaseAuth
from .webutil import appengine_info, flask_util, util
from .webutil.util import decode_oauth_state, encode_oauth_state, json_dumps, json_loads

logger = logging.getLogger(__name__)

# https://docs.joinmastodon.org/api/oauth-scopes/
ALL_SCOPES = (
  'read',
  'read:accounts',
  'read:blocks',
  'read:bookmarks',
  'read:favourites',
  'read:filters',
  'read:follows',
  'read:lists',
  'read:mutes',
  'read:notifications',
  'read:search',
  'read:statuses',
  'write',
  'write:accounts',
  'write:blocks',
  'write:bookmarks',
  'write:favourites',
  'write:filters',
  'write:follows',
  'write:lists',
  'write:media',
  'write:mutes',
  'write:notifications',
  'write:reports',
  'write:statuses',
  'follow',
  'push',
)

INSTANCE_API = '/api/v1/instance'
REGISTER_APP_API = '/api/v1/apps'
VERIFY_API = '/api/v1/accounts/verify_credentials'

# URL templates. Can't (easily) use urlencode() because I want to keep
# the %(...)s placeholders as is and fill them in later in code.
AUTH_CODE_API = '&'.join((
  '/oauth/authorize?'
  'response_type=code',
  'client_id=%(client_id)s',
  'client_secret=%(client_secret)s',
  # https://docs.microsoft.com/en-us/linkedin/shared/integrations/people/profile-api?context=linkedin/consumer/context#permissions
  'scope=%(scope)s',
  # must be the same in the access token request
  'redirect_uri=%(redirect_uri)s',
  'state=%(state)s',
))

ACCESS_TOKEN_API = '/oauth/token'


[docs] class MastodonApp(ndb.Model): """A Mastodon API OAuth2 app registered with a specific instance.""" instance = ndb.StringProperty(required=True) # URL, eg https://mastodon.social/ data = ndb.TextProperty(required=True) # JSON; includes client id/secret instance_info = ndb.TextProperty() # JSON; from /api/v1/instance app_url = ndb.StringProperty() app_name = ndb.StringProperty() created_at = ndb.DateTimeProperty(auto_now_add=True, required=True)
[docs] class MastodonLogin(ndb.Model): """An in-progress Mastodon OAuth login. Ephemeral. Stores the state query parameter across the three-way OAuth user login process. Only needed as a workaround for a long-standing Mastodon/Doorkeeper configuration bug: https://github.com/snarfed/bridgy/issues/911 https://github.com/mastodon/mastodon/issues/12915 """ app = ndb.KeyProperty(required=True) state = ndb.TextProperty(required=True) @classmethod def load(cls, id): if not util.is_int(id): flask_util.error(f'State {id} not found') login = cls.get_by_id(int(id)) if not login: flask_util.error(f'State {id} not found') return login
[docs] class MastodonAuth(BaseAuth): """An authenticated Mastodon user. Provides methods that return information about this user and make OAuth-signed requests to the Mastodon REST API. Stores OAuth credentials in the datastore. See models.BaseAuth for usage details. Key name is the fully qualified actor address, ie @username@instance.tld. Mastodon scopes are per access token, so :attr:`SCOPES_RESET` is True. * https://github.com/snarfed/bridgy/issues/1015 * https://github.com/snarfed/bridgy/issues/1342 Implements get() and post() but not urlopen() or api(). """ SCOPES_RESET = True app = ndb.KeyProperty() access_token_str = ndb.StringProperty(required=True) user_json = ndb.TextProperty()
[docs] def site_name(self): return 'Mastodon'
[docs] def user_display_name(self): """Returns the user's full ActivityPub address, eg @ryan@mastodon.social.""" return self.key.id()
[docs] def instance(self): """Returns the instance base URL, eg https://mastodon.social/.""" return self.app.get().instance
[docs] def username(self): """Returns the user's username, eg ryan.""" return json_loads(self.user_json).get('username')
[docs] def user_id(self): """Returns the user's id, eg 123.""" return json_loads(self.user_json).get('id')
[docs] def access_token(self): """Returns the OAuth access token string.""" return self.access_token_str
[docs] def get(self, *args, **kwargs): """Wraps requests.get() and adds instance base URL and Bearer token header.""" url = urljoin(self.instance(), args[0]) return self._requests_call(util.requests_get, url, *args[1:], **kwargs)
[docs] def post(self, *args, **kwargs): """Wraps requests.post() and adds the Bearer token header.""" return self._requests_call(util.requests_post, *args, **kwargs)
def _requests_call(self, fn, *args, **kwargs): headers = kwargs.setdefault('headers', {}) headers['Authorization'] = 'Bearer ' + self.access_token_str resp = fn(*args, **kwargs) try: resp.raise_for_status() except BaseException as e: util.interpret_http_exception(e) raise return resp
[docs] class Start(views.Start): """Starts Mastodon auth. Requests an auth code and expects a redirect back. Attributes: DEFAULT_SCOPE: string, default OAuth scope(s) to request REDIRECT_PATHS: sequence of string URL paths (on this host) to register as OAuth callback (aka redirect) URIs in the OAuth app SCOPE_SEPARATOR: string, used to separate multiple scopes APP_CLASS: API app datastore class EXPIRE_APPS_BEFORE: datetime, if the API client app was created before this, it will be discarded and a new one will be created. Set to the last time you changed something material about the client, eg redirect URLs or scopes. """ NAME = 'mastodon' LABEL = 'Mastodon' DEFAULT_SCOPE = 'read:accounts' REDIRECT_PATHS = () SCOPE_SEPARATOR = ' ' APP_CLASS = MastodonApp # https://github.com/snarfed/bridgy/issues/1344 EXPIRE_APPS_BEFORE = None
[docs] def app_name(self): """Returns the user-visible name of this application. To be overridden by subclasses. Displayed in Mastodon's OAuth prompt. """ return 'oauth-dropins demo'
[docs] def app_url(self): """Returns this application's web site. To be overridden by subclasses. Displayed in Mastodon's OAuth prompt. """ # normalize trailing slash. oddly sometimes request.host_url has it, # sometimes it doesn't. return urljoin(request.host_url, '/')
@classmethod def _version_ok(cls, version): return 'Pixelfed' not in version
[docs] def redirect_url(self, state=None, instance=None): """Returns the local URL for Mastodon to redirect back to after OAuth prompt. Args: state: string, user-provided value to be returned as a query parameter in the return redirect instance: string, Mastodon instance base URL, e.g. 'https://mastodon.social'. May also be provided in the 'instance' request as a URL query parameter or POST body. Raises: ValueError if instance isn't a Mastodon instance. """ # normalize instance to URL if not instance: instance = request.values['instance'] instance = instance.strip().split('@')[-1] # handle addresses, eg user@host.com parsed = urlparse(instance) if not parsed.scheme: instance = 'https://' + instance # fetch instance info from this instance's API (mostly to test that it's # actually a Mastodon instance) try: resp = util.requests_get(urljoin(instance, INSTANCE_API)) resp.raise_for_status() except requests.RequestException: logger.info('Error', exc_info=True) resp = None is_json = resp and resp.headers.get('Content-Type', '').strip().startswith( 'application/json') if is_json: logger.info(resp.text) if (not resp or not resp.ok or not is_json or not self._version_ok(resp.json().get('version'))): msg = f"{instance} doesn't look like a {self.LABEL} instance." logger.info(resp) logger.info(msg) raise ValueError(msg) # if we got redirected, update instance URL parsed = list(urlparse(resp.url)) parsed[2] = '/' # path instance = urlunparse(parsed) app_name = self.app_name() app_url = self.app_url() query = self.APP_CLASS.query(self.APP_CLASS.instance == instance, self.APP_CLASS.app_url == app_url) if appengine_info.DEBUG: # disambiguate different apps in dev_appserver, since their app_url will # always be localhost query = query.filter(self.APP_CLASS.app_name == app_name) app = query.get() if app: if self.EXPIRE_APPS_BEFORE and app.created_at < self.EXPIRE_APPS_BEFORE: logging.info(f'Creating new client app for {instance} because existing app {app.key} was created before EXPIRE_APPS_BEFORE {self.EXPIRE_APPS_BEFORE}') app = None elif MastodonAuth.query(MastodonAuth.app == app.key).get() == None: # we haven't used this OAuth app to get an access token yet, and # Mastodon garbage collects unused apps, so check that it still exists # first. https://github.com/mastodon/mastodon/issues/27740 logging.info(f'Existing app {app.key.id()} got garbage collected! Creating new one.') app = None if not app: app = self._register_app(instance, app_name, app_url) app.instance_info = resp.text app.put() logger.info(f'Starting OAuth for {self.LABEL} instance {instance}') app_data = json_loads(app.data) login_id = MastodonLogin(app=app.key, state=state or '').put().id() return urljoin(instance, AUTH_CODE_API % { 'client_id': app_data['client_id'], 'client_secret': app_data['client_secret'], 'redirect_uri': quote_plus(self.to_url()), 'state': str(login_id), 'scope': self.scope, })
def _register_app(self, instance, app_name, app_url): """Register a Mastodon API app on a specific instance. https://docs.joinmastodon.org/methods/apps/ Args: instance: string app_name: string app_url: string Returns: :class:`APP_CLASS` """ logger.info(f"first time we've seen {self.LABEL} instance {instance} with app {app_name} {app_url}! registering an API app.") redirect_uris = {urljoin(request.host_url, path) for path in set(self.REDIRECT_PATHS)} redirect_uris.add(self.to_url()) resp = util.requests_post( urljoin(instance, REGISTER_APP_API), data=urlencode({ 'client_name': app_name, # Mastodon uses Doorkeeper for OAuth, which allows registering # multiple redirect URIs, separated by newlines. # https://github.com/doorkeeper-gem/doorkeeper/pull/298 # https://docs.joinmastodon.org/methods/apps/ 'redirect_uris': '\n'.join(redirect_uris), 'website': app_url, # https://docs.joinmastodon.org/api/oauth-scopes/ 'scopes': self.SCOPE_SEPARATOR.join(ALL_SCOPES), }), # Pixelfed requires this headers={'Content-Type': 'application/x-www-form-urlencoded'}) resp.raise_for_status() app_data = json_loads(resp.text) logger.info(f'Got {app_data}') # generate a client_credential token (without expiration) to # prevent Mastodon from garbage collecting this OAuth client # https://github.com/mastodon/mastodon/issues/27740 data = { 'grant_type': 'client_credentials', 'client_id': app_data['client_id'], 'client_secret': app_data['client_secret'], } resp = util.requests_post(urljoin(instance, ACCESS_TOKEN_API), data=urlencode(data)) if resp.ok: resp_json = resp.json() logger.info(f'Got client_credential: {json_dumps(resp_json)}') if token := resp_json.get('access_token'): app_data['client_credentials_token'] = token return self.APP_CLASS(instance=instance, app_name=app_name, app_url=app_url, data=json_dumps(app_data))
[docs] @classmethod def button_html(cls, *args, **kwargs): kwargs['form_extra'] = kwargs.get('form_extra', '') + f""" <input type="url" name="instance" class="form-control" placeholder="{cls.LABEL} instance" scheme="https" required style="width: 135px; height: 50px; display:inline;" />""" return super(Start, cls).button_html( *args, input_style='background-color: #EBEBEB; padding: 5px', **kwargs)
[docs] class Callback(views.Callback): """The OAuth callback. Fetches an access token and stores it.""" AUTH_CLASS = MastodonAuth
[docs] def dispatch_request(self): # handle errors error = request.values.get('error') desc = request.values.get('error_description') if error: # user_cancelled_login and user_cancelled_authorize are non-standard. # https://tools.ietf.org/html/rfc6749#section-4.1.2.1 if error in ('user_cancelled_login', 'user_cancelled_authorize', 'access_denied'): logger.info(f"User declined: {request.values.get('error_description')}") state = request.values.get('state') if state: login = MastodonLogin.load(state) return self.finish(None, state=login.state) else: flask_util.error(f'{error} {desc}') login = MastodonLogin.load(request.values['state']) app = login.app.get() assert app app_data = json_loads(app.data) # extract auth code and request access token auth_code = request.values['code'] data = { 'grant_type': 'authorization_code', 'code': auth_code, 'client_id': app_data['client_id'], 'client_secret': app_data['client_secret'], # redirect_uri here must be the same in the oauth code request! # (the value here doesn't actually matter since it's requested server side.) 'redirect_uri': request.base_url, } resp = util.requests_post( urljoin(app.instance, ACCESS_TOKEN_API), data=urlencode(data), # Pixelfed requires this headers={'Content-Type': 'application/x-www-form-urlencoded'}) resp.raise_for_status() resp_json = resp.json() logger.debug(f'Access token response: {resp_json}') if resp_json.get('error'): flask_util.error(resp_json) access_token = resp_json['access_token'] user = self.AUTH_CLASS(app=app.key, access_token_str=access_token).get(VERIFY_API).json() logger.debug(f'User: {user}') address = f"@{user['username']}@{urlparse(app.instance).netloc}" auth = self.AUTH_CLASS(id=address, app=app.key, access_token_str=access_token, user_json=json_dumps(user)) auth.put() return self.finish(auth, state=login.state)