diff --git a/radicale/log.py b/radicale/log.py index 49143457..64304c1f 100644 --- a/radicale/log.py +++ b/radicale/log.py @@ -26,6 +26,7 @@ Log messages are sent to the first available target of: """ import contextlib +import io import logging import os import socket @@ -33,8 +34,8 @@ import struct import sys import threading import time -from typing import (Any, Callable, ClassVar, Dict, Iterator, Optional, Tuple, - Union) +from typing import (Any, Callable, ClassVar, Dict, Iterator, Mapping, Optional, + Tuple, Union, cast) from radicale import types @@ -85,6 +86,7 @@ class ThreadedStreamHandler(logging.Handler): _streams: Dict[int, types.ErrorStream] _journal_stream_id: Optional[Tuple[int, int]] _journal_socket: Optional[socket.socket] + _journal_socket_failed: bool def __init__(self) -> None: super().__init__() @@ -92,42 +94,52 @@ class ThreadedStreamHandler(logging.Handler): self._journal_stream_id = None with contextlib.suppress(TypeError, ValueError): dev, inode = os.environ.get("JOURNAL_STREAM", "").split(":", 1) - self._journal_stream_id = int(dev), int(inode) + self._journal_stream_id = (int(dev), int(inode)) self._journal_socket = None - if self._journal_stream_id and hasattr(socket, "AF_UNIX"): - journal_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - try: - journal_socket.connect("/run/systemd/journal/socket") - except OSError: - journal_socket.close() - else: - self._journal_socket = journal_socket + self._journal_socket_failed = False - def _detect_journal(self, stream): - if not self._journal_stream_id: + def _detect_journal(self, stream: types.ErrorStream) -> bool: + if not self._journal_stream_id or not isinstance(stream, io.IOBase): return False try: stat = os.fstat(stream.fileno()) - except Exception: + except OSError: return False return self._journal_stream_id == (stat.st_dev, stat.st_ino) @staticmethod - def _encode_journal(data): + def _encode_journal(data: Mapping[str, Optional[Union[str, int]]] + ) -> bytes: msg = b"" for key, value in data.items(): if value is None: continue - key = key.encode() - value = str(value).encode() - if b"\n" in value: - msg += (key + b"\n" + - struct.pack(" bool: + if not self._journal_socket: + # Try to connect to systemd journal socket + if self._journal_socket_failed or not hasattr(socket, "AF_UNIX"): + return False + journal_socket = None + try: + journal_socket = socket.socket( + socket.AF_UNIX, socket.SOCK_DGRAM) + journal_socket.connect("/run/systemd/journal/socket") + except OSError: + self._journal_socket_failed = True + if journal_socket: + journal_socket.close() + return False + self._journal_socket = journal_socket + priority = {"DEBUG": 7, "INFO": 6, "WARNING": 4, @@ -136,7 +148,7 @@ class ThreadedStreamHandler(logging.Handler): timestamp = time.strftime("%Y-%m-%dT%H:%M:%S.%%03dZ", time.gmtime(record.created)) % record.msecs data = {"PRIORITY": priority, - "TID": record.tid, + "TID": cast(Optional[int], getattr(record, "tid", None)), "SYSLOG_IDENTIFIER": record.name, "SYSLOG_FACILITY": 1, "SYSLOG_PID": record.process, @@ -146,12 +158,12 @@ class ThreadedStreamHandler(logging.Handler): "CODE_FUNC": record.funcName, "MESSAGE": self.format(record)} self._journal_socket.sendall(self._encode_journal(data)) + return True def emit(self, record: logging.LogRecord) -> None: try: stream = self._streams.get(threading.get_ident(), sys.stderr) - if self._journal_socket and self._detect_journal(stream): - self._emit_journal(record) + if self._detect_journal(stream) and self._try_emit_journal(record): return msg = self.format(record) stream.write(msg) diff --git a/radicale/types.py b/radicale/types.py index 0eb3fd6a..c7e1904a 100644 --- a/radicale/types.py +++ b/radicale/types.py @@ -50,8 +50,8 @@ if sys.version_info >= (3, 8): @runtime_checkable class ErrorStream(Protocol): - def flush(self) -> None: ... - def write(self, s: str) -> None: ... + def flush(self) -> object: ... + def write(self, s: str) -> object: ... else: ErrorStream = Any InputStream = Any