1
0
mirror of https://github.com/samba-team/samba.git synced 2025-02-02 09:47:23 +03:00

s4-gensec Extend python bindings for GENSEC and the associated test

This now tests a real GENSEC exchange, including wrap and unwrap,
using GSSAPI.  Therefore, it now needs to access a KDC.

Andrew Bartlett

Autobuild-User: Andrew Bartlett <abartlet@samba.org>
Autobuild-Date: Tue Jan 18 11:41:26 CET 2011 on sn-devel-104
This commit is contained in:
Andrew Bartlett 2011-01-18 19:14:45 +11:00
parent 24a4b9a738
commit a1e1f02efe
3 changed files with 275 additions and 28 deletions

View File

@ -20,6 +20,7 @@
#include "includes.h" #include "includes.h"
#include "param/pyparam.h" #include "param/pyparam.h"
#include "auth/gensec/gensec.h" #include "auth/gensec/gensec.h"
#include "auth/credentials/pycredentials.h"
#include "libcli/util/pyerrors.h" #include "libcli/util/pyerrors.h"
#include "scripting/python/modules.h" #include "scripting/python/modules.h"
#include "lib/talloc/pytalloc.h" #include "lib/talloc/pytalloc.h"
@ -84,7 +85,7 @@ static PyObject *py_gensec_start_client(PyTypeObject *type, PyObject *args, PyOb
struct tevent_context *ev; struct tevent_context *ev;
struct gensec_security *gensec; struct gensec_security *gensec;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O", discard_const_p(char *, kwnames), &py_settings)) if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|O", discard_const_p(char *, kwnames), &py_settings))
return NULL; return NULL;
self = (py_talloc_Object*)type->tp_alloc(type, 0); self = (py_talloc_Object*)type->tp_alloc(type, 0);
@ -98,10 +99,20 @@ static PyObject *py_gensec_start_client(PyTypeObject *type, PyObject *args, PyOb
return NULL; return NULL;
} }
settings = settings_from_object(self->talloc_ctx, py_settings); if (py_settings != Py_None) {
if (settings == NULL) { settings = settings_from_object(self->talloc_ctx, py_settings);
PyObject_DEL(self); if (settings == NULL) {
return NULL; PyObject_DEL(self);
return NULL;
}
} else {
settings = talloc_zero(self->talloc_ctx, struct gensec_settings);
if (settings == NULL) {
PyObject_DEL(self);
return NULL;
}
settings->lp_ctx = loadparm_init_global(true);
} }
ev = tevent_context_init(self->talloc_ctx); ev = tevent_context_init(self->talloc_ctx);
@ -134,15 +145,15 @@ static PyObject *py_gensec_start_server(PyTypeObject *type, PyObject *args, PyOb
{ {
NTSTATUS status; NTSTATUS status;
py_talloc_Object *self; py_talloc_Object *self;
struct gensec_settings *settings; struct gensec_settings *settings = NULL;
const char *kwnames[] = { "auth_context", "settings", NULL }; const char *kwnames[] = { "settings", "auth_context", NULL };
PyObject *py_settings; PyObject *py_settings = Py_None;
PyObject *py_auth_context; PyObject *py_auth_context = Py_None;
struct tevent_context *ev; struct tevent_context *ev;
struct gensec_security *gensec; struct gensec_security *gensec;
struct auth_context *auth_context; struct auth_context *auth_context = NULL;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO", discard_const_p(char *, kwnames), &py_auth_context, &py_settings)) if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OO", discard_const_p(char *, kwnames), &py_settings, &py_auth_context))
return NULL; return NULL;
self = (py_talloc_Object*)type->tp_alloc(type, 0); self = (py_talloc_Object*)type->tp_alloc(type, 0);
@ -156,10 +167,20 @@ static PyObject *py_gensec_start_server(PyTypeObject *type, PyObject *args, PyOb
return NULL; return NULL;
} }
settings = settings_from_object(self->talloc_ctx, py_settings); if (py_settings != Py_None) {
if (settings == NULL) { settings = settings_from_object(self->talloc_ctx, py_settings);
PyObject_DEL(self); if (settings == NULL) {
return NULL; PyObject_DEL(self);
return NULL;
}
} else {
settings = talloc_zero(self->talloc_ctx, struct gensec_settings);
if (settings == NULL) {
PyObject_DEL(self);
return NULL;
}
settings->lp_ctx = loadparm_init_global(true);
} }
ev = tevent_context_init(self->talloc_ctx); ev = tevent_context_init(self->talloc_ctx);
@ -169,7 +190,15 @@ static PyObject *py_gensec_start_server(PyTypeObject *type, PyObject *args, PyOb
return NULL; return NULL;
} }
auth_context = py_talloc_get_type(self, struct auth_context); if (py_auth_context != Py_None) {
auth_context = py_talloc_get_type(py_auth_context, struct auth_context);
if (!auth_context) {
PyErr_Format(PyExc_TypeError,
"Expected auth.AuthContext for auth_context argument, got %s",
talloc_get_name(py_talloc_get_ptr(py_auth_context)));
return NULL;
}
}
status = gensec_init(settings->lp_ctx); status = gensec_init(settings->lp_ctx);
if (!NT_STATUS_IS_OK(status)) { if (!NT_STATUS_IS_OK(status)) {
@ -190,6 +219,32 @@ static PyObject *py_gensec_start_server(PyTypeObject *type, PyObject *args, PyOb
return (PyObject *)self; return (PyObject *)self;
} }
static PyObject *py_gensec_set_credentials(PyObject *self, PyObject *args)
{
PyObject *py_creds = Py_None;
struct cli_credentials *creds;
struct gensec_security *security = py_talloc_get_type(self, struct gensec_security);
NTSTATUS status;
if (!PyArg_ParseTuple(args, "O", &py_creds))
return NULL;
creds = PyCredentials_AsCliCredentials(py_creds);
if (!creds) {
PyErr_Format(PyExc_TypeError,
"Expected samba.credentaials for credentials argument got %s",
talloc_get_name(py_talloc_get_ptr(py_creds)));
}
status = gensec_set_credentials(security, creds);
if (!NT_STATUS_IS_OK(status)) {
PyErr_SetNTSTATUS(status);
return NULL;
}
Py_RETURN_NONE;
}
static PyObject *py_gensec_session_info(PyObject *self) static PyObject *py_gensec_session_info(PyObject *self)
{ {
NTSTATUS status; NTSTATUS status;
@ -206,7 +261,7 @@ static PyObject *py_gensec_session_info(PyObject *self)
return NULL; return NULL;
} }
py_session_info = py_return_ndr_struct("samba.auth", "session_info", py_session_info = py_return_ndr_struct("samba.auth", "AuthSession",
info, info); info, info);
return py_session_info; return py_session_info;
} }
@ -229,6 +284,24 @@ static PyObject *py_gensec_start_mech_by_name(PyObject *self, PyObject *args)
Py_RETURN_NONE; Py_RETURN_NONE;
} }
static PyObject *py_gensec_start_mech_by_sasl_name(PyObject *self, PyObject *args)
{
char *sasl_name;
struct gensec_security *security = py_talloc_get_type(self, struct gensec_security);
NTSTATUS status;
if (!PyArg_ParseTuple(args, "s", &sasl_name))
return NULL;
status = gensec_start_mech_by_sasl_name(security, sasl_name);
if (!NT_STATUS_IS_OK(status)) {
PyErr_SetNTSTATUS(status);
return NULL;
}
Py_RETURN_NONE;
}
static PyObject *py_gensec_start_mech_by_authtype(PyObject *self, PyObject *args) static PyObject *py_gensec_start_mech_by_authtype(PyObject *self, PyObject *args)
{ {
int authtype, level; int authtype, level;
@ -246,6 +319,33 @@ static PyObject *py_gensec_start_mech_by_authtype(PyObject *self, PyObject *args
Py_RETURN_NONE; Py_RETURN_NONE;
} }
static PyObject *py_gensec_want_feature(PyObject *self, PyObject *args)
{
int feature;
struct gensec_security *security = py_talloc_get_type(self, struct gensec_security);
/* This is i (and declared as an int above) by design, as they are handled as an integer in python */
if (!PyArg_ParseTuple(args, "i", &feature))
return NULL;
gensec_want_feature(security, feature);
Py_RETURN_NONE;
}
static PyObject *py_gensec_have_feature(PyObject *self, PyObject *args)
{
int feature;
struct gensec_security *security = py_talloc_get_type(self, struct gensec_security);
/* This is i (and declared as an int above) by design, as they are handled as an integer in python */
if (!PyArg_ParseTuple(args, "i", &feature))
return NULL;
if (gensec_have_feature(security, feature)) {
return Py_True;
}
return Py_False;
}
static PyObject *py_gensec_update(PyObject *self, PyObject *args) static PyObject *py_gensec_update(PyObject *self, PyObject *args)
{ {
NTSTATUS status; NTSTATUS status;
@ -260,18 +360,107 @@ static PyObject *py_gensec_update(PyObject *self, PyObject *args)
mem_ctx = talloc_new(NULL); mem_ctx = talloc_new(NULL);
in.data = (uint8_t *)PyString_AsString(py_in); if (py_in == Py_None) {
in.length = PyString_Size(py_in); in = data_blob_null;
} else {
in.data = (uint8_t *)PyString_AsString(py_in);
in.length = PyString_Size(py_in);
}
status = gensec_update(security, mem_ctx, in, &out); status = gensec_update(security, mem_ctx, in, &out);
if (!NT_STATUS_EQUAL(status, NT_STATUS_MORE_PROCESSING_REQUIRED)
&& !NT_STATUS_IS_OK(status)) {
PyErr_SetNTSTATUS(status);
talloc_free(mem_ctx);
return NULL;
}
if (out.length != 0) {
ret = PyString_FromStringAndSize((const char *)out.data, out.length);
} else {
ret = Py_None;
}
talloc_free(mem_ctx);
if (NT_STATUS_EQUAL(status, NT_STATUS_MORE_PROCESSING_REQUIRED))
{
return PyTuple_Pack(2, Py_False, ret);
} else {
return PyTuple_Pack(2, Py_True, ret);
}
}
static PyObject *py_gensec_wrap(PyObject *self, PyObject *args)
{
NTSTATUS status;
TALLOC_CTX *mem_ctx;
DATA_BLOB in, out;
PyObject *ret, *py_in;
struct gensec_security *security = py_talloc_get_type(self, struct gensec_security);
if (!PyArg_ParseTuple(args, "O", &py_in))
return NULL;
mem_ctx = talloc_new(NULL);
if (py_in == Py_None) {
in = data_blob_null;
} else {
in.data = (uint8_t *)PyString_AsString(py_in);
in.length = PyString_Size(py_in);
}
status = gensec_wrap(security, mem_ctx, &in, &out);
if (!NT_STATUS_IS_OK(status)) { if (!NT_STATUS_IS_OK(status)) {
PyErr_SetNTSTATUS(status); PyErr_SetNTSTATUS(status);
talloc_free(mem_ctx); talloc_free(mem_ctx);
return NULL; return NULL;
} }
ret = PyString_FromStringAndSize((const char *)out.data, out.length); if (out.length != 0) {
ret = PyString_FromStringAndSize((const char *)out.data, out.length);
} else {
ret = Py_None;
}
talloc_free(mem_ctx);
return ret;
}
static PyObject *py_gensec_unwrap(PyObject *self, PyObject *args)
{
NTSTATUS status;
TALLOC_CTX *mem_ctx;
DATA_BLOB in, out;
PyObject *ret, *py_in;
struct gensec_security *security = py_talloc_get_type(self, struct gensec_security);
if (!PyArg_ParseTuple(args, "O", &py_in))
return NULL;
mem_ctx = talloc_new(NULL);
if (py_in == Py_None) {
in = data_blob_null;
} else {
in.data = (uint8_t *)PyString_AsString(py_in);
in.length = PyString_Size(py_in);
}
status = gensec_unwrap(security, mem_ctx, &in, &out);
if (!NT_STATUS_IS_OK(status)) {
PyErr_SetNTSTATUS(status);
talloc_free(mem_ctx);
return NULL;
}
if (out.length != 0) {
ret = PyString_FromStringAndSize((const char *)out.data, out.length);
} else {
ret = Py_None;
}
talloc_free(mem_ctx); talloc_free(mem_ctx);
return ret; return ret;
} }
@ -281,15 +470,28 @@ static PyMethodDef py_gensec_security_methods[] = {
"S.start_client(settings) -> gensec" }, "S.start_client(settings) -> gensec" },
{ "start_server", (PyCFunction)py_gensec_start_server, METH_VARARGS|METH_KEYWORDS|METH_CLASS, { "start_server", (PyCFunction)py_gensec_start_server, METH_VARARGS|METH_KEYWORDS|METH_CLASS,
"S.start_server(auth_ctx, settings) -> gensec" }, "S.start_server(auth_ctx, settings) -> gensec" },
{ "set_credentials", (PyCFunction)py_gensec_set_credentials, METH_VARARGS,
"S.start_client(credentials)" },
{ "session_info", (PyCFunction)py_gensec_session_info, METH_NOARGS, { "session_info", (PyCFunction)py_gensec_session_info, METH_NOARGS,
"S.session_info() -> info" }, "S.session_info() -> info" },
{ "start_mech_by_name", (PyCFunction)py_gensec_start_mech_by_name, METH_VARARGS, { "start_mech_by_name", (PyCFunction)py_gensec_start_mech_by_name, METH_VARARGS,
"S.start_mech_by_name(name)" }, "S.start_mech_by_name(name)" },
{ "start_mech_by_sasl_name", (PyCFunction)py_gensec_start_mech_by_sasl_name, METH_VARARGS,
"S.start_mech_by_sasl_name(name)" },
{ "start_mech_by_authtype", (PyCFunction)py_gensec_start_mech_by_authtype, METH_VARARGS, "S.start_mech_by_authtype(authtype, level)" }, { "start_mech_by_authtype", (PyCFunction)py_gensec_start_mech_by_authtype, METH_VARARGS, "S.start_mech_by_authtype(authtype, level)" },
{ "get_name_by_authtype", (PyCFunction)py_get_name_by_authtype, METH_VARARGS, { "get_name_by_authtype", (PyCFunction)py_get_name_by_authtype, METH_VARARGS,
"S.get_name_by_authtype(authtype) -> name\nLookup an auth type." }, "S.get_name_by_authtype(authtype) -> name\nLookup an auth type." },
{ "want_feature", (PyCFunction)py_gensec_want_feature, METH_VARARGS,
"S.want_feature(feature)\n Request that GENSEC negotiate a particular feature." },
{ "have_feature", (PyCFunction)py_gensec_have_feature, METH_VARARGS,
"S.have_feature()\n Return True if GENSEC negotiated a particular feature." },
{ "update", (PyCFunction)py_gensec_update, METH_VARARGS, { "update", (PyCFunction)py_gensec_update, METH_VARARGS,
"S.update(blob_in) -> blob_out\nPerform one step in a GENSEC dance." }, "S.update(blob_in) -> (finished, blob_out)\nPerform one step in a GENSEC dance. Repeat with new packets until finished is true or exception." },
{ "wrap", (PyCFunction)py_gensec_wrap, METH_VARARGS,
"S.wrap(blob_in) -> blob_out\nPackage one clear packet into a wrapped GENSEC packet." },
{ "unwrap", (PyCFunction)py_gensec_unwrap, METH_VARARGS,
"S.unwrap(blob_in) -> blob_out\nPerform one wrapped GENSEC packet into a clear packet." },
{ NULL } { NULL }
}; };

View File

@ -23,17 +23,19 @@ Note that this just tests the bindings work. It does not intend to test
the functionality, that's already done in other tests. the functionality, that's already done in other tests.
""" """
from samba.credentials import Credentials
from samba import gensec from samba import gensec
import samba.tests import samba.tests
class CredentialsTests(samba.tests.TestCase): class GensecTests(samba.tests.TestCase):
def setUp(self): def setUp(self):
super(CredentialsTests, self).setUp() super(GensecTests, self).setUp()
settings = {} self.settings = {}
settings["target_hostname"] = "localhost" self.settings["lp_ctx"] = self.lp_ctx = samba.tests.env_loadparm()
settings["lp_ctx"] = samba.tests.env_loadparm() self.settings["target_hostname"] = self.lp_ctx.get("netbios name")
self.gensec = gensec.Security.start_client(settings) """This is just for the API tests"""
self.gensec = gensec.Security.start_client(self.settings)
def test_start_mech_by_unknown_name(self): def test_start_mech_by_unknown_name(self):
self.assertRaises(RuntimeError, self.gensec.start_mech_by_name, "foo") self.assertRaises(RuntimeError, self.gensec.start_mech_by_name, "foo")
@ -43,3 +45,46 @@ class CredentialsTests(samba.tests.TestCase):
def test_info_uninitialized(self): def test_info_uninitialized(self):
self.assertRaises(RuntimeError, self.gensec.session_info) self.assertRaises(RuntimeError, self.gensec.session_info)
def test_update(self):
"""Test GENSEC by doing an exchange with ourselves using GSSAPI against a KDC"""
"""Start up a client and server GENSEC instance to test things with"""
self.gensec_client = gensec.Security.start_client(self.settings)
self.gensec_client.set_credentials(self.get_credentials())
self.gensec_client.want_feature(gensec.FEATURE_SEAL)
self.gensec_client.start_mech_by_sasl_name("GSSAPI")
self.gensec_server = gensec.Security.start_server(self.settings)
creds = Credentials()
creds.guess(self.lp_ctx)
creds.set_machine_account(self.lp_ctx)
self.gensec_server.set_credentials(creds)
self.gensec_server.want_feature(gensec.FEATURE_SEAL)
self.gensec_server.start_mech_by_sasl_name("GSSAPI")
client_finished = False
server_finished = False
server_to_client = None
"""Run the actual call loop"""
while client_finished == False and server_finished == False:
if not client_finished:
print "running client gensec_update"
(client_finished, client_to_server) = self.gensec_client.update(server_to_client)
if not server_finished:
print "running server gensec_update"
(server_finished, server_to_client) = self.gensec_server.update(client_to_server)
session_info = self.gensec_server.session_info()
test_string = "Hello Server"
test_wrapped = self.gensec_client.wrap(test_string)
test_unwrapped = self.gensec_server.unwrap(test_wrapped)
self.assertEqual(test_string, test_unwrapped)
test_string = "Hello Client"
test_wrapped = self.gensec_server.wrap(test_string)
test_unwrapped = self.gensec_client.unwrap(test_wrapped)
self.assertEqual(test_string, test_unwrapped)

View File

@ -474,7 +474,7 @@ def plansambapythontestsuite(name, env, path, module, environ={}, extra_args=[])
plansambapythontestsuite("ldb.python", "none", "./lib/ldb/tests/python/", 'api') plansambapythontestsuite("ldb.python", "none", "./lib/ldb/tests/python/", 'api')
planpythontestsuite("none", "samba.tests.credentials") planpythontestsuite("none", "samba.tests.credentials")
planpythontestsuite("none", "samba.tests.gensec") plantestsuite_idlist("samba.tests.gensec", "dc:local", [subunitrun, "$LISTOPT", '-U"$USERNAME%$PASSWORD"', "samba.tests.gensec"])
planpythontestsuite("none", "samba.tests.registry") planpythontestsuite("none", "samba.tests.registry")
plansambapythontestsuite("tdb.python", "none", "../lib/tdb/python/tests", 'simple') plansambapythontestsuite("tdb.python", "none", "../lib/tdb/python/tests", 'simple')
planpythontestsuite("none", "samba.tests.auth") planpythontestsuite("none", "samba.tests.auth")