From 9276c6546237100fd374347ff9987e7b0591e48e Mon Sep 17 00:00:00 2001 From: Unrud Date: Tue, 21 Mar 2023 00:03:29 +0100 Subject: [PATCH] Upgrade to journald's native journal protocol --- radicale/log.py | 72 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 71 insertions(+), 1 deletion(-) diff --git a/radicale/log.py b/radicale/log.py index eaa842bf..d45a6ef2 100644 --- a/radicale/log.py +++ b/radicale/log.py @@ -25,11 +25,16 @@ Log messages are sent to the first available target of: """ +import contextlib import logging import os +import socket +import struct import sys import threading -from typing import Any, Callable, ClassVar, Dict, Iterator, Union +import time +from typing import (Any, Callable, ClassVar, Dict, Iterator, Optional, Tuple, + Union) from radicale import types @@ -65,6 +70,9 @@ class IdentLogRecordFactory: if current_thread.name and main_thread != current_thread: ident += "/%s" % current_thread.name record.ident = ident # type:ignore[attr-defined] + record.tid = None # type:ignore[attr-defined] + if sys.version_info >= (3, 8): + record.tid = current_thread.native_id return record @@ -75,14 +83,76 @@ class ThreadedStreamHandler(logging.Handler): terminator: ClassVar[str] = "\n" _streams: Dict[int, types.ErrorStream] + _journal_stream_id: Optional[Tuple[int, int]] + _journal_socket: Optional[socket.socket] def __init__(self) -> None: super().__init__() self._streams = {} + self._journal_stream_id = None + with contextlib.suppress(TypeError, ValueError): + dev, inode = os.environ.get("JOURNAL_STREAM", "").split(":", 1) + self._journal_stream_id = int(dev), int(inode) + self._journal_socket = None + if self._journal_stream_id and hasattr(socket, "AF_UNIX"): + journal_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + try: + journal_socket.connect("/run/systemd/journal/socket") + except OSError: + journal_socket.close() + else: + self._journal_socket = journal_socket + + def _detect_journal(self, stream): + if not self._journal_stream_id: + return False + try: + stat = os.fstat(stream.fileno()) + except Exception: + return False + return self._journal_stream_id == (stat.st_dev, stat.st_ino) + + @staticmethod + def _encode_journal(data): + msg = b"" + for key, value in data.items(): + if key is None: + continue + key = key.encode() + value = str(value).encode() + if b"\n" in value: + msg += (key + b"\n" + + struct.pack(" None: try: stream = self._streams.get(threading.get_ident(), sys.stderr) + if self._journal_socket and self._detect_journal(stream): + self._emit_journal(record) + return msg = self.format(record) stream.write(msg) stream.write(self.terminator)