# -*- coding: utf-8 -*-
#
# Copyright 2004-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
"""
|UDM| password encryption methods.
"""
from __future__ import absolute_import
import re
import bcrypt
import hashlib
from typing import List, Optional, Tuple  # noqa: F401
import heimdal
import passlib.hash
import univention.debug as ud
from univention.admin._ucr import configRegistry
RE_PASSWORD_SCHEME = re.compile(r'^{(\w+)}(!?)(.*)', re.I)
[docs]def crypt(password, method_id=None, salt=None):
	# type: (str, Optional[str], Optional[str]) -> str
	"""
	Return crypt hash.
	:param password: password string.
	:param method_id: optional hash type, MD5, SHA256/SHA-256, SHA512/SHA-512.
	:param salt: salt for randomize the hashing.
	:returns: the hashed password string.
	"""
	hashing_method = configRegistry.get('password/hashing/method', 'sha-512').upper()
	if salt is None:
		salt = ''
		valid = [
			'.', '/', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j',
			'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v',
			'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H',
			'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T',
			'U', 'V', 'W', 'X', 'Y', 'Z', '0', '1', '2', '3', '4', '5',
			'6', '7', '8', '9']
		urandom = open("/dev/urandom", "rb")
		for i in range(0, 16):  # up to 16 bytes of salt are evaluated by crypt(3), overhead is ignored
			o = ord(urandom.read(1))
			while not o < 256 // len(valid) * len(valid):  # make sure not to skew the distribution when using modulo
				o = ord(urandom.read(1))
			salt = salt + valid[(o % len(valid))]
		urandom.close()
	if method_id is None:
		method_id = {
			'MD5': '1',
			'SHA256': '5',
			'SHA-256': '5',
			'SHA512': '6',
			'SHA-512': '6',
		}.get(hashing_method, '6')
	from crypt import crypt as _crypt
	return _crypt(password, '$%s$%s$' % (method_id, salt, )) 
[docs]def bcrypt_hash(password):
	# type: (str) -> str
	"""
	Return bcrypt hash.
	:param password: password string.
	:returns: the hashed password string.
	"""
	cost_factor = int(configRegistry.get('password/hashing/bcrypt/cost_factor', '12'))
	prefix = configRegistry.get('password/hashing/bcrypt/prefix', '2b').encode('utf8')
	salt = bcrypt.gensalt(rounds=cost_factor, prefix=prefix)
	return bcrypt.hashpw(password.encode('utf-8'), salt).decode('ASCII') 
[docs]def ntlm(password):
	# type: (str) -> Tuple[str, str]
	"""
	Return tuple with NT and LanMan hash.
	:param password: password string.
	:returns: 2-tuple (NT, LanMan)
	"""
	nt = passlib.hash.nthash.hash(password).upper()
	if configRegistry.is_true('password/samba/lmhash', False):
		lm = passlib.hash.lmhash.hash(password).upper()
	else:
		lm = ''
	return (nt, lm) 
[docs]def krb5_asn1(principal, password, krb5_context=None):
	# type: (str, str, Optional[heimdal.context]) -> List[bytes]
	"""
	Generate Kerberos password hashes.
	:param principal: Kerberos principal name.
	:param password: password string.
	:param krb5_context: optional Kerberos context.
	:returns: list of ASN1 encoded Kerberos hashes.
	"""
	list = []
	if not krb5_context:
		krb5_context = heimdal.context()
	for krb5_etype in krb5_context.get_permitted_enctypes():
		if str(krb5_etype) == 'des3-cbc-md5' and configRegistry.is_false('password/krb5/enctype/des3-cbc-md5', True):
			continue
		krb5_principal = heimdal.principal(krb5_context, principal)
		krb5_keyblock = heimdal.keyblock(krb5_context, krb5_etype, password, krb5_principal)
		krb5_salt = heimdal.salt(krb5_context, krb5_principal)
		list.append(heimdal.asn1_encode_key(krb5_keyblock, krb5_salt, 0))
	return list 
[docs]def is_locked(password):
	# type: (str) -> bool
	"""
	Check is the password (hash) is locked
	:param password: password hash.
	:returns: `True` when locked, `False` otherwise.
	>>> is_locked('foo')
	False
	>>> is_locked('{crypt}$1$foo')
	False
	>>> is_locked('{crypt}!$1$foo')
	True
	>>> is_locked('{KINIT}')
	False
	>>> is_locked('{LANMAN}!')
	True
	"""
	match = RE_PASSWORD_SCHEME.match(password or '')
	return match is not None and '!' == match.group(2) 
[docs]def unlock_password(password):
	# type: (str) -> str
	"""
	Remove prefix from password used for locking.
	:param password: password hash.
	:returns: the unlocked password hash.
	>>> unlock_password('{crypt}!$1$foo')
	'{crypt}$1$foo'
	>>> unlock_password('{LANMAN}!')
	'{LANMAN}'
	>>> unlock_password('{SASL}!')
	'{SASL}'
	>>> unlock_password('{KINIT}!')
	'{KINIT}'
	>>> unlock_password('{BCRYPT}!')
	'{BCRYPT}'
	"""
	if is_locked(password):
		match = RE_PASSWORD_SCHEME.match(password).groups()
		password = '{%s}%s' % (match[0], match[2])
	return password 
