1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-07-23 17:48:30 +00:00

Rebase galaxy4public patch on top of bf4f5834

This commit is contained in:
Jean-Denis Girard 2024-10-30 10:33:10 -10:00
parent bf4f5834af
commit f25a5fbc79
4 changed files with 282 additions and 1 deletions

View file

@ -37,7 +37,8 @@ from radicale.log import logger
INTERNAL_TYPES: Sequence[str] = ("none", "remote_user", "http_x_remote_user", INTERNAL_TYPES: Sequence[str] = ("none", "remote_user", "http_x_remote_user",
"denyall", "denyall",
"htpasswd", "htpasswd",
"ldap") "ldap",
"dovecot")
def load(configuration: "config.Configuration") -> "BaseAuth": def load(configuration: "config.Configuration") -> "BaseAuth":

178
radicale/auth/dovecot.py Normal file
View file

@ -0,0 +1,178 @@
# This file is part of Radicale Server - Calendar Server
# Copyright © 2014 Giel van Schijndel
# Copyright © 2019 (GalaxyMaster)
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.
import base64
import itertools
import os
import socket
from contextlib import closing
from radicale import auth
from radicale.log import logger
class Auth(auth.BaseAuth):
def __init__(self, configuration):
super().__init__(configuration)
self.socket = configuration.get("auth", "dovecot_socket")
self.timeout = 5
self.request_id_gen = itertools.count(1)
def login(self, login, password):
"""Validate credentials.
Check if the ``login``/``password`` pair is valid according to Dovecot.
This implementation communicates with a Dovecot server through the
Dovecot Authentication Protocol v1.1.
https://dovecot.org/doc/auth-protocol.txt
"""
logger.info("Authentication request (dovecot): '{}'".format(login))
if not login or not password:
return ""
with closing(socket.socket(
socket.AF_UNIX,
socket.SOCK_STREAM)
) as sock:
try:
sock.settimeout(self.timeout)
sock.connect(self.socket)
buf = bytes()
supported_mechs = []
done = False
seen_part = [0, 0, 0]
# Upon the initial connection we only care about the
# handshake, which is usually just around 100 bytes long,
# e.g.
#
# VERSION 1 2
# MECH PLAIN plaintext
# SPID 22901
# CUID 1
# COOKIE 2dbe4116a30fb4b8a8719f4448420af7
# DONE
#
# Hence, we try to read just once with a buffer big
# enough to hold all of it.
buf = sock.recv(1024)
while b'\n' in buf and not done:
line, buf = buf.split(b'\n', 1)
parts = line.split(b'\t')
first, parts = parts[0], parts[1:]
if first == b'VERSION':
if seen_part[0]:
logger.warning(
"Server presented multiple VERSION "
"tokens, ignoring"
)
continue
version = parts
logger.debug("Dovecot server version: '{}'".format(
(b'.'.join(version)).decode()
))
if int(version[0]) != 1:
logger.fatal(
"Only Dovecot 1.x versions are supported!"
)
return ""
seen_part[0] += 1
elif first == b'MECH':
supported_mechs.append(parts[0])
seen_part[1] += 1
elif first == b'DONE':
seen_part[2] += 1
if not (seen_part[0] and seen_part[1]):
logger.fatal(
"An unexpected end of the server "
"handshake received!"
)
return ""
done = True
if not done:
logger.fatal("Encountered a broken server handshake!")
return ""
logger.debug(
"Supported auth methods: '{}'"
.format((b"', '".join(supported_mechs)).decode())
)
if b'PLAIN' not in supported_mechs:
logger.info(
"Authentication method 'PLAIN' is not supported, "
"but is required!"
)
return ""
# Handshake
logger.debug("Sending auth handshake")
sock.send(b'VERSION\t1\t1\n')
sock.send(b'CPID\t%u\n' % os.getpid())
request_id = next(self.request_id_gen)
logger.debug(
"Authenticating with request id: '{}'"
.format(request_id)
)
sock.send(
b'AUTH\t%u\tPLAIN\tservice=radicale\tresp=%b\n' %
(
request_id, base64.b64encode(
b'\0%b\0%b' %
(login.encode(), password.encode())
)
)
)
logger.debug("Processing auth response")
buf = sock.recv(1024)
line = buf.split(b'\n', 1)[0]
parts = line.split(b'\t')[:2]
resp, reply_id, params = (
parts[0], int(parts[1]),
dict(part.split('=', 1) for part in parts[2:])
)
logger.debug(
"Auth response: result='{}', id='{}', parameters={}"
.format(resp.decode(), reply_id, params)
)
if request_id != reply_id:
logger.fatal(
"Unexpected reply ID {} received (expected {})"
.format(
reply_id, request_id
)
)
return ""
if resp == b'OK':
return login
except socket.error as e:
logger.fatal(
"Failed to communicate with Dovecot socket %r: %s" %
(self.socket, e)
)
return ""

View file

@ -183,6 +183,10 @@ DEFAULT_CONFIG_SCHEMA: types.CONFIG_SCHEMA = OrderedDict([
"value": "autodetect", "value": "autodetect",
"help": "htpasswd encryption method", "help": "htpasswd encryption method",
"type": str}), "type": str}),
("dovecot_socket", {
"value": "/var/run/dovecot/auth-client",
"help": "dovecot auth socket",
"type": str}),
("realm", { ("realm", {
"value": "Radicale - Password Required", "value": "Radicale - Password Required",
"help": "message displayed when a password is needed", "help": "message displayed when a password is needed",

View file

@ -22,6 +22,7 @@ Radicale tests with simple requests and authentication.
""" """
import base64
import os import os
import sys import sys
from typing import Iterable, Tuple, Union from typing import Iterable, Tuple, Union
@ -159,6 +160,103 @@ class TestBaseAuthRequests(BaseTest):
href_element = prop.find(xmlutils.make_clark("D:href")) href_element = prop.find(xmlutils.make_clark("D:href"))
assert href_element is not None and href_element.text == "/test/" assert href_element is not None and href_element.text == "/test/"
def _test_dovecot(
self, user, password, expected_status,
response=b'FAIL\n1\n', mech=[b'PLAIN'], broken=None):
import socket
from unittest.mock import DEFAULT, patch
self.configure({"auth": {"type": "dovecot",
"dovecot_socket": "./dovecot.sock"}})
if broken is None:
broken = []
handshake = b''
if "version" not in broken:
handshake += b'VERSION\t'
if "incompatible" in broken:
handshake += b'2'
else:
handshake += b'1'
handshake += b'\t2\n'
if "mech" not in broken:
handshake += b'MECH\t%b\n' % b' '.join(mech)
if "duplicate" in broken:
handshake += b'VERSION\t1\t2\n'
if "done" not in broken:
handshake += b'DONE\n'
with patch.multiple(
'socket.socket',
connect=DEFAULT,
send=DEFAULT,
recv=DEFAULT
) as mock_socket:
if "socket" in broken:
mock_socket["connect"].side_effect = socket.error(
"Testing error with the socket"
)
mock_socket["recv"].side_effect = [handshake, response]
status, _, answer = self.request(
"PROPFIND", "/",
HTTP_AUTHORIZATION="Basic %s" % base64.b64encode(
("%s:%s" % (user, password)).encode()).decode())
assert status == expected_status
def test_dovecot_no_user(self):
self._test_dovecot("", "", 401)
def test_dovecot_no_password(self):
self._test_dovecot("user", "", 401)
def test_dovecot_broken_handshake_no_version(self):
self._test_dovecot("user", "password", 401, broken=["version"])
def test_dovecot_broken_handshake_incompatible(self):
self._test_dovecot("user", "password", 401, broken=["incompatible"])
def test_dovecot_broken_handshake_duplicate(self):
self._test_dovecot(
"user", "password", 207, response=b'OK\t1',
broken=["duplicate"]
)
def test_dovecot_broken_handshake_no_mech(self):
self._test_dovecot("user", "password", 401, broken=["mech"])
def test_dovecot_broken_handshake_unsupported_mech(self):
self._test_dovecot("user", "password", 401, mech=[b'ONE', b'TWO'])
def test_dovecot_broken_handshake_no_done(self):
self._test_dovecot("user", "password", 401, broken=["done"])
def test_dovecot_broken_socket(self):
self._test_dovecot("user", "password", 401, broken=["socket"])
def test_dovecot_auth_good1(self):
self._test_dovecot("user", "password", 207, response=b'OK\t1')
def test_dovecot_auth_good2(self):
self._test_dovecot(
"user", "password", 207, response=b'OK\t1',
mech=[b'PLAIN\nEXTRA\tTERM']
)
self._test_dovecot("user", "password", 207, response=b'OK\t1')
def test_dovecot_auth_bad1(self):
self._test_dovecot("user", "password", 401, response=b'FAIL\t1')
def test_dovecot_auth_bad2(self):
self._test_dovecot("user", "password", 401, response=b'CONT\t1')
def test_dovecot_auth_id_mismatch(self):
self._test_dovecot("user", "password", 401, response=b'OK\t2')
def test_custom(self) -> None: def test_custom(self) -> None:
"""Custom authentication.""" """Custom authentication."""
self.configure({"auth": {"type": "radicale.tests.custom.auth"}}) self.configure({"auth": {"type": "radicale.tests.custom.auth"}})