diff --git a/radicale/hook/rabbitmq/__init__.py b/radicale/hook/rabbitmq/__init__.py index 58789e2b..13c36abb 100644 --- a/radicale/hook/rabbitmq/__init__.py +++ b/radicale/hook/rabbitmq/__init__.py @@ -1,30 +1,36 @@ import pika +from pika.exceptions import ChannelWrongStateError from radicale import hook from radicale.hook import HookNotificationItem +from radicale.log import logger class Hook(hook.BaseHook): def __init__(self, configuration): super().__init__(configuration) - endpoint = configuration.get("hook", "rabbitmq_endpoint") + self._endpoint = configuration.get("hook", "rabbitmq_endpoint") self._topic = configuration.get("hook", "rabbitmq_topic") self._encoding = configuration.get("encoding", "stock") - self._make_connection_synced(endpoint) - self._make_declare_queue_synced(self._topic) + self._make_connection_synced() + self._make_declare_queue_synced() - def _make_connection_synced(self, endpoint): - parameters = pika.URLParameters(endpoint) + def _make_connection_synced(self): + parameters = pika.URLParameters(self._endpoint) connection = pika.BlockingConnection(parameters) self._channel = connection.channel() - def _make_declare_queue_synced(self, topic): - self._channel.queue_declare(queue=topic, durable=True) + def _make_declare_queue_synced(self): + self._channel.queue_declare(queue=self._topic, durable=True) def notify(self, notification_item): if isinstance(notification_item, HookNotificationItem): + self._notify(notification_item, True) + + def _notify(self, notification_item, recall): + try: self._channel.basic_publish( exchange='', routing_key=self._topic, @@ -32,3 +38,11 @@ class Hook(hook.BaseHook): encoding=self._encoding ) ) + except ChannelWrongStateError as e: + if recall: + self._make_connection_synced() + self._notify(notification_item, False) + return + logger.error("An exception is occurred while " + "publishing hook notification item: %s", + e, exc_info=True)