mirror of
https://github.com/Kozea/Radicale.git
synced 2025-08-04 18:22:26 +00:00
Merge pull request #1772 from pbiering/add-argon2
Add argon2 password hash support
This commit is contained in:
commit
2c0c0a7e50
5 changed files with 83 additions and 11 deletions
|
@ -38,8 +38,9 @@ dependencies = [
|
||||||
|
|
||||||
|
|
||||||
[project.optional-dependencies]
|
[project.optional-dependencies]
|
||||||
test = ["pytest>=7", "waitress", "bcrypt"]
|
test = ["pytest>=7", "waitress", "bcrypt", "argon2-cffi"]
|
||||||
bcrypt = ["bcrypt"]
|
bcrypt = ["bcrypt"]
|
||||||
|
argon2 = ["argon2-cffi"]
|
||||||
ldap = ["ldap3"]
|
ldap = ["ldap3"]
|
||||||
|
|
||||||
[project.scripts]
|
[project.scripts]
|
||||||
|
|
|
@ -46,6 +46,9 @@ out-of-the-box:
|
||||||
When bcrypt is installed:
|
When bcrypt is installed:
|
||||||
- BCRYPT (htpasswd -B ...) -- Requires htpasswd 2.4.x
|
- BCRYPT (htpasswd -B ...) -- Requires htpasswd 2.4.x
|
||||||
|
|
||||||
|
When argon2 is installed:
|
||||||
|
- ARGON2 (python -c 'from passlib.hash import argon2; print(argon2.using(type="ID").hash("password"))')
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import functools
|
import functools
|
||||||
|
@ -72,8 +75,10 @@ class Auth(auth.BaseAuth):
|
||||||
_htpasswd_not_ok_time: float
|
_htpasswd_not_ok_time: float
|
||||||
_htpasswd_not_ok_reminder_seconds: int
|
_htpasswd_not_ok_reminder_seconds: int
|
||||||
_htpasswd_bcrypt_use: int
|
_htpasswd_bcrypt_use: int
|
||||||
|
_htpasswd_argon2_use: int
|
||||||
_htpasswd_cache: bool
|
_htpasswd_cache: bool
|
||||||
_has_bcrypt: bool
|
_has_bcrypt: bool
|
||||||
|
_has_argon2: bool
|
||||||
_encryption: str
|
_encryption: str
|
||||||
_lock: threading.Lock
|
_lock: threading.Lock
|
||||||
|
|
||||||
|
@ -89,9 +94,10 @@ class Auth(auth.BaseAuth):
|
||||||
logger.info("auth htpasswd encryption is 'radicale.auth.htpasswd_encryption.%s'", self._encryption)
|
logger.info("auth htpasswd encryption is 'radicale.auth.htpasswd_encryption.%s'", self._encryption)
|
||||||
|
|
||||||
self._has_bcrypt = False
|
self._has_bcrypt = False
|
||||||
|
self._has_argon2 = False
|
||||||
self._htpasswd_ok = False
|
self._htpasswd_ok = False
|
||||||
self._htpasswd_not_ok_reminder_seconds = 60 # currently hardcoded
|
self._htpasswd_not_ok_reminder_seconds = 60 # currently hardcoded
|
||||||
(self._htpasswd_ok, self._htpasswd_bcrypt_use, self._htpasswd, self._htpasswd_size, self._htpasswd_mtime_ns) = self._read_htpasswd(True, False)
|
(self._htpasswd_ok, self._htpasswd_bcrypt_use, self._htpasswd_argon2_use, self._htpasswd, self._htpasswd_size, self._htpasswd_mtime_ns) = self._read_htpasswd(True, False)
|
||||||
self._lock = threading.Lock()
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
if self._encryption == "plain":
|
if self._encryption == "plain":
|
||||||
|
@ -102,7 +108,8 @@ class Auth(auth.BaseAuth):
|
||||||
self._verify = self._sha256
|
self._verify = self._sha256
|
||||||
elif self._encryption == "sha512":
|
elif self._encryption == "sha512":
|
||||||
self._verify = self._sha512
|
self._verify = self._sha512
|
||||||
elif self._encryption == "bcrypt" or self._encryption == "autodetect":
|
|
||||||
|
if self._encryption == "bcrypt" or self._encryption == "autodetect":
|
||||||
try:
|
try:
|
||||||
import bcrypt
|
import bcrypt
|
||||||
except ImportError as e:
|
except ImportError as e:
|
||||||
|
@ -125,7 +132,33 @@ class Auth(auth.BaseAuth):
|
||||||
self._verify = self._autodetect
|
self._verify = self._autodetect
|
||||||
if self._htpasswd_bcrypt_use:
|
if self._htpasswd_bcrypt_use:
|
||||||
self._verify_bcrypt = functools.partial(self._bcrypt, bcrypt)
|
self._verify_bcrypt = functools.partial(self._bcrypt, bcrypt)
|
||||||
|
|
||||||
|
if self._encryption == "argon2" or self._encryption == "autodetect":
|
||||||
|
try:
|
||||||
|
import argon2
|
||||||
|
from passlib.hash import argon2 # noqa: F811
|
||||||
|
except ImportError as e:
|
||||||
|
if (self._encryption == "autodetect") and (self._htpasswd_argon2_use == 0):
|
||||||
|
logger.warning("auth htpasswd encryption is 'radicale.auth.htpasswd_encryption.%s' which can require argon2 module, but currently no entries found", self._encryption)
|
||||||
else:
|
else:
|
||||||
|
raise RuntimeError(
|
||||||
|
"The htpasswd encryption method 'argon2' or 'autodetect' requires "
|
||||||
|
"the argon2 module (entries found: %d)." % self._htpasswd_argon2_use) from e
|
||||||
|
else:
|
||||||
|
self._has_argon2 = True
|
||||||
|
if self._encryption == "autodetect":
|
||||||
|
if self._htpasswd_argon2_use == 0:
|
||||||
|
logger.info("auth htpasswd encryption is 'radicale.auth.htpasswd_encryption.%s' and argon2 module found, but currently not required", self._encryption)
|
||||||
|
else:
|
||||||
|
logger.info("auth htpasswd encryption is 'radicale.auth.htpasswd_encryption.%s' and argon2 module found (argon2 entries found: %d)", self._encryption, self._htpasswd_argon2_use)
|
||||||
|
if self._encryption == "argon2":
|
||||||
|
self._verify = functools.partial(self._argon2, argon2)
|
||||||
|
else:
|
||||||
|
self._verify = self._autodetect
|
||||||
|
if self._htpasswd_argon2_use:
|
||||||
|
self._verify_argon2 = functools.partial(self._argon2, argon2)
|
||||||
|
|
||||||
|
if not hasattr(self, '_verify'):
|
||||||
raise RuntimeError("The htpasswd encryption method %r is not "
|
raise RuntimeError("The htpasswd encryption method %r is not "
|
||||||
"supported." % self._encryption)
|
"supported." % self._encryption)
|
||||||
|
|
||||||
|
@ -144,6 +177,9 @@ class Auth(auth.BaseAuth):
|
||||||
else:
|
else:
|
||||||
return ("BCRYPT", bcrypt.checkpw(password=password.encode('utf-8'), hashed_password=hash_value.encode()))
|
return ("BCRYPT", bcrypt.checkpw(password=password.encode('utf-8'), hashed_password=hash_value.encode()))
|
||||||
|
|
||||||
|
def _argon2(self, argon2: Any, hash_value: str, password: str) -> tuple[str, bool]:
|
||||||
|
return ("ARGON2", argon2.verify(password, hash_value.strip()))
|
||||||
|
|
||||||
def _md5apr1(self, hash_value: str, password: str) -> tuple[str, bool]:
|
def _md5apr1(self, hash_value: str, password: str) -> tuple[str, bool]:
|
||||||
if self._encryption == "autodetect" and len(hash_value) != 37:
|
if self._encryption == "autodetect" and len(hash_value) != 37:
|
||||||
return self._plain_fallback("MD5-APR1", hash_value, password)
|
return self._plain_fallback("MD5-APR1", hash_value, password)
|
||||||
|
@ -169,6 +205,9 @@ class Auth(auth.BaseAuth):
|
||||||
elif re.match(r"^\$2(a|b|x|y)?\$", hash_value):
|
elif re.match(r"^\$2(a|b|x|y)?\$", hash_value):
|
||||||
# BCRYPT
|
# BCRYPT
|
||||||
return self._verify_bcrypt(hash_value, password)
|
return self._verify_bcrypt(hash_value, password)
|
||||||
|
elif re.match(r"^\$argon2(i|d|id)\$", hash_value):
|
||||||
|
# ARGON2
|
||||||
|
return self._verify_argon2(hash_value, password)
|
||||||
elif hash_value.startswith("$5$", 0, 3):
|
elif hash_value.startswith("$5$", 0, 3):
|
||||||
# SHA-256
|
# SHA-256
|
||||||
return self._sha256(hash_value, password)
|
return self._sha256(hash_value, password)
|
||||||
|
@ -178,7 +217,7 @@ class Auth(auth.BaseAuth):
|
||||||
else:
|
else:
|
||||||
return self._plain(hash_value, password)
|
return self._plain(hash_value, password)
|
||||||
|
|
||||||
def _read_htpasswd(self, init: bool, suppress: bool) -> Tuple[bool, int, dict, int, int]:
|
def _read_htpasswd(self, init: bool, suppress: bool) -> Tuple[bool, int, int, dict, int, int]:
|
||||||
"""Read htpasswd file
|
"""Read htpasswd file
|
||||||
|
|
||||||
init == True: stop on error
|
init == True: stop on error
|
||||||
|
@ -189,6 +228,7 @@ class Auth(auth.BaseAuth):
|
||||||
"""
|
"""
|
||||||
htpasswd_ok = True
|
htpasswd_ok = True
|
||||||
bcrypt_use = 0
|
bcrypt_use = 0
|
||||||
|
argon2_use = 0
|
||||||
if (init is True) or (suppress is True):
|
if (init is True) or (suppress is True):
|
||||||
info = "Read"
|
info = "Read"
|
||||||
else:
|
else:
|
||||||
|
@ -237,6 +277,14 @@ class Auth(auth.BaseAuth):
|
||||||
logger.warning("htpasswd file contains bcrypt digest login: '%s' (line: %d / ignored because module is not loaded)", login, line_num)
|
logger.warning("htpasswd file contains bcrypt digest login: '%s' (line: %d / ignored because module is not loaded)", login, line_num)
|
||||||
skip = True
|
skip = True
|
||||||
htpasswd_ok = False
|
htpasswd_ok = False
|
||||||
|
if re.match(r"^\$argon2(i|d|id)\$", digest):
|
||||||
|
if init is True:
|
||||||
|
argon2_use += 1
|
||||||
|
else:
|
||||||
|
if self._has_argon2 is False:
|
||||||
|
logger.warning("htpasswd file contains argon2 digest login: '%s' (line: %d / ignored because module is not loaded)", login, line_num)
|
||||||
|
skip = True
|
||||||
|
htpasswd_ok = False
|
||||||
if skip is False:
|
if skip is False:
|
||||||
htpasswd[login] = digest
|
htpasswd[login] = digest
|
||||||
entries += 1
|
entries += 1
|
||||||
|
@ -259,7 +307,7 @@ class Auth(auth.BaseAuth):
|
||||||
self._htpasswd_not_ok_time = 0
|
self._htpasswd_not_ok_time = 0
|
||||||
else:
|
else:
|
||||||
self._htpasswd_not_ok_time = time.time()
|
self._htpasswd_not_ok_time = time.time()
|
||||||
return (htpasswd_ok, bcrypt_use, htpasswd, htpasswd_size, htpasswd_mtime_ns)
|
return (htpasswd_ok, bcrypt_use, argon2_use, htpasswd, htpasswd_size, htpasswd_mtime_ns)
|
||||||
|
|
||||||
def _login(self, login: str, password: str) -> str:
|
def _login(self, login: str, password: str) -> str:
|
||||||
"""Validate credentials.
|
"""Validate credentials.
|
||||||
|
@ -280,7 +328,7 @@ class Auth(auth.BaseAuth):
|
||||||
htpasswd_size = os.stat(self._filename).st_size
|
htpasswd_size = os.stat(self._filename).st_size
|
||||||
htpasswd_mtime_ns = os.stat(self._filename).st_mtime_ns
|
htpasswd_mtime_ns = os.stat(self._filename).st_mtime_ns
|
||||||
if (htpasswd_size != self._htpasswd_size) or (htpasswd_mtime_ns != self._htpasswd_mtime_ns):
|
if (htpasswd_size != self._htpasswd_size) or (htpasswd_mtime_ns != self._htpasswd_mtime_ns):
|
||||||
(self._htpasswd_ok, self._htpasswd_bcrypt_use, self._htpasswd, self._htpasswd_size, self._htpasswd_mtime_ns) = self._read_htpasswd(False, False)
|
(self._htpasswd_ok, self._htpasswd_bcrypt_use, self._htpasswd_argon2_use, self._htpasswd, self._htpasswd_size, self._htpasswd_mtime_ns) = self._read_htpasswd(False, False)
|
||||||
self._htpasswd_not_ok_time = 0
|
self._htpasswd_not_ok_time = 0
|
||||||
|
|
||||||
# log reminder of problemantic file every interval
|
# log reminder of problemantic file every interval
|
||||||
|
@ -298,7 +346,7 @@ class Auth(auth.BaseAuth):
|
||||||
login_ok = True
|
login_ok = True
|
||||||
else:
|
else:
|
||||||
# read file on every request
|
# read file on every request
|
||||||
(htpasswd_ok, htpasswd_bcrypt_use, htpasswd, htpasswd_size, htpasswd_mtime_ns) = self._read_htpasswd(False, True)
|
(htpasswd_ok, htpasswd_bcrypt_use, htpasswd_argon2_use, htpasswd, htpasswd_size, htpasswd_mtime_ns) = self._read_htpasswd(False, True)
|
||||||
if htpasswd.get(login):
|
if htpasswd.get(login):
|
||||||
digest = htpasswd[login]
|
digest = htpasswd[login]
|
||||||
login_ok = True
|
login_ok = True
|
||||||
|
@ -307,7 +355,7 @@ class Auth(auth.BaseAuth):
|
||||||
try:
|
try:
|
||||||
(method, password_ok) = self._verify(digest, password)
|
(method, password_ok) = self._verify(digest, password)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
logger.error("Login verification failed for user: '%s' (htpasswd/%s) with errror '%s'", login, self._encryption, e)
|
logger.error("Login verification failed for user: '%s' (htpasswd/%s) with error '%s'", login, self._encryption, e)
|
||||||
return ""
|
return ""
|
||||||
if password_ok:
|
if password_ok:
|
||||||
logger.debug("Login verification successful for user: '%s' (htpasswd/%s/%s)", login, self._encryption, method)
|
logger.debug("Login verification successful for user: '%s' (htpasswd/%s/%s)", login, self._encryption, method)
|
||||||
|
|
|
@ -49,6 +49,15 @@ class TestBaseAuthRequests(BaseTest):
|
||||||
else:
|
else:
|
||||||
has_bcrypt = 1
|
has_bcrypt = 1
|
||||||
|
|
||||||
|
# test for available argon2 module
|
||||||
|
try:
|
||||||
|
import argon2
|
||||||
|
from passlib.hash import argon2 # noqa: F811
|
||||||
|
except ImportError:
|
||||||
|
has_argon2 = 0
|
||||||
|
else:
|
||||||
|
has_argon2 = 1
|
||||||
|
|
||||||
def _test_htpasswd(self, htpasswd_encryption: str, htpasswd_content: str,
|
def _test_htpasswd(self, htpasswd_encryption: str, htpasswd_content: str,
|
||||||
test_matrix: Union[str, Iterable[Tuple[str, str, bool]]]
|
test_matrix: Union[str, Iterable[Tuple[str, str, bool]]]
|
||||||
= "ascii") -> None:
|
= "ascii") -> None:
|
||||||
|
@ -147,6 +156,18 @@ class TestBaseAuthRequests(BaseTest):
|
||||||
def test_htpasswd_bcrypt_unicode(self) -> None:
|
def test_htpasswd_bcrypt_unicode(self) -> None:
|
||||||
self._test_htpasswd("bcrypt", "😀:$2y$10$Oyz5aHV4MD9eQJbk6GPemOs4T6edK6U9Sqlzr.W1mMVCS8wJUftnW", "unicode")
|
self._test_htpasswd("bcrypt", "😀:$2y$10$Oyz5aHV4MD9eQJbk6GPemOs4T6edK6U9Sqlzr.W1mMVCS8wJUftnW", "unicode")
|
||||||
|
|
||||||
|
@pytest.mark.skipif(has_argon2 == 0, reason="No argon2 module installed")
|
||||||
|
def test_htpasswd_argon2_i(self) -> None:
|
||||||
|
self._test_htpasswd("argon2", "tmp:$argon2i$v=19$m=65536,t=3,p=4$NgZg7F1rzRkDoNSaMwag9A$qmsvMKEn5zOXHm8e3O5fKzzcRo0UESwaDr/cETe5YPI")
|
||||||
|
|
||||||
|
@pytest.mark.skipif(has_argon2 == 0, reason="No argon2 module installed")
|
||||||
|
def test_htpasswd_argon2_d(self) -> None:
|
||||||
|
self._test_htpasswd("argon2", "tmp:$argon2d$v=19$m=65536,t=3,p=4$ufe+txYiJKR0zlkLwVirVQ$MjGqRyVLes38hA6CEOkloMcTYCuLjxCKgIjtfYZ3iSM")
|
||||||
|
|
||||||
|
@pytest.mark.skipif(has_argon2 == 0, reason="No argon2 module installed")
|
||||||
|
def test_htpasswd_argon2_id(self) -> None:
|
||||||
|
self._test_htpasswd("argon2", "tmp:$argon2id$v=19$m=65536,t=3,p=4$t7bWuneOkdIa45xTqjXGmA$ORnRJyz9kHogJs6bDgZrTBPlzi4+p023PSEABb3xX1g")
|
||||||
|
|
||||||
def test_htpasswd_multi(self) -> None:
|
def test_htpasswd_multi(self) -> None:
|
||||||
self._test_htpasswd("plain", "ign:ign\ntmp:bepo")
|
self._test_htpasswd("plain", "ign:ign\ntmp:bepo")
|
||||||
|
|
||||||
|
|
|
@ -29,6 +29,7 @@ _T_co = TypeVar("_T_co", covariant=True)
|
||||||
|
|
||||||
RADICALE_MODULES: Sequence[str] = ("radicale", "vobject", "passlib", "defusedxml",
|
RADICALE_MODULES: Sequence[str] = ("radicale", "vobject", "passlib", "defusedxml",
|
||||||
"bcrypt",
|
"bcrypt",
|
||||||
|
"argon2-cffi",
|
||||||
"pika",
|
"pika",
|
||||||
"ldap",
|
"ldap",
|
||||||
"ldap3",
|
"ldap3",
|
||||||
|
|
|
@ -41,8 +41,9 @@ install_requires = ["defusedxml", "passlib", "vobject>=0.9.6",
|
||||||
"requests",
|
"requests",
|
||||||
]
|
]
|
||||||
bcrypt_requires = ["bcrypt"]
|
bcrypt_requires = ["bcrypt"]
|
||||||
|
argon2_requires = ["argon2-cffi"]
|
||||||
ldap_requires = ["ldap3"]
|
ldap_requires = ["ldap3"]
|
||||||
test_requires = ["pytest>=7", "waitress", *bcrypt_requires]
|
test_requires = ["pytest>=7", "waitress", *bcrypt_requires, *argon2_requires]
|
||||||
|
|
||||||
setup(
|
setup(
|
||||||
name="Radicale",
|
name="Radicale",
|
||||||
|
@ -60,7 +61,7 @@ setup(
|
||||||
package_data={"radicale": [*web_files, "py.typed"]},
|
package_data={"radicale": [*web_files, "py.typed"]},
|
||||||
entry_points={"console_scripts": ["radicale = radicale.__main__:run"]},
|
entry_points={"console_scripts": ["radicale = radicale.__main__:run"]},
|
||||||
install_requires=install_requires,
|
install_requires=install_requires,
|
||||||
extras_require={"test": test_requires, "bcrypt": bcrypt_requires, "ldap": ldap_requires},
|
extras_require={"test": test_requires, "bcrypt": bcrypt_requires, "argon2": argon2_requires, "ldap": ldap_requires},
|
||||||
keywords=["calendar", "addressbook", "CalDAV", "CardDAV"],
|
keywords=["calendar", "addressbook", "CalDAV", "CardDAV"],
|
||||||
python_requires=">=3.9.0",
|
python_requires=">=3.9.0",
|
||||||
classifiers=[
|
classifiers=[
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue