1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-09-30 21:12:05 +00:00
Radicale/radicale/auth/ldap.py
2025-09-29 20:17:16 +02:00

421 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# This file is part of Radicale - CalDAV and CardDAV server
# Copyright © 2022-2024 Peter Varkoly
# Copyright © 2024-2024 Peter Bieringer <pb@bieringer.de>
# Copyright © 2024-2025 Peter Marschall <peter@adpm.de>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.
"""
Authentication backend that checks credentials with a LDAP server.
The following parameters are needed in the configuration:
ldap_uri URI to the LDAP server
ldap_base Base DN of the LDAP server
ldap_reader_dn DN of an LDAP user with read access to get the user accounts
ldap_secret Password of the 'ldap_reader_dn'
Better: use 'ldap_secret_file'!
ldap_secret_file Path of the file containing the password of the 'ldap_reader_dn'
ldap_filter Search filter to find the user DN to authenticate
The following parameters control TLS connections:
ldap_use_ssl Use ssl on the ldap connection.
Deprecated, use 'ldap_security' instead!
ldap_security Encryption mode to be used,
one of: *none* | tls | starttls
ldap_ssl_verify_mode Certificate verification mode for tls and starttls;
one of: *REQUIRED* | OPTIONAL | NONE
ldap_ssl_ca_file Path to the CA file in PEM format to certify the server certificate
The following parameters are optional:
ldap_user_attribute Attribute to be used as username after authentication, e.g. cn;
if not given, the name used to logon is used.
ldap_groups_attribute Attribute in the user entry to read the user's group memberships from,
e.g. memberof, groupMembership. This may even be a non-DN attribute!
ldap_group_base Base DN to search for groups;
only if it differs from 'ldap_base' and if 'ldap_group_members_attribute' is set
ldap_group_filter Search filter to search for groups having the user DN found as member;
only if 'ldap_group_members_attribute' is set
ldap_group_members_attribute Attribute in the group entries to read the group's members from,
e.g. member.
The following parameters are for LDAP servers with oddities
ldap_ignore_attribute_create_modify_timestamp
Ignore modifyTimestamp and createTimestamp attributes. Needed for Authentik LDAP server
"""
import ssl
from radicale import auth, config
from radicale.log import logger
class Auth(auth.BaseAuth):
    """Authenticate users against an LDAP directory.

    A login is processed in three steps: bind with the configured reader
    account, search the DN of the user trying to log in (optionally
    collecting the user's groups), and finally bind with that DN and the
    supplied password.  The ``ldap3`` module is used when it can be
    imported; otherwise the ``ldap`` module (python-ldap) is used.
    """

    _ldap_uri: str
    _ldap_base: str
    _ldap_reader_dn: str
    _ldap_secret: str
    _ldap_filter: str
    # Class-level default kept for compatibility only: __init__ always binds
    # a fresh per-instance list, so this shared object is never mutated.
    _ldap_attributes: list[str] = []
    _ldap_user_attr: str
    _ldap_groups_attr: str
    _ldap_group_base: str
    _ldap_group_filter: str
    _ldap_group_members_attr: str
    _ldap_module_version: int = 3
    _ldap_security: str = "none"
    _ldap_ssl_verify_mode: str = "REQUIRED"
    _ldap_ssl_ca_file: str = ""

    def __init__(self, configuration: config.Configuration) -> None:
        """Read, validate and log the ``auth.ldap_*`` settings.

        Raises:
            RuntimeError: if neither ``ldap3`` nor ``ldap`` can be imported,
                if ``ldap_security`` or ``ldap_ssl_verify_mode`` holds an
                illegal value, if the deprecated ``ldap_use_ssl`` conflicts
                with ``ldap_security = starttls``, or if a reader DN is
                configured without a secret.
        """
        super().__init__(configuration)
        # Prefer ldap3; fall back to python-ldap (module version 2).
        try:
            import ldap3
            self.ldap3 = ldap3
        except ImportError:
            try:
                import ldap
                import ldap.filter
                self._ldap_module_version = 2
                self.ldap = ldap
            except ImportError as e:
                raise RuntimeError("LDAP authentication requires the ldap3 or ldap module") from e

        self._ldap_ignore_attribute_create_modify_timestamp = configuration.get("auth", "ldap_ignore_attribute_create_modify_timestamp")
        self._ldap_uri = configuration.get("auth", "ldap_uri")
        self._ldap_base = configuration.get("auth", "ldap_base")
        self._ldap_reader_dn = configuration.get("auth", "ldap_reader_dn")
        self._ldap_secret = configuration.get("auth", "ldap_secret")
        self._ldap_filter = configuration.get("auth", "ldap_filter")
        self._ldap_user_attr = configuration.get("auth", "ldap_user_attribute")
        self._ldap_groups_attr = configuration.get("auth", "ldap_groups_attribute")
        self._ldap_group_base = configuration.get("auth", "ldap_group_base")
        self._ldap_group_filter = configuration.get("auth", "ldap_group_filter")
        self._ldap_group_members_attr = configuration.get("auth", "ldap_group_members_attribute")

        # A secret file, when given, overrides 'ldap_secret' from the config.
        ldap_secret_file_path = configuration.get("auth", "ldap_secret_file")
        if ldap_secret_file_path:
            with open(ldap_secret_file_path, 'r') as file:
                self._ldap_secret = file.read().rstrip('\n')

        self._ldap_security = configuration.get("auth", "ldap_security")
        if self._ldap_security not in ("none", "tls", "starttls"):
            raise RuntimeError("Illegal value for config setting 'ldap_security'")

        # Map the deprecated boolean onto 'ldap_security'.
        ldap_use_ssl = configuration.get("auth", "ldap_use_ssl")
        if ldap_use_ssl:
            logger.warning("Configuration uses deprecated 'ldap_use_ssl': use 'ldap_security' ('none', 'tls', 'starttls') instead.")
            if self._ldap_security == "starttls":
                raise RuntimeError("Deprecated config setting 'ldap_use_ssl = True' conflicts with 'ldap_security' = 'starttls'")
            elif self._ldap_security != "tls":
                logger.warning("Update configuration: set 'ldap_security = tls' instead of deprecated 'ldap_use_ssl = True'")
                self._ldap_security = "tls"

        self._ldap_ssl_ca_file = configuration.get("auth", "ldap_ssl_ca_file")
        self._ldap_ssl_verify_mode = configuration.get("auth", "ldap_ssl_verify_mode")
        if self._ldap_ssl_verify_mode not in ("NONE", "OPTIONAL", "REQUIRED"):
            raise RuntimeError("Illegal value for config setting 'ldap_ssl_verify_mode'")

        # Derive settings implied by the URI scheme.
        uri_lower = self._ldap_uri.lower()
        encrypted = ("tls", "starttls")
        if uri_lower.startswith("ldaps://") and self._ldap_security not in encrypted:
            logger.info("Inferring 'ldap_security' = tls from 'ldap_uri' starting with 'ldaps://'")
            self._ldap_security = "tls"
        if uri_lower.startswith("ldapi://") and self._ldap_ssl_verify_mode != "NONE":
            logger.info("Lowering 'ldap_ssl_verify_mode' to NONE for 'ldap_uri' starting with 'ldapi://'")
            self._ldap_ssl_verify_mode = "NONE"

        if self._ldap_ssl_ca_file == "" and self._ldap_ssl_verify_mode != "NONE" and self._ldap_security in encrypted:
            logger.warning("Certificate verification not possible: 'ldap_ssl_ca_file' not set")
        if self._ldap_ssl_ca_file and self._ldap_security not in encrypted:
            logger.warning("Config setting 'ldap_ssl_ca_file' useless without encrypted LDAP connection")

        # Log the effective configuration (never the secret itself).
        logger.info("auth.ldap_uri : %r", self._ldap_uri)
        logger.info("auth.ldap_base : %r", self._ldap_base)
        logger.info("auth.ldap_reader_dn : %r", self._ldap_reader_dn)
        logger.info("auth.ldap_filter : %r", self._ldap_filter)
        if self._ldap_user_attr:
            logger.info("auth.ldap_user_attribute : %r", self._ldap_user_attr)
        else:
            logger.info("auth.ldap_user_attribute : (not provided)")
        if self._ldap_groups_attr:
            logger.info("auth.ldap_groups_attribute : %r", self._ldap_groups_attr)
        else:
            logger.info("auth.ldap_groups_attribute : (not provided)")
        if self._ldap_group_base:
            logger.info("auth.ldap_group_base : %r", self._ldap_group_base)
        else:
            logger.info("auth.ldap_group_base : (not provided, using ldap_base)")
            self._ldap_group_base = self._ldap_base
        if self._ldap_group_filter:
            logger.info("auth.ldap_group_filter : %r", self._ldap_group_filter)
        else:
            logger.info("auth.ldap_group_filter : (not provided)")
        if self._ldap_group_members_attr:
            logger.info("auth.ldap_group_members_attr: %r", self._ldap_group_members_attr)
        else:
            logger.info("auth.ldap_group_members_attr: (not provided)")
        if ldap_secret_file_path:
            logger.info("auth.ldap_secret_file_path : %r", ldap_secret_file_path)
            if self._ldap_secret:
                logger.info("auth.ldap_secret : (from file)")
        else:
            logger.info("auth.ldap_secret_file_path : (not provided)")
            if self._ldap_secret:
                logger.info("auth.ldap_secret : (from config)")
        if self._ldap_reader_dn and not self._ldap_secret:
            logger.error("auth.ldap_secret : (not provided)")
            raise RuntimeError("LDAP authentication requires ldap_secret for ldap_reader_dn")
        logger.info("auth.ldap_use_ssl : %s", ldap_use_ssl)
        logger.info("auth.ldap_security : %s", self._ldap_security)
        logger.info("auth.ldap_ssl_verify_mode : %s", self._ldap_ssl_verify_mode)
        if self._ldap_ssl_ca_file:
            logger.info("auth.ldap_ssl_ca_file : %r", self._ldap_ssl_ca_file)
        else:
            logger.info("auth.ldap_ssl_ca_file : (not provided)")
        if self._ldap_ignore_attribute_create_modify_timestamp:
            logger.info("auth.ldap_ignore_attribute_create_modify_timestamp applied (relevant for ldap3 only)")

        # Attributes to be returned by the user query.  Built as a fresh
        # per-instance list: appending to the class-level list would leak
        # entries into every other (and every later) Auth instance.
        attributes: list[str] = []
        for attribute in (self._ldap_groups_attr, self._ldap_user_attr):
            if attribute and attribute not in attributes:
                attributes.append(attribute)
        self._ldap_attributes = attributes
        logger.info("ldap_attributes : %r", self._ldap_attributes)

    @staticmethod
    def _unbind_quietly(conn) -> None:
        """Close an LDAP connection, logging (not raising) cleanup errors."""
        if conn is None:
            return
        try:
            conn.unbind()
        except Exception as e:
            logger.debug(f"LDAP unbind failed: {e}")

    def _login2(self, login: str, password: str) -> str:
        """Validate credentials using the ``ldap`` (python-ldap) module.

        Returns the user name to use (the value of ``ldap_user_attribute``
        when configured and present, else ``login``) on success, or ``""``
        when the user cannot be found unambiguously or the password is
        rejected.  On success ``self._ldap_groups`` is set to the user's
        group names.

        Raises:
            RuntimeError: if connecting, binding as reader or searching fails.
        """
        conn = None
        try:
            try:
                # Bind as reader dn
                logger.debug(f"_login2 {self._ldap_uri}, {self._ldap_reader_dn}")
                conn = self.ldap.initialize(self._ldap_uri)
                conn.protocol_version = self.ldap.VERSION3
                conn.set_option(self.ldap.OPT_REFERRALS, 0)

                if self._ldap_security in ("tls", "starttls"):
                    # certificate validation mode
                    verify_mode = {"NONE": self.ldap.OPT_X_TLS_NEVER,
                                   "OPTIONAL": self.ldap.OPT_X_TLS_ALLOW,
                                   "REQUIRED": self.ldap.OPT_X_TLS_DEMAND}
                    conn.set_option(self.ldap.OPT_X_TLS_REQUIRE_CERT, verify_mode[self._ldap_ssl_verify_mode])
                    # CA file to validate certificate against
                    if self._ldap_ssl_ca_file:
                        conn.set_option(self.ldap.OPT_X_TLS_CACERTFILE, self._ldap_ssl_ca_file)
                    # create TLS context - this must be the last TLS setting
                    conn.set_option(self.ldap.OPT_X_TLS_NEWCTX, self.ldap.OPT_ON)
                    if self._ldap_security == "starttls":
                        conn.start_tls_s()

                conn.simple_bind_s(self._ldap_reader_dn, self._ldap_secret)

                # Search for the dn of user to authenticate
                escaped_login = self.ldap.filter.escape_filter_chars(login)
                logger.debug(f"_login2 login escaped for LDAP filters: {escaped_login}")
                res = conn.search_s(
                    self._ldap_base,
                    self.ldap.SCOPE_SUBTREE,
                    filterstr=self._ldap_filter.format(escaped_login),
                    attrlist=self._ldap_attributes
                )
                # Search continuation references are reported with a DN of
                # None; they are not user entries and must not be counted.
                entries = [item for item in res if item[0] is not None]
                if len(entries) != 1:
                    # User could not be found unambiguously
                    logger.debug(f"_login2 no unique DN found for '{login}'")
                    return ""
                user_entry = entries[0]
                user_dn = user_entry[0]
                logger.debug(f"_login2 found LDAP user DN {user_dn}")

                # Let's collect the groups of the user.  A user without any
                # membership simply lacks the attribute: that is not an error.
                groupDNs = []
                if self._ldap_groups_attr:
                    groupDNs = user_entry[1].get(self._ldap_groups_attr, [])
                # Search for all groups having the user_dn found as member.
                if self._ldap_group_members_attr:
                    res = conn.search_s(
                        self._ldap_group_base,
                        self.ldap.SCOPE_SUBTREE,
                        filterstr="(&{0}({1}={2}))".format(
                            self._ldap_group_filter,
                            self._ldap_group_members_attr,
                            self.ldap.filter.escape_filter_chars(user_dn)),
                        attrlist=['1.1']
                    )
                    # Fill groupDNs with DNs of groups found (skip references)
                    groupDNs = [dn for dn, _entry in res if dn is not None]
            except Exception as e:
                raise RuntimeError(f"Invalid LDAP configuration:{e}") from e

            try:
                # Bind as user to authenticate
                conn.simple_bind_s(user_dn, password)
                if self._ldap_user_attr:
                    values = user_entry[1].get(self._ldap_user_attr)
                    if values:
                        login = values[0]
                        if isinstance(login, bytes):
                            login = login.decode('utf-8')
                        logger.debug(f"_login2 user set to: '{login}'")

                # Get RDNs of groups' DNs; values that are no DNs are used as is
                tmp = []
                for g in groupDNs:
                    try:
                        rdns = self.ldap.dn.explode_dn(g, notypes=True)
                        tmp.append(rdns[0])
                    except Exception:
                        if isinstance(g, bytes):
                            g = g.decode('utf-8')
                        tmp.append(g)
                self._ldap_groups = set(tmp)
                logger.debug("_login2 LDAP groups of user: %s", ",".join(self._ldap_groups))
                logger.debug(f"_login2 {login} successfully authenticated")
                return login
            except self.ldap.INVALID_CREDENTIALS:
                return ""
        finally:
            # Release the connection on every path, not only on success.
            self._unbind_quietly(conn)

    def _login3(self, login: str, password: str) -> str:
        """Validate credentials using the ``ldap3`` module.

        Returns the user name to use (the value of ``ldap_user_attribute``
        when configured and present, else ``login``) on success, or ``""``
        when the user search fails, the user cannot be found unambiguously
        or the bind as user fails.  On success ``self._ldap_groups`` is set
        to the user's group names.

        Raises:
            RuntimeError: if the connection for the reader cannot be set up
                (including a StartTLS failure) or the reader cannot bind.
        """
        ldap3 = self.ldap3
        if self._ldap_ignore_attribute_create_modify_timestamp:
            # The exclusion list is global to ldap3: add each name only once
            # instead of growing the list on every login.
            excluded = ldap3.utils.config._ATTRIBUTES_EXCLUDED_FROM_CHECK
            for attribute in ('createTimestamp', 'modifyTimestamp'):
                if attribute not in excluded:
                    excluded.append(attribute)

        use_starttls = self._ldap_security == "starttls"
        conn = None
        try:
            # Connect the server
            try:
                logger.debug(f"_login3 {self._ldap_uri}, {self._ldap_reader_dn}")
                if self._ldap_security in ("tls", "starttls"):
                    logger.debug("_login3 using encryption (reader)")
                    verify_mode = {"NONE": ssl.CERT_NONE,
                                   "OPTIONAL": ssl.CERT_OPTIONAL,
                                   "REQUIRED": ssl.CERT_REQUIRED}
                    if self._ldap_ssl_ca_file != "":
                        tls = ldap3.Tls(validate=verify_mode[self._ldap_ssl_verify_mode], ca_certs_file=self._ldap_ssl_ca_file)
                    else:
                        tls = ldap3.Tls(validate=verify_mode[self._ldap_ssl_verify_mode])
                    if self._ldap_security == "tls":
                        logger.debug("_login3 using ssl (reader)")
                        server = ldap3.Server(self._ldap_uri, use_ssl=True, tls=tls)
                    else:
                        server = ldap3.Server(self._ldap_uri, use_ssl=False, tls=tls)
                else:
                    logger.debug("_login3 not using encryption (reader)")
                    server = ldap3.Server(self._ldap_uri)
                conn = ldap3.Connection(server, self._ldap_reader_dn, password=self._ldap_secret, auto_bind=False, raise_exceptions=True)
                if use_starttls:
                    logger.debug("_login3 using starttls (reader)")
                    conn.start_tls()
            except ldap3.core.exceptions.LDAPStartTLSError as e:
                # Must abort here: continuing would bind the reader over an
                # unencrypted connection although encryption was requested.
                raise RuntimeError(f"_login3 StartTLS Error: {e}") from e
            except ldap3.core.exceptions.LDAPSocketOpenError as e:
                raise RuntimeError("Unable to reach LDAP server") from e
            except Exception as e:
                # Without a usable connection nothing below can work.
                logger.debug(f"_login3 error 1 {e} (reader)")
                raise RuntimeError(f"Unable to set up LDAP connection: {e}") from e

            if not conn.bind(read_server_info=False):
                logger.debug("_login3 cannot bind (reader)")
                raise RuntimeError("Unable to read from LDAP server")
            logger.debug(f"_login3 bind as {self._ldap_reader_dn}")

            # Search the user dn
            escaped_login = ldap3.utils.conv.escape_filter_chars(login)
            logger.debug(f"_login3 login escaped for LDAP filters: {escaped_login}")
            try:
                conn.search(
                    search_base=self._ldap_base,
                    search_filter=self._ldap_filter.format(escaped_login),
                    search_scope=ldap3.SUBTREE,
                    attributes=self._ldap_attributes
                )
            except Exception as e:
                logger.error(f"_login3 LDAP search for {login} failed: {e}")
                return ""
            if len(conn.entries) != 1:
                # User could not be found unambiguously
                logger.debug(f"_login3 no unique DN found for '{login}'")
                return ""
            # The raw response may also hold search references, which carry
            # no 'dn': take the (single) real entry.
            found = [item for item in (conn.response or []) if 'dn' in item]
            if not found:
                logger.debug(f"_login3 no unique DN found for '{login}'")
                return ""
            user_entry = found[0]
            user_dn = user_entry['dn']
            logger.debug(f"_login3 found LDAP user DN {user_dn}")

            # Let's collect the groups of the user.
            groupDNs = []
            if self._ldap_groups_attr:
                memberships = user_entry['attributes'][self._ldap_groups_attr]
                if memberships:
                    if isinstance(memberships, list):
                        groupDNs = memberships
                    else:
                        groupDNs.append(memberships)
            # Search for all groups having the user_dn found as member.
            if self._ldap_group_members_attr:
                try:
                    conn.search(
                        search_base=self._ldap_group_base,
                        search_filter="(&{0}({1}={2}))".format(
                            self._ldap_group_filter,
                            self._ldap_group_members_attr,
                            ldap3.utils.conv.escape_filter_chars(user_dn)),
                        search_scope=ldap3.SUBTREE,
                        attributes=['1.1']
                    )
                except Exception as e:
                    # LDAP search failed: consider it as non-fatal - only groups missing
                    logger.debug(f"_ldap3: LDAP group search failed: {e}")
                else:
                    # Fill groupDNs with DNs of groups found (skip references)
                    groupDNs = [group['dn'] for group in (conn.response or []) if 'dn' in group]
        finally:
            # Close the reader connection on every path.
            self._unbind_quietly(conn)

        user_conn = None
        try:
            # Try to bind as the user itself
            try:
                user_conn = ldap3.Connection(server, user_dn, password=password, auto_bind=False)
                if use_starttls:
                    logger.debug("_login3 using starttls (user)")
                    user_conn.start_tls()
            except ldap3.core.exceptions.LDAPStartTLSError as e:
                raise RuntimeError(f"_login3 StartTLS Error: {e}") from e
            if not user_conn.bind(read_server_info=False):
                logger.debug(f"_login3 user '{login}' cannot be found")
                return ""

            # Get RDNs of groups' DNs; values that are no DNs are used as is
            tmp = []
            for g in groupDNs:
                try:
                    rdns = ldap3.utils.dn.parse_dn(g)
                    tmp.append(rdns[0][1])
                except Exception:
                    if isinstance(g, bytes):
                        g = g.decode('utf-8')
                    tmp.append(g)
            self._ldap_groups = set(tmp)
            logger.debug("_login3 LDAP groups of user: %s", ",".join(self._ldap_groups))

            if self._ldap_user_attr:
                value = user_entry['attributes'][self._ldap_user_attr]
                if value:
                    login = value[0] if isinstance(value, list) else value
                    logger.debug(f"_login3 user set to: '{login}'")
            logger.debug(f"_login3 {login} successfully authenticated")
            return login
        except Exception as e:
            # Any failure while authenticating the user means "not logged in".
            logger.debug(f"_login3 error 2 {e}")
            return ""
        finally:
            self._unbind_quietly(user_conn)

    def _login(self, login: str, password: str) -> str:
        """Validate credentials.

        In first step we make a connection to the LDAP server with the
        ldap_reader_dn credential.
        In next step the DN of the user to authenticate will be searched.
        In the last step the authentication of the user will be proceeded.

        Returns the authenticated user name, or ``""`` if authentication
        failed.
        """
        if not password:
            # A simple bind with a DN but an empty password is an
            # "unauthenticated bind" that servers may accept; it must never
            # count as a successful login.
            logger.debug("_login rejected: empty password")
            return ""
        if self._ldap_module_version == 2:
            return self._login2(login, password)
        return self._login3(login, password)