#!/usr/bin/env python3 """ mailalias - simple alias / catch-all management tool for iRedMail (MySQL/MariaDB) Alias model used: - alias table contains alias addresses - forwardings contains destination rows for aliases with is_list=1 - catch-all entries are stored in forwardings where address == domain Examples: mailalias alias-add contact@charlesdanesi.com charles@charlesdanesi.com mailalias alias-remove admin@beyemir.com charles@beyemir.com mailalias alias-lookup admin@charlesdanesi.com mailalias alias-list mailalias alias-list-user charles@danesisolutions.com mailalias catchall-add beyemir.com charles@beyemir.com mailalias catchall-remove beyemir.com charles@beyemir.com mailalias catchall-list """ from __future__ import annotations import argparse import re import subprocess import sys from pathlib import Path SETTINGS_FILE = Path("/opt/www/iredadmin/settings.py") DB_NAME = "vmail" DB_USER = "vmailadmin" MYSQL_BIN = "/usr/bin/mysql" EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") DOMAIN_RE = re.compile(r"^(?=.{1,253}$)(?!-)([A-Za-z0-9-]{1,63}\.)+[A-Za-z]{2,63}$") def die(msg: str, code: int = 1) -> None: print(f"ERROR: {msg}", file=sys.stderr) sys.exit(code) def validate_email(address: str) -> str: address = address.strip().lower() if not EMAIL_RE.match(address): die(f"Invalid email address: {address}") return address def validate_domain(domain: str) -> str: domain = domain.strip().lower() if not DOMAIN_RE.match(domain): die(f"Invalid domain: {domain}") return domain def domain_of(address: str) -> str: return address.split("@", 1)[1] def read_db_password() -> str: if not SETTINGS_FILE.exists(): die(f"Could not find iRedAdmin settings file: {SETTINGS_FILE}") for line in SETTINGS_FILE.read_text().splitlines(): line = line.strip() if line.startswith("vmail_db_password"): parts = line.split("=", 1) if len(parts) != 2: continue value = parts[1].strip().strip('"').strip("'") if value: return value die(f"Could not find vmail_db_password in {SETTINGS_FILE}") raise RuntimeError("unreachable") def sql_escape(value: str) -> str: return value.replace("\\", "\\\\").replace("'", "\\'") def mysql_query(sql: str, dry_run: bool = False) -> str: password = read_db_password() cmd = [ MYSQL_BIN, f"--user={DB_USER}", f"--password={password}", "--batch", "--raw", "--skip-column-names", "-D", DB_NAME, "-e", sql, ] if dry_run: print("DRY RUN") print(sql.strip()) return "" try: result = subprocess.run( cmd, check=True, capture_output=True, text=True, ) return result.stdout.strip() except subprocess.CalledProcessError as exc: stderr = exc.stderr.strip() if exc.stderr else "Unknown MySQL error" die(stderr) raise RuntimeError("unreachable") def alias_row_exists(alias_addr: str) -> bool: sql = f""" SELECT COUNT(*) FROM alias WHERE address = '{sql_escape(alias_addr)}'; """ out = mysql_query(sql) return int(out or "0") > 0 def alias_mapping_exists(alias_addr: str, target_addr: str) -> bool: sql = f""" SELECT COUNT(*) FROM forwardings WHERE address = '{sql_escape(alias_addr)}' AND forwarding = '{sql_escape(target_addr)}' AND is_list = 1; """ out = mysql_query(sql) return int(out or "0") > 0 def catchall_exists(domain: str, target_addr: str) -> bool: sql = f""" SELECT COUNT(*) FROM forwardings WHERE address = '{sql_escape(domain)}' AND forwarding = '{sql_escape(target_addr)}' AND domain = '{sql_escape(domain)}'; """ out = mysql_query(sql) return int(out or "0") > 0 def alias_add(alias_addr: str, target_addr: str, dry_run: bool = False) -> None: alias_domain = domain_of(alias_addr) target_domain = domain_of(target_addr) if alias_addr == target_addr: die("Alias and target cannot be the same address") statements = [] if not alias_row_exists(alias_addr): statements.append( f"""INSERT INTO alias (address, domain, active) VALUES ('{sql_escape(alias_addr)}', '{sql_escape(alias_domain)}', 1)""" ) if alias_mapping_exists(alias_addr, target_addr): die(f"Alias mapping already exists: {alias_addr} -> {target_addr}") statements.append( f"""INSERT INTO forwardings (address, forwarding, domain, dest_domain, is_maillist, is_list, is_forwarding, is_alias, active) VALUES ('{sql_escape(alias_addr)}', '{sql_escape(target_addr)}', '{sql_escape(alias_domain)}', '{sql_escape(target_domain)}', 0, 1, 0, 0, 1)""" ) sql = ";\n".join(statements) + ";" mysql_query(sql, dry_run=dry_run) if not dry_run: print(f"Added alias: {alias_addr} -> {target_addr}") def alias_remove(alias_addr: str, target_addr: str, dry_run: bool = False) -> None: if not alias_mapping_exists(alias_addr, target_addr): die(f"Alias mapping not found: {alias_addr} -> {target_addr}") sql = f""" DELETE FROM forwardings WHERE address = '{sql_escape(alias_addr)}' AND forwarding = '{sql_escape(target_addr)}' AND is_list = 1; """ mysql_query(sql, dry_run=dry_run) if not dry_run: # remove alias row if no list mappings remain sql_cleanup = f""" DELETE FROM alias WHERE address = '{sql_escape(alias_addr)}' AND NOT EXISTS ( SELECT 1 FROM forwardings WHERE address = '{sql_escape(alias_addr)}' AND is_list = 1 ); """ mysql_query(sql_cleanup) print(f"Removed alias mapping: {alias_addr} -> {target_addr}") def alias_set_active( alias_addr: str, target_addr: str, active: int, dry_run: bool = False ) -> None: if not alias_mapping_exists(alias_addr, target_addr): die(f"Alias mapping not found: {alias_addr} -> {target_addr}") sql = f""" UPDATE forwardings SET active = {active} WHERE address = '{sql_escape(alias_addr)}' AND forwarding = '{sql_escape(target_addr)}' AND is_list = 1; """ mysql_query(sql, dry_run=dry_run) if not dry_run: state = "Enabled" if active else "Disabled" print(f"{state}: {alias_addr} -> {target_addr}") def alias_lookup(alias_addr: str) -> None: sql = f""" SELECT f.address, f.forwarding, f.active FROM forwardings f WHERE f.address = '{sql_escape(alias_addr)}' AND f.is_list = 1 ORDER BY f.forwarding; """ out = mysql_query(sql) if not out: print(f"No alias mappings found for {alias_addr}") return print("ALIAS\tTARGET\tACTIVE") print(out) def alias_list() -> None: sql = """ SELECT a.address, GROUP_CONCAT(f.forwarding ORDER BY f.forwarding SEPARATOR ', '), a.active FROM alias a LEFT JOIN forwardings f ON a.address = f.address AND f.is_list = 1 GROUP BY a.address, a.active ORDER BY a.address; """ out = mysql_query(sql) if not out: print("No aliases found") return print("ALIAS\tTARGETS\tACTIVE") print(out) def alias_list_user(target_addr: str) -> None: sql = f""" SELECT f.address, f.forwarding, f.active FROM forwardings f WHERE f.forwarding = '{sql_escape(target_addr)}' AND f.is_list = 1 ORDER BY f.address; """ out = mysql_query(sql) if not out: print(f"No aliases found for target {target_addr}") return print("ALIAS\tTARGET\tACTIVE") print(out) def catchall_add(domain: str, target_addr: str, dry_run: bool = False) -> None: target_domain = domain_of(target_addr) if catchall_exists(domain, target_addr): die(f"Catch-all already exists: @{domain} -> {target_addr}") sql = f""" INSERT INTO forwardings (address, forwarding, domain, dest_domain, is_maillist, is_list, is_forwarding, is_alias, active) VALUES ('{sql_escape(domain)}', '{sql_escape(target_addr)}', '{sql_escape(domain)}', '{sql_escape(target_domain)}', 0, 0, 0, 0, 1); """ mysql_query(sql, dry_run=dry_run) if not dry_run: print(f"Added catch-all: @{domain} -> {target_addr}") def catchall_remove(domain: str, target_addr: str, dry_run: bool = False) -> None: if not catchall_exists(domain, target_addr): die(f"Catch-all not found: @{domain} -> {target_addr}") sql = f""" DELETE FROM forwardings WHERE address = '{sql_escape(domain)}' AND forwarding = '{sql_escape(target_addr)}' AND domain = '{sql_escape(domain)}'; """ mysql_query(sql, dry_run=dry_run) if not dry_run: print(f"Removed catch-all: @{domain} -> {target_addr}") def catchall_list() -> None: sql = """ SELECT address, forwarding, active FROM forwardings WHERE address = domain AND address NOT LIKE '%@%' ORDER BY address, forwarding; """ out = mysql_query(sql) if not out: print("No catch-all rules found") return print("DOMAIN\tTARGET\tACTIVE") print(out) def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( prog="mailalias", description="Alias and catch-all manager for iRedMail", ) parser.add_argument( "--dry-run", action="store_true", help="Show SQL without executing it" ) subparsers = parser.add_subparsers(dest="command", required=True) p = subparsers.add_parser("alias-add") p.add_argument("alias_address") p.add_argument("target_address") p = subparsers.add_parser("alias-remove") p.add_argument("alias_address") p.add_argument("target_address") p = subparsers.add_parser("alias-enable") p.add_argument("alias_address") p.add_argument("target_address") p = subparsers.add_parser("alias-disable") p.add_argument("alias_address") p.add_argument("target_address") p = subparsers.add_parser("alias-lookup") p.add_argument("alias_address") p = subparsers.add_parser("alias-list") p = subparsers.add_parser("alias-list-user") p.add_argument("target_address") p = subparsers.add_parser("catchall-add") p.add_argument("domain") p.add_argument("target_address") p = subparsers.add_parser("catchall-remove") p.add_argument("domain") p.add_argument("target_address") p = subparsers.add_parser("catchall-list") return parser def main() -> None: parser = build_parser() args = parser.parse_args() if args.command in ("alias-add", "alias-remove", "alias-enable", "alias-disable"): alias_addr = validate_email(args.alias_address) target_addr = validate_email(args.target_address) if args.command == "alias-add": alias_add(alias_addr, target_addr, dry_run=args.dry_run) elif args.command == "alias-remove": alias_remove(alias_addr, target_addr, dry_run=args.dry_run) elif args.command == "alias-enable": alias_set_active(alias_addr, target_addr, 1, dry_run=args.dry_run) elif args.command == "alias-disable": alias_set_active(alias_addr, target_addr, 0, dry_run=args.dry_run) elif args.command == "alias-lookup": alias_addr = validate_email(args.alias_address) alias_lookup(alias_addr) elif args.command == "alias-list": alias_list() elif args.command == "alias-list-user": target_addr = validate_email(args.target_address) alias_list_user(target_addr) elif args.command == "catchall-add": domain = validate_domain(args.domain) target_addr = validate_email(args.target_address) catchall_add(domain, target_addr, dry_run=args.dry_run) elif args.command == "catchall-remove": domain = validate_domain(args.domain) target_addr = validate_email(args.target_address) catchall_remove(domain, target_addr, dry_run=args.dry_run) elif args.command == "catchall-list": catchall_list() else: parser.print_help() sys.exit(1) if __name__ == "__main__": main()