Source code for oauth_dropins.bluesky

"""Bluesky auth drop-in. Supports both app password login and OAuth.

Use :class:`PasswordStart` and :class:`PasswordCallback` for app password,
class:`OAuthStart` and :class:`OAuthCallback` for OAuth.

https://atproto.com/specs/xrpc#:~:text=App,passwords
https://docs.bsky.app/docs/advanced-guides/oauth-client
https://atproto.com/specs/oauth
https://guillp.github.io/requests_oauth2client/
https://github.com/guillp/requests_oauth2client?tab=readme-ov-file#using-dpop
"""
import logging
import os
import re
from urllib.parse import quote, urljoin, urlparse

import arroba.did
from flask import redirect, request
from google.cloud import ndb
from lexrpc import Client
import requests
from requests_oauth2client import (
  AuthorizationRequestSerializer,
  DPoPToken,
  DPoPTokenSerializer,
  OAuth2Client,
  OAuth2Error,
  OAuth2AccessTokenAuth,
)

from . import views, models
from .webutil import flask_util, util
from .webutil.models import JsonProperty
from .webutil.util import json_dumps, json_loads

logger = logging.getLogger(__name__)

os.environ.setdefault('PLC_HOST', 'plc.directory')

# https://docs.bsky.app/docs/advanced-guides/oauth-client#client-and-server-metadata
PROTECTED_RESOURCE_PATH = '/.well-known/oauth-protected-resource'
RESOURCE_METADATA_PATH = '/.well-known/oauth-authorization-server'
CLIENT_METADATA_TEMPLATE = {
  # Clients must fill these in
  'client_id': None,      # eg 'https://app.example.com/oauth/client-metadata.json'
  'client_name': None,    # eg 'My Example App'
  'client_uri': None,     # eg 'https://app.example.com'
  'redirect_uris': None,  # eg ['https://app.example.com/oauth/callback'],

  # standard
  'application_type': 'web',
  'dpop_bound_access_tokens': True,
  'grant_types': [
    'authorization_code',
    'refresh_token',
  ],
  'response_types': ['code'],
  'scope': 'atproto transition:generic',
  'token_endpoint_auth_method': 'none',
}

_APP_CLIENT_METADATA = {
  **CLIENT_METADATA_TEMPLATE,
  'client_id': 'https://oauth-dropins.appspot.com/bluesky/client-metadata.json',
  'client_name': 'oauth-dropins demo',
  'client_uri': 'https://oauth-dropins.appspot.com/',
  'redirect_uris': ['https://oauth-dropins.appspot.com/bluesky/oauth_callback'],
}


def error(msg):
  logger.warning(msg)
  raise ValueError(msg)


