1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-11 05:18:09 +03:00

r7294: implemented the irpc messaging system. This is the core of the

management system I proposed on samba-technical a couple of days
ago. Essentially it is a very lightweight way for any code in Samba to
make IDL based rpc calls to anywhere else in the code, without the
client or server having to go to the trouble of setting up a full rpc
service.

It can be used with any of our existing IDL, but I expect it will
mostly be used for a new set of Samba specific management calls.

The LOCAL-IRPC torture test demonstrates how it can be used by calling
the echo_AddOne() call over this transport.
(This used to be commit 3d589a0995)
This commit is contained in:
Andrew Tridgell 2005-06-05 06:53:07 +00:00 committed by Gerald (Jerry) Carter
parent 0384065235
commit bf1ffa283c
15 changed files with 508 additions and 3 deletions

View File

@ -30,5 +30,6 @@ struct messaging_context;
#define MSG_PONG 3
#define MSG_BRL_RETRY 4
#define MSG_PVFS_RETRY_OPEN 5
#define MSG_IRPC 6
#endif

View File

@ -4,5 +4,8 @@
[SUBSYSTEM::MESSAGING]
INIT_OBJ_FILES = \
lib/messaging/messaging.o
NOPROTO = YES
REQUIRED_SUBSYSTEMS = \
NDR_IRPC
# End SUBSYSTEM MESSAGING
################################################

View File

@ -0,0 +1,94 @@
/*
Unix SMB/CIFS implementation.
Samba internal rpc code - header
Copyright (C) Andrew Tridgell 2005
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
/*
an incoming irpc message
*/
struct irpc_message {
uint32_t from;
};
/* don't allow calls to take too long */
#define IRPC_CALL_TIMEOUT 10
/* the server function type */
typedef NTSTATUS (*irpc_function_t)(struct irpc_message *, void *r);
/* register a server function with the irpc messaging system */
#define IRPC_REGISTER(msg_ctx, pipename, funcname, function) \
irpc_register(msg_ctx, &dcerpc_table_ ## pipename, \
DCERPC_ ## funcname, \
(irpc_function_t)function)
/* make a irpc call */
#define IRPC_CALL(msg_ctx, server_id, pipename, funcname, ptr) \
irpc_call(msg_ctx, server_id, &dcerpc_table_ ## pipename, DCERPC_ ## funcname, ptr)
/*
a pending irpc call
*/
struct irpc_request {
struct messaging_context *msg_ctx;
const struct dcerpc_interface_table *table;
int callnum;
int callid;
void *r;
NTSTATUS status;
BOOL done;
struct {
void (*fn)(struct irpc_request *);
void *private;
} async;
};
struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id,
struct event_context *ev);
NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server,
uint32_t msg_type, DATA_BLOB *data);
void messaging_register(struct messaging_context *msg, void *private,
uint32_t msg_type,
void (*fn)(struct messaging_context *, void *, uint32_t, uint32_t, DATA_BLOB *));
struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id,
struct event_context *ev);
NTSTATUS messaging_send_ptr(struct messaging_context *msg, uint32_t server,
uint32_t msg_type, void *ptr);
void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void *private);
NTSTATUS irpc_register(struct messaging_context *msg_ctx,
const struct dcerpc_interface_table *table,
int call, irpc_function_t fn);
struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx,
uint32_t server_id,
const struct dcerpc_interface_table *table,
int callnum, void *r);
NTSTATUS irpc_call_recv(struct irpc_request *irpc);
NTSTATUS irpc_call(struct messaging_context *msg_ctx,
uint32_t server_id,
const struct dcerpc_interface_table *table,
int callnum, void *r);

View File

@ -27,6 +27,8 @@
#include "messages.h"
#include "dlinklist.h"
#include "lib/socket/socket.h"
#include "librpc/gen_ndr/ndr_irpc.h"
#include "lib/messaging/irpc.h"
/* change the message version with any incompatible changes in the protocol */
#define MESSAGING_VERSION 1
@ -37,6 +39,8 @@ struct messaging_context {
const char *path;
struct dispatch_fn *dispatch;
struct messaging_rec *pending;
struct irpc_list *irpc;
struct idr_context *idr;
struct {
struct event_context *ev;
@ -72,6 +76,10 @@ struct messaging_rec {
};
static void irpc_handler(struct messaging_context *, void *,
uint32_t, uint32_t, DATA_BLOB *);
/*
A useful function for testing the message system.
*/
@ -363,8 +371,10 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id
msg->path = messaging_path(msg, server_id);
msg->server_id = server_id;
msg->dispatch = NULL;
msg->pending = NULL;
msg->dispatch = NULL;
msg->pending = NULL;
msg->idr = idr_init(msg);
msg->irpc = NULL;
status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0);
if (!NT_STATUS_IS_OK(status)) {
@ -393,6 +403,275 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id
talloc_set_destructor(msg, messaging_destructor);
messaging_register(msg, NULL, MSG_PING, ping_message);
messaging_register(msg, NULL, MSG_IRPC, irpc_handler);
return msg;
}
/*
a list of registered irpc server functions
*/
struct irpc_list {
struct irpc_list *next, *prev;
struct GUID uuid;
const struct dcerpc_interface_table *table;
int callnum;
irpc_function_t fn;
};
/*
register a irpc server function
*/
NTSTATUS irpc_register(struct messaging_context *msg_ctx,
const struct dcerpc_interface_table *table,
int call, irpc_function_t fn)
{
struct irpc_list *irpc;
irpc = talloc(msg_ctx, struct irpc_list);
NT_STATUS_HAVE_NO_MEMORY(irpc);
irpc->table = table;
irpc->callnum = call;
irpc->fn = fn;
GUID_from_string(irpc->table->uuid, &irpc->uuid);
DLIST_ADD(msg_ctx->irpc, irpc);
return NT_STATUS_OK;
}
/*
handle an incoming irpc reply message
*/
static void irpc_handler_reply(struct messaging_context *msg_ctx,
struct ndr_pull *ndr, struct irpc_header *header)
{
struct irpc_request *irpc;
irpc = idr_find(msg_ctx->idr, header->callid);
if (irpc == NULL) return;
/* parse the reply data */
irpc->status = irpc->table->calls[irpc->callnum].ndr_pull(ndr, NDR_OUT, irpc->r);
if (NT_STATUS_IS_OK(irpc->status)) {
irpc->status = header->status;
}
irpc->done = True;
if (irpc->async.fn) {
irpc->async.fn(irpc);
}
}
/*
handle an incoming irpc request message
*/
static void irpc_handler_request(struct messaging_context *msg_ctx,
struct ndr_pull *ndr, struct irpc_header *header,
uint32_t src)
{
struct irpc_list *i;
void *r;
NTSTATUS status;
struct irpc_message m;
struct ndr_push *push;
DATA_BLOB packet;
for (i=msg_ctx->irpc; i; i=i->next) {
if (GUID_equal(&i->uuid, &header->uuid) &&
i->table->if_version == header->if_version &&
i->callnum == header->callnum) {
break;
}
}
if (i == NULL) {
/* no registered handler for this message */
return;
}
/* allocate space for the structure */
r = talloc_zero_size(ndr, i->table->calls[header->callnum].struct_size);
if (r == NULL) goto failed;
/* parse the request data */
status = i->table->calls[i->callnum].ndr_pull(ndr, NDR_IN, r);
if (!NT_STATUS_IS_OK(status)) goto failed;
/* make the call */
m.from = src;
header->status = i->fn(&m, r);
/* setup the reply */
push = ndr_push_init_ctx(ndr);
if (push == NULL) goto failed;
header->flags |= IRPC_FLAG_REPLY;
/* construct the packet */
status = ndr_push_irpc_header(push, NDR_SCALARS|NDR_BUFFERS, header);
if (!NT_STATUS_IS_OK(status)) goto failed;
status = i->table->calls[i->callnum].ndr_push(push, NDR_OUT, r);
if (!NT_STATUS_IS_OK(status)) goto failed;
/* send the reply message */
packet = ndr_push_blob(push);
status = messaging_send(msg_ctx, src, MSG_IRPC, &packet);
if (!NT_STATUS_IS_OK(status)) goto failed;
failed:
/* nothing to clean up */
return;
}
/*
handle an incoming irpc message
*/
static void irpc_handler(struct messaging_context *msg_ctx, void *private,
uint32_t msg_type, uint32_t src, DATA_BLOB *packet)
{
struct irpc_header header;
struct ndr_pull *ndr;
NTSTATUS status;
ndr = ndr_pull_init_blob(packet, msg_ctx);
if (ndr == NULL) goto failed;
status = ndr_pull_irpc_header(ndr, NDR_BUFFERS|NDR_SCALARS, &header);
if (!NT_STATUS_IS_OK(status)) goto failed;
if (header.flags & IRPC_FLAG_REPLY) {
irpc_handler_reply(msg_ctx, ndr, &header);
} else {
irpc_handler_request(msg_ctx, ndr, &header, src);
}
failed:
talloc_free(ndr);
}
/*
destroy a irpc request
*/
static int irpc_destructor(void *ptr)
{
struct irpc_request *irpc = talloc_get_type(ptr, struct irpc_request);
idr_remove(irpc->msg_ctx->idr, irpc->callid);
return 0;
}
/*
timeout a irpc request
*/
static void irpc_timeout(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private)
{
struct irpc_request *irpc = talloc_get_type(private, struct irpc_request);
irpc->status = NT_STATUS_IO_TIMEOUT;
irpc->done = True;
if (irpc->async.fn) {
irpc->async.fn(irpc);
}
}
/*
make a irpc call - async send
*/
struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx,
uint32_t server_id,
const struct dcerpc_interface_table *table,
int callnum, void *r)
{
struct irpc_header header;
struct ndr_push *ndr;
NTSTATUS status;
DATA_BLOB packet;
struct irpc_request *irpc;
irpc = talloc(msg_ctx, struct irpc_request);
if (irpc == NULL) goto failed;
irpc->msg_ctx = msg_ctx;
irpc->table = table;
irpc->callnum = callnum;
irpc->callid = idr_get_new(msg_ctx->idr, irpc, UINT16_MAX);
if (irpc->callid == -1) goto failed;
irpc->r = r;
irpc->done = False;
irpc->async.fn = NULL;
talloc_set_destructor(irpc, irpc_destructor);
/* setup the header */
status = GUID_from_string(table->uuid, &header.uuid);
if (!NT_STATUS_IS_OK(status)) goto failed;
header.if_version = table->if_version;
header.callid = irpc->callid;
header.callnum = callnum;
header.flags = 0;
header.status = NT_STATUS_OK;
/* construct the irpc packet */
ndr = ndr_push_init_ctx(irpc);
if (ndr == NULL) goto failed;
status = ndr_push_irpc_header(ndr, NDR_SCALARS|NDR_BUFFERS, &header);
if (!NT_STATUS_IS_OK(status)) goto failed;
status = table->calls[callnum].ndr_push(ndr, NDR_IN, r);
if (!NT_STATUS_IS_OK(status)) goto failed;
/* and send it */
packet = ndr_push_blob(ndr);
status = messaging_send(msg_ctx, server_id, MSG_IRPC, &packet);
if (!NT_STATUS_IS_OK(status)) goto failed;
event_add_timed(msg_ctx->event.ev, irpc,
timeval_current_ofs(IRPC_CALL_TIMEOUT, 0),
irpc_timeout, irpc);
talloc_free(ndr);
return irpc;
failed:
talloc_free(irpc);
return NULL;
}
/*
wait for a irpc reply
*/
NTSTATUS irpc_call_recv(struct irpc_request *irpc)
{
NTSTATUS status;
NT_STATUS_HAVE_NO_MEMORY(irpc);
while (!irpc->done) {
if (event_loop_once(irpc->msg_ctx->event.ev) != 0) {
return NT_STATUS_CONNECTION_DISCONNECTED;
}
}
status = irpc->status;
talloc_free(irpc);
return status;
}
/*
perform a synchronous irpc request
*/
NTSTATUS irpc_call(struct messaging_context *msg_ctx,
uint32_t server_id,
const struct dcerpc_interface_table *table,
int callnum, void *r)
{
struct irpc_request *irpc = irpc_call_send(msg_ctx, server_id,
table, callnum, r);
return irpc_call_recv(irpc);
}

