1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-06-26 16:45:52 +00:00

Defer connection to systemd journal

This commit is contained in:
Unrud 2023-03-22 10:23:54 +01:00 committed by Unrud
parent e23f0283b0
commit 5070533a0b
2 changed files with 39 additions and 27 deletions

View file

@ -26,6 +26,7 @@ Log messages are sent to the first available target of:
""" """
import contextlib import contextlib
import io
import logging import logging
import os import os
import socket import socket
@ -33,8 +34,8 @@ import struct
import sys import sys
import threading import threading
import time import time
from typing import (Any, Callable, ClassVar, Dict, Iterator, Optional, Tuple, from typing import (Any, Callable, ClassVar, Dict, Iterator, Mapping, Optional,
Union) Tuple, Union, cast)
from radicale import types from radicale import types
@ -85,6 +86,7 @@ class ThreadedStreamHandler(logging.Handler):
_streams: Dict[int, types.ErrorStream] _streams: Dict[int, types.ErrorStream]
_journal_stream_id: Optional[Tuple[int, int]] _journal_stream_id: Optional[Tuple[int, int]]
_journal_socket: Optional[socket.socket] _journal_socket: Optional[socket.socket]
_journal_socket_failed: bool
def __init__(self) -> None: def __init__(self) -> None:
super().__init__() super().__init__()
@ -92,42 +94,52 @@ class ThreadedStreamHandler(logging.Handler):
self._journal_stream_id = None self._journal_stream_id = None
with contextlib.suppress(TypeError, ValueError): with contextlib.suppress(TypeError, ValueError):
dev, inode = os.environ.get("JOURNAL_STREAM", "").split(":", 1) dev, inode = os.environ.get("JOURNAL_STREAM", "").split(":", 1)
self._journal_stream_id = int(dev), int(inode) self._journal_stream_id = (int(dev), int(inode))
self._journal_socket = None self._journal_socket = None
if self._journal_stream_id and hasattr(socket, "AF_UNIX"): self._journal_socket_failed = False
journal_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
try:
journal_socket.connect("/run/systemd/journal/socket")
except OSError:
journal_socket.close()
else:
self._journal_socket = journal_socket
def _detect_journal(self, stream): def _detect_journal(self, stream: types.ErrorStream) -> bool:
if not self._journal_stream_id: if not self._journal_stream_id or not isinstance(stream, io.IOBase):
return False return False
try: try:
stat = os.fstat(stream.fileno()) stat = os.fstat(stream.fileno())
except Exception: except OSError:
return False return False
return self._journal_stream_id == (stat.st_dev, stat.st_ino) return self._journal_stream_id == (stat.st_dev, stat.st_ino)
@staticmethod @staticmethod
def _encode_journal(data): def _encode_journal(data: Mapping[str, Optional[Union[str, int]]]
) -> bytes:
msg = b"" msg = b""
for key, value in data.items(): for key, value in data.items():
if value is None: if value is None:
continue continue
key = key.encode() keyb = key.encode()
value = str(value).encode() valueb = str(value).encode()
if b"\n" in value: if b"\n" in valueb:
msg += (key + b"\n" + msg += (keyb + b"\n" +
struct.pack("<Q", len(value)) + value + b"\n") struct.pack("<Q", len(valueb)) + valueb + b"\n")
else: else:
msg += key + b"=" + value + b"\n" msg += keyb + b"=" + valueb + b"\n"
return msg return msg
def _emit_journal(self, record): def _try_emit_journal(self, record: logging.LogRecord) -> bool:
if not self._journal_socket:
# Try to connect to systemd journal socket
if self._journal_socket_failed or not hasattr(socket, "AF_UNIX"):
return False
journal_socket = None
try:
journal_socket = socket.socket(
socket.AF_UNIX, socket.SOCK_DGRAM)
journal_socket.connect("/run/systemd/journal/socket")
except OSError:
self._journal_socket_failed = True
if journal_socket:
journal_socket.close()
return False
self._journal_socket = journal_socket
priority = {"DEBUG": 7, priority = {"DEBUG": 7,
"INFO": 6, "INFO": 6,
"WARNING": 4, "WARNING": 4,
@ -136,7 +148,7 @@ class ThreadedStreamHandler(logging.Handler):
timestamp = time.strftime("%Y-%m-%dT%H:%M:%S.%%03dZ", timestamp = time.strftime("%Y-%m-%dT%H:%M:%S.%%03dZ",
time.gmtime(record.created)) % record.msecs time.gmtime(record.created)) % record.msecs
data = {"PRIORITY": priority, data = {"PRIORITY": priority,
"TID": record.tid, "TID": cast(Optional[int], getattr(record, "tid", None)),
"SYSLOG_IDENTIFIER": record.name, "SYSLOG_IDENTIFIER": record.name,
"SYSLOG_FACILITY": 1, "SYSLOG_FACILITY": 1,
"SYSLOG_PID": record.process, "SYSLOG_PID": record.process,
@ -146,12 +158,12 @@ class ThreadedStreamHandler(logging.Handler):
"CODE_FUNC": record.funcName, "CODE_FUNC": record.funcName,
"MESSAGE": self.format(record)} "MESSAGE": self.format(record)}
self._journal_socket.sendall(self._encode_journal(data)) self._journal_socket.sendall(self._encode_journal(data))
return True
def emit(self, record: logging.LogRecord) -> None: def emit(self, record: logging.LogRecord) -> None:
try: try:
stream = self._streams.get(threading.get_ident(), sys.stderr) stream = self._streams.get(threading.get_ident(), sys.stderr)
if self._journal_socket and self._detect_journal(stream): if self._detect_journal(stream) and self._try_emit_journal(record):
self._emit_journal(record)
return return
msg = self.format(record) msg = self.format(record)
stream.write(msg) stream.write(msg)

View file

@ -50,8 +50,8 @@ if sys.version_info >= (3, 8):
@runtime_checkable @runtime_checkable
class ErrorStream(Protocol): class ErrorStream(Protocol):
def flush(self) -> None: ... def flush(self) -> object: ...
def write(self, s: str) -> None: ... def write(self, s: str) -> object: ...
else: else:
ErrorStream = Any ErrorStream = Any
InputStream = Any InputStream = Any