#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention Management Console
#  Decorators for functions in UMC 2.0 modules
#
# Copyright 2012-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
"""
Convenience decorators for developers of UMC modules
====================================================
Functions exposed by UMC modules often share some logic. They check the
existence and formatting of variables or check permissions. If anything
fails, they react in a similar way. If everything is correct, the real
logic is often as simple as returning one single value.
This module provides functions that can be used to separate repeating
tasks from the actual business logic. This means:
* less time to code
* fewer bugs
* consistent behavior throughout the UMC in standard cases
Note that the functions defined herein do not cover every corner case during
UMC module development. You are not bound to use them if you need more
flexibility.
"""
import sys
import inspect
import time
import types
import notifier
import notifier.threads
import functools
from threading import Thread
from univention.lib.i18n import Translation
from univention.management.console.error import UMC_Error, UnprocessableEntity
from univention.management.console.log import MODULE
from univention.management.console.modules.sanitizers import MultiValidationError, ValidationError, Sanitizer, DictSanitizer, ListSanitizer
_ = Translation('univention.management.console').translate
[docs]def sanitize(*args, **kwargs):
	"""
	Decorator that lets you sanitize the user input.
	The sanitize function can be used to validate the input
	as well as change it.
	Note that changing a value here will actually alter the request
	object. This should be no problem though.
	If the validation step fails an error will be passed to the user
	instead of executing the function. This step should not raise
	anything other than
	:class:`~univention.management.console.modules.sanitizers.ValidationError`
	or
	:class:`~univention.management.console.modules.sanitizers.UnformattedValidationError`
	(one should use the method
	:meth:`~univention.management.console.modules.sanitizers.Sanitizer.raise_validation_error`).
	You can find some predefined Sanitize classes in the
	corresponding module or you define one yourself, deriving it from
	:class:`~univention.management.console.modules.sanitizers.Sanitizer`::
		class SplitPathSanitizer(Sanitizer):
			def __init__(self):
				super(SplitPathSanitizer, self).__init__(
					validate_none=True,
					may_change_value=True)
			def _sanitize(self, value, name, further_fields):
				if value is None:
					return []
				try:
					return value.split('/')
				except BaseException:
					self.raise_validation_error('Split failed')
	Before::
		def my_func(self, request):
			var1 = request.options.get('var1')
			var2 = request.options.get('var2', 20)
			try:
				var1 = int(var1)
				var2 = int(var2)
			except (ValueError, TypeError):
				self.finished(request.id, None, 'Cannot convert to int', status=400)
				return
			if var2 < 10:
				self.finished(request.id, None, 'var2 must be >= 10', status=400)
				return
			self.finished(request.id, var1 + var2)
	After::
		@sanitize(
			var1=IntegerSanitizer(required=True),
			var2=IntegerSanitizer(required=True, minimum=10, default=20)
		)
		def add(self, request):
			var1 = request.options.get('var1')  # could now use ['var1']
			var2 = request.options.get('var2')
			self.finished(request.id, var1 + var2)
	The decorator can be combined with other decorators like
	:func:`simple_response` (be careful with ordering of decorators here)::
		@sanitize(
			var1=IntegerSanitizer(required=True),
			var2=IntegerSanitizer(required=True, minimum=10)
		)
		@simple_response
		def add(self, var1, var2):
			return var1 + var2
	Note that you lose the capability of specifying defaults in
	*@simple_response*. You need to do it in *@sanitize* now.
	"""
	if args:
		return sanitize_list(args[0], **kwargs)
	else:
		return sanitize_dict(kwargs) 
[docs]def sanitize_list(sanitizer, **kwargs):
	return lambda function: _sanitize_list(function, sanitizer, kwargs) 
[docs]def sanitize_dict(sanitized_attrs, **kwargs):
	return lambda function: _sanitize_dict(function, sanitized_attrs, kwargs) 
