1
0
Fork 0
mirror of https://github.com/Kozea/Radicale.git synced 2025-08-07 18:30:54 +00:00

- Add support for local SMTP

- Ignore venv in flake8
This commit is contained in:
Nate Harris 2025-06-29 00:47:26 -06:00
parent 3228f046e2
commit ce9b2cf5d2
5 changed files with 143 additions and 19 deletions

View file

@ -1548,15 +1548,27 @@ Port to connect to SMTP server.
Default: Default:
##### smtp_security
Use encryption on the SMTP connection. none, tls, starttls
Default: none
##### smtp_ssl_verify_mode
The certificate verification mode. Works for tls and starttls. NONE, OPTIONAL or REQUIRED
Default: REQUIRED
##### smtp_username ##### smtp_username
Username to authenticate with SMTP server. Username to authenticate with SMTP server. Leave empty to disable authentication (e.g. using local mail server).
Default: Default:
##### smtp_password ##### smtp_password
Password to authenticate with SMTP server. Password to authenticate with SMTP server. Leave empty to disable authentication (e.g. using local mail server).
Default: Default:

8
config
View file

@ -305,13 +305,15 @@
[hook] [hook]
# Hook types # Hook types
# Value: none | rabbitmq # Value: none | rabbitmq | email
#type = none #type = none
#rabbitmq_endpoint = #rabbitmq_endpoint =
#rabbitmq_topic = #rabbitmq_topic =
#rabbitmq_queue_type = classic #rabbitmq_queue_type = classic
#smtp_server = #smtp_server = localhost
#smtp_port = 587 #smtp_port = 25
#smtp_security = starttls
#smtp_ssl_verify_mode = REQUIRED
#smtp_username = #smtp_username =
#smtp_password = #smtp_password =
#from_email = #from_email =

View file

