1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-06-26 16:45:52 +00:00

implement htpasswd file caching

This commit is contained in:
Peter Bieringer 2024-12-31 16:14:38 +01:00
parent 5ce0cee8bf
commit 2489356dda

View file

@ -48,8 +48,11 @@ When bcrypt is installed:
""" """
import os
import time
import functools import functools
import hmac import hmac
import threading
from typing import Any from typing import Any
from passlib.hash import apr_md5_crypt, sha256_crypt, sha512_crypt from passlib.hash import apr_md5_crypt, sha256_crypt, sha512_crypt
@ -61,15 +64,26 @@ class Auth(auth.BaseAuth):
_filename: str _filename: str
_encoding: str _encoding: str
_htpasswd: dict # login -> digest
_htpasswd_mtime_ns: int
_htpasswd_size: bytes
_htpasswd_ok: bool
_htpasswd_not_ok_seconds: int
_htpasswd_not_ok_reminder_seconds: int
_lock: threading.Lock
def __init__(self, configuration: config.Configuration) -> None: def __init__(self, configuration: config.Configuration) -> None:
super().__init__(configuration) super().__init__(configuration)
self._filename = configuration.get("auth", "htpasswd_filename") self._filename = configuration.get("auth", "htpasswd_filename")
self._encoding = configuration.get("encoding", "stock") self._encoding = configuration.get("encoding", "stock")
encryption: str = configuration.get("auth", "htpasswd_encryption") encryption: str = configuration.get("auth", "htpasswd_encryption")
logger.info("auth htpasswd encryption is 'radicale.auth.htpasswd_encryption.%s'", encryption) logger.info("auth htpasswd encryption is 'radicale.auth.htpasswd_encryption.%s'", encryption)
self._htpasswd_ok = False
self._htpasswd_not_ok_reminder_seconds = 60 # currently hardcoded
self._htpasswd_read = self._read_htpasswd(True)
self._lock = threading.Lock()
if encryption == "plain": if encryption == "plain":
self._verify = self._plain self._verify = self._plain
elif encryption == "md5": elif encryption == "md5":
@ -127,6 +141,68 @@ class Auth(auth.BaseAuth):
# assumed plaintext # assumed plaintext
return self._plain(hash_value, password) return self._plain(hash_value, password)
def _read_htpasswd(self, init: bool) -> bool:
"""Read htpasswd file
init == True: stop on error
init == False: warn/skip on error and set mark to log reminder every interval
"""
htpasswd_ok = True
if init is True:
info = "Read"
else:
info = "Re-read"
logger.info("%s content of htpasswd file start: %r", info, self._filename)
htpasswd = dict()
try:
with open(self._filename, encoding=self._encoding) as f:
line_num = 0
entries = 0
duplicates = 0
for line in f:
line_num += 1
line = line.rstrip("\n")
if line.lstrip() and not line.lstrip().startswith("#"):
try:
login, digest = line.split( ":", maxsplit=1)
if login == "" or digest == "":
if init is True:
raise ValueError("htpasswd file contains problematic line not matching <login>:<digest> in line: %d" % line_num)
else:
logger.warning("htpasswd file contains problematic line not matching <login>:<digest> in line: %d (ignored)", line_num)
htpasswd_ok = False
else:
if htpasswd.get(login):
duplicates += 1
if init is True:
raise ValueError("htpasswd file contains duplicate login: '%s'", login, line_num)
else:
logger.warning("htpasswd file contains duplicate login: '%s' (line: %d / ignored)", login, line_num)
htpasswd_ok = False
else:
htpasswd[login] = digest
entries += 1
except ValueError as e:
if init is True:
raise RuntimeError("Invalid htpasswd file %r: %s" % (self._filename, e)) from e
except OSError as e:
if init is True:
raise RuntimeError("Failed to load htpasswd file %r: %s" % (self._filename, e)) from e
else:
logger.warning("Failed to load htpasswd file on re-read: %r" % (self._filename, e))
htpasswd_ok = False
else:
self._htpasswd_size = os.stat(self._filename).st_size
self._htpasswd_time_ns = os.stat(self._filename).st_mtime_ns
self._htpasswd = htpasswd
logger.info("%s content of htpasswd file done: %r (entries: %d, duplicates: %d)", info, self._filename, entries, duplicates)
if htpasswd_ok is True:
self._htpasswd_not_ok_time = 0
else:
self._htpasswd_not_ok_time = time.time()
return htpasswd_ok
def _login(self, login: str, password: str) -> str: def _login(self, login: str, password: str) -> str:
"""Validate credentials. """Validate credentials.
@ -134,33 +210,31 @@ class Auth(auth.BaseAuth):
hash (encrypted password) and check hash against password, hash (encrypted password) and check hash against password,
using the method specified in the Radicale config. using the method specified in the Radicale config.
The content of the file is not cached because reading is generally a The content of the file is cached and live updates will be detected by
very cheap operation, and it's useful to get live updates of the comparing mtime_ns and size
htpasswd file.
""" """
try: # check and re-read file if required
with open(self._filename, encoding=self._encoding) as f: htpasswd_size = os.stat(self._filename).st_size
for line in f: htpasswd_time_ns = os.stat(self._filename).st_mtime_ns
line = line.rstrip("\n") if (htpasswd_size != self._htpasswd_size) or (htpasswd_time_ns != self._htpasswd_time_ns):
if line.lstrip() and not line.lstrip().startswith("#"): with self._lock:
try: self._htpasswd_ok = self._read_htpasswd(False)
hash_login, hash_value = line.split( else:
":", maxsplit=1) # log reminder of problemantic file every interval
# Always compare both login and password to avoid if (self._htpasswd_ok is False) and (self._htpasswd_not_ok_time > 0):
# timing attacks, see #591. current_time = time.time()
login_ok = hmac.compare_digest( if (current_time - self._htpasswd_not_ok_time) > self._htpasswd_not_ok_reminder_seconds:
hash_login.encode(), login.encode()) logger.warning("htpasswd file still contains issues (REMINDER, check warnings in the past): %r" % self._filename)
(method, password_ok) = self._verify(hash_value, password) self._htpasswd_not_ok_time = current_time
if login_ok and password_ok: if self._htpasswd.get(login):
logger.debug("Password verification for user '%s' with method '%s': password_ok=%s", login, method, password_ok) digest = self._htpasswd[login]
return login (method, password_ok) = self._verify(digest, password)
elif login_ok: logger.debug("Login verification successful for user: '%s' (method '%s')", login, method)
logger.debug("Password verification for user '%s' with method '%s': password_ok=%s", login, method, password_ok) if password_ok:
except ValueError as e: return login
raise RuntimeError("Invalid htpasswd file %r: %s" % else:
(self._filename, e)) from e logger.debug("Login verification failed for user: '%s' ( method '%s')", login, method)
except OSError as e: else:
raise RuntimeError("Failed to load htpasswd file %r: %s" % logger.debug("Login verification user not found: '%s'", login)
(self._filename, e)) from e
return "" return ""