432 lines
12 KiB
Python
Executable File
432 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
mailalias - simple alias / catch-all management tool for iRedMail (MySQL/MariaDB)

Alias model used:
- the alias table contains the alias addresses
- the forwardings table contains the destination rows of each alias
  (rows with is_list=1)
- catch-all entries are stored in forwardings where address == domain

Examples:
  mailalias alias-add contact@charlesdanesi.com charles@charlesdanesi.com
  mailalias alias-remove admin@beyemir.com charles@beyemir.com
  mailalias alias-enable admin@beyemir.com charles@beyemir.com
  mailalias alias-disable admin@beyemir.com charles@beyemir.com
  mailalias alias-lookup admin@charlesdanesi.com
  mailalias alias-list
  mailalias alias-list-user charles@danesisolutions.com

  mailalias catchall-add beyemir.com charles@beyemir.com
  mailalias catchall-remove beyemir.com charles@beyemir.com
  mailalias catchall-list

The --dry-run option is defined on the top-level parser, so it goes before
the sub-command:
  mailalias --dry-run alias-add contact@charlesdanesi.com charles@charlesdanesi.com
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# iRedAdmin settings module that stores the vmail database password.
SETTINGS_FILE = Path("/opt/www/iredadmin") / "settings.py"

# Database name, database account and client binary used for every query.
DB_NAME, DB_USER = "vmail", "vmailadmin"
MYSQL_BIN = "/usr/bin/mysql"

# Loose e-mail shape: exactly one "@", no whitespace, and at least one dot
# in the part after the "@".
EMAIL_RE = re.compile(
    r"^[^@\s]+"          # local part
    r"@"
    r"[^@\s]+\.[^@\s]+$"  # domain part containing a dot
)

# Domain shape: 1-253 characters overall, not starting with "-", one or more
# dot-terminated labels of 1-63 letters/digits/hyphens, then an alphabetic
# final label of 2-63 characters.
DOMAIN_RE = re.compile(
    r"^(?=.{1,253}$)"
    r"(?!-)"
    r"([A-Za-z0-9-]{1,63}\.)+"
    r"[A-Za-z]{2,63}$"
)
|
|
|
|
|
|
def die(msg: str, code: int = 1) -> None:
    """Write ``ERROR: <msg>`` to stderr and terminate with exit status *code*.

    This function never returns normally; it always raises ``SystemExit``.
    """
    sys.stderr.write(f"ERROR: {msg}\n")
    raise SystemExit(code)
|
|
|
|
|
|
def validate_email(address: str) -> str:
    """Normalise *address* and make sure it looks like an e-mail address.

    The value is stripped of surrounding whitespace and lower-cased, then
    checked against ``EMAIL_RE``.  On a mismatch the program exits through
    ``die``; otherwise the normalised address is returned.
    """
    normalized = address.strip().lower()
    if EMAIL_RE.match(normalized) is None:
        die(f"Invalid email address: {normalized}")
    return normalized
|
|
|
|
|
|
def validate_domain(domain: str) -> str:
    """Normalise *domain* and make sure it looks like a domain name.

    The value is stripped of surrounding whitespace and lower-cased, then
    checked against ``DOMAIN_RE``.  On a mismatch the program exits through
    ``die``; otherwise the normalised domain is returned.
    """
    normalized = domain.strip().lower()
    if DOMAIN_RE.match(normalized) is None:
        die(f"Invalid domain: {normalized}")
    return normalized
|
|
|
|
|
|
def domain_of(address: str) -> str:
    """Return everything after the first ``@`` in *address*.

    Raises ``IndexError`` when *address* contains no ``@``.
    """
    pieces = address.split("@", maxsplit=1)
    return pieces[1]
