diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index ebabc55b..07e35346 100644 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -891,7 +891,41 @@ An example to relax the same-origin policy: Access-Control-Allow-Origin = * ``` -### Supported Clients +#### hook +##### type + +Hook binding for event changes and deletion notifications. + +Available types: + +`none` +: Disabled. Nothing will be notified. + +`rabbitmq` +: Push the message to the rabbitmq server. + +Default: `none` + +#### rabbitmq_endpoint + +End-point address for rabbitmq server. +Ex: amqp://user:password@localhost:5672/ + +Default: + +#### rabbitmq_topic + +RabbitMQ topic to publish message. + +Default: + +#### rabbitmq_queue_type + +RabbitMQ queue type for the topic. + +Default: classic + +## Supported Clients Radicale has been tested with: diff --git a/config b/config index 7c77f5f9..cfd7412e 100644 --- a/config +++ b/config @@ -118,3 +118,12 @@ # Additional HTTP headers #Access-Control-Allow-Origin = * + +[hook] + +# Hook types +# Value: none | rabbitmq +#type = none +#rabbitmq_endpoint = +#rabbitmq_topic = +#rabbitmq_queue_type = classic \ No newline at end of file diff --git a/radicale/app/base.py b/radicale/app/base.py index 4316117d..f4a37025 100644 --- a/radicale/app/base.py +++ b/radicale/app/base.py @@ -21,8 +21,8 @@ import sys import xml.etree.ElementTree as ET from typing import Optional -from radicale import (auth, config, httputils, pathutils, rights, storage, - types, web, xmlutils) +from radicale import (auth, config, hook, httputils, pathutils, rights, + storage, types, web, xmlutils) from radicale.log import logger # HACK: https://github.com/tiran/defusedxml/issues/54 @@ -38,6 +38,7 @@ class ApplicationBase: _rights: rights.BaseRights _web: web.BaseWeb _encoding: str + _hook: hook.BaseHook def __init__(self, configuration: config.Configuration) -> None: self.configuration = configuration @@ -46,6 +47,7 @@ class ApplicationBase: self._rights = rights.load(configuration) self._web = web.load(configuration) self._encoding = configuration.get("encoding", "request") + self._hook = hook.load(configuration) def _read_xml_request_body(self, environ: types.WSGIEnviron ) -> Optional[ET.Element]: diff --git a/radicale/app/delete.py b/radicale/app/delete.py index 69ae5ab4..3d687065 100644 --- a/radicale/app/delete.py +++ b/radicale/app/delete.py @@ -23,6 +23,7 @@ from typing import Optional from radicale import httputils, storage, types, xmlutils from radicale.app.base import Access, ApplicationBase +from radicale.hook import HookNotificationItem, HookNotificationItemTypes def xml_delete(base_prefix: str, path: str, collection: storage.BaseCollection, @@ -67,12 +68,30 @@ class ApplicationPartDelete(ApplicationBase): if if_match not in ("*", item.etag): # ETag precondition not verified, do not delete item return httputils.PRECONDITION_FAILED + hook_notification_item_list = [] if isinstance(item, storage.BaseCollection): + for i in item.get_all(): + hook_notification_item_list.append( + HookNotificationItem( + HookNotificationItemTypes.DELETE, + access.path, + i.uid + ) + ) xml_answer = xml_delete(base_prefix, path, item) else: assert item.collection is not None assert item.href is not None + hook_notification_item_list.append( + HookNotificationItem( + HookNotificationItemTypes.DELETE, + access.path, + item.uid + ) + ) xml_answer = xml_delete( base_prefix, path, item.collection, item.href) + for notification_item in hook_notification_item_list: + self._hook.notify(notification_item) headers = {"Content-Type": "text/xml; charset=%s" % self._encoding} return client.OK, headers, self._xml_response(xml_answer) diff --git a/radicale/app/proppatch.py b/radicale/app/proppatch.py index 934f53b7..c15fddfe 100644 --- a/radicale/app/proppatch.py +++ b/radicale/app/proppatch.py @@ -22,9 +22,12 @@ import xml.etree.ElementTree as ET from http import client from typing import Dict, Optional, cast +import defusedxml.ElementTree as DefusedET + import radicale.item as radicale_item from radicale import httputils, storage, types, xmlutils from radicale.app.base import Access, ApplicationBase +from radicale.hook import HookNotificationItem, HookNotificationItemTypes from radicale.log import logger @@ -93,6 +96,16 @@ class ApplicationPartProppatch(ApplicationBase): try: xml_answer = xml_proppatch(base_prefix, path, xml_content, item) + if xml_content is not None: + hook_notification_item = HookNotificationItem( + HookNotificationItemTypes.CPATCH, + access.path, + DefusedET.tostring( + xml_content, + encoding=self._encoding + ).decode(encoding=self._encoding) + ) + self._hook.notify(hook_notification_item) except ValueError as e: logger.warning( "Bad PROPPATCH request on %r: %s", path, e, exc_info=True) diff --git a/radicale/app/put.py b/radicale/app/put.py index ec495878..cf2a15fb 100644 --- a/radicale/app/put.py +++ b/radicale/app/put.py @@ -30,6 +30,7 @@ import vobject import radicale.item as radicale_item from radicale import httputils, pathutils, rights, storage, types, xmlutils from radicale.app.base import Access, ApplicationBase +from radicale.hook import HookNotificationItem, HookNotificationItemTypes from radicale.log import logger MIMETYPE_TAGS: Mapping[str, str] = {value: key for key, value in @@ -206,6 +207,13 @@ class ApplicationPartPut(ApplicationBase): try: etag = self._storage.create_collection( path, prepared_items, props).etag + for item in prepared_items: + hook_notification_item = HookNotificationItem( + HookNotificationItemTypes.UPSERT, + access.path, + item.serialize() + ) + self._hook.notify(hook_notification_item) except ValueError as e: logger.warning( "Bad PUT request on %r: %s", path, e, exc_info=True) @@ -222,6 +230,12 @@ class ApplicationPartPut(ApplicationBase): href = posixpath.basename(pathutils.strip_path(path)) try: etag = parent_item.upload(href, prepared_item).etag + hook_notification_item = HookNotificationItem( + HookNotificationItemTypes.UPSERT, + access.path, + prepared_item.serialize() + ) + self._hook.notify(hook_notification_item) except ValueError as e: logger.warning( "Bad PUT request on %r: %s", path, e, exc_info=True) diff --git a/radicale/config.py b/radicale/config.py index 02a0b381..b5f45086 100644 --- a/radicale/config.py +++ b/radicale/config.py @@ -35,7 +35,7 @@ from configparser import RawConfigParser from typing import (Any, Callable, ClassVar, Iterable, List, Optional, Sequence, Tuple, TypeVar, Union) -from radicale import auth, rights, storage, types, web +from radicale import auth, hook, rights, storage, types, web DEFAULT_CONFIG_PATH: str = os.pathsep.join([ "?/etc/radicale/config", @@ -210,6 +210,24 @@ DEFAULT_CONFIG_SCHEMA: types.CONFIG_SCHEMA = OrderedDict([ "value": "True", "help": "sync all changes to filesystem during requests", "type": bool})])), + ("hook", OrderedDict([ + ("type", { + "value": "none", + "help": "hook backend", + "type": str, + "internal": hook.INTERNAL_TYPES}), + ("rabbitmq_endpoint", { + "value": "", + "help": "endpoint where rabbitmq server is running", + "type": str}), + ("rabbitmq_topic", { + "value": "", + "help": "topic to declare queue", + "type": str}), + ("rabbitmq_queue_type", { + "value": "", + "help": "queue type for topic declaration", + "type": str})])), ("web", OrderedDict([ ("type", { "value": "internal", diff --git a/radicale/hook/__init__.py b/radicale/hook/__init__.py new file mode 100644 index 00000000..dc6b74c5 --- /dev/null +++ b/radicale/hook/__init__.py @@ -0,0 +1,60 @@ +import json +from enum import Enum +from typing import Sequence + +from radicale import pathutils, utils + +INTERNAL_TYPES: Sequence[str] = ("none", "rabbitmq") + + +def load(configuration): + """Load the storage module chosen in configuration.""" + return utils.load_plugin( + INTERNAL_TYPES, "hook", "Hook", BaseHook, configuration) + + +class BaseHook: + def __init__(self, configuration): + """Initialize BaseHook. + + ``configuration`` see ``radicale.config`` module. + The ``configuration`` must not change during the lifetime of + this object, it is kept as an internal reference. + + """ + self.configuration = configuration + + def notify(self, notification_item): + """Upload a new or replace an existing item.""" + raise NotImplementedError + + +class HookNotificationItemTypes(Enum): + CPATCH = "cpatch" + UPSERT = "upsert" + DELETE = "delete" + + +def _cleanup(path): + sane_path = pathutils.strip_path(path) + attributes = sane_path.split("/") if sane_path else [] + + if len(attributes) < 2: + return "" + return attributes[0] + "/" + attributes[1] + + +class HookNotificationItem: + + def __init__(self, notification_item_type, path, content): + self.type = notification_item_type.value + self.point = _cleanup(path) + self.content = content + + def to_json(self): + return json.dumps( + self, + default=lambda o: o.__dict__, + sort_keys=True, + indent=4 + ) diff --git a/radicale/hook/none.py b/radicale/hook/none.py new file mode 100644 index 00000000..b770ab67 --- /dev/null +++ b/radicale/hook/none.py @@ -0,0 +1,6 @@ +from radicale import hook + + +class Hook(hook.BaseHook): + def notify(self, notification_item): + """Notify nothing. Empty hook.""" diff --git a/radicale/hook/rabbitmq/__init__.py b/radicale/hook/rabbitmq/__init__.py new file mode 100644 index 00000000..2323ed43 --- /dev/null +++ b/radicale/hook/rabbitmq/__init__.py @@ -0,0 +1,50 @@ +import pika +from pika.exceptions import ChannelWrongStateError, StreamLostError + +from radicale import hook +from radicale.hook import HookNotificationItem +from radicale.log import logger + + +class Hook(hook.BaseHook): + + def __init__(self, configuration): + super().__init__(configuration) + self._endpoint = configuration.get("hook", "rabbitmq_endpoint") + self._topic = configuration.get("hook", "rabbitmq_topic") + self._queue_type = configuration.get("hook", "rabbitmq_queue_type") + self._encoding = configuration.get("encoding", "stock") + + self._make_connection_synced() + self._make_declare_queue_synced() + + def _make_connection_synced(self): + parameters = pika.URLParameters(self._endpoint) + connection = pika.BlockingConnection(parameters) + self._channel = connection.channel() + + def _make_declare_queue_synced(self): + self._channel.queue_declare(queue=self._topic, durable=True, arguments={"x-queue-type": self._queue_type}) + + def notify(self, notification_item): + if isinstance(notification_item, HookNotificationItem): + self._notify(notification_item, True) + + def _notify(self, notification_item, recall): + try: + self._channel.basic_publish( + exchange='', + routing_key=self._topic, + body=notification_item.to_json().encode( + encoding=self._encoding + ) + ) + except Exception as e: + if (isinstance(e, ChannelWrongStateError) or + isinstance(e, StreamLostError)) and recall: + self._make_connection_synced() + self._notify(notification_item, False) + return + logger.error("An exception occurred during " + "publishing hook notification item: %s", + e, exc_info=True) diff --git a/setup.py b/setup.py index 144e77c5..f9fa4e0d 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ web_files = ["web/internal_data/css/icon.png", "web/internal_data/index.html"] install_requires = ["defusedxml", "passlib", "vobject>=0.9.6", - "python-dateutil>=2.7.3", + "python-dateutil>=2.7.3", "pika>=1.1.0", "setuptools; python_version<'3.9'"] bcrypt_requires = ["passlib[bcrypt]", "bcrypt"] # typeguard requires pytest<7