Source code for server.server

#  server.py, part of the authserver package
#
#  part of https://github.com/varkenvarken/dockerplayground
#
#  (c) 2020 Michel Anders (varkenvarken)
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
#  MA 02110-1301, USA.
"""
This module handles the http requests related to user and session management.

It also provides utility functions to initialize the database and admin user.

The attributes listed below configure the authserver and are initialized from
environment variables with the same name.

Additional attributes are defined at the package level (in :mod:`server`) and in :mod:`.smtp` .

Attributes:

- :attr:`DOMAIN`
- :attr:`APPLICATION`
- :attr:`LOGINSCREEN`
- :attr:`CONFIRMREGISTRATION`
- :attr:`RESETPASSWORD`
- :attr:`WEBSITE`
- :attr:`SOFTTIMEOUT`
- :attr:`HARDTIMEOUT`
- :attr:`PWRESETTIMEOUT`
- :attr:`REGISTERTIMEOUT`
- :attr:`EMAILTEMPLATE_FORGOTPASSWORD`
- :attr:`EMAILTEMPLATE_REGISTER`

The following environment variables define credentials but are not present in the module as attributes.

- `ADMIN_USER_FILE` point to a file containing admin user name.
- `ADMIN_USER` admin user name, overrides `ADMIN_USER_FILE`
- `ADMIN_PASSWORD_FILE` point to a file containing admin password.
- `ADMIN_PASSWORD` admin password, overrides `ADMIN_PASSWORD_FILE`

"""

import usercustomize

from datetime import datetime, date, timedelta
from uuid import uuid4 as guid
from hashlib import pbkdf2_hmac
from hmac import compare_digest
from os import urandom
import os
from smtp import fetch_smtp_params, mail
from decimal import Decimal
import json
from time import sleep

import falcon

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import event, create_engine, ForeignKey, Column, Integer, String, DateTime, Boolean
from sqlalchemy.orm import relationship, sessionmaker
from sqlalchemy.engine import Engine

from loguru import logger

from regex import compile  # we use an alternative regular expression library here to support unicode classes like \p{L}
from regex.regex import Pattern

logger.info(usercustomize.coverage)