|
|
|
|
|
|
def read_db_password() -> str:
    """Return the ``vmail_db_password`` value from the iRedAdmin settings file.

    The file at ``SETTINGS_FILE`` is scanned line by line for an assignment
    whose left-hand side is exactly ``vmail_db_password``.  The right-hand
    side is decoded as a Python string literal, so escapes, embedded quotes
    and a trailing ``# comment`` are handled; if it is not a valid string
    literal the surrounding quote characters are simply stripped.  The first
    non-empty value wins.

    Exits through ``die`` when the file is missing or unreadable, or when no
    usable assignment is found.
    """
    import ast  # local import: only needed to decode the quoted literal

    try:
        text = SETTINGS_FILE.read_text(encoding="utf-8")
    except FileNotFoundError:
        die(f"Could not find iRedAdmin settings file: {SETTINGS_FILE}")
        raise RuntimeError("unreachable")
    except (OSError, UnicodeError) as exc:
        # Typically a permission error when not running as a privileged user.
        die(f"Could not read iRedAdmin settings file {SETTINGS_FILE}: {exc}")
        raise RuntimeError("unreachable")

    for raw_line in text.splitlines():
        key, sep, rhs = raw_line.strip().partition("=")
        # Require the exact key so that e.g. "vmail_db_password_old" or a
        # commented-out assignment is not picked up by accident.
        if not sep or key.strip() != "vmail_db_password":
            continue

        rhs = rhs.strip()
        try:
            parsed = ast.literal_eval(rhs)
        except (ValueError, SyntaxError):
            parsed = None

        if isinstance(parsed, str):
            value = parsed
        else:
            # Not a string literal: fall back to plain quote stripping.
            value = rhs.strip('"').strip("'")

        if value:
            return value

    die(f"Could not find vmail_db_password in {SETTINGS_FILE}")
    raise RuntimeError("unreachable")
|
|
|
|
|
|
# Characters that get a backslash put in front of them inside a SQL literal.
_SQL_ESCAPE_TABLE = str.maketrans({"\\": "\\\\", "'": "\\'"})


def sql_escape(value: str) -> str:
    """Backslash-escape backslashes and single quotes in *value*.

    The result is meant to be placed between single quotes in a SQL
    statement built by this script.
    """
    return value.translate(_SQL_ESCAPE_TABLE)
