"""Bluesky auth drop-in. Supports both app password login and OAuth.
Use :class:`PasswordStart` and :class:`PasswordCallback` for app password,
class:`OAuthStart` and :class:`OAuthCallback` for OAuth.
https://atproto.com/specs/xrpc#:~:text=App,passwords
https://docs.bsky.app/docs/advanced-guides/oauth-client
https://atproto.com/specs/oauth
https://guillp.github.io/requests_oauth2client/
https://github.com/guillp/requests_oauth2client?tab=readme-ov-file#using-dpop
"""
import logging
import os
import re
from urllib.parse import quote, urljoin, urlparse
import arroba.did
from flask import redirect, request
from google.cloud import ndb
from lexrpc import Client
import requests
from requests_oauth2client import (
AuthorizationRequestSerializer,
OAuth2Client,
OAuth2Error,
OAuth2AccessTokenAuth,
TokenSerializer,
)
from webutil import flask_util, util
from webutil.models import JsonProperty
from webutil.util import json_dumps, json_loads
from . import views, models
logger = logging.getLogger(__name__)
os.environ.setdefault('PLC_HOST', 'plc.directory')
# https://docs.bsky.app/docs/advanced-guides/oauth-client#client-and-server-metadata
PROTECTED_RESOURCE_PATH = '/.well-known/oauth-protected-resource'
RESOURCE_METADATA_PATH = '/.well-known/oauth-authorization-server'
CLIENT_METADATA_TEMPLATE = {
# Clients must fill these in
'client_id': None, # eg 'https://app.example.com/oauth/client-metadata.json'
'client_name': None, # eg 'My Example App'
'client_uri': None, # eg 'https://app.example.com'
'redirect_uris': None, # eg ['https://app.example.com/oauth/callback'],
# standard
'application_type': 'web',
'dpop_bound_access_tokens': True,
'grant_types': [
'authorization_code',
'refresh_token',
],
'response_types': ['code'],
'scope': 'atproto transition:generic',
'token_endpoint_auth_method': 'none',
}
_APP_CLIENT_METADATA = {
**CLIENT_METADATA_TEMPLATE,
'client_id': 'https://oauth-dropins.appspot.com/bluesky/client-metadata.json',
'client_name': 'oauth-dropins demo',
'client_uri': 'https://oauth-dropins.appspot.com/',
'redirect_uris': ['https://oauth-dropins.appspot.com/bluesky/oauth_callback'],
}
def error(msg):
logger.warning(msg)
raise ValueError(msg)
[docs]
class BlueskyLogin(ndb.Model):
"""An in-progress Bluesky OAuth login. Ephemeral.
Stores a serialized :class:`requests_oauth2client.AuthorizationRequest` across
HTTP requests.
"""
state = ndb.TextProperty()
did = ndb.StringProperty(required=True)
authz_request = ndb.TextProperty(required=True)
"""Serialized :class:`requests_oauth2client.AuthorizationRequest`.
Uses :meth:`requests_oauth2client.AuthorizationRequestSerializer.default_dumper` /
:meth:`requests_oauth2client.AuthorizationRequestSerializer.default_loader`.
"""
@classmethod
def load(cls, id):
if not util.is_int(id):
error(f'State {id} not found')
login = cls.get_by_id(int(id))
if not login:
error(f'State {id} not found')
return login
[docs]
class BlueskyAuth(models.BaseAuth):
"""An authenticated Bluesky user.
Key id is DID.
"""
password = ndb.StringProperty()
pds_url = ndb.StringProperty()
user_json = ndb.TextProperty(required=True)
"""app.bsky.actor.defs#profileViewDetailed"""
session = JsonProperty()
dpop_token = ndb.TextProperty()
def site_name(self):
return 'Bluesky'
[docs]
def access_token(self):
"""
Returns:
str:
"""
if self.session:
return self.session.get('accessJwt')
[docs]
def user_display_name(self):
"""
Returns:
str:
"""
return json_loads(self.user_json).get('handle')
[docs]
def image_url(self):
"""
Returns:
str:
"""
return json_loads(self.user_json).get('avatar')
[docs]
def oauth_api(self, client_metadata):
"""Returns an OAuth-based :class:`lexrpc.Client` for this user.
Requires :attr:`dpop_token` to be set.
TODO: unify with :meth:`granary.bluesky.Bluesky.from_auth`?
Args:
client_metadata (dict): client info metadata,
https://docs.bsky.app/docs/advanced-guides/oauth-client#client-and-server-metadata
Returns:
lexrpc.Client:
"""
assert self.dpop_token
pds_url = self.pds_url or pds_for_did(self.key.id())
oauth_client = oauth_client_for_pds(client_metadata, pds_url)
dpop_token = TokenSerializer().loads(self.dpop_token)
auth = OAuth2AccessTokenAuth(client=oauth_client, token=dpop_token)
return Client(pds_url, auth=auth, requests_session=util.session)
def _api(self, **kwargs):
"""
Args:
kwargs: passed to the :class:`lexrpc.Client` constructor
Returns:
lexrpc.Client:
"""
did = self.key.id()
client = Client(address=self.pds_url, headers={'User-Agent': util.user_agent},
requests_session=util.session, **kwargs)
if self.session and (self.session.get('accessJwt')
or self.session.get('refreshJwt')):
client.session = self.session
elif self.password:
client.com.atproto.server.createSession({
'identifier': did,
'password': self.password,
})
else:
raise ValueError(f'No tokens or password for {did}')
return client
@staticmethod
def _api_from_password(handle, password, **kwargs):
"""
Args:
handle (str)
password (str)
kwargs: passed to the :class:`lexrpc.Client` constructor
Returns:
lexrpc.Client:
"""
did = arroba.did.resolve_handle(handle, get_fn=util.requests_get)
if not did:
error(f"Couldn't resolve {handle} as a Bluesky handle")
logger.info(f'resolved {handle} to {did}')
pds_url = pds_for_did(did)
logger.info(f'Logging into {pds_url} as {did}...')
client = Client(address=pds_url, headers={'User-Agent': util.user_agent},
requests_session=util.session, **kwargs)
resp = client.com.atproto.server.createSession({
'identifier': handle,
'password': password,
})
return client
[docs]
class StartBase(views.Start):
"""Base class for starting Bluesky auth; only used to provide the button.
"""
NAME = 'bluesky'
LABEL = 'Bluesky'
@classmethod
def button_html(cls, *args, handle=None, **kwargs):
if handle:
input = f'<input name="handle" type="hidden" value="{handle}" />'
else:
input = f'<input name="handle" type="text" class="form-control" placeholder="{cls.LABEL} handle" required style="width: 135px; height: 50px; display:inline;" />'
kwargs['form_extra'] = kwargs.get('form_extra', '') + input
return super().button_html(
*args,
image_file='bluesky_logotype.png' if handle else 'bluesky_logo.png',
input_style='background-color: #EEEEEE',
**kwargs)
Start = StartBase
[docs]
class PasswordCallback(views.Callback):
"""
App password login callback stub.
"""
def dispatch_request(self):
handle = util.remove_invisible_chars(request.values['username'])\
.strip().lower().removeprefix('@')
password = request.values['password'].strip()
state = request.values.get('state')
# get the DID (portable user ID)
try:
client = BlueskyAuth._api_from_password(handle, password)
except ValueError as e:
logger.warning(f'Login failed: {e}')
return self.finish(None, state=state)
profile = {
'$type': 'app.bsky.actor.defs#profileViewDetailed',
**client.app.bsky.actor.getProfile(actor=handle)
}
auth = BlueskyAuth(
id=profile['did'],
password=password,
pds_url=client.address,
user_json=util.json_dumps(profile),
session=client.session,
)
auth.put()
return self.finish(auth, state=state)
Callback = PasswordCallback
[docs]
def pds_for_did(did):
"""Resolves a DID document and extracts its PDS URL.
https://atproto.com/specs/did#did-documents
Args:
did (str)
Returns:
str: PDS URL
Raises:
ValueError: if the DID couldn't be resolved, or if its DID document has no
ATProto PDS endpoint
"""
did_doc = arroba.did.resolve(did, get_fn=util.requests_get)
if not did_doc:
error(f"Couldn't resolve DID {did}")
# based on bridgy_fed.atproto.ATProto.pds_for
for service in did_doc.get('service', []):
if service.get('id') in ('#atproto_pds', f'{did}#atproto_pds'):
pds = service.get('serviceEndpoint')
logger.info(f'{did} has PDS {pds}')
return pds
error(f"{did}'s DID doc has no ATProto PDS")
[docs]
def oauth_client_for_pds(client_metadata, pds_url, redirect_uri=None):
"""Discovers a PDS's OAuth endpoints and creates a client.
Args:
client_metadata (dict)
pds_url (str)
redirect_uri (str): if not provided, defaults to the first element in
``redirect_uris`` in ``client_metadata`
Returns:
OAuth2Client:
Raises:
ValueError: if the DID couldn't be resolved, or if its DID document has no
ATProto PDS endpoint
"""
resp = util.requests_get(urljoin(pds_url, PROTECTED_RESOURCE_PATH))
resp.raise_for_status()
auth_server = resp.json()['authorization_servers'][0]
logger.info(f'PDS {pds_url} has auth server {auth_server}')
# OAuth special case for localhost client_id and redirect_uri
# https://atproto.com/specs/oauth#:~:text=Localhost%20Client%20Development
client_id = client_metadata['client_id']
if not redirect_uri:
redirect_uri = client_metadata['redirect_uris'][0]
if request.host.split(':')[0] in ('localhost', '127.0.0.1'):
redirect_uri = urljoin('http://127.0.0.1:8080', urlparse(redirect_uri).path)
scope = quote(CLIENT_METADATA_TEMPLATE['scope'])
client_id = f'http://localhost?redirect_uri={redirect_uri}&scope={scope}'
return OAuth2Client.from_discovery_endpoint(
urljoin(auth_server, RESOURCE_METADATA_PATH),
client_id=client_id,
redirect_uri=redirect_uri,
dpop_bound_access_tokens=True,
)
[docs]
class OAuthStart(StartBase):
"""Starts the OAuth flow.
Subclasses must populate:
* :attr:`CLIENT_METADATA` (dict): client info metadata,
https://docs.bsky.app/docs/advanced-guides/oauth-client#client-and-server-metadata
"""
CLIENT_METADATA = None
SCOPE = CLIENT_METADATA_TEMPLATE['scope']
# available scopes as of Feb 2025:
# atproto, transition:generic, transition:chat.bsky
# https://bsky.social/.well-known/oauth-authorization-server
DEFAULT_SCOPE = 'atproto transition:generic'
SCOPE_SEPARATOR = ' '
[docs]
def redirect_url(self, state=None, handle=None):
"""Returns the URL for Bluesky to redirect back to after the OAuth prompt.
Args:
state (str): user-provided value to be returned as a query parameter in
the return redirect
handle (str): Bluesky domain handle. If ````None``, uses the ``handle``
parameter in POST form data.
Raises:
ValueError, RequestException: if handle isn't a valid domain
"""
assert self.CLIENT_METADATA
client_id = self.CLIENT_METADATA['client_id']
if not handle:
handle = request.form['handle']
handle = util.remove_invisible_chars(handle).strip().lstrip('@')
if not re.fullmatch(util.DOMAIN_RE, handle):
error(f"{handle} doesn't look like a domain")
# resolve handle to DID doc and PDS base URL
# https://atproto.com/specs/handle#handle-resolution
did = arroba.did.resolve_handle(handle, get_fn=util.requests_get)
if not did:
error(f"Couldn't resolve {handle} as a Bluesky handle")
logger.info(f'resolved {handle} to {did}')
# generate authz URL, store session, redirect
redirect_uri = self.to_url()
if request.host.split(':')[0] in ('localhost', '127.0.0.1'):
redirect_uri = urljoin('http://127.0.0.1:8080', urlparse(redirect_uri).path)
client = oauth_client_for_pds(self.CLIENT_METADATA, pds_for_did(did),
redirect_uri=redirect_uri)
login_key = BlueskyLogin.allocate_ids(1)[0]
try:
authz_request = client.authorization_request(
redirect_uri=redirect_uri, scope=self.SCOPE, state=login_key.id())
par_request = client.pushed_authorization_request(authz_request)
except OAuth2Error as e:
error(e)
serialized = AuthorizationRequestSerializer().dumps(authz_request)
BlueskyLogin(key=login_key, state=state, did=did, authz_request=serialized).put()
return par_request.uri
[docs]
class OAuthCallback(views.Callback):
"""Finishes the OAuth flow.
Subclasses must populate:
* :attr:`CLIENT_METADATA` (dict): client info metadata,
https://docs.bsky.app/docs/advanced-guides/oauth-client#client-and-server-metadata
"""
CLIENT_METADATA = None
def dispatch_request(self):
login = None
if state_id := request.values.get('state'):
login = BlueskyLogin.load(state_id)
# handle errors
err = request.values.get('error')
desc = request.values.get('error_description')
if err:
msg = f'Error: {err}: {desc}'
logger.info(msg)
if err == 'access_denied':
return self.finish(None, state=login.state if login else None)
else:
error(msg)
if not login:
error('Missing login')
pds_url = pds_for_did(login.did)
client = oauth_client_for_pds(self.CLIENT_METADATA, pds_url,
redirect_uri=request.url)
# validate authz response, get access token
try:
authz_request = AuthorizationRequestSerializer().loads(login.authz_request)
authz_resp = authz_request.validate_callback(request.url)
token = client.authorization_code(authz_resp, validate=True)
except OAuth2Error as e:
error(e)
if token.sub != login.did:
error(f'Started login with {login.did} but authenticated {token.sub}')
# get user profile
# https://docs.bsky.app/docs/advanced-guides/oauth-client#callback-and-access-token-request
auth = OAuth2AccessTokenAuth(client=client, token=token)
pds_client = Client(pds_url, auth=auth, requests_session=util.session)
try:
profile = pds_client.app.bsky.actor.getProfile(actor=login.did)
except BaseException as e:
code, body = util.interpret_http_exception(e)
if code:
error(f'{code} {body}')
raise
profile['$type'] = 'app.bsky.actor.defs#profileViewDetailed'
auth = BlueskyAuth(id=login.did,
pds_url=pds_url,
dpop_token=TokenSerializer().dumps(token),
user_json=util.json_dumps(profile))
auth.put()
return self.finish(auth, state=login.state)
[docs]
def make_session_callback(auth_entity):
"""Returns a ``session_callback`` for storing refreshed tokens to the datastore.
Used with :class:`granary.Bluesky` and :class:`lexrpc.Client`. Handles both
legacy app password sessions (dict) and OAuth DPoP tokens
(:class:`requests_oauth2client.OAuth2AccessTokenAuth`).
Args:
auth_entity (BlueskyAuth)
Returns:
callable (dict or OAuth2AccessTokenAuth) => None:
"""
def callback(session_or_auth):
if (isinstance(session_or_auth, dict)
and session_or_auth != auth_entity.session):
logger.info(f'Storing session for {auth_entity.key.id()}')
auth_entity.session = session_or_auth
auth_entity.put()
elif isinstance(session_or_auth, OAuth2AccessTokenAuth):
serialized = TokenSerializer().dumps(session_or_auth.token)
if serialized != auth_entity.dpop_token:
logger.info(f'Storing DPoP token for {auth_entity.key.id()}')
auth_entity.dpop_token = serialized
auth_entity.put()
return callback