Source code for oauth_dropins.webutil.models

"""App Engine datastore model base classes, properties, and utilites."""
import base64
import enum
import os
from google.cloud import ndb

from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from oauth_dropins.webutil import util
from oauth_dropins.webutil.util import json_dumps, json_loads

# 1MB limit: https://cloud.google.com/datastore/docs/concepts/limits
# use this to check an entity's size:
#   len(entity._to_pb().Encode())
MAX_ENTITY_SIZE = 1 * 1000 * 1000

ENCRYPTED_PROPERTY_KEY = None
if key_base64 := util.read('encrypted_property_key'):  # base-64 encoded key bytes
  key_bytes = base64.b64decode(key_base64)
  assert len(key_bytes) == 32
  ENCRYPTED_PROPERTY_KEY = AESGCM(key_bytes)


[docs] class StringIdModel(ndb.Model): """An :class:`ndb.Model` class that requires a string id."""
[docs] def put(self, *args, **kwargs): """Raises AssertionError if string id is not provided.""" assert self.key and self.key.string_id(), 'string id required but not provided' return super(StringIdModel, self).put(*args, **kwargs)
[docs] class JsonProperty(ndb.TextProperty): """Fork of ndb's that subclasses :class:`ndb.TextProperty` instead of :class:`ndb.BlobProperty`. This makes values show up as normal, human-readable, serialized JSON in the web console. https://github.com/googleapis/python-ndb/issues/874#issuecomment-1442753255 Duplicated in arroba: https://github.com/snarfed/arroba/blob/main/arroba/ndb_storage.py """ def _validate(self, value): if not isinstance(value, dict): raise TypeError('JSON property must be a dict') def _to_base_type(self, value): as_str = json_dumps(value, separators=(',', ':'), ensure_ascii=True) return as_str.encode('ascii') def _from_base_type(self, value): if not isinstance(value, str): value = value.decode('ascii') return json_loads(value)
[docs] class ComputedJsonProperty(JsonProperty, ndb.ComputedProperty): """Custom :class:`ndb.ComputedProperty` for JSON values that stores them as strings. ...instead of like :class:`ndb.StructuredProperty`, with "entity" type, which bloats them unnecessarily in the datastore. """ def __init__(self, *args, **kwargs): kwargs['indexed'] = False super().__init__(*args, **kwargs)
[docs] class EnumProperty(ndb.IntegerProperty): """Property for storing Python Enum values. Stores the enum's integer value in the datastore. """ def __init__(self, enum_class, **kwargs): if not issubclass(enum_class, enum.Enum): raise TypeError('enum_class must be a subclass of enum.Enum') self._enum_class = enum_class super().__init__(**kwargs) def _validate(self, value): if value is not None and not isinstance(value, self._enum_class): raise TypeError(f'Expected {self._enum_class.__name__}, got {type(value).__name__}') def _to_base_type(self, value): if value is None: return None return value.value def _from_base_type(self, value): if value is None: return None return next((item for item in self._enum_class if item.value == value), None)
[docs] class EncryptedProperty(ndb.BlobProperty): """Property that stores encrypted bytes. Encrypts bytes values using AES-256-GCM before storing in the datastore, and decrypts them when reading back. The AES-256-GCM key should be in the ``encrypted_property_key`` file, base64 encoded. Here's example code to generate an AES-256-GCM key and base64 encode it: import base64 from cryptography.hazmat.primitives.ciphers.aead import AESGCM key_bytes = AESGCM.generate_key(bit_length=256) print(base64.b64encode(key_bytes)) """ def _validate(self, value): if value is not None and not isinstance(value, bytes): raise TypeError('EncryptedProperty value must be bytes') def _to_base_type(self, value): if value is None: return None if not ENCRYPTED_PROPERTY_KEY: raise RuntimeError('No encryption key found in encrypted_property_key.pem') nonce = os.urandom(12) # 96-bit nonce for GCM ciphertext = ENCRYPTED_PROPERTY_KEY.encrypt(nonce, value, None) # concatenate nonce and ciphertext for storage return nonce + ciphertext def _from_base_type(self, value): if value is None: return None if not ENCRYPTED_PROPERTY_KEY: raise RuntimeError('No encryption key found in encrypted_property_key.pem') nonce = value[:12] ciphertext = value[12:] return ENCRYPTED_PROPERTY_KEY.decrypt(nonce, ciphertext, None)