Source code for oauth_dropins.webutil.logs

"""A handler that serves all app logs for an App Engine HTTP request.

StackDriver Logging API: https://cloud.google.com/logging/docs/apis
"""
import calendar
from datetime import datetime, timedelta, timezone
import html
import logging
import re
import time
import urllib.request, urllib.parse, urllib.error

from flask import request
from google.cloud import ndb
from google.cloud.logging import Client
from oauth_dropins.webutil.util import json_dumps, json_loads

from .appengine_info import APP_ID
from . import flask_util, util
from .flask_util import error

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Maps stdlib ``logging`` level constants to single-character codes.
# NOTE(review): not referenced in the visible part of this module; presumably
# kept for callers elsewhere - confirm before removing.
LEVELS = {
  logging.DEBUG:    'D',
  logging.INFO:     'I',
  logging.WARNING:  'W',
  logging.ERROR:    'E',
  logging.CRITICAL: 'F',
}

# NOTE(review): CACHE_TIME and MAX_LOG_AGE are not referenced in the visible
# part of this module; presumably they are used by callers - confirm.
CACHE_TIME = timedelta(days=1)
MAX_LOG_AGE = timedelta(days=30)

# Inclusive bounds for the ``start_time`` request parameter, as float seconds
# since the epoch (UTC).
#
# These are computed with datetime.timestamp() on timezone-aware datetimes.
# time.mktime() would interpret the time tuple in the machine's *local*
# timezone and silently shift both bounds by the local UTC offset.
#
# App Engine's launch, roughly
MIN_START_TIME = datetime(2008, 4, 1, tzinfo=timezone.utc).timestamp()
MAX_START_TIME = datetime(2099, 1, 1, tzinfo=timezone.utc).timestamp()

# Matches a secret-looking "name <separator> value" pair, case-insensitively.
#
# Group 1 captures everything up to (but not including) the value:
#   * an optional ``access``/``api``/``oauth`` prefix and an optional space or
#     underscore,
#   * one of the sensitive field names listed below,
#   * an optional closing quote (optionally preceded by ``u``),
#   * a separator: ``=``, ``:``, a space, ``, `` or URL-encoded ``%3D``,
#     followed by any number of spaces,
#   * an optional opening quote (optionally preceded by ``u``).
#
# The value itself is the following run of characters that are not a space,
# ``&``, ``=`` or a quote. It is not captured, so a substitution can keep
# group 1 and replace the value.
#
# The pattern is compiled with re.VERBOSE, so unescaped whitespace outside
# character classes is ignored; literal spaces are written as ``\ ``.
SANITIZE_RE = re.compile(r"""
  ((?:access|api|oauth)?[ _]?
   (?:code|accessJwt|consumer_key|consumer_secret|nonce|password|refreshJwt|secret|signature|token|verifier)
     (?:u?['"])?
   (?:=|:|\ |,\ |%3D)\ *
     (?:u?['"])?
  )
  [^ &='"]+
""", flags=re.VERBOSE | re.IGNORECASE)


def sanitize(msg):
  """Redacts secret-looking values (tokens, passwords, signatures, ...) in text.

  Every match of :const:`SANITIZE_RE` keeps its leading "name + separator"
  part (group 1) and has the value that follows replaced with ``...``.

  Args:
    msg (str): text to sanitize

  Returns:
    str: ``msg`` with the matched values replaced
  """
  # \1 re-emits the captured field name and separator; only the value is dropped.
  replacement = r'\1...'
  return SANITIZE_RE.sub(replacement, msg)
def url(when, key, **params):
  """Builds the relative URL (no scheme or host) of a log page.

  Args:
    when (:class:`datetime`): converted to integer seconds since the epoch
      (UTC) and sent as the ``start_time`` query param
    key (:class:`ndb.Key` or str): sent as the ``key`` query param; an
      :class:`ndb.Key` is converted to its URL-safe string form first
    params: extra query params, eg module, path. ``path`` may be a list or
      tuple of strings, which is joined with commas. May not include
      ``start_time`` or ``key``.

  Returns:
    str: ``log?`` followed by the URL-encoded query string
  """
  assert 'start_time' not in params and 'key' not in params, params

  paths = params.get('path')
  if isinstance(paths, (list, tuple)):
    # paths are comma-separated in the query param, so they can't contain commas
    for path in paths:
      assert ',' not in path, path
    params['path'] = ','.join(paths)

  start_time = calendar.timegm(when.utctimetuple())
  if isinstance(key, ndb.Key):
    key_param = key.urlsafe().decode()
  else:
    key_param = key

  params['start_time'] = start_time
  params['key'] = key_param
  query_string = urllib.parse.urlencode(params)
  return 'log?' + query_string
