From 5253a464abaeefccfb593f5e00ea6595d99d1381 Mon Sep 17 00:00:00 2001 From: Tuna Celik Date: Mon, 17 Aug 2020 02:05:02 +0200 Subject: [PATCH] Addd hook capability --- radicale/app/__init__.py | 3 +- radicale/app/delete.py | 4 +++ radicale/app/put.py | 4 +++ radicale/config.py | 16 +++++++++- radicale/hook/__init__.py | 25 ++++++++++++++++ radicale/hook/none.py | 6 ++++ radicale/hook/rabbitmq/__init__.py | 47 ++++++++++++++++++++++++++++++ 7 files changed, 103 insertions(+), 2 deletions(-) create mode 100644 radicale/hook/__init__.py create mode 100644 radicale/hook/none.py create mode 100644 radicale/hook/rabbitmq/__init__.py diff --git a/radicale/app/__init__.py b/radicale/app/__init__.py index 85cbcc2b..21c48709 100644 --- a/radicale/app/__init__.py +++ b/radicale/app/__init__.py @@ -41,7 +41,7 @@ import defusedxml.ElementTree as DefusedET import pkg_resources from radicale import (auth, httputils, log, pathutils, rights, storage, web, - xmlutils) + xmlutils, hook) from radicale.app.delete import ApplicationDeleteMixin from radicale.app.get import ApplicationGetMixin from radicale.app.head import ApplicationHeadMixin @@ -82,6 +82,7 @@ class Application( self._rights = rights.load(configuration) self._web = web.load(configuration) self._encoding = configuration.get("encoding", "request") + self._hook = hook.load(configuration) def _headers_log(self, environ): """Sanitize headers for logging.""" diff --git a/radicale/app/delete.py b/radicale/app/delete.py index 8d08d30f..c3a9b12a 100644 --- a/radicale/app/delete.py +++ b/radicale/app/delete.py @@ -21,6 +21,7 @@ from http import client from xml.etree import ElementTree as ET from radicale import app, httputils, storage, xmlutils +from radicale.hook.rabbitmq import QueueItem, QueueItemTypes def xml_delete(base_prefix, path, collection, href=None): @@ -64,8 +65,11 @@ class ApplicationDeleteMixin: return httputils.PRECONDITION_FAILED if isinstance(item, storage.BaseCollection): xml_answer = xml_delete(base_prefix, path, item) + for item in item.get_all(): + self._hook.notify(QueueItem(QueueItemTypes.DELETE, item.uid)) else: xml_answer = xml_delete( base_prefix, path, item.collection, item.href) + self._hook.notify(QueueItem(QueueItemTypes.DELETE, item.uid)) headers = {"Content-Type": "text/xml; charset=%s" % self._encoding} return client.OK, headers, self._write_xml_content(xml_answer) diff --git a/radicale/app/put.py b/radicale/app/put.py index 786ac00c..8f675255 100644 --- a/radicale/app/put.py +++ b/radicale/app/put.py @@ -29,6 +29,7 @@ from radicale import app, httputils from radicale import item as radicale_item from radicale import pathutils, rights, storage, xmlutils from radicale.log import logger +from radicale.hook.rabbitmq import QueueItem, QueueItemTypes MIMETYPE_TAGS = {value: key for key, value in xmlutils.MIMETYPES.items()} @@ -193,6 +194,8 @@ class ApplicationPutMixin: try: etag = self._storage.create_collection( path, prepared_items, props).etag + for item in prepared_items: + self._hook.notify(QueueItem(QueueItemTypes.UPSERT, item.serialize())) except ValueError as e: logger.warning( "Bad PUT request on %r: %s", path, e, exc_info=True) @@ -208,6 +211,7 @@ class ApplicationPutMixin: href = posixpath.basename(pathutils.strip_path(path)) try: etag = parent_item.upload(href, prepared_item).etag + self._hook.notify(QueueItem(QueueItemTypes.UPSERT, prepared_item.serialize())) except ValueError as e: logger.warning( "Bad PUT request on %r: %s", path, e, exc_info=True) diff --git a/radicale/config.py b/radicale/config.py index d30e677c..7f57560c 100644 --- a/radicale/config.py +++ b/radicale/config.py @@ -32,7 +32,7 @@ import string from collections import OrderedDict from configparser import RawConfigParser -from radicale import auth, rights, storage, web +from radicale import auth, rights, storage, web, hook DEFAULT_CONFIG_PATH = os.pathsep.join([ "?/etc/radicale/config", @@ -207,6 +207,20 @@ DEFAULT_CONFIG_SCHEMA = OrderedDict([ "value": "True", "help": "sync all changes to filesystem during requests", "type": bool})])), + ("hook", OrderedDict([ + ("type", { + "value": "none", + "help": "hook backend", + "type": str, + "internal": hook.INTERNAL_TYPES}), + ("rabbitmq_endpoint", { + "value": "", + "help": "endpoint where rabbitmq server is running", + "type": str}), + ("rabbitmq_topic", { + "value": "", + "help": "topic to declare queue", + "type": str})])), ("web", OrderedDict([ ("type", { "value": "internal", diff --git a/radicale/hook/__init__.py b/radicale/hook/__init__.py new file mode 100644 index 00000000..1dd1d1b8 --- /dev/null +++ b/radicale/hook/__init__.py @@ -0,0 +1,25 @@ +from radicale import utils + +INTERNAL_TYPES = ("none", "rabbitmq") + + +def load(configuration): + """Load the storage module chosen in configuration.""" + return utils.load_plugin( + INTERNAL_TYPES, "hook", "Hook", configuration) + + +class BaseHook: + def __init__(self, configuration): + """Initialize BaseHook. + + ``configuration`` see ``radicale.config`` module. + The ``configuration`` must not change during the lifetime of + this object, it is kept as an internal reference. + + """ + self.configuration = configuration + + def notify(self, content): + """Upload a new or replace an existing item.""" + raise NotImplementedError diff --git a/radicale/hook/none.py b/radicale/hook/none.py new file mode 100644 index 00000000..2e5eb049 --- /dev/null +++ b/radicale/hook/none.py @@ -0,0 +1,6 @@ +from radicale import hook + + +class Hook(hook.BaseHook): + def notify(self, content): + """Notify nothing. Empty hook.""" diff --git a/radicale/hook/rabbitmq/__init__.py b/radicale/hook/rabbitmq/__init__.py new file mode 100644 index 00000000..43568125 --- /dev/null +++ b/radicale/hook/rabbitmq/__init__.py @@ -0,0 +1,47 @@ +import pika +import json + +from radicale import hook +from enum import Enum + + +class Hook(hook.BaseHook): + + def __init__(self, configuration): + super().__init__(configuration) + endpoint = configuration.get("hook", "rabbitmq_endpoint") + self.topic = configuration.get("hook", "rabbitmq_topic") + self.encoding = configuration.get("encoding", "stock") + + self._make_connection_synced(endpoint) + self._make_declare_queue_synced(self.topic) + + def _make_connection_synced(self, endpoint): + parameters = pika.URLParameters(endpoint) + self.connection = pika.BlockingConnection(parameters) + self.channel = self.connection.channel() + + def _make_declare_queue_synced(self, topic): + self.channel.queue_declare(queue=topic) + + def notify(self, content): + if not isinstance(content, QueueItem): + return + self.channel.basic_publish(exchange='', + routing_key=self.topic, + body=content.to_json().encode(encoding=self.encoding)) + + +class QueueItemTypes(Enum): + UPSERT = "upsert" + DELETE = "delete" + + +class QueueItem: + + def __init__(self, queue_item_type, content): + self.type = queue_item_type.value + self.content = content + + def to_json(self): + return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4)