1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-08-16 19:00:54 +00:00
Radicale/radicale/__init__.py
Guillaume Ayoub 6adc7f5fed Enhance collection discovering
When the request path leads to a non-existing item, try to create the
Collection object according to an existing collection at request path's
parent.

This change means that the requests whose path leads to a collection
that doesn't exist (at least MKCOL, MKCALENDAR and PUT) need to rely on
the request path more than on the Collection path. It was already done
for PUT, it's been fixed for MKCOL and MKCALENDAR.

Fix #378.
2016-04-19 10:39:52 +09:00

570 lines
22 KiB
Python

# This file is part of Radicale Server - Calendar Server
# Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter
# Copyright © 2008-2016 Guillaume Ayoub
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.
"""
Radicale Server module.
This module offers a WSGI application class.
To use this module, you should take a look at the file ``radicale.py`` that
should have been included in this package.
"""
import os
import pprint
import base64
import socket
import ssl
import wsgiref.simple_server
import re
from http import client
from urllib.parse import unquote, urlparse
from . import auth, config, log, rights, storage, xmlutils
VERSION = "2.0.0-pre"
# Standard "not allowed" response that is returned when an authenticated user
# tries to access information they don't have rights to
NOT_ALLOWED = (client.FORBIDDEN, {}, None)
WELL_KNOWN_RE = re.compile(r"/\.well-known/(carddav|caldav)/?$")
class HTTPServer(wsgiref.simple_server.WSGIServer, object):
"""HTTP server."""
def __init__(self, address, handler, bind_and_activate=True):
"""Create server."""
ipv6 = ":" in address[0]
if ipv6:
self.address_family = socket.AF_INET6
# Do not bind and activate, as we might change socket options
super().__init__(address, handler, False)
if ipv6:
# Only allow IPv6 connections to the IPv6 socket
self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
if bind_and_activate:
self.server_bind()
self.server_activate()
class HTTPSServer(HTTPServer):
"""HTTPS server."""
def __init__(self, address, handler):
"""Create server by wrapping HTTP socket in an SSL socket."""
super().__init__(address, handler, False)
# Test if the SSL files can be read
for name in ("certificate", "key"):
filename = config.get("server", name)
try:
open(filename, "r").close()
except IOError as exception:
log.LOGGER.warning(
"Error while reading SSL %s %r: %s" % (
name, filename, exception))
ssl_kwargs = dict(
server_side=True,
certfile=config.get("server", "certificate"),
keyfile=config.get("server", "key"),
ssl_version=getattr(
ssl, config.get("server", "protocol"), ssl.PROTOCOL_SSLv23))
ssl_kwargs["ciphers"] = config.get("server", "ciphers") or None
self.socket = ssl.wrap_socket(self.socket, **ssl_kwargs)
self.server_bind()
self.server_activate()
class RequestHandler(wsgiref.simple_server.WSGIRequestHandler):
"""HTTP requests handler."""
def log_message(self, *args, **kwargs):
"""Disable inner logging management."""
def address_string(self):
"""Client address, formatted for logging."""
if config.getboolean("server", "dns_lookup"):
return (
wsgiref.simple_server.WSGIRequestHandler.address_string(self))
else:
return self.client_address[0]
class Application(object):
"""WSGI application managing collections."""
def __init__(self):
"""Initialize application."""
super().__init__()
auth._load()
storage._load()
rights._load()
self.encoding = config.get("encoding", "request")
if config.getboolean("logging", "full_environment"):
self.headers_log = lambda environ: environ
# This method is overriden in __init__ if full_environment is set
# pylint: disable=E0202
@staticmethod
def headers_log(environ):
"""Remove environment variables from the headers for logging."""
request_environ = dict(environ)
for shell_variable in os.environ:
if shell_variable in request_environ:
del request_environ[shell_variable]
return request_environ
# pylint: enable=E0202
def decode(self, text, environ):
"""Try to magically decode ``text`` according to given ``environ``."""
# List of charsets to try
charsets = []
# First append content charset given in the request
content_type = environ.get("CONTENT_TYPE")
if content_type and "charset=" in content_type:
charsets.append(
content_type.split("charset=")[1].split(";")[0].strip())
# Then append default Radicale charset
charsets.append(self.encoding)
# Then append various fallbacks
charsets.append("utf-8")
charsets.append("iso8859-1")
# Try to decode
for charset in charsets:
try:
return text.decode(charset)
except UnicodeDecodeError:
pass
raise UnicodeDecodeError
def collect_allowed_items(self, items, user):
"""Get items from request that user is allowed to access."""
read_last_collection_allowed = None
write_last_collection_allowed = None
read_allowed_items = []
write_allowed_items = []
for item in items:
if isinstance(item, storage.Collection):
if rights.authorized(user, item, "r"):
log.LOGGER.debug(
"%s has read access to collection %s" %
(user or "Anonymous", item.path or "/"))
read_last_collection_allowed = True
read_allowed_items.append(item)
else:
log.LOGGER.debug(
"%s has NO read access to collection %s" %
(user or "Anonymous", item.path or "/"))
read_last_collection_allowed = False
if rights.authorized(user, item, "w"):
log.LOGGER.debug(
"%s has write access to collection %s" %
(user or "Anonymous", item.path or "/"))
write_last_collection_allowed = True
write_allowed_items.append(item)
else:
log.LOGGER.debug(
"%s has NO write access to collection %s" %
(user or "Anonymous", item.path or "/"))
write_last_collection_allowed = False
else:
# item is not a collection, it's the child of the last
# collection we've met in the loop. Only add this item
# if this last collection was allowed.
if read_last_collection_allowed:
log.LOGGER.debug(
"%s has read access to item %s" %
(user or "Anonymous", item.href))
read_allowed_items.append(item)
else:
log.LOGGER.debug(
"%s has NO read access to item %s" %
(user or "Anonymous", item.href))
if write_last_collection_allowed:
log.LOGGER.debug(
"%s has write access to item %s" %
(user or "Anonymous", item.href))
write_allowed_items.append(item)
else:
log.LOGGER.debug(
"%s has NO write access to item %s" %
(user or "Anonymous", item.href))
return read_allowed_items, write_allowed_items
def __call__(self, environ, start_response):
"""Manage a request."""
log.LOGGER.info("%s request at %s received" % (
environ["REQUEST_METHOD"], environ["PATH_INFO"]))
headers = pprint.pformat(self.headers_log(environ))
log.LOGGER.debug("Request headers:\n%s" % headers)
# Strip base_prefix from request URI
base_prefix = config.get("server", "base_prefix")
if environ["PATH_INFO"].startswith(base_prefix):
environ["PATH_INFO"] = environ["PATH_INFO"][len(base_prefix):]
elif config.get("server", "can_skip_base_prefix"):
log.LOGGER.debug(
"Prefix already stripped from path: %s", environ["PATH_INFO"])
else:
# Request path not starting with base_prefix, not allowed
log.LOGGER.debug(
"Path not starting with prefix: %s", environ["PATH_INFO"])
status, headers, _ = NOT_ALLOWED
start_response(status, list(headers.items()))
return []
# Sanitize request URI
environ["PATH_INFO"] = storage.sanitize_path(
unquote(environ["PATH_INFO"]))
log.LOGGER.debug("Sanitized path: %s", environ["PATH_INFO"])
path = environ["PATH_INFO"]
# Get function corresponding to method
function = getattr(self, "do_%s" % environ["REQUEST_METHOD"].upper())
# Ask authentication backend to check rights
authorization = environ.get("HTTP_AUTHORIZATION", None)
if authorization:
authorization = authorization.lstrip("Basic").strip()
user, password = self.decode(base64.b64decode(
authorization.encode("ascii")), environ).split(":", 1)
else:
user = environ.get("REMOTE_USER")
password = None
well_known = WELL_KNOWN_RE.match(path)
if well_known:
redirect = config.get("well-known", well_known.group(1))
try:
redirect = redirect % ({"user": user} if user else {})
except KeyError:
status = client.UNAUTHORIZED
headers = {
"WWW-Authenticate":
"Basic realm=\"%s\"" % config.get("server", "realm")}
log.LOGGER.info(
"Refused /.well-known/ redirection to anonymous user")
else:
status = client.SEE_OTHER
log.LOGGER.info("/.well-known/ redirection to: %s" % redirect)
headers = {"Location": redirect}
status = "%i %s" % (
status, client.responses.get(status, "Unknown"))
start_response(status, list(headers.items()))
return []
is_authenticated = auth.is_authenticated(user, password)
is_valid_user = is_authenticated or not user
if is_valid_user:
items = storage.Collection.discover(
path, environ.get("HTTP_DEPTH", "0"))
read_allowed_items, write_allowed_items = (
self.collect_allowed_items(items, user))
else:
read_allowed_items, write_allowed_items = None, None
# Get content
content_length = int(environ.get("CONTENT_LENGTH") or 0)
if content_length:
content = self.decode(
environ["wsgi.input"].read(content_length), environ)
log.LOGGER.debug("Request content:\n%s" % content)
else:
content = None
if is_valid_user and (
(read_allowed_items or write_allowed_items) or
(is_authenticated and function == self.do_PROPFIND) or
function == self.do_OPTIONS):
status, headers, answer = function(
environ, read_allowed_items, write_allowed_items, content,
user)
else:
status, headers, answer = NOT_ALLOWED
if ((status, headers, answer) == NOT_ALLOWED and
not auth.is_authenticated(user, password) and
config.get("auth", "type") != "None"):
# Unknown or unauthorized user
log.LOGGER.info("%s refused" % (user or "Anonymous user"))
status = client.UNAUTHORIZED
headers = {
"WWW-Authenticate":
"Basic realm=\"%s\"" % config.get("server", "realm")}
answer = None
# Set content length
if answer:
log.LOGGER.debug(
"Response content:\n%s" % self.decode(answer, environ))
headers["Content-Length"] = str(len(answer))
if config.has_section("headers"):
for key in config.options("headers"):
headers[key] = config.get("headers", key)
# Start response
status = "%i %s" % (status, client.responses.get(status, "Unknown"))
log.LOGGER.debug("Answer status: %s" % status)
start_response(status, list(headers.items()))
# Return response content
return [answer] if answer else []
# All these functions must have the same parameters, some are useless
# pylint: disable=W0612,W0613,R0201
def do_DELETE(self, environ, read_collections, write_collections, content,
user):
"""Manage DELETE request."""
if not write_collections:
return NOT_ALLOWED
collection = write_collections[0]
if collection.path == environ["PATH_INFO"].strip("/"):
# Path matching the collection, the collection must be deleted
item = collection
else:
# Try to get an item matching the path
name = xmlutils.name_from_path(environ["PATH_INFO"], collection)
item = collection.get(name)
if item:
if_match = environ.get("HTTP_IF_MATCH", "*")
if if_match in ("*", item.etag):
# No ETag precondition or precondition verified, delete item
answer = xmlutils.delete(environ["PATH_INFO"], collection)
return client.OK, {}, answer
# No item or ETag precondition not verified, do not delete item
return client.PRECONDITION_FAILED, {}, None
def do_GET(self, environ, read_collections, write_collections, content,
user):
"""Manage GET request."""
# Display a "Radicale works!" message if the root URL is requested
if environ["PATH_INFO"] == "/":
headers = {"Content-type": "text/html"}
answer = b"<!DOCTYPE html>\n<title>Radicale</title>Radicale works!"
return client.OK, headers, answer
if not read_collections:
return NOT_ALLOWED
collection = read_collections[0]
item_name = xmlutils.name_from_path(environ["PATH_INFO"], collection)
if item_name:
# Get collection item
item = collection.get(item_name)
if item:
answer_text = item.serialize()
etag = item.etag
else:
return client.NOT_FOUND, {}, None
else:
# Get whole collection
answer_text = collection.serialize()
if not answer_text:
log.LOGGER.debug("Collection at %s unknown" % environ["PATH_INFO"])
return client.NOT_FOUND, {}, None
etag = collection.etag
headers = {
"Content-Type": storage.MIMETYPES[collection.get_meta("tag")],
"Last-Modified": collection.last_modified,
"ETag": etag}
answer = answer_text.encode(self.encoding)
return client.OK, headers, answer
def do_HEAD(self, environ, read_collections, write_collections, content,
user):
"""Manage HEAD request."""
status, headers, answer = self.do_GET(
environ, read_collections, write_collections, content, user)
return status, headers, None
def do_MKCALENDAR(self, environ, read_collections, write_collections,
content, user):
"""Manage MKCALENDAR request."""
if not write_collections:
return NOT_ALLOWED
collection = write_collections[0]
props = xmlutils.props_from_request(content)
# TODO: use this?
# timezone = props.get("C:calendar-timezone")
collection = storage.Collection.create_collection(
environ["PATH_INFO"], tag="VCALENDAR")
for key, value in props.items():
collection.set_meta(key, value)
return client.CREATED, {}, None
def do_MKCOL(self, environ, read_collections, write_collections, content,
user):
"""Manage MKCOL request."""
if not write_collections:
return NOT_ALLOWED
collection = write_collections[0]
props = xmlutils.props_from_request(content)
collection = storage.Collection.create_collection(environ["PATH_INFO"])
for key, value in props.items():
collection.set_meta(key, value)
return client.CREATED, {}, None
def do_MOVE(self, environ, read_collections, write_collections, content,
user):
"""Manage MOVE request."""
if not write_collections:
return NOT_ALLOWED
from_collection = write_collections[0]
from_name = xmlutils.name_from_path(
environ["PATH_INFO"], from_collection)
item = from_collection.get(from_name)
if item:
# Move the item
to_url_parts = urlparse(environ["HTTP_DESTINATION"])
if to_url_parts.netloc == environ["HTTP_HOST"]:
to_url = to_url_parts.path
to_path, to_name = to_url.rstrip("/").rsplit("/", 1)
for to_collection in storage.Collection.discover(
to_path, depth="0"):
if to_collection in write_collections:
to_collection.upload(to_name, item)
from_collection.delete(from_name)
return client.CREATED, {}, None
else:
return NOT_ALLOWED
else:
# Remote destination server, not supported
return client.BAD_GATEWAY, {}, None
else:
# No item found
return client.GONE, {}, None
def do_OPTIONS(self, environ, read_collections, write_collections,
content, user):
"""Manage OPTIONS request."""
headers = {
"Allow": ("DELETE, HEAD, GET, MKCALENDAR, MKCOL, MOVE, "
"OPTIONS, PROPFIND, PROPPATCH, PUT, REPORT"),
"DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol"}
return client.OK, headers, None
def do_PROPFIND(self, environ, read_collections, write_collections,
content, user):
"""Manage PROPFIND request."""
if not read_collections:
return client.NOT_FOUND, {}, None
headers = {
"DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol",
"Content-Type": "text/xml"}
answer = xmlutils.propfind(
environ["PATH_INFO"], content, read_collections, write_collections, user)
return client.MULTI_STATUS, headers, answer
def do_PROPPATCH(self, environ, read_collections, write_collections,
content, user):
"""Manage PROPPATCH request."""
if not write_collections:
return NOT_ALLOWED
collection = write_collections[0]
answer = xmlutils.proppatch(
environ["PATH_INFO"], content, collection)
headers = {
"DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol",
"Content-Type": "text/xml"}
return client.MULTI_STATUS, headers, answer
def do_PUT(self, environ, read_collections, write_collections, content,
user):
"""Manage PUT request."""
if not write_collections:
return NOT_ALLOWED
collection = write_collections[0]
content_type = environ.get("CONTENT_TYPE")
if content_type:
tags = {value: key for key, value in storage.MIMETYPES.items()}
collection.set_meta("tag", tags[content_type.split(";")[0]])
headers = {}
item_name = xmlutils.name_from_path(environ["PATH_INFO"], collection)
item = collection.get(item_name)
etag = environ.get("HTTP_IF_MATCH", "")
match = environ.get("HTTP_IF_NONE_MATCH", "") == "*"
if (not item and not etag) or (
item and ((etag or item.etag) == item.etag) and not match):
# PUT allowed in 3 cases
# Case 1: No item and no ETag precondition: Add new item
# Case 2: Item and ETag precondition verified: Modify item
# Case 3: Item and no Etag precondition: Force modifying item
xmlutils.put(environ["PATH_INFO"], content, collection)
status = client.CREATED
# Try to return the etag in the header.
# If the added item doesn't have the same name as the one given
# by the client, then there's no obvious way to generate an
# etag, we can safely ignore it.
new_item = collection.get(item_name)
if new_item:
headers["ETag"] = new_item.etag
else:
# PUT rejected in all other cases
status = client.PRECONDITION_FAILED
return status, headers, None
def do_REPORT(self, environ, read_collections, write_collections, content,
user):
"""Manage REPORT request."""
if not read_collections:
return NOT_ALLOWED
collection = read_collections[0]
headers = {"Content-Type": "text/xml"}
answer = xmlutils.report(environ["PATH_INFO"], content, collection)
return client.MULTI_STATUS, headers, answer
# pylint: enable=W0612,W0613,R0201