# datastore string keys are url-safe-base64 of, say, at least 32(ish) chars.
# https://cloud.google.com/appengine/docs/python/ndb/keyclass#Key_urlsafe
# http://tools.ietf.org/html/rfc3548.html#section-4
#
# Contents of a regex character class matching the URL-safe base64 alphabet
# plus ``=`` padding.
BASE64 = 'A-Za-z0-9-_='

# Matches a maximal run of 32 or more URL-safe base64 characters.
#
# Groups:
#   1: always empty (see below)
#   2: the whole candidate key
#   3: the first 8 characters of the candidate key, used as the link text
#   4: always empty (see below)
#
# The run's boundaries are checked with lookbehind/lookahead instead of
# consuming one non-base64 character on each side. That way a key at the very
# start or end of a message, and keys separated by a single character, are
# found too. Groups 1 and 4 are kept as empty groups so that code that
# re-emits ``group(1) + ... + group(4)`` produces the same output as before.
DATASTORE_KEY_RE = re.compile(
  r'()(?<![%s])(([%s]{8})[%s]{24,})(?![%s])()' % ((BASE64,) * 4))
def linkify_datastore_keys(msg):
  """Converts string datastore keys to links to the admin console viewer.

  Every candidate matched by :const:`DATASTORE_KEY_RE` is parsed as a URL-safe
  :class:`ndb.Key`. Candidates that parse are replaced with an ``<a>`` tag that
  links to the entity in the Cloud console and shows the first 8 characters of
  the key followed by ``...``. Candidates that don't parse are left unchanged.

  Args:
    msg (str): text, expected to already be HTML-escaped by the caller

  Returns:
    str: ``msg`` with datastore keys replaced by HTML links
  """
  def linkify_key(match):
    # Deliberately no logging in here: logging the key would make this
    # request's own logs match future searches for that key, and logging
    # failures is too noisy.
    urlsafe = match.group(2)
    try:
      key = ndb.Key(urlsafe=urlsafe)
      tokens = [
        (kind, f"{'id' if isinstance(id_or_name, int) else 'name'}:{id_or_name}")
        for kind, id_or_name in key.pairs()
      ]
      key_str = '0/|' + '|'.join(
        f'{len(kind)}/{kind}|{len(token)}/{token}' for kind, token in tokens)
      # quoted twice, matching the original console URL construction
      key_quoted = urllib.parse.quote(urllib.parse.quote(key_str, safe=''), safe='')
      # The kind is decoded from text found in the logs, so it's untrusted.
      # Percent-encode it so it can't break out of the single-quoted href
      # attribute. Plain alphanumeric kinds are unchanged.
      kind_quoted = urllib.parse.quote(key.kind(), safe='')
      return (
        f"{match.group(1)}<a title='{urlsafe}' "
        f"href='https://console.cloud.google.com/datastore/entities"
        f";kind={kind_quoted};ns=__$DEFAULT$__/edit;key={key_quoted}"
        f"?project={APP_ID}'>{match.group(3)}...</a>{match.group(4)}")
    except Exception:
      # Not a real datastore key; leave the text as is. Catches Exception
      # rather than BaseException so KeyboardInterrupt and SystemExit still
      # propagate.
      return match.group(0)

  return DATASTORE_KEY_RE.sub(linkify_key, msg)
def _escape_filter_value(value):
  """Escapes a value for use inside a double-quoted Cloud Logging filter string."""
  return str(value).replace('\\', '\\\\').replace('"', '\\"')


def _utc_isoformat(timestamp):
  """Returns seconds since the epoch as a naive-UTC ISO 8601 string ending in Z."""
  utc = datetime.fromtimestamp(timestamp, tz=timezone.utc)
  # drop tzinfo so isoformat() doesn't append +00:00 before our own Z suffix
  return utc.replace(tzinfo=None).isoformat() + 'Z'


def log(module=None, path=None):
  """Flask view that searches for and renders app logs for an HTTP request.

  URL parameters:

  * ``start_time`` (float): seconds since the epoch
  * ``key`` (str): token to find in the first app log of the request

  The ``module`` and ``path`` URL parameters are used as fallbacks when the
  corresponding arguments aren't provided.

  Install with::

    app.add_url_rule('/log', view_func=logs.log)

  Or::

    @app.get('/log')
    @cache.cached(600)
    def log():
      return logs.log()

  The view first searches the 3 minute window from 60s before ``start_time``
  to 120s after it for one log entry whose text payload contains ``key``.
  It then renders every log entry in that window that has the same trace.

  Args:
    module (str): App Engine module to search. Defaults to all.
    path (str or sequence of str): optional HTTP request path(s) to limit logs
      to. A str may contain multiple comma-separated paths.

  Returns:
    (str response body, dict headers) tuple: Flask response. If no matching
    log entry is found, ``('No log found!', 404)`` instead.
  """
  if not module:
    module = request.values.get('module')
  if not path:
    path = request.values.get('path')

  start_time = flask_util.get_required_param('start_time')
  if not util.is_float(start_time):
    return error(f"Couldn't convert start_time to float: {start_time!r}")

  start_time = float(start_time)
  # Phrased as "not >=" / "not <=" so that NaN, which compares false with
  # everything, is rejected here instead of crashing the timestamp conversion
  # below.
  if not start_time >= MIN_START_TIME:
    return error(f'start_time must be >= {MIN_START_TIME}')
  elif not start_time <= MAX_START_TIME:
    return error(f'start_time must be <= {MAX_START_TIME}')

  client = Client()
  project = f'projects/{APP_ID}'
  key = urllib.parse.unquote_plus(flask_util.get_required_param('key'))

  # first, find the individual log message to get the trace id
  timestamp_filter = (
    f'timestamp>="{_utc_isoformat(start_time - 60)}" '
    f'timestamp<="{_utc_isoformat(start_time + 120)}"')

  # key, module and path may come from the request, so escape them before
  # interpolating them into quoted filter strings.
  query = (f'logName="{project}/logs/python" '
           f'textPayload:"{_escape_filter_value(key)}" {timestamp_filter}')
  if module:
    query += f' resource.labels.module_id="{_escape_filter_value(module)}"'
  if path:
    paths = path.split(',') if isinstance(path, str) else path
    or_paths = ' OR '.join(f'"{_escape_filter_value(p)}"' for p in paths)
    query += f' httpRequest.requestUrl:({or_paths})'

  # don't log key; would be a false positive for future searches
  logger.info('Searching logs...')
  try:
    # https://googleapis.dev/python/logging/latest/client.html#google.cloud.logging_v2.client.Client.list_entries
    first = next(iter(client.list_entries(filter_=query, page_size=1)))
  except StopIteration:
    logger.info('No log found!')
    return 'No log found!', 404

  logger.info(f'Got insert id {first.insert_id} trace {first.trace}')

  # now, print all logs with that trace
  query = (f'logName="{project}/logs/python" trace="{first.trace}" '
           f'resource.type="gae_app" {timestamp_filter}')
  logger.info(f'Searching logs with: {query}')

  parts = ["""\
<html>
<body style="font-family: monospace; white-space: pre">
"""]

  # sanitize and render each line
  for entry in client.list_entries(filter_=query, page_size=1000):
    # payload is a union that can be string, JSON, or protobuf
    # https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#FIELDS.oneof_payload
    msg = entry.payload
    if not msg:
      continue
    elif isinstance(msg, (dict, list)):
      msg = json_dumps(msg, indent=2)
    else:
      msg = str(msg)

    # messages that start with 'Created by this poll:' are rendered unsanitized
    if not msg.startswith('Created by this poll:'):
      msg = sanitize(msg)
    msg = linkify_datastore_keys(util.linkify(html.escape(msg, quote=False)))

    # NOTE(review): severity is assumed to be optional on log entries; fall
    # back to '?' rather than failing the whole page on a missing one.
    severity = (entry.severity or '?')[0]
    parts.append('%s %s %s<br />' % (
      severity, entry.timestamp, msg.replace('\n', '<br />')))

  parts.append('</body>\n</html>')
  return ''.join(parts), {'Content-Type': 'text/html; charset=utf-8'}