@ -38,6 +38,7 @@ from typing import (Any, Callable, ClassVar, Iterable, List, Optional,
Sequence, Tuple, TypeVar, Union) Sequence, Tuple, TypeVar, Union)
from radicale import auth, hook, rights, storage, types, web from radicale import auth, hook, rights, storage, types, web
from radicale.hook import email
from radicale.item import check_and_sanitize_props from radicale.item import check_and_sanitize_props
DEFAULT_CONFIG_PATH: str = os.pathsep.join([ DEFAULT_CONFIG_PATH: str = os.pathsep.join([
@ -85,6 +86,7 @@ def list_of_ip_address(value: Any) -> List[Tuple[str, int]]:
return address.strip(string.whitespace + "[]"), int(port) return address.strip(string.whitespace + "[]"), int(port)
except ValueError: except ValueError:
raise ValueError("malformed IP address: %r" % value) raise ValueError("malformed IP address: %r" % value)
return [ip_address(s) for s in value.split(",")] return [ip_address(s) for s in value.split(",")]
@ -445,6 +447,16 @@ DEFAULT_CONFIG_SCHEMA: types.CONFIG_SCHEMA = OrderedDict([
"value": "", "value": "",
"help": "SMTP server port", "help": "SMTP server port",
"type": str}), "type": str}),
("smtp_security", {
"value": "none",
"help": "SMTP security mode: *none*|tls|starttls",
"type": str,
"internal": email.SMTP_SECURITY_TYPES}),
("smtp_ssl_verify_mode", {
"value": "REQUIRED",
"help": "The certificate verification mode. Works for tls and starttls: NONE, OPTIONAL, default is REQUIRED",
"type": str,
"internal": email.SMTP_SSL_VERIFY_MODES}),
("smtp_username", { ("smtp_username", {
"value": "", "value": "",
"help": "SMTP server username", "help": "SMTP server username",
@ -606,7 +618,6 @@ _Self = TypeVar("_Self", bound="Configuration")
class Configuration: class Configuration:
SOURCE_MISSING: ClassVar[types.CONFIG] = {} SOURCE_MISSING: ClassVar[types.CONFIG] = {}
_schema: types.CONFIG_SCHEMA _schema: types.CONFIG_SCHEMA

View file

@ -1,16 +1,17 @@
# This file is related to Radicale - CalDAV and CardDAV server # This file is related to Radicale - CalDAV and CardDAV server
# for email notifications # for email notifications
# Copyright © 2025-2025 Nate Harris # Copyright © 2025-2025 Nate Harris
import enum
import re import re
import smtplib import smtplib
import ssl
from datetime import datetime, timedelta from datetime import datetime, timedelta
from email.encoders import encode_base64 from email.encoders import encode_base64
from email.mime.base import MIMEBase from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText from email.mime.text import MIMEText
from email.utils import formatdate from email.utils import formatdate
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Sequence, Tuple
import vobject import vobject
@ -28,6 +29,14 @@ PLUGIN_CONFIG_SCHEMA = {
"value": "", "value": "",
"type": str "type": str
}, },
"smtp_security": {
"value": "none",
"type": str,
},
"smtp_ssl_verify_mode": {
"value": "REQUIRED",
"type": str,
},
"smtp_username": { "smtp_username": {
"value": "", "value": "",
"type": str "type": str
@ -82,6 +91,44 @@ MESSAGE_TEMPLATE_VARIABLES = [
] ]
class SMTP_SECURITY_TYPE_ENUM(enum.Enum):
EMPTY = ""
NONE = "none"
STARTTLS = "starttls"
TLS = "tls"
@classmethod
def from_string(cls, value):
"""Convert a string to the corresponding enum value."""
for member in cls:
if member.value == value:
return member
raise ValueError(f"Invalid security type: {value}. Allowed values are: {[m.value for m in cls]}")
class SMTP_SSL_VERIFY_MODE_ENUM(enum.Enum):
EMPTY = ""
NONE = "NONE"
OPTIONAL = "OPTIONAL"
REQUIRED = "REQUIRED"
@classmethod
def from_string(cls, value):
"""Convert a string to the corresponding enum value."""
for member in cls:
if member.value == value:
return member
raise ValueError(f"Invalid SSL verify mode: {value}. Allowed values are: {[m.value for m in cls]}")
SMTP_SECURITY_TYPES: Sequence[str] = (SMTP_SECURITY_TYPE_ENUM.NONE.value,
SMTP_SECURITY_TYPE_ENUM.STARTTLS.value,
SMTP_SECURITY_TYPE_ENUM.TLS.value)
SMTP_SSL_VERIFY_MODES: Sequence[str] = (SMTP_SSL_VERIFY_MODE_ENUM.NONE.value,
SMTP_SSL_VERIFY_MODE_ENUM.OPTIONAL.value,
SMTP_SSL_VERIFY_MODE_ENUM.REQUIRED.value)
def ics_contents_contains_invited_event(contents: str): def ics_contents_contains_invited_event(contents: str):
""" """
Check if the ICS contents contain an event (versus a VTODO or VJOURNAL). Check if the ICS contents contain an event (versus a VTODO or VJOURNAL).
@ -543,6 +590,8 @@ class EmailConfig:
def __init__(self, def __init__(self,
host: str, host: str,
port: int, port: int,
security: str,
ssl_verify_mode: str,
username: str, username: str,
password: str, password: str,
from_email: str, from_email: str,
@ -551,6 +600,8 @@ class EmailConfig:
removed_template: MessageTemplate): removed_template: MessageTemplate):
self.host = host self.host = host
self.port = port self.port = port
self.security = SMTP_SECURITY_TYPE_ENUM.from_string(value=security)
self.ssl_verify_mode = SMTP_SSL_VERIFY_MODE_ENUM.from_string(value=ssl_verify_mode)
self.username = username self.username = username
self.password = password self.password = password
self.from_email = from_email self.from_email = from_email
@ -567,6 +618,9 @@ class EmailConfig:
return f"EmailConfig(host={self.host}, port={self.port}, username={self.username}, " \ return f"EmailConfig(host={self.host}, port={self.port}, username={self.username}, " \
f"from_email={self.from_email}, send_mass_emails={self.send_mass_emails})" f"from_email={self.from_email}, send_mass_emails={self.send_mass_emails})"
def __repr__(self):
return self.__str__()
def send_added_email(self, attendees: List[Attendee], event: EmailEvent) -> bool: def send_added_email(self, attendees: List[Attendee], event: EmailEvent) -> bool:
""" """
Send a notification for added attendees. Send a notification for added attendees.
@ -644,6 +698,23 @@ class EmailConfig:
return not failure_encountered # Return True if all emails were sent successfully return not failure_encountered # Return True if all emails were sent successfully
def _build_context(self) -> ssl.SSLContext:
"""
Build the SSL context based on the configured security and SSL verify mode.
:return: An SSLContext object configured for the SMTP connection.
"""
context = ssl.create_default_context()
if self.ssl_verify_mode == SMTP_SSL_VERIFY_MODE_ENUM.REQUIRED:
context.check_hostname = True
context.verify_mode = ssl.CERT_REQUIRED
elif self.ssl_verify_mode == SMTP_SSL_VERIFY_MODE_ENUM.OPTIONAL:
context.check_hostname = True
context.verify_mode = ssl.CERT_OPTIONAL
else:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
return context
def _send_email(self, def _send_email(self,
subject: str, subject: str,
body: str, body: str,
@ -681,11 +752,25 @@ class EmailConfig:
text = message.as_string() text = message.as_string()
try: try:
server = smtplib.SMTP(host=self.host, port=self.port) if self.security == SMTP_SECURITY_TYPE_ENUM.EMPTY:
server.ehlo() # Identify self to server logger.warning("SMTP security type is empty, raising ValueError.")
server.starttls() # Start TLS connection raise ValueError("SMTP security type cannot be empty. Please specify a valid security type.")
server.ehlo() # Identify again after starting TLS elif self.security == SMTP_SECURITY_TYPE_ENUM.NONE:
server.login(user=self.username, password=self.password) server = smtplib.SMTP(host=self.host, port=self.port)
elif self.security == SMTP_SECURITY_TYPE_ENUM.STARTTLS:
context = self._build_context()
server = smtplib.SMTP(host=self.host, port=self.port)
server.ehlo() # Identify self to server
server.starttls(context=context) # Start TLS connection
server.ehlo() # Identify again after starting TLS
elif self.security == SMTP_SECURITY_TYPE_ENUM.TLS:
context = self._build_context()
server = smtplib.SMTP_SSL(host=self.host, port=self.port, context=context)
if self.username and self.password:
logger.debug("Logging in to SMTP server with username: %s", self.username)
server.login(user=self.username, password=self.password)
errors: Dict[str, Tuple[int, bytes]] = server.sendmail(from_addr=self.from_email, to_addrs=to_addresses, errors: Dict[str, Tuple[int, bytes]] = server.sendmail(from_addr=self.from_email, to_addrs=to_addresses,
msg=text) msg=text)
logger.debug("Email sent successfully to %s", to_addresses) logger.debug("Email sent successfully to %s", to_addresses)
@ -701,12 +786,6 @@ class EmailConfig:
return True return True
def __repr__(self):
return f'EmailConfig(host={self.host}, port={self.port}, from_email={self.from_email})'
def __str__(self):
return f'{self.from_email} ({self.host}:{self.port})'
def _read_event(vobject_data: str) -> EmailEvent: def _read_event(vobject_data: str) -> EmailEvent:
""" """
@ -729,6 +808,8 @@ class Hook(BaseHook):
self.email_config = EmailConfig( self.email_config = EmailConfig(
host=self.configuration.get("hook", "smtp_server"), host=self.configuration.get("hook", "smtp_server"),
port=self.configuration.get("hook", "smtp_port"), port=self.configuration.get("hook", "smtp_port"),
security=self.configuration.get("hook", "smtp_security"),
ssl_verify_mode=self.configuration.get("hook", "smtp_ssl_verify_mode"),
username=self.configuration.get("hook", "smtp_username"), username=self.configuration.get("hook", "smtp_username"),
password=self.configuration.get("hook", "smtp_password"), password=self.configuration.get("hook", "smtp_password"),
from_email=self.configuration.get("hook", "from_email"), from_email=self.configuration.get("hook", "from_email"),

View file

@ -3,4 +3,22 @@
# DNE: DOES-NOT-EXIST # DNE: DOES-NOT-EXIST
select = E,F,W,C90,DNE000 select = E,F,W,C90,DNE000
ignore = E121,E123,E126,E226,E24,E704,W503,W504,DNE000,E501,E261 ignore = E121,E123,E126,E226,E24,E704,W503,W504,DNE000,E501,E261
exclude = .git,
__pycache__,
build,
dist,
*.egg,
*.egg-info,
*.eggs,
*.pyc,
*.pyo,
*.pyd,
.tox,
venv,
venv3,
.venv,
.venv3,
.env,
.mypy_cache,
.pytest_cache
extend-exclude = build extend-exclude = build