|
|
|
|
|
|
def mysql_query(sql: str, dry_run: bool = False) -> str:
    """Run *sql* through the ``mysql`` command-line client.

    Args:
        sql: One or more SQL statements, passed to the client with ``-e``.
        dry_run: When true, print ``DRY RUN`` and the stripped SQL instead
            of executing anything, and return an empty string.

    Returns:
        The client's standard output with surrounding whitespace removed
        (the client runs with ``--batch --raw --skip-column-names``).

    Exits through ``die`` when the client cannot be started or returns a
    non-zero status.
    """
    # Handle dry-run first: showing SQL must not require the DB password or
    # the mysql binary to be available.
    if dry_run:
        print("DRY RUN")
        print(sql.strip())
        return ""

    password = read_db_password()

    # NOTE(review): the password is passed on the command line, which the
    # MySQL manual describes as insecure (it can show up in process
    # listings).  Consider switching to an option file passed with
    # --defaults-extra-file.
    cmd = [
        MYSQL_BIN,
        f"--user={DB_USER}",
        f"--password={password}",
        "--batch",
        "--raw",
        "--skip-column-names",
        "-D",
        DB_NAME,
        "-e",
        sql,
    ]

    try:
        result = subprocess.run(
            cmd,
            check=True,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError as exc:
        # Fall back to a generic message when stderr is missing or blank.
        die((exc.stderr or "").strip() or "Unknown MySQL error")
    except OSError as exc:
        # E.g. the client binary is missing or not executable.
        die(f"Could not run {MYSQL_BIN}: {exc}")
    else:
        return result.stdout.strip()

    raise RuntimeError("unreachable")
|
|
|
|
|
|
def alias_row_exists(alias_addr: str) -> bool:
    """Return True when the ``alias`` table has a row for *alias_addr*."""
    escaped = sql_escape(alias_addr)
    query = f"""
    SELECT COUNT(*)
    FROM alias
    WHERE address = '{escaped}';
    """
    count = mysql_query(query)
    # Empty output is treated as a count of zero.
    return int(count) > 0 if count else False
|
|
|
|
|
|
def alias_mapping_exists(alias_addr: str, target_addr: str) -> bool:
    """Return True when ``forwardings`` holds the list row alias -> target.

    Only rows with ``is_list = 1`` are counted.
    """
    alias_sql = sql_escape(alias_addr)
    target_sql = sql_escape(target_addr)
    query = f"""
    SELECT COUNT(*)
    FROM forwardings
    WHERE address = '{alias_sql}'
      AND forwarding = '{target_sql}'
      AND is_list = 1;
    """
    count = mysql_query(query)
    # Empty output is treated as a count of zero.
    return int(count) > 0 if count else False
|
|
|
|
|
|
def catchall_exists(domain: str, target_addr: str) -> bool:
    """Return True when a catch-all row domain -> target exists.

    A catch-all row is a ``forwardings`` row whose ``address`` and ``domain``
    columns both equal *domain*.
    """
    domain_sql = sql_escape(domain)
    target_sql = sql_escape(target_addr)
    query = f"""
    SELECT COUNT(*)
    FROM forwardings
    WHERE address = '{domain_sql}'
      AND forwarding = '{target_sql}'
      AND domain = '{domain_sql}';
    """
    count = mysql_query(query)
    # Empty output is treated as a count of zero.
    return int(count) > 0 if count else False
|
|
|
|
|
|
def alias_add(alias_addr: str, target_addr: str, dry_run: bool = False) -> None:
    """Create the mapping *alias_addr* -> *target_addr*.

    Inserts a row into ``alias`` when the alias address has none yet, and
    always inserts the ``forwardings`` row (``is_list = 1``).  Both inserts
    are sent in one transaction so that a failure of the second statement
    does not leave an ``alias`` row without any destination.

    Exits through ``die`` when alias and target are identical or the mapping
    already exists.  With *dry_run* the SQL is printed instead of executed
    (the existence checks still query the database).
    """
    if alias_addr == target_addr:
        die("Alias and target cannot be the same address")

    if alias_mapping_exists(alias_addr, target_addr):
        die(f"Alias mapping already exists: {alias_addr} -> {target_addr}")

    alias_sql = sql_escape(alias_addr)
    target_sql = sql_escape(target_addr)
    alias_domain_sql = sql_escape(domain_of(alias_addr))
    target_domain_sql = sql_escape(domain_of(target_addr))

    statements = ["START TRANSACTION"]

    if not alias_row_exists(alias_addr):
        statements.append(
            "INSERT INTO alias (address, domain, active)\n"
            f"VALUES ('{alias_sql}', '{alias_domain_sql}', 1)"
        )

    statements.append(
        "INSERT INTO forwardings\n"
        "(address, forwarding, domain, dest_domain, "
        "is_maillist, is_list, is_forwarding, is_alias, active)\n"
        "VALUES\n"
        f"('{alias_sql}', '{target_sql}', '{alias_domain_sql}', "
        f"'{target_domain_sql}', 0, 1, 0, 0, 1)"
    )

    # The mysql client stops at the first failing statement, so COMMIT is
    # only reached when every insert succeeded.
    statements.append("COMMIT")

    sql = ";\n".join(statements) + ";"
    mysql_query(sql, dry_run=dry_run)

    if not dry_run:
        print(f"Added alias: {alias_addr} -> {target_addr}")
|
|
|
|
|
|
def alias_remove(alias_addr: str, target_addr: str, dry_run: bool = False) -> None:
    """Delete the mapping *alias_addr* -> *target_addr*.

    Removes the ``forwardings`` row (``is_list = 1``) and then removes the
    ``alias`` row as well when no list mappings remain for the alias.  Both
    deletes are sent as a single transaction, so they are applied together
    and a dry run shows the complete SQL that would be executed.

    Exits through ``die`` when the mapping does not exist.
    """
    if not alias_mapping_exists(alias_addr, target_addr):
        die(f"Alias mapping not found: {alias_addr} -> {target_addr}")

    alias_sql = sql_escape(alias_addr)
    target_sql = sql_escape(target_addr)

    sql = f"""
    START TRANSACTION;

    DELETE FROM forwardings
    WHERE address = '{alias_sql}'
      AND forwarding = '{target_sql}'
      AND is_list = 1;

    -- remove alias row if no list mappings remain
    DELETE FROM alias
    WHERE address = '{alias_sql}'
      AND NOT EXISTS (
        SELECT 1
        FROM forwardings
        WHERE address = '{alias_sql}'
          AND is_list = 1
      );

    COMMIT;
    """
    mysql_query(sql, dry_run=dry_run)

    if not dry_run:
        print(f"Removed alias mapping: {alias_addr} -> {target_addr}")
|
|
|
|
|
|
def alias_set_active(
    alias_addr: str, target_addr: str, active: int, dry_run: bool = False
) -> None:
    """Set the ``active`` column of the mapping *alias_addr* -> *target_addr*.

    Only the matching ``forwardings`` row with ``is_list = 1`` is updated.
    Exits through ``die`` when the mapping does not exist.  With *dry_run*
    the UPDATE is printed instead of executed.
    """
    mapping_known = alias_mapping_exists(alias_addr, target_addr)
    if not mapping_known:
        die(f"Alias mapping not found: {alias_addr} -> {target_addr}")

    alias_sql = sql_escape(alias_addr)
    target_sql = sql_escape(target_addr)
    statement = f"""
    UPDATE forwardings
    SET active = {active}
    WHERE address = '{alias_sql}'
      AND forwarding = '{target_sql}'
      AND is_list = 1;
    """
    mysql_query(statement, dry_run=dry_run)

    if dry_run:
        return

    if active:
        state = "Enabled"
    else:
        state = "Disabled"
    print(f"{state}: {alias_addr} -> {target_addr}")
|
|
|
|
|
|
def alias_lookup(alias_addr: str) -> None:
    """Print the list-type forwarding rows stored for *alias_addr*.

    Prints a header line followed by the raw client output, or a
    "not found" message when the query returns nothing.
    """
    escaped = sql_escape(alias_addr)
    query = f"""
    SELECT f.address, f.forwarding, f.active
    FROM forwardings f
    WHERE f.address = '{escaped}'
      AND f.is_list = 1
    ORDER BY f.forwarding;
    """
    rows = mysql_query(query)

    if rows:
        print("ALIAS\tTARGET\tACTIVE", rows, sep="\n")
    else:
        print(f"No alias mappings found for {alias_addr}")
|
|
|
|
|
|
def alias_list() -> None:
    """Print every row of the ``alias`` table with its list targets.

    Targets come from a LEFT JOIN on ``forwardings`` (``is_list = 1``) and
    are concatenated per alias with ``GROUP_CONCAT``.
    """
    query = """
    SELECT a.address, GROUP_CONCAT(f.forwarding ORDER BY f.forwarding SEPARATOR ', '), a.active
    FROM alias a
    LEFT JOIN forwardings f
      ON a.address = f.address
     AND f.is_list = 1
    GROUP BY a.address, a.active
    ORDER BY a.address;
    """
    rows = mysql_query(query)

    if rows:
        print("ALIAS\tTARGETS\tACTIVE", rows, sep="\n")
    else:
        print("No aliases found")
|
|
|
|
|
|
def alias_list_user(target_addr: str) -> None:
    """Print the list-type forwarding rows that deliver to *target_addr*.

    Prints a header line followed by the raw client output, or a
    "not found" message when the query returns nothing.
    """
    escaped = sql_escape(target_addr)
    query = f"""
    SELECT f.address, f.forwarding, f.active
    FROM forwardings f
    WHERE f.forwarding = '{escaped}'
      AND f.is_list = 1
    ORDER BY f.address;
    """
    rows = mysql_query(query)

    if rows:
        print("ALIAS\tTARGET\tACTIVE", rows, sep="\n")
    else:
        print(f"No aliases found for target {target_addr}")
|
|
|
|
|
|
def catchall_add(domain: str, target_addr: str, dry_run: bool = False) -> None:
    """Create the catch-all rule @*domain* -> *target_addr*.

    Inserts a ``forwardings`` row whose ``address`` and ``domain`` are both
    *domain*.  Exits through ``die`` when that rule already exists.  With
    *dry_run* the INSERT is printed instead of executed.
    """
    already_there = catchall_exists(domain, target_addr)
    if already_there:
        die(f"Catch-all already exists: @{domain} -> {target_addr}")

    domain_sql = sql_escape(domain)
    target_sql = sql_escape(target_addr)
    dest_domain_sql = sql_escape(domain_of(target_addr))

    statement = f"""
    INSERT INTO forwardings
    (address, forwarding, domain, dest_domain, is_maillist, is_list, is_forwarding, is_alias, active)
    VALUES
    ('{domain_sql}', '{target_sql}', '{domain_sql}', '{dest_domain_sql}', 0, 0, 0, 0, 1);
    """
    mysql_query(statement, dry_run=dry_run)

    if dry_run:
        return
    print(f"Added catch-all: @{domain} -> {target_addr}")
|
|
|
|
|
|
def catchall_remove(domain: str, target_addr: str, dry_run: bool = False) -> None:
    """Delete the catch-all rule @*domain* -> *target_addr*.

    Exits through ``die`` when the rule does not exist.  With *dry_run* the
    DELETE is printed instead of executed.
    """
    rule_known = catchall_exists(domain, target_addr)
    if not rule_known:
        die(f"Catch-all not found: @{domain} -> {target_addr}")

    domain_sql = sql_escape(domain)
    target_sql = sql_escape(target_addr)
    statement = f"""
    DELETE FROM forwardings
    WHERE address = '{domain_sql}'
      AND forwarding = '{target_sql}'
      AND domain = '{domain_sql}';
    """
    mysql_query(statement, dry_run=dry_run)

    if dry_run:
        return
    print(f"Removed catch-all: @{domain} -> {target_addr}")
|
|
|
|
|
|
def catchall_list() -> None:
    """Print all catch-all rules.

    A rule is a ``forwardings`` row whose ``address`` equals its ``domain``
    and contains no ``@``.
    """
    query = """
    SELECT address, forwarding, active
    FROM forwardings
    WHERE address = domain
      AND address NOT LIKE '%@%'
    ORDER BY address, forwarding;
    """
    rows = mysql_query(query)

    if rows:
        print("DOMAIN\tTARGET\tACTIVE", rows, sep="\n")
    else:
        print("No catch-all rules found")
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Build the ``mailalias`` command-line parser.

    The parser has a top-level ``--dry-run`` flag and one required
    sub-command (stored as ``command``); each sub-command takes only the
    positional arguments listed in the table below.
    """
    cli = argparse.ArgumentParser(
        prog="mailalias",
        description="Alias and catch-all manager for iRedMail",
    )
    cli.add_argument(
        "--dry-run", action="store_true", help="Show SQL without executing it"
    )
    commands = cli.add_subparsers(dest="command", required=True)

    alias_and_target = ("alias_address", "target_address")
    domain_and_target = ("domain", "target_address")

    # Sub-command name -> positional arguments, in registration order.
    layout = (
        ("alias-add", alias_and_target),
        ("alias-remove", alias_and_target),
        ("alias-enable", alias_and_target),
        ("alias-disable", alias_and_target),
        ("alias-lookup", ("alias_address",)),
        ("alias-list", ()),
        ("alias-list-user", ("target_address",)),
        ("catchall-add", domain_and_target),
        ("catchall-remove", domain_and_target),
        ("catchall-list", ()),
    )

    for command_name, positionals in layout:
        sub = commands.add_parser(command_name)
        for positional in positionals:
            sub.add_argument(positional)

    return cli
|
|
|
|
|
|
def main() -> None:
    """Parse the command line, validate the arguments and run the command.

    Addresses and domains are validated (and normalised) before the
    matching handler is called; invalid input exits through ``die``.
    """
    cli = build_parser()
    opts = cli.parse_args()
    command = opts.command
    dry = opts.dry_run

    # Commands that take an (alias, target) pair.
    pair_handlers = {
        "alias-add": lambda a, t: alias_add(a, t, dry_run=dry),
        "alias-remove": lambda a, t: alias_remove(a, t, dry_run=dry),
        "alias-enable": lambda a, t: alias_set_active(a, t, 1, dry_run=dry),
        "alias-disable": lambda a, t: alias_set_active(a, t, 0, dry_run=dry),
    }
    # Commands that take a (domain, target) pair.
    catchall_handlers = {
        "catchall-add": catchall_add,
        "catchall-remove": catchall_remove,
    }

    if command in pair_handlers:
        alias_addr = validate_email(opts.alias_address)
        target_addr = validate_email(opts.target_address)
        pair_handlers[command](alias_addr, target_addr)
        return

    if command == "alias-lookup":
        alias_lookup(validate_email(opts.alias_address))
        return

    if command == "alias-list":
        alias_list()
        return

    if command == "alias-list-user":
        alias_list_user(validate_email(opts.target_address))
        return

    if command in catchall_handlers:
        domain = validate_domain(opts.domain)
        target_addr = validate_email(opts.target_address)
        catchall_handlers[command](domain, target_addr, dry_run=dry)
        return

    if command == "catchall-list":
        catchall_list()
        return

    # Fallback for a command name that has no handler above.
    cli.print_help()
    sys.exit(1)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|