"""IndieAuth drop-in.
https://indieauth.com/developers
"""
import logging
import urllib.parse
from flask import request
from google.cloud import ndb
import mf2util
import requests
from . import models, views
from .webutil import flask_util, util
from .webutil.util import json_dumps, json_loads
# Client id sent as 'client_id' to the authorization endpoint. Loaded with
# util.read('indieauth_client_id') -- presumably a file of that name in the
# app's root directory, per the assertion message in Start.redirect_url.
INDIEAUTH_CLIENT_ID = util.read('indieauth_client_id')
# Default authorization endpoint, returned by discover_authorization_endpoint
# when the user's page cannot be fetched or advertises no endpoint of its own.
INDIEAUTH_URL = 'https://indieauth.com/auth'
def discover_authorization_endpoint(me, resp=None):
    """Fetch a URL and look for an authorization_endpoint Link header or rel value.

    The HTTP ``Link`` header is checked first, then ``<link>`` elements in the
    HTML body.

    Args:
      me: string, URL to fetch
      resp: :class:`requests.Response` (optional), re-use this response if the
        URL has already been fetched

    Returns:
      string, the discovered IndieAuth authorization endpoint URL, or the
      default indieauth.com URL if the fetch did not return a 2xx status or no
      endpoint was advertised

    Fetch errors (``ValueError``, ``requests.URLRequired``,
    ``requests.TooManyRedirects``) are reported through ``flask_util.error``.
    """
    # Compare against None explicitly: a requests.Response is falsy when its
    # status is 4xx/5xx, so ``resp or ...`` would discard a failed response
    # that the caller already fetched and silently fetch the URL a second time.
    if resp is None:
        try:
            resp = util.requests_get(me)
        except (ValueError, requests.URLRequired, requests.TooManyRedirects) as e:
            flask_util.error(str(e))

    if resp.status_code // 100 != 2:
        logging.warning(
            'could not fetch user url "%s". got response code: %d',
            me, resp.status_code)
        return INDIEAUTH_URL

    # check authorization_endpoint header first
    auth_endpoint = resp.links.get('authorization_endpoint', {}).get('url')
    if auth_endpoint:
        return auth_endpoint

    # check the html content
    soup = util.parse_html(resp.text)
    auth_link = soup.find('link', {'rel': 'authorization_endpoint'})
    auth_endpoint = auth_link and auth_link.get('href')
    if auth_endpoint:
        return auth_endpoint

    return INDIEAUTH_URL
def build_user_json(me, resp=None):
    """Build the user_json dict: an h-card, rel-me links, and "me".

    Args:
      me: string, URL of the user, e.g. the verified ``me`` value returned by
        the authorization endpoint
      resp: :class:`requests.Response` (optional), re-use this response if the
        URL has already been fetched

    Returns:
      dict, with 'me', the URL for this person; 'h-card', the representative
      h-card for this page; 'rel-me', a list of rel-me URLs found at this page.
      If the page could not be fetched with a 2xx status, only 'me' is
      included. Otherwise the dict is passed through ``util.trim_nulls``
      before being returned.
    """
    user_json = {'me': me}

    # Compare against None explicitly: a requests.Response is falsy when its
    # status is 4xx/5xx, so ``resp or ...`` would ignore a failed response the
    # caller passed in and fetch the URL again.
    if resp is None:
        resp = util.requests_get(me)

    if resp.status_code // 100 != 2:
        logging.warning(
            'could not fetch user url "%s". got response code: %d',
            me, resp.status_code)
        return user_json

    mf2 = util.parse_mf2(resp, resp.url)
    user_json['rel-me'] = mf2['rels'].get('me')
    user_json['h-card'] = mf2util.representative_hcard(mf2, me)
    logging.debug('built user-json %r', user_json)
    return util.trim_nulls(user_json)
class IndieAuth(models.BaseAuth):
    """Datastore entity for a user who has authenticated via IndieAuth.

    Exposes accessor methods describing the user and persists the user's data
    in the datastore. Callback creates these entities with the verified 'me'
    value as the id. See models.BaseAuth for usage details.
    """
    # JSON-encoded dict built by build_user_json; generally this has only 'me'
    user_json = ndb.TextProperty(required=True)

    def site_name(self):
        """Returns the name of this auth provider, 'IndieAuth'."""
        return 'IndieAuth'

    def user_display_name(self):
        """Returns this entity's key id as the user's display name."""
        return self.key_id()

    def access_token(self):
        """Returns None; access tokens are not applicable to IndieAuth."""
        return None