View File

@ -55,6 +55,12 @@ INIT_OBJ_FILES = librpc/gen_ndr/ndr_echo.o
NOPROTO = YES
REQUIRED_SUBSYSTEMS = NDR_RAW
[SUBSYSTEM::NDR_IRPC]
INIT_FUNCTION = dcerpc_irpc_init
INIT_OBJ_FILES = librpc/gen_ndr/ndr_irpc.o
NOPROTO = YES
REQUIRED_SUBSYSTEMS = NDR_RAW
[SUBSYSTEM::NDR_EXCHANGE]
INIT_FUNCTION = dcerpc_exchange_init
INIT_OBJ_FILES = librpc/gen_ndr/ndr_exchange.o

View File

@ -0,0 +1,23 @@
#include "idl_types.h"
/*
definitions for irpc primitives
*/
[
pointer_default(unique)
]
interface irpc
{
typedef bitmap {
IRPC_FLAG_REPLY = 0x0001
} irpc_flags;
typedef [public] struct {
GUID uuid;
uint32 if_version;
uint32 callnum;
uint32 callid;
irpc_flags flags;
NTSTATUS status;
} irpc_header;
}

View File

@ -30,6 +30,7 @@
#include "lib/tdb/include/tdb.h"
#include "messages.h"
#include "db_wrap.h"
#include "lib/messaging/irpc.h"
/*
in this module a "DATA_BLOB *file_key" is a blob that uniquely identifies

View File

@ -44,6 +44,7 @@
#include "messages.h"
#include "librpc/gen_ndr/ndr_security.h"
#include "db_wrap.h"
#include "lib/messaging/irpc.h"
struct odb_context {
struct tdb_wrap *w;

View File

@ -25,6 +25,7 @@
#include "dlinklist.h"
#include "vfs_posix.h"
#include "smbd/service_stream.h"
#include "lib/messaging/irpc.h"
/* the context for a single wait instance */
struct pvfs_wait {

View File

@ -26,6 +26,7 @@
#include "lib/events/events.h"
#include "lib/socket/socket.h"
#include "smbd/service_stream.h"
#include "lib/messaging/irpc.h"
/* the range of ports to try for dcerpc over tcp endpoints */
#define SERVER_TCP_LOW_PORT 1024

View File

@ -24,6 +24,7 @@
#include "process_model.h"
#include "lib/events/events.h"
#include "smbd/service_task.h"
#include "lib/messaging/irpc.h"
/*
terminate a task service

View File

@ -143,7 +143,8 @@ ADD_OBJ_FILES = \
torture/local/messaging.o \
torture/local/binding_string.o \
torture/local/idtree.o \
torture/local/socket.o
torture/local/socket.o \
torture/local/irpc.o
REQUIRED_SUBSYSTEMS = \
LIBSMB \
MESSAGING

View File

@ -0,0 +1,91 @@
/*
Unix SMB/CIFS implementation.
local test for irpc code
Copyright (C) Andrew Tridgell 2004
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
#include "includes.h"
#include "lib/events/events.h"
#include "lib/messaging/irpc.h"
#include "librpc/gen_ndr/ndr_echo.h"
const uint32_t MSG_ID = 1;
/*
serve up AddOne over the irpc system
*/
static NTSTATUS irpc_AddOne(struct irpc_message *irpc, struct echo_AddOne *r)
{
*r->out.out_data = r->in.in_data + 1;
return NT_STATUS_OK;
}
/*
test a addone call over the internal messaging system
*/
static BOOL test_addone(TALLOC_CTX *mem_ctx, struct messaging_context *msg_ctx)
{
struct echo_AddOne r;
NTSTATUS status;
uint32_t res;
/* register the server side function */
IRPC_REGISTER(msg_ctx, rpcecho, ECHO_ADDONE, irpc_AddOne);
/* make the call */
r.in.in_data = random();
r.out.out_data = &res;
status = IRPC_CALL(msg_ctx, MSG_ID, rpcecho, ECHO_ADDONE, &r);
if (!NT_STATUS_IS_OK(status)) {
printf("AddOne failed - %s\n", nt_errstr(status));
return False;
}
/* check the answer */
if (res != r.in.in_data + 1) {
printf("AddOne wrong answer - %u should be %u\n",
*r.out.out_data, r.in.in_data+1);
return False;
}
printf("%u + 1 = %u\n", r.in.in_data, res);
return True;
}
BOOL torture_local_irpc(void)
{
TALLOC_CTX *mem_ctx = talloc_init("torture_local_irpc");
BOOL ret = True;
struct messaging_context *msg_ctx;
struct event_context *ev;
lp_set_cmdline("lock dir", "lockdir.tmp");
ev = event_context_init(mem_ctx);
msg_ctx = messaging_init(mem_ctx, MSG_ID, ev);
ret &= test_addone(mem_ctx, msg_ctx);
talloc_free(mem_ctx);
return True;
}

View File

@ -23,6 +23,7 @@
#include "includes.h"
#include "system/filesys.h"
#include "lib/events/events.h"
#include "lib/messaging/irpc.h"
enum {MY_PING=1000, MY_PONG, MY_EXIT};

View File

@ -2315,6 +2315,7 @@ static struct {
{"LOCAL-ICONV", torture_local_iconv, 0},
{"LOCAL-TALLOC", torture_local_talloc, 0},
{"LOCAL-MESSAGING", torture_local_messaging, 0},
{"LOCAL-IRPC", torture_local_irpc, 0},
{"LOCAL-BINDING", torture_local_binding_string, 0},
{"LOCAL-IDTREE", torture_local_idtree, 0},
{"LOCAL-SOCKET", torture_local_socket, 0},