1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-31 01:48:16 +03:00

r6561: re-did the internal message system based on DGRAM unix domain

sockets. This gains us about 40% in messaging speed.
(This used to be commit f244a64ed537447e44229172427b5b6a5c64800c)
This commit is contained in:
Andrew Tridgell 2005-05-01 18:49:07 +00:00 committed by Gerald (Jerry) Carter
parent 425350bb61
commit 7282ddda0a

View File

@ -34,10 +34,13 @@
/* the number of microseconds to backoff in retrying to send a message */
#define MESSAGING_BACKOFF 250000
/* maximum message size */
#define MESSAGING_MAX_SIZE 512
struct messaging_context {
uint32_t server_id;
struct socket_context *sock;
char *path;
const char *path;
struct dispatch_fn *dispatch;
struct {
@ -60,7 +63,6 @@ struct dispatch_fn {
struct messaging_rec {
struct messaging_context *msg;
struct socket_context *sock;
struct fd_event *fde;
const char *path;
struct {
@ -72,8 +74,6 @@ struct messaging_rec {
} header;
DATA_BLOB data;
uint32_t ndone;
};
/*
@ -112,112 +112,58 @@ static void messaging_dispatch(struct messaging_context *msg, struct messaging_r
}
}
/* we don't free the record itself here as there may
be more messages from this client */
data_blob_free(&rec->data);
rec->header.length = 0;
rec->ndone = 0;
}
/*
handle IO for a single message
*/
static void messaging_recv_handler(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private)
{
struct messaging_rec *rec = talloc_get_type(private, struct messaging_rec);
struct messaging_context *msg = rec->msg;
NTSTATUS status;
if (rec->ndone < sizeof(rec->header)) {
/* receive the header */
size_t nread;
status = socket_recv(rec->sock,
rec->ndone + (char *)&rec->header,
sizeof(rec->header) - rec->ndone, &nread, 0);
if (NT_STATUS_IS_ERR(status)) {
talloc_free(rec);
return;
}
if (nread == 0) {
return;
}
rec->ndone += nread;
if (rec->ndone == sizeof(rec->header)) {
if (rec->header.version != MESSAGING_VERSION) {
DEBUG(0,("meessage with wrong version %u\n",
rec->header.version));
talloc_free(rec);
}
rec->data = data_blob_talloc(rec, NULL, rec->header.length);
if (rec->data.length != rec->header.length) {
DEBUG(0,("Unable to allocate message of size %u\n",
rec->header.length));
talloc_free(rec);
}
}
}
if (rec->ndone >= sizeof(rec->header) &&
rec->ndone < sizeof(rec->header) + rec->header.length) {
/* receive the body, if any */
size_t nread;
status = socket_recv(rec->sock,
rec->data.data + (rec->ndone - sizeof(rec->header)),
sizeof(rec->header) + rec->header.length - rec->ndone,
&nread, 0);
if (NT_STATUS_IS_ERR(status)) {
talloc_free(rec);
return;
}
if (nread == 0) {
return;
}
rec->ndone += nread;
}
if (rec->ndone == sizeof(rec->header) + rec->header.length) {
/* we've got the whole message */
messaging_dispatch(msg, rec);
}
}
/*
handle a new incoming connection
*/
static void messaging_listen_handler(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private)
static void messaging_recv_handler(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private)
{
struct messaging_context *msg = talloc_get_type(private,
struct messaging_context);
struct messaging_rec *rec;
NTSTATUS status;
uint8_t data[MESSAGING_MAX_SIZE];
size_t msize;
status = socket_recv(msg->sock, data, sizeof(data), &msize, 0);
if (!NT_STATUS_IS_OK(status)) {
return;
}
if (msize < sizeof(rec->header)) {
DEBUG(0,("messaging: bad message of size %d\n", msize));
return;
}
rec = talloc(msg, struct messaging_rec);
if (rec == NULL) {
smb_panic("Unable to allocate messaging_rec");
}
status = socket_accept(msg->sock, &rec->sock);
if (!NT_STATUS_IS_OK(status)) {
smb_panic("Unable to accept messaging_rec");
}
talloc_steal(rec, rec->sock);
rec->msg = msg;
rec->ndone = 0;
rec->header.length = 0;
rec->path = msg->path;
rec->fde = event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock),
EVENT_FD_READ, messaging_recv_handler, rec);
rec->sock = NULL;
memcpy(&rec->header, data, sizeof(rec->header));
if (msize != sizeof(rec->header) + rec->header.length) {
DEBUG(0,("messaging: bad message header size %d should be %d\n",
rec->header.length, msize - sizeof(rec->header)));
talloc_free(rec);
return;
}
rec->data = data_blob_talloc(rec, data, rec->header.length);
if (rec->data.data == NULL) {
talloc_free(rec);
return;
}
messaging_dispatch(msg, rec);
talloc_free(rec);
}
/*
@ -262,49 +208,28 @@ static void messaging_send_handler(struct event_context *ev, struct fd_event *fd
uint16_t flags, void *private)
{
struct messaging_rec *rec = talloc_get_type(private, struct messaging_rec);
uint8_t data[MESSAGING_MAX_SIZE];
DATA_BLOB blob;
size_t nsent;
NTSTATUS status;
if (rec->ndone < sizeof(rec->header)) {
/* send the header */
size_t nsent;
DATA_BLOB blob;
memcpy(data, &rec->header, sizeof(rec->header));
memcpy(data + sizeof(rec->header), rec->data.data, rec->data.length);
blob.data = rec->ndone + (uint8_t *)&rec->header;
blob.length = sizeof(rec->header) - rec->ndone;
blob.data = data;
blob.length = sizeof(rec->header) + rec->header.length;
status = socket_send(rec->sock, &blob, &nsent, 0);
if (NT_STATUS_IS_ERR(status)) {
talloc_free(rec);
return;
}
if (nsent == 0) {
return;
}
rec->ndone += nsent;
status = socket_send(rec->sock, &blob, &nsent, 0);
if (NT_STATUS_IS_ERR(status)) {
DEBUG(3,("Unable to send message of type %d length %d - %s\n",
rec->header.msg_type,
rec->header.length,
nt_errstr(status)));
talloc_free(rec);
return;
}
if (rec->ndone >= sizeof(rec->header) &&
rec->ndone < sizeof(rec->header) + rec->header.length) {
/* send the body, if any */
DATA_BLOB blob;
size_t nsent;
blob.data = rec->data.data + (rec->ndone - sizeof(rec->header));
blob.length = rec->header.length - (rec->ndone - sizeof(rec->header));
status = socket_send(rec->sock, &blob, &nsent, 0);
if (NT_STATUS_IS_ERR(status)) {
talloc_free(rec);
return;
}
rec->ndone += nsent;
}
if (rec->ndone == sizeof(rec->header) + rec->header.length) {
/* we've done the whole message */
if (NT_STATUS_IS_OK(status)) {
talloc_free(rec);
}
}
@ -349,8 +274,8 @@ static void messaging_backoff_handler(struct event_context *ev, struct timed_eve
return;
}
rec->fde = event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock),
EVENT_FD_WRITE, messaging_send_handler, rec);
event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock),
EVENT_FD_WRITE, messaging_send_handler, rec);
}
@ -378,9 +303,8 @@ NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, uint32_t
} else {
rec->data = data_blob(NULL, 0);
}
rec->ndone = 0;
status = socket_create("unix", SOCKET_TYPE_STREAM, &rec->sock, 0);
status = socket_create("unix", SOCKET_TYPE_DGRAM, &rec->sock, 0);
if (!NT_STATUS_IS_OK(status)) {
talloc_free(rec);
return status;
@ -403,8 +327,8 @@ NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, uint32_t
return status;
}
rec->fde = event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock),
EVENT_FD_WRITE, messaging_send_handler, rec);
event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock),
EVENT_FD_WRITE, messaging_send_handler, rec);
return NT_STATUS_OK;
}
@ -437,10 +361,12 @@ static int messaging_destructor(void *ptr)
/*
create the listening socket and setup the dispatcher
*/
struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id, struct event_context *ev)
struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id,
struct event_context *ev)
{
struct messaging_context *msg;
NTSTATUS status;
char *path;
msg = talloc(mem_ctx, struct messaging_context);
if (msg == NULL) {
@ -448,15 +374,15 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id
}
/* create the messaging directory if needed */
msg->path = smbd_tmp_path(msg, "messaging");
mkdir(msg->path, 0700);
talloc_free(msg->path);
path = smbd_tmp_path(msg, "messaging");
mkdir(path, 0700);
talloc_free(path);
msg->path = messaging_path(msg, server_id);
msg->server_id = server_id;
msg->dispatch = NULL;
msg->path = messaging_path(msg, server_id);
status = socket_create("unix", SOCKET_TYPE_STREAM, &msg->sock, 0);
status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0);
if (!NT_STATUS_IS_OK(status)) {
talloc_free(msg);
return NULL;
@ -475,7 +401,7 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id
msg->event.ev = talloc_reference(msg, ev);
msg->event.fde = event_add_fd(ev, msg, socket_get_fd(msg->sock),
EVENT_FD_READ, messaging_listen_handler, msg);
EVENT_FD_READ, messaging_recv_handler, msg);
talloc_set_destructor(msg, messaging_destructor);