[docs]def number(variable, default): """ Return the integer in the environment variable. Arguments: variable(str): the name of the environment variable default(int): the default value to return if variable is not defined Returns: An integer value. """ if variable in os.environ: return int(os.environ[variable]) return int(default)
[docs]def getvar(variable, default='<unknown>'): """ Return the value of the environment variable. Arguments: variable(str): the name of the environment variable default(str): the default value to return if variable is not defined Returns: A string. """ if variable in os.environ: return os.environ[variable] return default
[docs]def getfile(variable, defaultfilename, default='Hi {name}, click {link}, Regards {website}'): """ Return the contents of the file specified in the environment variable. Arguments: variable(str): the name of the environment variable defaultfilename(str): the filename to use if the variable is not defined default(str): the string to return if the file couldn't be found The default contains the following placeholders that can be used in the actual files as well - {name} the full name of the user - {link} a confirmation link to click - {website} the name of the application/website Returns: A string. """ filename = defaultfilename text = default if variable in os.environ: filename = os.environ[variable] try: with open(filename) as f: text = "".join(f.readlines()) except FileNotFoundError: logger.error(f"could not open {filename}") text = default return text
DOMAIN = getvar('DOMAIN') """Domain to be used in session cookie, e.g. `yourdomain.org` """ APPLICATION = getvar('APPLICATION') """Redirect locations for successful logon, e.g. `/books`""" LOGINSCREEN = getvar('LOGINSCREEN') """Location of login screen, e.g. `/books/login.html`""" CONFIRMREGISTRATION = getvar('CONFIRMREGISTRATION') """Base url in email registration confirmation, e.g. ``https://server.yourdomain.org/auth/confirmregistration``""" RESETPASSWORD = getvar('RESETPASSWORD') """Base url in password reset request confirmation, e.g. ``https://server.yourdomain.org/auth/resetpassword``""" WEBSITE = getvar('WEBSITE') """The name of the app/website, e.g. `Book Collection`""" SOFTTIMEOUT = number('SOFTTIMEOUT', 30) """Session soft limit in minutes""" HARDTIMEOUT = number('HARDTIMEOUT', 8 * 60) """Session hard limit in minutes""" PWRESETTIMEOUT = number('PWRESETTIMEOUT', 1 * 60) """Password reset request confirmation limit in minutes""" REGISTERTIMEOUT = number('REGISTERTIMEOUT', 1 * 60) """Registration confirmation limit in minutes""" EMAILTEMPLATE_FORGOTPASSWORD = getfile('EMAILTEMPLATE_FORGOTPASSWORD', 'mailtemplates/passwordreset.mail') """ File containing email template for password reset confirmation email See :func:`getfile` for allowed template variables inside the text. """ EMAILTEMPLATE_REGISTER = getfile('EMAILTEMPLATE_REGISTER', 'mailtemplates/registration.mail') """ File containing email template for registration confirmation email See :func:`getfile` for allowed template variables inside the text. """ SESSIONID_pattern = compile(r"[01-9a-f]{32}") """Legal pattern for a session id and confirmation ids""" # TODO expand password patterns to full unicode PASSWORD_lower = compile(r"[a-z]") """These characters are considered lowercase in passwords""" PASSWORD_upper = compile(r"[A-Z]") """These characters are considered uppercase in passwords""" PASSWORD_digit = compile(r"[01-9]") """These characters are considered digits in passwords""" PASSWORD_special = compile(r"[ !|@#$%^&*()\-_.,<>?/\\{}\[\]]") """These characters are considered special in passwords"""
[docs]def newpassword(password): """ Return a cryptographic hash of password as a string of hex digits. The password is salted with 16 random bytes. The salt is prepended as 32 hex digits to the returned hash. Arguments: password(str): the password to hash. Returns: A string consisting of 32 + 64 hexadecimal characters. """ salt = urandom(16) dk = pbkdf2_hmac('sha256', password.encode(), salt, 100000) return salt.hex() + dk.hex()
[docs]def checkpassword(password, reference): """ Compare a plaintext password to a hashed reference. The reference is a string of hex digits, the first 32 being the salt. """ salt = bytes.fromhex(reference[:32]) dk = pbkdf2_hmac('sha256', password.encode(), salt, 100000) return compare_digest(salt.hex() + dk.hex(), reference)
[docs]def allowed_password(s): """ Check if a password meets the complexity criteria: - between 8 and 64 characters, - contain at least 1 lowercase, 1 uppercase, 1 digit and 1 special character - it may not contain characters outside those classes - character classes _are_ unicode aware """ if len(s) > 64 or len(s) < 8: return False nlower = len(PASSWORD_lower.findall(s)) nupper = len(PASSWORD_upper.findall(s)) ndigit = len(PASSWORD_digit.findall(s)) nspecial = len(PASSWORD_special.findall(s)) if nlower < 1 or nupper < 1 or ndigit < 1 or nspecial < 1: return False if nlower + nupper + ndigit + nspecial != len(s): return False return True
[docs]class ParameterSet: """ Defines allowable input parameters for a POST or GET request. """
[docs] def __init__(self, specs={}): """ Creates a ParameterSet instance based on a dictionary of parameters. Arguments: specs(dict): specifies a mapping name --> (ex, maxlength) `name` is a case sensitive name of an allowed input parameter `ex` is either a string, a regular expression or a callable that specifies the validity of a value. `maxlength` is an integer that specifies the maximum length of a parameter value If `ex` is a string it is converted to a regular expression. If `ex` is a callable it should return a boolean indicating the validity of a value. """ self.specs = {} for k, v in specs.items(): r, length = v if isinstance(r, str): self.specs[k] = compile(r), length else: self.specs[k] = r, length self.keys = set(self.specs.keys())
[docs] def check(self, params): """ Return true if all params are all allowed. Arguments: params(dict): is a mapping name --> value, where name and value are strings Each value should match the requirements specified for 'name'. If `params` contains extra parameters or it is missing items it is considered invalid. """ logger.debug(f'params {params}') if set(params.keys()) == self.keys: # all input names should be present for k, v in params.items(): r, length = self.specs[k] if len(v) > length: # values that are too long are invalid logger.debug(f'too long {k}:{v}') return False elif isinstance(r, Pattern): if not bool(r.fullmatch(v)): logger.debug(f'no match {k}:{v} re:{r.pattern}') return False elif not r(v): logger.debug(f'not {r.__name__} {k}: {v}') return False return True logger.debug(f'keynames do not match: {params.keys()}, required {self.keys}') return False
Base = declarative_base()
[docs]def alchemyencoder(obj): """ A JSON encoder for SQLAlchemy :func:`declarative_base` objects, :class:`date` and :class:`Decimal` objects. :class:`Base` objects are returned as :class:`dict` object with a key for each column. A column with a name equal to `password` is _not_ included. If a Base object has a `user` column, an extra key `email` is added that contains `user.email`. :class:`date` objects are returned as an isoformat string. :class:`Decimal` objects are returned as a float. Arguments: obj(object): an object to decode Returns: A :class:`dict`, :class:`str`, :class:`float` or `None`. Like all default encoders for json it returns objects which are then encoded to json strings by the main encoder. an example:: import json jsonstring = json.dumps(someobject, default=alchemyencoder) """ if isinstance(obj, Base): d = {c.name: getattr(obj, c.name) for c in obj.__table__.columns if c.name != 'password'} # passwords *never* leave the system, not even encrypted if hasattr(obj, 'user'): d['email'] = obj.user.email return d elif isinstance(obj, date): return obj.isoformat() elif isinstance(obj, Decimal): return float(obj)
[docs]class User(Base): """ ORM representation of a User. """ __tablename__ = 'user' id = Column(Integer, primary_key=True) """Primary key""" email = Column(String(100), unique=True) """user name (a valid email address)""" password = Column(String(100)) """hashed password""" name = Column(String(100)) """full name""" superuser = Column(Boolean(), default=False) """role, true if superuser""" created = Column(DateTime(), default=datetime.now()) """timestamp""" active = Column(Boolean(), default=True) """true if user account is enabled""" attempts = Column(Integer, default=0) """number of failed login attempts""" accessed = Column(DateTime(), default=datetime.now()) """timestamp of last access""" locked = Column(DateTime(), default=datetime.now()) """timestamp of user lockout"""
[docs]class Session(Base): """ ORM representation of a Session. Any session will be deleted if the corresponding user is deleted. """ __tablename__ = 'session' id = Column(String(34), primary_key=True) """primary key holds a guid""" created = Column(DateTime(), default=datetime.now()) """timestamp""" softlimit = Column(DateTime(), default=datetime.now()) """timestamp, session must show activity before this time to be renewed""" hardlimit = Column(DateTime(), default=datetime.now()) """timestamp, after this time the session will be removed regardless""" userid = Column(Integer, ForeignKey('user.id', ondelete='CASCADE')) """foreign key to User, on cascade delete is on""" user = relationship(User) """ORM link to User"""
[docs]class PendingUser(Base): """ ORM representation of a PendingUser (a newly registered user awaiting confirmation). """ __tablename__ = 'pendinguser' id = Column(String(34), primary_key=True) # holds a guid """primary key holds a guid""" email = Column(String(100), unique=True) """user name (a valid email address)""" name = Column(String(100)) """full name""" password = Column(String(100)) """hashed password""" created = Column(DateTime(), default=datetime.now()) """timestamp""" expires = Column(DateTime(), default=datetime.now()) """timestamp, after this time the session will be removed regardless"""
[docs]class PasswordReset(Base): """ ORM representation of a PasswordReset event awaiting confirmation. This PasswordReset will be deleted if the corresponding user is deleted. """ __tablename__ = 'passwordreset' id = Column(String(34), primary_key=True) # holds a guid """primary key holds a guid""" created = Column(DateTime(), default=datetime.now()) """timestamp""" expires = Column(DateTime(), default=datetime.now()) """timestamp, after this time the session will be removed regardless""" userid = Column(Integer, ForeignKey('user.id', ondelete='CASCADE')) """foreign key to User, on cascade delete is on""" user = relationship(User) """ORM link to User"""
verifysession_params = ParameterSet({'sessionid': (r"[01-9a-f]{32}", 34)}) logout_params = ParameterSet({}) login_login_params = ParameterSet({'login': ('Login', 5), 'password': (allowed_password, 64), 'email': (r"[^@]+@[^.]+\.[^.]+(\.[^.]+)*", 100)}) login_register_params = ParameterSet({'login': ('Register', 8), 'name': (r"[\p{L}\p{M}\p{N}][\p{L}\p{M}\p{N} ]+", 100), 'password': (allowed_password, 64), 'password2': (allowed_password, 64), 'email': (r"[^@]+@[^.]+\.[^.]+(\.[^.]+)*", 100)}) login_forgot_params = ParameterSet({'login': ('Forgot', 6), 'email': (r"[^@]+@[^.]+\.[^.]+(\.[^.]+)*", 100)}) newpassword_params = ParameterSet({'choose': ('Choose', 6), 'password': (allowed_password, 64), 'password2': (allowed_password, 64), 'resetid': (r"[01-9a-f]{32}", 34)}) stats_params = ParameterSet() confirmation_params = ParameterSet({'confirmationid': (r"[01-9a-f]{32}", 34)})
[docs]def max_body(limit): """ A :func:`falcon.before` hook to limit the size of request body. Arguments: limit(int): maximum size in bytes of the request body Raises: :exception:`falcon.HTTPPayloadTooLarge` when the body of the request exceeds `limit` Returns: a hook function """ def hook(req, resp, resource, params): length = req.content_length if length is not None and length > limit: msg = ('The size of the request is too large. The body must not ' 'exceed ' + str(limit) + ' bytes in length.') raise falcon.HTTPPayloadTooLarge( 'Request body is too large', msg) return hook
[docs]class LoginResource: """ Routing endpoint that serves the action of a login form. """
[docs] @falcon.before(max_body(1024)) def on_post(self, req, resp): """ Handle a logon POST request. Arguments: req: the request resp: the response Returns: None The method expects its input as www-formencoded parameters in the request body. On success it will set the `Location` header to :attr:`APPLICATION` and return a session cookie. On failure it will set the `Location` header to :attr:`LOGINSCREEN`. It will always set the response status to :attr:`falcon.HTTP_303`. Other Parameters: email: the username of the user (a valid email address) password: the password login: the literal text `Login` Typically these parameters would correspond to input fields in an HTML form and a submit button with a `name=Login` attribute and send as input parameters in the body of the request. """ logger.info('LoginResource') global DBSession if not login_login_params.check(req.params): logger.info('unauthorized, params do not have a proper format') else: email = req.params['email'] session = DBSession() user = session.query(User).filter(User.email == email).first() if user: logger.info(f"valid user found {user.email}") if checkpassword(req.params['password'], user.password): # self.redirect_header("Login succeeded", APPLICATION) for s in session.query(Session).filter(Session.userid == user.id): session.delete(s) now = datetime.now() softlimit = now + timedelta(minutes=SOFTTIMEOUT) hardlimit = now + timedelta(minutes=HARDTIMEOUT) ns = Session(userid=user.id, id=guid().hex, created=now, softlimit=softlimit, hardlimit=hardlimit) session.add(ns) session.commit() resp.set_cookie('session', ns.id, domain=DOMAIN, path='/', http_only=False) # falcon 3 supports samesite argument in set_cookie but falcon 2 doesn't resp._cookies['session']['samesite'] = 'Lax' logger.success(f'user succesfully authenticated {user.email}') resp.status = falcon.HTTP_303 resp.location = APPLICATION return else: logger.info(f'user authentication failed for known user {user.email}') else: logger.info(f'user authentication failed for unknown user {email}') resp.status = falcon.HTTP_303 resp.location = f'{LOGINSCREEN}?failed'
[docs]class RegisterResource: """ Routing endpoint that serves the action of a registration form. """
[docs] @falcon.before(max_body(1024)) def on_post(self, req, resp): """ Handle a register POST request. Arguments: req: the request resp: the response Returns: None The method expects its input as www-formencoded parameters in the request body. On success it will create a pending user request and send an email with a confirmation link. On failure it will do nothing. It will always set the `Location` header to :attr:`LOGINSCREEN`. It will always set the response status to :attr:`falcon.HTTP_303`. Other Parameters: email: the username of the user (a valid email address) name: the full name of the user password: the password ( 8 >= length <= 64, must contain at lease 1 lowercase, 1 uppercase, 1 digit and 1 special char. password2: must be identical to the password parameter login: the literal text `Register` Typically these parameters would correspond to input fields in an HTML form and a submit button with a `name=Register` attribute and send as input parameters in the body of the request. """ logger.info('RegisterResource') global DBSession session = DBSession() resp.status = "303 Registration pending confirmation, email sent to email address" resp.location = f"{LOGINSCREEN}?pending" if not login_register_params.check(req.params): logger.info('unauthorized, login register params do not have proper format') else: params = req.params # TODO lowercase email everywhere email = params['email'] user = session.query(User).filter(User.email == email).first() # We always return the same response (and redirect) no matter # whether the email is in use or not or if anything else is wrong # this is to prevent indexing, i.e. checking which email addresses are in use if params['password'] != params['password2']: logger.info("passwords are not identical") elif user: logger.info(f'email already in use {user.email}') else: user = session.query(PendingUser).filter(PendingUser.email == email).first() if user: # we delete previous pending registration to prevent database overfilling logger.info('previous registration not yet confirmed') session.delete(user) session.commit() else: logger.info('first registration') pu = PendingUser(id=guid().hex, email=email, password=newpassword(params['password']), name=params['name']) session.add(pu) session.commit() # TODO limit number of emails sent to same address user = session.query(PendingUser).filter(PendingUser.email == email).first() logger.info(f"sending confirmation mail to {user.email} ({user.name})") logger.info(f"confirmation id: {pu.id}") u, p, s = fetch_smtp_params() logger.info(f"mailer {u}@{s} (password not shown ...)") if mail(EMAILTEMPLATE_REGISTER.format(name=user.name, website=WEBSITE, link=f"{CONFIRMREGISTRATION}?confirmationid={pu.id}"), "Confirm your registration", fromaddr=u, toaddr=user.email, smtp=s, username=u, password=p): logger.success('mail successfully sent') else: logger.error('mail not sent')
[docs]class ForgotPasswordResource: """ Routing endpoint that serves the action of a forgot password form. """
[docs] @falcon.before(max_body(1024)) def on_post(self, req, resp): """ Handle a forgotpassword POST request. Arguments: req: the request resp: the response Returns: None The method expects its input as www-formencoded parameters in the request body. On success it will create a password reset request and send an email with a confirmation link. On failure it will do nothing. It will always set the `Location` header to :attr:`LOGINSCREEN`. It will always set the response status to ;attr:`falcon.HTTP_303`. Other Parameters: email: the username of the user (a valid email address) login: the literal text `Forgot` Typically these parameters would correspond to input fields in an HTML form and a submit button with a `name=Forgot` attribute and send as input parameters in the body of the request. """ logger.info('ForgotPasswordResource') global DBSession session = DBSession() # we always send the same response, no matter if the user exists or not resp.status = falcon.HTTP_303 resp.location = f"{LOGINSCREEN}?checkemail" params = req.params if not login_forgot_params.check(params): logger.info('unauthorized, login forgot params do not have proper format') else: email = params['email'] user = session.query(User).filter(User.email == email).first() user = session.query(User).filter(User.email == email).first() if not user: # no user found but we are not providing this information logger.info(f"no user found {email}") else: logger.info(f"password reset request received for existing user {email}") pr = PasswordReset(id=guid().hex, userid=user.id) session.add(pr) session.commit() logger.info(f"sending confirmation mail to {user.email} ({user.name})") logger.info(f"reset confirmation id: {pr.id}") u, p, s = fetch_smtp_params() if mail(EMAILTEMPLATE_FORGOTPASSWORD.format(name=user.name, website=WEBSITE, link=f"{RESETPASSWORD}?confirmationid={pr.id}"), "Password change request", fromaddr=u, toaddr=user.email, smtp=s, username=u, password=p): logger.success('mail successfully sent') else: logger.error('mail not sent')
[docs]class VerifySessionResource: """ Routing endpoint that serves as the internal endpoint to verify the existence of a valid session. """
[docs] @falcon.before(max_body(1024)) def on_post(self, req, resp): """ Handle a verifysession POST request. Arguments: req: the request resp: the response Returns: On success the response body will contain the following key=value pairs separated by newline characters: - email(str) - id(int) - name(str) - superuser(bool) The method expects its input as www-formencoded parameters in the request body. On success it will the session data and set the response status to :attr:`falcon.HTTP_200`. On failure it will do nothing and return a response status of :attr:`falcon.HTTP_404`. Other Parameters: email: the username of the user (a valid email address) login: the literal text `Forgot` This method should not be called from the browser. It is typically called by other backend servers to verify the validity of a session and get the email address, name and superuser status of that session. It does not accept requests that have an `X-Forwarded-Host` header defined, so if the authserver and the backend server are positioned behind a reverse proxy that adds these headers (like traefik) all things will be fine. """ logger.info('VerifySessionResource') global DBSession session = DBSession() resp.status = falcon.HTTP_404 # we only allow sessions to be verified by apps running on the same # network, i.e. they should not have any X- headers present if (h := req.get_header('X-Forwarded-Host')) is not None: logger.info(f'unauthorized, verifysession called from outside {h}') # verify that incoming parameters are what we expect elif not verifysession_params.check(req.params): logger.info('unauthorized, sessionid does not have proper format') # if the session is known, compose the response with user data elif (info := self.session_active(session, req.params['sessionid'])) is not None: resp.body = info resp.status = falcon.HTTP_200 else: logger.info(f"unauthorized, no valid session found: {req.params['sessionid']}") # clean up stale sessions session.commit() for s in session.query(Session).filter(Session.hardlimit <= datetime.now()): logger.info(f'deleting session {s.id} for {s.user.email}') session.delete(s)
[docs] def session_active(self, session, sessionid): """ Check whether a sessionid represents an active session. A session is active - if the sessionid exists - its :attr:`Session.softlimit` is in the future. If a session is found to be valid, its :attr:`Session.softlimit` is set to :attr:`SOFTTIMEOUT` minutes from now. Arguments: session(DBSession): SQLAlchemy session sessionid(str): a sessionid Returns: On success: a bytes object with a newline separated list of key=value pairs On failure: None """ now = datetime.now() for s in session.query(Session).filter(Session.id == sessionid, now < Session.hardlimit): if now < s.softlimit: logger.success(f'active session found: {sessionid} {s.user.email}') logger.info(f'email={s.user.email} id={s.user.id} name={s.user.name} superuser={s.user.superuser}') s.softlimit = now + timedelta(minutes=SOFTTIMEOUT) logger.debug(s.softlimit) return bytes(f'email={s.user.email}\nid={s.user.id}\nname={s.user.name}\nsuperuser={s.user.superuser}', 'utf-8') return None
[docs]class LogoutResource: """ Routing endpoint that serves the action of a logout form. """
[docs] @falcon.before(max_body(1024)) def on_post(self, req, resp): """ Handle a logout POST request. Arguments: req: the request resp: the response Returns: None The method expects no input parameters in the request body. A valid sessionid cookie should be present. On success it will remove the session. On failure it will do nothing. It will always set the `Location` header to :attr:`LOGINSCREEN`. It will always set the response status to ;attr:`falcon.HTTP_303`. """ logger.info('LogoutResource') global DBSession session = DBSession() resp.status = falcon.HTTP_404 resp.location = LOGINSCREEN if not logout_params.check(req.params): logger.info('bad request: logout request should not have parameters') elif cv := req.get_cookie_values('session'): cookie = cv[0] logger.info(cookie) for s in session.query(Session).filter(Session.id == cookie): logger.debug(f"deleting session {s.id} for user {s.user.email}") logger.success(f'logout authorized for user {s.user.email}') session.delete(s) session.commit() resp.status = falcon.HTTP_303 return logger.info('logout unauthorized, session not found or no session cookie present')
[docs]class ChoosePasswordResource: """ Routing endpoint that serves the action of a choose new password form. """
[docs] @falcon.before(max_body(1024)) def on_post(self, req, resp): """ Handle a choose password POST request. Arguments: req: the request resp: the response Returns: None The method expects its input as www-formencoded parameters in the request body. It also On success it will change the password of the user. On failure it will do nothing. It will always set the `Location` header to :attr:`LOGINSCREEN`. It will always set the response status to :attr:`falcon.HTTP_303`. Other Parameters: resetid: an id that should be present in the :class:`PendingUser` table password: the password ( 8 >= length <= 64, must contain at lease 1 lowercase, 1 uppercase, 1 digit and 1 special char. password2: must be identical to the password parameter choose: the literal text `Choose` Typically these parameters would correspond to input fields in an HTML form and a submit button with a `choose=Choose` attribute and send as input parameters in the body of the request. """ logger.info('ChoosePasswordResource') global DBSession session = DBSession() resp.status = falcon.HTTP_404 resp.location = LOGINSCREEN if not newpassword_params.check(req.params): logger.info('bad request: malformed parameters') else: params = req.params if params['password'] != params['password2']: logger.info("passwords are not identical") else: for resetuser in session.query(PasswordReset).filter(PasswordReset.id == params['resetid']): logger.success(f'password reset for user {resetuser.user.email}') user = resetuser.user user.password = newpassword(params['password']) resp.status = falcon.HTTP_303 resp.location = f"{LOGINSCREEN}?resetsuccessful" session.delete(resetuser) session.commit() return logger.info('resetid not found or expired')
[docs]class StatsResource: """ Routing endpoint that serves as the REST endpoint for user information overviews. It can return information in JSON format on :class:`User`, :class:`PendingUser`, :class:`PasswordReset` and :class:`Session`. Access is restricted to logged in users woth the superuser role. """
[docs] @falcon.before(max_body(0)) def on_post(self, req, resp, item): """ Handle a stats/{item} POST request, typically one initialed by an AJAX call. Arguments: req: the request resp: the response item(str): the kind of item for which a list should be returned Returns: None Even though this is a POST handler, it should have no input parameters in the body. The items that can be requested are: - `users`: to return a JSON encoded list of :class:`User` objects - `sessions`: to return a JSON encoded list of :class:`Session` objects - `pendingusers`: to return a JSON encoded list of :class:`PendingUser` objects - `passwordreset`: to return a JSON encoded list of :class:`PasswordReset` objects On success the response body will contain a bytes object that is JSON data (in UTF-8 encoding). Note that any password attributes will be stripped form the output. Example: JSON data is returned an object with a data attribute:: { "data": [ {"id": 165675, "email": "abc@example.org", ...}, {"id": 365625, "email": "def@example.org", ...}, ... ] } """ logger.info('StatsResource') itemmap = {'users': User, 'sessions': Session, 'pendingusers': PendingUser, 'passwordresets': PasswordReset} global DBSession session = DBSession() resp.status = falcon.HTTP_404 resp.location = LOGINSCREEN params = req.params if not stats_params.check(params) or item not in itemmap: logger.info('unauthorized, stats params do not have proper format') elif cv := req.get_cookie_values('session'): cookie = cv[0] logger.info(cookie) for s in session.query(Session).filter(Session.id == cookie): if s.user.superuser: logger.success(f'/stats/{item} authorized for user {s.user.email}') resp.status = falcon.HTTP_200 ob = itemmap[item] users = json.dumps([u for u in session.query(ob)], default=alchemyencoder) logger.debug(users) resp.body = bytes(f'{{"data": {users}}}', 'utf-8') return logger.info(f'unauthorized, {s.user.email} (s.user.name) is not a superuser ({s.user.superuser})') return logger.info(f"unauthorized, no valid session found: {self.cookie['session'].value}") else: logger.info('unauthorized, no session cookie provided')
[docs]class ConfirmRegistrationResource: """ Routing endpoint that serves the registration confirmation link. """
[docs] @falcon.before(max_body(0)) def on_get(self, req, resp): """ Arguments: req: the request resp: the response Returns: None The method expects its input as query parameters in the url. On success it will create a :class:`User` from the corresponding :class:`PendingUser`. On failure it will do nothing. It will always set the `Location` header to :attr:`LOGINSCREEN`. It will always set the response status to :attr:`falcon.HTTP_303`. Other Parameters: confirmationid: an id that should be present in the :class:`PendingUser` table """ logger.info('ConfirmRegistrationResource') global DBSession session = DBSession() resp.status = falcon.HTTP_404 resp.location = LOGINSCREEN if not confirmation_params.check(req.params): logger.info('confirmregistration parameters not ok') else: # TODO check for expiration and remove expired entries resp.status = falcon.HTTP_303 confirmationid = req.params['confirmationid'] user = session.query(PendingUser).filter(PendingUser.id == confirmationid).first() if user: logger.success(f'confirmregistration succeeded for {user.email} ({user.name})') # copy user to User table ns = User(email=user.email, password=user.password, name=user.name) session.add(ns) session.commit() # redirect to login page resp.location = f"{LOGINSCREEN}?confirmed" else: # no pending confirmation or expired, redirect to login page logger.info(f'confirmregistration link expired or not present {confirmationid}') resp.location = f"{LOGINSCREEN}?expired"
[docs]class ConfirmForgotPasswordResource: """ Routing endpoint that serves the password reset confirmation link. """
[docs] @falcon.before(max_body(0)) def on_get(self, req, resp): """ Arguments: req: the request resp: the response Returns: None The method expects its input as query parameters in the url. On success it will create a :class:`User` from the corresponding :class:`PendingUser`. On failure it will do nothing. It will always set the `Location` header to :attr:`LOGINSCREEN`. If a pending request was found, `?choosepassword=id` will be appended to the url. If not `?expired` will be added. It will always set the response status to :attr:`falcon.HTTP_303`. Other Parameters: confirmationid: an id that should be present in the :class:`PasswordReset` table """ logger.info('ConfirmForgotPasswordResource') global DBSession session = DBSession() resp.status = falcon.HTTP_404 resp.location = LOGINSCREEN if not confirmation_params.check(req.params): logger.info('resetpassword parameters not ok') else: resp.status = falcon.HTTP_303 confirmationid = req.params['confirmationid'] # TODO remove PasswordReset after successful confirm if pr := session.query(PasswordReset).filter(PasswordReset.id == confirmationid).first(): logger.success('resetpassword confirmation successful') resp.location = f"{LOGINSCREEN}?choosepassword={pr.id}" else: # no pending confirmation or expired, redirect to login page logger.info('resetpassword link not present or expired') resp.location = f"{LOGINSCREEN}?expired"
[docs]def fetch_admin_params(): """ Get admin variables from file or environment. Enviroment variables overrule variables in files. Returns: tuple(admin_user, admin_password) Module level attributes referenced: - :attr:`ADMIN_USER_FILE` filename of file containing super user username (valid email address) - :attr:`ADMIN_USER` username (valid email address) of super user, will override ADMIN_USER_FILE - :attr:`ADMIN_PASSWORD_FILE` filename of file containing super user password in plaintext - :attr:`ADMIN_PASSWORD` super user password in plaintext, will override ADMIN_PASSWORD_FILE """ env = {} for var in ('ADMIN_USER', 'ADMIN_PASSWORD'): if var in os.environ and os.environ[var].strip() != '': env[var] = os.environ[var] else: varf = var + '_FILE' if varf in os.environ: with open(os.environ[varf]) as f: env[var] = f.read().strip() else: raise KeyError(f'{var} and {varf} not defined in environment') return env['ADMIN_USER'], env['ADMIN_PASSWORD']
[docs]def add_superuser(): """ Add superuser account to :class:`User` table. Will remove any user account with the same name along with any associated session. """ global DBSession username, password = fetch_admin_params() session = DBSession() for s in session.query(User).filter(User.email == username): logger.info(f"deleting user {s.email}") session.delete(s) session.commit() ns = User(email=username, password=newpassword(password), name='Administrator', superuser=True) session.add(ns) logger.info(f"adding admin user {ns.email}") session.commit() for s in session.query(Session): logger.info(f"{s.id} {s.created} {s.userid}") session.close() return True
[docs]@event.listens_for(Engine, "connect") def set_sqlite_pragma(dbapi_connection, connection_record): """ Enable cascading constraint on the SQLAlchemy tables. See: https://docs.sqlalchemy.org/en/13/dialects/sqlite.html#foreign-key-support """ cursor = dbapi_connection.cursor() cursor.execute("PRAGMA foreign_keys=ON") cursor.close()
[docs]def get_sessionmaker(connection, timeout, retries): """ Create and initialize a global :class:`sqlalchemy.orm.sessionmaker`. Arguments: connection(str): a sqlite connection string timeout(int): number of seconds to wait between connection retries. doubles with evert attempt. retries(int): number of times to retry a connection to the database. Returns: bool The :class:`sqlalchemy.orm.sessionmaker` object is stored in the global :attr:`DBSession`. """ global DBSession # this does not open a connection (yet), that will happen on create_all db_engine = create_engine(connection, pool_pre_ping=True) # we try to connect to the database several times waited = 0 for i in range(1, retries): try: Base.metadata.create_all(db_engine) break except Exception as e: logger.exception(e) logger.info(f"Database connection refused trial {i}/{retries}, now waiting {timeout} seconds ...") sleep(timeout) waited += timeout timeout *= 2 continue else: logger.critical(f"No database connections after {retries} tries ({waited} seconds)") return False DBSession = sessionmaker(bind=db_engine) return True