From f771e3c637f38b9904997333763091195d739498 Mon Sep 17 00:00:00 2001 From: Charles Danesi Date: Tue, 21 Apr 2026 16:12:29 -0400 Subject: [PATCH] initial commit --- .gitignore | 0 mailalias.py | 431 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 431 insertions(+) create mode 100644 .gitignore create mode 100755 mailalias.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e69de29 diff --git a/mailalias.py b/mailalias.py new file mode 100755 index 0000000..84cfe4e --- /dev/null +++ b/mailalias.py @@ -0,0 +1,431 @@ +#!/usr/bin/env python3 +""" +mailalias - simple alias / catch-all management tool for iRedMail (MySQL/MariaDB) + +Alias model used: +- alias table contains alias addresses +- forwardings contains destination rows for aliases with is_list=1 +- catch-all entries are stored in forwardings where address == domain + +Examples: + mailalias alias-add contact@charlesdanesi.com charles@charlesdanesi.com + mailalias alias-remove admin@beyemir.com charles@beyemir.com + mailalias alias-lookup admin@charlesdanesi.com + mailalias alias-list + mailalias alias-list-user charles@danesisolutions.com + + mailalias catchall-add beyemir.com charles@beyemir.com + mailalias catchall-remove beyemir.com charles@beyemir.com + mailalias catchall-list +""" + +from __future__ import annotations + +import argparse +import re +import subprocess +import sys +from pathlib import Path + +SETTINGS_FILE = Path("/opt/www/iredadmin/settings.py") +DB_NAME = "vmail" +DB_USER = "vmailadmin" +MYSQL_BIN = "/usr/bin/mysql" + +EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") +DOMAIN_RE = re.compile(r"^(?=.{1,253}$)(?!-)([A-Za-z0-9-]{1,63}\.)+[A-Za-z]{2,63}$") + + +def die(msg: str, code: int = 1) -> None: + print(f"ERROR: {msg}", file=sys.stderr) + sys.exit(code) + + +def validate_email(address: str) -> str: + address = address.strip().lower() + if not EMAIL_RE.match(address): + die(f"Invalid email address: {address}") + return address + + +def validate_domain(domain: str) -> str: + domain = domain.strip().lower() + if not DOMAIN_RE.match(domain): + die(f"Invalid domain: {domain}") + return domain + + +def domain_of(address: str) -> str: + return address.split("@", 1)[1] + + +def read_db_password() -> str: + if not SETTINGS_FILE.exists(): + die(f"Could not find iRedAdmin settings file: {SETTINGS_FILE}") + + for line in SETTINGS_FILE.read_text().splitlines(): + line = line.strip() + if line.startswith("vmail_db_password"): + parts = line.split("=", 1) + if len(parts) != 2: + continue + value = parts[1].strip().strip('"').strip("'") + if value: + return value + + die(f"Could not find vmail_db_password in {SETTINGS_FILE}") + raise RuntimeError("unreachable") + + +def sql_escape(value: str) -> str: + return value.replace("\\", "\\\\").replace("'", "\\'") + + +def mysql_query(sql: str, dry_run: bool = False) -> str: + password = read_db_password() + + cmd = [ + MYSQL_BIN, + f"--user={DB_USER}", + f"--password={password}", + "--batch", + "--raw", + "--skip-column-names", + "-D", + DB_NAME, + "-e", + sql, + ] + + if dry_run: + print("DRY RUN") + print(sql.strip()) + return "" + + try: + result = subprocess.run( + cmd, + check=True, + capture_output=True, + text=True, + ) + return result.stdout.strip() + except subprocess.CalledProcessError as exc: + stderr = exc.stderr.strip() if exc.stderr else "Unknown MySQL error" + die(stderr) + raise RuntimeError("unreachable") + + +def alias_row_exists(alias_addr: str) -> bool: + sql = f""" + SELECT COUNT(*) + FROM alias + WHERE address = '{sql_escape(alias_addr)}'; + """ + out = mysql_query(sql) + return int(out or "0") > 0 + + +def alias_mapping_exists(alias_addr: str, target_addr: str) -> bool: + sql = f""" + SELECT COUNT(*) + FROM forwardings + WHERE address = '{sql_escape(alias_addr)}' + AND forwarding = '{sql_escape(target_addr)}' + AND is_list = 1; + """ + out = mysql_query(sql) + return int(out or "0") > 0 + + +def catchall_exists(domain: str, target_addr: str) -> bool: + sql = f""" + SELECT COUNT(*) + FROM forwardings + WHERE address = '{sql_escape(domain)}' + AND forwarding = '{sql_escape(target_addr)}' + AND domain = '{sql_escape(domain)}'; + """ + out = mysql_query(sql) + return int(out or "0") > 0 + + +def alias_add(alias_addr: str, target_addr: str, dry_run: bool = False) -> None: + alias_domain = domain_of(alias_addr) + target_domain = domain_of(target_addr) + + if alias_addr == target_addr: + die("Alias and target cannot be the same address") + + statements = [] + + if not alias_row_exists(alias_addr): + statements.append( + f"""INSERT INTO alias (address, domain, active) +VALUES ('{sql_escape(alias_addr)}', '{sql_escape(alias_domain)}', 1)""" + ) + + if alias_mapping_exists(alias_addr, target_addr): + die(f"Alias mapping already exists: {alias_addr} -> {target_addr}") + + statements.append( + f"""INSERT INTO forwardings +(address, forwarding, domain, dest_domain, is_maillist, is_list, is_forwarding, is_alias, active) +VALUES +('{sql_escape(alias_addr)}', '{sql_escape(target_addr)}', '{sql_escape(alias_domain)}', '{sql_escape(target_domain)}', 0, 1, 0, 0, 1)""" + ) + + sql = ";\n".join(statements) + ";" + mysql_query(sql, dry_run=dry_run) + + if not dry_run: + print(f"Added alias: {alias_addr} -> {target_addr}") + + +def alias_remove(alias_addr: str, target_addr: str, dry_run: bool = False) -> None: + if not alias_mapping_exists(alias_addr, target_addr): + die(f"Alias mapping not found: {alias_addr} -> {target_addr}") + + sql = f""" + DELETE FROM forwardings + WHERE address = '{sql_escape(alias_addr)}' + AND forwarding = '{sql_escape(target_addr)}' + AND is_list = 1; + """ + mysql_query(sql, dry_run=dry_run) + + if not dry_run: + # remove alias row if no list mappings remain + sql_cleanup = f""" + DELETE FROM alias + WHERE address = '{sql_escape(alias_addr)}' + AND NOT EXISTS ( + SELECT 1 + FROM forwardings + WHERE address = '{sql_escape(alias_addr)}' + AND is_list = 1 + ); + """ + mysql_query(sql_cleanup) + print(f"Removed alias mapping: {alias_addr} -> {target_addr}") + + +def alias_set_active( + alias_addr: str, target_addr: str, active: int, dry_run: bool = False +) -> None: + if not alias_mapping_exists(alias_addr, target_addr): + die(f"Alias mapping not found: {alias_addr} -> {target_addr}") + + sql = f""" + UPDATE forwardings + SET active = {active} + WHERE address = '{sql_escape(alias_addr)}' + AND forwarding = '{sql_escape(target_addr)}' + AND is_list = 1; + """ + mysql_query(sql, dry_run=dry_run) + + if not dry_run: + state = "Enabled" if active else "Disabled" + print(f"{state}: {alias_addr} -> {target_addr}") + + +def alias_lookup(alias_addr: str) -> None: + sql = f""" + SELECT f.address, f.forwarding, f.active + FROM forwardings f + WHERE f.address = '{sql_escape(alias_addr)}' + AND f.is_list = 1 + ORDER BY f.forwarding; + """ + out = mysql_query(sql) + if not out: + print(f"No alias mappings found for {alias_addr}") + return + + print("ALIAS\tTARGET\tACTIVE") + print(out) + + +def alias_list() -> None: + sql = """ + SELECT a.address, GROUP_CONCAT(f.forwarding ORDER BY f.forwarding SEPARATOR ', '), a.active + FROM alias a + LEFT JOIN forwardings f + ON a.address = f.address + AND f.is_list = 1 + GROUP BY a.address, a.active + ORDER BY a.address; + """ + out = mysql_query(sql) + if not out: + print("No aliases found") + return + + print("ALIAS\tTARGETS\tACTIVE") + print(out) + + +def alias_list_user(target_addr: str) -> None: + sql = f""" + SELECT f.address, f.forwarding, f.active + FROM forwardings f + WHERE f.forwarding = '{sql_escape(target_addr)}' + AND f.is_list = 1 + ORDER BY f.address; + """ + out = mysql_query(sql) + if not out: + print(f"No aliases found for target {target_addr}") + return + + print("ALIAS\tTARGET\tACTIVE") + print(out) + + +def catchall_add(domain: str, target_addr: str, dry_run: bool = False) -> None: + target_domain = domain_of(target_addr) + + if catchall_exists(domain, target_addr): + die(f"Catch-all already exists: @{domain} -> {target_addr}") + + sql = f""" + INSERT INTO forwardings + (address, forwarding, domain, dest_domain, is_maillist, is_list, is_forwarding, is_alias, active) + VALUES + ('{sql_escape(domain)}', '{sql_escape(target_addr)}', '{sql_escape(domain)}', '{sql_escape(target_domain)}', 0, 0, 0, 0, 1); + """ + mysql_query(sql, dry_run=dry_run) + + if not dry_run: + print(f"Added catch-all: @{domain} -> {target_addr}") + + +def catchall_remove(domain: str, target_addr: str, dry_run: bool = False) -> None: + if not catchall_exists(domain, target_addr): + die(f"Catch-all not found: @{domain} -> {target_addr}") + + sql = f""" + DELETE FROM forwardings + WHERE address = '{sql_escape(domain)}' + AND forwarding = '{sql_escape(target_addr)}' + AND domain = '{sql_escape(domain)}'; + """ + mysql_query(sql, dry_run=dry_run) + + if not dry_run: + print(f"Removed catch-all: @{domain} -> {target_addr}") + + +def catchall_list() -> None: + sql = """ + SELECT address, forwarding, active + FROM forwardings + WHERE address = domain + AND address NOT LIKE '%@%' + ORDER BY address, forwarding; + """ + out = mysql_query(sql) + if not out: + print("No catch-all rules found") + return + + print("DOMAIN\tTARGET\tACTIVE") + print(out) + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="mailalias", + description="Alias and catch-all manager for iRedMail", + ) + parser.add_argument( + "--dry-run", action="store_true", help="Show SQL without executing it" + ) + subparsers = parser.add_subparsers(dest="command", required=True) + + p = subparsers.add_parser("alias-add") + p.add_argument("alias_address") + p.add_argument("target_address") + + p = subparsers.add_parser("alias-remove") + p.add_argument("alias_address") + p.add_argument("target_address") + + p = subparsers.add_parser("alias-enable") + p.add_argument("alias_address") + p.add_argument("target_address") + + p = subparsers.add_parser("alias-disable") + p.add_argument("alias_address") + p.add_argument("target_address") + + p = subparsers.add_parser("alias-lookup") + p.add_argument("alias_address") + + p = subparsers.add_parser("alias-list") + + p = subparsers.add_parser("alias-list-user") + p.add_argument("target_address") + + p = subparsers.add_parser("catchall-add") + p.add_argument("domain") + p.add_argument("target_address") + + p = subparsers.add_parser("catchall-remove") + p.add_argument("domain") + p.add_argument("target_address") + + p = subparsers.add_parser("catchall-list") + + return parser + + +def main() -> None: + parser = build_parser() + args = parser.parse_args() + + if args.command in ("alias-add", "alias-remove", "alias-enable", "alias-disable"): + alias_addr = validate_email(args.alias_address) + target_addr = validate_email(args.target_address) + + if args.command == "alias-add": + alias_add(alias_addr, target_addr, dry_run=args.dry_run) + elif args.command == "alias-remove": + alias_remove(alias_addr, target_addr, dry_run=args.dry_run) + elif args.command == "alias-enable": + alias_set_active(alias_addr, target_addr, 1, dry_run=args.dry_run) + elif args.command == "alias-disable": + alias_set_active(alias_addr, target_addr, 0, dry_run=args.dry_run) + + elif args.command == "alias-lookup": + alias_addr = validate_email(args.alias_address) + alias_lookup(alias_addr) + + elif args.command == "alias-list": + alias_list() + + elif args.command == "alias-list-user": + target_addr = validate_email(args.target_address) + alias_list_user(target_addr) + + elif args.command == "catchall-add": + domain = validate_domain(args.domain) + target_addr = validate_email(args.target_address) + catchall_add(domain, target_addr, dry_run=args.dry_run) + + elif args.command == "catchall-remove": + domain = validate_domain(args.domain) + target_addr = validate_email(args.target_address) + catchall_remove(domain, target_addr, dry_run=args.dry_run) + + elif args.command == "catchall-list": + catchall_list() + + else: + parser.print_help() + sys.exit(1) + + +if __name__ == "__main__": + main()