1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-06-26 16:45:52 +00:00
Radicale/radicale/pathutils.py

317 lines
10 KiB
Python
Raw Normal View History

2021-12-08 21:45:42 +01:00
# This file is part of Radicale - CalDAV and CardDAV server
2018-08-28 16:19:36 +02:00
# Copyright © 2014 Jean-Marc Martins
# Copyright © 2012-2017 Guillaume Ayoub
2019-06-17 04:13:24 +02:00
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
2018-08-28 16:19:36 +02:00
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.
2020-01-12 23:32:28 +01:00
"""
Helper functions for working with the file system.
"""
import errno
2018-08-28 16:19:36 +02:00
import os
2019-06-15 09:01:55 +02:00
import posixpath
import sys
2018-08-28 16:19:36 +02:00
import threading
from tempfile import TemporaryDirectory
2021-07-26 20:56:46 +02:00
from typing import Iterator, Type, Union
2018-08-28 16:19:36 +02:00
2021-07-26 20:56:46 +02:00
from radicale import storage, types
if sys.platform == "win32":
2018-08-28 16:19:36 +02:00
import ctypes
import ctypes.wintypes
import msvcrt
2021-07-26 20:56:46 +02:00
LOCKFILE_EXCLUSIVE_LOCK: int = 2
2020-10-04 15:13:01 +02:00
ULONG_PTR: Union[Type[ctypes.c_uint32], Type[ctypes.c_uint64]]
2018-08-28 16:19:36 +02:00
if ctypes.sizeof(ctypes.c_void_p) == 4:
ULONG_PTR = ctypes.c_uint32
else:
ULONG_PTR = ctypes.c_uint64
class Overlapped(ctypes.Structure):
_fields_ = [
("internal", ULONG_PTR),
("internal_high", ULONG_PTR),
("offset", ctypes.wintypes.DWORD),
("offset_high", ctypes.wintypes.DWORD),
("h_event", ctypes.wintypes.HANDLE)]
2021-07-26 20:56:46 +02:00
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
lock_file_ex = kernel32.LockFileEx
2018-08-28 16:19:36 +02:00
lock_file_ex.argtypes = [
ctypes.wintypes.HANDLE,
ctypes.wintypes.DWORD,
ctypes.wintypes.DWORD,
ctypes.wintypes.DWORD,
ctypes.wintypes.DWORD,
ctypes.POINTER(Overlapped)]
lock_file_ex.restype = ctypes.wintypes.BOOL
unlock_file_ex = kernel32.UnlockFileEx
2018-08-28 16:19:36 +02:00
unlock_file_ex.argtypes = [
ctypes.wintypes.HANDLE,
ctypes.wintypes.DWORD,
ctypes.wintypes.DWORD,
ctypes.wintypes.DWORD,
ctypes.POINTER(Overlapped)]
unlock_file_ex.restype = ctypes.wintypes.BOOL
else:
2018-08-28 16:19:36 +02:00
import fcntl
if sys.platform == "linux":
import ctypes
2021-07-26 20:56:46 +02:00
RENAME_EXCHANGE: int = 2
2022-02-01 16:19:51 +01:00
renameat2 = None
try:
2021-12-25 19:58:29 +01:00
renameat2 = ctypes.CDLL(None, use_errno=True).renameat2
except AttributeError:
pass
else:
renameat2.argtypes = [
ctypes.c_int, ctypes.c_char_p,
ctypes.c_int, ctypes.c_char_p,
ctypes.c_uint]
renameat2.restype = ctypes.c_int
if sys.platform == "darwin":
# Definition missing in PyPy
F_FULLFSYNC: int = getattr(fcntl, "F_FULLFSYNC", 51)
2018-08-28 16:19:36 +02:00
class RwLock:
"""A readers-Writer lock that locks a file."""
2021-07-26 20:56:46 +02:00
_path: str
_readers: int
_writer: bool
_lock: threading.Lock
def __init__(self, path: str) -> None:
2018-08-28 16:19:36 +02:00
self._path = path
self._readers = 0
self._writer = False
self._lock = threading.Lock()
@property
2021-07-26 20:56:46 +02:00
def locked(self) -> str:
2018-08-28 16:19:36 +02:00
with self._lock:
if self._readers > 0:
return "r"
if self._writer:
return "w"
return ""
2021-07-26 20:56:46 +02:00
@types.contextmanager
def acquire(self, mode: str) -> Iterator[None]:
2018-08-28 16:19:36 +02:00
if mode not in "rw":
raise ValueError("Invalid mode: %r" % mode)
with open(self._path, "w+") as lock_file:
2021-07-26 20:56:46 +02:00
if sys.platform == "win32":
2018-08-28 16:19:36 +02:00
handle = msvcrt.get_osfhandle(lock_file.fileno())
flags = LOCKFILE_EXCLUSIVE_LOCK if mode == "w" else 0
overlapped = Overlapped()
try:
if not lock_file_ex(handle, flags, 0, 1, 0, overlapped):
raise ctypes.WinError()
except OSError as e:
2021-07-26 20:56:46 +02:00
raise RuntimeError("Locking the storage failed: %s" % e
) from e
else:
2018-08-28 16:19:36 +02:00
_cmd = fcntl.LOCK_EX if mode == "w" else fcntl.LOCK_SH
try:
fcntl.flock(lock_file.fileno(), _cmd)
except OSError as e:
2021-07-26 20:56:46 +02:00
raise RuntimeError("Locking the storage failed: %s" % e
) from e
2018-08-28 16:19:36 +02:00
with self._lock:
if self._writer or mode == "w" and self._readers != 0:
raise RuntimeError("Locking the storage failed: "
"Guarantees failed")
if mode == "r":
self._readers += 1
else:
self._writer = True
try:
yield
finally:
with self._lock:
if mode == "r":
self._readers -= 1
self._writer = False
2021-07-26 20:56:46 +02:00
def rename_exchange(src: str, dst: str) -> None:
"""Exchange the files or directories `src` and `dst`.
Both `src` and `dst` must exist but may be of different types.
On Linux with renameat2 the operation is atomic.
On other platforms it's not atomic.
"""
src_dir, src_base = os.path.split(src)
dst_dir, dst_base = os.path.split(dst)
src_dir = src_dir or os.curdir
dst_dir = dst_dir or os.curdir
if not src_base or not dst_base:
raise ValueError("Invalid arguments: %r -> %r" % (src, dst))
2022-02-01 16:19:51 +01:00
if sys.platform == "linux" and renameat2:
src_base_bytes = os.fsencode(src_base)
dst_base_bytes = os.fsencode(dst_base)
src_dir_fd = os.open(src_dir, 0)
try:
dst_dir_fd = os.open(dst_dir, 0)
try:
if renameat2(src_dir_fd, src_base_bytes,
dst_dir_fd, dst_base_bytes,
RENAME_EXCHANGE) == 0:
return
errno_ = ctypes.get_errno()
# Fallback if RENAME_EXCHANGE not supported by filesystem
if errno_ != errno.EINVAL:
raise OSError(errno_, os.strerror(errno_))
finally:
os.close(dst_dir_fd)
finally:
os.close(src_dir_fd)
with TemporaryDirectory(prefix=".Radicale.tmp-", dir=src_dir
) as tmp_dir:
os.rename(dst, os.path.join(tmp_dir, "interim"))
os.rename(src, dst)
os.rename(os.path.join(tmp_dir, "interim"), src)
2021-07-26 20:56:46 +02:00
def fsync(fd: int) -> None:
if sys.platform == "darwin":
try:
fcntl.fcntl(fd, F_FULLFSYNC)
return
except OSError as e:
# Fallback if F_FULLFSYNC not supported by filesystem
if e.errno != errno.EINVAL:
raise
os.fsync(fd)
2018-08-28 16:19:36 +02:00
2021-07-26 20:56:46 +02:00
def strip_path(path: str) -> str:
2018-08-28 16:19:50 +02:00
assert sanitize_path(path) == path
return path.strip("/")
2021-07-26 20:56:46 +02:00
def unstrip_path(stripped_path: str, trailing_slash: bool = False) -> str:
2018-08-28 16:19:50 +02:00
assert strip_path(sanitize_path(stripped_path)) == stripped_path
assert stripped_path or trailing_slash
path = "/%s" % stripped_path
if trailing_slash and not path.endswith("/"):
path += "/"
return path
2021-07-26 20:56:46 +02:00
def sanitize_path(path: str) -> str:
2018-08-28 16:19:36 +02:00
"""Make path absolute with leading slash to prevent access to other data.
Preserve potential trailing slash.
"""
trailing_slash = "/" if path.endswith("/") else ""
path = posixpath.normpath(path)
new_path = "/"
for part in path.split("/"):
if not is_safe_path_component(part):
continue
new_path = posixpath.join(new_path, part)
trailing_slash = "" if new_path.endswith("/") else trailing_slash
return new_path + trailing_slash
2021-07-26 20:56:46 +02:00
def is_safe_path_component(path: str) -> bool:
2018-08-28 16:19:36 +02:00
"""Check if path is a single component of a path.
Check that the path is safe to join too.
"""
2021-07-26 20:56:46 +02:00
return bool(path) and "/" not in path and path not in (".", "..")
2018-08-28 16:19:36 +02:00
2021-07-26 20:56:46 +02:00
def is_safe_filesystem_path_component(path: str) -> bool:
2018-08-28 16:19:36 +02:00
"""Check if path is a single component of a local and posix filesystem
path.
Check that the path is safe to join too.
"""
return (
2021-07-26 20:56:46 +02:00
bool(path) and not os.path.splitdrive(path)[0] and
2022-03-30 22:26:02 +02:00
(sys.platform != "win32" or ":" not in path) and # Block NTFS-ADS
2018-08-28 16:19:36 +02:00
not os.path.split(path)[0] and path not in (os.curdir, os.pardir) and
not path.startswith(".") and not path.endswith("~") and
is_safe_path_component(path))
2021-07-26 20:56:46 +02:00
def path_to_filesystem(root: str, sane_path: str) -> str:
2018-08-28 16:19:50 +02:00
"""Convert `sane_path` to a local filesystem path relative to `root`.
2018-08-28 16:19:36 +02:00
`root` must be a secure filesystem path, it will be prepend to the path.
2018-08-28 16:19:50 +02:00
`sane_path` must be a sanitized path without leading or trailing ``/``.
Conversion of `sane_path` is done in a secure manner,
or raises ``ValueError``.
2018-08-28 16:19:36 +02:00
"""
2018-08-28 16:19:50 +02:00
assert sane_path == strip_path(sanitize_path(sane_path))
2018-08-28 16:19:36 +02:00
safe_path = root
2018-08-28 16:19:50 +02:00
parts = sane_path.split("/") if sane_path else []
for part in parts:
if not is_safe_filesystem_path_component(part):
raise UnsafePathError(part)
safe_path_parent = safe_path
safe_path = os.path.join(safe_path, part)
# Check for conflicting files (e.g. case-insensitive file systems
# or short names on Windows file systems)
if (os.path.lexists(safe_path) and
2021-07-26 20:56:46 +02:00
part not in (e.name for e in os.scandir(safe_path_parent))):
2018-08-28 16:19:50 +02:00
raise CollidingPathError(part)
2018-08-28 16:19:36 +02:00
return safe_path
class UnsafePathError(ValueError):
2021-07-26 20:56:46 +02:00
def __init__(self, path: str) -> None:
super().__init__("Can't translate name safely to filesystem: %r" %
path)
2018-08-28 16:19:36 +02:00
class CollidingPathError(ValueError):
2021-07-26 20:56:46 +02:00
def __init__(self, path: str) -> None:
super().__init__("File name collision: %r" % path)
2018-08-28 16:19:36 +02:00
2021-07-26 20:56:46 +02:00
def name_from_path(path: str, collection: "storage.BaseCollection") -> str:
2018-08-28 16:19:36 +02:00
"""Return Radicale item name from ``path``."""
2018-08-28 16:19:50 +02:00
assert sanitize_path(path) == path
start = unstrip_path(collection.path, True)
if not (path + "/").startswith(start):
2018-08-28 16:19:36 +02:00
raise ValueError("%r doesn't start with %r" % (path, start))
2018-08-28 16:19:50 +02:00
name = path[len(start):]
2018-08-28 16:19:36 +02:00
if name and not is_safe_path_component(name):
raise ValueError("%r is not a component in collection %r" %
(name, collection.path))
return name