def _sanitize_dict(function, sanitized_attrs, sanitizer_parameters):
	defaults = {'default': {}, 'required': True, 'may_change_value': True}
	defaults.update(sanitizer_parameters)
	return _sanitize(function, DictSanitizer(sanitized_attrs, **defaults))
def _sanitize_list(function, sanitizer, sanitizer_parameters):
	defaults = {'default': [], 'required': True, 'may_change_value': True}
	defaults.update(sanitizer_parameters)
	return _sanitize(function, ListSanitizer(sanitizer, **defaults))
def _sanitize(function, sanitizer):
	def _response(self, request):
		request.options = sanitize_args(sanitizer, 'request.options', {'request.options': request.options})
		return function(self, request)
	copy_function_meta_data(function, _response)
	return _response
def sanitize_args(sanitizer, name, args):
	try:
		try:
			return sanitizer.sanitize(name, args)
		except MultiValidationError:
			raise
		except ValidationError as exc:
			multi_error = MultiValidationError()
			multi_error.add_error(exc, name)
			raise multi_error
	except MultiValidationError as exc:
		raise UnprocessableEntity(str(exc), result=exc.result())
[docs]def simple_response(function=None, with_flavor=None, with_progress=False):
	'''If your function is as simple as: "Just return some variables"
	this decorator is for you.
	Instead of defining the function
	.. code-block :: python
		def my_func(self, response): pass
	you now define a function with the variables you would expect in
	*request.options*. Default values are supported:
	.. code-block :: python
		@simple_response
		def my_func(self, var1, var2='default'): pass
	The decorator extracts variables from *request.options*. If the
	variable is not found, it either returns a failure or sets it to a
	default value (if specified by you).
	If you need to get the flavor passed to the function you can do it
	like this::
		@simple_response(with_flavor=True)
		def my_func(self, flavor, var1, var2='default'): pass
	With *with_flavor* set, the flavor is extracted from the *request*.
	You can also set with_flavor='varname', in which case the variable
	name for the flavor is *varname*. *True* means 'flavor'.
	As with ordinary option arguments, you may specify a default value
	for flavor in the function definition::
		@simple_response(with_flavor='module_flavor')
		def my_func(self, flavor='this comes from request.options',
			module_flavor='this is the flavor (and its default value)'): pass
	Instead of stating at the end of your function
	.. code-block:: python
		self.finished(request.id, some_value)
	you now just
	.. code-block:: python
		return some_value
	Before::
		def my_func(self, request):
			variable1 = request.options.get('variable1')
			variable2 = request.options.get('variable2')
			flavor = request.flavor or 'default flavor'
			if variable1 is None:
				self.finished(request.id, None, message='variable1 is required', success=False)
				return
			if variable2 is None:
			variable2 = ''
			try:
				value = '%s_%s_%s' % (self._saved_dict[variable1], variable2, flavor)
			except KeyError:
				self.finished(request.id, None, message='Something went wrong', success=False, status=500)
				return
			self.finished(request.id, value)
	After::
		@simple_response(with_flavor=True)
		def my_func(self, variable1, variable2='', flavor='default_flavor'):
			try:
				return '%s_%s_%s' % (self._saved_dict[variable1], variable2, flavor)
			except KeyError:
				raise UMC_Error('Something went wrong')
	'''
	if function is None:
		return lambda f: simple_response(f, with_flavor, with_progress)
	if with_progress is True:
		with_progress = 'progress'
	# fake a generator function that yields whatever the original
	# function returned
	def _fake_func(self, iterator, *args):
		for args in iterator:
			break
		yield function(self, *args)
	copy_function_meta_data(function, _fake_func, copy_arg_inspect=True)
	# fake another variable name
	# the name is not important as it is removed from the list while
	# being processed. Even a variable named 'iterator' in the original
	# function does not break anything
	_fake_func._original_argument_names = ['self', 'iterator'] + _fake_func._original_argument_names[1:]
	_multi_response = _eval_simple_decorated_function(_fake_func, with_flavor)
	def _response(self, request, *args, **kwargs):
		# other arguments than request won't be propagated
		# needed for @LDAP_Connection
		# fake a multi_request
		request.options = [request.options]
		if with_progress:
			progress_obj = self.new_progress()
			request.options[0][with_progress] = progress_obj
			def _thread(self, progress_obj, _multi_response, request):
				try:
					result = _multi_response(self, request)
				except Exception:
					progress_obj.exception(sys.exc_info())
				else:
					progress_obj.finish_with_result(result[0])
			thread = Thread(target=_thread, args=[self, progress_obj, _multi_response, request])
			thread.start()
			self.finished(request.id, progress_obj.initialised())
		else:
			result = _multi_response(self, request)
			if not isinstance(result[0], types.FunctionType):
				self.finished(request.id, result[0])
			else:
				# return value is a function which is meant to be executed as thread
				# TODO: replace notfier by threading
				thread = notifier.threads.Simple('simple_response', notifier.Callback(result[0], self, request), notifier.Callback(self.thread_finished_callback, request))
				thread.run()
	if with_progress:
		_response = sanitize_dict({})(_response)
	copy_function_meta_data(function, _response)
	return _response 
[docs]def multi_response(function=None, with_flavor=None, single_values=False, progress=False):
	''' This decorator acts similar to :func:`simple_response` but
	can handle a list of dicts instead of a single dict.
	Technically another object is passed to the function that you can
	name as you like. You can iterate over this object and get the values
	from each dictionary in *request.options*.
	Default values and flavors are supported.
	You do not return a value, you yield them (and you are supposed to
	yield!)::
		@multi_response
		def my_multi_func(self, iterator, variable1, variable2=''):
			# here, variable1 and variable2 are yet to be initialised
			# i.e. variable1 and variable2 will be None!
			do_some_initial_stuff()
			try:
				for variable1, variable2 in iterator:
					# now they are set
					yield '%s_%s' % (self._saved_dict[variable1], variable2)
			except KeyError:
				raise UMC_Error('Something went wrong')
			else:
				# only when everything went right...
				do_some_cleanup_stuff()
	The above code will send a list of answers to the client as soon as
	the function is finished (i.e. after *do_some_cleanup_stuff()*)
	filled with values yielded.
	If you have just one variable in your dictionary, do not forget to
	add a comma, otherwise Python will assign the first value a list
	of one element::
		for var, in iterator:
			# now var is set correctly
			pass
	'''
	if function is None:
		return lambda f: multi_response(f, with_flavor, single_values, progress)
	response_func = _eval_simple_decorated_function(function, with_flavor, single_values, progress)
	def _response(self, request):
		result = response_func(self, request)
		self.finished(request.id, result)
	copy_function_meta_data(function, _response)
	return _response 
def _eval_simple_decorated_function(function, with_flavor, single_values=False, progress=False):
	# name of flavor argument. default: 'flavor' (if given, of course)
	if with_flavor is True:
		with_flavor = 'flavor'
	# argument names of the function, including 'self'
	arguments, defaults = arginspect(function)
	# remove self, remove iterator
	arguments = arguments[2:]
	# use defaults as dict
	if defaults:
		defaults = dict(zip(arguments[-len(defaults):], defaults))
	else:
		defaults = {}
	@sanitize(DictSanitizer(dict((arg, Sanitizer(required=arg not in defaults and arg != with_flavor, default=defaults.get(arg))) for arg in arguments), _copy_value=False) if not single_values else None)
	def _response(self, request):
		# single_values: request.options is, e.g., ["id1", "id2", "id3"], no need for complicated dicts
		if not single_values:
			# normalize the whole request.options
			for element in request.options:
				# add flavor before default checking
				if with_flavor:
					element[with_flavor] = request.flavor or defaults.get(with_flavor)
		# checked for required arguments, set default... now run!
		iterator = RequestOptionsIterator(request.options, arguments, single_values)
		nones = [None] * len(arguments)
		if progress:
			number = len(request.options)
			if progress is True:
				progress_title = None
			else:
				if isinstance(progress, (list, tuple)):
					progress_title, progress_msg = progress
				else:
					progress_title, progress_msg = progress, None
				if '%d' in progress_title:
					progress_title = progress_title % number
			progress_obj = self.new_progress(progress_title, number)
			def _thread(self, progress_obj, iterator, nones):
				try:
					for res in function(self, iterator, *nones):
						if progress_msg:
							res_msg = progress_msg % res
						progress_obj.progress(res, res_msg)
				except Exception:
					progress_obj.exception(sys.exc_info())
				else:
					progress_obj.finish()
			thread = Thread(target=_thread, args=[self, progress_obj, iterator, nones])
			thread.start()
			return progress_obj.initialised()
		else:
			return list(function(self, iterator, *nones))
	return _response
