1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-08-01 18:18:31 +00:00
This commit is contained in:
Unrud 2018-08-28 16:19:36 +02:00
parent 1bdc47bf44
commit 8869b34470
51 changed files with 4091 additions and 3335 deletions

374
radicale/item/__init__.py Normal file
View file

@ -0,0 +1,374 @@
# This file is part of Radicale Server - Calendar Server
# Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter
# Copyright © 2014 Jean-Marc Martins
# Copyright © 2008-2017 Guillaume Ayoub
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.
import math
import sys
from hashlib import md5
from random import getrandbits
import vobject
from radicale.item import filter as radicale_filter
def predict_tag_of_parent_collection(vobject_items):
if len(vobject_items) != 1:
return ""
if vobject_items[0].name == "VCALENDAR":
return "VCALENDAR"
if vobject_items[0].name in ("VCARD", "VLIST"):
return "VADDRESSBOOK"
return ""
def predict_tag_of_whole_collection(vobject_items, fallback_tag=None):
if vobject_items and vobject_items[0].name == "VCALENDAR":
return "VCALENDAR"
if vobject_items and vobject_items[0].name in ("VCARD", "VLIST"):
return "VADDRESSBOOK"
if not fallback_tag and not vobject_items:
# Maybe an empty address book
return "VADDRESSBOOK"
return fallback_tag
def check_and_sanitize_items(vobject_items, is_collection=False, tag=None):
"""Check vobject items for common errors and add missing UIDs.
``is_collection`` indicates that vobject_item contains unrelated
components.
The ``tag`` of the collection.
"""
if tag and tag not in ("VCALENDAR", "VADDRESSBOOK"):
raise ValueError("Unsupported collection tag: %r" % tag)
if not is_collection and len(vobject_items) != 1:
raise ValueError("Item contains %d components" % len(vobject_items))
if tag == "VCALENDAR":
if len(vobject_items) > 1:
raise RuntimeError("VCALENDAR collection contains %d "
"components" % len(vobject_items))
vobject_item = vobject_items[0]
if vobject_item.name != "VCALENDAR":
raise ValueError("Item type %r not supported in %r "
"collection" % (vobject_item.name, tag))
component_uids = set()
for component in vobject_item.components():
if component.name in ("VTODO", "VEVENT", "VJOURNAL"):
component_uid = get_uid(component)
if component_uid:
component_uids.add(component_uid)
component_name = None
object_uid = None
object_uid_set = False
for component in vobject_item.components():
# https://tools.ietf.org/html/rfc4791#section-4.1
if component.name == "VTIMEZONE":
continue
if component_name is None or is_collection:
component_name = component.name
elif component_name != component.name:
raise ValueError("Multiple component types in object: %r, %r" %
(component_name, component.name))
if component_name not in ("VTODO", "VEVENT", "VJOURNAL"):
continue
component_uid = get_uid(component)
if not object_uid_set or is_collection:
object_uid_set = True
object_uid = component_uid
if not component_uid:
if not is_collection:
raise ValueError("%s component without UID in object" %
component_name)
component_uid = find_available_uid(
component_uids.__contains__)
component_uids.add(component_uid)
if hasattr(component, "uid"):
component.uid.value = component_uid
else:
component.add("UID").value = component_uid
elif not object_uid or not component_uid:
raise ValueError("Multiple %s components without UID in "
"object" % component_name)
elif object_uid != component_uid:
raise ValueError(
"Multiple %s components with different UIDs in object: "
"%r, %r" % (component_name, object_uid, component_uid))
# vobject interprets recurrence rules on demand
try:
component.rruleset
except Exception as e:
raise ValueError("invalid recurrence rules in %s" %
component.name) from e
elif tag == "VADDRESSBOOK":
# https://tools.ietf.org/html/rfc6352#section-5.1
object_uids = set()
for vobject_item in vobject_items:
if vobject_item.name == "VCARD":
object_uid = get_uid(vobject_item)
if object_uid:
object_uids.add(object_uid)
for vobject_item in vobject_items:
if vobject_item.name == "VLIST":
# Custom format used by SOGo Connector to store lists of
# contacts
continue
if vobject_item.name != "VCARD":
raise ValueError("Item type %r not supported in %r "
"collection" % (vobject_item.name, tag))
object_uid = get_uid(vobject_item)
if not object_uid:
if not is_collection:
raise ValueError("%s object without UID" %
vobject_item.name)
object_uid = find_available_uid(object_uids.__contains__)
object_uids.add(object_uid)
if hasattr(vobject_item, "uid"):
vobject_item.uid.value = object_uid
else:
vobject_item.add("UID").value = object_uid
else:
for i in vobject_items:
raise ValueError("Item type %r not supported in %s collection" %
(i.name, repr(tag) if tag else "generic"))
def check_and_sanitize_props(props):
"""Check collection properties for common errors."""
tag = props.get("tag")
if tag and tag not in ("VCALENDAR", "VADDRESSBOOK"):
raise ValueError("Unsupported collection tag: %r" % tag)
def find_available_uid(exists_fn, suffix=""):
"""Generate a pseudo-random UID"""
# Prevent infinite loop
for _ in range(1000):
r = "%016x" % getrandbits(128)
name = "%s-%s-%s-%s-%s%s" % (
r[:8], r[8:12], r[12:16], r[16:20], r[20:], suffix)
if not exists_fn(name):
return name
# something is wrong with the PRNG
raise RuntimeError("No unique random sequence found")
def get_etag(text):
"""Etag from collection or item.
Encoded as quoted-string (see RFC 2616).
"""
etag = md5()
etag.update(text.encode("utf-8"))
return '"%s"' % etag.hexdigest()
def get_uid(vobject_component):
"""UID value of an item if defined."""
return (vobject_component.uid.value
if hasattr(vobject_component, "uid") else None)
def get_uid_from_object(vobject_item):
"""UID value of an calendar/addressbook object."""
if vobject_item.name == "VCALENDAR":
if hasattr(vobject_item, "vevent"):
return get_uid(vobject_item.vevent)
if hasattr(vobject_item, "vjournal"):
return get_uid(vobject_item.vjournal)
if hasattr(vobject_item, "vtodo"):
return get_uid(vobject_item.vtodo)
elif vobject_item.name == "VCARD":
return get_uid(vobject_item)
return None
def find_tag(vobject_item):
"""Find component name from ``vobject_item``."""
if vobject_item.name == "VCALENDAR":
for component in vobject_item.components():
if component.name != "VTIMEZONE":
return component.name or ""
return ""
def find_tag_and_time_range(vobject_item):
"""Find component name and enclosing time range from ``vobject item``.
Returns a tuple (``tag``, ``start``, ``end``) where ``tag`` is a string
and ``start`` and ``end`` are POSIX timestamps (as int).
This is intened to be used for matching against simplified prefilters.
"""
tag = find_tag(vobject_item)
if not tag:
return (
tag, radicale_filter.TIMESTAMP_MIN, radicale_filter.TIMESTAMP_MAX)
start = end = None
def range_fn(range_start, range_end, is_recurrence):
nonlocal start, end
if start is None or range_start < start:
start = range_start
if end is None or end < range_end:
end = range_end
return False
def infinity_fn(range_start):
nonlocal start, end
if start is None or range_start < start:
start = range_start
end = radicale_filter.DATETIME_MAX
return True
radicale_filter.visit_time_ranges(vobject_item, tag, range_fn, infinity_fn)
if start is None:
start = radicale_filter.DATETIME_MIN
if end is None:
end = radicale_filter.DATETIME_MAX
try:
return tag, math.floor(start.timestamp()), math.ceil(end.timestamp())
except ValueError as e:
if str(e) == ("offset must be a timedelta representing a whole "
"number of minutes") and sys.version_info < (3, 6):
raise RuntimeError("Unsupported in Python < 3.6: %s" % e) from e
raise
class Item:
def __init__(self, collection_path=None, collection=None,
vobject_item=None, href=None, last_modified=None, text=None,
etag=None, uid=None, name=None, component_name=None,
time_range=None):
"""Initialize an item.
``collection_path`` the path of the parent collection (optional if
``collection`` is set).
``collection`` the parent collection (optional).
``href`` the href of the item.
``last_modified`` the HTTP-datetime of when the item was modified.
``text`` the text representation of the item (optional if
``vobject_item`` is set).
``vobject_item`` the vobject item (optional if ``text`` is set).
``etag`` the etag of the item (optional). See ``get_etag``.
``uid`` the UID of the object (optional). See ``get_uid_from_object``.
``name`` the name of the item (optional). See ``vobject_item.name``.
``component_name`` the name of the primary component (optional).
See ``find_tag``.
``time_range`` the enclosing time range.
See ``find_tag_and_time_range``.
"""
if text is None and vobject_item is None:
raise ValueError(
"at least one of 'text' or 'vobject_item' must be set")
if collection_path is None:
if collection is None:
raise ValueError("at least one of 'collection_path' or "
"'collection' must be set")
collection_path = collection.path
self._collection_path = collection_path
self.collection = collection
self.href = href
self.last_modified = last_modified
self._text = text
self._vobject_item = vobject_item
self._etag = etag
self._uid = uid
self._name = name
self._component_name = component_name
self._time_range = time_range
def serialize(self):
if self._text is None:
try:
self._text = self.vobject_item.serialize()
except Exception as e:
raise RuntimeError("Failed to serialize item %r from %r: %s" %
(self.href, self._collection_path,
e)) from e
return self._text
@property
def vobject_item(self):
if self._vobject_item is None:
try:
self._vobject_item = vobject.readOne(self._text)
except Exception as e:
raise RuntimeError("Failed to parse item %r from %r: %s" %
(self.href, self._collection_path,
e)) from e
return self._vobject_item
@property
def etag(self):
"""Encoded as quoted-string (see RFC 2616)."""
if self._etag is None:
self._etag = get_etag(self.serialize())
return self._etag
@property
def uid(self):
if self._uid is None:
self._uid = get_uid_from_object(self.vobject_item)
return self._uid
@property
def name(self):
if self._name is None:
self._name = self.vobject_item.name or ""
return self._name
@property
def component_name(self):
if self._component_name is not None:
return self._component_name
return find_tag(self.vobject_item)
@property
def time_range(self):
if self._time_range is None:
self._component_name, *self._time_range = (
find_tag_and_time_range(self.vobject_item))
return self._time_range
def prepare(self):
"""Fill cache with values."""
orig_vobject_item = self._vobject_item
self.serialize()
self.etag
self.uid
self.name
self.time_range
self.component_name
self._vobject_item = orig_vobject_item

