diff --git a/radicale/tests/__init__.py b/radicale/tests/__init__.py
index f8900b1b..00e2b15d 100644
--- a/radicale/tests/__init__.py
+++ b/radicale/tests/__init__.py
@@ -22,13 +22,19 @@ Tests for Radicale.
import base64
import logging
+import shutil
import sys
+import tempfile
+import xml.etree.ElementTree as ET
from io import BytesIO
+from typing import Any, Dict, List, Optional, Tuple, Union
import defusedxml.ElementTree as DefusedET
import radicale
-from radicale import xmlutils
+from radicale import app, config, xmlutils
+
+RESPONSES = Dict[str, Union[int, Dict[str, Tuple[int, ET.Element]]]]
# Enable debug output
radicale.log.logger.setLevel(logging.DEBUG)
@@ -37,40 +43,70 @@ radicale.log.logger.setLevel(logging.DEBUG)
class BaseTest:
"""Base class for tests."""
- def request(self, method, path, data=None, login=None, **args):
+ colpath: str
+ configuration: config.Configuration
+ application: app.Application
+
+ def setup(self) -> None:
+ self.configuration = config.load()
+ self.colpath = tempfile.mkdtemp()
+ self.configuration.update({
+ "storage": {"filesystem_folder": self.colpath,
+ # Disable syncing to disk for better performance
+ "_filesystem_fsync": "False"},
+ # Set incorrect authentication delay to a short duration
+ "auth": {"delay": "0.001"}}, "test", privileged=True)
+ self.application = app.Application(self.configuration)
+
+ def teardown(self) -> None:
+ shutil.rmtree(self.colpath)
+
+ def request(self, method: str, path: str, data: Optional[str] = None,
+ **kwargs) -> Tuple[int, Dict[str, str], str]:
"""Send a request."""
- for key in args:
- args[key.upper()] = args[key]
+ login = kwargs.pop("login", None)
+ if login is not None and not isinstance(login, str):
+ raise TypeError("login argument must be %r, not %r" %
+ (str, type(login)))
+ environ: Dict[str, Any] = {k.upper(): v for k, v in kwargs.items()}
+ for k, v in environ.items():
+ if not isinstance(v, str):
+ raise TypeError("type of %r is %r, expected %r" %
+ (k, type(v), str))
+ encoding: str = self.configuration.get("encoding", "request")
if login:
- args["HTTP_AUTHORIZATION"] = "Basic " + base64.b64encode(
- login.encode()).decode()
- args["REQUEST_METHOD"] = method.upper()
- args["PATH_INFO"] = path
+ environ["HTTP_AUTHORIZATION"] = "Basic " + base64.b64encode(
+ login.encode(encoding)).decode()
+ environ["REQUEST_METHOD"] = method.upper()
+ environ["PATH_INFO"] = path
if data:
- data = data.encode()
- args["wsgi.input"] = BytesIO(data)
- args["CONTENT_LENGTH"] = str(len(data))
- args["wsgi.errors"] = sys.stderr
+ data_bytes = data.encode(encoding)
+ environ["wsgi.input"] = BytesIO(data_bytes)
+ environ["CONTENT_LENGTH"] = str(len(data_bytes))
+ environ["wsgi.errors"] = sys.stderr
status = headers = None
- def start_response(status_, headers_):
+ def start_response(status_: str, headers_: List[Tuple[str, str]]
+ ) -> None:
nonlocal status, headers
status = status_
headers = headers_
- answer = self.application(args, start_response)
+ answers = list(self.application(environ, start_response))
+ assert status is not None and headers is not None
return (int(status.split()[0]), dict(headers),
- answer[0].decode() if answer else None)
+ answers[0].decode() if answers else "")
@staticmethod
- def parse_responses(text):
+ def parse_responses(text: str) -> RESPONSES:
xml = DefusedET.fromstring(text)
assert xml.tag == xmlutils.make_clark("D:multistatus")
- path_responses = {}
+ path_responses: Dict[str, Union[
+ int, Dict[str, Tuple[int, ET.Element]]]] = {}
for response in xml.findall(xmlutils.make_clark("D:response")):
href = response.find(xmlutils.make_clark("D:href"))
assert href.text not in path_responses
- prop_respones = {}
+ prop_respones: Dict[str, Tuple[int, ET.Element]] = {}
for propstat in response.findall(
xmlutils.make_clark("D:propstat")):
status = propstat.find(xmlutils.make_clark("D:status"))
@@ -92,70 +128,90 @@ class BaseTest:
return path_responses
@staticmethod
- def _check_status(status, good_status, check=True):
- if check is True:
- assert status == good_status
- elif check is not False:
- assert status == check
+ def _check_status(status: int, good_status: int,
+ check: Union[bool, int] = True) -> bool:
+ if check is not False:
+ expected = good_status if check is True else check
+ assert status == expected, "%d != %d" % (status, expected)
return status == good_status
- def get(self, path, check=True, **args):
- status, _, answer = self.request("GET", path, **args)
+ def get(self, path: str, check: Union[bool, int] = True, **kwargs
+ ) -> Tuple[int, str]:
+ assert "data" not in kwargs
+ status, _, answer = self.request("GET", path, **kwargs)
self._check_status(status, 200, check)
return status, answer
- def post(self, path, data=None, check=True, **args):
- status, _, answer = self.request("POST", path, data, **args)
+ def post(self, path: str, data: str = None, check: Union[bool, int] = True,
+ **kwargs) -> Tuple[int, str]:
+ status, _, answer = self.request("POST", path, data, **kwargs)
self._check_status(status, 200, check)
return status, answer
- def put(self, path, data, check=True, **args):
- status, _, answer = self.request("PUT", path, data, **args)
+ def put(self, path: str, data: str, check: Union[bool, int] = True,
+ **kwargs) -> Tuple[int, str]:
+ status, _, answer = self.request("PUT", path, data, **kwargs)
self._check_status(status, 201, check)
return status, answer
- def propfind(self, path, data=None, check=True, **args):
- status, _, answer = self.request("PROPFIND", path, data, **args)
+ def propfind(self, path: str, data: Optional[str] = None,
+ check: Union[bool, int] = True, **kwargs
+ ) -> Tuple[int, RESPONSES]:
+ status, _, answer = self.request("PROPFIND", path, data, **kwargs)
if not self._check_status(status, 207, check):
- return status, None
+ return status, {}
+ assert answer is not None
responses = self.parse_responses(answer)
- if args.get("HTTP_DEPTH", "0") == "0":
+ if kwargs.get("HTTP_DEPTH", "0") == "0":
assert len(responses) == 1 and path in responses
return status, responses
- def proppatch(self, path, data=None, check=True, **args):
- status, _, answer = self.request("PROPPATCH", path, data, **args)
+ def proppatch(self, path: str, data: Optional[str] = None,
+ check: Union[bool, int] = True, **kwargs
+ ) -> Tuple[int, RESPONSES]:
+ status, _, answer = self.request("PROPPATCH", path, data, **kwargs)
if not self._check_status(status, 207, check):
- return status, None
+ return status, {}
+ assert answer is not None
responses = self.parse_responses(answer)
assert len(responses) == 1 and path in responses
return status, responses
- def report(self, path, data, check=True, **args):
- status, _, answer = self.request("REPORT", path, data, **args)
+ def report(self, path: str, data: str, check: Union[bool, int] = True,
+ **kwargs) -> Tuple[int, RESPONSES]:
+ status, _, answer = self.request("REPORT", path, data, **kwargs)
if not self._check_status(status, 207, check):
- return status, None
+ return status, {}
+ assert answer is not None
return status, self.parse_responses(answer)
- def delete(self, path, check=True, **args):
- status, _, answer = self.request("DELETE", path, **args)
+ def delete(self, path: str, check: Union[bool, int] = True, **kwargs
+ ) -> Tuple[int, RESPONSES]:
+ assert "data" not in kwargs
+ status, _, answer = self.request("DELETE", path, **kwargs)
if not self._check_status(status, 200, check):
- return status, None
+ return status, {}
+ assert answer is not None
responses = self.parse_responses(answer)
assert len(responses) == 1 and path in responses
return status, responses
- def mkcalendar(self, path, data=None, check=True, **args):
- status, _, answer = self.request("MKCALENDAR", path, data, **args)
+ def mkcalendar(self, path: str, data: Optional[str] = None,
+ check: Union[bool, int] = True, **kwargs
+ ) -> Tuple[int, str]:
+ status, _, answer = self.request("MKCALENDAR", path, data, **kwargs)
self._check_status(status, 201, check)
return status, answer
- def mkcol(self, path, data=None, check=True, **args):
- status, _, _ = self.request("MKCOL", path, data, **args)
+ def mkcol(self, path: str, data: Optional[str] = None,
+ check: Union[bool, int] = True, **kwargs) -> int:
+ status, _, _ = self.request("MKCOL", path, data, **kwargs)
self._check_status(status, 201, check)
return status
- def create_addressbook(self, path, check=True, **args):
+ def create_addressbook(self, path: str, check: Union[bool, int] = True,
+ **kwargs) -> int:
+ assert "data" not in kwargs
return self.mkcol(path, """\
@@ -167,4 +223,4 @@ class BaseTest:
-""", check=check, **args)
+""", check=check, **kwargs)
diff --git a/radicale/tests/custom/auth.py b/radicale/tests/custom/auth.py
index b0a11726..8e17adc1 100644
--- a/radicale/tests/custom/auth.py
+++ b/radicale/tests/custom/auth.py
@@ -28,7 +28,8 @@ from radicale import auth
class Auth(auth.BaseAuth):
- def login(self, login, password):
+
+ def login(self, login: str, password: str) -> str:
if login == "tmp":
return login
return ""
diff --git a/radicale/tests/custom/rights.py b/radicale/tests/custom/rights.py
index 5696ad53..8f145dd1 100644
--- a/radicale/tests/custom/rights.py
+++ b/radicale/tests/custom/rights.py
@@ -23,7 +23,8 @@ from radicale import pathutils, rights
class Rights(rights.BaseRights):
- def authorization(self, user, path):
+
+ def authorization(self, user: str, path: str) -> str:
sane_path = pathutils.strip_path(path)
if sane_path not in ("tmp", "other"):
return ""
diff --git a/radicale/tests/custom/storage_simple_sync.py b/radicale/tests/custom/storage_simple_sync.py
index 3046a829..7e023f02 100644
--- a/radicale/tests/custom/storage_simple_sync.py
+++ b/radicale/tests/custom/storage_simple_sync.py
@@ -27,8 +27,10 @@ from radicale.storage import BaseCollection, multifilesystem
class Collection(multifilesystem.Collection):
+
sync = BaseCollection.sync
class Storage(multifilesystem.Storage):
+
_collection_class = Collection
diff --git a/radicale/tests/custom/web.py b/radicale/tests/custom/web.py
index 784614f4..2626e051 100644
--- a/radicale/tests/custom/web.py
+++ b/radicale/tests/custom/web.py
@@ -21,13 +21,16 @@ Custom web plugin.
from http import client
-from radicale import httputils, web
+from radicale import httputils, types, web
class Web(web.BaseWeb):
- def get(self, environ, base_prefix, path, user):
+
+ def get(self, environ: types.WSGIEnviron, base_prefix: str, path: str,
+ user: str) -> types.WSGIResponse:
return client.OK, {"Content-Type": "text/plain"}, "custom"
- def post(self, environ, base_prefix, path, user):
+ def post(self, environ: types.WSGIEnviron, base_prefix: str, path: str,
+ user: str) -> types.WSGIResponse:
content = httputils.read_request_body(self.configuration, environ)
return client.OK, {"Content-Type": "text/plain"}, "echo:" + content
diff --git a/radicale/tests/helpers.py b/radicale/tests/helpers.py
index face22ba..3d2d13b4 100644
--- a/radicale/tests/helpers.py
+++ b/radicale/tests/helpers.py
@@ -26,19 +26,21 @@ This module offers helpers to use in tests.
import os
-EXAMPLES_FOLDER = os.path.join(os.path.dirname(__file__), "static")
+from radicale import config, types
+
+EXAMPLES_FOLDER: str = os.path.join(os.path.dirname(__file__), "static")
-def get_file_path(file_name):
+def get_file_path(file_name: str) -> str:
return os.path.join(EXAMPLES_FOLDER, file_name)
-def get_file_content(file_name):
- with open(get_file_path(file_name), encoding="utf-8") as fd:
- return fd.read()
+def get_file_content(file_name: str) -> str:
+ with open(get_file_path(file_name), encoding="utf-8") as f:
+ return f.read()
-def configuration_to_dict(configuration):
+def configuration_to_dict(configuration: config.Configuration) -> types.CONFIG:
"""Convert configuration to a dict with raw values."""
return {section: {option: configuration.get_raw(section, option)
for option in configuration.options(section)
diff --git a/radicale/tests/test_auth.py b/radicale/tests/test_auth.py
index 5379be72..d2f52daa 100644
--- a/radicale/tests/test_auth.py
+++ b/radicale/tests/test_auth.py
@@ -22,13 +22,12 @@ Radicale tests with simple requests and authentication.
"""
import os
-import shutil
import sys
-import tempfile
+from typing import Iterable, Tuple, Union
import pytest
-from radicale import Application, config, xmlutils
+from radicale import Application, xmlutils
from radicale.tests import BaseTest
@@ -38,21 +37,10 @@ class TestBaseAuthRequests(BaseTest):
We should setup auth for each type before creating the Application object.
"""
- def setup(self):
- self.configuration = config.load()
- self.colpath = tempfile.mkdtemp()
- self.configuration.update({
- "storage": {"filesystem_folder": self.colpath,
- # Disable syncing to disk for better performance
- "_filesystem_fsync": "False"},
- # Set incorrect authentication delay to a very low value
- "auth": {"delay": "0.002"}}, "test", privileged=True)
- def teardown(self):
- shutil.rmtree(self.colpath)
-
- def _test_htpasswd(self, htpasswd_encryption, htpasswd_content,
- test_matrix="ascii"):
+ def _test_htpasswd(self, htpasswd_encryption: str, htpasswd_content: str,
+ test_matrix: Union[str, Iterable[Tuple[str, str, bool]]]
+ = "ascii") -> None:
"""Test htpasswd authentication with user "tmp" and password "bepo" for
``test_matrix`` "ascii" or user "😀" and password "🔑" for
``test_matrix`` "unicode"."""
@@ -67,7 +55,7 @@ class TestBaseAuthRequests(BaseTest):
except MissingBackendError:
pytest.skip("bcrypt backend for passlib is not installed")
htpasswd_file_path = os.path.join(self.colpath, ".htpasswd")
- encoding = self.configuration.get("encoding", "stock")
+ encoding: str = self.configuration.get("encoding", "stock")
with open(htpasswd_file_path, "w", encoding=encoding) as f:
f.write(htpasswd_content)
self.configuration.update({
@@ -83,54 +71,56 @@ class TestBaseAuthRequests(BaseTest):
test_matrix = (("😀", "🔑", True), ("😀", "🌹", False),
("😁", "🔑", False), ("😀", "", False),
("", "🔑", False), ("", "", False))
+ elif isinstance(test_matrix, str):
+ raise ValueError("Unknown test matrix %r" % test_matrix)
for user, password, valid in test_matrix:
self.propfind("/", check=207 if valid else 401,
login="%s:%s" % (user, password))
- def test_htpasswd_plain(self):
+ def test_htpasswd_plain(self) -> None:
self._test_htpasswd("plain", "tmp:bepo")
- def test_htpasswd_plain_password_split(self):
+ def test_htpasswd_plain_password_split(self) -> None:
self._test_htpasswd("plain", "tmp:be:po", (
("tmp", "be:po", True), ("tmp", "bepo", False)))
- def test_htpasswd_plain_unicode(self):
+ def test_htpasswd_plain_unicode(self) -> None:
self._test_htpasswd("plain", "😀:🔑", "unicode")
- def test_htpasswd_md5(self):
+ def test_htpasswd_md5(self) -> None:
self._test_htpasswd("md5", "tmp:$apr1$BI7VKCZh$GKW4vq2hqDINMr8uv7lDY/")
def test_htpasswd_md5_unicode(self):
self._test_htpasswd(
"md5", "😀:$apr1$w4ev89r1$29xO8EvJmS2HEAadQ5qy11", "unicode")
- def test_htpasswd_bcrypt(self):
+ def test_htpasswd_bcrypt(self) -> None:
self._test_htpasswd("bcrypt", "tmp:$2y$05$oD7hbiQFQlvCM7zoalo/T.MssV3V"
"NTRI3w5KDnj8NTUKJNWfVpvRq")
- def test_htpasswd_bcrypt_unicode(self):
+ def test_htpasswd_bcrypt_unicode(self) -> None:
self._test_htpasswd("bcrypt", "😀:$2y$10$Oyz5aHV4MD9eQJbk6GPemOs4T6edK"
"6U9Sqlzr.W1mMVCS8wJUftnW", "unicode")
- def test_htpasswd_multi(self):
+ def test_htpasswd_multi(self) -> None:
self._test_htpasswd("plain", "ign:ign\ntmp:bepo")
@pytest.mark.skipif(sys.platform == "win32", reason="leading and trailing "
"whitespaces not allowed in file names")
- def test_htpasswd_whitespace_user(self):
+ def test_htpasswd_whitespace_user(self) -> None:
for user in (" tmp", "tmp ", " tmp "):
self._test_htpasswd("plain", "%s:bepo" % user, (
(user, "bepo", True), ("tmp", "bepo", False)))
- def test_htpasswd_whitespace_password(self):
+ def test_htpasswd_whitespace_password(self) -> None:
for password in (" bepo", "bepo ", " bepo "):
self._test_htpasswd("plain", "tmp:%s" % password, (
("tmp", password, True), ("tmp", "bepo", False)))
- def test_htpasswd_comment(self):
+ def test_htpasswd_comment(self) -> None:
self._test_htpasswd("plain", "#comment\n #comment\n \ntmp:bepo\n\n")
- def test_remote_user(self):
+ def test_remote_user(self) -> None:
self.configuration.update({"auth": {"type": "remote_user"}}, "test")
self.application = Application(self.configuration)
_, responses = self.propfind("/", """\
@@ -140,11 +130,15 @@ class TestBaseAuthRequests(BaseTest):
""", REMOTE_USER="test")
- status, prop = responses["/"]["D:current-user-principal"]
+ assert responses is not None
+ response = responses["/"]
+ assert not isinstance(response, int)
+ status, prop = response["D:current-user-principal"]
assert status == 200
- assert prop.find(xmlutils.make_clark("D:href")).text == "/test/"
+ href_element = prop.find(xmlutils.make_clark("D:href"))
+ assert href_element is not None and href_element.text == "/test/"
- def test_http_x_remote_user(self):
+ def test_http_x_remote_user(self) -> None:
self.configuration.update(
{"auth": {"type": "http_x_remote_user"}}, "test")
self.application = Application(self.configuration)
@@ -155,11 +149,15 @@ class TestBaseAuthRequests(BaseTest):
""", HTTP_X_REMOTE_USER="test")
- status, prop = responses["/"]["D:current-user-principal"]
+ assert responses is not None
+ response = responses["/"]
+ assert not isinstance(response, int)
+ status, prop = response["D:current-user-principal"]
assert status == 200
- assert prop.find(xmlutils.make_clark("D:href")).text == "/test/"
+ href_element = prop.find(xmlutils.make_clark("D:href"))
+ assert href_element is not None and href_element.text == "/test/"
- def test_custom(self):
+ def test_custom(self) -> None:
"""Custom authentication."""
self.configuration.update(
{"auth": {"type": "radicale.tests.custom.auth"}}, "test")
diff --git a/radicale/tests/test_base.py b/radicale/tests/test_base.py
index 5952615f..4e8cd113 100644
--- a/radicale/tests/test_base.py
+++ b/radicale/tests/test_base.py
@@ -24,37 +24,39 @@ import os
import posixpath
import shutil
import sys
-import tempfile
-from typing import Any, ClassVar
+from typing import (Any, Callable, ClassVar, Iterable, List, Optional, Tuple,
+ Union)
import defusedxml.ElementTree as DefusedET
import pytest
import radicale.tests.custom.storage_simple_sync
from radicale import Application, config, storage, xmlutils
-from radicale.tests import BaseTest
+from radicale.tests import RESPONSES, BaseTest
from radicale.tests.helpers import get_file_content
+StorageType = Union[str, Callable[[config.Configuration], storage.BaseStorage]]
-class BaseRequestsMixIn:
+
+class BaseRequestsMixIn(BaseTest):
"""Tests with simple requests."""
# Allow skipping sync-token tests, when not fully supported by the backend
- full_sync_token_support = True
+ full_sync_token_support: ClassVar[bool] = True
- def test_root(self):
+ def test_root(self) -> None:
"""GET request at "/"."""
_, answer = self.get("/", check=302)
assert answer == "Redirected to .web"
- def test_script_name(self):
+ def test_script_name(self) -> None:
"""GET request at "/" with SCRIPT_NAME."""
_, answer = self.get("/", check=302, SCRIPT_NAME="/radicale")
assert answer == "Redirected to .web"
_, answer = self.get("", check=302, SCRIPT_NAME="/radicale")
assert answer == "Redirected to radicale/.web"
- def test_add_event(self):
+ def test_add_event(self) -> None:
"""Add an event."""
self.mkcalendar("/calendar.ics/")
event = get_file_content("event1.ics")
@@ -68,7 +70,7 @@ class BaseRequestsMixIn:
assert "Event" in answer
assert "UID:event" in answer
- def test_add_event_without_uid(self):
+ def test_add_event_without_uid(self) -> None:
"""Add an event without UID."""
self.mkcalendar("/calendar.ics/")
event = get_file_content("event1.ics").replace("UID:event1\n", "")
@@ -76,7 +78,7 @@ class BaseRequestsMixIn:
path = "/calendar.ics/event.ics"
self.put(path, event, check=400)
- def test_add_event_duplicate_uid(self):
+ def test_add_event_duplicate_uid(self) -> None:
"""Add an event with an existing UID."""
self.mkcalendar("/calendar.ics/")
event = get_file_content("event1.ics")
@@ -88,7 +90,7 @@ class BaseRequestsMixIn:
assert xml.tag == xmlutils.make_clark("D:error")
assert xml.find(xmlutils.make_clark("C:no-uid-conflict")) is not None
- def test_add_todo(self):
+ def test_add_todo(self) -> None:
"""Add a todo."""
self.mkcalendar("/calendar.ics/")
todo = get_file_content("todo1.ics")
@@ -102,7 +104,7 @@ class BaseRequestsMixIn:
assert "Todo" in answer
assert "UID:todo" in answer
- def test_add_contact(self):
+ def test_add_contact(self) -> None:
"""Add a contact."""
self.create_addressbook("/contacts.vcf/")
contact = get_file_content("contact1.vcf")
@@ -117,7 +119,7 @@ class BaseRequestsMixIn:
_, answer = self.get(path)
assert "UID:contact1" in answer
- def test_add_contact_without_uid(self):
+ def test_add_contact_without_uid(self) -> None:
"""Add a contact without UID."""
self.create_addressbook("/contacts.vcf/")
contact = get_file_content("contact1.vcf").replace("UID:contact1\n",
@@ -126,7 +128,7 @@ class BaseRequestsMixIn:
path = "/contacts.vcf/contact.vcf"
self.put(path, contact, check=400)
- def test_update_event(self):
+ def test_update_event(self) -> None:
"""Update an event."""
self.mkcalendar("/calendar.ics/")
event = get_file_content("event1.ics")
@@ -139,7 +141,7 @@ class BaseRequestsMixIn:
_, answer = self.get(path)
assert "DTSTAMP:20130902T150159Z" in answer
- def test_update_event_uid_event(self):
+ def test_update_event_uid_event(self) -> None:
"""Update an event with a different UID."""
self.mkcalendar("/calendar.ics/")
event1 = get_file_content("event1.ics")
@@ -152,7 +154,7 @@ class BaseRequestsMixIn:
assert xml.tag == xmlutils.make_clark("D:error")
assert xml.find(xmlutils.make_clark("C:no-uid-conflict")) is not None
- def test_put_whole_calendar(self):
+ def test_put_whole_calendar(self) -> None:
"""Create and overwrite a whole calendar."""
self.put("/calendar.ics/", "BEGIN:VCALENDAR\r\nEND:VCALENDAR")
event1 = get_file_content("event1.ics")
@@ -165,7 +167,7 @@ class BaseRequestsMixIn:
assert "\r\nUID:event\r\n" in answer and "\r\nUID:todo\r\n" in answer
assert "\r\nUID:event1\r\n" not in answer
- def test_put_whole_calendar_without_uids(self):
+ def test_put_whole_calendar_without_uids(self) -> None:
"""Create a whole calendar without UID."""
event = get_file_content("event_multiple.ics")
event = event.replace("UID:event\n", "").replace("UID:todo\n", "")
@@ -182,15 +184,16 @@ class BaseRequestsMixIn:
for uid2 in uids[i + 1:]:
assert uid1 != uid2
- def test_put_whole_addressbook(self):
+ def test_put_whole_addressbook(self) -> None:
"""Create and overwrite a whole addressbook."""
contacts = get_file_content("contact_multiple.vcf")
self.put("/contacts.vcf/", contacts)
_, answer = self.get("/contacts.vcf/")
- assert ("\r\nUID:contact1\r\n" in answer and
- "\r\nUID:contact2\r\n" in answer)
+ assert answer is not None
+ assert "\r\nUID:contact1\r\n" in answer
+ assert "\r\nUID:contact2\r\n" in answer
- def test_put_whole_addressbook_without_uids(self):
+ def test_put_whole_addressbook_without_uids(self) -> None:
"""Create a whole addressbook without UID."""
contacts = get_file_content("contact_multiple.vcf")
contacts = contacts.replace("UID:contact1\n", "").replace(
@@ -208,7 +211,7 @@ class BaseRequestsMixIn:
for uid2 in uids[i + 1:]:
assert uid1 != uid2
- def test_verify(self):
+ def test_verify(self) -> None:
"""Verify the storage."""
contacts = get_file_content("contact_multiple.vcf")
self.put("/contacts.vcf/", contacts)
@@ -217,7 +220,7 @@ class BaseRequestsMixIn:
s = storage.load(self.configuration)
assert s.verify()
- def test_delete(self):
+ def test_delete(self) -> None:
"""Delete an event."""
self.mkcalendar("/calendar.ics/")
event = get_file_content("event1.ics")
@@ -228,14 +231,14 @@ class BaseRequestsMixIn:
_, answer = self.get("/calendar.ics/")
assert "VEVENT" not in answer
- def test_mkcalendar(self):
+ def test_mkcalendar(self) -> None:
"""Make a calendar."""
self.mkcalendar("/calendar.ics/")
_, answer = self.get("/calendar.ics/")
assert "BEGIN:VCALENDAR" in answer
assert "END:VCALENDAR" in answer
- def test_mkcalendar_overwrite(self):
+ def test_mkcalendar_overwrite(self) -> None:
"""Try to overwrite an existing calendar."""
self.mkcalendar("/calendar.ics/")
status, answer = self.mkcalendar("/calendar.ics/", check=False)
@@ -245,41 +248,43 @@ class BaseRequestsMixIn:
assert xml.find(xmlutils.make_clark(
"D:resource-must-be-null")) is not None
- def test_mkcalendar_intermediate(self):
+ def test_mkcalendar_intermediate(self) -> None:
"""Try make a calendar in a unmapped collection."""
status, _ = self.mkcalendar("/unmapped/calendar.ics/", check=False)
assert status == 409
- def test_mkcol(self):
+ def test_mkcol(self) -> None:
"""Make a collection."""
self.mkcol("/user/")
- def test_mkcol_overwrite(self):
+ def test_mkcol_overwrite(self) -> None:
"""Try to overwrite an existing collection."""
self.mkcol("/user/")
status = self.mkcol("/user/", check=False)
assert status == 405
- def test_mkcol_intermediate(self):
+ def test_mkcol_intermediate(self) -> None:
"""Try make a collection in a unmapped collection."""
status = self.mkcol("/unmapped/user/", check=False)
assert status == 409
- def test_mkcol_make_calendar(self):
+ def test_mkcol_make_calendar(self) -> None:
"""Make a calendar with additional props."""
mkcol_make_calendar = get_file_content("mkcol_make_calendar.xml")
self.mkcol("/calendar.ics/", mkcol_make_calendar)
_, answer = self.get("/calendar.ics/")
+ assert answer is not None
assert "BEGIN:VCALENDAR" in answer
assert "END:VCALENDAR" in answer
# Read additional properties
propfind = get_file_content("propfind_calendar_color.xml")
_, responses = self.propfind("/calendar.ics/", propfind)
- assert len(responses["/calendar.ics/"]) == 1
- status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"]
+ response = responses["/calendar.ics/"]
+ assert not isinstance(response, int) and len(response) == 1
+ status, prop = response["ICAL:calendar-color"]
assert status == 200 and prop.text == "#BADA55"
- def test_move(self):
+ def test_move(self) -> None:
"""Move a item."""
self.mkcalendar("/calendar.ics/")
event = get_file_content("event1.ics")
@@ -292,7 +297,7 @@ class BaseRequestsMixIn:
self.get(path1, check=404)
self.get(path2)
- def test_move_between_colections(self):
+ def test_move_between_colections(self) -> None:
"""Move a item."""
self.mkcalendar("/calendar1.ics/")
self.mkcalendar("/calendar2.ics/")
@@ -306,7 +311,7 @@ class BaseRequestsMixIn:
self.get(path1, check=404)
self.get(path2)
- def test_move_between_colections_duplicate_uid(self):
+ def test_move_between_colections_duplicate_uid(self) -> None:
"""Move a item to a collection which already contains the UID."""
self.mkcalendar("/calendar1.ics/")
self.mkcalendar("/calendar2.ics/")
@@ -322,7 +327,7 @@ class BaseRequestsMixIn:
assert xml.tag == xmlutils.make_clark("D:error")
assert xml.find(xmlutils.make_clark("C:no-uid-conflict")) is not None
- def test_move_between_colections_overwrite(self):
+ def test_move_between_colections_overwrite(self) -> None:
"""Move a item to a collection which already contains the item."""
self.mkcalendar("/calendar1.ics/")
self.mkcalendar("/calendar2.ics/")
@@ -338,7 +343,7 @@ class BaseRequestsMixIn:
HTTP_HOST="", HTTP_OVERWRITE="T")
assert status == 204
- def test_move_between_colections_overwrite_uid_conflict(self):
+ def test_move_between_colections_overwrite_uid_conflict(self) -> None:
"""Move a item to a collection which already contains the item with
a different UID."""
self.mkcalendar("/calendar1.ics/")
@@ -356,16 +361,16 @@ class BaseRequestsMixIn:
assert xml.tag == xmlutils.make_clark("D:error")
assert xml.find(xmlutils.make_clark("C:no-uid-conflict")) is not None
- def test_head(self):
+ def test_head(self) -> None:
status, _, _ = self.request("HEAD", "/")
assert status == 302
- def test_options(self):
+ def test_options(self) -> None:
status, headers, _ = self.request("OPTIONS", "/")
assert status == 200
assert "DAV" in headers
- def test_delete_collection(self):
+ def test_delete_collection(self) -> None:
"""Delete a collection."""
self.mkcalendar("/calendar.ics/")
event = get_file_content("event1.ics")
@@ -374,7 +379,7 @@ class BaseRequestsMixIn:
assert responses["/calendar.ics/"] == 200
self.get("/calendar.ics/", check=404)
- def test_delete_root_collection(self):
+ def test_delete_root_collection(self) -> None:
"""Delete the root collection."""
self.mkcalendar("/calendar.ics/")
event = get_file_content("event1.ics")
@@ -385,7 +390,7 @@ class BaseRequestsMixIn:
self.get("/calendar.ics/", check=404)
self.get("/event1.ics", 404)
- def test_propfind(self):
+ def test_propfind(self) -> None:
calendar_path = "/calendar.ics/"
self.mkcalendar("/calendar.ics/")
event = get_file_content("event1.ics")
@@ -398,139 +403,163 @@ class BaseRequestsMixIn:
assert len(responses) == 2
assert calendar_path in responses and event_path in responses
- def test_propfind_propname(self):
+ def test_propfind_propname(self) -> None:
self.mkcalendar("/calendar.ics/")
event = get_file_content("event1.ics")
self.put("/calendar.ics/event.ics", event)
propfind = get_file_content("propname.xml")
_, responses = self.propfind("/calendar.ics/", propfind)
- status, prop = responses["/calendar.ics/"]["D:sync-token"]
+ response = responses["/calendar.ics/"]
+ assert not isinstance(response, int)
+ status, prop = response["D:sync-token"]
assert status == 200 and not prop.text
_, responses = self.propfind("/calendar.ics/event.ics", propfind)
- status, prop = responses["/calendar.ics/event.ics"]["D:getetag"]
+ response = responses["/calendar.ics/event.ics"]
+ assert not isinstance(response, int)
+ status, prop = response["D:getetag"]
assert status == 200 and not prop.text
- def test_propfind_allprop(self):
+ def test_propfind_allprop(self) -> None:
self.mkcalendar("/calendar.ics/")
event = get_file_content("event1.ics")
self.put("/calendar.ics/event.ics", event)
propfind = get_file_content("allprop.xml")
_, responses = self.propfind("/calendar.ics/", propfind)
- status, prop = responses["/calendar.ics/"]["D:sync-token"]
+ response = responses["/calendar.ics/"]
+ assert not isinstance(response, int)
+ status, prop = response["D:sync-token"]
assert status == 200 and prop.text
_, responses = self.propfind("/calendar.ics/event.ics", propfind)
- status, prop = responses["/calendar.ics/event.ics"]["D:getetag"]
+ response = responses["/calendar.ics/event.ics"]
+ assert not isinstance(response, int)
+ status, prop = response["D:getetag"]
assert status == 200 and prop.text
- def test_propfind_nonexistent(self):
+ def test_propfind_nonexistent(self) -> None:
"""Read a property that does not exist."""
self.mkcalendar("/calendar.ics/")
propfind = get_file_content("propfind_calendar_color.xml")
_, responses = self.propfind("/calendar.ics/", propfind)
- assert len(responses["/calendar.ics/"]) == 1
- status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"]
+ response = responses["/calendar.ics/"]
+ assert not isinstance(response, int) and len(response) == 1
+ status, prop = response["ICAL:calendar-color"]
assert status == 404 and not prop.text
- def test_proppatch(self):
+ def test_proppatch(self) -> None:
"""Set/Remove a property and read it back."""
self.mkcalendar("/calendar.ics/")
proppatch = get_file_content("proppatch_set_calendar_color.xml")
_, responses = self.proppatch("/calendar.ics/", proppatch)
- assert len(responses["/calendar.ics/"]) == 1
- status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"]
+ response = responses["/calendar.ics/"]
+ assert not isinstance(response, int) and len(response) == 1
+ status, prop = response["ICAL:calendar-color"]
assert status == 200 and not prop.text
# Read property back
propfind = get_file_content("propfind_calendar_color.xml")
_, responses = self.propfind("/calendar.ics/", propfind)
- assert len(responses["/calendar.ics/"]) == 1
- status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"]
+ response = responses["/calendar.ics/"]
+ assert not isinstance(response, int) and len(response) == 1
+ status, prop = response["ICAL:calendar-color"]
assert status == 200 and prop.text == "#BADA55"
propfind = get_file_content("allprop.xml")
_, responses = self.propfind("/calendar.ics/", propfind)
- status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"]
+ response = responses["/calendar.ics/"]
+ assert not isinstance(response, int)
+ status, prop = response["ICAL:calendar-color"]
assert status == 200 and prop.text == "#BADA55"
# Remove property
proppatch = get_file_content("proppatch_remove_calendar_color.xml")
_, responses = self.proppatch("/calendar.ics/", proppatch)
- assert len(responses["/calendar.ics/"]) == 1
- status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"]
+ response = responses["/calendar.ics/"]
+ assert not isinstance(response, int) and len(response) == 1
+ status, prop = response["ICAL:calendar-color"]
assert status == 200 and not prop.text
# Read property back
propfind = get_file_content("propfind_calendar_color.xml")
_, responses = self.propfind("/calendar.ics/", propfind)
- assert len(responses["/calendar.ics/"]) == 1
- status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"]
+ response = responses["/calendar.ics/"]
+ assert not isinstance(response, int) and len(response) == 1
+ status, prop = response["ICAL:calendar-color"]
assert status == 404
- def test_proppatch_multiple1(self):
+ def test_proppatch_multiple1(self) -> None:
"""Set/Remove a multiple properties and read them back."""
self.mkcalendar("/calendar.ics/")
propfind = get_file_content("propfind_multiple.xml")
proppatch = get_file_content("proppatch_set_multiple1.xml")
_, responses = self.proppatch("/calendar.ics/", proppatch)
- assert len(responses["/calendar.ics/"]) == 2
- status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"]
+ response = responses["/calendar.ics/"]
+ assert not isinstance(response, int) and len(response) == 2
+ status, prop = response["ICAL:calendar-color"]
assert status == 200 and not prop.text
- status, prop = responses["/calendar.ics/"]["C:calendar-description"]
+ status, prop = response["C:calendar-description"]
assert status == 200 and not prop.text
# Read properties back
_, responses = self.propfind("/calendar.ics/", propfind)
- assert len(responses["/calendar.ics/"]) == 2
- status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"]
+ response = responses["/calendar.ics/"]
+ assert not isinstance(response, int) and len(response) == 2
+ status, prop = response["ICAL:calendar-color"]
assert status == 200 and prop.text == "#BADA55"
- status, prop = responses["/calendar.ics/"]["C:calendar-description"]
+ status, prop = response["C:calendar-description"]
assert status == 200 and prop.text == "test"
# Remove properties
proppatch = get_file_content("proppatch_remove_multiple1.xml")
_, responses = self.proppatch("/calendar.ics/", proppatch)
- assert len(responses["/calendar.ics/"]) == 2
- status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"]
+ response = responses["/calendar.ics/"]
+ assert not isinstance(response, int) and len(response) == 2
+ status, prop = response["ICAL:calendar-color"]
assert status == 200 and not prop.text
- status, prop = responses["/calendar.ics/"]["C:calendar-description"]
+ status, prop = response["C:calendar-description"]
assert status == 200 and not prop.text
# Read properties back
_, responses = self.propfind("/calendar.ics/", propfind)
- assert len(responses["/calendar.ics/"]) == 2
- status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"]
+ response = responses["/calendar.ics/"]
+ assert not isinstance(response, int) and len(response) == 2
+ status, prop = response["ICAL:calendar-color"]
assert status == 404
- status, prop = responses["/calendar.ics/"]["C:calendar-description"]
+ status, prop = response["C:calendar-description"]
assert status == 404
- def test_proppatch_multiple2(self):
+ def test_proppatch_multiple2(self) -> None:
"""Set/Remove a multiple properties and read them back."""
self.mkcalendar("/calendar.ics/")
propfind = get_file_content("propfind_multiple.xml")
proppatch = get_file_content("proppatch_set_multiple2.xml")
_, responses = self.proppatch("/calendar.ics/", proppatch)
- assert len(responses["/calendar.ics/"]) == 2
- status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"]
+ response = responses["/calendar.ics/"]
+ assert not isinstance(response, int) and len(response) == 2
+ status, prop = response["ICAL:calendar-color"]
assert status == 200 and not prop.text
- status, prop = responses["/calendar.ics/"]["C:calendar-description"]
+ status, prop = response["C:calendar-description"]
assert status == 200 and not prop.text
# Read properties back
_, responses = self.propfind("/calendar.ics/", propfind)
- assert len(responses["/calendar.ics/"]) == 2
- status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"]
+ response = responses["/calendar.ics/"]
+ assert not isinstance(response, int) and len(response) == 2
+ assert len(response) == 2
+ status, prop = response["ICAL:calendar-color"]
assert status == 200 and prop.text == "#BADA55"
- status, prop = responses["/calendar.ics/"]["C:calendar-description"]
+ status, prop = response["C:calendar-description"]
assert status == 200 and prop.text == "test"
# Remove properties
proppatch = get_file_content("proppatch_remove_multiple2.xml")
_, responses = self.proppatch("/calendar.ics/", proppatch)
- assert len(responses["/calendar.ics/"]) == 2
- status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"]
+ response = responses["/calendar.ics/"]
+ assert not isinstance(response, int) and len(response) == 2
+ status, prop = response["ICAL:calendar-color"]
assert status == 200 and not prop.text
- status, prop = responses["/calendar.ics/"]["C:calendar-description"]
+ status, prop = response["C:calendar-description"]
assert status == 200 and not prop.text
# Read properties back
_, responses = self.propfind("/calendar.ics/", propfind)
- assert len(responses["/calendar.ics/"]) == 2
- status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"]
+ response = responses["/calendar.ics/"]
+ assert not isinstance(response, int) and len(response) == 2
+ status, prop = response["ICAL:calendar-color"]
assert status == 404
- status, prop = responses["/calendar.ics/"]["C:calendar-description"]
+ status, prop = response["C:calendar-description"]
assert status == 404
- def test_proppatch_set_and_remove(self):
+ def test_proppatch_set_and_remove(self) -> None:
"""Set and remove multiple properties in single request."""
self.mkcalendar("/calendar.ics/")
propfind = get_file_content("propfind_multiple.xml")
@@ -540,20 +569,22 @@ class BaseRequestsMixIn:
# Remove and set properties in single request
proppatch = get_file_content("proppatch_set_and_remove.xml")
_, responses = self.proppatch("/calendar.ics/", proppatch)
- assert len(responses["/calendar.ics/"]) == 2
- status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"]
+ response = responses["/calendar.ics/"]
+ assert not isinstance(response, int) and len(response) == 2
+ status, prop = response["ICAL:calendar-color"]
assert status == 200 and not prop.text
- status, prop = responses["/calendar.ics/"]["C:calendar-description"]
+ status, prop = response["C:calendar-description"]
assert status == 200 and not prop.text
# Read properties back
_, responses = self.propfind("/calendar.ics/", propfind)
- assert len(responses["/calendar.ics/"]) == 2
- status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"]
+ response = responses["/calendar.ics/"]
+ assert not isinstance(response, int) and len(response) == 2
+ status, prop = response["ICAL:calendar-color"]
assert status == 404
- status, prop = responses["/calendar.ics/"]["C:calendar-description"]
+ status, prop = response["C:calendar-description"]
assert status == 200 and prop.text == "test2"
- def test_put_whole_calendar_multiple_events_with_same_uid(self):
+ def test_put_whole_calendar_multiple_events_with_same_uid(self) -> None:
"""Add two events with the same UID."""
self.put("/calendar.ics/", get_file_content("event2.ics"))
_, responses = self.report("/calendar.ics/", """\
@@ -564,13 +595,18 @@ class BaseRequestsMixIn:
""")
assert len(responses) == 1
- status, prop = responses["/calendar.ics/event2.ics"]["D:getetag"]
+ response = responses["/calendar.ics/event2.ics"]
+ assert not isinstance(response, int)
+ status, prop = response["D:getetag"]
assert status == 200 and prop.text
_, answer = self.get("/calendar.ics/")
assert answer.count("BEGIN:VEVENT") == 2
- def _test_filter(self, filters, kind="event", test=None, items=(1,)):
+ def _test_filter(self, filters: Iterable[str], kind: str = "event",
+ test: Optional[str] = None, items: Iterable[int] = (1,)
+ ) -> List[str]:
filter_template = "%s"
+ create_collection_fn: Callable[[str], Any]
if kind in ("event", "journal", "todo"):
create_collection_fn = self.mkcalendar
path = "/calendar.ics/"
@@ -603,18 +639,19 @@ class BaseRequestsMixIn:
{2}
""".format(namespace, report, filters_text))
+ assert responses is not None
paths = []
for path, props in responses.items():
- assert len(props) == 1
+ assert not isinstance(props, int) and len(props) == 1
status, prop = props["D:getetag"]
assert status == 200 and prop.text
paths.append(path)
return paths
- def test_addressbook_empty_filter(self):
+ def test_addressbook_empty_filter(self) -> None:
self._test_filter([""], kind="contact")
- def test_addressbook_prop_filter(self):
+ def test_addressbook_prop_filter(self) -> None:
assert "/contacts.vcf/contact1.vcf" in self._test_filter(["""\
tes
"""], "contact")
- def test_addressbook_prop_filter_any(self):
+ def test_addressbook_prop_filter_any(self) -> None:
assert "/contacts.vcf/contact1.vcf" in self._test_filter(["""\
test
@@ -688,7 +725,7 @@ class BaseRequestsMixIn:
test
"""], "contact")
- def test_addressbook_prop_filter_all(self):
+ def test_addressbook_prop_filter_all(self) -> None:
assert "/contacts.vcf/contact1.vcf" in self._test_filter(["""\
tes
@@ -704,15 +741,15 @@ class BaseRequestsMixIn:
test
"""], "contact", test="allof")
- def test_calendar_empty_filter(self):
+ def test_calendar_empty_filter(self) -> None:
self._test_filter([""])
- def test_calendar_tag_filter(self):
+ def test_calendar_tag_filter(self) -> None:
"""Report request with tag-based filter on calendar."""
assert "/calendar.ics/event1.ics" in self._test_filter(["""\
"""])
- def test_item_tag_filter(self):
+ def test_item_tag_filter(self) -> None:
"""Report request with tag-based filter on an item."""
assert "/calendar.ics/event1.ics" in self._test_filter(["""\
@@ -723,7 +760,7 @@ class BaseRequestsMixIn:
"""])
- def test_item_not_tag_filter(self):
+ def test_item_not_tag_filter(self) -> None:
"""Report request with tag-based is-not filter on an item."""
assert "/calendar.ics/event1.ics" not in self._test_filter(["""\
@@ -738,7 +775,7 @@ class BaseRequestsMixIn:
"""])
- def test_item_prop_filter(self):
+ def test_item_prop_filter(self) -> None:
"""Report request with prop-based filter on an item."""
assert "/calendar.ics/event1.ics" in self._test_filter(["""\
@@ -753,7 +790,7 @@ class BaseRequestsMixIn:
"""])
- def test_item_not_prop_filter(self):
+ def test_item_not_prop_filter(self) -> None:
"""Report request with prop-based is-not filter on an item."""
assert "/calendar.ics/event1.ics" not in self._test_filter(["""\
@@ -772,7 +809,7 @@ class BaseRequestsMixIn:
"""])
- def test_mutiple_filters(self):
+ def test_mutiple_filters(self) -> None:
"""Report request with multiple filters on an item."""
assert "/calendar.ics/event1.ics" not in self._test_filter(["""\
@@ -812,7 +849,7 @@ class BaseRequestsMixIn:
"""])
- def test_text_match_filter(self):
+ def test_text_match_filter(self) -> None:
"""Report request with text-match filter on calendar."""
assert "/calendar.ics/event1.ics" in self._test_filter(["""\
@@ -847,7 +884,7 @@ class BaseRequestsMixIn:
"""])
- def test_param_filter(self):
+ def test_param_filter(self) -> None:
"""Report request with param-filter on calendar."""
assert "/calendar.ics/event1.ics" in self._test_filter(["""\
@@ -892,7 +929,7 @@ class BaseRequestsMixIn:
"""])
- def test_time_range_filter_events(self):
+ def test_time_range_filter_events(self) -> None:
"""Report request with time-range filter on events."""
answer = self._test_filter(["""\
@@ -1019,7 +1056,7 @@ class BaseRequestsMixIn:
"""], items=(9,))
assert "/calendar.ics/event9.ics" not in answer
- def test_time_range_filter_events_rrule(self):
+ def test_time_range_filter_events_rrule(self) -> None:
"""Report request with time-range filter on events with rrules."""
answer = self._test_filter(["""\
@@ -1054,7 +1091,7 @@ class BaseRequestsMixIn:
assert "/calendar.ics/event1.ics" not in answer
assert "/calendar.ics/event2.ics" not in answer
- def test_time_range_filter_todos(self):
+ def test_time_range_filter_todos(self) -> None:
"""Report request with time-range filter on todos."""
answer = self._test_filter(["""\
@@ -1113,7 +1150,7 @@ class BaseRequestsMixIn:
"""], "todo", items=range(1, 9))
assert "/calendar.ics/todo7.ics" in answer
- def test_time_range_filter_todos_rrule(self):
+ def test_time_range_filter_todos_rrule(self) -> None:
"""Report request with time-range filter on todos with rrules."""
answer = self._test_filter(["""\
@@ -1157,7 +1194,7 @@ class BaseRequestsMixIn:
"""], "todo", items=(9,))
assert "/calendar.ics/todo9.ics" not in answer
- def test_time_range_filter_journals(self):
+ def test_time_range_filter_journals(self) -> None:
"""Report request with time-range filter on journals."""
answer = self._test_filter(["""\
@@ -1205,7 +1242,7 @@ class BaseRequestsMixIn:
assert "/calendar.ics/journal2.ics" in answer
assert "/calendar.ics/journal3.ics" in answer
- def test_time_range_filter_journals_rrule(self):
+ def test_time_range_filter_journals_rrule(self) -> None:
"""Report request with time-range filter on journals with rrules."""
answer = self._test_filter(["""\
@@ -1232,7 +1269,7 @@ class BaseRequestsMixIn:
assert "/calendar.ics/journal1.ics" not in answer
assert "/calendar.ics/journal2.ics" not in answer
- def test_report_item(self):
+ def test_report_item(self) -> None:
"""Test report request on an item"""
calendar_path = "/calendar.ics/"
self.mkcalendar(calendar_path)
@@ -1247,10 +1284,14 @@ class BaseRequestsMixIn:
""")
assert len(responses) == 1
- status, prop = responses[event_path]["D:getetag"]
+ response = responses[event_path]
+ assert not isinstance(response, int)
+ status, prop = response["D:getetag"]
assert status == 200 and prop.text
- def _report_sync_token(self, calendar_path, sync_token=None):
+ def _report_sync_token(
+ self, calendar_path: str, sync_token: Optional[str] = None
+ ) -> Tuple[str, RESPONSES]:
sync_token_xml = (
"" % sync_token
if sync_token else "")
@@ -1267,7 +1308,7 @@ class BaseRequestsMixIn:
assert xml.tag == xmlutils.make_clark("D:error")
assert sync_token and xml.find(
xmlutils.make_clark("D:valid-sync-token")) is not None
- return None, None
+ return "", {}
assert status == 207
assert xml.tag == xmlutils.make_clark("D:multistatus")
sync_token = xml.find(xmlutils.make_clark("D:sync-token")).text.strip()
@@ -1281,7 +1322,7 @@ class BaseRequestsMixIn:
assert response in (200, 404)
return sync_token, responses
- def test_report_sync_collection_no_change(self):
+ def test_report_sync_collection_no_change(self) -> None:
"""Test sync-collection report without modifying the collection"""
calendar_path = "/calendar.ics/"
self.mkcalendar(calendar_path)
@@ -1296,7 +1337,7 @@ class BaseRequestsMixIn:
return
assert sync_token == new_sync_token and len(responses) == 0
- def test_report_sync_collection_add(self):
+ def test_report_sync_collection_add(self) -> None:
"""Test sync-collection report with an added item"""
calendar_path = "/calendar.ics/"
self.mkcalendar(calendar_path)
@@ -1311,7 +1352,7 @@ class BaseRequestsMixIn:
return
assert len(responses) == 1 and responses[event_path] == 200
- def test_report_sync_collection_delete(self):
+ def test_report_sync_collection_delete(self) -> None:
"""Test sync-collection report with a deleted item"""
calendar_path = "/calendar.ics/"
self.mkcalendar(calendar_path)
@@ -1327,7 +1368,7 @@ class BaseRequestsMixIn:
return
assert len(responses) == 1 and responses[event_path] == 404
- def test_report_sync_collection_create_delete(self):
+ def test_report_sync_collection_create_delete(self) -> None:
"""Test sync-collection report with a created and deleted item"""
calendar_path = "/calendar.ics/"
self.mkcalendar(calendar_path)
@@ -1343,7 +1384,7 @@ class BaseRequestsMixIn:
return
assert len(responses) == 1 and responses[event_path] == 404
- def test_report_sync_collection_modify_undo(self):
+ def test_report_sync_collection_modify_undo(self) -> None:
"""Test sync-collection report with a modified and changed back item"""
calendar_path = "/calendar.ics/"
self.mkcalendar(calendar_path)
@@ -1361,7 +1402,7 @@ class BaseRequestsMixIn:
return
assert len(responses) == 1 and responses[event_path] == 200
- def test_report_sync_collection_move(self):
+ def test_report_sync_collection_move(self) -> None:
"""Test sync-collection report a moved item"""
calendar_path = "/calendar.ics/"
self.mkcalendar(calendar_path)
@@ -1381,7 +1422,7 @@ class BaseRequestsMixIn:
assert len(responses) == 2 and (responses[event1_path] == 404 and
responses[event2_path] == 200)
- def test_report_sync_collection_move_undo(self):
+ def test_report_sync_collection_move_undo(self) -> None:
"""Test sync-collection report with a moved and moved back item"""
calendar_path = "/calendar.ics/"
self.mkcalendar(calendar_path)
@@ -1404,7 +1445,7 @@ class BaseRequestsMixIn:
assert len(responses) == 2 and (responses[event1_path] == 200 and
responses[event2_path] == 404)
- def test_report_sync_collection_invalid_sync_token(self):
+ def test_report_sync_collection_invalid_sync_token(self) -> None:
"""Test sync-collection report with an invalid sync token"""
calendar_path = "/calendar.ics/"
self.mkcalendar(calendar_path)
@@ -1412,34 +1453,40 @@ class BaseRequestsMixIn:
calendar_path, "http://radicale.org/ns/sync/INVALID")
assert not sync_token
- def test_propfind_sync_token(self):
+ def test_propfind_sync_token(self) -> None:
"""Retrieve the sync-token with a propfind request"""
calendar_path = "/calendar.ics/"
self.mkcalendar(calendar_path)
propfind = get_file_content("allprop.xml")
_, responses = self.propfind(calendar_path, propfind)
- status, sync_token = responses[calendar_path]["D:sync-token"]
+ response = responses[calendar_path]
+ assert not isinstance(response, int)
+ status, sync_token = response["D:sync-token"]
assert status == 200 and sync_token.text
event = get_file_content("event1.ics")
event_path = posixpath.join(calendar_path, "event.ics")
self.put(event_path, event)
_, responses = self.propfind(calendar_path, propfind)
- status, new_sync_token = responses[calendar_path]["D:sync-token"]
+ response = responses[calendar_path]
+ assert not isinstance(response, int)
+ status, new_sync_token = response["D:sync-token"]
assert status == 200 and new_sync_token.text
assert sync_token.text != new_sync_token.text
- def test_propfind_same_as_sync_collection_sync_token(self):
+ def test_propfind_same_as_sync_collection_sync_token(self) -> None:
"""Compare sync-token property with sync-collection sync-token"""
calendar_path = "/calendar.ics/"
self.mkcalendar(calendar_path)
propfind = get_file_content("allprop.xml")
_, responses = self.propfind(calendar_path, propfind)
- status, sync_token = responses[calendar_path]["D:sync-token"]
+ response = responses[calendar_path]
+ assert not isinstance(response, int)
+ status, sync_token = response["D:sync-token"]
assert status == 200 and sync_token.text
report_sync_token, _ = self._report_sync_token(calendar_path)
assert sync_token.text == report_sync_token
- def test_calendar_getcontenttype(self):
+ def test_calendar_getcontenttype(self) -> None:
"""Test report request on an item"""
self.mkcalendar("/test/")
for component in ("event", "todo", "journal"):
@@ -1454,14 +1501,15 @@ class BaseRequestsMixIn:
""")
- assert len(responses) == 1 and len(
- responses["/test/test.ics"]) == 1
- status, prop = responses["/test/test.ics"]["D:getcontenttype"]
+ assert len(responses) == 1
+ response = responses["/test/test.ics"]
+ assert not isinstance(response, int) and len(response) == 1
+ status, prop = response["D:getcontenttype"]
assert status == 200 and prop.text == (
"text/calendar;charset=utf-8;component=V%s" %
component.upper())
- def test_addressbook_getcontenttype(self):
+ def test_addressbook_getcontenttype(self) -> None:
"""Test report request on an item"""
self.create_addressbook("/test/")
contact = get_file_content("contact1.vcf")
@@ -1473,11 +1521,13 @@ class BaseRequestsMixIn:
""")
- assert len(responses) == 1 and len(responses["/test/test.vcf"]) == 1
- status, prop = responses["/test/test.vcf"]["D:getcontenttype"]
+ assert len(responses) == 1
+ response = responses["/test/test.vcf"]
+ assert not isinstance(response, int) and len(response) == 1
+ status, prop = response["D:getcontenttype"]
assert status == 200 and prop.text == "text/vcard;charset=utf-8"
- def test_authorization(self):
+ def test_authorization(self) -> None:
_, responses = self.propfind("/", """\
@@ -1485,12 +1535,14 @@ class BaseRequestsMixIn:
""", login="user:")
- assert len(responses["/"]) == 1
- status, prop = responses["/"]["D:current-user-principal"]
+ response = responses["/"]
+ assert not isinstance(response, int) and len(response) == 1
+ status, prop = response["D:current-user-principal"]
assert status == 200 and len(prop) == 1
- assert prop.find(xmlutils.make_clark("D:href")).text == "/user/"
+ element = prop.find(xmlutils.make_clark("D:href"))
+ assert element is not None and element.text == "/user/"
- def test_authentication(self):
+ def test_authentication(self) -> None:
"""Test if server sends authentication request."""
self.configuration.update({
"auth": {"type": "htpasswd",
@@ -1502,11 +1554,11 @@ class BaseRequestsMixIn:
assert status in (401, 403)
assert headers.get("WWW-Authenticate")
- def test_principal_collection_creation(self):
+ def test_principal_collection_creation(self) -> None:
"""Verify existence of the principal collection."""
self.propfind("/user/", login="user:")
- def test_authentication_current_user_principal_workaround(self):
+ def test_authentication_current_user_principal_hack(self) -> None:
"""Test if server sends authentication request when accessing
current-user-principal prop (workaround for DAVx5)."""
status, headers, _ = self.request("PROPFIND", "/", """\
@@ -1519,7 +1571,7 @@ class BaseRequestsMixIn:
assert status in (401, 403)
assert headers.get("WWW-Authenticate")
- def test_existence_of_root_collections(self):
+ def test_existence_of_root_collections(self) -> None:
"""Verify that the root collection always exists."""
# Use PROPFIND because GET returns message
self.propfind("/")
@@ -1527,7 +1579,7 @@ class BaseRequestsMixIn:
self.delete("/")
self.propfind("/")
- def test_custom_headers(self):
+ def test_custom_headers(self) -> None:
self.configuration.update({"headers": {"test": "123"}}, "test")
self.application = Application(self.configuration)
# Test if header is set on success
@@ -1541,7 +1593,7 @@ class BaseRequestsMixIn:
@pytest.mark.skipif(sys.version_info < (3, 6),
reason="Unsupported in Python < 3.6")
- def test_timezone_seconds(self):
+ def test_timezone_seconds(self) -> None:
"""Verify that timezones with minutes and seconds work."""
self.mkcalendar("/calendar.ics/")
event = get_file_content("event_timezone_seconds.ics")
@@ -1551,11 +1603,10 @@ class BaseRequestsMixIn:
class BaseFileSystemTest(BaseTest):
"""Base class for filesystem backend tests."""
- storage_type: ClassVar[Any]
+ storage_type: ClassVar[StorageType]
- def setup(self):
- self.configuration = config.load()
- self.colpath = tempfile.mkdtemp()
+ def setup(self) -> None:
+ super().setup()
# Allow access to anything for tests
rights_file_path = os.path.join(self.colpath, "rights")
with open(rights_file_path, "w") as f:
@@ -1565,23 +1616,18 @@ user: .*
collection: .*
permissions: RrWw""")
self.configuration.update({
- "storage": {"type": self.storage_type,
- "filesystem_folder": self.colpath,
- # Disable syncing to disk for better performance
- "_filesystem_fsync": "False"},
+ "storage": {"type": self.storage_type},
"rights": {"file": rights_file_path,
"type": "from_file"}}, "test", privileged=True)
self.application = Application(self.configuration)
- def teardown(self):
- shutil.rmtree(self.colpath)
-
class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn):
"""Test BaseRequests on multifilesystem."""
- storage_type = "multifilesystem"
- def test_folder_creation(self):
+ storage_type: ClassVar[StorageType] = "multifilesystem"
+
+ def test_folder_creation(self) -> None:
"""Verify that the folder is created."""
folder = os.path.join(self.colpath, "subfolder")
self.configuration.update(
@@ -1589,14 +1635,14 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn):
self.application = Application(self.configuration)
assert os.path.isdir(folder)
- def test_fsync(self):
+ def test_fsync(self) -> None:
"""Create a directory and file with syncing enabled."""
self.configuration.update({"storage": {"_filesystem_fsync": "True"}},
"test", privileged=True)
self.application = Application(self.configuration)
self.mkcalendar("/calendar.ics/")
- def test_hook(self):
+ def test_hook(self) -> None:
"""Run hook."""
self.configuration.update({"storage": {
"hook": ("mkdir %s" % os.path.join(
@@ -1605,7 +1651,7 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn):
self.mkcalendar("/calendar.ics/")
self.propfind("/created_by_hook/")
- def test_hook_read_access(self):
+ def test_hook_read_access(self) -> None:
"""Verify that hook is not run for read accesses."""
self.configuration.update({"storage": {
"hook": ("mkdir %s" % os.path.join(
@@ -1616,14 +1662,14 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn):
@pytest.mark.skipif(not shutil.which("flock"),
reason="flock command not found")
- def test_hook_storage_locked(self):
+ def test_hook_storage_locked(self) -> None:
"""Verify that the storage is locked when the hook runs."""
self.configuration.update({"storage": {"hook": (
"flock -n .Radicale.lock || exit 0; exit 1")}}, "test")
self.application = Application(self.configuration)
self.mkcalendar("/calendar.ics/")
- def test_hook_principal_collection_creation(self):
+ def test_hook_principal_collection_creation(self) -> None:
"""Verify that the hooks runs when a new user is created."""
self.configuration.update({"storage": {
"hook": ("mkdir %s" % os.path.join(
@@ -1632,13 +1678,13 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn):
self.propfind("/", login="user:")
self.propfind("/created_by_hook/")
- def test_hook_fail(self):
+ def test_hook_fail(self) -> None:
"""Verify that a request fails if the hook fails."""
self.configuration.update({"storage": {"hook": "exit 1"}}, "test")
self.application = Application(self.configuration)
self.mkcalendar("/calendar.ics/", check=500)
- def test_item_cache_rebuild(self):
+ def test_item_cache_rebuild(self) -> None:
"""Delete the item cache and verify that it is rebuild."""
self.mkcalendar("/calendar.ics/")
event = get_file_content("event1.ics")
@@ -1655,7 +1701,7 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn):
@pytest.mark.skipif(os.name != "posix" and sys.platform != "win32",
reason="Only supported on 'posix' and 'win32'")
- def test_put_whole_calendar_uids_used_as_file_names(self):
+ def test_put_whole_calendar_uids_used_as_file_names(self) -> None:
"""Test if UIDs are used as file names."""
BaseRequestsMixIn.test_put_whole_calendar(self)
for uid in ("todo", "event"):
@@ -1664,21 +1710,23 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn):
@pytest.mark.skipif(os.name != "posix" and sys.platform != "win32",
reason="Only supported on 'posix' and 'win32'")
- def test_put_whole_calendar_random_uids_used_as_file_names(self):
+ def test_put_whole_calendar_random_uids_used_as_file_names(self) -> None:
"""Test if UIDs are used as file names."""
BaseRequestsMixIn.test_put_whole_calendar_without_uids(self)
_, answer = self.get("/calendar.ics")
+ assert answer is not None
uids = []
for line in answer.split("\r\n"):
if line.startswith("UID:"):
uids.append(line[len("UID:"):])
for uid in uids:
_, answer = self.get("/calendar.ics/%s.ics" % uid)
+ assert answer is not None
assert "\r\nUID:%s\r\n" % uid in answer
@pytest.mark.skipif(os.name != "posix" and sys.platform != "win32",
reason="Only supported on 'posix' and 'win32'")
- def test_put_whole_addressbook_uids_used_as_file_names(self):
+ def test_put_whole_addressbook_uids_used_as_file_names(self) -> None:
"""Test if UIDs are used as file names."""
BaseRequestsMixIn.test_put_whole_addressbook(self)
for uid in ("contact1", "contact2"):
@@ -1687,27 +1735,33 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn):
@pytest.mark.skipif(os.name != "posix" and sys.platform != "win32",
reason="Only supported on 'posix' and 'win32'")
- def test_put_whole_addressbook_random_uids_used_as_file_names(self):
+ def test_put_whole_addressbook_random_uids_used_as_file_names(
+ self) -> None:
"""Test if UIDs are used as file names."""
BaseRequestsMixIn.test_put_whole_addressbook_without_uids(self)
_, answer = self.get("/contacts.vcf")
+ assert answer is not None
uids = []
for line in answer.split("\r\n"):
if line.startswith("UID:"):
uids.append(line[len("UID:"):])
for uid in uids:
_, answer = self.get("/contacts.vcf/%s.vcf" % uid)
+ assert answer is not None
assert "\r\nUID:%s\r\n" % uid in answer
class TestCustomStorageSystem(BaseFileSystemTest):
"""Test custom backend loading."""
- storage_type = "radicale.tests.custom.storage_simple_sync"
- full_sync_token_support = False
+
+ storage_type: ClassVar[StorageType] = (
+ "radicale.tests.custom.storage_simple_sync")
+ full_sync_token_support: ClassVar[bool] = False
+
test_root = BaseRequestsMixIn.test_root
_report_sync_token = BaseRequestsMixIn._report_sync_token
# include tests related to sync token
- s = None
+ s: str = ""
for s in dir(BaseRequestsMixIn):
if s.startswith("test_") and ("_sync_" in s or s.endswith("_sync")):
locals()[s] = getattr(BaseRequestsMixIn, s)
@@ -1716,5 +1770,8 @@ class TestCustomStorageSystem(BaseFileSystemTest):
class TestCustomStorageSystemCallable(BaseFileSystemTest):
"""Test custom backend loading with ``callable``."""
- storage_type = radicale.tests.custom.storage_simple_sync.Storage
+
+ storage_type: ClassVar[StorageType] = (
+ radicale.tests.custom.storage_simple_sync.Storage)
+
test_add_event = BaseRequestsMixIn.test_add_event
diff --git a/radicale/tests/test_config.py b/radicale/tests/test_config.py
index e9177cb4..384cbca8 100644
--- a/radicale/tests/test_config.py
+++ b/radicale/tests/test_config.py
@@ -18,23 +18,26 @@ import os
import shutil
import tempfile
from configparser import RawConfigParser
+from typing import List, Tuple
import pytest
-from radicale import config
+from radicale import config, types
from radicale.tests.helpers import configuration_to_dict
class TestConfig:
"""Test the configuration."""
- def setup(self):
+ colpath: str
+
+ def setup(self) -> None:
self.colpath = tempfile.mkdtemp()
- def teardown(self):
+ def teardown(self) -> None:
shutil.rmtree(self.colpath)
- def _write_config(self, config_dict, name):
+ def _write_config(self, config_dict: types.CONFIG, name: str) -> str:
parser = RawConfigParser()
parser.read_dict(config_dict)
config_path = os.path.join(self.colpath, name)
@@ -42,7 +45,7 @@ class TestConfig:
parser.write(f)
return config_path
- def test_parse_compound_paths(self):
+ def test_parse_compound_paths(self) -> None:
assert len(config.parse_compound_paths()) == 0
assert len(config.parse_compound_paths("")) == 0
assert len(config.parse_compound_paths(None, "")) == 0
@@ -62,16 +65,16 @@ class TestConfig:
assert os.path.basename(paths[i][0]) == name
assert paths[i][1] is ignore_if_missing
- def test_load_empty(self):
+ def test_load_empty(self) -> None:
config_path = self._write_config({}, "config")
config.load([(config_path, False)])
- def test_load_full(self):
+ def test_load_full(self) -> None:
config_path = self._write_config(
configuration_to_dict(config.load()), "config")
config.load([(config_path, False)])
- def test_load_missing(self):
+ def test_load_missing(self) -> None:
config_path = os.path.join(self.colpath, "does_not_exist")
config.load([(config_path, True)])
with pytest.raises(Exception) as exc_info:
@@ -79,18 +82,20 @@ class TestConfig:
e = exc_info.value
assert "Failed to load config file %r" % config_path in str(e)
- def test_load_multiple(self):
+ def test_load_multiple(self) -> None:
config_path1 = self._write_config({
"server": {"hosts": "192.0.2.1:1111"}}, "config1")
config_path2 = self._write_config({
"server": {"max_connections": 1111}}, "config2")
configuration = config.load([(config_path1, False),
(config_path2, False)])
- assert len(configuration.get("server", "hosts")) == 1
- assert configuration.get("server", "hosts")[0] == ("192.0.2.1", 1111)
+ server_hosts: List[Tuple[str, int]] = configuration.get(
+ "server", "hosts")
+ assert len(server_hosts) == 1
+ assert server_hosts[0] == ("192.0.2.1", 1111)
assert configuration.get("server", "max_connections") == 1111
- def test_copy(self):
+ def test_copy(self) -> None:
configuration1 = config.load()
configuration1.update({"server": {"max_connections": "1111"}}, "test")
configuration2 = configuration1.copy()
@@ -98,14 +103,14 @@ class TestConfig:
assert configuration1.get("server", "max_connections") == 1111
assert configuration2.get("server", "max_connections") == 1112
- def test_invalid_section(self):
+ def test_invalid_section(self) -> None:
configuration = config.load()
with pytest.raises(Exception) as exc_info:
configuration.update({"does_not_exist": {"x": "x"}}, "test")
e = exc_info.value
assert "Invalid section 'does_not_exist'" in str(e)
- def test_invalid_option(self):
+ def test_invalid_option(self) -> None:
configuration = config.load()
with pytest.raises(Exception) as exc_info:
configuration.update({"server": {"x": "x"}}, "test")
@@ -113,7 +118,7 @@ class TestConfig:
assert "Invalid option 'x'" in str(e)
assert "section 'server'" in str(e)
- def test_invalid_option_plugin(self):
+ def test_invalid_option_plugin(self) -> None:
configuration = config.load()
with pytest.raises(Exception) as exc_info:
configuration.update({"auth": {"x": "x"}}, "test")
@@ -121,7 +126,7 @@ class TestConfig:
assert "Invalid option 'x'" in str(e)
assert "section 'auth'" in str(e)
- def test_invalid_value(self):
+ def test_invalid_value(self) -> None:
configuration = config.load()
with pytest.raises(Exception) as exc_info:
configuration.update({"server": {"max_connections": "x"}}, "test")
@@ -131,7 +136,7 @@ class TestConfig:
assert "section 'server" in str(e)
assert "'x'" in str(e)
- def test_privileged(self):
+ def test_privileged(self) -> None:
configuration = config.load()
configuration.update({"server": {"_internal_server": "True"}},
"test", privileged=True)
@@ -141,9 +146,9 @@ class TestConfig:
e = exc_info.value
assert "Invalid option '_internal_server'" in str(e)
- def test_plugin_schema(self):
- plugin_schema = {"auth": {"new_option": {"value": "False",
- "type": bool}}}
+ def test_plugin_schema(self) -> None:
+ plugin_schema: types.CONFIG_SCHEMA = {
+ "auth": {"new_option": {"value": "False", "type": bool}}}
configuration = config.load()
configuration.update({"auth": {"type": "new_plugin"}}, "test")
plugin_configuration = configuration.copy(plugin_schema)
@@ -152,26 +157,26 @@ class TestConfig:
plugin_configuration = configuration.copy(plugin_schema)
assert plugin_configuration.get("auth", "new_option") is True
- def test_plugin_schema_duplicate_option(self):
- plugin_schema = {"auth": {"type": {"value": "False",
- "type": bool}}}
+ def test_plugin_schema_duplicate_option(self) -> None:
+ plugin_schema: types.CONFIG_SCHEMA = {
+ "auth": {"type": {"value": "False", "type": bool}}}
configuration = config.load()
with pytest.raises(Exception) as exc_info:
configuration.copy(plugin_schema)
e = exc_info.value
assert "option already exists in 'auth': 'type'" in str(e)
- def test_plugin_schema_invalid(self):
- plugin_schema = {"server": {"new_option": {"value": "False",
- "type": bool}}}
+ def test_plugin_schema_invalid(self) -> None:
+ plugin_schema: types.CONFIG_SCHEMA = {
+ "server": {"new_option": {"value": "False", "type": bool}}}
configuration = config.load()
with pytest.raises(Exception) as exc_info:
configuration.copy(plugin_schema)
e = exc_info.value
assert "not a plugin section: 'server" in str(e)
- def test_plugin_schema_option_invalid(self):
- plugin_schema = {"auth": {}}
+ def test_plugin_schema_option_invalid(self) -> None:
+ plugin_schema: types.CONFIG_SCHEMA = {"auth": {}}
configuration = config.load()
configuration.update({"auth": {"type": "new_plugin",
"new_option": False}}, "test")
diff --git a/radicale/tests/test_rights.py b/radicale/tests/test_rights.py
index 951cf55c..e9b92787 100644
--- a/radicale/tests/test_rights.py
+++ b/radicale/tests/test_rights.py
@@ -19,10 +19,8 @@ Radicale tests with simple requests and rights.
"""
import os
-import shutil
-import tempfile
-from radicale import Application, config
+from radicale import Application
from radicale.tests import BaseTest
from radicale.tests.helpers import get_file_content
@@ -30,20 +28,8 @@ from radicale.tests.helpers import get_file_content
class TestBaseRightsRequests(BaseTest):
"""Tests basic requests with rights."""
- def setup(self):
- self.configuration = config.load()
- self.colpath = tempfile.mkdtemp()
- self.configuration.update({
- "storage": {"filesystem_folder": self.colpath,
- # Disable syncing to disk for better performance
- "_filesystem_fsync": "False"}},
- "test", privileged=True)
-
- def teardown(self):
- shutil.rmtree(self.colpath)
-
- def _test_rights(self, rights_type, user, path, mode, expected_status,
- with_auth=True):
+ def _test_rights(self, rights_type: str, user: str, path: str, mode: str,
+ expected_status: int, with_auth: bool = True) -> None:
assert mode in ("r", "w")
assert user in ("", "tmp")
htpasswd_file_path = os.path.join(self.colpath, ".htpasswd")
@@ -61,7 +47,7 @@ class TestBaseRightsRequests(BaseTest):
(self.propfind if mode == "r" else self.proppatch)(
path, check=expected_status, login="tmp:bepo" if user else None)
- def test_owner_only(self):
+ def test_owner_only(self) -> None:
self._test_rights("owner_only", "", "/", "r", 401)
self._test_rights("owner_only", "", "/", "w", 401)
self._test_rights("owner_only", "", "/tmp/", "r", 401)
@@ -73,13 +59,13 @@ class TestBaseRightsRequests(BaseTest):
self._test_rights("owner_only", "tmp", "/other/", "r", 403)
self._test_rights("owner_only", "tmp", "/other/", "w", 403)
- def test_owner_only_without_auth(self):
+ def test_owner_only_without_auth(self) -> None:
self._test_rights("owner_only", "", "/", "r", 207, False)
self._test_rights("owner_only", "", "/", "w", 401, False)
self._test_rights("owner_only", "", "/tmp/", "r", 207, False)
self._test_rights("owner_only", "", "/tmp/", "w", 207, False)
- def test_owner_write(self):
+ def test_owner_write(self) -> None:
self._test_rights("owner_write", "", "/", "r", 401)
self._test_rights("owner_write", "", "/", "w", 401)
self._test_rights("owner_write", "", "/tmp/", "r", 401)
@@ -91,13 +77,13 @@ class TestBaseRightsRequests(BaseTest):
self._test_rights("owner_write", "tmp", "/other/", "r", 207)
self._test_rights("owner_write", "tmp", "/other/", "w", 403)
- def test_owner_write_without_auth(self):
+ def test_owner_write_without_auth(self) -> None:
self._test_rights("owner_write", "", "/", "r", 207, False)
self._test_rights("owner_write", "", "/", "w", 401, False)
self._test_rights("owner_write", "", "/tmp/", "r", 207, False)
self._test_rights("owner_write", "", "/tmp/", "w", 207, False)
- def test_authenticated(self):
+ def test_authenticated(self) -> None:
self._test_rights("authenticated", "", "/", "r", 401)
self._test_rights("authenticated", "", "/", "w", 401)
self._test_rights("authenticated", "", "/tmp/", "r", 401)
@@ -109,13 +95,13 @@ class TestBaseRightsRequests(BaseTest):
self._test_rights("authenticated", "tmp", "/other/", "r", 207)
self._test_rights("authenticated", "tmp", "/other/", "w", 207)
- def test_authenticated_without_auth(self):
+ def test_authenticated_without_auth(self) -> None:
self._test_rights("authenticated", "", "/", "r", 207, False)
self._test_rights("authenticated", "", "/", "w", 207, False)
self._test_rights("authenticated", "", "/tmp/", "r", 207, False)
self._test_rights("authenticated", "", "/tmp/", "w", 207, False)
- def test_from_file(self):
+ def test_from_file(self) -> None:
rights_file_path = os.path.join(self.colpath, "rights")
with open(rights_file_path, "w") as f:
f.write("""\
@@ -160,13 +146,13 @@ permissions: i""")
self.get("/public/calendar")
self.get("/public/calendar/1.ics", check=401)
- def test_custom(self):
+ def test_custom(self) -> None:
"""Custom rights management."""
self._test_rights("radicale.tests.custom.rights", "", "/", "r", 401)
self._test_rights(
"radicale.tests.custom.rights", "", "/tmp/", "r", 207)
- def test_collections_and_items(self):
+ def test_collections_and_items(self) -> None:
"""Test rights for creation of collections, calendars and items.
Collections are allowed at "/" and "/.../".
@@ -183,7 +169,7 @@ permissions: i""")
self.mkcol("/user/calendar/item", check=401)
self.mkcalendar("/user/calendar/item", check=401)
- def test_put_collections_and_items(self):
+ def test_put_collections_and_items(self) -> None:
"""Test rights for creation of calendars and items with PUT."""
self.application = Application(self.configuration)
self.put("/user/", "BEGIN:VCALENDAR\r\nEND:VCALENDAR", check=401)
diff --git a/radicale/tests/test_server.py b/radicale/tests/test_server.py
index 72ff52be..956bc85b 100644
--- a/radicale/tests/test_server.py
+++ b/radicale/tests/test_server.py
@@ -21,15 +21,14 @@ Test the internal server.
import errno
import os
-import shutil
import socket
import ssl
import subprocess
import sys
-import tempfile
import threading
import time
from configparser import RawConfigParser
+from typing import Callable, Dict, NoReturn, Optional, Tuple, cast
from urllib import request
from urllib.error import HTTPError, URLError
@@ -41,34 +40,43 @@ from radicale.tests.helpers import configuration_to_dict, get_file_path
class DisabledRedirectHandler(request.HTTPRedirectHandler):
- def http_error_301(self, req, fp, code, msg, headers):
+
+ # HACK: typeshed annotation are wrong for `fp` and `msg`
+ # (https://github.com/python/typeshed/pull/5728)
+ # `headers` is incompatible with `http.client.HTTPMessage`
+ # (https://github.com/python/typeshed/issues/5729)
+ def http_error_301(self, req: request.Request, fp, code: int,
+ msg, headers) -> NoReturn:
raise HTTPError(req.full_url, code, msg, headers, fp)
- def http_error_302(self, req, fp, code, msg, headers):
+ def http_error_302(self, req: request.Request, fp, code: int,
+ msg, headers) -> NoReturn:
raise HTTPError(req.full_url, code, msg, headers, fp)
- def http_error_303(self, req, fp, code, msg, headers):
+ def http_error_303(self, req: request.Request, fp, code: int,
+ msg, headers) -> NoReturn:
raise HTTPError(req.full_url, code, msg, headers, fp)
- def http_error_307(self, req, fp, code, msg, headers):
+ def http_error_307(self, req: request.Request, fp, code: int,
+ msg, headers) -> NoReturn:
raise HTTPError(req.full_url, code, msg, headers, fp)
class TestBaseServerRequests(BaseTest):
"""Test the internal server."""
- def setup(self):
- self.configuration = config.load()
- self.colpath = tempfile.mkdtemp()
+ shutdown_socket: socket.socket
+ thread: threading.Thread
+ opener: request.OpenerDirector
+
+ def setup(self) -> None:
+ super().setup()
self.shutdown_socket, shutdown_socket_out = socket.socketpair()
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
# Find available port
sock.bind(("127.0.0.1", 0))
self.sockname = sock.getsockname()
self.configuration.update({
- "storage": {"filesystem_folder": self.colpath,
- # Disable syncing to disk for better performance
- "_filesystem_fsync": "False"},
"server": {"hosts": "[%s]:%d" % self.sockname},
# Enable debugging for new processes
"logging": {"level": "debug"}},
@@ -82,40 +90,57 @@ class TestBaseServerRequests(BaseTest):
request.HTTPSHandler(context=ssl_context),
DisabledRedirectHandler)
- def teardown(self):
+ def teardown(self) -> None:
self.shutdown_socket.close()
try:
self.thread.join()
except RuntimeError: # Thread never started
pass
- shutil.rmtree(self.colpath)
+ super().teardown()
- def request(self, method, path, data=None, is_alive_fn=None, **headers):
+ def request(self, method: str, path: str, data: Optional[str] = None,
+ **kwargs) -> Tuple[int, Dict[str, str], str]:
"""Send a request."""
+ login = kwargs.pop("login", None)
+ if login is not None and not isinstance(login, str):
+ raise TypeError("login argument must be %r, not %r" %
+ (str, type(login)))
+ if login:
+ raise NotImplementedError
+ is_alive_fn: Optional[Callable[[], bool]] = kwargs.pop(
+ "is_alive_fn", None)
+ headers: Dict[str, str] = kwargs
+ for k, v in headers.items():
+ if not isinstance(v, str):
+ raise TypeError("type of %r is %r, expected %r" %
+ (k, type(v), str))
if is_alive_fn is None:
is_alive_fn = self.thread.is_alive
- scheme = ("https" if self.configuration.get("server", "ssl") else
- "http")
+ encoding: str = self.configuration.get("encoding", "request")
+ scheme = "https" if self.configuration.get("server", "ssl") else "http"
+ data_bytes = None
+ if data:
+ data_bytes = data.encode(encoding)
req = request.Request(
"%s://[%s]:%d%s" % (scheme, *self.sockname, path),
- data=data, headers=headers, method=method)
+ data=data_bytes, headers=headers, method=method)
while True:
assert is_alive_fn()
try:
with self.opener.open(req) as f:
- return f.getcode(), f.info(), f.read().decode()
+ return f.getcode(), dict(f.info()), f.read().decode()
except HTTPError as e:
- return e.code, e.headers, e.read().decode()
+ return e.code, dict(e.headers), e.read().decode()
except URLError as e:
if not isinstance(e.reason, ConnectionRefusedError):
raise
time.sleep(0.1)
- def test_root(self):
+ def test_root(self) -> None:
self.thread.start()
self.get("/", check=302)
- def test_ssl(self):
+ def test_ssl(self) -> None:
self.configuration.update({
"server": {"ssl": "True",
"certificate": get_file_path("cert.pem"),
@@ -123,7 +148,7 @@ class TestBaseServerRequests(BaseTest):
self.thread.start()
self.get("/", check=302)
- def test_bind_fail(self):
+ def test_bind_fail(self) -> None:
for address_family, address in [(socket.AF_INET, "::1"),
(socket.AF_INET6, "127.0.0.1")]:
with socket.socket(address_family, socket.SOCK_STREAM) as sock:
@@ -143,7 +168,7 @@ class TestBaseServerRequests(BaseTest):
errno.EADDRNOTAVAIL, errno.EAFNOSUPPORT,
errno.EPROTONOSUPPORT))
- def test_ipv6(self):
+ def test_ipv6(self) -> None:
try:
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock:
# Only allow IPv6 connections to the IPv6 socket
@@ -162,7 +187,7 @@ class TestBaseServerRequests(BaseTest):
self.thread.start()
self.get("/", check=302)
- def test_command_line_interface(self):
+ def test_command_line_interface(self) -> None:
config_args = []
for section, values in config.DEFAULT_CONFIG_SCHEMA.items():
if section.startswith("_"):
@@ -172,13 +197,14 @@ class TestBaseServerRequests(BaseTest):
continue
long_name = "--%s-%s" % (section, option.replace("_", "-"))
if data["type"] == bool:
- if not self.configuration.get(section, option):
+ if not cast(bool, self.configuration.get(section, option)):
long_name = "--no%s" % long_name[1:]
config_args.append(long_name)
else:
config_args.append(long_name)
- config_args.append(
- self.configuration.get_raw(section, option))
+ raw_value = self.configuration.get_raw(section, option)
+ assert isinstance(raw_value, str)
+ config_args.append(raw_value)
p = subprocess.Popen(
[sys.executable, "-m", "radicale"] + config_args,
env={**os.environ, "PYTHONPATH": os.pathsep.join(sys.path)})
@@ -190,7 +216,7 @@ class TestBaseServerRequests(BaseTest):
if os.name == "posix":
assert p.returncode == 0
- def test_wsgi_server(self):
+ def test_wsgi_server(self) -> None:
config_path = os.path.join(self.colpath, "config")
parser = RawConfigParser()
parser.read_dict(configuration_to_dict(self.configuration))
@@ -199,9 +225,10 @@ class TestBaseServerRequests(BaseTest):
env = os.environ.copy()
env["PYTHONPATH"] = os.pathsep.join(sys.path)
env["RADICALE_CONFIG"] = config_path
+ raw_server_hosts = self.configuration.get_raw("server", "hosts")
+ assert isinstance(raw_server_hosts, str)
p = subprocess.Popen([
- sys.executable, "-m", "waitress",
- "--listen", self.configuration.get_raw("server", "hosts"),
+ sys.executable, "-m", "waitress", "--listen", raw_server_hosts,
"radicale:application"], env=env)
try:
self.get("/", is_alive_fn=lambda: p.poll() is None, check=302)
diff --git a/radicale/tests/test_web.py b/radicale/tests/test_web.py
index 5d2b7925..2dc599e3 100644
--- a/radicale/tests/test_web.py
+++ b/radicale/tests/test_web.py
@@ -19,30 +19,14 @@ Test web plugin.
"""
-import shutil
-import tempfile
-
-from radicale import Application, config
+from radicale import Application
from radicale.tests import BaseTest
class TestBaseWebRequests(BaseTest):
"""Test web plugin."""
- def setup(self):
- self.configuration = config.load()
- self.colpath = tempfile.mkdtemp()
- self.configuration.update({
- "storage": {"filesystem_folder": self.colpath,
- # Disable syncing to disk for better performance
- "_filesystem_fsync": "False"}},
- "test", privileged=True)
- self.application = Application(self.configuration)
-
- def teardown(self):
- shutil.rmtree(self.colpath)
-
- def test_internal(self):
+ def test_internal(self) -> None:
status, headers, _ = self.request("GET", "/.web")
assert status == 302
assert headers.get("Location") == ".web/"
@@ -50,7 +34,7 @@ class TestBaseWebRequests(BaseTest):
assert answer
self.post("/.web", check=405)
- def test_none(self):
+ def test_none(self) -> None:
self.configuration.update({"web": {"type": "none"}}, "test")
self.application = Application(self.configuration)
_, answer = self.get("/.web")
@@ -58,7 +42,7 @@ class TestBaseWebRequests(BaseTest):
self.get("/.web/", check=404)
self.post("/.web", check=405)
- def test_custom(self):
+ def test_custom(self) -> None:
"""Custom web plugin."""
self.configuration.update({
"web": {"type": "radicale.tests.custom.web"}}, "test")