1
0
mirror of https://github.com/samba-team/samba.git synced 2024-12-27 03:21:53 +03:00

r7206: changed the messaging library to use sendto instead of a connected

send on the unix domain datagram socket. This gains us about 50% in
speed, and also means that we don't run the risk of running out of
file descriptors due to heavy messaging traffic. We now use a single
file descriptor no matter how many messages are pending to any number
of servers.
This commit is contained in:
Andrew Tridgell 2005-06-03 04:21:25 +00:00 committed by Gerald (Jerry) Carter
parent 35ef6e3b15
commit 2369170fc1

View File

@ -31,9 +31,6 @@
/* change the message version with any incompatible changes in the protocol */
#define MESSAGING_VERSION 1
/* the number of microseconds to backoff in retrying to send a message */
#define MESSAGING_BACKOFF 250000
/* maximum message size */
#define MESSAGING_MAX_SIZE 512
@ -42,6 +39,7 @@ struct messaging_context {
struct socket_context *sock;
const char *path;
struct dispatch_fn *dispatch;
struct messaging_rec *pending;
struct {
struct event_context *ev;
@ -61,8 +59,8 @@ struct dispatch_fn {
/* an individual message */
struct messaging_rec {
struct messaging_rec *next, *prev;
struct messaging_context *msg;
struct socket_context *sock;
const char *path;
struct {
@ -76,6 +74,7 @@ struct messaging_rec {
DATA_BLOB data;
};
/*
A useful function for testing the message system.
*/
@ -111,19 +110,67 @@ static void messaging_dispatch(struct messaging_context *msg, struct messaging_r
d->fn(msg, d->private, d->msg_type, rec->header.from, &rec->data);
}
}
rec->header.length = 0;
}
/*
handle a new incoming connection
try to send the message
*/
static void messaging_recv_handler(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private)
static NTSTATUS try_send(struct messaging_rec *rec)
{
struct messaging_context *msg = rec->msg;
DATA_BLOB blob;
size_t nsent;
void *priv;
NTSTATUS status;
blob = data_blob_talloc(rec, NULL, sizeof(rec->header) + rec->data.length);
NT_STATUS_HAVE_NO_MEMORY(blob.data);
memcpy(blob.data, &rec->header, sizeof(rec->header));
memcpy(blob.data + sizeof(rec->header), rec->data.data, rec->data.length);
/* we send with privileges so messages work from any context */
priv = root_privileges();
status = socket_sendto(msg->sock, &blob, &nsent, 0, rec->path, 0);
talloc_free(priv);
data_blob_free(&blob);
return status;
}
/*
handle a socket write event
*/
static void messaging_send_handler(struct messaging_context *msg)
{
while (msg->pending) {
struct messaging_rec *rec = msg->pending;
NTSTATUS status;
status = try_send(rec);
if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
break;
}
if (!NT_STATUS_IS_OK(status)) {
DEBUG(1,("messaging: Lost message from %u to %u of type %u - %s\n",
rec->header.from, rec->header.to, rec->header.msg_type,
nt_errstr(status)));
}
DLIST_REMOVE(msg->pending, rec);
talloc_free(rec);
}
if (msg->pending == NULL) {
EVENT_FD_NOT_WRITEABLE(msg->event.fde);
}
}
/*
handle a new incoming packet
*/
static void messaging_recv_handler(struct messaging_context *msg)
{
struct messaging_context *msg = talloc_get_type(private,
struct messaging_context);
struct messaging_rec *rec;
NTSTATUS status;
uint8_t data[MESSAGING_MAX_SIZE];
@ -146,7 +193,6 @@ static void messaging_recv_handler(struct event_context *ev, struct fd_event *fd
rec->msg = msg;
rec->path = msg->path;
rec->sock = NULL;
memcpy(&rec->header, data, sizeof(rec->header));
if (msize != sizeof(rec->header) + rec->header.length) {
@ -166,6 +212,24 @@ static void messaging_recv_handler(struct event_context *ev, struct fd_event *fd
talloc_free(rec);
}
/*
handle a socket event
*/
static void messaging_handler(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private)
{
struct messaging_context *msg = talloc_get_type(private,
struct messaging_context);
if (flags & EVENT_FD_WRITE) {
messaging_send_handler(msg);
}
if (flags & EVENT_FD_READ) {
messaging_recv_handler(msg);
}
}
/*
Register a dispatch function for a particular message type.
*/
@ -200,89 +264,11 @@ void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void
}
/*
handle IO for sending a message
*/
static void messaging_send_handler(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private)
{
struct messaging_rec *rec = talloc_get_type(private, struct messaging_rec);
uint8_t data[MESSAGING_MAX_SIZE];
DATA_BLOB blob;
size_t nsent;
NTSTATUS status;
memcpy(data, &rec->header, sizeof(rec->header));
memcpy(data + sizeof(rec->header), rec->data.data, rec->data.length);
blob.data = data;
blob.length = sizeof(rec->header) + rec->header.length;
status = socket_send(rec->sock, &blob, &nsent, 0);
if (NT_STATUS_IS_ERR(status)) {
DEBUG(3,("Unable to send message of type %d length %d - %s\n",
rec->header.msg_type,
rec->header.length,
nt_errstr(status)));
talloc_free(rec);
return;
}
if (NT_STATUS_IS_OK(status)) {
talloc_free(rec);
}
}
/*
wrapper around socket_connect with raised privileges
*/
static NTSTATUS try_connect(struct messaging_rec *rec)
{
NTSTATUS status;
void *priv = root_privileges();
status = socket_connect(rec->sock, NULL, 0, rec->path, 0, 0);
talloc_free(priv);
return status;
}
/*
when the servers listen queue is full we use this to backoff the message
*/
static void messaging_backoff_handler(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private)
{
struct messaging_rec *rec = talloc_get_type(private, struct messaging_rec);
struct messaging_context *msg = rec->msg;
NTSTATUS status;
status = try_connect(rec);
if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
/* backoff again */
event_add_timed(msg->event.ev, rec,
timeval_add(&t, 0, MESSAGING_BACKOFF),
messaging_backoff_handler, rec);
return;
}
if (!NT_STATUS_IS_OK(status)) {
DEBUG(1,("messaging: Lost message from %u to %u of type %u after backoff - %s\n",
rec->header.from, rec->header.to, rec->header.msg_type, nt_errstr(status)));
talloc_free(rec);
return;
}
event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock),
EVENT_FD_WRITE, messaging_send_handler, rec);
}
/*
Send a message to a particular server
*/
NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, uint32_t msg_type, DATA_BLOB *data)
NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server,
uint32_t msg_type, DATA_BLOB *data)
{
struct messaging_rec *rec;
NTSTATUS status;
@ -292,45 +278,32 @@ NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, uint32_t
return NT_STATUS_NO_MEMORY;
}
rec->msg = msg;
rec->header.version = MESSAGING_VERSION;
rec->msg = msg;
rec->header.version = MESSAGING_VERSION;
rec->header.msg_type = msg_type;
rec->header.from = msg->server_id;
rec->header.to = server;
rec->header.length = data?data->length:0;
rec->header.from = msg->server_id;
rec->header.to = server;
rec->header.length = data?data->length:0;
if (rec->header.length != 0) {
rec->data = data_blob_talloc(rec, data->data, data->length);
} else {
rec->data = data_blob(NULL, 0);
}
status = socket_create("unix", SOCKET_TYPE_DGRAM, &rec->sock, 0);
if (!NT_STATUS_IS_OK(status)) {
talloc_free(rec);
return status;
}
talloc_steal(rec, rec->sock);
rec->path = messaging_path(rec, server);
status = try_connect(rec);
status = try_send(rec);
if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
/* backoff on this message - the servers listen queue is full */
event_add_timed(msg->event.ev, rec,
timeval_current_ofs(0, MESSAGING_BACKOFF),
messaging_backoff_handler, rec);
if (msg->pending == NULL) {
EVENT_FD_WRITEABLE(msg->event.fde);
}
DLIST_ADD(msg->pending, rec);
return NT_STATUS_OK;
}
if (!NT_STATUS_IS_OK(status)) {
talloc_free(rec);
return status;
}
talloc_free(rec);
event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock),
EVENT_FD_WRITE, messaging_send_handler, rec);
return NT_STATUS_OK;
return status;
}
/*
@ -381,6 +354,7 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id
msg->path = messaging_path(msg, server_id);
msg->server_id = server_id;
msg->dispatch = NULL;
msg->pending = NULL;
status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0);
if (!NT_STATUS_IS_OK(status)) {
@ -399,9 +373,12 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id
return NULL;
}
/* it needs to be non blocking for sends */
set_blocking(socket_get_fd(msg->sock), False);
msg->event.ev = talloc_reference(msg, ev);
msg->event.fde = event_add_fd(ev, msg, socket_get_fd(msg->sock),
EVENT_FD_READ, messaging_recv_handler, msg);
EVENT_FD_READ, messaging_handler, msg);
talloc_set_destructor(msg, messaging_destructor);
@ -409,5 +386,3 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id
return msg;
}