mirror of
https://github.com/Kozea/Radicale.git
synced 2025-08-01 18:18:31 +00:00
Compare network location with port
This commit is contained in:
parent
7d4a0fe70e
commit
1a78114a56
3 changed files with 31 additions and 13 deletions
|
@ -18,6 +18,7 @@
|
||||||
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.
|
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
import posixpath
|
import posixpath
|
||||||
|
import re
|
||||||
from http import client
|
from http import client
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
|
@ -26,6 +27,16 @@ from radicale.app.base import Access, ApplicationBase
|
||||||
from radicale.log import logger
|
from radicale.log import logger
|
||||||
|
|
||||||
|
|
||||||
|
def get_server_netloc(environ: types.WSGIEnviron, force_port: bool = False):
|
||||||
|
host = environ.get("HTTP_HOST") or environ["SERVER_NAME"]
|
||||||
|
proto = environ["wsgi.url_scheme"]
|
||||||
|
port = environ["SERVER_PORT"]
|
||||||
|
if (not force_port and port == ("443" if proto == "https" else "80") or
|
||||||
|
re.search(r":\d+$", host)):
|
||||||
|
return host
|
||||||
|
return host + ":" + port
|
||||||
|
|
||||||
|
|
||||||
class ApplicationPartMove(ApplicationBase):
|
class ApplicationPartMove(ApplicationBase):
|
||||||
|
|
||||||
def do_MOVE(self, environ: types.WSGIEnviron, base_prefix: str,
|
def do_MOVE(self, environ: types.WSGIEnviron, base_prefix: str,
|
||||||
|
@ -33,7 +44,11 @@ class ApplicationPartMove(ApplicationBase):
|
||||||
"""Manage MOVE request."""
|
"""Manage MOVE request."""
|
||||||
raw_dest = environ.get("HTTP_DESTINATION", "")
|
raw_dest = environ.get("HTTP_DESTINATION", "")
|
||||||
to_url = urlparse(raw_dest)
|
to_url = urlparse(raw_dest)
|
||||||
if to_url.netloc != environ["HTTP_HOST"]:
|
to_netloc_with_port = to_url.netloc
|
||||||
|
if to_url.port is None:
|
||||||
|
to_netloc_with_port += (":443" if to_url.scheme == "https"
|
||||||
|
else ":80")
|
||||||
|
if to_netloc_with_port != get_server_netloc(environ, force_port=True):
|
||||||
logger.info("Unsupported destination address: %r", raw_dest)
|
logger.info("Unsupported destination address: %r", raw_dest)
|
||||||
# Remote destination server, not supported
|
# Remote destination server, not supported
|
||||||
return httputils.REMOTE_DESTINATION
|
return httputils.REMOTE_DESTINATION
|
||||||
|
|
|
@ -25,6 +25,7 @@ import logging
|
||||||
import shutil
|
import shutil
|
||||||
import sys
|
import sys
|
||||||
import tempfile
|
import tempfile
|
||||||
|
import wsgiref.util
|
||||||
import xml.etree.ElementTree as ET
|
import xml.etree.ElementTree as ET
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||||
|
@ -83,11 +84,12 @@ class BaseTest:
|
||||||
login.encode(encoding)).decode()
|
login.encode(encoding)).decode()
|
||||||
environ["REQUEST_METHOD"] = method.upper()
|
environ["REQUEST_METHOD"] = method.upper()
|
||||||
environ["PATH_INFO"] = path
|
environ["PATH_INFO"] = path
|
||||||
if data:
|
if data is not None:
|
||||||
data_bytes = data.encode(encoding)
|
data_bytes = data.encode(encoding)
|
||||||
environ["wsgi.input"] = BytesIO(data_bytes)
|
environ["wsgi.input"] = BytesIO(data_bytes)
|
||||||
environ["CONTENT_LENGTH"] = str(len(data_bytes))
|
environ["CONTENT_LENGTH"] = str(len(data_bytes))
|
||||||
environ["wsgi.errors"] = sys.stderr
|
environ["wsgi.errors"] = sys.stderr
|
||||||
|
wsgiref.util.setup_testing_defaults(environ)
|
||||||
status = headers = None
|
status = headers = None
|
||||||
|
|
||||||
def start_response(status_: str, headers_: List[Tuple[str, str]]
|
def start_response(status_: str, headers_: List[Tuple[str, str]]
|
||||||
|
|
|
@ -355,7 +355,7 @@ permissions: RrWw""")
|
||||||
path2 = "/calendar.ics/event2.ics"
|
path2 = "/calendar.ics/event2.ics"
|
||||||
self.put(path1, event)
|
self.put(path1, event)
|
||||||
self.request("MOVE", path1, check=201,
|
self.request("MOVE", path1, check=201,
|
||||||
HTTP_DESTINATION=path2, HTTP_HOST="")
|
HTTP_DESTINATION="http://127.0.0.1/"+path2)
|
||||||
self.get(path1, check=404)
|
self.get(path1, check=404)
|
||||||
self.get(path2)
|
self.get(path2)
|
||||||
|
|
||||||
|
@ -368,7 +368,7 @@ permissions: RrWw""")
|
||||||
path2 = "/calendar2.ics/event2.ics"
|
path2 = "/calendar2.ics/event2.ics"
|
||||||
self.put(path1, event)
|
self.put(path1, event)
|
||||||
self.request("MOVE", path1, check=201,
|
self.request("MOVE", path1, check=201,
|
||||||
HTTP_DESTINATION=path2, HTTP_HOST="")
|
HTTP_DESTINATION="http://127.0.0.1/"+path2)
|
||||||
self.get(path1, check=404)
|
self.get(path1, check=404)
|
||||||
self.get(path2)
|
self.get(path2)
|
||||||
|
|
||||||
|
@ -382,7 +382,7 @@ permissions: RrWw""")
|
||||||
self.put(path1, event)
|
self.put(path1, event)
|
||||||
self.put("/calendar2.ics/event1.ics", event)
|
self.put("/calendar2.ics/event1.ics", event)
|
||||||
status, _, answer = self.request(
|
status, _, answer = self.request(
|
||||||
"MOVE", path1, HTTP_DESTINATION=path2, HTTP_HOST="")
|
"MOVE", path1, HTTP_DESTINATION="http://127.0.0.1/"+path2)
|
||||||
assert status in (403, 409)
|
assert status in (403, 409)
|
||||||
xml = DefusedET.fromstring(answer)
|
xml = DefusedET.fromstring(answer)
|
||||||
assert xml.tag == xmlutils.make_clark("D:error")
|
assert xml.tag == xmlutils.make_clark("D:error")
|
||||||
|
@ -398,9 +398,9 @@ permissions: RrWw""")
|
||||||
self.put(path1, event)
|
self.put(path1, event)
|
||||||
self.put(path2, event)
|
self.put(path2, event)
|
||||||
self.request("MOVE", path1, check=412,
|
self.request("MOVE", path1, check=412,
|
||||||
HTTP_DESTINATION=path2, HTTP_HOST="")
|
HTTP_DESTINATION="http://127.0.0.1/"+path2)
|
||||||
self.request("MOVE", path1, check=204,
|
self.request("MOVE", path1, check=204, HTTP_OVERWRITE="T",
|
||||||
HTTP_DESTINATION=path2, HTTP_HOST="", HTTP_OVERWRITE="T")
|
HTTP_DESTINATION="http://127.0.0.1/"+path2)
|
||||||
|
|
||||||
def test_move_between_colections_overwrite_uid_conflict(self) -> None:
|
def test_move_between_colections_overwrite_uid_conflict(self) -> None:
|
||||||
"""Move a item to a collection which already contains the item with
|
"""Move a item to a collection which already contains the item with
|
||||||
|
@ -413,8 +413,9 @@ permissions: RrWw""")
|
||||||
path2 = "/calendar2.ics/event2.ics"
|
path2 = "/calendar2.ics/event2.ics"
|
||||||
self.put(path1, event1)
|
self.put(path1, event1)
|
||||||
self.put(path2, event2)
|
self.put(path2, event2)
|
||||||
status, _, answer = self.request("MOVE", path1, HTTP_DESTINATION=path2,
|
status, _, answer = self.request(
|
||||||
HTTP_HOST="", HTTP_OVERWRITE="T")
|
"MOVE", path1, HTTP_OVERWRITE="T",
|
||||||
|
HTTP_DESTINATION="http://127.0.0.1/"+path2)
|
||||||
assert status in (403, 409)
|
assert status in (403, 409)
|
||||||
xml = DefusedET.fromstring(answer)
|
xml = DefusedET.fromstring(answer)
|
||||||
assert xml.tag == xmlutils.make_clark("D:error")
|
assert xml.tag == xmlutils.make_clark("D:error")
|
||||||
|
@ -1487,7 +1488,7 @@ permissions: RrWw""")
|
||||||
sync_token, responses = self._report_sync_token(calendar_path)
|
sync_token, responses = self._report_sync_token(calendar_path)
|
||||||
assert len(responses) == 1 and responses[event1_path] == 200
|
assert len(responses) == 1 and responses[event1_path] == 200
|
||||||
self.request("MOVE", event1_path, check=201,
|
self.request("MOVE", event1_path, check=201,
|
||||||
HTTP_DESTINATION=event2_path, HTTP_HOST="")
|
HTTP_DESTINATION="http://127.0.0.1/"+event2_path)
|
||||||
sync_token, responses = self._report_sync_token(
|
sync_token, responses = self._report_sync_token(
|
||||||
calendar_path, sync_token)
|
calendar_path, sync_token)
|
||||||
if not self.full_sync_token_support and not sync_token:
|
if not self.full_sync_token_support and not sync_token:
|
||||||
|
@ -1506,9 +1507,9 @@ permissions: RrWw""")
|
||||||
sync_token, responses = self._report_sync_token(calendar_path)
|
sync_token, responses = self._report_sync_token(calendar_path)
|
||||||
assert len(responses) == 1 and responses[event1_path] == 200
|
assert len(responses) == 1 and responses[event1_path] == 200
|
||||||
self.request("MOVE", event1_path, check=201,
|
self.request("MOVE", event1_path, check=201,
|
||||||
HTTP_DESTINATION=event2_path, HTTP_HOST="")
|
HTTP_DESTINATION="http://127.0.0.1/"+event2_path)
|
||||||
self.request("MOVE", event2_path, check=201,
|
self.request("MOVE", event2_path, check=201,
|
||||||
HTTP_DESTINATION=event1_path, HTTP_HOST="")
|
HTTP_DESTINATION="http://127.0.0.1/"+event1_path)
|
||||||
sync_token, responses = self._report_sync_token(
|
sync_token, responses = self._report_sync_token(
|
||||||
calendar_path, sync_token)
|
calendar_path, sync_token)
|
||||||
if not self.full_sync_token_support and not sync_token:
|
if not self.full_sync_token_support and not sync_token:
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue