diff --git a/radicale.fcgi b/radicale.fcgi
index ec00581f..f1831951 100755
--- a/radicale.fcgi
+++ b/radicale.fcgi
@@ -23,14 +23,12 @@ Launch a Radicale FastCGI server according to configuration.
"""
-try:
- from flup.server.fcgi import WSGIServer
-except ImportError:
- from flipflop import WSGIServer
+import os
import radicale
+from flipflop import WSGIServer
-radicale.log.start()
-radicale.log.LOGGER.info("Starting Radicale FastCGI server")
-WSGIServer(radicale.Application()).run()
-radicale.log.LOGGER.info("Stopping Radicale FastCGI server")
+configuration = radicale.config.load([os.environ.get("RADICALE_CONFIG")])
+logger = radicale.log.start()
+WSGIServer(radicale.Application(configuration, logger)).run()
+
diff --git a/radicale.wsgi b/radicale.wsgi
index 45760809..cb49645d 100755
--- a/radicale.wsgi
+++ b/radicale.wsgi
@@ -21,8 +21,10 @@ Radicale WSGI file (mod_wsgi and uWSGI compliant).
"""
+import os
import radicale
-radicale.log.start()
-application = radicale.Application()
+configuration = radicale.config.load([os.environ.get("RADICALE_CONFIG")])
+logger = radicale.log.start()
+application = radicale.Application(configuration, logger)
diff --git a/radicale/__init__.py b/radicale/__init__.py
index 6f210a35..98554fe8 100644
--- a/radicale/__init__.py
+++ b/radicale/__init__.py
@@ -36,7 +36,9 @@ import re
from http import client
from urllib.parse import unquote, urlparse
-from . import auth, config, log, rights, storage, xmlutils
+import vobject
+
+from . import auth, rights, storage, xmlutils
VERSION = "2.0.0-pre"
@@ -71,30 +73,20 @@ class HTTPServer(wsgiref.simple_server.WSGIServer, object):
class HTTPSServer(HTTPServer):
"""HTTPS server."""
+
+ # These class attributes must be set before creating instance
+ certificate = None
+ key = None
+ protocol = None
+ cyphers = None
+
def __init__(self, address, handler):
"""Create server by wrapping HTTP socket in an SSL socket."""
- super().__init__(address, handler, False)
+ super().__init__(address, handler, bind_and_activate=False)
- # Test if the SSL files can be read
- for name in ("certificate", "key"):
- filename = config.get("server", name)
- try:
- open(filename, "r").close()
- except IOError as exception:
- log.LOGGER.warning(
- "Error while reading SSL %s %r: %s" % (
- name, filename, exception))
-
- ssl_kwargs = dict(
- server_side=True,
- certfile=config.get("server", "certificate"),
- keyfile=config.get("server", "key"),
- ssl_version=getattr(
- ssl, config.get("server", "protocol"), ssl.PROTOCOL_SSLv23))
-
- ssl_kwargs["ciphers"] = config.get("server", "ciphers") or None
-
- self.socket = ssl.wrap_socket(self.socket, **ssl_kwargs)
+ self.socket = ssl.wrap_socket(
+ self.socket, self.key, self.certificate, server_side=True,
+ ssl_version=self.protocol, cyphers=self.cyphers)
self.server_bind()
self.server_activate()
@@ -105,25 +97,19 @@ class RequestHandler(wsgiref.simple_server.WSGIRequestHandler):
def log_message(self, *args, **kwargs):
"""Disable inner logging management."""
- def address_string(self):
- """Client address, formatted for logging."""
- if config.getboolean("server", "dns_lookup"):
- return (
- wsgiref.simple_server.WSGIRequestHandler.address_string(self))
- else:
- return self.client_address[0]
-
-class Application(object):
+class Application:
"""WSGI application managing collections."""
- def __init__(self):
+ def __init__(self, configuration, logger):
"""Initialize application."""
super().__init__()
- auth._load()
- storage._load()
- rights._load()
- self.encoding = config.get("encoding", "request")
- if config.getboolean("logging", "full_environment"):
+ self.configuration = configuration
+ self.logger = logger
+ self.is_authenticated = auth.load(configuration, logger)
+ self.Collection = storage.load(configuration, logger)
+ self.authorized = rights.load(configuration, logger)
+ self.encoding = configuration.get("encoding", "request")
+ if configuration.getboolean("logging", "full_environment"):
self.headers_log = lambda environ: environ
# This method is overriden in __init__ if full_environment is set
@@ -170,27 +156,27 @@ class Application(object):
write_allowed_items = []
for item in items:
- if isinstance(item, storage.Collection):
- if rights.authorized(user, item, "r"):
- log.LOGGER.debug(
+ if isinstance(item, self.Collection):
+ if self.authorized(user, item, "r"):
+ self.logger.debug(
"%s has read access to collection %s" %
(user or "Anonymous", item.path or "/"))
read_last_collection_allowed = True
read_allowed_items.append(item)
else:
- log.LOGGER.debug(
+ self.logger.debug(
"%s has NO read access to collection %s" %
(user or "Anonymous", item.path or "/"))
read_last_collection_allowed = False
- if rights.authorized(user, item, "w"):
- log.LOGGER.debug(
+ if self.authorized(user, item, "w"):
+ self.logger.debug(
"%s has write access to collection %s" %
(user or "Anonymous", item.path or "/"))
write_last_collection_allowed = True
write_allowed_items.append(item)
else:
- log.LOGGER.debug(
+ self.logger.debug(
"%s has NO write access to collection %s" %
(user or "Anonymous", item.path or "/"))
write_last_collection_allowed = False
@@ -199,22 +185,22 @@ class Application(object):
# collection we've met in the loop. Only add this item
# if this last collection was allowed.
if read_last_collection_allowed:
- log.LOGGER.debug(
+ self.logger.debug(
"%s has read access to item %s" %
(user or "Anonymous", item.href))
read_allowed_items.append(item)
else:
- log.LOGGER.debug(
+ self.logger.debug(
"%s has NO read access to item %s" %
(user or "Anonymous", item.href))
if write_last_collection_allowed:
- log.LOGGER.debug(
+ self.logger.debug(
"%s has write access to item %s" %
(user or "Anonymous", item.href))
write_allowed_items.append(item)
else:
- log.LOGGER.debug(
+ self.logger.debug(
"%s has NO write access to item %s" %
(user or "Anonymous", item.href))
@@ -222,21 +208,21 @@ class Application(object):
def __call__(self, environ, start_response):
"""Manage a request."""
- log.LOGGER.info("%s request at %s received" % (
+ self.logger.info("%s request at %s received" % (
environ["REQUEST_METHOD"], environ["PATH_INFO"]))
headers = pprint.pformat(self.headers_log(environ))
- log.LOGGER.debug("Request headers:\n%s" % headers)
+ self.logger.debug("Request headers:\n%s" % headers)
# Strip base_prefix from request URI
- base_prefix = config.get("server", "base_prefix")
+ base_prefix = self.configuration.get("server", "base_prefix")
if environ["PATH_INFO"].startswith(base_prefix):
environ["PATH_INFO"] = environ["PATH_INFO"][len(base_prefix):]
- elif config.get("server", "can_skip_base_prefix"):
- log.LOGGER.debug(
+ elif self.configuration.get("server", "can_skip_base_prefix"):
+ self.logger.debug(
"Prefix already stripped from path: %s", environ["PATH_INFO"])
else:
# Request path not starting with base_prefix, not allowed
- log.LOGGER.debug(
+ self.logger.debug(
"Path not starting with prefix: %s", environ["PATH_INFO"])
status, headers, _ = NOT_ALLOWED
start_response(status, list(headers.items()))
@@ -245,7 +231,7 @@ class Application(object):
# Sanitize request URI
environ["PATH_INFO"] = storage.sanitize_path(
unquote(environ["PATH_INFO"]))
- log.LOGGER.debug("Sanitized path: %s", environ["PATH_INFO"])
+ self.logger.debug("Sanitized path: %s", environ["PATH_INFO"])
path = environ["PATH_INFO"]
@@ -265,30 +251,32 @@ class Application(object):
well_known = WELL_KNOWN_RE.match(path)
if well_known:
- redirect = config.get("well-known", well_known.group(1))
+ redirect = self.configuration.get(
+ "well-known", well_known.group(1))
try:
redirect = redirect % ({"user": user} if user else {})
except KeyError:
status = client.UNAUTHORIZED
+ realm = self.configuration.get("server", "realm")
headers = {
"WWW-Authenticate":
- "Basic realm=\"%s\"" % config.get("server", "realm")}
- log.LOGGER.info(
+ "Basic realm=\"%s\"" % realm}
+ self.logger.info(
"Refused /.well-known/ redirection to anonymous user")
else:
status = client.SEE_OTHER
- log.LOGGER.info("/.well-known/ redirection to: %s" % redirect)
+ self.logger.info("/.well-known/ redirection to: %s" % redirect)
headers = {"Location": redirect}
status = "%i %s" % (
status, client.responses.get(status, "Unknown"))
start_response(status, list(headers.items()))
return []
- is_authenticated = auth.is_authenticated(user, password)
+ is_authenticated = self.is_authenticated(user, password)
is_valid_user = is_authenticated or not user
if is_valid_user:
- items = storage.Collection.discover(
+ items = self.Collection.discover(
path, environ.get("HTTP_DEPTH", "0"))
read_allowed_items, write_allowed_items = (
self.collect_allowed_items(items, user))
@@ -300,7 +288,7 @@ class Application(object):
if content_length:
content = self.decode(
environ["wsgi.input"].read(content_length), environ)
- log.LOGGER.debug("Request content:\n%s" % content)
+ self.logger.debug("Request content:\n%s" % content)
else:
content = None
@@ -314,30 +302,29 @@ class Application(object):
else:
status, headers, answer = NOT_ALLOWED
- if ((status, headers, answer) == NOT_ALLOWED and
- not auth.is_authenticated(user, password) and
- config.get("auth", "type") != "None"):
+ if (status, headers, answer) == NOT_ALLOWED and not is_authenticated:
# Unknown or unauthorized user
- log.LOGGER.info("%s refused" % (user or "Anonymous user"))
+ self.logger.info("%s refused" % (user or "Anonymous user"))
status = client.UNAUTHORIZED
+ realm = self.configuration.get("server", "realm")
headers = {
"WWW-Authenticate":
- "Basic realm=\"%s\"" % config.get("server", "realm")}
+ "Basic realm=\"%s\"" % realm}
answer = None
# Set content length
if answer:
- log.LOGGER.debug(
- "Response content:\n%s" % self.decode(answer, environ))
+ self.logger.debug("Response content:\n%s" % answer, environ)
+ answer = answer.encode(self.encoding)
headers["Content-Length"] = str(len(answer))
- if config.has_section("headers"):
- for key in config.options("headers"):
- headers[key] = config.get("headers", key)
+ if self.configuration.has_section("headers"):
+ for key in self.configuration.options("headers"):
+ headers[key] = self.configuration.get("headers", key)
# Start response
status = "%i %s" % (status, client.responses.get(status, "Unknown"))
- log.LOGGER.debug("Answer status: %s" % status)
+ self.logger.debug("Answer status: %s" % status)
start_response(status, list(headers.items()))
# Return response content
@@ -378,7 +365,7 @@ class Application(object):
# Display a "Radicale works!" message if the root URL is requested
if environ["PATH_INFO"] == "/":
headers = {"Content-type": "text/html"}
- answer = b"\n
RadicaleRadicale works!"
+ answer = "\nRadicaleRadicale works!"
return client.OK, headers, answer
if not read_collections:
@@ -400,7 +387,7 @@ class Application(object):
# Get whole collection
answer_text = collection.serialize()
if not answer_text:
- log.LOGGER.debug("Collection at %s unknown" % environ["PATH_INFO"])
+ self.logger.debug("Collection at %s unknown" % environ["PATH_INFO"])
return client.NOT_FOUND, {}, None
etag = collection.etag
@@ -408,7 +395,7 @@ class Application(object):
"Content-Type": storage.MIMETYPES[collection.get_meta("tag")],
"Last-Modified": collection.last_modified,
"ETag": etag}
- answer = answer_text.encode(self.encoding)
+ answer = answer_text
return client.OK, headers, answer
def do_HEAD(self, environ, read_collections, write_collections, content,
@@ -429,7 +416,7 @@ class Application(object):
props = xmlutils.props_from_request(content)
# TODO: use this?
# timezone = props.get("C:calendar-timezone")
- collection = storage.Collection.create_collection(
+ collection = self.Collection.create_collection(
environ["PATH_INFO"], tag="VCALENDAR")
for key, value in props.items():
collection.set_meta(key, value)
@@ -444,7 +431,7 @@ class Application(object):
collection = write_collections[0]
props = xmlutils.props_from_request(content)
- collection = storage.Collection.create_collection(environ["PATH_INFO"])
+ collection = self.Collection.create_collection(environ["PATH_INFO"])
for key, value in props.items():
collection.set_meta(key, value)
return client.CREATED, {}, None
@@ -465,7 +452,7 @@ class Application(object):
if to_url_parts.netloc == environ["HTTP_HOST"]:
to_url = to_url_parts.path
to_path, to_name = to_url.rstrip("/").rsplit("/", 1)
- for to_collection in storage.Collection.discover(
+ for to_collection in self.Collection.discover(
to_path, depth="0"):
if to_collection in write_collections:
to_collection.upload(to_name, item)
@@ -509,8 +496,7 @@ class Application(object):
collection = write_collections[0]
- answer = xmlutils.proppatch(
- environ["PATH_INFO"], content, collection)
+ answer = xmlutils.proppatch(environ["PATH_INFO"], content, collection)
headers = {
"DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol",
"Content-Type": "text/xml"}
@@ -540,10 +526,22 @@ class Application(object):
# Case 1: No item and no ETag precondition: Add new item
# Case 2: Item and ETag precondition verified: Modify item
# Case 3: Item and no Etag precondition: Force modifying item
- new_item = xmlutils.put(environ["PATH_INFO"], content, collection)
+ items = list(vobject.readComponents(content))
+ if items:
+ if item:
+ # PUT is modifying an existing item
+ new_item = collection.update(item_name, items[0])
+ elif item_name:
+ # PUT is adding a new item
+ new_item = collection.upload(item_name, items[0])
+ else:
+ # PUT is replacing the whole collection
+ collection.delete()
+ new_item = self.Collection.create_collection(
+ environ["PATH_INFO"], items)
+ if new_item:
+ headers["ETag"] = new_item.etag
status = client.CREATED
- if new_item:
- headers["ETag"] = new_item.etag
else:
# PUT rejected in all other cases
status = client.PRECONDITION_FAILED
diff --git a/radicale/__main__.py b/radicale/__main__.py
index c7c50aff..73f7f2b4 100644
--- a/radicale/__main__.py
+++ b/radicale/__main__.py
@@ -29,6 +29,7 @@ import optparse
import select
import signal
import socket
+import ssl
from wsgiref.simple_server import make_server
from . import (
@@ -75,9 +76,17 @@ def run():
options = parser.parse_args()[0]
- # Read in the configuration specified by the command line (if specified)
- configuration_found = (
- config.read(options.config) if options.config else True)
+ if options.config:
+ configuration = config.load()
+ configuration_found = configuration.read(options.config)
+ else:
+ configuration_paths = [
+ "/etc/radicale/config",
+ os.path.expanduser("~/.config/radicale/config")]
+ if "RADICALE_CONFIG" in os.environ:
+ configuration_paths.append(os.environ["RADICALE_CONFIG"])
+ configuration = config.load(configuration_paths)
+ configuration_found = True
# Update Radicale configuration according to options
for option in parser.option_list:
@@ -86,32 +95,33 @@ def run():
section = "logging" if key == "debug" else "server"
value = getattr(options, key)
if value is not None:
- config.set(section, key, str(value))
+ configuration.set(section, key, str(value))
# Start logging
- log.start()
+ filename = os.path.expanduser(configuration.get("logging", "config"))
+ debug = configuration.getboolean("logging", "debug")
+ logger = log.start("radicale", filename, debug)
# Log a warning if the configuration file of the command line is not found
if not configuration_found:
- log.LOGGER.warning(
- "Configuration file '%s' not found" % options.config)
+ logger.warning("Configuration file '%s' not found" % options.config)
# Fork if Radicale is launched as daemon
- if config.getboolean("server", "daemon"):
+ if configuration.getboolean("server", "daemon"):
# Check and create PID file in a race-free manner
- if config.get("server", "pid"):
+ if configuration.get("server", "pid"):
try:
pid_fd = os.open(
- config.get("server", "pid"),
+ configuration.get("server", "pid"),
os.O_CREAT | os.O_EXCL | os.O_WRONLY)
except:
raise OSError(
- "PID file exists: %s" % config.get("server", "pid"))
+ "PID file exists: %s" % configuration.get("server", "pid"))
pid = os.fork()
if pid:
sys.exit()
# Write PID
- if config.get("server", "pid"):
+ if configuration.get("server", "pid"):
with os.fdopen(pid_fd, "w") as pid_file:
pid_file.write(str(os.getpid()))
# Decouple environment
@@ -127,35 +137,55 @@ def run():
# Register exit function
def cleanup():
"""Remove the PID files."""
- log.LOGGER.debug("Cleaning up")
+ logger.debug("Cleaning up")
# Remove PID file
- if (config.get("server", "pid") and
- config.getboolean("server", "daemon")):
- os.unlink(config.get("server", "pid"))
+ if (configuration.get("server", "pid") and
+ configuration.getboolean("server", "daemon")):
+ os.unlink(configuration.get("server", "pid"))
atexit.register(cleanup)
- log.LOGGER.info("Starting Radicale")
+ logger.info("Starting Radicale")
- log.LOGGER.debug(
- "Base URL prefix: %s" % config.get("server", "base_prefix"))
+ logger.debug(
+ "Base URL prefix: %s" % configuration.get("server", "base_prefix"))
# Create collection servers
servers = {}
- server_class = (
- HTTPSServer if config.getboolean("server", "ssl") else HTTPServer)
+ if configuration.getboolean("server", "ssl"):
+ server_class = HTTPSServer
+ server_class.certificate = configuration.get("server", "certificate")
+ server_class.key = configuration.get("server", "key")
+ server_class.cyphers = configuration.get("server", "cyphers")
+ server_class.certificate = getattr(
+ ssl, configuration.get("server", "protocol"), ssl.PROTOCOL_SSLv23)
+ # Test if the SSL files can be read
+ for name in ("certificate", "key"):
+ filename = getattr(server_class, name)
+ try:
+ open(filename, "r").close()
+ except IOError as exception:
+ logger.warning(
+ "Error while reading SSL %s %r: %s" % (
+ name, filename, exception))
+ else:
+ server_class = HTTPServer
+
+ if not configuration.getboolean("server", "dns_lookup"):
+ RequestHandler.address_string = lambda self: self.client_address[0]
+
shutdown_program = [False]
- for host in config.get("server", "hosts").split(","):
+ for host in configuration.get("server", "hosts").split(","):
address, port = host.strip().rsplit(":", 1)
address, port = address.strip("[] "), int(port)
- server = make_server(address, port, Application(),
- server_class, RequestHandler)
+ application = Application(configuration, logger)
+ server = make_server(
+ address, port, application, server_class, RequestHandler)
servers[server.socket] = server
- log.LOGGER.debug(
- "Listening to %s port %s" % (
- server.server_name, server.server_port))
- if config.getboolean("server", "ssl"):
- log.LOGGER.debug("Using SSL")
+ logger.debug("Listening to %s port %s" % (
+ server.server_name, server.server_port))
+ if configuration.getboolean("server", "ssl"):
+ logger.debug("Using SSL")
# Create a socket pair to notify the select syscall of program shutdown
# This is not available in python < 3.5 on Windows
@@ -171,7 +201,7 @@ def run():
if shutdown_program[0]:
# Ignore following signals
return
- log.LOGGER.info("Stopping Radicale")
+ logger.info("Stopping Radicale")
shutdown_program[0] = True
if shutdown_program_socket_in:
shutdown_program_socket_in.sendall(b"goodbye")
@@ -187,7 +217,7 @@ def run():
else:
# Fallback to busy waiting
select_timeout = 1.0
- log.LOGGER.debug("Radicale server ready")
+ logger.debug("Radicale server ready")
while not shutdown_program[0]:
try:
rlist, _, xlist = select.select(
diff --git a/radicale/auth.py b/radicale/auth.py
index eb66d909..2431aea4 100644
--- a/radicale/auth.py
+++ b/radicale/auth.py
@@ -28,8 +28,8 @@ by using the system's CRYPT routine. The CRYPT and SHA1 encryption methods
implemented by htpasswd are considered as insecure. MD5-APR1 provides medium
security as of 2015. Only BCRYPT can be considered secure by current standards.
-MD5-APR1-encrypted credentials can be written by all versions of htpasswd (its
-the default, in fact), whereas BCRYPT requires htpasswd 2.4.x or newer.
+MD5-APR1-encrypted credentials can be written by all versions of htpasswd (it
+is the default, in fact), whereas BCRYPT requires htpasswd 2.4.x or newer.
The `is_authenticated(user, password)` function provided by this module
verifies the user-given credentials by parsing the htpasswd credential file
@@ -55,127 +55,135 @@ following significantly more secure schemes are parsable by Radicale:
import base64
import hashlib
import os
-import sys
-
-from . import config, log
+from importlib import import_module
-def _load():
+def load(configuration, logger):
"""Load the authentication manager chosen in configuration."""
- auth_type = config.get("auth", "type")
- log.LOGGER.debug("Authentication type is %s" % auth_type)
+ auth_type = configuration.get("auth", "type")
+ logger.debug("Authentication type is %s" % auth_type)
if auth_type == "None":
- sys.modules[__name__].is_authenticated = lambda user, password: True
+ return lambda user, password: True
elif auth_type == "htpasswd":
- pass # is_authenticated is already defined
+ return Auth(configuration, logger).is_authenticated
else:
- __import__(auth_type)
- sys.modules[__name__].is_authenticated = (
- sys.modules[auth_type].is_authenticated)
+ module = import_module(auth_type)
+ return module.Auth(configuration, logger).is_authenticated
-FILENAME = os.path.expanduser(config.get("auth", "htpasswd_filename"))
-ENCRYPTION = config.get("auth", "htpasswd_encryption")
+class BaseAuth:
+ def __init__(self, configuration, logger):
+ self.configuration = configuration
+ self.logger = logger
+
+ def is_authenticated(self, user, password):
+ """Validate credentials.
+
+ Iterate through htpasswd credential file until user matches, extract hash
+ (encrypted password) and check hash against user-given password, using the
+ method specified in the Radicale config.
+
+ """
+ raise NotImplementedError
-def _plain(hash_value, password):
- """Check if ``hash_value`` and ``password`` match, using plain method."""
- return hash_value == password
+class Auth(BaseAuth):
+ def __init__(self, configuration, logger):
+ super().__init__(configuration, logger)
+ self.filename = os.path.expanduser(
+ configuration.get("auth", "htpasswd_filename"))
+ self.encryption = configuration.get("auth", "htpasswd_encryption")
+
+ if self.encryption == "ssha":
+ self.verify = self._ssha
+ elif self.encryption == "sha1":
+ self.verify = self._sha1
+ elif self.encryption == "plain":
+ self.verify = self._plain
+ elif self.encryption == "md5":
+ try:
+ from passlib.hash import apr_md5_crypt as _passlib_md5apr1
+ except ImportError:
+ raise RuntimeError(
+ "The htpasswd encryption method 'md5' requires "
+ "the passlib module.")
+ self.verify = self._md5apr1
+ elif self.encryption == "bcrypt":
+ try:
+ from passlib.hash import bcrypt as _passlib_bcrypt
+ except ImportError:
+ raise RuntimeError(
+ "The htpasswd encryption method 'bcrypt' requires "
+ "the passlib module with bcrypt support.")
+ # A call to `encrypt` raises passlib.exc.MissingBackendError with a
+ # good error message if bcrypt backend is not available. Trigger
+ # this here.
+ _passlib_bcrypt.encrypt("test-bcrypt-backend")
+ self.verify = self._bcrypt
+ elif self.encryption == "crypt":
+ try:
+ import crypt
+ except ImportError:
+ raise RuntimeError(
+ "The htpasswd encryption method 'crypt' requires "
+ "the crypt() system support.")
+ self.verify = self._crypt
+ else:
+ raise RuntimeError(
+ "The htpasswd encryption method '%s' is not "
+ "supported." % self.encryption)
+
+ def _plain(self, hash_value, password):
+ """Check if ``hash_value`` and ``password`` match, using plain method."""
+ return hash_value == password
-def _crypt(hash_value, password):
- """Check if ``hash_value`` and ``password`` match, using crypt method."""
- return crypt.crypt(password, hash_value) == hash_value
+ def _crypt(self, hash_value, password):
+ """Check if ``hash_value`` and ``password`` match, using crypt method."""
+ return crypt.crypt(password, hash_value) == hash_value
-def _sha1(hash_value, password):
- """Check if ``hash_value`` and ``password`` match, using sha1 method."""
- hash_value = hash_value.replace("{SHA}", "").encode("ascii")
- password = password.encode(config.get("encoding", "stock"))
- sha1 = hashlib.sha1() # pylint: disable=E1101
- sha1.update(password)
- return sha1.digest() == base64.b64decode(hash_value)
+ def _sha1(self, hash_value, password):
+ """Check if ``hash_value`` and ``password`` match, using sha1 method."""
+ hash_value = hash_value.replace("{SHA}", "").encode("ascii")
+ password = password.encode(self.configuration.get("encoding", "stock"))
+ sha1 = hashlib.sha1() # pylint: disable=E1101
+ sha1.update(password)
+ return sha1.digest() == base64.b64decode(hash_value)
-def _ssha(hash_salt_value, password):
- """Check if ``hash_salt_value`` and ``password`` match, using salted sha1
- method. This method is not directly supported by htpasswd, but it can be
- written with e.g. openssl, and nginx can parse it."""
- hash_salt_value = hash_salt_value.replace(
- "{SSHA}", "").encode("ascii").decode('base64')
- password = password.encode(config.get("encoding", "stock"))
- hash_value = hash_salt_value[:20]
- salt_value = hash_salt_value[20:]
- sha1 = hashlib.sha1() # pylint: disable=E1101
- sha1.update(password)
- sha1.update(salt_value)
- return sha1.digest() == hash_value
+ def _ssha(self, hash_salt_value, password):
+ """Check if ``hash_salt_value`` and ``password`` match, using salted sha1
+ method. This method is not directly supported by htpasswd, but it can be
+ written with e.g. openssl, and nginx can parse it."""
+ hash_salt_value = hash_salt_value.replace(
+ "{SSHA}", "").encode("ascii").decode('base64')
+ password = password.encode(self.configuration.get("encoding", "stock"))
+ hash_value = hash_salt_value[:20]
+ salt_value = hash_salt_value[20:]
+ sha1 = hashlib.sha1() # pylint: disable=E1101
+ sha1.update(password)
+ sha1.update(salt_value)
+ return sha1.digest() == hash_value
-def _bcrypt(hash_value, password):
- return _passlib_bcrypt.verify(password, hash_value)
+ def _bcrypt(self, hash_value, password):
+ return _passlib_bcrypt.verify(password, hash_value)
-def _md5apr1(hash_value, password):
- return _passlib_md5apr1.verify(password, hash_value)
+ def _md5apr1(self, hash_value, password):
+ return _passlib_md5apr1.verify(password, hash_value)
-
-# Prepare mapping between encryption names and verification functions.
-# Pre-fill with methods that do not have external dependencies.
-_verifuncs = {
- "ssha": _ssha,
- "sha1": _sha1,
- "plain": _plain}
-
-
-# Conditionally attempt to import external dependencies.
-if ENCRYPTION == "md5":
- try:
- from passlib.hash import apr_md5_crypt as _passlib_md5apr1
- except ImportError:
- raise RuntimeError(("The htpasswd_encryption method 'md5' requires "
- "availability of the passlib module."))
- _verifuncs["md5"] = _md5apr1
-elif ENCRYPTION == "bcrypt":
- try:
- from passlib.hash import bcrypt as _passlib_bcrypt
- except ImportError:
- raise RuntimeError(("The htpasswd_encryption method 'bcrypt' requires "
- "availability of the passlib module with bcrypt support."))
- # A call to `encrypt` raises passlib.exc.MissingBackendError with a good
- # error message if bcrypt backend is not available. Trigger this here.
- _passlib_bcrypt.encrypt("test-bcrypt-backend")
- _verifuncs["bcrypt"] = _bcrypt
-elif ENCRYPTION == "crypt":
- try:
- import crypt
- except ImportError:
- raise RuntimeError(("The htpasswd_encryption method 'crypt' requires "
- "crypt() system support."))
- _verifuncs["crypt"] = _crypt
-
-
-# Validate initial configuration.
-if ENCRYPTION not in _verifuncs:
- raise RuntimeError(("The htpasswd encryption method '%s' is not "
- "supported." % ENCRYPTION))
-
-
-def is_authenticated(user, password):
- """Validate credentials.
-
- Iterate through htpasswd credential file until user matches, extract hash
- (encrypted password) and check hash against user-given password, using the
- method specified in the Radicale config.
-
- """
- with open(FILENAME) as f:
- for line in f:
- strippedline = line.strip()
- if strippedline:
- login, hash_value = strippedline.split(":")
- if login == user:
- # Allow encryption method to be overridden at runtime.
- return _verifuncs[ENCRYPTION](hash_value, password)
- return False
+ def is_authenticated(self, user, password):
+ # The content of the file is not cached because reading is generally a
+ # very cheap operation, and it's useful to get live updates of the
+ # htpasswd file.
+ with open(self.filename) as fd:
+ for line in fd:
+ line = line.strip()
+ if line:
+ login, hash_value = line.split(":")
+ if login == user:
+ return self.verify(hash_value, password)
+ return False
diff --git a/radicale/config.py b/radicale/config.py
index b31e76d3..71a315bf 100644
--- a/radicale/config.py
+++ b/radicale/config.py
@@ -24,7 +24,6 @@ Give a configparser-like interface to read and write configuration.
"""
import os
-import sys
from configparser import RawConfigParser as ConfigParser
@@ -66,18 +65,14 @@ INITIAL_CONFIG = {
"debug": "False",
"full_environment": "False"}}
-# Create a ConfigParser and configure it
-_CONFIG_PARSER = ConfigParser()
-for section, values in INITIAL_CONFIG.items():
- _CONFIG_PARSER.add_section(section)
- for key, value in values.items():
- _CONFIG_PARSER.set(section, key, value)
-
-_CONFIG_PARSER.read("/etc/radicale/config")
-_CONFIG_PARSER.read(os.path.expanduser("~/.config/radicale/config"))
-if "RADICALE_CONFIG" in os.environ:
- _CONFIG_PARSER.read(os.environ["RADICALE_CONFIG"])
-
-# Wrap config module into ConfigParser instance
-sys.modules[__name__] = _CONFIG_PARSER
+def load(paths=()):
+ config = ConfigParser()
+ for section, values in INITIAL_CONFIG.items():
+ config.add_section(section)
+ for key, value in values.items():
+ config.set(section, key, value)
+ for path in paths:
+ if path:
+ config.read(path)
+ return config
diff --git a/radicale/log.py b/radicale/log.py
index 641ca092..c2bd7bb6 100644
--- a/radicale/log.py
+++ b/radicale/log.py
@@ -28,40 +28,38 @@ import logging
import logging.config
import signal
-from . import config
-
-LOGGER = logging.getLogger()
-
-
-def configure_from_file(filename, debug):
+def configure_from_file(filename, debug, logger):
logging.config.fileConfig(filename)
if debug:
- LOGGER.setLevel(logging.DEBUG)
- for handler in LOGGER.handlers:
+ logger.setLevel(logging.DEBUG)
+ for handler in logger.handlers:
handler.setLevel(logging.DEBUG)
+ return logger
-def start():
+def start(name="radicale", filename=None, debug=False):
"""Start the logging according to the configuration."""
- filename = os.path.expanduser(config.get("logging", "config"))
- debug = config.getboolean("logging", "debug")
-
+ logger = logging.getLogger(name)
if os.path.exists(filename):
# Configuration taken from file
- configure_from_file(filename, debug)
+ configure_from_file(logger, filename, debug)
# Reload config on SIGHUP (UNIX only)
if hasattr(signal, 'SIGHUP'):
- def handler(signum, frame):
- configure_from_file(filename, debug)
+ def handler_generator(logger, filename, debug):
+ def handler(signum, frame):
+ configure_from_file(logger, filename, debug)
+ handler = handler_generator(logger, filename, debug)
signal.signal(signal.SIGHUP, handler)
else:
# Default configuration, standard output
- handler = logging.StreamHandler(sys.stdout)
- handler.setFormatter(logging.Formatter("%(message)s"))
- LOGGER.addHandler(handler)
- if debug:
- LOGGER.setLevel(logging.DEBUG)
- LOGGER.debug(
+ if filename:
+ logger.warning(
"Logging configuration file '%s' not found, using stdout." %
filename)
+ handler = logging.StreamHandler(sys.stdout)
+ handler.setFormatter(logging.Formatter("%(message)s"))
+ logger.addHandler(handler)
+ if debug:
+ logger.setLevel(logging.DEBUG)
+ return logger
diff --git a/radicale/rights.py b/radicale/rights.py
index ff9c03ce..69649081 100644
--- a/radicale/rights.py
+++ b/radicale/rights.py
@@ -39,24 +39,21 @@ Leading or ending slashes are trimmed from collection's path.
import os.path
import re
-import sys
from configparser import ConfigParser
from io import StringIO
-
-from . import config, log
+from importlib import import_module
-def _load():
+def load(configuration, logger):
"""Load the rights manager chosen in configuration."""
- rights_type = config.get("rights", "type")
+ rights_type = configuration.get("rights", "type")
if rights_type == "None":
- sys.modules[__name__].authorized = (
- lambda user, collection, permission: True)
+ return lambda user, collection, permission: True
elif rights_type in DEFINED_RIGHTS or rights_type == "from_file":
- pass # authorized is already defined
+ return Rights(configuration, logger).authorized
else:
- __import__(rights_type)
- sys.modules[__name__].authorized = sys.modules[rights_type].authorized
+ module = import_module(rights_type)
+ return module.Rights(configuration, logger).authorized
DEFINED_RIGHTS = {
@@ -84,53 +81,57 @@ permission:rw
"""}
-def _read_from_sections(user, collection_url, permission):
- """Get regex sections."""
- filename = os.path.expanduser(config.get("rights", "file"))
- rights_type = config.get("rights", "type").lower()
- # Prevent "regex injection"
- user_escaped = re.escape(user)
- collection_url_escaped = re.escape(collection_url)
- regex = ConfigParser({"login": user_escaped, "path": collection_url_escaped})
- if rights_type in DEFINED_RIGHTS:
- log.LOGGER.debug("Rights type '%s'" % rights_type)
- regex.readfp(StringIO(DEFINED_RIGHTS[rights_type]))
- elif rights_type == "from_file":
- log.LOGGER.debug("Reading rights from file %s" % filename)
- if not regex.read(filename):
- log.LOGGER.error("File '%s' not found for rights" % filename)
- return False
- else:
- log.LOGGER.error("Unknown rights type '%s'" % rights_type)
+class BaseRights:
+ def __init__(self, configuration, logger):
+ self.configuration = configuration
+ self.logger = logger
+
+ def authorized(self, user, collection, permission):
+ """Check if the user is allowed to read or write the collection.
+
+ If the user is empty, check for anonymous rights.
+
+ """
+ raise NotImplementedError
+
+
+class Rights(BaseRights):
+ def __init__(self, configuration, logger):
+ super().__init__()
+ self.filename = os.path.expanduser(configuration.get("rights", "file"))
+ self.rights_type = configuration.get("rights", "type").lower()
+
+ def authorized(self, user, collection, permission):
+ collection_url = collection.path.rstrip("/") or "/"
+ if collection_url in (".well-known/carddav", ".well-known/caldav"):
+ return permission == "r"
+ # Prevent "regex injection"
+ user_escaped = re.escape(user)
+ collection_url_escaped = re.escape(collection_url)
+ regex = ConfigParser(
+ {"login": user_escaped, "path": collection_url_escaped})
+ if self.rights_type in DEFINED_RIGHTS:
+ self.logger.debug("Rights type '%s'" % self.rights_type)
+ regex.readfp(StringIO(DEFINED_RIGHTS[self.rights_type]))
+ else:
+ self.logger.debug("Reading rights from file '%s'" % self.filename)
+ if not regex.read(self.filename):
+ self.logger.error(
+ "File '%s' not found for rights" % self.filename)
+ return False
+
+ for section in regex.sections():
+ re_user = regex.get(section, "user")
+ re_collection = regex.get(section, "collection")
+ self.logger.debug(
+ "Test if '%s:%s' matches against '%s:%s' from section '%s'" % (
+ user, collection_url, re_user, re_collection, section))
+ user_match = re.match(re_user, user)
+ if user_match:
+ re_collection = re_collection.format(*user_match.groups())
+ if re.match(re_collection, collection_url):
+ self.logger.debug("Section '%s' matches" % section)
+ return permission in regex.get(section, "permission")
+ else:
+ self.logger.debug("Section '%s' does not match" % section)
return False
-
- for section in regex.sections():
- re_user = regex.get(section, "user")
- re_collection = regex.get(section, "collection")
- log.LOGGER.debug(
- "Test if '%s:%s' matches against '%s:%s' from section '%s'" % (
- user, collection_url, re_user, re_collection, section))
- user_match = re.match(re_user, user)
- if user_match:
- re_collection = re_collection.format(*user_match.groups())
- if re.match(re_collection, collection_url):
- log.LOGGER.debug("Section '%s' matches" % section)
- return permission in regex.get(section, "permission")
- else:
- log.LOGGER.debug("Section '%s' does not match" % section)
- return False
-
-
-def authorized(user, collection, permission):
- """Check if the user is allowed to read or write the collection.
-
- If the user is empty, check for anonymous rights.
-
- """
- collection_url = collection.path.rstrip("/") or "/"
- if collection_url in (".well-known/carddav", ".well-known/caldav"):
- return permission == "r"
- rights_type = config.get("rights", "type").lower()
- return (
- rights_type == "none" or
- _read_from_sections(user or "", collection_url, permission))
diff --git a/radicale/storage.py b/radicale/storage.py
index 5bb0f05a..41dd8387 100644
--- a/radicale/storage.py
+++ b/radicale/storage.py
@@ -29,31 +29,29 @@ import json
import os
import posixpath
import shutil
-import sys
import time
from contextlib import contextmanager
from hashlib import md5
+from importlib import import_module
from uuid import uuid4
import vobject
-from . import config, log
-
-def _load():
+def load(configuration, logger):
"""Load the storage manager chosen in configuration."""
- storage_type = config.get("storage", "type")
+ storage_type = configuration.get("storage", "type")
if storage_type == "multifilesystem":
- module = sys.modules[__name__]
+ collection_class = Collection
else:
- __import__(storage_type)
- module = sys.modules[storage_type]
- sys.modules[__name__].Collection = module.Collection
+ collection_class = import_module(storage_type).Collection
+ class CollectionCopy(collection_class):
+ """Collection copy, avoids overriding the original class attributes."""
+ CollectionCopy.configuration = configuration
+ CollectionCopy.logger = logger
+ return CollectionCopy
-FOLDER = os.path.expanduser(config.get("storage", "filesystem_folder"))
-FILESYSTEM_ENCODING = sys.getfilesystemencoding()
-STORAGE_ENCODING = config.get("encoding", "stock")
MIMETYPES = {"VADDRESSBOOK": "text/vcard", "VCALENDAR": "text/calendar"}
@@ -106,15 +104,14 @@ def path_to_filesystem(root, *paths):
continue
for part in path.split("/"):
if not is_safe_filesystem_path_component(part):
- log.LOGGER.debug(
- "Can't translate path safely to filesystem: %s", path)
raise ValueError("Unsafe path")
safe_path = os.path.join(safe_path, part)
return safe_path
class Item:
- def __init__(self, item, href, last_modified=None):
+ def __init__(self, collection, item, href, last_modified=None):
+ self.collection = collection
self.item = item
self.href = href
self.last_modified = last_modified
@@ -122,17 +119,17 @@ class Item:
def __getattr__(self, attr):
return getattr(self.item, attr)
- @property
- def content_length(self):
- return len(self.serialize().encode(config.get("encoding", "request")))
-
@property
def etag(self):
return get_etag(self.serialize())
-class Collection:
- """Collection stored in several files per calendar."""
+class BaseCollection:
+
+ # Overriden on copy by the "load" function
+ configuration = None
+ logger = None
+
def __init__(self, path, principal=False):
"""Initialize the collection.
@@ -140,17 +137,7 @@ class Collection:
the slash as the folder delimiter, with no leading nor trailing slash.
"""
- self.encoding = "utf-8"
- # path should already be sanitized
- self.path = sanitize_path(path).strip("/")
- self._filesystem_path = path_to_filesystem(FOLDER, self.path)
- split_path = self.path.split("/")
- if len(split_path) > 1:
- # URL with at least one folder
- self.owner = split_path[0]
- else:
- self.owner = None
- self.is_principal = principal
+ raise NotImplementedError
@classmethod
def discover(cls, path, depth="1"):
@@ -167,6 +154,117 @@ class Collection:
The ``path`` is relative.
"""
+ raise NotImplementedError
+
+ @property
+ def etag(self):
+ return get_etag(self.serialize())
+
+ @classmethod
+ def create_collection(cls, href, collection=None, tag=None):
+ """Create a collection.
+
+ ``collection`` is a list of vobject components.
+
+ ``tag`` is the type of collection (VCALENDAR or VADDRESSBOOK). If
+ ``tag`` is not given, it is guessed from the collection.
+
+ """
+ raise NotImplementedError
+
+ def list(self):
+ """List collection items."""
+ raise NotImplementedError
+
+ def get(self, href):
+ """Fetch a single item."""
+ raise NotImplementedError
+
+ def get_multi(self, hrefs):
+ """Fetch multiple items. Duplicate hrefs must be ignored.
+
+ Functionally similar to ``get``, but might bring performance benefits
+ on some storages when used cleverly.
+
+ """
+ for href in set(hrefs):
+ yield self.get(href)
+
+ def has(self, href):
+ """Check if an item exists by its href.
+
+ Functionally similar to ``get``, but might bring performance benefits
+ on some storages when used cleverly.
+
+ """
+ return self.get(href) is not None
+
+ def upload(self, href, vobject_item):
+ """Upload a new item."""
+ raise NotImplementedError
+
+ def update(self, href, vobject_item, etag=None):
+ """Update an item.
+
+ Functionally similar to ``delete`` plus ``upload``, but might bring
+ performance benefits on some storages when used cleverly.
+
+ """
+ self.delete(href, etag)
+ self.upload(href, vobject_item)
+
+ def delete(self, href=None, etag=None):
+ """Delete an item.
+
+ When ``href`` is ``None``, delete the collection.
+
+ """
+ raise NotImplementedError
+
+ @contextmanager
+ def at_once(self):
+ """Set a context manager buffering the reads and writes."""
+ # TODO: use in code
+ yield
+
+ def get_meta(self, key):
+ """Get metadata value for collection."""
+ raise NotImplementedError
+
+ def set_meta(self, key, value):
+ """Set metadata value for collection."""
+ raise NotImplementedError
+
+ @property
+ def last_modified(self):
+ """Get the HTTP-datetime of when the collection was modified."""
+ raise NotImplementedError
+
+ def serialize(self):
+ """Get the unicode string representing the whole collection."""
+ raise NotImplementedError
+
+
+class Collection(BaseCollection):
+ """Collection stored in several files per calendar."""
+
+ def __init__(self, path, principal=False):
+ folder = os.path.expanduser(
+ self.configuration.get("storage", "filesystem_folder"))
+ # path should already be sanitized
+ self.path = sanitize_path(path).strip("/")
+ self.storage_encoding = self.configuration.get("encoding", "stock")
+ self._filesystem_path = path_to_filesystem(folder, self.path)
+ split_path = self.path.split("/")
+ if len(split_path) > 1:
+ # URL with at least one folder
+ self.owner = split_path[0]
+ else:
+ self.owner = None
+ self.is_principal = principal
+
+ @classmethod
+ def discover(cls, path, depth="1"):
# path == None means wrong URL
if path is None:
return
@@ -178,12 +276,14 @@ class Collection:
return
# Try to guess if the path leads to a collection or an item
- if not os.path.isdir(path_to_filesystem(FOLDER, sane_path)):
+ folder = os.path.expanduser(
+ cls.configuration.get("storage", "filesystem_folder"))
+ if not os.path.isdir(path_to_filesystem(folder, sane_path)):
# path is not a collection
- if os.path.isfile(path_to_filesystem(FOLDER, sane_path)):
+ if os.path.isfile(path_to_filesystem(folder, sane_path)):
# path is an item
attributes.pop()
- elif os.path.isdir(path_to_filesystem(FOLDER, *attributes[:-1])):
+ elif os.path.isdir(path_to_filesystem(folder, *attributes[:-1])):
# path parent is a collection
attributes.pop()
# TODO: else: return?
@@ -207,15 +307,9 @@ class Collection:
@classmethod
def create_collection(cls, href, collection=None, tag=None):
- """Create a collection.
-
- ``collection`` is a list of vobject components.
-
- ``tag`` is the type of collection (VCALENDAR or VADDRESSBOOK). If
- ``tag`` is not given, it is guessed from the collection.
-
- """
- path = path_to_filesystem(FOLDER, href)
+ folder = os.path.expanduser(
+ cls.configuration.get("storage", "filesystem_folder"))
+ path = path_to_filesystem(folder, href)
if not os.path.exists(path):
os.makedirs(path)
if not tag and collection:
@@ -239,7 +333,6 @@ class Collection:
return self
def list(self):
- """List collection items."""
try:
hrefs = os.listdir(self._filesystem_path)
except IOError:
@@ -248,82 +341,63 @@ class Collection:
for href in hrefs:
path = os.path.join(self._filesystem_path, href)
if not href.endswith(".props") and os.path.isfile(path):
- with open(path, encoding=STORAGE_ENCODING) as fd:
+ with open(path, encoding=self.storage_encoding) as fd:
yield href, get_etag(fd.read())
def get(self, href):
- """Fetch a single item."""
if not href:
return
href = href.strip("{}").replace("/", "_")
if is_safe_filesystem_path_component(href):
path = os.path.join(self._filesystem_path, href)
if os.path.isfile(path):
- with open(path, encoding=STORAGE_ENCODING) as fd:
+ with open(path, encoding=self.storage_encoding) as fd:
text = fd.read()
last_modified = time.strftime(
"%a, %d %b %Y %H:%M:%S GMT",
time.gmtime(os.path.getmtime(path)))
- return Item(vobject.readOne(text), href, last_modified)
+ return Item(self, vobject.readOne(text), href, last_modified)
else:
- log.LOGGER.debug(
+ self.logger.debug(
"Can't tranlate name safely to filesystem, "
"skipping component: %s", href)
- def get_multi(self, hrefs):
- """Fetch multiple items. Duplicate hrefs must be ignored.
-
- Functionally similar to ``get``, but might bring performance benefits
- on some storages when used cleverly.
-
- """
- for href in set(hrefs):
- yield self.get(href)
-
def has(self, href):
- """Check if an item exists by its href."""
return self.get(href) is not None
def upload(self, href, vobject_item):
- """Upload a new item."""
# TODO: use returned object in code
if is_safe_filesystem_path_component(href):
path = path_to_filesystem(self._filesystem_path, href)
if not os.path.exists(path):
- item = Item(vobject_item, href)
- with open(path, "w", encoding=STORAGE_ENCODING) as fd:
+ item = Item(self, vobject_item, href)
+ with open(path, "w", encoding=self.storage_encoding) as fd:
fd.write(item.serialize())
return item
else:
- log.LOGGER.debug(
+ self.logger.debug(
"Can't tranlate name safely to filesystem, "
"skipping component: %s", href)
def update(self, href, vobject_item, etag=None):
- """Update an item."""
# TODO: use etag in code and test it here
# TODO: use returned object in code
if is_safe_filesystem_path_component(href):
path = path_to_filesystem(self._filesystem_path, href)
if os.path.exists(path):
- with open(path, encoding=STORAGE_ENCODING) as fd:
+ with open(path, encoding=self.storage_encoding) as fd:
text = fd.read()
if not etag or etag == get_etag(text):
- item = Item(vobject_item, href)
- with open(path, "w", encoding=STORAGE_ENCODING) as fd:
+ item = Item(self, vobject_item, href)
+ with open(path, "w", encoding=self.storage_encoding) as fd:
fd.write(item.serialize())
return item
else:
- log.LOGGER.debug(
+ self.logger.debug(
"Can't tranlate name safely to filesystem, "
"skipping component: %s", href)
def delete(self, href=None, etag=None):
- """Delete an item.
-
- When ``href`` is ``None``, delete the collection.
-
- """
# TODO: use etag in code and test it here
# TODO: use returned object in code
if href is None:
@@ -338,49 +412,44 @@ class Collection:
# Delete an item
path = path_to_filesystem(self._filesystem_path, href)
if os.path.isfile(path):
- with open(path, encoding=STORAGE_ENCODING) as fd:
+ with open(path, encoding=self.storage_encoding) as fd:
text = fd.read()
if not etag or etag == get_etag(text):
os.remove(path)
return
else:
- log.LOGGER.debug(
+ self.logger.debug(
"Can't tranlate name safely to filesystem, "
"skipping component: %s", href)
@contextmanager
def at_once(self):
- """Set a context manager buffering the reads and writes."""
- # TODO: use in code
# TODO: use a file locker
yield
def get_meta(self, key):
- """Get metadata value for collection."""
props_path = self._filesystem_path + ".props"
if os.path.exists(props_path):
- with open(props_path, encoding=STORAGE_ENCODING) as prop_file:
- return json.load(prop_file).get(key)
+ with open(props_path, encoding=self.storage_encoding) as prop:
+ return json.load(prop).get(key)
def set_meta(self, key, value):
- """Get metadata value for collection."""
props_path = self._filesystem_path + ".props"
properties = {}
if os.path.exists(props_path):
- with open(props_path, encoding=STORAGE_ENCODING) as prop_file:
- properties.update(json.load(prop_file))
+ with open(props_path, encoding=self.storage_encoding) as prop:
+ properties.update(json.load(prop))
if value:
properties[key] = value
else:
properties.pop(key, None)
- with open(props_path, "w+", encoding=STORAGE_ENCODING) as prop_file:
- json.dump(properties, prop_file)
+ with open(props_path, "w+", encoding=self.storage_encoding) as prop:
+ json.dump(properties, prop)
@property
def last_modified(self):
- """Get the HTTP-datetime of when the collection was modified."""
last = max([os.path.getmtime(self._filesystem_path)] + [
os.path.getmtime(os.path.join(self._filesystem_path, filename))
for filename in os.listdir(self._filesystem_path)] or [0])
@@ -391,7 +460,7 @@ class Collection:
for href in os.listdir(self._filesystem_path):
path = os.path.join(self._filesystem_path, href)
if os.path.isfile(path) and not path.endswith(".props"):
- with open(path, encoding=STORAGE_ENCODING) as fd:
+ with open(path, encoding=self.storage_encoding) as fd:
items.append(vobject.readOne(fd.read()))
if self.get_meta("tag") == "VCALENDAR":
collection = vobject.iCalendar()
@@ -404,7 +473,3 @@ class Collection:
elif self.get_meta("tag") == "VADDRESSBOOK":
return "".join([item.serialize() for item in items])
return ""
-
- @property
- def etag(self):
- return get_etag(self.serialize())
diff --git a/radicale/xmlutils.py b/radicale/xmlutils.py
index 16fb2b98..fb75dffb 100644
--- a/radicale/xmlutils.py
+++ b/radicale/xmlutils.py
@@ -33,7 +33,7 @@ from urllib.parse import unquote, urlparse
import vobject
-from . import client, config, storage
+from . import client, storage
NAMESPACES = {
@@ -80,9 +80,7 @@ def _pretty_xml(element, level=0):
if level and (not element.tail or not element.tail.strip()):
element.tail = i
if not level:
- output_encoding = config.get("encoding", "request")
- return ('\n' + ET.tostring(
- element, "utf-8").decode("utf-8")).encode(output_encoding)
+ return '\n%s' % ET.tostring(element, "unicode")
def _tag(short_name, local):
@@ -112,9 +110,11 @@ def _response(code):
return "HTTP/1.1 %i %s" % (code, client.responses[code])
-def _href(href):
+def _href(collection, href):
"""Return prefixed href."""
- return "%s%s" % (config.get("server", "base_prefix"), href.lstrip("/"))
+ return "%s%s" % (
+ collection.configuration.get("server", "base_prefix"),
+ href.lstrip("/"))
def name_from_path(path, collection):
@@ -183,7 +183,7 @@ def delete(path, collection):
multistatus.append(response)
href = ET.Element(_tag("D", "href"))
- href.text = _href(path)
+ href.text = _href(collection, path)
response.append(href)
status = ET.Element(_tag("D", "status"))
@@ -234,10 +234,13 @@ def propfind(path, xml_request, read_collections, write_collections, user=None):
def _propfind_response(path, item, props, user, write=False):
"""Build and return a PROPFIND response."""
- is_collection = isinstance(item, storage.Collection)
+ # TODO: fix this
+ is_collection = hasattr(item, "list")
if is_collection:
- # TODO: fix this
is_leaf = bool(item.list())
+ collection = item
+ else:
+ collection = item.collection
response = ET.Element(_tag("D", "response"))
@@ -254,7 +257,7 @@ def _propfind_response(path, item, props, user, write=False):
uri = "/".join((path, item.href))
# TODO: fix this
- href.text = _href(uri.replace("//", "/"))
+ href.text = _href(collection, uri.replace("//", "/"))
response.append(href)
propstat404 = ET.Element(_tag("D", "propstat"))
@@ -274,7 +277,7 @@ def _propfind_response(path, item, props, user, write=False):
element.text = item.etag
elif tag == _tag("D", "principal-URL"):
tag = ET.Element(_tag("D", "href"))
- tag.text = _href(path)
+ tag.text = _href(collection, path)
element.append(tag)
elif tag == _tag("D", "getlastmodified"):
element.text = item.last_modified
@@ -283,7 +286,7 @@ def _propfind_response(path, item, props, user, write=False):
_tag("CR", "addressbook-home-set"),
_tag("C", "calendar-home-set")):
tag = ET.Element(_tag("D", "href"))
- tag.text = _href(path)
+ tag.text = _href(collection, path)
element.append(tag)
elif tag == _tag("C", "supported-calendar-component-set"):
# This is not a Todo
@@ -304,7 +307,7 @@ def _propfind_response(path, item, props, user, write=False):
# pylint: enable=W0511
elif tag == _tag("D", "current-user-principal") and user:
tag = ET.Element(_tag("D", "href"))
- tag.text = _href("/%s/" % user)
+ tag.text = _href(collection, "/%s/" % user)
element.append(tag)
elif tag == _tag("D", "current-user-privilege-set"):
privilege = ET.Element(_tag("D", "privilege"))
@@ -381,7 +384,8 @@ def _propfind_response(path, item, props, user, write=False):
# resourcetype must be returned empty for non-collection elements
pass
elif tag == _tag("D", "getcontentlength"):
- element.text = str(item.content_length)
+ encoding = collection.configuration.get("encoding", "request")
+ element.text = str(len(item.serialize().encode(encoding)))
else:
is404 = True
@@ -447,7 +451,7 @@ def proppatch(path, xml_request, collection):
multistatus.append(response)
href = ET.Element(_tag("D", "href"))
- href.text = _href(path)
+ href.text = _href(collection, path)
response.append(href)
for short_name, value in props_to_set.items():
@@ -461,23 +465,6 @@ def proppatch(path, xml_request, collection):
return _pretty_xml(multistatus)
-def put(path, ical_request, collection):
- """Read PUT requests."""
- name = name_from_path(path, collection)
- items = list(vobject.readComponents(ical_request))
- if items:
- if collection.has(name):
- # PUT is modifying an existing item
- return collection.update(name, items[0])
- elif name:
- # PUT is adding a new item
- return collection.upload(name, items[0])
- else:
- # PUT is replacing the whole collection
- collection.delete()
- return storage.Collection.create_collection(path, items)
-
-
def report(path, xml_request, collection):
"""Read and answer REPORT requests.
@@ -496,7 +483,7 @@ def report(path, xml_request, collection):
if root.tag in (_tag("C", "calendar-multiget"),
_tag("CR", "addressbook-multiget")):
# Read rfc4791-7.9 for info
- base_prefix = config.get("server", "base_prefix")
+ base_prefix = collection.configuration.get("server", "base_prefix")
hreferences = set()
for href_element in root.findall(_tag("D", "href")):
href_path = unquote(urlparse(href_element.text).path)
@@ -560,8 +547,7 @@ def report(path, xml_request, collection):
found_props.append(element)
elif tag in (_tag("C", "calendar-data"),
_tag("CR", "address-data")):
- if isinstance(item, (storage.Item, storage.Collection)):
- element.text = item.serialize()
+ element.text = item.serialize()
found_props.append(element)
else:
not_found_props.append(element)
diff --git a/tests/__init__.py b/tests/__init__.py
index 95830223..d483329f 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -25,11 +25,8 @@ from io import BytesIO
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
-os.environ["RADICALE_CONFIG"] = os.path.join(os.path.dirname(
- os.path.dirname(__file__)), "config")
-
-class BaseTest(object):
+class BaseTest:
"""Base class for tests."""
def request(self, method, path, data=None, **args):
"""Send a request."""
diff --git a/tests/custom/auth.py b/tests/custom/auth.py
index f3d678f3..a0d93310 100644
--- a/tests/custom/auth.py
+++ b/tests/custom/auth.py
@@ -23,6 +23,9 @@ Just check username for testing
"""
+from radicale import auth
-def is_authenticated(user, password):
- return user == 'tmp'
+
+class Auth(auth.BaseAuth):
+ def is_authenticated(self, user, password):
+ return user == 'tmp'
diff --git a/tests/custom/storage.py b/tests/custom/storage.py
index 4c5d0720..ece1c17e 100644
--- a/tests/custom/storage.py
+++ b/tests/custom/storage.py
@@ -24,5 +24,10 @@ Copy of filesystem storage backend for testing
from radicale import storage
+# TODO: make something more in this collection (and test it)
class Collection(storage.Collection):
"""Collection stored in a folder."""
+ def __init__(self, path, principal=False):
+ super().__init__(path, principal)
+ self._filesystem_path = storage.path_to_filesystem(
+ self.configuration.get("storage", "test_folder"), self.path)
diff --git a/tests/test_auth.py b/tests/test_auth.py
index 067cffd1..d25208dc 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -22,11 +22,12 @@ Radicale tests with simple requests and authentication.
import base64
import hashlib
+import logging
import os
-import radicale
+import shutil
import tempfile
-from radicale import config, auth
+from radicale import Application, config
from . import BaseTest
@@ -37,38 +38,40 @@ class TestBaseAuthRequests(BaseTest):
We should setup auth for each type before creating the Application object.
"""
-
def setup(self):
- self.userpass = "dG1wOmJlcG8="
+ self.colpath = tempfile.mkdtemp()
def teardown(self):
- config.set("auth", "type", "None")
- radicale.auth.is_authenticated = lambda *_: True
+ shutil.rmtree(self.colpath)
def test_root(self):
"""Htpasswd authentication."""
- self.colpath = tempfile.mkdtemp()
htpasswd_file_path = os.path.join(self.colpath, ".htpasswd")
with open(htpasswd_file_path, "wb") as fd:
fd.write(b"tmp:{SHA}" + base64.b64encode(
hashlib.sha1(b"bepo").digest()))
- config.set("auth", "type", "htpasswd")
- auth.FILENAME = htpasswd_file_path
- auth.ENCRYPTION = "sha1"
+ configuration = config.load()
+ configuration.set("auth", "type", "htpasswd")
+ configuration.set("auth", "htpasswd_filename", htpasswd_file_path)
+ configuration.set("auth", "htpasswd_encryption", "sha1")
- self.application = radicale.Application()
+ self.application = Application(
+ configuration, logging.getLogger("radicale_test"))
status, headers, answer = self.request(
- "GET", "/", HTTP_AUTHORIZATION=self.userpass)
+ "GET", "/", HTTP_AUTHORIZATION="dG1wOmJlcG8=")
assert status == 200
assert "Radicale works!" in answer
def test_custom(self):
"""Custom authentication."""
- config.set("auth", "type", "tests.custom.auth")
- self.application = radicale.Application()
+ configuration = config.load()
+ configuration.set("auth", "type", "tests.custom.auth")
+ self.application = Application(
+ configuration, logging.getLogger("radicale_test"))
+
status, headers, answer = self.request(
- "GET", "/", HTTP_AUTHORIZATION=self.userpass)
+ "GET", "/", HTTP_AUTHORIZATION="dG1wOmJlcG8=")
assert status == 200
assert "Radicale works!" in answer
diff --git a/tests/test_base.py b/tests/test_base.py
index c0f54736..a02fd539 100644
--- a/tests/test_base.py
+++ b/tests/test_base.py
@@ -19,20 +19,24 @@ Radicale tests with simple requests.
"""
-import radicale
+import logging
import shutil
import tempfile
+from radicale import Application, config
+
from . import BaseTest
from .helpers import get_file_content
-class BaseRequests(object):
+class BaseRequests:
"""Tests with simple requests."""
storage_type = None
def setup(self):
- radicale.config.set("storage", "type", self.storage_type)
+ self.configuration = config.load()
+ self.configuration.set("storage", "type", self.storage_type)
+ self.logger = logging.getLogger("radicale_test")
def test_root(self):
"""GET request at "/"."""
@@ -95,30 +99,25 @@ class TestMultiFileSystem(BaseRequests, BaseTest):
storage_type = "multifilesystem"
def setup(self):
- """Setup function for each test."""
+ super().setup()
self.colpath = tempfile.mkdtemp()
- from radicale import storage
- storage.FOLDER = self.colpath
- self.application = radicale.Application()
+ self.configuration.set("storage", "filesystem_folder", self.colpath)
+ self.application = Application(self.configuration, self.logger)
def teardown(self):
- """Teardown function for each test."""
shutil.rmtree(self.colpath)
class TestCustomStorageSystem(BaseRequests, BaseTest):
"""Base class for custom backend tests."""
- storage_type = "custom"
+ storage_type = "tests.custom.storage"
def setup(self):
- """Setup function for each test."""
super().setup()
self.colpath = tempfile.mkdtemp()
- radicale.config.set("storage", "type", "tests.custom.storage")
- from tests.custom import storage
- storage.FOLDER = self.colpath
- self.application = radicale.Application()
+ self.configuration.set("storage", "filesystem_folder", self.colpath)
+ self.configuration.set("storage", "test_folder", self.colpath)
+ self.application = Application(self.configuration, self.logger)
def teardown(self):
- """Teardown function for each test."""
shutil.rmtree(self.colpath)