Files
iredmail-alias-cli/mailalias.py
T
2026-04-21 16:12:29 -04:00

432 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""
mailalias - simple alias / catch-all management tool for iRedMail (MySQL/MariaDB)
Alias model used:
- alias table contains alias addresses
- forwardings contains destination rows for aliases with is_list=1
- catch-all entries are stored in forwardings where address == domain
Examples:
mailalias alias-add contact@charlesdanesi.com charles@charlesdanesi.com
mailalias alias-remove admin@beyemir.com charles@beyemir.com
mailalias alias-lookup admin@charlesdanesi.com
mailalias alias-list
mailalias alias-list-user charles@danesisolutions.com
mailalias catchall-add beyemir.com charles@beyemir.com
mailalias catchall-remove beyemir.com charles@beyemir.com
mailalias catchall-list
"""
from __future__ import annotations
import argparse
import re
import subprocess
import sys
from pathlib import Path
SETTINGS_FILE = Path("/opt/www/iredadmin/settings.py")
DB_NAME = "vmail"
DB_USER = "vmailadmin"
MYSQL_BIN = "/usr/bin/mysql"
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
DOMAIN_RE = re.compile(r"^(?=.{1,253}$)(?!-)([A-Za-z0-9-]{1,63}\.)+[A-Za-z]{2,63}$")
def die(msg: str, code: int = 1) -> None:
print(f"ERROR: {msg}", file=sys.stderr)
sys.exit(code)
def validate_email(address: str) -> str:
address = address.strip().lower()
if not EMAIL_RE.match(address):
die(f"Invalid email address: {address}")
return address
def validate_domain(domain: str) -> str:
domain = domain.strip().lower()
if not DOMAIN_RE.match(domain):
die(f"Invalid domain: {domain}")
return domain
def domain_of(address: str) -> str:
return address.split("@", 1)[1]
def read_db_password() -> str:
if not SETTINGS_FILE.exists():
die(f"Could not find iRedAdmin settings file: {SETTINGS_FILE}")
for line in SETTINGS_FILE.read_text().splitlines():
line = line.strip()
if line.startswith("vmail_db_password"):
parts = line.split("=", 1)
if len(parts) != 2:
continue
value = parts[1].strip().strip('"').strip("'")
if value:
return value
die(f"Could not find vmail_db_password in {SETTINGS_FILE}")
raise RuntimeError("unreachable")
def sql_escape(value: str) -> str:
return value.replace("\\", "\\\\").replace("'", "\\'")
def mysql_query(sql: str, dry_run: bool = False) -> str:
password = read_db_password()
cmd = [
MYSQL_BIN,
f"--user={DB_USER}",
f"--password={password}",
"--batch",
"--raw",
"--skip-column-names",
"-D",
DB_NAME,
"-e",
sql,
]
if dry_run:
print("DRY RUN")
print(sql.strip())
return ""
try:
result = subprocess.run(
cmd,
check=True,
capture_output=True,
text=True,
)
return result.stdout.strip()
except subprocess.CalledProcessError as exc:
stderr = exc.stderr.strip() if exc.stderr else "Unknown MySQL error"
die(stderr)
raise RuntimeError("unreachable")
def alias_row_exists(alias_addr: str) -> bool:
sql = f"""
SELECT COUNT(*)
FROM alias
WHERE address = '{sql_escape(alias_addr)}';
"""
out = mysql_query(sql)
return int(out or "0") > 0
def alias_mapping_exists(alias_addr: str, target_addr: str) -> bool:
sql = f"""
SELECT COUNT(*)
FROM forwardings
WHERE address = '{sql_escape(alias_addr)}'
AND forwarding = '{sql_escape(target_addr)}'
AND is_list = 1;
"""
out = mysql_query(sql)
return int(out or "0") > 0
def catchall_exists(domain: str, target_addr: str) -> bool:
sql = f"""
SELECT COUNT(*)
FROM forwardings
WHERE address = '{sql_escape(domain)}'
AND forwarding = '{sql_escape(target_addr)}'
AND domain = '{sql_escape(domain)}';
"""
out = mysql_query(sql)
return int(out or "0") > 0
def alias_add(alias_addr: str, target_addr: str, dry_run: bool = False) -> None:
alias_domain = domain_of(alias_addr)
target_domain = domain_of(target_addr)
if alias_addr == target_addr:
die("Alias and target cannot be the same address")
statements = []
if not alias_row_exists(alias_addr):
statements.append(
f"""INSERT INTO alias (address, domain, active)
VALUES ('{sql_escape(alias_addr)}', '{sql_escape(alias_domain)}', 1)"""
)
if alias_mapping_exists(alias_addr, target_addr):
die(f"Alias mapping already exists: {alias_addr} -> {target_addr}")
statements.append(
f"""INSERT INTO forwardings
(address, forwarding, domain, dest_domain, is_maillist, is_list, is_forwarding, is_alias, active)
VALUES
('{sql_escape(alias_addr)}', '{sql_escape(target_addr)}', '{sql_escape(alias_domain)}', '{sql_escape(target_domain)}', 0, 1, 0, 0, 1)"""
)
sql = ";\n".join(statements) + ";"
mysql_query(sql, dry_run=dry_run)
if not dry_run:
print(f"Added alias: {alias_addr} -> {target_addr}")
def alias_remove(alias_addr: str, target_addr: str, dry_run: bool = False) -> None:
if not alias_mapping_exists(alias_addr, target_addr):
die(f"Alias mapping not found: {alias_addr} -> {target_addr}")
sql = f"""
DELETE FROM forwardings
WHERE address = '{sql_escape(alias_addr)}'
AND forwarding = '{sql_escape(target_addr)}'
AND is_list = 1;
"""
mysql_query(sql, dry_run=dry_run)
if not dry_run:
# remove alias row if no list mappings remain
sql_cleanup = f"""
DELETE FROM alias
WHERE address = '{sql_escape(alias_addr)}'
AND NOT EXISTS (
SELECT 1
FROM forwardings
WHERE address = '{sql_escape(alias_addr)}'
AND is_list = 1
);
"""
mysql_query(sql_cleanup)
print(f"Removed alias mapping: {alias_addr} -> {target_addr}")
def alias_set_active(
alias_addr: str, target_addr: str, active: int, dry_run: bool = False
) -> None:
if not alias_mapping_exists(alias_addr, target_addr):
die(f"Alias mapping not found: {alias_addr} -> {target_addr}")
sql = f"""
UPDATE forwardings
SET active = {active}
WHERE address = '{sql_escape(alias_addr)}'
AND forwarding = '{sql_escape(target_addr)}'
AND is_list = 1;
"""
mysql_query(sql, dry_run=dry_run)
if not dry_run:
state = "Enabled" if active else "Disabled"
print(f"{state}: {alias_addr} -> {target_addr}")
def alias_lookup(alias_addr: str) -> None:
sql = f"""
SELECT f.address, f.forwarding, f.active
FROM forwardings f
WHERE f.address = '{sql_escape(alias_addr)}'
AND f.is_list = 1
ORDER BY f.forwarding;
"""
out = mysql_query(sql)
if not out:
print(f"No alias mappings found for {alias_addr}")
return
print("ALIAS\tTARGET\tACTIVE")
print(out)
def alias_list() -> None:
sql = """
SELECT a.address, GROUP_CONCAT(f.forwarding ORDER BY f.forwarding SEPARATOR ', '), a.active
FROM alias a
LEFT JOIN forwardings f
ON a.address = f.address
AND f.is_list = 1
GROUP BY a.address, a.active
ORDER BY a.address;
"""
out = mysql_query(sql)
if not out:
print("No aliases found")
return
print("ALIAS\tTARGETS\tACTIVE")
print(out)
def alias_list_user(target_addr: str) -> None:
sql = f"""
SELECT f.address, f.forwarding, f.active
FROM forwardings f
WHERE f.forwarding = '{sql_escape(target_addr)}'
AND f.is_list = 1
ORDER BY f.address;
"""
out = mysql_query(sql)
if not out:
print(f"No aliases found for target {target_addr}")
return
print("ALIAS\tTARGET\tACTIVE")
print(out)
def catchall_add(domain: str, target_addr: str, dry_run: bool = False) -> None:
target_domain = domain_of(target_addr)
if catchall_exists(domain, target_addr):
die(f"Catch-all already exists: @{domain} -> {target_addr}")
sql = f"""
INSERT INTO forwardings
(address, forwarding, domain, dest_domain, is_maillist, is_list, is_forwarding, is_alias, active)
VALUES
('{sql_escape(domain)}', '{sql_escape(target_addr)}', '{sql_escape(domain)}', '{sql_escape(target_domain)}', 0, 0, 0, 0, 1);
"""
mysql_query(sql, dry_run=dry_run)
if not dry_run:
print(f"Added catch-all: @{domain} -> {target_addr}")
def catchall_remove(domain: str, target_addr: str, dry_run: bool = False) -> None:
if not catchall_exists(domain, target_addr):
die(f"Catch-all not found: @{domain} -> {target_addr}")
sql = f"""
DELETE FROM forwardings
WHERE address = '{sql_escape(domain)}'
AND forwarding = '{sql_escape(target_addr)}'
AND domain = '{sql_escape(domain)}';
"""
mysql_query(sql, dry_run=dry_run)
if not dry_run:
print(f"Removed catch-all: @{domain} -> {target_addr}")
def catchall_list() -> None:
sql = """
SELECT address, forwarding, active
FROM forwardings
WHERE address = domain
AND address NOT LIKE '%@%'
ORDER BY address, forwarding;
"""
out = mysql_query(sql)
if not out:
print("No catch-all rules found")
return
print("DOMAIN\tTARGET\tACTIVE")
print(out)
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="mailalias",
description="Alias and catch-all manager for iRedMail",
)
parser.add_argument(
"--dry-run", action="store_true", help="Show SQL without executing it"
)
subparsers = parser.add_subparsers(dest="command", required=True)
p = subparsers.add_parser("alias-add")
p.add_argument("alias_address")
p.add_argument("target_address")
p = subparsers.add_parser("alias-remove")
p.add_argument("alias_address")
p.add_argument("target_address")
p = subparsers.add_parser("alias-enable")
p.add_argument("alias_address")
p.add_argument("target_address")
p = subparsers.add_parser("alias-disable")
p.add_argument("alias_address")
p.add_argument("target_address")
p = subparsers.add_parser("alias-lookup")
p.add_argument("alias_address")
p = subparsers.add_parser("alias-list")
p = subparsers.add_parser("alias-list-user")
p.add_argument("target_address")
p = subparsers.add_parser("catchall-add")
p.add_argument("domain")
p.add_argument("target_address")
p = subparsers.add_parser("catchall-remove")
p.add_argument("domain")
p.add_argument("target_address")
p = subparsers.add_parser("catchall-list")
return parser
def main() -> None:
parser = build_parser()
args = parser.parse_args()
if args.command in ("alias-add", "alias-remove", "alias-enable", "alias-disable"):
alias_addr = validate_email(args.alias_address)
target_addr = validate_email(args.target_address)
if args.command == "alias-add":
alias_add(alias_addr, target_addr, dry_run=args.dry_run)
elif args.command == "alias-remove":
alias_remove(alias_addr, target_addr, dry_run=args.dry_run)
elif args.command == "alias-enable":
alias_set_active(alias_addr, target_addr, 1, dry_run=args.dry_run)
elif args.command == "alias-disable":
alias_set_active(alias_addr, target_addr, 0, dry_run=args.dry_run)
elif args.command == "alias-lookup":
alias_addr = validate_email(args.alias_address)
alias_lookup(alias_addr)
elif args.command == "alias-list":
alias_list()
elif args.command == "alias-list-user":
target_addr = validate_email(args.target_address)
alias_list_user(target_addr)
elif args.command == "catchall-add":
domain = validate_domain(args.domain)
target_addr = validate_email(args.target_address)
catchall_add(domain, target_addr, dry_run=args.dry_run)
elif args.command == "catchall-remove":
domain = validate_domain(args.domain)
target_addr = validate_email(args.target_address)
catchall_remove(domain, target_addr, dry_run=args.dry_run)
elif args.command == "catchall-list":
catchall_list()
else:
parser.print_help()
sys.exit(1)
if __name__ == "__main__":
main()