class RequestOptionsIterator(object):
	def __init__(self, everything, names, single_values):
		self.everything = everything
		self.names = names
		self.single_values = single_values
		self.max = len(self.everything)
		self.current = 0
	def __bool__(self):
		return self.current < self.max
	__nonzero__ = __bool__
	def __iter__(self):
		self.current = 0
		return self
	def __next__(self):
		if self:
			values = self.everything[self.current]
			self.current += 1
			if self.single_values:
				return values
			else:
				return [values[name] for name in self.names]
		else:
			raise StopIteration
	next = __next__  # Python 2
def arginspect(function):
	getfullargspec = getattr(inspect, 'getfullargspec', inspect.getargspec)
	argspec = getfullargspec(function)
	if hasattr(function, '_original_argument_names'):
		arguments = function._original_argument_names
	else:
		arguments = argspec.args
	if hasattr(function, '_original_argument_defaults'):
		defaults = function._original_argument_defaults
	else:
		defaults = argspec.defaults
	return arguments, defaults
def copy_function_meta_data(original_function, new_function, copy_arg_inspect=False):
	# set function attrs to allow another arginspect to get original info
	# (used in @simple_response / @log - combo)
	if copy_arg_inspect:
		arguments, defaults = arginspect(original_function)
		new_function._original_argument_names = arguments
		new_function._original_argument_defaults = defaults
	# copy __doc__, otherwise it would not show up in api and such
	new_function.__doc__ = original_function.__doc__
	# copy __name__, otherwise it would be something like "_response"
	new_function.__name__ = original_function.__name__
	# copy __module__, otherwise it would be "univention.management.console.modules.decorators"
	new_function.__module__ = original_function.__module__
