From 3e6d8db98dca334589d0962cbda4e75e5607b9f1 Mon Sep 17 00:00:00 2001 From: Tuna Celik Date: Fri, 10 Feb 2023 23:32:46 +0100 Subject: [PATCH] Synced config.py with origin --- radicale/config.py | 136 ++++++++++++++++++++++++++------------------- 1 file changed, 79 insertions(+), 57 deletions(-) diff --git a/radicale/config.py b/radicale/config.py index 2d2bf748..63378efa 100644 --- a/radicale/config.py +++ b/radicale/config.py @@ -1,4 +1,4 @@ -# This file is part of Radicale Server - Calendar Server +# This file is part of Radicale - CalDAV and CardDAV server # Copyright © 2008-2017 Guillaume Ayoub # Copyright © 2008 Nicolas Kandel # Copyright © 2008 Pascal Halter @@ -29,24 +29,27 @@ import contextlib import math import os import string +import sys from collections import OrderedDict from configparser import RawConfigParser +from typing import (Any, Callable, ClassVar, Iterable, List, Optional, + Sequence, Tuple, TypeVar, Union) -from radicale import auth, hook, rights, storage, web +from radicale import auth, hook, rights, storage, types, web -DEFAULT_CONFIG_PATH = os.pathsep.join([ +DEFAULT_CONFIG_PATH: str = os.pathsep.join([ "?/etc/radicale/config", "?~/.config/radicale/config"]) -def positive_int(value): +def positive_int(value: Any) -> int: value = int(value) if value < 0: raise ValueError("value is negative: %d" % value) return value -def positive_float(value): +def positive_float(value: Any) -> float: value = float(value) if not math.isfinite(value): raise ValueError("value is infinite") @@ -57,22 +60,22 @@ def positive_float(value): return value -def logging_level(value): +def logging_level(value: Any) -> str: if value not in ("debug", "info", "warning", "error", "critical"): raise ValueError("unsupported level: %r" % value) return value -def filepath(value): +def filepath(value: Any) -> str: if not value: return "" value = os.path.expanduser(value) - if os.name == "nt": + if sys.platform == "win32": value = os.path.expandvars(value) return os.path.abspath(value) -def list_of_ip_address(value): +def list_of_ip_address(value: Any) -> List[Tuple[str, int]]: def ip_address(value): try: address, port = value.rsplit(":", 1) @@ -82,30 +85,30 @@ def list_of_ip_address(value): return [ip_address(s) for s in value.split(",")] -def str_or_callable(value): +def str_or_callable(value: Any) -> Union[str, Callable]: if callable(value): return value return str(value) -def unspecified_type(value): +def unspecified_type(value: Any) -> Any: return value -def _convert_to_bool(value): +def _convert_to_bool(value: Any) -> bool: if value.lower() not in RawConfigParser.BOOLEAN_STATES: - raise ValueError("Not a boolean: %r" % value) + raise ValueError("not a boolean: %r" % value) return RawConfigParser.BOOLEAN_STATES[value.lower()] -INTERNAL_OPTIONS = ("_allow_extra",) +INTERNAL_OPTIONS: Sequence[str] = ("_allow_extra",) # Default configuration -DEFAULT_CONFIG_SCHEMA = OrderedDict([ +DEFAULT_CONFIG_SCHEMA: types.CONFIG_SCHEMA = OrderedDict([ ("server", OrderedDict([ ("hosts", { "value": "localhost:5232", "help": "set server hostnames including ports", - "aliases": ["-H", "--hosts"], + "aliases": ("-H", "--hosts",), "type": list_of_ip_address}), ("max_connections", { "value": "8", @@ -118,27 +121,27 @@ DEFAULT_CONFIG_SCHEMA = OrderedDict([ ("timeout", { "value": "30", "help": "socket timeout", - "type": positive_int}), + "type": positive_float}), ("ssl", { "value": "False", "help": "use SSL connection", - "aliases": ["-s", "--ssl"], - "opposite_aliases": ["-S", "--no-ssl"], + "aliases": ("-s", "--ssl",), + "opposite_aliases": ("-S", "--no-ssl",), "type": bool}), ("certificate", { "value": "/etc/ssl/radicale.cert.pem", "help": "set certificate file", - "aliases": ["-c", "--certificate"], + "aliases": ("-c", "--certificate",), "type": filepath}), ("key", { "value": "/etc/ssl/radicale.key.pem", "help": "set private key file", - "aliases": ["-k", "--key"], + "aliases": ("-k", "--key",), "type": filepath}), ("certificate_authority", { "value": "", "help": "set CA certificate for validating clients", - "aliases": ["--certificate-authority"], + "aliases": ("--certificate-authority",), "type": filepath}), ("_internal_server", { "value": "False", @@ -240,7 +243,8 @@ DEFAULT_CONFIG_SCHEMA = OrderedDict([ ("_allow_extra", str)]))]) -def parse_compound_paths(*compound_paths): +def parse_compound_paths(*compound_paths: Optional[str] + ) -> List[Tuple[str, bool]]: """Parse a compound path and return the individual paths. Paths in a compound path are joined by ``os.pathsep``. If a path starts with ``?`` the return value ``IGNORE_IF_MISSING`` is set. @@ -266,7 +270,8 @@ def parse_compound_paths(*compound_paths): return paths -def load(paths=()): +def load(paths: Optional[Iterable[Tuple[str, bool]]] = None + ) -> "Configuration": """ Create instance of ``Configuration`` for use with ``radicale.app.Application``. @@ -279,29 +284,40 @@ def load(paths=()): The configuration can later be changed with ``Configuration.update()``. """ + if paths is None: + paths = [] configuration = Configuration(DEFAULT_CONFIG_SCHEMA) for path, ignore_if_missing in paths: parser = RawConfigParser() config_source = "config file %r" % path + config: types.CONFIG try: - if not parser.read(path): - config = Configuration.SOURCE_MISSING - if not ignore_if_missing: - raise RuntimeError("No such file: %r" % path) - else: + with open(path, "r") as f: + parser.read_file(f) config = {s: {o: parser[s][o] for o in parser.options(s)} for s in parser.sections()} except Exception as e: - raise RuntimeError( - "Failed to load %s: %s" % (config_source, e)) from e + if not (ignore_if_missing and isinstance(e, ( + FileNotFoundError, NotADirectoryError, PermissionError))): + raise RuntimeError("Failed to load %s: %s" % (config_source, e) + ) from e + config = Configuration.SOURCE_MISSING configuration.update(config, config_source) return configuration -class Configuration: - SOURCE_MISSING = {} +_Self = TypeVar("_Self", bound="Configuration") - def __init__(self, schema): + +class Configuration: + + SOURCE_MISSING: ClassVar[types.CONFIG] = {} + + _schema: types.CONFIG_SCHEMA + _values: types.MUTABLE_CONFIG + _configs: List[Tuple[types.CONFIG, str, bool]] + + def __init__(self, schema: types.CONFIG_SCHEMA) -> None: """Initialize configuration. ``schema`` a dict that describes the configuration format. @@ -322,7 +338,8 @@ class Configuration: for section in self._schema} self.update(default, "default config", privileged=True) - def update(self, config, source=None, privileged=False): + def update(self, config: types.CONFIG, source: Optional[str] = None, + privileged: bool = False) -> None: """Update the configuration. ``config`` a dict of the format {SECTION: {OPTION: VALUE, ...}, ...}. @@ -336,8 +353,9 @@ class Configuration: ``privileged`` allows updating sections and options starting with "_". """ - source = source or "unspecified config" - new_values = {} + if source is None: + source = "unspecified config" + new_values: types.MUTABLE_CONFIG = {} for section in config: if (section not in self._schema or section.startswith("_") and not privileged): @@ -376,40 +394,41 @@ class Configuration: self._values[section] = self._values.get(section, {}) self._values[section].update(new_values[section]) - def get(self, section, option): + def get(self, section: str, option: str) -> Any: """Get the value of ``option`` in ``section``.""" with contextlib.suppress(KeyError): return self._values[section][option] raise KeyError(section, option) - def get_raw(self, section, option): + def get_raw(self, section: str, option: str) -> Any: """Get the raw value of ``option`` in ``section``.""" for config, _, _ in reversed(self._configs): if option in config.get(section, {}): return config[section][option] raise KeyError(section, option) - def get_source(self, section, option): + def get_source(self, section: str, option: str) -> str: """Get the source that provides ``option`` in ``section``.""" for config, source, _ in reversed(self._configs): if option in config.get(section, {}): return source raise KeyError(section, option) - def sections(self): + def sections(self) -> List[str]: """List all sections.""" - return self._values.keys() + return list(self._values.keys()) - def options(self, section): + def options(self, section: str) -> List[str]: """List all options in ``section``""" - return self._values[section].keys() + return list(self._values[section].keys()) - def sources(self): + def sources(self) -> List[Tuple[str, bool]]: """List all config sources.""" return [(source, config is self.SOURCE_MISSING) for config, source, _ in self._configs] - def copy(self, plugin_schema=None): + def copy(self: _Self, plugin_schema: Optional[types.CONFIG_SCHEMA] = None + ) -> _Self: """Create a copy of the configuration ``plugin_schema`` is a optional dict that contains additional options @@ -419,20 +438,23 @@ class Configuration: if plugin_schema is None: schema = self._schema else: - schema = self._schema.copy() + new_schema = dict(self._schema) for section, options in plugin_schema.items(): - if (section not in schema or "type" not in schema[section] or - "internal" not in schema[section]["type"]): + if (section not in new_schema or + "type" not in new_schema[section] or + "internal" not in new_schema[section]["type"]): raise ValueError("not a plugin section: %r" % section) - schema[section] = schema[section].copy() - schema[section]["type"] = schema[section]["type"].copy() - schema[section]["type"]["internal"] = [ - self.get(section, "type")] + new_section = dict(new_schema[section]) + new_type = dict(new_section["type"]) + new_type["internal"] = (self.get(section, "type"),) + new_section["type"] = new_type for option, value in options.items(): - if option in schema[section]: - raise ValueError("option already exists in %r: %r" % ( - section, option)) - schema[section][option] = value + if option in new_section: + raise ValueError("option already exists in %r: %r" % + (section, option)) + new_section[option] = value + new_schema[section] = new_section + schema = new_schema copy = type(self)(schema) for config, source, privileged in self._configs: copy.update(config, source, privileged)