diff --git a/radicale/auth/ldap.py b/radicale/auth/ldap.py index 2c4d63c3..fa2a4891 100644 --- a/radicale/auth/ldap.py +++ b/radicale/auth/ldap.py @@ -30,6 +30,10 @@ Following parameters controls SSL connections: ldap_security The encryption mode to be used: *none*|tls|starttls ldap_ssl_verify_mode The certificate verification mode. Works for tls and starttls. NONE, OPTIONAL, default is REQUIRED ldap_ssl_ca_file + The following parameters are optional: + ldap_group_base Base DN to search for groups. Only if it differs from ldap_base and if ldap_group_members_attribute is set + ldap_group_filter Search filter to search for groups having the user as member. Only if ldap_group_members_attribute is set + ldap_group_members_attribute Attribute in the group entries to read the group's members from """ import ssl @@ -47,6 +51,9 @@ class Auth(auth.BaseAuth): _ldap_attributes: list[str] = [] _ldap_user_attr: str _ldap_groups_attr: str + _ldap_group_base: str + _ldap_group_filter: str + _ldap_group_members_attr: str _ldap_module_version: int = 3 _ldap_use_ssl: bool = False _ldap_security: str = "none" @@ -78,6 +85,9 @@ class Auth(auth.BaseAuth): self._ldap_filter = configuration.get("auth", "ldap_filter") self._ldap_user_attr = configuration.get("auth", "ldap_user_attribute") self._ldap_groups_attr = configuration.get("auth", "ldap_groups_attribute") + self._ldap_group_base = configuration.get("auth", "ldap_group_base") + self._ldap_group_filter = configuration.get("auth", "ldap_group_filter") + self._ldap_group_members_attr = configuration.get("auth", "ldap_group_members_attribute") ldap_secret_file_path = configuration.get("auth", "ldap_secret_file") if ldap_secret_file_path: with open(ldap_secret_file_path, 'r') as file: @@ -110,6 +120,19 @@ class Auth(auth.BaseAuth): logger.info("auth.ldap_groups_attribute: %r" % self._ldap_groups_attr) else: logger.info("auth.ldap_groups_attribute: (not provided)") + if self._ldap_group_base: + logger.info("auth.ldap_group_base : %r" % self._ldap_group_base) + else: + logger.info("auth.ldap_group_base : (not provided, using ldap_base)") + self._ldap_group_base = self._ldap_base + if self._ldap_group_filter: + logger.info("auth.ldap_group_filter: %r" % self._ldap_group_filter) + else: + logger.info("auth.ldap_group_filter: (not provided)") + if self._ldap_group_members_attr: + logger.info("auth.ldap_group_members_attr: %r" % self._ldap_group_members_attr) + else: + logger.info("auth.ldap_group_members_attr: (not provided)") if ldap_secret_file_path: logger.info("auth.ldap_secret_file_path: %r" % ldap_secret_file_path) if self._ldap_secret: @@ -160,6 +183,30 @@ class Auth(auth.BaseAuth): user_entry = res[0] user_dn = user_entry[0] logger.debug(f"_login2 found LDAP user DN {user_dn}") + + """Let's collect the groups of the user.""" + groupDNs: list[str] = [] + if self._ldap_groups_attr: + groupDNs = user_entry[1][self._ldap_groups_attr] + + """Search for all groups having the user_dn found as member.""" + if self._ldap_group_members_attr: + groupDNs = [] + res = conn.search_s( + self._ldap_group_base, + self.ldap.SCOPE_SUBTREE, + filterstr="(&{0}({1}={2}))".format( + self._ldap_group_filter, + self._ldap_group_members_attr, + self.ldap.filter.escape_filter_chars(user_dn)), + attrlist=['1.1'] + ) + """Fill groupDNs with DNs of groups found""" + if len(res) > 0: + groupDNs = [] + for dn,entry in res: + groupDNs.append(dn) + """Close LDAP connection""" conn.unbind() except Exception as e: @@ -171,23 +218,23 @@ class Auth(auth.BaseAuth): conn.protocol_version = 3 conn.set_option(self.ldap.OPT_REFERRALS, 0) conn.simple_bind_s(user_dn, password) - tmp: list[str] = [] - if self._ldap_groups_attr: - tmp = [] - for g in user_entry[1][self._ldap_groups_attr]: - """Get group g's RDN's attribute value""" - try: - rdns = self.ldap.dn.explode_dn(g, notypes=True) - tmp.append(rdns[0]) - except Exception: - tmp.append(g.decode('utf8')) - self._ldap_groups = set(tmp) - logger.debug("_login2 LDAP groups of user: %s", ",".join(self._ldap_groups)) if self._ldap_user_attr: if user_entry[1][self._ldap_user_attr]: tmplogin = user_entry[1][self._ldap_user_attr][0] login = tmplogin.decode('utf-8') logger.debug(f"_login2 user set to: '{login}'") + + """Get RDNs of groups' DNs""" + tmp: list[str] = [] + for g in groupDNs: + try: + rdns = self.ldap.dn.explode_dn(g, notypes=True) + tmp.append(rdns[0]) + except Exception: + tmp.append(g.decode('utf8')) + self._ldap_groups = set(tmp) + logger.debug("_login2 LDAP groups of user: %s", ",".join(self._ldap_groups)) + conn.unbind() logger.debug(f"_login2 {login} successfully authenticated") return login @@ -249,9 +296,42 @@ class Auth(auth.BaseAuth): return "" user_entry = conn.response[0] - conn.unbind() user_dn = user_entry['dn'] logger.debug(f"_login3 found LDAP user DN {user_dn}") + + """Let's collect the groups of the user.""" + groupDNs: list[str] = [] + if self._ldap_groups_attr: + if user_entry['attributes'][self._ldap_groups_attr]: + if isinstance(user_entry['attributes'][self._ldap_groups_attr], list): + groupDNs = user_entry['attributes'][self._ldap_groups_attr] + else: + groupDNs.append(user_entry['attributes'][self._ldap_groups_attr]) + + """Search for all groups having the user_dn found as member.""" + if self._ldap_group_members_attr: + try: + conn.search( + search_base=self._ldap_group_base, + search_filter="(&{0}({1}={2}))".format( + self._ldap_group_filter, + self._ldap_group_members_attr, + self.ldap3.utils.conv.escape_filter_chars(user_dn)), + search_scope=self.ldap3.SUBTREE, + attributes=['1.1'] + ) + except Exception as e: + """LDAP search failed: consider it as non-fatal - only groups missing""" + logger.debug(f"_ldap3: LDAP group search failed: {e}") + else: + """Fill groupDNs with DNs of groups found""" + groupDNs = [] + for group in conn.response: + groupDNs.append(group['dn']) + + """Close LDAP connection""" + conn.unbind() + try: """Try to bind as the user itself""" try: @@ -264,18 +344,18 @@ class Auth(auth.BaseAuth): if not conn.bind(read_server_info=False): logger.debug(f"_login3 user '{login}' cannot be found") return "" + + """Get RDNs of groups' DNs""" tmp: list[str] = [] - if self._ldap_groups_attr: - tmp = [] - for g in user_entry['attributes'][self._ldap_groups_attr]: - """Get group g's RDN's attribute value""" - try: - rdns = self.ldap3.utils.dn.parse_dn(g) - tmp.append(rdns[0][1]) - except Exception: - tmp.append(g) - self._ldap_groups = set(tmp) - logger.debug("_login3 LDAP groups of user: %s", ",".join(self._ldap_groups)) + for g in groupDNs: + try: + rdns = self.ldap3.utils.dn.parse_dn(g) + tmp.append(rdns[0][1]) + except Exception: + tmp.append(g) + self._ldap_groups = set(tmp) + logger.debug("_login3 LDAP groups of user: %s", ",".join(self._ldap_groups)) + if self._ldap_user_attr: if user_entry['attributes'][self._ldap_user_attr]: if isinstance(user_entry['attributes'][self._ldap_user_attr], list): diff --git a/radicale/config.py b/radicale/config.py index c6d93f41..77fcb04e 100644 --- a/radicale/config.py +++ b/radicale/config.py @@ -297,6 +297,18 @@ DEFAULT_CONFIG_SCHEMA: types.CONFIG_SCHEMA = OrderedDict([ "value": "", "help": "attribute to read the group memberships from", "type": str}), + ("ldap_group_members_attribute", { + "value": "", + "help": "Attribute in the group entries to read the group's members from", + "type": str}), + ("ldap_group_base", { + "value": "", + "help": "Base DN to search for groups. Only if it differs from ldap_base and if ldap_group_members_attribute is set", + "type": str}), + ("ldap_group_filter", { + "value": "", + "help": "Search filter to search for groups having the user as member. Only if ldap_group_members_attribute is set", + "type": str}), ("ldap_use_ssl", { "value": "False", "help": "Use ssl on the ldap connection. Soon to be deprecated, use ldap_security instead",