1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-06-26 16:45:52 +00:00

Merge pull request #1783 from pbiering/issue-1782

Implement timerange filter for VALARM
This commit is contained in:
Peter Bieringer 2025-05-16 07:42:58 +02:00 committed by GitHub
commit c0fd66eda6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 129 additions and 17 deletions

View file

@ -1,7 +1,7 @@
# Changelog
## 3.5.4.dev
* Improve: item filter enhanced for 3rd level supporting VALARM and VFREEBUSY (only component existence so far)
* Improve: item filter enhanced for 3rd level supporting VALARM and honoring TRIGGER (offset or absolute)
## 3.5.3
* Add: [auth] htpasswd: support for Argon2 hashes

View file

@ -177,7 +177,7 @@ def xml_report(base_prefix: str, path: str, xml_request: Optional[ET.Element],
props: Union[ET.Element, List]
if root.find(xmlutils.make_clark("D:prop")) is not None:
props = root.find(xmlutils.make_clark("D:prop")) # type: ignore[assignment]
props = root.find(xmlutils.make_clark("D:prop")) # type: ignore[assignment]
else:
props = []

View file

@ -96,7 +96,7 @@ class Auth(auth.BaseAuth):
self._has_bcrypt = False
self._has_argon2 = False
self._htpasswd_ok = False
self._htpasswd_not_ok_reminder_seconds = 60 # currently hardcoded
self._htpasswd_not_ok_reminder_seconds = 60 # currently hardcoded
(self._htpasswd_ok, self._htpasswd_bcrypt_use, self._htpasswd_argon2_use, self._htpasswd, self._htpasswd_size, self._htpasswd_mtime_ns) = self._read_htpasswd(True, False)
self._lock = threading.Lock()

View file

@ -17,6 +17,8 @@
import imaplib
import ssl
import sys
from typing import Union
from radicale import auth
from radicale.log import logger
@ -49,7 +51,10 @@ class Auth(auth.BaseAuth):
def _login(self, login, password) -> str:
try:
connection: imaplib.IMAP4 | imaplib.IMAP4_SSL
if sys.version_info < (3, 10):
connection: Union[imaplib.IMAP4, imaplib.IMAP4_SSL]
else:
connection: imaplib.IMAP4 | imaplib.IMAP4_SSL
if self._security == "tls":
connection = imaplib.IMAP4_SSL(
host=self._host, port=self._port,

View file

@ -21,11 +21,12 @@
import math
import sys
import xml.etree.ElementTree as ET
from datetime import date, datetime, timedelta, timezone
from itertools import chain
from typing import (Callable, Iterable, Iterator, List, Optional, Sequence,
Tuple)
Tuple, Union)
import vobject
@ -39,6 +40,11 @@ DATETIME_MAX: datetime = datetime.max.replace(tzinfo=timezone.utc)
TIMESTAMP_MIN: int = math.floor(DATETIME_MIN.timestamp())
TIMESTAMP_MAX: int = math.ceil(DATETIME_MAX.timestamp())
if sys.version_info < (3, 10):
TRIGGER = Union[datetime, None]
else:
TRIGGER = datetime | None
def date_to_datetime(d: date) -> datetime:
"""Transform any date to a UTC datetime.
@ -88,8 +94,7 @@ def comp_match(item: "item.Item", filter_: ET.Element, level: int = 0) -> bool:
"""
# TODO: Improve filtering for VALARM and VFREEBUSY
# so far only filtering based on existence of such component is implemented
# TODO: Filtering VFREEBUSY is not implemented
# HACK: the filters are tested separately against all components
name = filter_.get("name", "").upper()
@ -117,10 +122,11 @@ def comp_match(item: "item.Item", filter_: ET.Element, level: int = 0) -> bool:
return False
if ((level == 0 and name != "VCALENDAR") or
(level == 1 and name not in ("VTODO", "VEVENT", "VJOURNAL")) or
(level == 2 and name not in ("VALARM", "VFREEBUSY"))):
(level == 2 and name not in ("VALARM"))):
logger.warning("Filtering %s is not supported", name)
return True
# Point #3 and #4 of rfc4791-9.7.1
trigger = None
if level == 0:
components = [item.vobject_item]
elif level == 1:
@ -128,15 +134,19 @@ def comp_match(item: "item.Item", filter_: ET.Element, level: int = 0) -> bool:
elif level == 2:
components = list(getattr(item.vobject_item, "%s_list" % tag.lower()))
for comp in components:
if not hasattr(comp, name.lower()):
subcomp = getattr(comp, name.lower(), None)
if not subcomp:
return False
if hasattr(subcomp, "trigger"):
# rfc4791-7.8.5:
trigger = subcomp.trigger.value
for child in filter_:
if child.tag == xmlutils.make_clark("C:prop-filter"):
if not any(prop_match(comp, child, "C")
for comp in components):
return False
elif child.tag == xmlutils.make_clark("C:time-range"):
if not time_range_match(item.vobject_item, filter_[0], tag):
if not time_range_match(item.vobject_item, filter_[0], tag, trigger):
return False
elif child.tag == xmlutils.make_clark("C:comp-filter"):
if not comp_match(item, child, level=level + 1):
@ -166,7 +176,7 @@ def prop_match(vobject_item: vobject.base.Component,
# Point #3 and #4 of rfc4791-9.7.2
for child in filter_:
if ns == "C" and child.tag == xmlutils.make_clark("C:time-range"):
if not time_range_match(vobject_item, child, name):
if not time_range_match(vobject_item, child, name, None):
return False
elif child.tag == xmlutils.make_clark("%s:text-match" % ns):
if not text_match(vobject_item, child, name, ns):
@ -180,9 +190,10 @@ def prop_match(vobject_item: vobject.base.Component,
def time_range_match(vobject_item: vobject.base.Component,
filter_: ET.Element, child_name: str) -> bool:
filter_: ET.Element, child_name: str, trigger: TRIGGER) -> bool:
"""Check whether the component/property ``child_name`` of
``vobject_item`` matches the time-range ``filter_``."""
# supporting since 3.5.4 now optional trigger (either absolute or relative offset)
if not filter_.get("start") and not filter_.get("end"):
return False
@ -193,6 +204,25 @@ def time_range_match(vobject_item: vobject.base.Component,
def range_fn(range_start: datetime, range_end: datetime,
is_recurrence: bool) -> bool:
nonlocal matched
if trigger:
# if trigger is given, only check range_start
if isinstance(trigger, timedelta):
# trigger is a offset, apply to range_start
if start < range_start + trigger and range_start + trigger < end:
matched = True
return True
else:
return False
elif isinstance(trigger, datetime):
# trigger is absolute, use instead of range_start
if start < trigger and trigger < end:
matched = True
return True
else:
return False
else:
logger.warning("item/filter/time_range_match/range_fn: unsupported data format of provided trigger=%r", trigger)
return True
if start < range_end and range_start < end:
matched = True
return True

View file

@ -0,0 +1,15 @@
BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//python-caldav//caldav//en_DK
BEGIN:VEVENT
SUMMARY:This is a test event
DTSTART:20151010T060000Z
DTEND:20161010T070000Z
DTSTAMP:20250515T073149Z
UID:a9cef952-315e-11f0-a30a-1c1bb5134174
BEGIN:VALARM
ACTION:AUDIO
TRIGGER:-PT15M
END:VALARM
END:VEVENT
END:VCALENDAR

View file

@ -0,0 +1,15 @@
BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//python-caldav//caldav//en_DK
BEGIN:VEVENT
SUMMARY:This is a test event
DTSTART:20151010T060000Z
DTEND:20161010T070000Z
DTSTAMP:20250515T073149Z
UID:a9cef952-315e-11f0-a30a-1c1bb5134175
BEGIN:VALARM
ACTION:AUDIO
TRIGGER;VALUE=DATE-TIME:20151010T033000Z
END:VALARM
END:VEVENT
END:VCALENDAR

View file

@ -21,6 +21,7 @@ Radicale tests with simple requests.
"""
import logging
import os
import posixpath
from typing import Any, Callable, ClassVar, Iterable, List, Optional, Tuple
@ -745,7 +746,7 @@ permissions: RrWw""")
) -> List[str]:
filter_template = "<C:filter>%s</C:filter>"
create_collection_fn: Callable[[str], Any]
if kind in ("event", "journal", "todo"):
if kind in ("event", "journal", "todo", "valarm"):
create_collection_fn = self.mkcalendar
path = "/calendar.ics/"
filename_template = "%s%d.ics"
@ -764,10 +765,13 @@ permissions: RrWw""")
status, _, = self.delete(path, check=None)
assert status in (200, 404)
create_collection_fn(path)
logging.warning("Upload items %r", items)
for i in items:
logging.warning("Upload %d", i)
filename = filename_template % (kind, i)
event = get_file_content(filename)
self.put(posixpath.join(path, filename), event)
logging.warning("Upload items finished")
filters_text = "".join(filter_template % f for f in filters)
_, responses = self.report(path, """\
<?xml version="1.0" encoding="utf-8" ?>
@ -1304,6 +1308,49 @@ permissions: RrWw""")
</C:comp-filter>"""], "todo", items=range(1, 9))
assert "/calendar.ics/todo7.ics" in answer
def test_time_range_filter_events_valarm(self) -> None:
"""Report request with time-range filter on events having absolute VALARM."""
answer = self._test_filter(["""\
<C:comp-filter name="VCALENDAR">
<C:comp-filter name="VEVENT">
<C:comp-filter name="VALARM">
<C:time-range start="20151010T030000Z" end="20151010T040000Z"/>
</C:comp-filter>
</C:comp-filter>
</C:comp-filter>"""], "valarm", items=[1, 2])
assert "/calendar.ics/valarm1.ics" not in answer
assert "/calendar.ics/valarm2.ics" in answer # absolute date
answer = self._test_filter(["""\
<C:comp-filter name="VCALENDAR">
<C:comp-filter name="VEVENT">
<C:comp-filter name="VALARM">
<C:time-range start="20151010T010000Z" end="20151010T020000Z"/>
</C:comp-filter>
</C:comp-filter>
</C:comp-filter>"""], "valarm", items=[1, 2])
assert "/calendar.ics/valarm1.ics" not in answer
assert "/calendar.ics/valarm2.ics" not in answer
answer = self._test_filter(["""\
<C:comp-filter name="VCALENDAR">
<C:comp-filter name="VEVENT">
<C:comp-filter name="VALARM">
<C:time-range start="20151010T080000Z" end="20151010T090000Z"/>
</C:comp-filter>
</C:comp-filter>
</C:comp-filter>"""], "valarm", items=[1, 2])
assert "/calendar.ics/valarm1.ics" not in answer
assert "/calendar.ics/valarm2.ics" not in answer
answer = self._test_filter(["""\
<C:comp-filter name="VCALENDAR">
<C:comp-filter name="VEVENT">
<C:comp-filter name="VALARM">
<C:time-range start="20151010T053000Z" end="20151010T055000Z"/>
</C:comp-filter>
</C:comp-filter>
</C:comp-filter>"""], "valarm", items=[1, 2])
assert "/calendar.ics/valarm1.ics" in answer # -15 min offset
assert "/calendar.ics/valarm2.ics" not in answer
def test_time_range_filter_todos_completed(self) -> None:
answer = self._test_filter(["""\
<C:comp-filter name="VCALENDAR">

View file

@ -102,7 +102,7 @@ def ssl_context_options_by_protocol(protocol: str, ssl_context_options):
ssl_context_options |= ssl.OP_NO_TLSv1_3
logger.debug("SSL cleared SSL context options: '0x%x'", ssl_context_options)
for entry in protocol.split():
entry = entry.strip('+') # remove trailing '+'
entry = entry.strip('+') # remove trailing '+'
if entry == "ALL":
logger.debug("SSL context options, enable ALL (some maybe not supported by underlying OpenSSL, SSLv2 not enabled at all)")
ssl_context_options &= ~ssl.OP_NO_SSLv3
@ -162,7 +162,7 @@ def ssl_context_options_by_protocol(protocol: str, ssl_context_options):
def ssl_context_minimum_version_by_options(ssl_context_options):
logger.debug("SSL calculate minimum version by context options: '0x%x'", ssl_context_options)
ssl_context_minimum_version = ssl.TLSVersion.SSLv3 # default
ssl_context_minimum_version = ssl.TLSVersion.SSLv3 # default
if ((ssl_context_options & ssl.OP_NO_SSLv3) and (ssl_context_minimum_version == ssl.TLSVersion.SSLv3)):
ssl_context_minimum_version = ssl.TLSVersion.TLSv1
if ((ssl_context_options & ssl.OP_NO_TLSv1) and (ssl_context_minimum_version == ssl.TLSVersion.TLSv1)):
@ -172,7 +172,7 @@ def ssl_context_minimum_version_by_options(ssl_context_options):
if ((ssl_context_options & ssl.OP_NO_TLSv1_2) and (ssl_context_minimum_version == ssl.TLSVersion.TLSv1_2)):
ssl_context_minimum_version = ssl.TLSVersion.TLSv1_3
if ((ssl_context_options & ssl.OP_NO_TLSv1_3) and (ssl_context_minimum_version == ssl.TLSVersion.TLSv1_3)):
ssl_context_minimum_version = 0 # all disabled
ssl_context_minimum_version = 0 # all disabled
logger.debug("SSL context options: '0x%x' results in minimum version: %s", ssl_context_options, ssl_context_minimum_version)
return ssl_context_minimum_version
@ -180,7 +180,7 @@ def ssl_context_minimum_version_by_options(ssl_context_options):
def ssl_context_maximum_version_by_options(ssl_context_options):
logger.debug("SSL calculate maximum version by context options: '0x%x'", ssl_context_options)
ssl_context_maximum_version = ssl.TLSVersion.TLSv1_3 # default
ssl_context_maximum_version = ssl.TLSVersion.TLSv1_3 # default
if ((ssl_context_options & ssl.OP_NO_TLSv1_3) and (ssl_context_maximum_version == ssl.TLSVersion.TLSv1_3)):
ssl_context_maximum_version = ssl.TLSVersion.TLSv1_2
if ((ssl_context_options & ssl.OP_NO_TLSv1_2) and (ssl_context_maximum_version == ssl.TLSVersion.TLSv1_2)):