mirror of
https://github.com/samba-team/samba.git
synced 2025-03-08 04:58:40 +03:00
First part of efficiency fixes for message sending to pid's (cutting down
the amount of time we hold tdb locks). Gulp down all messages at once rather than reading/re-writing one at a time. NOTE: All dispatch routines *must* be able to cope with incoming message on *odd* byte boundaries (all current handlers do). Jeremy.
This commit is contained in:
parent
21c8acd25a
commit
04243e39cf
@ -3,6 +3,7 @@
|
||||
Samba internal messaging functions
|
||||
Copyright (C) Andrew Tridgell 2000
|
||||
Copyright (C) 2001 by Martin Pool
|
||||
Copyright (C) 2002 by Jeremy Allison
|
||||
|
||||
This program is free software; you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
@ -35,6 +36,9 @@
|
||||
use that to reply by message_send_pid(). See ping_message() for a
|
||||
simple example.
|
||||
|
||||
*NOTE*: Dispatch functions must be able to cope with incoming
|
||||
messages on an *odd* byte boundary.
|
||||
|
||||
This system doesn't have any inherent size limitations but is not
|
||||
very efficient for large messages or when messages are sent in very
|
||||
quick succession.
|
||||
@ -86,6 +90,16 @@ static void ping_message(int msg_type, pid_t src, void *buf, size_t len)
|
||||
message_send_pid(src, MSG_PONG, buf, len, True);
|
||||
}
|
||||
|
||||
/****************************************************************************
|
||||
Return current debug level.
|
||||
****************************************************************************/
|
||||
|
||||
void debuglevel_message(int msg_type, pid_t src, void *buf, size_t len)
|
||||
{
|
||||
DEBUG(1,("INFO: Received REQ_DEBUGLEVEL message from PID %u\n",(unsigned int)src));
|
||||
message_send_pid(src, MSG_DEBUGLEVEL, DEBUGLEVEL_CLASS, sizeof(DEBUGLEVEL_CLASS), True);
|
||||
}
|
||||
|
||||
/****************************************************************************
|
||||
Initialise the messaging functions.
|
||||
****************************************************************************/
|
||||
@ -106,6 +120,7 @@ BOOL message_init(void)
|
||||
CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1);
|
||||
|
||||
message_register(MSG_PING, ping_message);
|
||||
message_register(MSG_REQ_DEBUGLEVEL, debuglevel_message);
|
||||
|
||||
return True;
|
||||
}
|
||||
@ -133,9 +148,13 @@ static TDB_DATA message_key_pid(pid_t pid)
|
||||
|
||||
static BOOL message_notify(pid_t pid)
|
||||
{
|
||||
/* Doing kill with a non-positive pid causes messages to be
|
||||
* sent to places we don't want. */
|
||||
/*
|
||||
* Doing kill with a non-positive pid causes messages to be
|
||||
* sent to places we don't want.
|
||||
*/
|
||||
|
||||
SMB_ASSERT(pid > 0);
|
||||
|
||||
if (kill(pid, SIGUSR1) == -1) {
|
||||
if (errno == ESRCH) {
|
||||
DEBUG(2,("pid %d doesn't exist - deleting messages record\n", (int)pid));
|
||||
@ -160,16 +179,19 @@ BOOL message_send_pid(pid_t pid, int msg_type, const void *buf, size_t len,
|
||||
struct message_rec rec;
|
||||
void *p;
|
||||
|
||||
/*
|
||||
* Doing kill with a non-positive pid causes messages to be
|
||||
* sent to places we don't want.
|
||||
*/
|
||||
|
||||
SMB_ASSERT(pid > 0);
|
||||
|
||||
rec.msg_version = MESSAGE_VERSION;
|
||||
rec.msg_type = msg_type;
|
||||
rec.dest = pid;
|
||||
rec.src = sys_getpid();
|
||||
rec.len = len;
|
||||
|
||||
/* Doing kill with a non-positive pid causes messages to be
|
||||
* sent to places we don't want. */
|
||||
SMB_ASSERT(pid > 0);
|
||||
|
||||
kbuf = message_key_pid(pid);
|
||||
|
||||
/* lock the record for the destination */
|
||||
@ -241,111 +263,136 @@ BOOL message_send_pid(pid_t pid, int msg_type, const void *buf, size_t len,
|
||||
|
||||
failed:
|
||||
tdb_chainunlock(tdb, kbuf);
|
||||
SAFE_FREE(dbuf.dptr);
|
||||
errno = 0; /* paranoia */
|
||||
return False;
|
||||
}
|
||||
|
||||
/****************************************************************************
|
||||
Retrieve the next message for the current process.
|
||||
Retrieve all messages for the current process.
|
||||
****************************************************************************/
|
||||
|
||||
static BOOL message_recv(int *msg_type, pid_t *src, void **buf, size_t *len)
|
||||
static BOOL retrieve_all_messages(char **msgs_buf, size_t *total_len)
|
||||
{
|
||||
TDB_DATA kbuf;
|
||||
TDB_DATA dbuf;
|
||||
struct message_rec rec;
|
||||
TDB_DATA null_dbuf;
|
||||
|
||||
ZERO_STRUCT(null_dbuf);
|
||||
|
||||
*msgs_buf = NULL;
|
||||
*total_len = 0;
|
||||
|
||||
kbuf = message_key_pid(sys_getpid());
|
||||
|
||||
tdb_chainlock(tdb, kbuf);
|
||||
|
||||
dbuf = tdb_fetch(tdb, kbuf);
|
||||
if (dbuf.dptr == NULL || dbuf.dsize == 0)
|
||||
goto failed;
|
||||
/*
|
||||
* Replace with an empty record to keep the allocated
|
||||
* space in the tdb.
|
||||
*/
|
||||
tdb_store(tdb, kbuf, null_dbuf, TDB_REPLACE);
|
||||
tdb_chainunlock(tdb, kbuf);
|
||||
|
||||
memcpy(&rec, dbuf.dptr, sizeof(rec));
|
||||
if (dbuf.dptr == NULL || dbuf.dsize == 0) {
|
||||
SAFE_FREE(dbuf.dptr);
|
||||
return False;
|
||||
}
|
||||
|
||||
*msgs_buf = dbuf.dptr;
|
||||
*total_len = dbuf.dsize;
|
||||
|
||||
return True;
|
||||
}
|
||||
|
||||
/****************************************************************************
|
||||
Parse out the next message for the current process.
|
||||
****************************************************************************/
|
||||
|
||||
static BOOL message_recv(char *msgs_buf, size_t total_len, int *msg_type, pid_t *src, char **buf, size_t *len)
|
||||
{
|
||||
struct message_rec rec;
|
||||
char *ret_buf = *buf;
|
||||
|
||||
*buf = NULL;
|
||||
*len = 0;
|
||||
|
||||
if (total_len - (ret_buf - msgs_buf) < sizeof(rec))
|
||||
return False;
|
||||
|
||||
memcpy(&rec, ret_buf, sizeof(rec));
|
||||
ret_buf += sizeof(rec);
|
||||
|
||||
if (rec.msg_version != MESSAGE_VERSION) {
|
||||
DEBUG(0,("message version %d received (expected %d)\n", rec.msg_version, MESSAGE_VERSION));
|
||||
goto failed;
|
||||
return False;
|
||||
}
|
||||
|
||||
if (rec.len > 0) {
|
||||
(*buf) = (void *)malloc(rec.len);
|
||||
if (!(*buf))
|
||||
goto failed;
|
||||
|
||||
memcpy(*buf, dbuf.dptr+sizeof(rec), rec.len);
|
||||
} else {
|
||||
*buf = NULL;
|
||||
if (total_len - (ret_buf - msgs_buf) < rec.len)
|
||||
return False;
|
||||
}
|
||||
|
||||
*len = rec.len;
|
||||
*msg_type = rec.msg_type;
|
||||
*src = rec.src;
|
||||
*buf = ret_buf;
|
||||
|
||||
if (dbuf.dsize - (sizeof(rec)+rec.len) > 0)
|
||||
memmove(dbuf.dptr, dbuf.dptr+sizeof(rec)+rec.len, dbuf.dsize - (sizeof(rec)+rec.len));
|
||||
dbuf.dsize -= sizeof(rec)+rec.len;
|
||||
|
||||
if (dbuf.dsize == 0)
|
||||
tdb_delete(tdb, kbuf);
|
||||
else
|
||||
tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
|
||||
|
||||
SAFE_FREE(dbuf.dptr);
|
||||
tdb_chainunlock(tdb, kbuf);
|
||||
return True;
|
||||
|
||||
failed:
|
||||
tdb_chainunlock(tdb, kbuf);
|
||||
SAFE_FREE(dbuf.dptr);
|
||||
return False;
|
||||
}
|
||||
|
||||
/****************************************************************************
|
||||
Receive and dispatch any messages pending for this process.
|
||||
Notice that all dispatch handlers for a particular msg_type get called,
|
||||
so you can register multiple handlers for a message.
|
||||
*NOTE*: Dispatch functions must be able to cope with incoming
|
||||
messages on an *odd* byte boundary.
|
||||
****************************************************************************/
|
||||
|
||||
void message_dispatch(void)
|
||||
{
|
||||
int msg_type;
|
||||
pid_t src;
|
||||
void *buf;
|
||||
size_t len;
|
||||
char *buf;
|
||||
char *msgs_buf;
|
||||
size_t len, total_len;
|
||||
struct dispatch_fns *dfn;
|
||||
int n_handled;
|
||||
|
||||
if (!received_signal) return;
|
||||
if (!received_signal)
|
||||
return;
|
||||
|
||||
DEBUG(10,("message_dispatch: received_signal = %d\n", received_signal));
|
||||
|
||||
received_signal = 0;
|
||||
|
||||
while (message_recv(&msg_type, &src, &buf, &len)) {
|
||||
DEBUG(10,("message_dispatch: received msg_type=%d src_pid=%d\n",
|
||||
msg_type, (int) src));
|
||||
if (!retrieve_all_messages(&msgs_buf, &total_len))
|
||||
return;
|
||||
|
||||
for (buf = msgs_buf; message_recv(msgs_buf, total_len, &msg_type, &src, &buf, &len); buf += len) {
|
||||
DEBUG(10,("message_dispatch: received msg_type=%d src_pid=%u\n",
|
||||
msg_type, (unsigned int) src));
|
||||
n_handled = 0;
|
||||
for (dfn = dispatch_fns; dfn; dfn = dfn->next) {
|
||||
if (dfn->msg_type == msg_type) {
|
||||
DEBUG(10,("message_dispatch: processing message of type %d.\n", msg_type));
|
||||
dfn->fn(msg_type, src, buf, len);
|
||||
dfn->fn(msg_type, src, len ? (void *)buf : NULL, len);
|
||||
n_handled++;
|
||||
}
|
||||
}
|
||||
if (!n_handled) {
|
||||
DEBUG(5,("message_dispatch: warning: no handlers registered for "
|
||||
"msg_type %d in pid %d\n",
|
||||
msg_type, sys_getpid()));
|
||||
DEBUG(5,("message_dispatch: warning: no handlers registed for "
|
||||
"msg_type %d in pid %u\n",
|
||||
msg_type, (unsigned int)sys_getpid()));
|
||||
}
|
||||
SAFE_FREE(buf);
|
||||
}
|
||||
SAFE_FREE(msgs_buf);
|
||||
}
|
||||
|
||||
/****************************************************************************
|
||||
Register a dispatch function for a particular message type.
|
||||
*NOTE*: Dispatch functions must be able to cope with incoming
|
||||
messages on an *odd* byte boundary.
|
||||
****************************************************************************/
|
||||
|
||||
void message_register(int msg_type,
|
||||
|
Loading…
x
Reference in New Issue
Block a user