[docs]def lock_password(password):
	# type: (str) -> str
	"""
	Add prefix to password used for locking.
	:param password: password hash.
	:returns: the locked password hash.
	>>> lock_password('{crypt}$1$foo')
	'{crypt}!$1$foo'
	>>> lock_password('{LANMAN}')
	'{LANMAN}!'
	>>> lock_password('{SASL}')
	'{SASL}!'
	>>> lock_password('{KINIT}')
	'{KINIT}!'
	>>> lock_password('{BCRYPT}')
	'{BCRYPT}!'
	>>> lock_password('foo').startswith('{crypt}!$')
	True
	"""
	# cleartext password?
	if not RE_PASSWORD_SCHEME.match(password):
		if configRegistry.is_true('password/hashing/bcrypt'):
			return "{BCRYPT}!%s" % (bcrypt_hash(password))
		return "{crypt}!%s" % (crypt(password))
	if not is_locked(password):
		match = RE_PASSWORD_SCHEME.match(password).groups()
		password = '{%s}!%s' % (match[0], match[2])
	return password 
[docs]def password_is_auth_saslpassthrough(password):
	# type: (str) -> bool
	"""
	Check if the password hash indicates the use of |SASL|.
	:param apssword: password hash.
	:returns: `True` is |SASL| shall be used, `False` otherwise.
	"""
	return password.startswith('{SASL}') and configRegistry.get('directory/manager/web/modules/users/user/auth/saslpassthrough', 'no').lower() == 'keep' 
[docs]def get_password_history(password, pwhistory, pwhlen):
	# type: (str, str, int) -> str
	"""
	Append the given password as hash to the history of password hashes
	:param password: the new password.
	:param pwhistory: history of previous password hashes.
	:param pwhlen: length of the password history.
	:returns: modified password hash history.
	>>> get_password_history("a", "b", 0)
	'b'
	>>> len(get_password_history("a", "", 1).split(' '))
	1
	>>> len(get_password_history("a", "b", 1).split(' '))
	1
	>>> len(get_password_history("a", "b", 2).split(' '))
	2
	"""
	# create hash
	if configRegistry.is_true('password/hashing/bcrypt'):
		newpwhash = "{BCRYPT}%s" % (bcrypt_hash(password))
	else:
		newpwhash = crypt(password)
	# this preserves a temporary disabled history
	if pwhlen > 0:
		# split the history
		pwlist = pwhistory.strip().split(' ')
		# append new hash
		pwlist.append(newpwhash)
		# strip old hashes
		pwlist = pwlist[-pwhlen:]
		# build history
		pwhistory = ' '.join(pwlist)
	return pwhistory 
[docs]def password_already_used(password, pwhistory):
	# type: (str, str) -> bool
	"""
	Check if the password is already used in the password hash history.
	:param password: new password hash.
	:param pwhistory: history of previous password hashes.
	:returns: `True` when already used, `False` otherwise,
	>>> password_already_used('a', '')
	False
	>>> password_already_used('a', 'b')
	False
	>>> password_already_used('a', 'b ' + crypt('a'))
	True
	"""
	for line in pwhistory.split(" "):
		linesplit = line.split("$")  # $method_id$salt$password_hash
		try:
			if linesplit[0] == '{BCRYPT}':
				password_hash = line[len('{BCRYPT}'):]
				if bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('ASCII')):
					return True
			else:
				password_hash = crypt(password, linesplit[1], linesplit[2])
		except IndexError:  # old style password history entry, no method id/salt in there
			hash_algorithm = hashlib.new("sha1")
			hash_algorithm.update(password.encode("utf-8"))
			password_hash = hash_algorithm.hexdigest().upper()
		if password_hash == line:
			return True
	return False 
[docs]class PasswortHistoryPolicy(object):
	"""
	Policy for handling history of password hashes.
	"""
	def __init__(self, pwhistoryPolicy):
		super(PasswortHistoryPolicy, self).__init__()
		self.pwhistoryPolicy = pwhistoryPolicy
		self.pwhistoryLength = None
		self.pwhistoryPasswordLength = 0
		self.pwhistoryPasswordCheck = False
		self.expiryInterval = 0
		if pwhistoryPolicy:
			try:
				self.pwhistoryLength = max(0, int(pwhistoryPolicy['length'] or 0))
			except ValueError:
				ud.debug(ud.ADMIN, ud.WARN, 'Corrupt Password history policy (history length): %r' % (pwhistoryPolicy.dn,))
			try:
				self.pwhistoryPasswordLength = max(0, int(pwhistoryPolicy['pwLength'] or 0))
			except ValueError:
				ud.debug(ud.ADMIN, ud.WARN, 'Corrupt Password history policy (password length): %r' % (pwhistoryPolicy.dn,))
			self.pwhistoryPasswordCheck = (pwhistoryPolicy['pwQualityCheck'] or '').lower() in ['true', '1']
			try:
				self.expiryInterval = max(0, int(pwhistoryPolicy['expiryInterval'] or 0))
			except ValueError:
				ud.debug(ud.ADMIN, ud.WARN, 'Corrupt Password history policy (expiry interval): %r' % (pwhistoryPolicy.dn,))