1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-09-15 20:36:55 +00:00

Allow finer control in rights plugin

New permissions:

R: read collections without tag
r: read collections with tag and included objects
W: write and delete collections without tag
w: write and delete collection with tag and included objects
This commit is contained in:
Unrud 2018-08-21 18:43:45 +02:00
parent 72501c6e23
commit 0a492a00b1
6 changed files with 124 additions and 116 deletions

View file

@ -144,32 +144,33 @@ class Application:
def collect_allowed_items(self, items, user):
"""Get items from request that user is allowed to access."""
read_allowed_items = []
write_allowed_items = []
for item in items:
if isinstance(item, storage.BaseCollection):
path = storage.sanitize_path("/%s/" % item.path)
can_read = self.Rights.authorized(user, path, "r")
can_write = self.Rights.authorized(user, path, "w")
target = "collection %r" % item.path
if item.get_meta("tag"):
permissions = self.Rights.authorized(user, path, "rw")
target = "collection with tag %r" % item.path
else:
permissions = self.Rights.authorized(user, path, "RW")
target = "collection %r" % item.path
else:
path = storage.sanitize_path("/%s/%s" % (item.collection.path,
item.href))
can_read = self.Rights.authorized_item(user, path, "r")
can_write = self.Rights.authorized_item(user, path, "w")
path = storage.sanitize_path("/%s/" % item.collection.path)
permissions = self.Rights.authorized(user, path, "rw")
target = "item %r from %r" % (item.href, item.collection.path)
text_status = []
if can_read:
text_status.append("read")
read_allowed_items.append(item)
if can_write:
text_status.append("write")
write_allowed_items.append(item)
if rights.intersect_permissions(permissions, "Ww"):
permission = "w"
status = "write"
elif rights.intersect_permissions(permissions, "Rr"):
permission = "r"
status = "read"
else:
permission = ""
status = "NO"
logger.debug(
"%s has %s access to %s",
repr(user) if user else "anonymous user",
" and ".join(text_status) if text_status else "NO", target)
return read_allowed_items, write_allowed_items
repr(user) if user else "anonymous user", status, target)
if permission:
yield item, permission
def __call__(self, environ, start_response):
with log.register_stream(environ["wsgi.errors"]):
@ -315,7 +316,7 @@ class Application:
# Create principal collection
if user:
principal_path = "/%s/" % user
if self.Rights.authorized(user, principal_path, "w"):
if self.Rights.authorized(user, principal_path, "W"):
with self.Collection.acquire_lock("r", user):
principal = next(
self.Collection.discover(principal_path, depth="1"),
@ -365,19 +366,28 @@ class Application:
return response(status, headers, answer)
def _access(self, user, path, permission, item=None):
"""Check if ``user`` can access ``path`` or the parent collection.
``permission`` must either be "r" or "w".
If ``item`` is given, only access to that class of item is checked.
"""
allowed = False
if not item or isinstance(item, storage.BaseCollection):
allowed |= self.Rights.authorized(user, path, permission)
if not item or not isinstance(item, storage.BaseCollection):
allowed |= self.Rights.authorized_item(user, path, permission)
return allowed
if permission not in "rw":
raise ValueError("Invalid permission argument: %r" % permission)
if not item:
permissions = permission + permission.upper()
parent_permissions = permission
elif isinstance(item, storage.BaseCollection):
if item.get_meta("tag"):
permissions = permission
else:
permissions = permission.upper()
parent_permissions = ""
else:
permissions = ""
parent_permissions = permission
if permissions and self.Rights.authorized(user, path, permissions):
return True
if parent_permissions:
parent_path = storage.sanitize_path(
"/%s/" % posixpath.dirname(path.strip("/")))
if self.Rights.authorized(user, parent_path, parent_permissions):
return True
return False
def _read_raw_content(self, environ):
content_length = int(environ.get("CONTENT_LENGTH") or 0)
@ -459,10 +469,10 @@ class Application:
return NOT_ALLOWED
with self.Collection.acquire_lock("w", user):
item = next(self.Collection.discover(path), None)
if not self._access(user, path, "w", item):
return NOT_ALLOWED
if not item:
return NOT_FOUND
if not self._access(user, path, "w", item):
return NOT_ALLOWED
if_match = environ.get("HTTP_IF_MATCH", "*")
if if_match not in ("*", item.etag):
# ETag precondition not verified, do not delete item
@ -493,10 +503,10 @@ class Application:
return NOT_ALLOWED
with self.Collection.acquire_lock("r", user):
item = next(self.Collection.discover(path), None)
if not self._access(user, path, "r", item):
return NOT_ALLOWED
if not item:
return NOT_FOUND
if not self._access(user, path, "r", item):
return NOT_ALLOWED
if isinstance(item, storage.BaseCollection):
tag = item.get_meta("tag")
if not tag:
@ -563,7 +573,8 @@ class Application:
def do_MKCOL(self, environ, base_prefix, path, user):
"""Manage MKCOL request."""
if not self.Rights.authorized(user, path, "w"):
permissions = self.Rights.authorized(user, path, "Ww")
if not permissions:
return NOT_ALLOWED
try:
xml_content = self._read_xml_content(environ)
@ -587,6 +598,9 @@ class Application:
parent_item.get_meta("tag")):
return FORBIDDEN
props = xmlutils.props_from_request(xml_content)
if (props.get("tag") and "w" not in permissions or
not props.get("tag") and "W" not in permissions):
return NOT_ALLOWED
try:
storage.check_and_sanitize_props(props)
self.Collection.create_collection(path, props=props)
@ -617,12 +631,11 @@ class Application:
with self.Collection.acquire_lock("w", user):
item = next(self.Collection.discover(path), None)
if not self._access(user, path, "w", item):
return NOT_ALLOWED
if not self._access(user, to_path, "w", item):
return NOT_ALLOWED
if not item:
return NOT_FOUND
if (not self._access(user, path, "w", item) or
not self._access(user, to_path, "w", item)):
return NOT_ALLOWED
if isinstance(item, storage.BaseCollection):
# TODO: support moving collections
return METHOD_NOT_ALLOWED
@ -682,24 +695,24 @@ class Application:
path, environ.get("HTTP_DEPTH", "0"))
# take root item for rights checking
item = next(items, None)
if not self._access(user, path, "r", item):
return NOT_ALLOWED
if not item:
return NOT_FOUND
if not self._access(user, path, "r", item):
return NOT_ALLOWED
# put item back
items = itertools.chain([item], items)
read_items, write_items = self.collect_allowed_items(items, user)
allowed_items = self.collect_allowed_items(items, user)
headers = {"DAV": DAV_HEADERS,
"Content-Type": "text/xml; charset=%s" % self.encoding}
status, xml_answer = xmlutils.propfind(
base_prefix, path, xml_content, read_items, write_items, user)
base_prefix, path, xml_content, allowed_items, user)
if status == client.FORBIDDEN:
return NOT_ALLOWED
return status, headers, self._write_xml_content(xml_answer)
def do_PROPPATCH(self, environ, base_prefix, path, user):
"""Manage PROPPATCH request."""
if not self.Rights.authorized(user, path, "w"):
if not self._access(user, path, "w"):
return NOT_ALLOWED
try:
xml_content = self._read_xml_content(environ)
@ -714,6 +727,8 @@ class Application:
item = next(self.Collection.discover(path), None)
if not item:
return NOT_FOUND
if not self._access(user, path, "w", item):
return NOT_ALLOWED
if not isinstance(item, storage.BaseCollection):
return FORBIDDEN
headers = {"DAV": DAV_HEADERS,
@ -754,7 +769,7 @@ class Application:
if write_whole_collection:
if not self.Rights.authorized(user, path, "w"):
return NOT_ALLOWED
elif not self.Rights.authorized_item(user, path, "w"):
elif not self.Rights.authorized(user, parent_path, "w"):
return NOT_ALLOWED
etag = environ.get("HTTP_IF_MATCH", "")
@ -850,10 +865,10 @@ class Application:
return REQUEST_TIMEOUT
with self.Collection.acquire_lock("r", user):
item = next(self.Collection.discover(path), None)
if not self._access(user, path, "r", item):
return NOT_ALLOWED
if not item:
return NOT_FOUND
if not self._access(user, path, "r", item):
return NOT_ALLOWED
if isinstance(item, storage.BaseCollection):
collection = item
else: