mirror of
https://github.com/samba-team/samba.git
synced 2025-02-24 13:57:43 +03:00
pycredentials: add function to return the netr_Authenticator
Add method new_client_authenticator that returns data to allow a netr_Authenticator to be constructed. Allows python to make netr_LogonSamLogonWithFlags, netr_LogonGetDomainInfo and similar calls Signed-off-by: Gary Lockyer <gary@catalyst.net.nz> Reviewed-by: Garming Sam <garming@catalyst.net.nz> Reviewed-by: Andrew Bartlett <abartlet@samba.org>
This commit is contained in:
parent
7539595c48
commit
b68a3374a5
@ -26,6 +26,8 @@
|
||||
#include "libcli/util/pyerrors.h"
|
||||
#include "param/pyparam.h"
|
||||
#include <tevent.h>
|
||||
#include "libcli/auth/libcli_auth.h"
|
||||
#include "auth/credentials/credentials_internal.h"
|
||||
|
||||
void initcredentials(void);
|
||||
|
||||
@ -584,6 +586,39 @@ static PyObject *py_creds_get_gensec_features(PyObject *self, PyObject *args)
|
||||
return PyInt_FromLong(gensec_features);
|
||||
}
|
||||
|
||||
static PyObject *py_creds_new_client_authenticator(PyObject *self,
|
||||
PyObject *args)
|
||||
{
|
||||
struct netr_Authenticator auth;
|
||||
struct cli_credentials *creds = NULL;
|
||||
struct netlogon_creds_CredentialState *nc = NULL;
|
||||
PyObject *ret = NULL;
|
||||
|
||||
creds = PyCredentials_AsCliCredentials(self);
|
||||
if (creds == NULL) {
|
||||
PyErr_SetString(PyExc_RuntimeError,
|
||||
"Failed to get credentials from python");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
nc = creds->netlogon_creds;
|
||||
if (nc == NULL) {
|
||||
PyErr_SetString(PyExc_ValueError,
|
||||
"No netlogon credentials cannot make "
|
||||
"client authenticator");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
netlogon_creds_client_authenticator(
|
||||
nc,
|
||||
&auth);
|
||||
ret = Py_BuildValue("{ss#si}",
|
||||
"credential",
|
||||
(const char *) &auth.cred, sizeof(auth.cred),
|
||||
"timestamp", auth.timestamp);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static PyObject *py_creds_set_secure_channel_type(PyObject *self, PyObject *args)
|
||||
{
|
||||
unsigned int channel_type;
|
||||
@ -700,6 +735,11 @@ static PyMethodDef py_creds_methods[] = {
|
||||
{ "set_forced_sasl_mech", py_creds_set_forced_sasl_mech, METH_VARARGS,
|
||||
"S.set_forced_sasl_mech(name) -> None\n"
|
||||
"Set forced SASL mechanism." },
|
||||
{ "new_client_authenticator",
|
||||
py_creds_new_client_authenticator,
|
||||
METH_NOARGS,
|
||||
"S.new_client_authenticator() -> Authenticator\n"
|
||||
"Get a new client NETLOGON_AUTHENTICATOR"},
|
||||
{ "set_secure_channel_type", py_creds_set_secure_channel_type,
|
||||
METH_VARARGS, NULL },
|
||||
{ NULL }
|
||||
|
241
python/samba/tests/py_credentials.py
Normal file
241
python/samba/tests/py_credentials.py
Normal file
@ -0,0 +1,241 @@
|
||||
# Integration tests for pycredentials
|
||||
#
|
||||
# Copyright (C) Catalyst IT Ltd. 2017
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation; either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
from samba.tests import TestCase, delete_force
|
||||
import os
|
||||
|
||||
import samba
|
||||
from samba.auth import system_session
|
||||
from samba.credentials import Credentials, CLI_CRED_NTLMv2_AUTH
|
||||
from samba.dcerpc import netlogon, ntlmssp
|
||||
from samba.dcerpc.netlogon import netr_Authenticator, netr_WorkstationInformation
|
||||
from samba.dcerpc.misc import SEC_CHAN_WKSTA
|
||||
from samba.dsdb import (
|
||||
UF_WORKSTATION_TRUST_ACCOUNT,
|
||||
UF_PASSWD_NOTREQD,
|
||||
UF_NORMAL_ACCOUNT)
|
||||
from samba.ndr import ndr_pack
|
||||
from samba.samdb import SamDB
|
||||
"""
|
||||
Integration tests for pycredentials
|
||||
"""
|
||||
|
||||
MACHINE_NAME = "PCTM"
|
||||
USER_NAME = "PCTU"
|
||||
|
||||
class PyCredentialsTests(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(PyCredentialsTests, self).setUp()
|
||||
|
||||
self.server = os.environ["SERVER"]
|
||||
self.domain = os.environ["DOMAIN"]
|
||||
self.host = os.environ["SERVER_IP"]
|
||||
self.lp = self.get_loadparm()
|
||||
|
||||
self.credentials = self.get_credentials()
|
||||
|
||||
self.session = system_session()
|
||||
self.ldb = SamDB(url="ldap://%s" % self.host,
|
||||
session_info=self.session,
|
||||
credentials=self.credentials,
|
||||
lp=self.lp)
|
||||
|
||||
self.create_machine_account()
|
||||
self.create_user_account()
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
super(PyCredentialsTests, self).tearDown()
|
||||
delete_force(self.ldb, self.machine_dn)
|
||||
delete_force(self.ldb, self.user_dn)
|
||||
|
||||
# Until a successful netlogon connection has been established there will
|
||||
# not be a valid authenticator associated with the credentials
|
||||
# and new_client_authenticator should throw a ValueError
|
||||
def test_no_netlogon_connection(self):
|
||||
self.assertRaises(ValueError,
|
||||
self.machine_creds.new_client_authenticator)
|
||||
|
||||
# Once a netlogon connection has been established,
|
||||
# new_client_authenticator should return a value
|
||||
#
|
||||
def test_have_netlogon_connection(self):
|
||||
c = self.get_netlogon_connection()
|
||||
a = self.machine_creds.new_client_authenticator()
|
||||
self.assertIsNotNone(a)
|
||||
|
||||
# Get an authenticator and use it on a sequence of operations requiring
|
||||
# an authenticator
|
||||
def test_client_authenticator(self):
|
||||
c = self.get_netlogon_connection()
|
||||
(authenticator, subsequent) = self.get_authenticator(c)
|
||||
self.do_NetrLogonSamLogonWithFlags(c, authenticator, subsequent)
|
||||
(authenticator, subsequent) = self.get_authenticator(c)
|
||||
self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent)
|
||||
(authenticator, subsequent) = self.get_authenticator(c)
|
||||
self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent)
|
||||
(authenticator, subsequent) = self.get_authenticator(c)
|
||||
self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent)
|
||||
|
||||
|
||||
|
||||
#
|
||||
# Establish aealed schannel netlogon connection over TCP/IP
|
||||
def get_netlogon_connection(self):
|
||||
return netlogon.netlogon("ncacn_ip_tcp:%s[schannel,seal]" % self.server,
|
||||
self.lp,
|
||||
self.machine_creds)
|
||||
|
||||
#
|
||||
# Create the machine account
|
||||
def create_machine_account(self):
|
||||
self.machine_pass = samba.generate_random_password(32, 32)
|
||||
self.machine_name = MACHINE_NAME
|
||||
self.machine_dn = "cn=%s,%s" % (self.machine_name, self.ldb.domain_dn())
|
||||
|
||||
# remove the account if it exists, this will happen if a previous test
|
||||
# run failed
|
||||
delete_force(self.ldb, self.machine_dn)
|
||||
|
||||
utf16pw = unicode(
|
||||
'"' + self.machine_pass.encode('utf-8') + '"', 'utf-8'
|
||||
).encode('utf-16-le')
|
||||
self.ldb.add({
|
||||
"dn": self.machine_dn,
|
||||
"objectclass": "computer",
|
||||
"sAMAccountName": "%s$" % self.machine_name,
|
||||
"userAccountControl":
|
||||
str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD),
|
||||
"unicodePwd": utf16pw})
|
||||
|
||||
self.machine_creds = Credentials()
|
||||
self.machine_creds.guess(self.get_loadparm())
|
||||
self.machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
|
||||
self.machine_creds.set_password(self.machine_pass)
|
||||
self.machine_creds.set_username(self.machine_name + "$")
|
||||
|
||||
#
|
||||
# Create a test user account
|
||||
def create_user_account(self):
|
||||
self.user_pass = samba.generate_random_password(32, 32)
|
||||
self.user_name = USER_NAME
|
||||
self.user_dn = "cn=%s,%s" % (self.user_name, self.ldb.domain_dn())
|
||||
|
||||
# remove the account if it exists, this will happen if a previous test
|
||||
# run failed
|
||||
delete_force(self.ldb, self.user_dn)
|
||||
|
||||
utf16pw = unicode(
|
||||
'"' + self.user_pass.encode('utf-8') + '"', 'utf-8'
|
||||
).encode('utf-16-le')
|
||||
self.ldb.add({
|
||||
"dn": self.user_dn,
|
||||
"objectclass": "user",
|
||||
"sAMAccountName": "%s" % self.user_name,
|
||||
"userAccountControl": str(UF_NORMAL_ACCOUNT),
|
||||
"unicodePwd": utf16pw})
|
||||
|
||||
self.user_creds = Credentials()
|
||||
self.user_creds.guess(self.get_loadparm())
|
||||
self.user_creds.set_password(self.user_pass)
|
||||
self.user_creds.set_username(self.user_name)
|
||||
pass
|
||||
|
||||
#
|
||||
# Get the authenticator from the machine creds.
|
||||
def get_authenticator(self, c):
|
||||
auth = self.machine_creds.new_client_authenticator();
|
||||
current = netr_Authenticator()
|
||||
current.cred.data = [ord(x) for x in auth["credential"]]
|
||||
current.timestamp = auth["timestamp"]
|
||||
|
||||
subsequent = netr_Authenticator()
|
||||
return (current, subsequent)
|
||||
|
||||
def do_NetrLogonSamLogonWithFlags(self, c, current, subsequent):
|
||||
logon = samlogon_logon_info(self.domain,
|
||||
self.machine_name,
|
||||
self.user_creds)
|
||||
|
||||
logon_level = netlogon.NetlogonNetworkTransitiveInformation
|
||||
validation_level = netlogon.NetlogonValidationSamInfo4
|
||||
netr_flags = 0
|
||||
c.netr_LogonSamLogonWithFlags(self.server,
|
||||
self.user_creds.get_workstation(),
|
||||
current,
|
||||
subsequent,
|
||||
logon_level,
|
||||
logon,
|
||||
validation_level,
|
||||
netr_flags)
|
||||
|
||||
def do_NetrLogonGetDomainInfo(self, c, current, subsequent):
|
||||
query = netr_WorkstationInformation()
|
||||
|
||||
c.netr_LogonGetDomainInfo(self.server,
|
||||
self.user_creds.get_workstation(),
|
||||
current,
|
||||
subsequent,
|
||||
2,
|
||||
query)
|
||||
|
||||
#
|
||||
# Build the logon data required by NetrLogonSamLogonWithFlags
|
||||
def samlogon_logon_info(domain_name, computer_name, creds):
|
||||
|
||||
target_info_blob = samlogon_target(domain_name, computer_name)
|
||||
|
||||
challenge = b"abcdefgh"
|
||||
# User account under test
|
||||
response = creds.get_ntlm_response(flags=CLI_CRED_NTLMv2_AUTH,
|
||||
challenge=challenge,
|
||||
target_info=target_info_blob)
|
||||
|
||||
logon = netlogon.netr_NetworkInfo()
|
||||
|
||||
logon.challenge = [ord(x) for x in challenge]
|
||||
logon.nt = netlogon.netr_ChallengeResponse()
|
||||
logon.nt.length = len(response["nt_response"])
|
||||
logon.nt.data = [ord(x) for x in response["nt_response"]]
|
||||
logon.identity_info = netlogon.netr_IdentityInfo()
|
||||
|
||||
(username, domain) = creds.get_ntlm_username_domain()
|
||||
logon.identity_info.domain_name.string = domain
|
||||
logon.identity_info.account_name.string = username
|
||||
logon.identity_info.workstation.string = creds.get_workstation()
|
||||
|
||||
return logon
|
||||
|
||||
#
|
||||
# Build the samlogon target info.
|
||||
def samlogon_target(domain_name, computer_name):
|
||||
target_info = ntlmssp.AV_PAIR_LIST()
|
||||
target_info.count = 3
|
||||
computername = ntlmssp.AV_PAIR()
|
||||
computername.AvId = ntlmssp.MsvAvNbComputerName
|
||||
computername.Value = computer_name
|
||||
|
||||
domainname = ntlmssp.AV_PAIR()
|
||||
domainname.AvId = ntlmssp.MsvAvNbDomainName
|
||||
domainname.Value = domain_name
|
||||
|
||||
eol = ntlmssp.AV_PAIR()
|
||||
eol.AvId = ntlmssp.MsvAvEOL
|
||||
target_info.pair = [domainname, computername, eol]
|
||||
|
||||
return ndr_pack(target_info)
|
@ -659,6 +659,9 @@ planoldpythontestsuite("ad_dc",
|
||||
extra_args=['-U"$USERNAME%$PASSWORD"'])
|
||||
|
||||
planpythontestsuite("ad_dc_ntvfs:local", "samba.tests.lsa_string")
|
||||
planoldpythontestsuite("ad_dc_ntvfs",
|
||||
"samba.tests.py_credentials",
|
||||
extra_args=['-U"$USERNAME%$PASSWORD"'])
|
||||
|
||||
plantestsuite_loadlist("samba4.ldap.python(ad_dc_ntvfs)", "ad_dc_ntvfs", [python, os.path.join(samba4srcdir, "dsdb/tests/python/ldap.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])
|
||||
plantestsuite_loadlist("samba4.tokengroups.krb5.python(ad_dc_ntvfs)", "ad_dc_ntvfs:local", [python, os.path.join(samba4srcdir, "dsdb/tests/python/token_group.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '-k', 'yes', '$LOADLIST', '$LISTOPT'])
|
||||
|
Loading…
x
Reference in New Issue
Block a user