[docs] class BlueskyLogin(ndb.Model): """An in-progress Bluesky OAuth login. Ephemeral. Stores a serialized :class:`requests_oauth2client.AuthorizationRequest` across HTTP requests. """ state = ndb.TextProperty() did = ndb.StringProperty(required=True) authz_request = ndb.TextProperty(required=True) """Serialized :class:`requests_oauth2client.AuthorizationRequest`. Uses :meth:`requests_oauth2client.AuthorizationRequestSerializer.default_dumper` / :meth:`requests_oauth2client.AuthorizationRequestSerializer.default_loader`. """ @classmethod def load(cls, id): if not util.is_int(id): error(f'State {id} not found') login = cls.get_by_id(int(id)) if not login: error(f'State {id} not found') return login
[docs] class BlueskyAuth(models.BaseAuth): """An authenticated Bluesky user. Key id is DID. """ password = ndb.StringProperty() pds_url = ndb.StringProperty() user_json = ndb.TextProperty(required=True) """app.bsky.actor.defs#profileViewDetailed""" session = JsonProperty() dpop_token = ndb.TextProperty() def site_name(self): return 'Bluesky'
[docs] def access_token(self): """ Returns: str: """ if self.session: return self.session.get('accessJwt')
[docs] def user_display_name(self): """ Returns: str: """ return json_loads(self.user_json).get('handle')
[docs] def image_url(self): """ Returns: str: """ return json_loads(self.user_json).get('avatar')
[docs] def oauth_api(self, client_metadata): """Returns an OAuth-based :class:`lexrpc.Client` for this user. Requires :attr:`dpop_token` to be set. Args: client_metadata (dict): client info metadata, https://docs.bsky.app/docs/advanced-guides/oauth-client#client-and-server-metadata Returns: lexrpc.Client: """ assert self.dpop_token pds_url = self.pds_url or pds_for_did(self.key.id()) oauth_client = oauth_client_for_pds(client_metadata, pds_url) dpop_token = DPoPTokenSerializer.default_loader(self.dpop_token) auth = OAuth2AccessTokenAuth(client=oauth_client, token=dpop_token) return Client(pds_url, auth=auth)
def _api(self, **kwargs): """ Args: kwargs: passed to the :class:`lexrpc.Client` constructor Returns: lexrpc.Client: """ client = Client(address=self.pds_url, headers={'User-Agent': util.user_agent}, **kwargs) client.session = self.session return client @staticmethod def _api_from_password(handle, password, **kwargs): """ Args: handle (str) password (str) kwargs: passed to the :class:`lexrpc.Client` constructor Returns: lexrpc.Client: """ did = arroba.did.resolve_handle(handle) if not did: error(f"Couldn't resolve {handle} as a Bluesky handle") logger.info(f'resolved {handle} to {did}') pds_url = pds_for_did(did) logger.info(f'Logging into {pds_url} as {did}...') client = Client(address=pds_url, headers={'User-Agent': util.user_agent}, **kwargs) resp = client.com.atproto.server.createSession({ 'identifier': handle, 'password': password, }) return client
[docs] class StartBase(views.Start): """Base class for starting Bluesky auth; only used to provide the button. """ NAME = 'bluesky' LABEL = 'Bluesky' @classmethod def button_html(cls, *args, **kwargs): kwargs['form_extra'] = kwargs.get('form_extra', '') + f""" <input name="handle" type="text" class="form-control" placeholder="{cls.LABEL} handle" required style="width: 135px; height: 50px; display:inline;" />""" return super().button_html( *args, image_file='bluesky_logo.png', input_style='background-color: #EEEEEE', **kwargs)
Start = StartBase
[docs] class PasswordCallback(views.Callback): """ App password login callback stub. """ def dispatch_request(self): handle = request.values['username'].strip().lower().removeprefix('@') password = request.values['password'].strip() state = request.values.get('state') # get the DID (portable user ID) try: client = BlueskyAuth._api_from_password(handle, password) except ValueError as e: logger.warning(f'Login failed: {e}') return self.finish(None, state=state) profile = { '$type': 'app.bsky.actor.defs#profileViewDetailed', **client.app.bsky.actor.getProfile(actor=handle) } auth = BlueskyAuth( id=profile['did'], password=password, # TODO: resolve DID's PDS pds_url = 'https://bsky.social', user_json=util.json_dumps(profile), session=client.session, ) auth.put() return self.finish(auth, state=state)
Callback = PasswordCallback
[docs] def pds_for_did(did): """Resolves a DID document and extracts its PDS URL. https://atproto.com/specs/did#did-documents Args: did (str) Returns: str: PDS URL Raises: ValueError: if the DID couldn't be resolved, or if its DID document has no ATProto PDS endpoint """ did_doc = arroba.did.resolve(did) if not did_doc: error(f"Couldn't resolve DID {did}") # based on bridgy_fed.atproto.ATProto.pds_for for service in did_doc.get('service', []): if service.get('id') in ('#atproto_pds', f'{did}#atproto_pds'): pds = service.get('serviceEndpoint') logger.info(f'{did} has PDS {pds}') return pds error(f"{did}'s DID doc has no ATProto PDS")
[docs] def oauth_client_for_pds(client_metadata, pds_url): """Discovers a PDS's OAuth endpoints and creates a client. Args: client_metadata (dict) pds_url (str) Returns: OAuth2Client: Raises: ValueError: if the DID couldn't be resolved, or if its DID document has no ATProto PDS endpoint """ resp = util.requests_get(urljoin(pds_url, PROTECTED_RESOURCE_PATH)) resp.raise_for_status() auth_server = resp.json()['authorization_servers'][0] logger.info(f'PDS {pds_url} has auth server {auth_server}') # OAuth special case for localhost client_id and redirect_uri # https://atproto.com/specs/oauth#:~:text=Localhost%20Client%20Development client_id = client_metadata['client_id'] redirect_uri = client_metadata['redirect_uris'][0] if request.host.split(':')[0] in ('localhost', '127.0.0.1'): redirect_uri = urljoin('http://127.0.0.1:8080', urlparse(redirect_uri).path) scope = quote(CLIENT_METADATA_TEMPLATE['scope']) client_id = f'http://localhost?redirect_uri={redirect_uri}&scope={scope}' return OAuth2Client.from_discovery_endpoint( urljoin(auth_server, RESOURCE_METADATA_PATH), client_id=client_id, redirect_uri=redirect_uri, dpop_bound_access_tokens=True, )
[docs] class OAuthStart(StartBase): """Starts the OAuth flow. Subclasses must populate: * :attr:`CLIENT_METADATA` (dict): client info metadata, https://docs.bsky.app/docs/advanced-guides/oauth-client#client-and-server-metadata """ CLIENT_METADATA = None SCOPE = CLIENT_METADATA_TEMPLATE['scope'] # available scopes as of Feb 2025: # atproto, transition:generic, transition:chat.bsky # https://bsky.social/.well-known/oauth-authorization-server DEFAULT_SCOPE = 'atproto transition:generic' SCOPE_SEPARATOR = ' '
[docs] def redirect_url(self, state=None, handle=None): """Returns the URL for Bluesky to redirect back to after the OAuth prompt. Args: state (str): user-provided value to be returned as a query parameter in the return redirect handle (str): Bluesky domain handle. If ````None``, uses the ``handle`` parameter in POST form data. Raises: ValueError, RequestException: if handle isn't a valid domain """ assert self.CLIENT_METADATA client_id = self.CLIENT_METADATA['client_id'] if not handle: handle = request.form['handle'] handle = util.remove_invisible_chars(handle).strip().lstrip('@') if not re.fullmatch(util.DOMAIN_RE, handle): error(f"{handle} doesn't look like a domain") # resolve handle to DID doc and PDS base URL # https://atproto.com/specs/handle#handle-resolution did = arroba.did.resolve_handle(handle) if not did: error(f"Couldn't resolve {handle} as a Bluesky handle") logger.info(f'resolved {handle} to {did}') # generate authz URL, store session, redirect client = oauth_client_for_pds(self.CLIENT_METADATA, pds_for_did(did)) login_key = BlueskyLogin.allocate_ids(1)[0] try: authz_request = client.authorization_request(scope=self.SCOPE, state=login_key.id()) par_request = client.pushed_authorization_request(authz_request) except OAuth2Error as e: error(e) serialized = AuthorizationRequestSerializer.default_dumper(authz_request) BlueskyLogin(key=login_key, state=state, did=did, authz_request=serialized).put() return par_request.uri
[docs] class OAuthCallback(views.Callback): """Finishes the OAuth flow. Subclasses must populate: * :attr:`CLIENT_METADATA` (dict): client info metadata, https://docs.bsky.app/docs/advanced-guides/oauth-client#client-and-server-metadata """ CLIENT_METADATA = None def dispatch_request(self): # handle errors err = request.values.get('error') desc = request.values.get('error_description') if err: msg = f'Error: {err}: {desc}' logger.info(msg) if err == 'access_denied': return self.finish(None, state=request.values.get('state')) else: error(msg) login = BlueskyLogin.load(request.values['state']) pds_url = pds_for_did(login.did) client = oauth_client_for_pds(self.CLIENT_METADATA, pds_url) # validate authz response, get access token try: authz_request = AuthorizationRequestSerializer.default_loader( login.authz_request) authz_resp = authz_request.validate_callback(request.url) token = client.authorization_code(authz_resp, validate=True) except OAuth2Error as e: error(e) if token.sub != login.did: error(f'Started login with {login.did} but authenticated {token.sub}') # get user profile # https://docs.bsky.app/docs/advanced-guides/oauth-client#callback-and-access-token-request auth = OAuth2AccessTokenAuth(client=client, token=token) pds_client = Client(pds_url, auth=auth) try: profile = pds_client.app.bsky.actor.getProfile(actor=login.did) except BaseException as e: code, body = util.interpret_http_exception(e) if code: error(f'{code} {body}') raise profile['$type'] = 'app.bsky.actor.defs#profileViewDetailed' auth = BlueskyAuth(id=login.did, pds_url=pds_url, dpop_token=DPoPTokenSerializer.default_dumper(token), user_json=util.json_dumps(profile)) auth.put() return self.finish(auth, state=login.state)