[docs]def log(function=None, sensitives=None, customs=None, single_values=False):
	'''Log decorator to be used with
	:func:`simple_response`::
		@simple_response
		@log
		def my_func(self, var1, var2):
			return "%s__%s" % (var1, var2)
	The above example will write two lines into the logfile for the
	module (given that the UCR variable *umc/module/debug/level*
	is set to at least 3)::
		<date>  MODULE      ( INFO    ) : my_func got: var1='value1', var2='value2'
		<date>  MODULE      ( INFO    ) : my_func returned: 'value1__value2'
	The variable names are ordered by appearance and hold the values that
	are actually going to be passed to the function (i.e. after they were
	:func:`sanitize` 'd or set to their default value).
	You may specify the names of sensitive arguments that should not
	show up in log files and custom functions that can alter the
	representation of a certain variable's values (useful for non-standard
	datatypes like regular expressions - you may have used a
	:class:`~univention.management.console.modules.sanitizers.PatternSanitizer`
	)::
		@sanitize(pattern=PatternSanitizer())
		@simple_reponse
		@log(sensitives=['password'], customs={'pattern':lambda x: x.pattern})
		def count_ucr(self, username, password, pattern):
			return self._ucr_count(username, password, pattern)
	This results in something like::
		<date>  MODULE      ( INFO    ) : count_ucr got: password='********', username='Administrator', pattern='.*'
		<date>  MODULE      ( INFO    ) : count_ucr returned: 650
	The decorator also works with :func:`multi_response`::
		@multi_response
		@log
		def multi_my_func(self, var1, var2):
			return "%s__%s" % (var1, var2)
	This results in something like::
		<date>  MODULE      ( INFO    ) : multi_my_func got: [var1='value1', var2='value2'], [var1='value3', var2='value4']
		<date>  MODULE      ( INFO    ) : multi_my_func returned: ['value1__value2', 'value3__value4']
	'''
	if function is None:
		return lambda f: log(f, sensitives, customs, single_values)
	if customs is None:
		customs = {}
	if sensitives is None:
		sensitives = []
	for sensitive in sensitives:
		customs[sensitive] = lambda x: '********'
	def _log(names, args):
		if single_values:
			args = [args]
		return ['%s=%r' % (name, customs.get(name, lambda x: x)(arg)) for name, arg in zip(names, args)]
	# including self
	names, _ = arginspect(function)
	name = function.__name__
	# multi_response yields i.e. is generator function
	if inspect.isgeneratorfunction(function):
		# remove self, iterator
		names = names[2:]
		def _response(self, iterator, *args):
			arg_reprs = []
			for element in iterator:
				arg_repr = _log(names, element)
				if arg_repr:
					arg_reprs.append(arg_repr)
			if arg_reprs:
				MODULE.info('%s got: [%s]' % (name, '], ['.join(', '.join(arg_repr) for arg_repr in arg_reprs)))
			result = []
			for res in function(self, iterator, *args):
				result.append(res)
				yield res
			MODULE.info('%s returned: %r' % (name, result))
	else:
		# remove self
		names = names[1:]
		def _response(self, *args):
			arg_repr = _log(names, args)
			if arg_repr:
				MODULE.info('%s got: %s' % (name, ', '.join(arg_repr)))
			result = function(self, *args)
			MODULE.info('%s returned: %r' % (name, result))
			return result
	copy_function_meta_data(function, _response, copy_arg_inspect=True)
	return _response 
[docs]def file_upload(function):
	''' This decorator restricts requests to be
	UPLOAD-commands. Simple, yet effective '''
	def _response(self, request):
		if request.command != 'UPLOAD':
			raise UMC_Error(_('%s can only be used as UPLOAD') % (function.__name__))
		return function(self, request)
	copy_function_meta_data(function, _response)
	prevent_referer_check(_response)
	prevent_xsrf_check(_response)
	return _response 
[docs]class reloading_ucr(object):
	_last_reload = dict()
	def __init__(self, ucr, timeout=0.2):
		self._ucr = ucr
		self._timeout = timeout
	def __call__(self, func):
		@functools.wraps(func)
		def wrapper(*args, **kwargs):
			last_reload = self._last_reload.get(id(self._ucr), 0)
			if last_reload == 0 or time.time() - last_reload > self._timeout:
				self._ucr.load()
				self._last_reload[id(self._ucr)] = time.time()
			return func(*args, **kwargs)
		return wrapper 
[docs]def require_password(function):
	@functools.wraps(function)
	def _decorated(self, request, *args, **kwargs):
		self.require_password()
		return function(self, request, *args, **kwargs)
	return _decorated 
def allow_get_request(function=None, xsrf_check=False, referer_check=False):
	"""Allows HTTP GET requests. Additionally prevents the XSRF check and the referer check."""
	def _decorator(function):
		if not xsrf_check:
			prevent_xsrf_check(function)
		if not referer_check:
			prevent_referer_check(function)
		function.allow_get = True
		return function
	if function is None:
		return _decorator
	return _decorator(function)
def prevent_xsrf_check(function):
	function.xsrf_protection = False
	return function
def prevent_referer_check(function):
	function.referer_protection = False
	return function
__all__ = ['simple_response', 'multi_response', 'sanitize', 'log', 'sanitize_list', 'sanitize_dict', 'file_upload', 'reloading_ucr', 'require_password']