class Start(views.Start):
    """Starts the IndieAuth flow.

    Requires the 'me' parameter with the user URL that we want to
    authenticate.
    """
    NAME = 'indieauth'
    LABEL = 'IndieAuth'

    def redirect_url(self, state=None, me=None):
        """Build the authorization endpoint URL to redirect the user to.

        Args:
          state: optional value to round-trip through the flow; it is embedded
            in the encoded OAuth state alongside the endpoint and 'me'
          me: string, the user's URL. If omitted, it is read from the 'me'
            request parameter. 'http://' is prepended when it has no scheme.

        Returns:
          string, the discovered authorization endpoint URL with 'me',
          'client_id', 'redirect_uri', and 'state' query parameters
        """
        assert INDIEAUTH_CLIENT_ID, (
            "Please fill in the indieauth_client_id in your app's root directory.")

        # TODO: unify with mastodon?
        if not me:
            me = request.values['me']

        parsed = urllib.parse.urlparse(me)
        if not parsed.scheme:
            me = 'http://' + me

        redirect_uri = self.to_url()
        endpoint = discover_authorization_endpoint(me)

        # The endpoint and 'me' travel inside the state so that the callback
        # can post the verification request without repeating discovery.
        url = endpoint + '?' + urllib.parse.urlencode({
            'me': me,
            'client_id': INDIEAUTH_CLIENT_ID,
            'redirect_uri': redirect_uri,
            'state': util.encode_oauth_state({
                'endpoint': endpoint,
                'me': me,
                'state': state,
            }),
        })

        logging.info('Redirecting to IndieAuth: %s', url)
        return url

    @classmethod
    def button_html(cls, *args, **kwargs):
        """Render the sign-in button with an extra URL input for 'me'.

        Positional and keyword arguments are passed through to the parent
        class's button_html, with input_style and form_extra added.
        """
        # Zero-argument super() is anchored to this class. The previous
        # super(cls, cls) resolved relative to the *calling* class, so invoking
        # button_html on a subclass dispatched straight back to this method
        # and recursed without end.
        return super().button_html(
            *args,
            input_style='background-color: #EBEBEB; padding: 5px',
            form_extra='<input type="url" name="me" class="form-control" placeholder="Your web site" required style="width: 150px; height: 50px; display:inline;" />',
            **kwargs)
class Callback(views.Callback):
    """Handles the redirect back from the IndieAuth authorization endpoint.

    POSTs the authorization code back to the endpoint to verify it.
    """

    def dispatch_request(self):
        """Verify the authorization code and finish the auth flow.

        Reads 'code' and 'state' from the request, decodes the endpoint and
        'me' from the state, and POSTs them to the endpoint. On a 2xx response
        containing 'me', stores an IndieAuth entity and returns the result of
        self.finish. Failures are reported through flask_util.error.
        """
        code = request.values['code']
        decoded = util.decode_oauth_state(request.values['state'])

        endpoint = decoded.get('endpoint')
        me = decoded.get('me')
        if not (endpoint and me):
            flask_util.error("invalid state parameter")

        # the caller's original state, nested inside the encoded state
        inner_state = decoded.get('state') or ''

        payload = {
            'me': me,
            'client_id': INDIEAUTH_CLIENT_ID,
            'code': code,
            'redirect_uri': request.base_url,
            'state': inner_state,
        }
        validate_resp = util.requests_post(endpoint, data=payload)

        if validate_resp.status_code // 100 != 2:
            flask_util.error('IndieAuth verification failed: %s %s' %
                             (validate_resp.status_code, validate_resp.text))
        else:
            data = util.sniff_json_or_form_encoded(validate_resp.text)
            verified = data.get('me')
            if not verified:
                flask_util.error('Verification response missing required "me" field')
            else:
                user_json = build_user_json(verified)
                indie_auth = IndieAuth(id=verified, user_json=json_dumps(user_json))
                indie_auth.put()
                return self.finish(indie_auth, state=inner_state)