#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention App Center
#  univention-app modules
#
# Copyright 2015-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
#
import sys
from glob import glob
import os.path
from argparse import ArgumentParser, Action, Namespace
import logging
import ssl
from functools import wraps
from typing import TYPE_CHECKING, Any, Iterator, Mapping, Optional, Sequence, Tuple  # noqa: F401
from six.moves import urllib_error, http_client
from six import string_types
from univention.appcenter.app_cache import Apps
from univention.appcenter.log import get_base_logger
from univention.appcenter.utils import underscore, call_process, verbose_http_error, send_information
from univention.appcenter.exceptions import Abort, NetworkError
from six import with_metaclass
if TYPE_CHECKING:
	from univention.appcenter.app import App  # noqa: F401
_ACTIONS = {}
JOINSCRIPT_DIR = '/usr/lib/univention-install'
[docs]def possible_network_error(func):
	@wraps(func)
	def _func(*args, **kwargs):
		try:
			return func(*args, **kwargs)
		except (urllib_error.HTTPError, urllib_error.URLError, ssl.CertificateError, http_client.BadStatusLine) as exc:
			raise NetworkError(verbose_http_error(exc))
	return _func 
[docs]class StoreAppAction(Action):
	cache_class = Apps
	def __call__(self, parser, namespace, value, option_string=None):
		apps = []
		if self.nargs is None:
			value = [value]
		for val in value:
			if self.cache_class:
				app = self.cache_class.find_by_string(val)
			else:
				app = self.cache_class.split_app_string(val)
			if app is None:
				parser.error('Unable to find app %s. Maybe "%s update" to get the latest list of applications?' % (val, sys.argv[0]))
			apps.append(app)
		if self.nargs is None:
			apps = apps[0]
		setattr(namespace, self.dest, apps) 
[docs]class UniventionAppAction(with_metaclass(UniventionAppActionMeta, object)):
	parent_logger = get_base_logger().getChild('actions')
	def __init__(self):
		# type: () -> None
		self._progress_percentage = 0
[docs]	@classmethod
	def get_action_name(cls):
		# type: () -> str
		return underscore(cls.__name__).replace('_', '-') 
	@classmethod
	def _log(cls, logger, level, msg, *args, **kwargs):
		if logger is not None:
			logger = cls.logger.getChild(logger)
		else:
			logger = cls.logger
		logger.log(level, msg, *args, **kwargs)
[docs]	@classmethod
	def debug(cls, msg, logger=None):
		cls._log(logger, logging.DEBUG, str(msg)) 
[docs]	@classmethod
	def log(cls, msg, logger=None):
		cls._log(logger, logging.INFO, str(msg)) 
[docs]	@classmethod
	def warn(cls, msg, logger=None):
		cls._log(logger, logging.WARN, str(msg)) 
[docs]	@classmethod
	def fatal(cls, msg, logger=None):
		cls._log(logger, logging.FATAL, str(msg)) 
[docs]	@classmethod
	def log_exception(cls, exc, logger=None):
		cls._log(logger, logging.ERROR, exc, exc_info=1) 
[docs]	def setup_parser(self, parser):
		# type: (ArgumentParser) -> None
		pass 
	@property
	def percentage(self):
		# type: () -> int
		return self._progress_percentage
	@percentage.setter
	def percentage(self, percentage):
		# type: (int) -> None
		self._progress_percentage = percentage
		self.progress.debug(str(percentage))
	def _build_namespace(self, _namespace=None, **kwargs):
		# type: (Optional[Namespace], Any) -> Namespace
		parser = ArgumentParser()
		self.setup_parser(parser)
		namespace = Namespace()
		args = {}
		for action in parser._actions:
			default = parser._defaults.get(action.dest)
			if action.default is not None:
				default = action.default
			if hasattr(_namespace, action.dest):
				default = getattr(_namespace, action.dest)
			args[action.dest] = default
		args.update(kwargs)
		for key, value in args.items():
			setattr(namespace, key, value)
		return namespace
[docs]	@classmethod
	def call_safe(cls, **kwargs):
		# type: (Any) -> Any
		try:
			return cls.call(**kwargs)
		except Abort:
			return None 
[docs]	@classmethod
	def call(cls, **kwargs):
		# type: (Any) -> Any
		obj = cls()
		namespace = obj._build_namespace(**kwargs)
		return obj.call_with_namespace(namespace) 
[docs]	def call_with_namespace(self, namespace):
		# type: (Namespace) -> Any
		self.debug('Calling %s' % self.get_action_name())
		self.percentage = 0
		try:
			result = self.main(namespace)
		except Abort as exc:
			msg = str(exc)
			if msg:
				self.fatal(msg)
			self.percentage = 100
			raise
		except Exception as exc:
			self.log_exception(exc)
			raise
		else:
			self.percentage = 100
			return result 
	def _get_joinscript_path(self, app, unjoin=False):
		# type: (App, bool) -> str
		number = 50
		suffix = ''
		ext = 'inst'
		if unjoin:
			number = 51
			ext = 'uinst'
			suffix = '-uninstall'
		return os.path.join(JOINSCRIPT_DIR, '%d%s%s.%s' % (number, app.id, suffix, ext))
	def _call_cache_script(self, _app, _ext, *args, **kwargs):
		fname = _app.get_cache_file(_ext)
		# change to UCS umask + u+x:      -rwxr--r--
		if os.path.exists(fname):
			os.chmod(fname, 0o744)
		return self._call_script(fname, *args, **kwargs)
	def _call_script(self, _script, *args, **kwargs):
		# type: (str, Any, Any) -> Optional[bool]
		if not os.path.exists(_script):
			self.debug('%s does not exist' % _script)
			return None
		subprocess_args = [_script] + list(args)
		for key, value in kwargs.items():
			if value is None or value is False:
				continue
			key = '--%s' % key.replace('_', '-')
			subprocess_args.append(key)
			if value is not True:
				subprocess_args.append(value)
		process = self._subprocess(subprocess_args)
		self.debug('%s returned with %s' % (_script, process.returncode))
		return process.returncode == 0
	def _subprocess(self, args, logger=None, env=None, cwd=None):
		# type: (Sequence[str], Optional[logging.Logger], Optional[Mapping[str, str]], Optional[str]) -> Any
		if logger is None:
			logger = self.logger
		elif isinstance(logger, string_types):
			logger = self.logger.getChild(logger)
		return call_process(args, logger, env, cwd)
	@possible_network_error
	def _send_information(self, app, status, value=None):
		action = self.get_action_name()
		send_information(action, app, status, value) 
[docs]def get_action(action_name):
	# type: (str) -> Optional[UniventionAppAction]
	_import()
	return _ACTIONS.get(action_name) 
[docs]def all_actions():
	# type: () -> Iterator[Tuple[str, UniventionAppAction]]
	_import()
	for action_name in sorted(_ACTIONS):
		yield action_name, _ACTIONS[action_name] 
def _import():
	# type: () -> None
	if _ACTIONS:
		return
	path = os.path.dirname(__file__)
	for pymodule in glob(os.path.join(path, '*.py')):
		pymodule_name = os.path.basename(pymodule)[:-3]  # without .py
		__import__('univention.appcenter.actions.%s' % pymodule_name)