Source code for oauth_dropins.webutil.logs

"""A handler that serves all app logs for an App Engine HTTP request.

StackDriver Logging API:
https://cloud.google.com/logging/docs/apis
"""
import calendar
import datetime
import html
import logging
import re
import time
import urllib.request, urllib.parse, urllib.error

from google.cloud import ndb
from google.cloud.logging import Client
import humanize

from .appengine_info import APP_ID
from . import flask_util, util
from .flask_util import error

LEVELS = {
  logging.DEBUG:    'D',
  logging.INFO:     'I',
  logging.WARNING:  'W',
  logging.ERROR:    'E',
  logging.CRITICAL: 'F',
}

CACHE_TIME = datetime.timedelta(days=1)
MAX_LOG_AGE = datetime.timedelta(days=30)
# App Engine's launch, roughly
MIN_START_TIME = time.mktime(datetime.datetime(2008, 4, 1).timetuple())

SANITIZE_RE = re.compile(r"""
  ((?:access|api|oauth)?[ _]?
   (?:code|consumer_key|consumer_secret|nonce|secret|signature|token|verifier)
     (?:u?['"])?
   (?:=|:|\ |,\ |%3D)\ *
     (?:u?['"])?
  )
  [^ &='"]+
""", flags=re.VERBOSE | re.IGNORECASE)


[docs]def sanitize(msg): """Sanitizes access tokens and Authorization headers.""" return SANITIZE_RE.sub(r'\1...', msg)
[docs]def url(when, key): """Returns the relative URL (no scheme or host) to a log page. Args: when: datetime key: ndb.Key """ return 'log?start_time=%s&key=%s' % ( calendar.timegm(when.utctimetuple()), key.urlsafe().decode())
# datastore string keys are url-safe-base64 of, say, at least 32(ish) chars. # https://cloud.google.com/appengine/docs/python/ndb/keyclass#Key_urlsafe # http://tools.ietf.org/html/rfc3548.html#section-4 BASE64 = 'A-Za-z0-9-_=' DATASTORE_KEY_RE = re.compile("([^%s])(([%s]{8})[%s]{24,})([^%s])" % ((BASE64,) * 4))
[docs]def linkify_datastore_keys(msg): """Converts string datastore keys to links to the admin console viewer.""" def linkify_key(match): try: # Useful for logging, but also causes false positives in the search, we # find and use log requests instead of real requests. # logging.debug('Linkifying datastore key: %s', match.group(2)) key = ndb.Key(urlsafe=match.group(2)) tokens = [(kind, '%s:%s' % ('id' if isinstance(id, int) else 'name', id)) for kind, id in key.pairs()] key_str = '0/|' + '|'.join('%d/%s|%d/%s' % (len(kind), kind, len(id), id) for kind, id in tokens) key_quoted = urllib.parse.quote(urllib.parse.quote(key_str, safe=''), safe='') html = "%s<a title='%s' href='https://console.cloud.google.com/datastore/entities;kind=%s;ns=__$DEFAULT$__/edit;key=%s?project=%s'>%s...</a>%s" % ( match.group(1), match.group(2), key.kind(), key_quoted, APP_ID, match.group(3), match.group(4)) logging.debug('Returning %s', html) return html except BaseException: # logging.debug("Couldn't linkify candidate datastore key.") # too noisy return match.group(0) return DATASTORE_KEY_RE.sub(linkify_key, msg)
[docs]def utcfromtimestamp(val): """Wrapper for datetime.utcfromtimestamp that returns HTTP 400 on overflow. ...specifically, if datetime.utcfromtimestamp raises OverflowError because the timestamp is greater than the platform's time_t can hold. https://docs.python.org/3.9/library/datetime.html#datetime.datetime.utcfromtimestamp """ try: return datetime.datetime.utcfromtimestamp(val) except OverflowError: return error(f'start_time too big: {val}')
[docs]def log(): """Flask view that searches for and renders app logs for an HTTP request. URL parameters: start_time: float, seconds since the epoch key: string that should appear in the first app log Install with: app.add_url_rule('/log', view_func=logs.log) Or: @app.get('/log') @cache.cached(600) def log(): return logs.log() """ start_time = flask_util.get_required_param('start_time') if not util.is_float(start_time): return error("Couldn't convert start_time to float: %r" % start_time) start_time = float(start_time) if start_time < MIN_START_TIME: return error("start_time must be >= %s" % MIN_START_TIME) client = Client() project = 'projects/%s' % APP_ID key = urllib.parse.unquote_plus(flask_util.get_required_param('key')) # first, find the individual log message to get the trace id timestamp_filter = 'timestamp>="%s" timestamp<="%s"' % ( utcfromtimestamp(start_time - 60).isoformat() + 'Z', utcfromtimestamp(start_time + 120).isoformat() + 'Z') query = 'logName="%s/logs/app" jsonPayload.message:"%s" %s' % ( project, key, timestamp_filter) logging.info('Searching logs with: %s', query) try: # https://googleapis.dev/python/logging/latest/client.html#google.cloud.logging_v2.client.Client.list_entries log = next(iter(client.list_entries(filter_=query, page_size=1))) except StopIteration: logging.info('No log found!') return 'No log found!', 404 logging.info('Got insert id %s trace %s', log.insert_id, log.trace) # now, print all logs with that trace resp = """\ <html> <body style="font-family: monospace; white-space: pre"> """ query = 'logName="%s/logs/app" trace="%s" resource.type="gae_app" %s' % ( project, log.trace, timestamp_filter) logging.info('Searching logs with: %s', query) # sanitize and render each line for log in client.list_entries(filter_=query, page_size=1000): msg = log.payload.get('message') if msg: msg = linkify_datastore_keys(util.linkify(html.escape( msg if msg.startswith('Created by this poll:') else sanitize(msg), quote=False))) resp += '%s %s %s<br />' % ( log.severity[0], log.timestamp, msg.replace('\n', '<br />')) resp += '</body>\n</html>' return resp, {'Content-Type': 'text/html; charset=utf-8'}