529
radicale/item/filter.py Normal file
View file

@ -0,0 +1,529 @@
# This file is part of Radicale Server - Calendar Server
# Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter
# Copyright © 2008-2015 Guillaume Ayoub
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.
import math
from datetime import date, datetime, timedelta, timezone
from itertools import chain
from radicale import xmlutils
from radicale.log import logger
DAY = timedelta(days=1)
SECOND = timedelta(seconds=1)
DATETIME_MIN = datetime.min.replace(tzinfo=timezone.utc)
DATETIME_MAX = datetime.max.replace(tzinfo=timezone.utc)
TIMESTAMP_MIN = math.floor(DATETIME_MIN.timestamp())
TIMESTAMP_MAX = math.ceil(DATETIME_MAX.timestamp())
def date_to_datetime(date_):
"""Transform a date to a UTC datetime.
If date_ is a datetime without timezone, return as UTC datetime. If date_
is already a datetime with timezone, return as is.
"""
if not isinstance(date_, datetime):
date_ = datetime.combine(date_, datetime.min.time())
if not date_.tzinfo:
date_ = date_.replace(tzinfo=timezone.utc)
return date_
def comp_match(item, filter_, level=0):
"""Check whether the ``item`` matches the comp ``filter_``.
If ``level`` is ``0``, the filter is applied on the
item's collection. Otherwise, it's applied on the item.
See rfc4791-9.7.1.
"""
# TODO: Filtering VALARM and VFREEBUSY is not implemented
# HACK: the filters are tested separately against all components
if level == 0:
tag = item.name
elif level == 1:
tag = item.component_name
else:
logger.warning(
"Filters with three levels of comp-filter are not supported")
return True
if not tag:
return False
name = filter_.get("name").upper()
if len(filter_) == 0:
# Point #1 of rfc4791-9.7.1
return name == tag
if len(filter_) == 1:
if filter_[0].tag == xmlutils.make_tag("C", "is-not-defined"):
# Point #2 of rfc4791-9.7.1
return name != tag
if name != tag:
return False
if (level == 0 and name != "VCALENDAR" or
level == 1 and name not in ("VTODO", "VEVENT", "VJOURNAL")):
logger.warning("Filtering %s is not supported" % name)
return True
# Point #3 and #4 of rfc4791-9.7.1
components = ([item.vobject_item] if level == 0
else list(getattr(item.vobject_item,
"%s_list" % tag.lower())))
for child in filter_:
if child.tag == xmlutils.make_tag("C", "prop-filter"):
if not any(prop_match(comp, child, "C")
for comp in components):
return False
elif child.tag == xmlutils.make_tag("C", "time-range"):
if not time_range_match(item.vobject_item, filter_[0], tag):
return False
elif child.tag == xmlutils.make_tag("C", "comp-filter"):
if not comp_match(item, child, level=level + 1):
return False
else:
raise ValueError("Unexpected %r in comp-filter" % child.tag)
return True
def prop_match(vobject_item, filter_, ns):
"""Check whether the ``item`` matches the prop ``filter_``.
See rfc4791-9.7.2 and rfc6352-10.5.1.
"""
name = filter_.get("name").lower()
if len(filter_) == 0:
# Point #1 of rfc4791-9.7.2
return name in vobject_item.contents
if len(filter_) == 1:
if filter_[0].tag == xmlutils.make_tag("C", "is-not-defined"):
# Point #2 of rfc4791-9.7.2
return name not in vobject_item.contents
if name not in vobject_item.contents:
return False
# Point #3 and #4 of rfc4791-9.7.2
for child in filter_:
if ns == "C" and child.tag == xmlutils.make_tag("C", "time-range"):
if not time_range_match(vobject_item, child, name):
return False
elif child.tag == xmlutils.make_tag(ns, "text-match"):
if not text_match(vobject_item, child, name, ns):
return False
elif child.tag == xmlutils.make_tag(ns, "param-filter"):
if not param_filter_match(vobject_item, child, name, ns):
return False
else:
raise ValueError("Unexpected %r in prop-filter" % child.tag)
return True
def time_range_match(vobject_item, filter_, child_name):
"""Check whether the component/property ``child_name`` of
``vobject_item`` matches the time-range ``filter_``."""
start = filter_.get("start")
end = filter_.get("end")
if not start and not end:
return False
if start:
start = datetime.strptime(start, "%Y%m%dT%H%M%SZ")
else:
start = datetime.min
if end:
end = datetime.strptime(end, "%Y%m%dT%H%M%SZ")
else:
end = datetime.max
start = start.replace(tzinfo=timezone.utc)
end = end.replace(tzinfo=timezone.utc)
matched = False
def range_fn(range_start, range_end, is_recurrence):
nonlocal matched
if start < range_end and range_start < end:
matched = True
return True
if end < range_start and not is_recurrence:
return True
return False
def infinity_fn(start):
return False
visit_time_ranges(vobject_item, child_name, range_fn, infinity_fn)
return matched
def visit_time_ranges(vobject_item, child_name, range_fn, infinity_fn):
"""Visit all time ranges in the component/property ``child_name`` of
`vobject_item`` with visitors ``range_fn`` and ``infinity_fn``.
``range_fn`` gets called for every time_range with ``start`` and ``end``
datetimes and ``is_recurrence`` as arguments. If the function returns True,
the operation is cancelled.
``infinity_fn`` gets called when an infiite recurrence rule is detected
with ``start`` datetime as argument. If the function returns True, the
operation is cancelled.
See rfc4791-9.9.
"""
# HACK: According to rfc5545-3.8.4.4 an recurrance that is resheduled
# with Recurrence ID affects the recurrence itself and all following
# recurrences too. This is not respected and client don't seem to bother
# either.
def getrruleset(child, ignore=()):
if (hasattr(child, "rrule") and
";UNTIL=" not in child.rrule.value.upper() and
";COUNT=" not in child.rrule.value.upper()):
for dtstart in child.getrruleset(addRDate=True):
if dtstart in ignore:
continue
if infinity_fn(date_to_datetime(dtstart)):
return (), True
break
return filter(lambda dtstart: dtstart not in ignore,
child.getrruleset(addRDate=True)), False
def get_children(components):
main = None
recurrences = []
for comp in components:
if hasattr(comp, "recurrence_id") and comp.recurrence_id.value:
recurrences.append(comp.recurrence_id.value)
if comp.rruleset:
# Prevent possible infinite loop
raise ValueError("Overwritten recurrence with RRULESET")
yield comp, True, ()
else:
if main is not None:
raise ValueError("Multiple main components")
main = comp
if main is None:
raise ValueError("Main component missing")
yield main, False, recurrences
# Comments give the lines in the tables of the specification
if child_name == "VEVENT":
for child, is_recurrence, recurrences in get_children(
vobject_item.vevent_list):
# TODO: check if there's a timezone
dtstart = child.dtstart.value
if child.rruleset:
dtstarts, infinity = getrruleset(child, recurrences)
if infinity:
return
else:
dtstarts = (dtstart,)
dtend = getattr(child, "dtend", None)
if dtend is not None:
dtend = dtend.value
original_duration = (dtend - dtstart).total_seconds()
dtend = date_to_datetime(dtend)
duration = getattr(child, "duration", None)
if duration is not None:
original_duration = duration = duration.value
for dtstart in dtstarts:
dtstart_is_datetime = isinstance(dtstart, datetime)
dtstart = date_to_datetime(dtstart)
if dtend is not None:
# Line 1
dtend = dtstart + timedelta(seconds=original_duration)
if range_fn(dtstart, dtend, is_recurrence):
return
elif duration is not None:
if original_duration is None:
original_duration = duration.seconds
if duration.seconds > 0:
# Line 2
if range_fn(dtstart, dtstart + duration,
is_recurrence):
return
else:
# Line 3
if range_fn(dtstart, dtstart + SECOND, is_recurrence):
return
elif dtstart_is_datetime:
# Line 4
if range_fn(dtstart, dtstart + SECOND, is_recurrence):
return
else:
# Line 5
if range_fn(dtstart, dtstart + DAY, is_recurrence):
return
elif child_name == "VTODO":
for child, is_recurrence, recurrences in get_children(
vobject_item.vtodo_list):
dtstart = getattr(child, "dtstart", None)
duration = getattr(child, "duration", None)
due = getattr(child, "due", None)
completed = getattr(child, "completed", None)
created = getattr(child, "created", None)
if dtstart is not None:
dtstart = date_to_datetime(dtstart.value)
if duration is not None:
duration = duration.value
if due is not None:
due = date_to_datetime(due.value)
if dtstart is not None:
original_duration = (due - dtstart).total_seconds()
if completed is not None:
completed = date_to_datetime(completed.value)
if created is not None:
created = date_to_datetime(created.value)
original_duration = (completed - created).total_seconds()
elif created is not None:
created = date_to_datetime(created.value)
if child.rruleset:
reference_dates, infinity = getrruleset(child, recurrences)
if infinity:
return
else:
if dtstart is not None:
reference_dates = (dtstart,)
elif due is not None:
reference_dates = (due,)
elif completed is not None:
reference_dates = (completed,)
elif created is not None:
reference_dates = (created,)
else:
# Line 8
if range_fn(DATETIME_MIN, DATETIME_MAX, is_recurrence):
return
reference_dates = ()
for reference_date in reference_dates:
reference_date = date_to_datetime(reference_date)
if dtstart is not None and duration is not None:
# Line 1
if range_fn(reference_date,
reference_date + duration + SECOND,
is_recurrence):
return
if range_fn(reference_date + duration - SECOND,
reference_date + duration + SECOND,
is_recurrence):
return
elif dtstart is not None and due is not None:
# Line 2
due = reference_date + timedelta(seconds=original_duration)
if (range_fn(reference_date, due, is_recurrence) or
range_fn(reference_date,
reference_date + SECOND, is_recurrence) or
range_fn(due - SECOND, due, is_recurrence) or
range_fn(due - SECOND, reference_date + SECOND,
is_recurrence)):
return
elif dtstart is not None:
if range_fn(reference_date, reference_date + SECOND,
is_recurrence):
return
elif due is not None:
# Line 4
if range_fn(reference_date - SECOND, reference_date,
is_recurrence):
return
elif completed is not None and created is not None:
# Line 5
completed = reference_date + timedelta(
seconds=original_duration)
if (range_fn(reference_date - SECOND,
reference_date + SECOND,
is_recurrence) or
range_fn(completed - SECOND, completed + SECOND,
is_recurrence) or
range_fn(reference_date - SECOND,
reference_date + SECOND, is_recurrence) or
range_fn(completed - SECOND, completed + SECOND,
is_recurrence)):
return
elif completed is not None:
# Line 6
if range_fn(reference_date - SECOND,
reference_date + SECOND, is_recurrence):
return
elif created is not None:
# Line 7
if range_fn(reference_date, DATETIME_MAX, is_recurrence):
return
elif child_name == "VJOURNAL":
for child, is_recurrence, recurrences in get_children(
vobject_item.vjournal_list):
dtstart = getattr(child, "dtstart", None)
if dtstart is not None:
dtstart = dtstart.value
if child.rruleset:
dtstarts, infinity = getrruleset(child, recurrences)
if infinity:
return
else:
dtstarts = (dtstart,)
for dtstart in dtstarts:
dtstart_is_datetime = isinstance(dtstart, datetime)
dtstart = date_to_datetime(dtstart)
if dtstart_is_datetime:
# Line 1
if range_fn(dtstart, dtstart + SECOND, is_recurrence):
return
else:
# Line 2
if range_fn(dtstart, dtstart + DAY, is_recurrence):
return
else:
# Match a property
child = getattr(vobject_item, child_name.lower())
if isinstance(child, date):
range_fn(child, child + DAY, False)
elif isinstance(child, datetime):
range_fn(child, child + SECOND, False)
def text_match(vobject_item, filter_, child_name, ns, attrib_name=None):
"""Check whether the ``item`` matches the text-match ``filter_``.
See rfc4791-9.7.5.
"""
# TODO: collations are not supported, but the default ones needed
# for DAV servers are actually pretty useless. Texts are lowered to
# be case-insensitive, almost as the "i;ascii-casemap" value.
text = next(filter_.itertext()).lower()
match_type = "contains"
if ns == "CR":
match_type = filter_.get("match-type", match_type)
def match(value):
value = value.lower()
if match_type == "equals":
return value == text
if match_type == "contains":
return text in value
if match_type == "starts-with":
return value.startswith(text)
if match_type == "ends-with":
return value.endswith(text)
raise ValueError("Unexpected text-match match-type: %r" % match_type)
children = getattr(vobject_item, "%s_list" % child_name, [])
if attrib_name:
condition = any(
match(attrib) for child in children
for attrib in child.params.get(attrib_name, []))
else:
condition = any(match(child.value) for child in children)
if filter_.get("negate-condition") == "yes":
return not condition
else:
return condition
def param_filter_match(vobject_item, filter_, parent_name, ns):
"""Check whether the ``item`` matches the param-filter ``filter_``.
See rfc4791-9.7.3.
"""
name = filter_.get("name").upper()
children = getattr(vobject_item, "%s_list" % parent_name, [])
condition = any(name in child.params for child in children)
if len(filter_):
if filter_[0].tag == xmlutils.make_tag(ns, "text-match"):
return condition and text_match(
vobject_item, filter_[0], parent_name, ns, name)
elif filter_[0].tag == xmlutils.make_tag(ns, "is-not-defined"):
return not condition
else:
return condition
def simplify_prefilters(filters, collection_tag="VCALENDAR"):
"""Creates a simplified condition from ``filters``.
Returns a tuple (``tag``, ``start``, ``end``, ``simple``) where ``tag`` is
a string or None (match all) and ``start`` and ``end`` are POSIX
timestamps (as int). ``simple`` is a bool that indicates that ``filters``
and the simplified condition are identical.
"""
flat_filters = tuple(chain.from_iterable(filters))
simple = len(flat_filters) <= 1
for col_filter in flat_filters:
if collection_tag != "VCALENDAR":
simple = False
break
if (col_filter.tag != xmlutils.make_tag("C", "comp-filter") or
col_filter.get("name").upper() != "VCALENDAR"):
simple = False
continue
simple &= len(col_filter) <= 1
for comp_filter in col_filter:
if comp_filter.tag != xmlutils.make_tag("C", "comp-filter"):
simple = False
continue
tag = comp_filter.get("name").upper()
if comp_filter.find(
xmlutils.make_tag("C", "is-not-defined")) is not None:
simple = False
continue
simple &= len(comp_filter) <= 1
for time_filter in comp_filter:
if tag not in ("VTODO", "VEVENT", "VJOURNAL"):
simple = False
break
if time_filter.tag != xmlutils.make_tag("C", "time-range"):
simple = False
continue
start = time_filter.get("start")
end = time_filter.get("end")
if start:
start = math.floor(datetime.strptime(
start, "%Y%m%dT%H%M%SZ").replace(
tzinfo=timezone.utc).timestamp())
else:
start = TIMESTAMP_MIN
if end:
end = math.ceil(datetime.strptime(
end, "%Y%m%dT%H%M%SZ").replace(
tzinfo=timezone.utc).timestamp())
else:
end = TIMESTAMP_MAX
return tag, start, end, simple
return tag, TIMESTAMP_MIN, TIMESTAMP_MAX, simple
return None, TIMESTAMP_MIN, TIMESTAMP_MAX, simple