1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-08-10 18:40:53 +00:00

Synced config.py with origin

This commit is contained in:
Tuna Celik 2023-02-10 23:32:46 +01:00
parent 4a0b2e8791
commit 3e6d8db98d

View file

@ -1,4 +1,4 @@
# This file is part of Radicale Server - Calendar Server # This file is part of Radicale - CalDAV and CardDAV server
# Copyright © 2008-2017 Guillaume Ayoub # Copyright © 2008-2017 Guillaume Ayoub
# Copyright © 2008 Nicolas Kandel # Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter # Copyright © 2008 Pascal Halter
@ -29,24 +29,27 @@ import contextlib
import math import math
import os import os
import string import string
import sys
from collections import OrderedDict from collections import OrderedDict
from configparser import RawConfigParser from configparser import RawConfigParser
from typing import (Any, Callable, ClassVar, Iterable, List, Optional,
Sequence, Tuple, TypeVar, Union)
from radicale import auth, hook, rights, storage, web from radicale import auth, hook, rights, storage, types, web
DEFAULT_CONFIG_PATH = os.pathsep.join([ DEFAULT_CONFIG_PATH: str = os.pathsep.join([
"?/etc/radicale/config", "?/etc/radicale/config",
"?~/.config/radicale/config"]) "?~/.config/radicale/config"])
def positive_int(value): def positive_int(value: Any) -> int:
value = int(value) value = int(value)
if value < 0: if value < 0:
raise ValueError("value is negative: %d" % value) raise ValueError("value is negative: %d" % value)
return value return value
def positive_float(value): def positive_float(value: Any) -> float:
value = float(value) value = float(value)
if not math.isfinite(value): if not math.isfinite(value):
raise ValueError("value is infinite") raise ValueError("value is infinite")
@ -57,22 +60,22 @@ def positive_float(value):
return value return value
def logging_level(value): def logging_level(value: Any) -> str:
if value not in ("debug", "info", "warning", "error", "critical"): if value not in ("debug", "info", "warning", "error", "critical"):
raise ValueError("unsupported level: %r" % value) raise ValueError("unsupported level: %r" % value)
return value return value
def filepath(value): def filepath(value: Any) -> str:
if not value: if not value:
return "" return ""
value = os.path.expanduser(value) value = os.path.expanduser(value)
if os.name == "nt": if sys.platform == "win32":
value = os.path.expandvars(value) value = os.path.expandvars(value)
return os.path.abspath(value) return os.path.abspath(value)
def list_of_ip_address(value): def list_of_ip_address(value: Any) -> List[Tuple[str, int]]:
def ip_address(value): def ip_address(value):
try: try:
address, port = value.rsplit(":", 1) address, port = value.rsplit(":", 1)
@ -82,30 +85,30 @@ def list_of_ip_address(value):
return [ip_address(s) for s in value.split(",")] return [ip_address(s) for s in value.split(",")]
def str_or_callable(value): def str_or_callable(value: Any) -> Union[str, Callable]:
if callable(value): if callable(value):
return value return value
return str(value) return str(value)
def unspecified_type(value): def unspecified_type(value: Any) -> Any:
return value return value
def _convert_to_bool(value): def _convert_to_bool(value: Any) -> bool:
if value.lower() not in RawConfigParser.BOOLEAN_STATES: if value.lower() not in RawConfigParser.BOOLEAN_STATES:
raise ValueError("Not a boolean: %r" % value) raise ValueError("not a boolean: %r" % value)
return RawConfigParser.BOOLEAN_STATES[value.lower()] return RawConfigParser.BOOLEAN_STATES[value.lower()]
INTERNAL_OPTIONS = ("_allow_extra",) INTERNAL_OPTIONS: Sequence[str] = ("_allow_extra",)
# Default configuration # Default configuration
DEFAULT_CONFIG_SCHEMA = OrderedDict([ DEFAULT_CONFIG_SCHEMA: types.CONFIG_SCHEMA = OrderedDict([
("server", OrderedDict([ ("server", OrderedDict([
("hosts", { ("hosts", {
"value": "localhost:5232", "value": "localhost:5232",
"help": "set server hostnames including ports", "help": "set server hostnames including ports",
"aliases": ["-H", "--hosts"], "aliases": ("-H", "--hosts",),
"type": list_of_ip_address}), "type": list_of_ip_address}),
("max_connections", { ("max_connections", {
"value": "8", "value": "8",
@ -118,27 +121,27 @@ DEFAULT_CONFIG_SCHEMA = OrderedDict([
("timeout", { ("timeout", {
"value": "30", "value": "30",
"help": "socket timeout", "help": "socket timeout",
"type": positive_int}), "type": positive_float}),
("ssl", { ("ssl", {
"value": "False", "value": "False",
"help": "use SSL connection", "help": "use SSL connection",
"aliases": ["-s", "--ssl"], "aliases": ("-s", "--ssl",),
"opposite_aliases": ["-S", "--no-ssl"], "opposite_aliases": ("-S", "--no-ssl",),
"type": bool}), "type": bool}),
("certificate", { ("certificate", {
"value": "/etc/ssl/radicale.cert.pem", "value": "/etc/ssl/radicale.cert.pem",
"help": "set certificate file", "help": "set certificate file",
"aliases": ["-c", "--certificate"], "aliases": ("-c", "--certificate",),
"type": filepath}), "type": filepath}),
("key", { ("key", {
"value": "/etc/ssl/radicale.key.pem", "value": "/etc/ssl/radicale.key.pem",
"help": "set private key file", "help": "set private key file",
"aliases": ["-k", "--key"], "aliases": ("-k", "--key",),
"type": filepath}), "type": filepath}),
("certificate_authority", { ("certificate_authority", {
"value": "", "value": "",
"help": "set CA certificate for validating clients", "help": "set CA certificate for validating clients",
"aliases": ["--certificate-authority"], "aliases": ("--certificate-authority",),
"type": filepath}), "type": filepath}),
("_internal_server", { ("_internal_server", {
"value": "False", "value": "False",
@ -240,7 +243,8 @@ DEFAULT_CONFIG_SCHEMA = OrderedDict([
("_allow_extra", str)]))]) ("_allow_extra", str)]))])
def parse_compound_paths(*compound_paths): def parse_compound_paths(*compound_paths: Optional[str]
) -> List[Tuple[str, bool]]:
"""Parse a compound path and return the individual paths. """Parse a compound path and return the individual paths.
Paths in a compound path are joined by ``os.pathsep``. If a path starts Paths in a compound path are joined by ``os.pathsep``. If a path starts
with ``?`` the return value ``IGNORE_IF_MISSING`` is set. with ``?`` the return value ``IGNORE_IF_MISSING`` is set.
@ -266,7 +270,8 @@ def parse_compound_paths(*compound_paths):
return paths return paths
def load(paths=()): def load(paths: Optional[Iterable[Tuple[str, bool]]] = None
) -> "Configuration":
""" """
Create instance of ``Configuration`` for use with Create instance of ``Configuration`` for use with
``radicale.app.Application``. ``radicale.app.Application``.
@ -279,29 +284,40 @@ def load(paths=()):
The configuration can later be changed with ``Configuration.update()``. The configuration can later be changed with ``Configuration.update()``.
""" """
if paths is None:
paths = []
configuration = Configuration(DEFAULT_CONFIG_SCHEMA) configuration = Configuration(DEFAULT_CONFIG_SCHEMA)
for path, ignore_if_missing in paths: for path, ignore_if_missing in paths:
parser = RawConfigParser() parser = RawConfigParser()
config_source = "config file %r" % path config_source = "config file %r" % path
config: types.CONFIG
try: try:
if not parser.read(path): with open(path, "r") as f:
config = Configuration.SOURCE_MISSING parser.read_file(f)
if not ignore_if_missing:
raise RuntimeError("No such file: %r" % path)
else:
config = {s: {o: parser[s][o] for o in parser.options(s)} config = {s: {o: parser[s][o] for o in parser.options(s)}
for s in parser.sections()} for s in parser.sections()}
except Exception as e: except Exception as e:
raise RuntimeError( if not (ignore_if_missing and isinstance(e, (
"Failed to load %s: %s" % (config_source, e)) from e FileNotFoundError, NotADirectoryError, PermissionError))):
raise RuntimeError("Failed to load %s: %s" % (config_source, e)
) from e
config = Configuration.SOURCE_MISSING
configuration.update(config, config_source) configuration.update(config, config_source)
return configuration return configuration
class Configuration: _Self = TypeVar("_Self", bound="Configuration")
SOURCE_MISSING = {}
def __init__(self, schema):
class Configuration:
SOURCE_MISSING: ClassVar[types.CONFIG] = {}
_schema: types.CONFIG_SCHEMA
_values: types.MUTABLE_CONFIG
_configs: List[Tuple[types.CONFIG, str, bool]]
def __init__(self, schema: types.CONFIG_SCHEMA) -> None:
"""Initialize configuration. """Initialize configuration.
``schema`` a dict that describes the configuration format. ``schema`` a dict that describes the configuration format.
@ -322,7 +338,8 @@ class Configuration:
for section in self._schema} for section in self._schema}
self.update(default, "default config", privileged=True) self.update(default, "default config", privileged=True)
def update(self, config, source=None, privileged=False): def update(self, config: types.CONFIG, source: Optional[str] = None,
privileged: bool = False) -> None:
"""Update the configuration. """Update the configuration.
``config`` a dict of the format {SECTION: {OPTION: VALUE, ...}, ...}. ``config`` a dict of the format {SECTION: {OPTION: VALUE, ...}, ...}.
@ -336,8 +353,9 @@ class Configuration:
``privileged`` allows updating sections and options starting with "_". ``privileged`` allows updating sections and options starting with "_".
""" """
source = source or "unspecified config" if source is None:
new_values = {} source = "unspecified config"
new_values: types.MUTABLE_CONFIG = {}
for section in config: for section in config:
if (section not in self._schema or if (section not in self._schema or
section.startswith("_") and not privileged): section.startswith("_") and not privileged):
@ -376,40 +394,41 @@ class Configuration:
self._values[section] = self._values.get(section, {}) self._values[section] = self._values.get(section, {})
self._values[section].update(new_values[section]) self._values[section].update(new_values[section])
def get(self, section, option): def get(self, section: str, option: str) -> Any:
"""Get the value of ``option`` in ``section``.""" """Get the value of ``option`` in ``section``."""
with contextlib.suppress(KeyError): with contextlib.suppress(KeyError):
return self._values[section][option] return self._values[section][option]
raise KeyError(section, option) raise KeyError(section, option)
def get_raw(self, section, option): def get_raw(self, section: str, option: str) -> Any:
"""Get the raw value of ``option`` in ``section``.""" """Get the raw value of ``option`` in ``section``."""
for config, _, _ in reversed(self._configs): for config, _, _ in reversed(self._configs):
if option in config.get(section, {}): if option in config.get(section, {}):
return config[section][option] return config[section][option]
raise KeyError(section, option) raise KeyError(section, option)
def get_source(self, section, option): def get_source(self, section: str, option: str) -> str:
"""Get the source that provides ``option`` in ``section``.""" """Get the source that provides ``option`` in ``section``."""
for config, source, _ in reversed(self._configs): for config, source, _ in reversed(self._configs):
if option in config.get(section, {}): if option in config.get(section, {}):
return source return source
raise KeyError(section, option) raise KeyError(section, option)
def sections(self): def sections(self) -> List[str]:
"""List all sections.""" """List all sections."""
return self._values.keys() return list(self._values.keys())
def options(self, section): def options(self, section: str) -> List[str]:
"""List all options in ``section``""" """List all options in ``section``"""
return self._values[section].keys() return list(self._values[section].keys())
def sources(self): def sources(self) -> List[Tuple[str, bool]]:
"""List all config sources.""" """List all config sources."""
return [(source, config is self.SOURCE_MISSING) for return [(source, config is self.SOURCE_MISSING) for
config, source, _ in self._configs] config, source, _ in self._configs]
def copy(self, plugin_schema=None): def copy(self: _Self, plugin_schema: Optional[types.CONFIG_SCHEMA] = None
) -> _Self:
"""Create a copy of the configuration """Create a copy of the configuration
``plugin_schema`` is a optional dict that contains additional options ``plugin_schema`` is a optional dict that contains additional options
@ -419,20 +438,23 @@ class Configuration:
if plugin_schema is None: if plugin_schema is None:
schema = self._schema schema = self._schema
else: else:
schema = self._schema.copy() new_schema = dict(self._schema)
for section, options in plugin_schema.items(): for section, options in plugin_schema.items():
if (section not in schema or "type" not in schema[section] or if (section not in new_schema or
"internal" not in schema[section]["type"]): "type" not in new_schema[section] or
"internal" not in new_schema[section]["type"]):
raise ValueError("not a plugin section: %r" % section) raise ValueError("not a plugin section: %r" % section)
schema[section] = schema[section].copy() new_section = dict(new_schema[section])
schema[section]["type"] = schema[section]["type"].copy() new_type = dict(new_section["type"])
schema[section]["type"]["internal"] = [ new_type["internal"] = (self.get(section, "type"),)
self.get(section, "type")] new_section["type"] = new_type
for option, value in options.items(): for option, value in options.items():
if option in schema[section]: if option in new_section:
raise ValueError("option already exists in %r: %r" % ( raise ValueError("option already exists in %r: %r" %
section, option)) (section, option))
schema[section][option] = value new_section[option] = value
new_schema[section] = new_section
schema = new_schema
copy = type(self)(schema) copy = type(self)(schema)
for config, source, privileged in self._configs: for config, source, privileged in self._configs:
copy.update(config, source, privileged) copy.update(config, source, privileged)