#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention Mail Dovecot - shared code for listeners
#
# Copyright 2015-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
import os
import os.path
import subprocess
import re
import traceback
import pwd
import grp
import shutil
import stat
try:
	from typing import Any, Optional, Sequence, Tuple  # noqa: F401
except ImportError:
	pass
import univention.debug as ud
# Path of the sieve script that DovecotListener.new_email_account() uploads
# to and activates for a newly created mail account.
default_sieve_script = "/var/lib/dovecot/sieve/default.sieve"
class DovecotListener(object):
	"""
	Shared helper code for listener modules that manage Dovecot mailboxes.

	All privileged work (running doveadm/doveconf, touching mail homes,
	reading the master user file) is wrapped in ``listener.setuid(0)`` /
	``listener.unsetuid()`` pairs so that privileges are always dropped again.
	"""

	def __init__(self, listener, name):
		# type: (Any, str) -> None
		"""
		:param listener: object providing ``setuid()``, ``unsetuid()`` and ``configRegistry``
		:param str name: name of the calling listener module, used as prefix in log messages
		"""
		self.listener = listener
		self.name = name

	def log_p(self, msg):
		# type: (str) -> None
		"""Write `msg` to the listener log at PROCESS level, prefixed with the module name."""
		ud.debug(ud.LISTENER, ud.PROCESS, "%s: %s" % (self.name, msg))

	def log_e(self, msg):
		# type: (str) -> None
		"""Write `msg` to the listener log at ERROR level, prefixed with the module name."""
		ud.debug(ud.LISTENER, ud.ERROR, "%s: %s" % (self.name, msg))

	def new_email_account(self, email):
		# type: (str) -> None
		"""
		Upload and activate the default sieve script for a new mail account.

		Only done if UCR ``mail/dovecot/sieve/spam`` is true (default) and
		``mail/dovecot/folder/spam`` is set to something other than "none".

		:param str email: mail address of the new account
		:raises Exception: whatever :py:meth:`upload_activate_sieve_script` raised
		"""
		ucr = self.listener.configRegistry
		spam_folder = ucr.get("mail/dovecot/folder/spam")
		if ucr.is_true("mail/dovecot/sieve/spam", True) and spam_folder and spam_folder.lower() != "none":
			try:
				self.upload_activate_sieve_script(email, default_sieve_script)
			except Exception:
				self.log_e("dovecot: Could not upload sieve script to account '%s'." % email)
				raise
			finally:
				# make sure privileges are dropped, whatever happened above
				self.listener.unsetuid()

	def delete_email_account(self, dn, email):
		# type: (str, str) -> None
		"""
		Remove the mail home of `email` from disk, if UCR
		``mail/dovecot/mailbox/delete`` is true. Otherwise only log.

		:param str dn: DN of the object the address belonged to (for logging only)
		:param str email: mail address whose mailbox should be removed
		:raises Exception: if the mail home path could not be calculated or the removal failed
		"""
		if not self.listener.configRegistry.is_true('mail/dovecot/mailbox/delete', False):
			self.log_p("dovecot: Deleting of mailboxes disabled, not removing '%s' (dn '%s')." % (email, dn))
			return

		try:
			# The placeholders replaced below are the lower-casing ones, so the
			# address is lower-cased exactly like move_user_home() does it.
			old_localpart, old_domainpart = email.lower().split("@")
			global_mail_home = self.get_maillocation()
			old_home_calc = str(global_mail_home).replace("%Ld", old_domainpart).replace("%Ln", old_localpart)
		except Exception:
			self.log_e("dovecot: Delete mailbox: Configuration error. Could not remove mailbox (dn:'%s' old mail: '%s')." % (dn, email))
			raise

		# end running sessions of the user before removing the files
		self.read_from_ext_proc_as_root(["/usr/bin/doveadm", "kick", email])
		try:
			self.listener.setuid(0)
			shutil.rmtree(old_home_calc, ignore_errors=True)
		except Exception:
			self.log_e("dovecot: Delete mailbox: Error removing directory '%s' from disk." % old_home_calc)
			raise
		finally:
			self.listener.unsetuid()

	def read_from_ext_proc_as_root(self, cmd, regexp=None, stdin=None, stdout=subprocess.PIPE, stderr=None, stdin_input=None):
		# type: (Sequence[str], Optional[str], Any, Any, Any, Optional[str]) -> Optional[str]
		"""
		Wrapper around Popen(), runs external command as root and return its
		output, optionally the first hit of a regexp. May raise an exception.

		:param cmd: list: with executable path as first item
		:param regexp: string: regexp for re.findall()
		:param stdin: passed through to Popen(); use subprocess.PIPE together with `stdin_input`
		:param stdout: passed through to Popen()
		:param stderr: passed through to Popen()
		:param stdin_input: string: text to send to the commands stdin (UTF-8 encoded)
		:return: string: decoded output without trailing whitespace, or the first
			regexp hit. None if the command produced no output on stdout,
			produced captured output on stderr or exited with a non-zero status.
		:raises IndexError: if `regexp` was given but did not match the output
		"""
		# encode before starting the process, so a failure here cannot leave a child behind
		cmd_in = stdin_input.encode('UTF-8') if stdin_input is not None else None
		try:
			self.listener.setuid(0)
			cmd_proc = subprocess.Popen(cmd, stdin=stdin, stdout=stdout, stderr=stderr)
			# communicate() waits for the process, returncode is set afterwards
			cmd_out, cmd_err = cmd_proc.communicate(input=cmd_in)
			if not cmd_out or cmd_err or cmd_proc.returncode != 0:
				return None
			text = cmd_out.decode('UTF-8')
			if regexp:
				return re.findall(regexp, text)[0]
			return text.rstrip()
		finally:
			self.listener.unsetuid()

	def move_user_home(self, newMailPrimaryAddress, oldMailPrimaryAddress, force_rename=False):
		# type: (str, str, bool) -> None
		"""
		Move the mail home of a user whose primary mail address changed.

		Nothing is moved if renaming is disabled (UCR ``mail/dovecot/mailbox/rename``
		not true and `force_rename` not set), if the old mail home does not
		exist or if the new mail home already exists. Failures while calculating
		the paths or while moving are logged and not raised.

		:param str newMailPrimaryAddress: new mail address
		:param str oldMailPrimaryAddress: previous mail address
		:param bool force_rename: move even if renaming is disabled in UCR
		:raises ValueError: if `oldMailPrimaryAddress` does not contain exactly one "@"
		"""
		if not force_rename and not self.listener.configRegistry.is_true("mail/dovecot/mailbox/rename", False):
			self.log_p("Renaming of mailboxes disabled, not moving ('%s' -> '%s')." % (oldMailPrimaryAddress, newMailPrimaryAddress))
			return

		old_localpart, old_domainpart = oldMailPrimaryAddress.lower().split("@")
		try:
			global_mail_home = self.get_maillocation()
			old_home_calc = str(global_mail_home).replace("%Ld", old_domainpart).replace("%Ln", old_localpart)
			new_home_dove = self.get_user_home(newMailPrimaryAddress)
		except Exception:
			self.log_e("Move mailbox: Configuration error. Could not move mailbox ('%s' -> '%s')." % (oldMailPrimaryAddress, newMailPrimaryAddress))
			return

		try:
			self.listener.setuid(0)
			if not os.path.isdir(old_home_calc):
				# Either the user never logged in or never got any email, and thus no maildir was ever created,
				# or it was moved manually. In any case: ignore.
				self.log_p("Move mailbox: Source directory ('%s') does not exist. Nothing to do for mailbox move ('%s' -> '%s')." % (old_home_calc, oldMailPrimaryAddress, newMailPrimaryAddress))
				return
			if os.path.isdir(new_home_dove) or os.path.isfile(new_home_dove):
				# We don't know why there is a file or directory already. For security reasons we don't do anything.
				self.log_e("Move mailbox: Target directory ('%s') exists.  For security reasons not moving mailbox for mailbox move ('%s' -> '%s')." % (new_home_dove, oldMailPrimaryAddress, newMailPrimaryAddress))
				return
		finally:
			self.listener.unsetuid()

		try:
			self.read_from_ext_proc_as_root(["/usr/bin/doveadm", "kick", oldMailPrimaryAddress])
		except Exception:
			# Kicking is best effort only: log it and go on with the move.
			self.log_p("Move mailbox: Could not kick user '%s', continuing.\n%s" % (oldMailPrimaryAddress, traceback.format_exc()))

		try:
			self.move_mail_home(old_home_calc, new_home_dove, newMailPrimaryAddress, force_rename)
		except Exception:
			self.log_e("Move mailbox: Failed to move mail home (of mail '%s') from '%s' to '%s'.\n%s" % (
				newMailPrimaryAddress, old_home_calc, new_home_dove, traceback.format_exc()))
			return
		self.log_p("Moved mail home (of mail: '%s') from '%s' to '%s'." % (newMailPrimaryAddress, old_home_calc, new_home_dove))

	def move_mail_home(self, old_path, new_path, email, force_rename=False):
		# type: (str, str, str, bool) -> None
		"""
		Move the directory `old_path` to `new_path` and restore the ownership
		the old directory had on everything below the new one.

		:param str old_path: current mail home
		:param str new_path: mail home to move to
		:param str email: mail address the mail home belongs to (for logging only)
		:param bool force_rename: move even if renaming is disabled in UCR
		:raises Exception: if moving or changing the ownership failed
		"""
		# create parent path in any case to make sure it has correct ownership
		self.mkdir_p(os.path.dirname(new_path))

		if not force_rename and not self.listener.configRegistry.is_true("mail/dovecot/mailbox/rename", False):
			self.log_p("Renaming of mailboxes disabled, not moving mail home (of mail '%s') from '%s' to '%s." % (email, old_path, new_path))
			return

		try:
			self.listener.setuid(0)
			# remember the owner before moving, it is reapplied afterwards
			st = os.stat(old_path)
			shutil.move(old_path, new_path)
			self.chown_r(new_path, st[stat.ST_UID], st[stat.ST_GID])
		except Exception:
			self.log_e("Failed to move mail home (of mail '%s') from '%s' to '%s'.\n%s" % (
				email, old_path, new_path, traceback.format_exc()))
			raise
		finally:
			self.listener.unsetuid()

	def get_maillocation(self):
		# type: () -> str
		"""
		Return the path part in front of "/Maildir" of Dovecots ``mail_location``
		setting, as printed by ``doveconf -h mail_location``.

		:return: string: path, with whatever placeholders the setting contains
		:raises Exception: if doveconf failed or its output could not be parsed
		"""
		try:
			location = self.read_from_ext_proc_as_root(["/usr/bin/doveconf", "-h", "mail_location"], r"\S+:(\S+)/Maildir")
			if not location:
				# None means doveconf failed. Callers build paths from the
				# result, so fail here instead of handing out None.
				raise ValueError("doveconf did not return a mail_location.")
			return location
		except Exception:
			self.log_e("Failed to get mail_location from Dovecot configuration.\n%s" % traceback.format_exc())
			raise

	def upload_activate_sieve_script(self, email, file):
		# type: (str, str) -> None
		"""
		Upload the sieve script `file` as script "default" to the mailbox of
		`email` and activate it. Runs ``sieve-connect`` twice, logging in as
		the master user on behalf of `email`.

		:param str email: mail address of the mailbox
		:param str file: path of the local sieve script
		:raises Exception: if the credentials could not be read or sieve-connect could not be run
		"""
		try:
			master_name, master_pw = self.get_masteruser_credentials()
			ucr = self.listener.configRegistry
			ca_file = ucr.get("mail/dovecot/sieve/client/cafile", "/etc/univention/ssl/ucsCA/CAcert.pem")
			fqdn = "%s.%s" % (ucr['hostname'], ucr['domainname'])
			fqdn = ucr.get("mail/dovecot/sieve/client/server", fqdn)
			_cmd = [
				"sieve-connect", "--user", "%s*%s" % (email, master_name),
				"--server", fqdn,
				"--noclearauth", "--noclearchan",
				"--tlscafile", ca_file,
				"--remotesieve", "default"]
			# the password is handed over on stdin for both calls
			cmd_upload = _cmd + ["--localsieve", file, "--upload"]
			self.read_from_ext_proc_as_root(cmd_upload, stdin=subprocess.PIPE, stdin_input=master_pw)
			cmd_activate = _cmd + ["--activate"]
			self.read_from_ext_proc_as_root(cmd_activate, stdin=subprocess.PIPE, stdin_input=master_pw)
		except Exception:
			self.log_e("upload_activate_sieve_script(): Could not upload sieve script '%s' to mailbox '%s'. Exception:\n%s" % (file, email, traceback.format_exc()))
			raise

	def get_user_home(self, username):
		# type: (str) -> str
		"""
		Return the lower-cased home directory ``doveadm user -f home`` reports for `username`.

		:param str username: user (mail address) to look up
		:return: string: lower-cased path
		:raises Exception: if doveadm failed or did not print a home directory
		"""
		try:
			home = self.read_from_ext_proc_as_root(["/usr/bin/doveadm", 'user', "-f", "home", username])
			if not home:
				raise ValueError("doveadm did not return a home directory.")
			return home.lower()
		except Exception:
			self.log_e("Failed to get mail home for user '%s'.\n%s" % (username, traceback.format_exc()))
			raise

	def get_masteruser_credentials(self):
		# type: () -> Tuple[str, str]
		"""
		Read name and plain text password of the first matching entry in
		/etc/dovecot/master-users.

		:return: tuple: (username, password)
		:raises Exception: if the file could not be read or has no matching entry
		"""
		try:
			self.listener.setuid(0)
			# context manager, so the file is closed before privileges are dropped
			with open("/etc/dovecot/master-users") as master_users:
				content = master_users.read()
			return re.findall(r"(\S+):{PLAIN}(\S+)::::::", content)[0]
		except Exception:
			self.log_e("Failed to get masteruser password.\n%s" % traceback.format_exc())
			raise
		finally:
			self.listener.unsetuid()

	def get_dovecot_user(self):
		# type: () -> Tuple[str, str]
		"""
		Return the values of Dovecots ``mail_uid`` and ``mail_gid`` settings.
		The result is cached on the instance. If doveconf cannot be queried,
		"dovemail" is used.

		:return: tuple: (user, group)
		"""
		if not hasattr(self, "dovecot_user") or not hasattr(self, "dovecot_group"):
			try:
				uid = self.read_from_ext_proc_as_root(["/usr/bin/doveconf", "-h", "mail_uid"])
				gid = self.read_from_ext_proc_as_root(["/usr/bin/doveconf", "-h", "mail_gid"])
			except Exception:
				uid = gid = None
			# A failing doveconf is reported as None, not as an exception, so
			# the fallback has to cover both cases.
			self.dovecot_user = uid or "dovemail"
			self.dovecot_group = gid or "dovemail"
		return self.dovecot_user, self.dovecot_group

	def mkdir_p(self, dir):
		# type: (str) -> None
		"""
		Create directory `dir` and all missing parent directories with mode
		02700, owned by the Dovecot user and group. Existing directories are
		left untouched. An empty `dir` is a no-op.

		:param str dir: directory to create
		:raises Exception: if the directory could not be created or chowned
		"""
		if not dir:
			# nothing to create (e.g. dirname of a path without directory part)
			return

		user, group = self.get_dovecot_user()
		dovecot_uid = pwd.getpwnam(user).pw_uid
		dovecot_gid = grp.getgrnam(group).gr_gid

		parent = os.path.dirname(dir)
		# Stop going up at the filesystem root (dirname returns its argument)
		# and at the start of a relative path (dirname returns "").
		if parent and parent != dir:
			# spool directory has to be traversed as root
			self.listener.setuid(0)
			try:
				parent_missing = not os.path.exists(parent)
			finally:
				self.listener.unsetuid()
			if parent_missing:
				self.mkdir_p(parent)

		try:
			self.listener.setuid(0)
			if not os.path.exists(dir):
				os.mkdir(dir, 0o2700)
				os.chown(dir, dovecot_uid, dovecot_gid)
		except Exception:
			self.log_e("Failed to create directory '%s'.\n%s" % (dir, traceback.format_exc()))
			raise
		finally:
			self.listener.unsetuid()

	@classmethod
	def chown_r(cls, path, uid, gid):
		# type: (str, int, int) -> None
		"""
		Recursively set owner and group on a file/directory and its
		subdirectories. Entries that already have the requested owner and
		group are not touched. Symlinked subdirectories are not descended into.

		:param str path: file/directory (and its subdirectories) to change ownership on
		:param int uid: UID to set
		:param int gid: GID to set
		:return: None
		"""
		def chown_if_different(path_):
			st = os.stat(path_)
			if st[stat.ST_UID] != uid or st[stat.ST_GID] != gid:
				os.chown(path_, uid, gid)

		chown_if_different(path)
		# os.walk() already visits every subdirectory, so one pass is enough
		for dirpath, dirnames, filenames in os.walk(path):
			for entry in dirnames + filenames:
				chown_if_different(os.path.join(dirpath, entry))