# This file is part of Radicale Server - Calendar Server # Copyright © 2012-2017 Guillaume Ayoub # Copyright © 2017-2018 Unrud # # This library is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Radicale. If not, see . """ Radicale tests with simple requests. """ import base64 import os import shutil import sys import tempfile import xml.etree.ElementTree as ET from functools import partial from radicale import Application, config from . import BaseTest from .helpers import get_file_content import posixpath # isort:skip import pytest # isort:skip class BaseRequestsMixIn: """Tests with simple requests.""" # Allow skipping sync-token tests, when not fully supported by the backend full_sync_token_support = True def test_root(self): """GET request at "/".""" status, _, answer = self.request("GET", "/") assert status == 302 assert answer == "Redirected to .web" def test_script_name(self): """GET request at "/" with SCRIPT_NAME.""" status, _, answer = self.request("GET", "/", SCRIPT_NAME="/radicale") assert status == 302 assert answer == "Redirected to .web" status, _, answer = self.request("GET", "", SCRIPT_NAME="/radicale") assert status == 302 assert answer == "Redirected to radicale/.web" def test_add_event(self): """Add an event.""" status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 event = get_file_content("event1.ics") path = "/calendar.ics/event1.ics" status, _, _ = self.request("PUT", path, event) assert status == 201 status, headers, answer = self.request("GET", path) assert status == 200 assert "ETag" in headers assert headers["Content-Type"] == "text/calendar; charset=utf-8" assert "VEVENT" in answer assert "Event" in answer assert "UID:event" in answer def test_add_event_without_uid(self): """Add an event without UID.""" status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 event = get_file_content("event1.ics").replace("UID:event1\n", "") assert "\nUID:" not in event path = "/calendar.ics/event.ics" status, _, _ = self.request("PUT", path, event) assert status == 400 def test_add_todo(self): """Add a todo.""" status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 todo = get_file_content("todo1.ics") path = "/calendar.ics/todo1.ics" status, _, _ = self.request("PUT", path, todo) assert status == 201 status, headers, answer = self.request("GET", path) assert status == 200 assert "ETag" in headers assert headers["Content-Type"] == "text/calendar; charset=utf-8" assert "VTODO" in answer assert "Todo" in answer assert "UID:todo" in answer def _create_addressbook(self, path): return self.request( "MKCOL", path, """\ """) def test_add_contact(self): """Add a contact.""" status, _, _ = self._create_addressbook("/contacts.vcf/") assert status == 201 contact = get_file_content("contact1.vcf") path = "/contacts.vcf/contact.vcf" status, _, _ = self.request("PUT", path, contact) assert status == 201 status, headers, answer = self.request("GET", path) assert status == 200 assert "ETag" in headers assert headers["Content-Type"] == "text/vcard; charset=utf-8" assert "VCARD" in answer assert "UID:contact1" in answer status, _, answer = self.request("GET", path) assert status == 200 assert "UID:contact1" in answer def test_add_contact_without_uid(self): """Add a contact without UID.""" status, _, _ = self._create_addressbook("/contacts.vcf/") assert status == 201 contact = get_file_content("contact1.vcf").replace("UID:contact1\n", "") assert "\nUID" not in contact path = "/contacts.vcf/contact.vcf" status, _, _ = self.request("PUT", path, contact) assert status == 400 def test_update(self): """Update an event.""" status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 event = get_file_content("event1.ics") path = "/calendar.ics/event1.ics" status, _, _ = self.request("PUT", path, event) assert status == 201 status, headers, answer = self.request("GET", path) assert "ETag" in headers assert status == 200 assert "VEVENT" in answer assert "Event" in answer assert "UID:event" in answer assert "DTSTART;TZID=Europe/Paris:20130901T180000" in answer assert "DTEND;TZID=Europe/Paris:20130901T190000" in answer # Then we send another PUT request event = get_file_content("event1-prime.ics") status, _, _ = self.request("PUT", path, event) assert status == 201 status, _, answer = self.request("GET", "/calendar.ics/") assert status == 200 assert answer.count("BEGIN:VEVENT") == 1 status, headers, answer = self.request("GET", path) assert status == 200 assert "ETag" in headers assert "VEVENT" in answer assert "Event" in answer assert "UID:event" in answer assert "DTSTART;TZID=Europe/Paris:20130901T180000" not in answer assert "DTEND;TZID=Europe/Paris:20130901T190000" not in answer assert "DTSTART;TZID=Europe/Paris:20140901T180000" in answer assert "DTEND;TZID=Europe/Paris:20140901T210000" in answer def test_put_whole_calendar(self): """Create and overwrite a whole calendar.""" status, _, _ = self.request( "PUT", "/calendar.ics/", "BEGIN:VCALENDAR\r\nEND:VCALENDAR") assert status == 201 event1 = get_file_content("event1.ics") status, _, _ = self.request( "PUT", "/calendar.ics/test_event.ics", event1) assert status == 201 # Overwrite events = get_file_content("event_multiple.ics") status, _, _ = self.request("PUT", "/calendar.ics/", events) assert status == 201 status, _, _ = self.request("GET", "/calendar.ics/test_event.ics") assert status == 404 status, _, answer = self.request("GET", "/calendar.ics/") assert status == 200 assert "\r\nUID:event\r\n" in answer and "\r\nUID:todo\r\n" in answer assert "\r\nUID:event1\r\n" not in answer def test_put_whole_calendar_without_uids(self): """Create a whole calendar without UID.""" event = get_file_content("event_multiple.ics") event = event.replace("UID:event\n", "").replace("UID:todo\n", "") assert "\nUID:" not in event status, _, _ = self.request("PUT", "/calendar.ics/", event) assert status == 201 status, _, answer = self.request("GET", "/calendar.ics") assert status == 200 uids = [] for line in answer.split("\r\n"): if line.startswith("UID:"): uids.append(line[len("UID:"):]) assert len(uids) == 2 for i, uid1 in enumerate(uids): assert uid1 for uid2 in uids[i + 1:]: assert uid1 != uid2 def test_put_whole_addressbook(self): """Create and overwrite a whole addressbook.""" contacts = get_file_content("contact_multiple.vcf") status, _, _ = self.request("PUT", "/contacts.vcf/", contacts) assert status == 201 status, _, answer = self.request("GET", "/contacts.vcf/") assert status == 200 assert ("\r\nUID:contact1\r\n" in answer and "\r\nUID:contact2\r\n" in answer) def test_put_whole_addressbook_without_uids(self): """Create a whole addressbook without UID.""" contacts = get_file_content("contact_multiple.vcf") contacts = contacts.replace("UID:contact1\n", "").replace( "UID:contact2\n", "") assert "\nUID:" not in contacts status, _, _ = self.request("PUT", "/contacts.vcf/", contacts) assert status == 201 status, _, answer = self.request("GET", "/contacts.vcf") assert status == 200 uids = [] for line in answer.split("\r\n"): if line.startswith("UID:"): uids.append(line[len("UID:"):]) assert len(uids) == 2 for i, uid1 in enumerate(uids): assert uid1 for uid2 in uids[i + 1:]: assert uid1 != uid2 def test_delete(self): """Delete an event.""" status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 event = get_file_content("event1.ics") path = "/calendar.ics/event1.ics" status, _, _ = self.request("PUT", path, event) assert status == 201 # Then we send a DELETE request status, _, answer = self.request("DELETE", path) assert status == 200 assert "href>%s/calendar.ics///%s%s%s" in answer status, _, answer = self.request( "PROPFIND", "/calendar.ics/event.ics", propfind) assert "" in answer def test_propfind_allprop(self): status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 event = get_file_content("event1.ics") status, _, _ = self.request("PUT", "/calendar.ics/event.ics", event) assert status == 201 propfind = get_file_content("allprop.xml") status, _, answer = self.request( "PROPFIND", "/calendar.ics/", propfind) assert "" in answer status, _, answer = self.request( "PROPFIND", "/calendar.ics/event.ics", propfind) assert "" in answer def test_proppatch(self): """Write a property and read it back.""" status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 proppatch = get_file_content("proppatch1.xml") status, _, answer = self.request( "PROPPATCH", "/calendar.ics/", proppatch) assert status == 207 assert "calendar-color" in answer assert "200 OK#BADA55" in answer def test_put_whole_calendar_multiple_events_with_same_uid(self): """Add two events with the same UID.""" status, _, _ = self.request( "PUT", "/calendar.ics/", get_file_content("event2.ics")) assert status == 201 status, _, answer = self.request( "REPORT", "/calendar.ics/", """ """) assert status == 207 assert answer.count("") == 1 status, _, answer = self.request("GET", "/calendar.ics/") assert status == 200 assert answer.count("BEGIN:VEVENT") == 2 def _test_filter(self, filters, kind="event", test=None, items=(1,)): filter_template = "{}" if kind in ("event", "journal", "todo"): create_collection_fn = partial(self.request, "MKCALENDAR") path = "/calendar.ics/" filename_template = "{}{}.ics" namespace = "urn:ietf:params:xml:ns:caldav" report = "calendar-query" elif kind == "contact": create_collection_fn = self._create_addressbook if test: filter_template = '{{}}'.format( test) path = "/contacts.vcf/" filename_template = "{}{}.vcf" namespace = "urn:ietf:params:xml:ns:carddav" report = "addressbook-query" else: raise ValueError("Unsupported kind: %r" % kind) status, _, _ = self.request("DELETE", path) assert status in (200, 404) status, _, _ = create_collection_fn(path) assert status == 201 for i in items: filename = filename_template.format(kind, i) event = get_file_content(filename) status, _, _ = self.request( "PUT", posixpath.join(path, filename), event) assert status == 201 filters_text = "".join( filter_template.format(filter_) for filter_ in filters) status, _, answer = self.request( "REPORT", path, """ {2} """.format(namespace, report, filters_text)) assert status == 207 return answer def test_addressbook_empty_filter(self): self._test_filter([""], kind="contact") def test_addressbook_prop_filter(self): assert "href>/contacts.vcf/contact1.vcf es """], "contact") assert "href>/contacts.vcf/contact1.vcf es """], "contact") assert "href>/contacts.vcf/contact1.vcf a """], "contact") assert "href>/contacts.vcf/contact1.vcf test """], "contact") assert "href>/contacts.vcf/contact1.vcf tes """], "contact") assert "href>/contacts.vcf/contact1.vcf est """], "contact") assert "href>/contacts.vcf/contact1.vcf tes """], "contact") assert "href>/contacts.vcf/contact1.vcf est """], "contact") assert "href>/contacts.vcf/contact1.vcf est """], "contact") assert "href>/contacts.vcf/contact1.vcf tes """], "contact") def test_addressbook_prop_filter_any(self): assert "href>/contacts.vcf/contact1.vcf test test """], "contact", test="anyof") assert "href>/contacts.vcf/contact1.vcf a test """], "contact", test="anyof") assert "href>/contacts.vcf/contact1.vcf test test """], "contact") def test_addressbook_prop_filter_all(self): assert "href>/contacts.vcf/contact1.vcf tes est """], "contact", test="allof") assert "href>/contacts.vcf/contact1.vcf test test """], "contact", test="allof") def test_calendar_empty_filter(self): self._test_filter([""]) def test_calendar_tag_filter(self): """Report request with tag-based filter on calendar.""" assert "href>/calendar.ics/event1.ics"""]) def test_item_tag_filter(self): """Report request with tag-based filter on an item.""" assert "href>/calendar.ics/event1.ics """]) assert "href>/calendar.ics/event1.ics """]) def test_item_not_tag_filter(self): """Report request with tag-based is-not filter on an item.""" assert "href>/calendar.ics/event1.ics """]) assert "href>/calendar.ics/event1.ics """]) def test_item_prop_filter(self): """Report request with prop-based filter on an item.""" assert "href>/calendar.ics/event1.ics """]) assert "href>/calendar.ics/event1.ics """]) def test_item_not_prop_filter(self): """Report request with prop-based is-not filter on an item.""" assert "href>/calendar.ics/event1.ics """]) assert "href>/calendar.ics/event1.ics """]) def test_mutiple_filters(self): """Report request with multiple filters on an item.""" assert "href>/calendar.ics/event1.ics """, """ """]) assert "href>/calendar.ics/event1.ics """, """ """]) assert "href>/calendar.ics/event1.ics """]) def test_text_match_filter(self): """Report request with text-match filter on calendar.""" assert "href>/calendar.ics/event1.ics event """]) assert "href>/calendar.ics/event1.ics event """]) assert "href>/calendar.ics/event1.ics unknown """]) assert "href>/calendar.ics/event1.ics event """]) def test_param_filter(self): """Report request with param-filter on calendar.""" assert "href>/calendar.ics/event1.ics ACCEPTED """]) assert "href>/calendar.ics/event1.ics UNKNOWN """]) assert "href>/calendar.ics/event1.ics """]) assert "href>/calendar.ics/event1.ics """]) def test_time_range_filter_events(self): """Report request with time-range filter on events.""" answer = self._test_filter([""" """], "event", items=range(1, 6)) assert "href>/calendar.ics/event1.ics/calendar.ics/event2.ics/calendar.ics/event3.ics/calendar.ics/event4.ics/calendar.ics/event5.ics """], "event", items=range(1, 6)) assert "href>/calendar.ics/event1.ics """], items=range(1, 6)) assert "href>/calendar.ics/event1.ics/calendar.ics/event2.ics/calendar.ics/event3.ics/calendar.ics/event4.ics/calendar.ics/event5.ics """], items=range(1, 6)) assert "href>/calendar.ics/event1.ics/calendar.ics/event2.ics/calendar.ics/event3.ics/calendar.ics/event4.ics/calendar.ics/event5.ics """], items=range(1, 6)) assert "href>/calendar.ics/event1.ics/calendar.ics/event2.ics/calendar.ics/event3.ics/calendar.ics/event4.ics/calendar.ics/event5.ics """], items=range(1, 6)) assert "href>/calendar.ics/event1.ics/calendar.ics/event2.ics/calendar.ics/event3.ics/calendar.ics/event4.ics/calendar.ics/event5.ics """], items=range(1, 6)) assert "href>/calendar.ics/event1.ics/calendar.ics/event2.ics/calendar.ics/event3.ics/calendar.ics/event4.ics/calendar.ics/event5.ics """], items=(6, 7, 8, 9)) assert "href>/calendar.ics/event6.ics/calendar.ics/event7.ics/calendar.ics/event8.ics/calendar.ics/event9.ics """], items=(6, 7, 8, 9)) assert "href>/calendar.ics/event6.ics/calendar.ics/event7.ics/calendar.ics/event8.ics/calendar.ics/event9.ics """], items=(6, 7, 8, 9)) assert "href>/calendar.ics/event6.ics/calendar.ics/event7.ics/calendar.ics/event8.ics/calendar.ics/event9.ics """], items=(9,)) assert "href>/calendar.ics/event9.ics """], items=(9,)) assert "href>/calendar.ics/event9.ics """], "event", items=(1, 2)) assert "href>/calendar.ics/event1.ics/calendar.ics/event2.ics """], "event", items=(1, 2)) assert "href>/calendar.ics/event1.ics/calendar.ics/event2.ics """], "event", items=(1, 2)) assert "href>/calendar.ics/event1.ics/calendar.ics/event2.ics """], "event", items=(1, 2)) assert "href>/calendar.ics/event1.ics/calendar.ics/event2.ics """], "todo", items=range(1, 9)) assert "href>/calendar.ics/todo1.ics/calendar.ics/todo2.ics/calendar.ics/todo3.ics/calendar.ics/todo4.ics/calendar.ics/todo5.ics/calendar.ics/todo6.ics/calendar.ics/todo7.ics/calendar.ics/todo8.ics """], "todo", items=range(1, 9)) assert "href>/calendar.ics/todo1.ics/calendar.ics/todo2.ics/calendar.ics/todo3.ics/calendar.ics/todo4.ics/calendar.ics/todo5.ics/calendar.ics/todo6.ics/calendar.ics/todo7.ics/calendar.ics/todo8.ics """], "todo", items=range(1, 9)) assert "href>/calendar.ics/todo2.ics """], "todo", items=range(1, 9)) assert "href>/calendar.ics/todo2.ics """], "todo", items=range(1, 9)) assert "href>/calendar.ics/todo3.ics """], "todo", items=range(1, 9)) assert "href>/calendar.ics/todo7.ics """], "todo", items=(1, 2, 9)) assert "href>/calendar.ics/todo1.ics/calendar.ics/todo2.ics/calendar.ics/todo9.ics """], "todo", items=(1, 2, 9)) assert "href>/calendar.ics/todo1.ics/calendar.ics/todo2.ics/calendar.ics/todo9.ics """], "todo", items=(1, 2)) assert "href>/calendar.ics/todo1.ics/calendar.ics/todo2.ics """], "todo", items=(1, 2)) assert "href>/calendar.ics/todo1.ics/calendar.ics/todo2.ics """], "todo", items=(9,)) assert "href>/calendar.ics/todo9.ics """], "journal", items=(1, 2, 3)) assert "href>/calendar.ics/journal1.ics/calendar.ics/journal2.ics/calendar.ics/journal3.ics """], "journal", items=(1, 2, 3)) assert "href>/calendar.ics/journal1.ics/calendar.ics/journal2.ics/calendar.ics/journal3.ics """], "journal", items=(1, 2, 3)) assert "href>/calendar.ics/journal1.ics/calendar.ics/journal2.ics/calendar.ics/journal3.ics """], "journal", items=(1, 2, 3)) assert "href>/calendar.ics/journal1.ics/calendar.ics/journal2.ics/calendar.ics/journal3.ics """], "journal", items=(1, 2, 3)) assert "href>/calendar.ics/journal1.ics/calendar.ics/journal2.ics/calendar.ics/journal3.ics """], "journal", items=(1, 2)) assert "href>/calendar.ics/journal1.ics/calendar.ics/journal2.ics """], "journal", items=(1, 2)) assert "href>/calendar.ics/journal1.ics/calendar.ics/journal2.ics """], "journal", items=(1, 2)) assert "href>/calendar.ics/journal1.ics/calendar.ics/journal2.ics """) assert status == 207 assert "href>%s<" % event_path in answer def _report_sync_token(self, calendar_path, sync_token=None): sync_token_xml = ( "" % sync_token if sync_token else "") status, _, answer = self.request( "REPORT", calendar_path, """ %s """ % sync_token_xml) if sync_token and status == 409: return None, None assert status == 207 xml = ET.fromstring(answer) sync_token = xml.find("{DAV:}sync-token").text.strip() assert sync_token return sync_token, xml def test_report_sync_collection_no_change(self): """Test sync-collection report without modifying the collection""" calendar_path = "/calendar.ics/" status, _, _ = self.request("MKCALENDAR", calendar_path) assert status == 201 event = get_file_content("event1.ics") event_path = posixpath.join(calendar_path, "event.ics") status, _, _ = self.request("PUT", event_path, event) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path) assert xml.find("{DAV:}response") is not None new_sync_token, xml = self._report_sync_token(calendar_path, sync_token) if not self.full_sync_token_support and not new_sync_token: pytest.skip("storage backend does not support sync-token") assert sync_token == new_sync_token assert xml.find("{DAV:}response") is None def test_report_sync_collection_add(self): """Test sync-collection report with an added item""" calendar_path = "/calendar.ics/" status, _, _ = self.request("MKCALENDAR", calendar_path) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path) event = get_file_content("event1.ics") event_path = posixpath.join(calendar_path, "event.ics") status, _, _ = self.request("PUT", event_path, event) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path, sync_token) if not self.full_sync_token_support and not sync_token: pytest.skip("storage backend does not support sync-token") assert xml.find("{DAV:}response") is not None assert xml.find("{DAV:}response/{DAV:}status") is None def test_report_sync_collection_delete(self): """Test sync-collection report with a deleted item""" calendar_path = "/calendar.ics/" status, _, _ = self.request("MKCALENDAR", calendar_path) assert status == 201 event = get_file_content("event1.ics") event_path = posixpath.join(calendar_path, "event.ics") status, _, _ = self.request("PUT", event_path, event) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path) status, _, _ = self.request("DELETE", event_path) assert status == 200 sync_token, xml = self._report_sync_token(calendar_path, sync_token) if not self.full_sync_token_support and not sync_token: pytest.skip("storage backend does not support sync-token") assert "404" in xml.find("{DAV:}response/{DAV:}status").text def test_report_sync_collection_create_delete(self): """Test sync-collection report with a created and deleted item""" calendar_path = "/calendar.ics/" status, _, _ = self.request("MKCALENDAR", calendar_path) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path) event = get_file_content("event1.ics") event_path = posixpath.join(calendar_path, "event.ics") status, _, _ = self.request("PUT", event_path, event) assert status == 201 status, _, _ = self.request("DELETE", event_path) assert status == 200 sync_token, xml = self._report_sync_token(calendar_path, sync_token) if not self.full_sync_token_support and not sync_token: pytest.skip("storage backend does not support sync-token") assert "404" in xml.find("{DAV:}response/{DAV:}status").text def test_report_sync_collection_modify_undo(self): """Test sync-collection report with a modified and changed back item""" calendar_path = "/calendar.ics/" status, _, _ = self.request("MKCALENDAR", calendar_path) assert status == 201 event1 = get_file_content("event1.ics") event2 = get_file_content("event1_modified.ics") event_path = posixpath.join(calendar_path, "event1.ics") status, _, _ = self.request("PUT", event_path, event1) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path) status, _, _ = self.request("PUT", event_path, event2) assert status == 201 status, _, _ = self.request("PUT", event_path, event1) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path, sync_token) if not self.full_sync_token_support and not sync_token: pytest.skip("storage backend does not support sync-token") assert xml.find("{DAV:}response") is not None assert xml.find("{DAV:}response/{DAV:}status") is None def test_report_sync_collection_move(self): """Test sync-collection report a moved item""" calendar_path = "/calendar.ics/" status, _, _ = self.request("MKCALENDAR", calendar_path) assert status == 201 event = get_file_content("event1.ics") event1_path = posixpath.join(calendar_path, "event1.ics") event2_path = posixpath.join(calendar_path, "event2.ics") status, _, _ = self.request("PUT", event1_path, event) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path) status, _, _ = self.request( "MOVE", event1_path, HTTP_DESTINATION=event2_path, HTTP_HOST="") assert status == 201 sync_token, xml = self._report_sync_token(calendar_path, sync_token) if not self.full_sync_token_support and not sync_token: pytest.skip("storage backend does not support sync-token") for response in xml.findall("{DAV:}response"): if response.find("{DAV:}status") is None: assert response.find("{DAV:}href").text == event2_path else: assert "404" in response.find("{DAV:}status").text assert response.find("{DAV:}href").text == event1_path def test_report_sync_collection_move_undo(self): """Test sync-collection report with a moved and moved back item""" calendar_path = "/calendar.ics/" status, _, _ = self.request("MKCALENDAR", calendar_path) assert status == 201 event = get_file_content("event1.ics") event1_path = posixpath.join(calendar_path, "event1.ics") event2_path = posixpath.join(calendar_path, "event2.ics") status, _, _ = self.request("PUT", event1_path, event) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path) status, _, _ = self.request( "MOVE", event1_path, HTTP_DESTINATION=event2_path, HTTP_HOST="") assert status == 201 status, _, _ = self.request( "MOVE", event2_path, HTTP_DESTINATION=event1_path, HTTP_HOST="") assert status == 201 sync_token, xml = self._report_sync_token(calendar_path, sync_token) if not self.full_sync_token_support and not sync_token: pytest.skip("storage backend does not support sync-token") created = deleted = 0 for response in xml.findall("{DAV:}response"): if response.find("{DAV:}status") is None: assert response.find("{DAV:}href").text == event1_path created += 1 else: assert "404" in response.find("{DAV:}status").text assert response.find("{DAV:}href").text == event2_path deleted += 1 assert created == 1 and deleted == 1 def test_report_sync_collection_invalid_sync_token(self): """Test sync-collection report with an invalid sync token""" calendar_path = "/calendar.ics/" status, _, _ = self.request("MKCALENDAR", calendar_path) assert status == 201 sync_token, xml = self._report_sync_token( calendar_path, "http://radicale.org/ns/sync/INVALID") assert not sync_token def test_propfind_sync_token(self): """Retrieve the sync-token with a propfind request""" calendar_path = "/calendar.ics/" status, _, _ = self.request("MKCALENDAR", calendar_path) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path) event = get_file_content("event1.ics") event_path = posixpath.join(calendar_path, "event.ics") status, _, _ = self.request("PUT", event_path, event) assert status == 201 new_sync_token, xml = self._report_sync_token(calendar_path, sync_token) assert sync_token != new_sync_token def test_propfind_same_as_sync_collection_sync_token(self): """Compare sync-token property with sync-collection sync-token""" calendar_path = "/calendar.ics/" status, _, _ = self.request("MKCALENDAR", calendar_path) assert status == 201 sync_token, xml = self._report_sync_token(calendar_path) new_sync_token, xml = self._report_sync_token(calendar_path, sync_token) if not self.full_sync_token_support and not new_sync_token: pytest.skip("storage backend does not support sync-token") assert sync_token == new_sync_token def test_calendar_getcontenttype(self): """Test report request on an item""" status, _, _ = self.request("MKCALENDAR", "/test/") assert status == 201 for component in ("event", "todo", "journal"): event = get_file_content("{}1.ics".format(component)) status, _, _ = self.request("DELETE", "/test/test.ics") assert status in (200, 404) status, _, _ = self.request("PUT", "/test/test.ics", event) assert status == 201 status, _, answer = self.request( "REPORT", "/test/", """ """) assert status == 207 assert ">text/calendar;charset=utf-8;component=V{}<".format( component.upper()) in answer def test_addressbook_getcontenttype(self): """Test report request on an item""" status, _, _ = self._create_addressbook("/test/") assert status == 201 contact = get_file_content("contact1.vcf") status, _, _ = self.request("PUT", "/test/test.vcf", contact) assert status == 201 status, _, answer = self.request( "REPORT", "/test/", """ """) assert status == 207 assert ">text/vcard;charset=utf-8<" in answer def test_authorization(self): authorization = "Basic " + base64.b64encode(b"user:").decode() status, _, answer = self.request( "PROPFIND", "/", """ """, HTTP_AUTHORIZATION=authorization) assert status == 207 assert "href>/user/<" in answer def test_authentication(self): """Test if server sends authentication request.""" self.configuration["auth"]["type"] = "htpasswd" self.configuration["auth"]["htpasswd_filename"] = os.devnull self.configuration["auth"]["htpasswd_encryption"] = "plain" self.configuration["rights"]["type"] = "owner_only" self.application = Application(self.configuration) status, headers, _ = self.request("MKCOL", "/user/") assert status in (401, 403) assert headers.get("WWW-Authenticate") def test_principal_collection_creation(self): """Verify existence of the principal collection.""" status, _, _ = self.request("PROPFIND", "/user/", HTTP_AUTHORIZATION=( "Basic " + base64.b64encode(b"user:").decode())) assert status == 207 def test_existence_of_root_collections(self): """Verify that the root collection always exists.""" # Use PROPFIND because GET returns message status, _, _ = self.request("PROPFIND", "/") assert status == 207 # it should still exist after deletion status, _, _ = self.request("DELETE", "/") assert status == 200 status, _, _ = self.request("PROPFIND", "/") assert status == 207 def test_custom_headers(self): if not self.configuration.has_section("headers"): self.configuration.add_section("headers") self.configuration.set("headers", "test", "123") # Test if header is set on success status, headers, _ = self.request("OPTIONS", "/") assert status == 200 assert headers.get("test") == "123" # Test if header is set on failure status, headers, _ = self.request( "GET", "/.well-known/does not exist") assert status == 404 assert headers.get("test") == "123" @pytest.mark.skipif(sys.version_info < (3, 6), reason="Unsupported in Python < 3.6") def test_timezone_seconds(self): """Verify that timezones with minutes and seconds work.""" status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 event = get_file_content("event_timezone_seconds.ics") status, _, _ = self.request("PUT", "/calendar.ics/event.ics", event) assert status == 201 class BaseFileSystemTest(BaseTest): """Base class for filesystem backend tests.""" storage_type = None def setup(self): self.configuration = config.load() self.configuration["storage"]["type"] = self.storage_type self.colpath = tempfile.mkdtemp() self.configuration["storage"]["filesystem_folder"] = self.colpath # Disable syncing to disk for better performance self.configuration["internal"]["filesystem_fsync"] = "False" # Allow access to anything for tests rights_file_path = os.path.join(self.colpath, "rights") with open(rights_file_path, "w") as f: f.write("""\ [allow all] user: .* collection: .* permissions: RrWw""") self.configuration["rights"]["file"] = rights_file_path self.configuration["rights"]["type"] = "from_file" self.application = Application(self.configuration) def teardown(self): shutil.rmtree(self.colpath) class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn): """Test BaseRequests on multifilesystem.""" storage_type = "multifilesystem" def test_fsync(self): """Create a directory and file with syncing enabled.""" self.configuration["internal"]["filesystem_fsync"] = "True" status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 def test_hook(self): """Run hook.""" self.configuration["storage"]["hook"] = ( "mkdir %s" % os.path.join("collection-root", "created_by_hook")) status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 status, _, _ = self.request("PROPFIND", "/created_by_hook/") assert status == 207 def test_hook_read_access(self): """Verify that hook is not run for read accesses.""" self.configuration["storage"]["hook"] = ( "mkdir %s" % os.path.join("collection-root", "created_by_hook")) status, _, _ = self.request("PROPFIND", "/") assert status == 207 status, _, _ = self.request("PROPFIND", "/created_by_hook/") assert status == 404 @pytest.mark.skipif(os.system("type flock") != 0, reason="flock command not found") def test_hook_storage_locked(self): """Verify that the storage is locked when the hook runs.""" self.configuration["storage"]["hook"] = ( "flock -n .Radicale.lock || exit 0; exit 1") status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 def test_hook_principal_collection_creation(self): """Verify that the hooks runs when a new user is created.""" self.configuration["storage"]["hook"] = ( "mkdir %s" % os.path.join("collection-root", "created_by_hook")) status, _, _ = self.request("PROPFIND", "/", HTTP_AUTHORIZATION=( "Basic " + base64.b64encode(b"user:").decode())) assert status == 207 status, _, _ = self.request("PROPFIND", "/created_by_hook/") assert status == 207 def test_hook_fail(self): """Verify that a request fails if the hook fails.""" self.configuration["storage"]["hook"] = "exit 1" status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status != 201 def test_item_cache_rebuild(self): """Delete the item cache and verify that it is rebuild.""" status, _, _ = self.request("MKCALENDAR", "/calendar.ics/") assert status == 201 event = get_file_content("event1.ics") path = "/calendar.ics/event1.ics" status, _, _ = self.request("PUT", path, event) assert status == 201 status, _, answer1 = self.request("GET", path) assert status == 200 cache_folder = os.path.join(self.colpath, "collection-root", "calendar.ics", ".Radicale.cache", "item") assert os.path.exists(os.path.join(cache_folder, "event1.ics")) shutil.rmtree(cache_folder) status, _, answer2 = self.request("GET", path) assert status == 200 assert answer1 == answer2 assert os.path.exists(os.path.join(cache_folder, "event1.ics")) @pytest.mark.skipif(os.name not in ("nt", "posix"), reason="Only supported on 'nt' and 'posix'") def test_put_whole_calendar_uids_used_as_file_names(self): """Test if UIDs are used as file names.""" BaseRequestsMixIn.test_put_whole_calendar(self) for uid in ("todo", "event"): status, _, answer = self.request( "GET", "/calendar.ics/%s.ics" % uid) assert status == 200 assert "\r\nUID:%s\r\n" % uid in answer @pytest.mark.skipif(os.name not in ("nt", "posix"), reason="Only supported on 'nt' and 'posix'") def test_put_whole_calendar_random_uids_used_as_file_names(self): """Test if UIDs are used as file names.""" BaseRequestsMixIn.test_put_whole_calendar_without_uids(self) status, _, answer = self.request("GET", "/calendar.ics") assert status == 200 uids = [] for line in answer.split("\r\n"): if line.startswith("UID:"): uids.append(line[len("UID:"):]) for uid in uids: status, _, answer = self.request( "GET", "/calendar.ics/%s.ics" % uid) assert status == 200 assert "\r\nUID:%s\r\n" % uid in answer @pytest.mark.skipif(os.name not in ("nt", "posix"), reason="Only supported on 'nt' and 'posix'") def test_put_whole_addressbook_uids_used_as_file_names(self): """Test if UIDs are used as file names.""" BaseRequestsMixIn.test_put_whole_addressbook(self) for uid in ("contact1", "contact2"): status, _, answer = self.request( "GET", "/contacts.vcf/%s.vcf" % uid) assert status == 200 assert "\r\nUID:%s\r\n" % uid in answer @pytest.mark.skipif(os.name not in ("nt", "posix"), reason="Only supported on 'nt' and 'posix'") def test_put_whole_addressbook_random_uids_used_as_file_names(self): """Test if UIDs are used as file names.""" BaseRequestsMixIn.test_put_whole_addressbook_without_uids(self) status, _, answer = self.request("GET", "/contacts.vcf") assert status == 200 uids = [] for line in answer.split("\r\n"): if line.startswith("UID:"): uids.append(line[len("UID:"):]) for uid in uids: status, _, answer = self.request( "GET", "/contacts.vcf/%s.vcf" % uid) assert status == 200 assert "\r\nUID:%s\r\n" % uid in answer class TestCustomStorageSystem(BaseFileSystemTest): """Test custom backend loading.""" storage_type = "tests.custom.storage" def test_root(self): """A simple test to verify that the custom backend works.""" BaseRequestsMixIn.test_root(self)