1
0
mirror of https://github.com/samba-team/samba.git synced 2024-12-23 17:34:34 +03:00

Fix for a problem with the new messaging system. If a sender is using the

messaging system as a notification mechanism, and the speed of notification
greatly exceeds the speed of message recovery, then you get a massively (>75Mb)
growing tdb. If the message is a simple notification, then the message is
static, and you only need one of them in transit to a target process at
any one time.
This patch adds a BOOL "allow_duplicates" to the message_send_XX primitives.
If set to False, then before sending a message the sender checks the existing
message queue for a target pid for a duplicate of this message, and doesn't
add to it if one already exists.
Also added code into msgtest.c to test this.
Jeremy.
(This used to be commit 3aa7995660)
This commit is contained in:
Jeremy Allison 2000-11-16 21:38:24 +00:00
parent fb71f4a0af
commit cdac09614e
9 changed files with 99 additions and 36 deletions

View File

@ -291,6 +291,9 @@ SMBTORTURE_OBJ = utils/torture.o utils/nbio.o $(LIBSMB_OBJ) $(PARAM_OBJ) \
MASKTEST_OBJ = utils/masktest.o $(LIBSMB_OBJ) $(PARAM_OBJ) \ MASKTEST_OBJ = utils/masktest.o $(LIBSMB_OBJ) $(PARAM_OBJ) \
$(UBIQX_OBJ) $(LIB_OBJ) $(UBIQX_OBJ) $(LIB_OBJ)
MSGTEST_OBJ = utils/msgtest.o $(LIBSMB_OBJ) $(PARAM_OBJ) \
$(UBIQX_OBJ) $(LIB_OBJ)
LOCKTEST_OBJ = utils/locktest.o $(LOCKING_OBJ) $(LIBSMB_OBJ) $(PARAM_OBJ) \ LOCKTEST_OBJ = utils/locktest.o $(LOCKING_OBJ) $(LIBSMB_OBJ) $(PARAM_OBJ) \
$(UBIQX_OBJ) $(LIB_OBJ) $(UBIQX_OBJ) $(LIB_OBJ)
@ -359,6 +362,8 @@ smbtorture : CHECK bin/smbtorture
masktest : CHECK bin/masktest masktest : CHECK bin/masktest
msgtest : CHECK bin/msgtest
locktest : CHECK bin/locktest locktest : CHECK bin/locktest
locktest2 : CHECK bin/locktest2 locktest2 : CHECK bin/locktest2
@ -514,6 +519,10 @@ bin/masktest: $(MASKTEST_OBJ) bin/.dummy
@echo Linking $@ @echo Linking $@
@$(CC) $(FLAGS) -o $@ $(MASKTEST_OBJ) $(LDFLAGS) $(LIBS) @$(CC) $(FLAGS) -o $@ $(MASKTEST_OBJ) $(LDFLAGS) $(LIBS)
bin/msgtest: $(MSGTEST_OBJ) bin/.dummy
@echo Linking $@
@$(CC) $(FLAGS) -o $@ $(MSGTEST_OBJ) $(LDFLAGS) $(LIBS)
bin/locktest: $(LOCKTEST_OBJ) bin/.dummy bin/locktest: $(LOCKTEST_OBJ) bin/.dummy
@echo Linking $@ @echo Linking $@
@$(CC) $(FLAGS) -o $@ $(LOCKTEST_OBJ) $(LDFLAGS) $(LIBS) @$(CC) $(FLAGS) -o $@ $(LOCKTEST_OBJ) $(LDFLAGS) $(LIBS)

View File

@ -153,12 +153,12 @@ void mdfour(unsigned char *out, unsigned char *in, int n);
void ping_message(int msg_type, pid_t src, void *buf, size_t len); void ping_message(int msg_type, pid_t src, void *buf, size_t len);
void debuglevel_message(int msg_type, pid_t src, void *buf, size_t len); void debuglevel_message(int msg_type, pid_t src, void *buf, size_t len);
BOOL message_init(void); BOOL message_init(void);
BOOL message_send_pid(pid_t pid, int msg_type, void *buf, size_t len); BOOL message_send_pid(pid_t pid, int msg_type, void *buf, size_t len, BOOL duplicates_allowed);
void message_dispatch(void); void message_dispatch(void);
void message_register(int msg_type, void message_register(int msg_type,
void (*fn)(int msg_type, pid_t pid, void *buf, size_t len)); void (*fn)(int msg_type, pid_t pid, void *buf, size_t len));
void message_deregister(int msg_type); void message_deregister(int msg_type);
BOOL message_send_all(int msg_type, void *buf, size_t len); BOOL message_send_all(int msg_type, void *buf, size_t len, BOOL duplicates_allowed);
/*The following definitions come from lib/ms_fnmatch.c */ /*The following definitions come from lib/ms_fnmatch.c */

View File

@ -135,7 +135,7 @@ send a "set debug level" message
****************************************************************************/ ****************************************************************************/
void debug_message_send(pid_t pid, int level) void debug_message_send(pid_t pid, int level)
{ {
message_send_pid(pid, MSG_DEBUG, &level, sizeof(int)); message_send_pid(pid, MSG_DEBUG, &level, sizeof(int), False);
} }

View File

@ -71,7 +71,7 @@ a useful function for testing the message system
void ping_message(int msg_type, pid_t src, void *buf, size_t len) void ping_message(int msg_type, pid_t src, void *buf, size_t len)
{ {
DEBUG(1,("INFO: Received PING message from PID %d\n",src)); DEBUG(1,("INFO: Received PING message from PID %d\n",src));
message_send_pid(src, MSG_PONG, buf, len); message_send_pid(src, MSG_PONG, buf, len, True);
} }
/**************************************************************************** /****************************************************************************
@ -83,7 +83,7 @@ void debuglevel_message(int msg_type, pid_t src, void *buf, size_t len)
DEBUG(1,("INFO: Received REQ_DEBUGLEVEL message from PID %d\n",src)); DEBUG(1,("INFO: Received REQ_DEBUGLEVEL message from PID %d\n",src));
level = DEBUGLEVEL; level = DEBUGLEVEL;
message_send_pid(src, MSG_DEBUGLEVEL, &level, sizeof(int)); message_send_pid(src, MSG_DEBUGLEVEL, &level, sizeof(int), True);
} }
/**************************************************************************** /****************************************************************************
@ -148,13 +148,23 @@ static BOOL message_notify(pid_t pid)
/**************************************************************************** /****************************************************************************
send a message to a particular pid send a message to a particular pid
****************************************************************************/ ****************************************************************************/
BOOL message_send_pid(pid_t pid, int msg_type, void *buf, size_t len) BOOL message_send_pid(pid_t pid, int msg_type, void *buf, size_t len, BOOL duplicates_allowed)
{ {
TDB_DATA kbuf; TDB_DATA kbuf;
TDB_DATA dbuf; TDB_DATA dbuf;
struct message_rec rec; struct message_rec rec;
void *p; void *p;
/*
* Do an early check for process exists - saves adding into a tdb
* and deleting again if the target is not present. JRA.
*/
if (kill(pid, 0) == -1) {
DEBUG(2,("message_send_pid: pid %d doesn't exist\n", (int)pid));
return False;
}
rec.msg_version = MESSAGE_VERSION; rec.msg_version = MESSAGE_VERSION;
rec.msg_type = msg_type; rec.msg_type = msg_type;
rec.dest = pid; rec.dest = pid;
@ -183,6 +193,30 @@ BOOL message_send_pid(pid_t pid, int msg_type, void *buf, size_t len)
goto ok; goto ok;
} }
if (!duplicates_allowed) {
char *ptr;
struct message_rec *prec;
for(ptr = (char *)dbuf.dptr, prec = (struct message_rec *)ptr; ptr < dbuf.dptr + dbuf.dsize;
ptr += (sizeof(rec) + prec->len), prec = (struct message_rec *)ptr) {
/*
* First check if the message header matches, then, if it's a non-zero
* sized message, check if the data matches. If so it's a duplicate and
* we can discard it. JRA.
*/
if (!memcmp(ptr, &rec, sizeof(rec))) {
if (!len || (len && !memcmp( ptr + sizeof(rec), (char *)buf, len))) {
DEBUG(10,("message_send_pid: discarding duplicate message.\n"));
free(dbuf.dptr);
tdb_unlockchain(tdb, kbuf);
return True;
}
}
}
}
/* we're adding to an existing entry */ /* we're adding to an existing entry */
p = (void *)malloc(dbuf.dsize + len + sizeof(rec)); p = (void *)malloc(dbuf.dsize + len + sizeof(rec));
if (!p) goto failed; if (!p) goto failed;
@ -323,6 +357,7 @@ static struct {
int msg_type; int msg_type;
void *buf; void *buf;
size_t len; size_t len;
BOOL duplicates;
} msg_all; } msg_all;
/**************************************************************************** /****************************************************************************
@ -335,7 +370,7 @@ static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, void
memcpy(&crec, dbuf.dptr, sizeof(crec)); memcpy(&crec, dbuf.dptr, sizeof(crec));
if (crec.cnum == -1) return 0; if (crec.cnum == -1) return 0;
message_send_pid(crec.pid, msg_all.msg_type, msg_all.buf, msg_all.len); message_send_pid(crec.pid, msg_all.msg_type, msg_all.buf, msg_all.len, msg_all.duplicates);
return 0; return 0;
} }
@ -344,7 +379,7 @@ this is a useful function for sending messages to all smbd processes.
It isn't very efficient, but should be OK for the sorts of applications that It isn't very efficient, but should be OK for the sorts of applications that
use it. When we need efficient broadcast we can add it. use it. When we need efficient broadcast we can add it.
****************************************************************************/ ****************************************************************************/
BOOL message_send_all(int msg_type, void *buf, size_t len) BOOL message_send_all(int msg_type, void *buf, size_t len, BOOL duplicates_allowed)
{ {
TDB_CONTEXT *the_tdb; TDB_CONTEXT *the_tdb;
@ -357,6 +392,7 @@ BOOL message_send_all(int msg_type, void *buf, size_t len)
msg_all.msg_type = msg_type; msg_all.msg_type = msg_type;
msg_all.buf = buf; msg_all.buf = buf;
msg_all.len = len; msg_all.len = len;
msg_all.duplicates = duplicates_allowed;
tdb_traverse(the_tdb, traverse_fn, NULL); tdb_traverse(the_tdb, traverse_fn, NULL);
tdb_close(the_tdb); tdb_close(the_tdb);

View File

@ -576,8 +576,7 @@ BOOL print_job_delete(struct current_user *user, int jobid, int *errcode)
printer_name = PRINTERNAME(snum); printer_name = PRINTERNAME(snum);
message_send_all(MSG_PRINTER_NOTIFY, printer_name, message_send_all(MSG_PRINTER_NOTIFY, printer_name, strlen(printer_name) + 1, False);
strlen(printer_name) + 1);
return !print_job_exists(jobid); return !print_job_exists(jobid);
} }
@ -627,8 +626,7 @@ BOOL print_job_pause(struct current_user *user, int jobid, int *errcode)
printer_name = PRINTERNAME(snum); printer_name = PRINTERNAME(snum);
message_send_all(MSG_PRINTER_NOTIFY, printer_name, message_send_all(MSG_PRINTER_NOTIFY, printer_name, strlen(printer_name) + 1, False);
strlen(printer_name) + 1);
/* how do we tell if this succeeded? */ /* how do we tell if this succeeded? */
@ -678,8 +676,7 @@ BOOL print_job_resume(struct current_user *user, int jobid, int *errcode)
printer_name = PRINTERNAME(snum); printer_name = PRINTERNAME(snum);
message_send_all(MSG_PRINTER_NOTIFY, printer_name, message_send_all(MSG_PRINTER_NOTIFY, printer_name, strlen(printer_name) + 1, False);
strlen(printer_name) + 1);
return True; return True;
} }
@ -942,8 +939,7 @@ BOOL print_job_end(int jobid)
printer_name = PRINTERNAME(snum); printer_name = PRINTERNAME(snum);
message_send_all(MSG_PRINTER_NOTIFY, printer_name, message_send_all(MSG_PRINTER_NOTIFY, printer_name, strlen(printer_name) + 1, False);
strlen(printer_name) + 1);
return True; return True;
} }
@ -1125,8 +1121,7 @@ BOOL print_queue_pause(struct current_user *user, int snum, int *errcode)
printer_name = PRINTERNAME(snum); printer_name = PRINTERNAME(snum);
message_send_all(MSG_PRINTER_NOTIFY, printer_name, message_send_all(MSG_PRINTER_NOTIFY, printer_name, strlen(printer_name) + 1, False);
strlen(printer_name) + 1);
return True; return True;
} }
@ -1159,8 +1154,7 @@ BOOL print_queue_resume(struct current_user *user, int snum, int *errcode)
printer_name = PRINTERNAME(snum); printer_name = PRINTERNAME(snum);
message_send_all(MSG_PRINTER_NOTIFY, printer_name, message_send_all(MSG_PRINTER_NOTIFY, printer_name, strlen(printer_name) + 1, False);
strlen(printer_name) + 1);
return True; return True;
} }
@ -1189,8 +1183,7 @@ BOOL print_queue_purge(struct current_user *user, int snum, int *errcode)
printer_name = PRINTERNAME(snum); printer_name = PRINTERNAME(snum);
message_send_all(MSG_PRINTER_NOTIFY, printer_name, message_send_all(MSG_PRINTER_NOTIFY, printer_name, strlen(printer_name) + 1, False);
strlen(printer_name) + 1);
return True; return True;
} }

View File

@ -86,7 +86,7 @@ void reqprofile_message(int msg_type, pid_t src, void *buf, size_t len)
level = 0; level = 0;
#endif #endif
DEBUG(1,("INFO: Received REQ_PROFILELEVEL message from PID %d\n",src)); DEBUG(1,("INFO: Received REQ_PROFILELEVEL message from PID %d\n",src));
message_send_pid(src, MSG_PROFILELEVEL, &level, sizeof(int)); message_send_pid(src, MSG_PROFILELEVEL, &level, sizeof(int), True);
} }
/******************************************************************* /*******************************************************************

View File

@ -640,7 +640,7 @@ static BOOL srv_spoolss_sendnotify(POLICY_HND *handle)
/*srv_spoolss_receive_message(printer);*/ /*srv_spoolss_receive_message(printer);*/
DEBUG(10,("srv_spoolss_sendnotify: Sending message about printer %s\n", printer )); DEBUG(10,("srv_spoolss_sendnotify: Sending message about printer %s\n", printer ));
message_send_all(MSG_PRINTER_NOTIFY, printer, strlen(printer) + 1); /* Null terminate... */ message_send_all(MSG_PRINTER_NOTIFY, printer, strlen(printer) + 1, False); /* Null terminate... */
return True; return True;
} }

View File

@ -36,12 +36,12 @@ void pong_message(int msg_type, pid_t src, void *buf, size_t len)
pong_count++; pong_count++;
} }
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
pid_t pid; pid_t pid;
int i, n; int i, n;
static pstring servicesf = CONFIGFILE; static pstring servicesf = CONFIGFILE;
char buf[12];
TimeInit(); TimeInit();
setup_logging(argv[0],True); setup_logging(argv[0],True);
@ -52,13 +52,18 @@ void pong_message(int msg_type, pid_t src, void *buf, size_t len)
message_init(); message_init();
if (argc != 3) {
fprintf(stderr, "%s: Usage - %s pid count\n", argv[0], argv[0]);
exit(1);
}
pid = atoi(argv[1]); pid = atoi(argv[1]);
n = atoi(argv[2]); n = atoi(argv[2]);
message_register(MSG_PONG, pong_message); message_register(MSG_PONG, pong_message);
for (i=0;i<n;i++) { for (i=0;i<n;i++) {
message_send_pid(pid, MSG_PING, NULL, 0); message_send_pid(pid, MSG_PING, NULL, 0, True);
} }
while (pong_count < i) { while (pong_count < i) {
@ -66,6 +71,26 @@ void pong_message(int msg_type, pid_t src, void *buf, size_t len)
msleep(1); msleep(1);
} }
/* Now test that the duplicate filtering code works. */
pong_count = 0;
safe_strcpy(buf, "1234567890", sizeof(buf)-1);
for (i=0;i<n;i++) {
message_send_pid(getpid(), MSG_PING, NULL, 0, False);
message_send_pid(getpid(), MSG_PING, buf, 11, False);
}
for (i=0;i<n;i++) {
message_dispatch();
msleep(1);
}
if (pong_count != 2) {
fprintf(stderr, "Duplicate filter failed (%d).\n", pong_count);
exit(1);
}
return (0); return (0);
} }

View File

@ -115,13 +115,13 @@ void profilelevel_function(int msg_type, pid_t src, void *buf, size_t len)
/**************************************************************************** /****************************************************************************
send a message to a named destination send a message to a named destination
****************************************************************************/ ****************************************************************************/
static BOOL send_message(char *dest, int msg_type, void *buf, int len) static BOOL send_message(char *dest, int msg_type, void *buf, int len, BOOL duplicates)
{ {
pid_t pid; pid_t pid;
/* "smbd" is the only broadcast operation */ /* "smbd" is the only broadcast operation */
if (strequal(dest,"smbd")) { if (strequal(dest,"smbd")) {
return message_send_all(msg_type, buf, len); return message_send_all(msg_type, buf, len, duplicates);
} else if (strequal(dest,"nmbd")) { } else if (strequal(dest,"nmbd")) {
pid = pidfile_pid(dest); pid = pidfile_pid(dest);
if (pid == 0) { if (pid == 0) {
@ -136,7 +136,7 @@ static BOOL send_message(char *dest, int msg_type, void *buf, int len)
} }
} }
return message_send_pid(pid, msg_type, buf, len); return message_send_pid(pid, msg_type, buf, len, duplicates);
} }
/**************************************************************************** /****************************************************************************
@ -174,7 +174,7 @@ static BOOL do_command(char *dest, char *msg_name, char *params)
return(False); return(False);
} }
v = atoi(params); v = atoi(params);
send_message(dest, MSG_DEBUG, &v, sizeof(int)); send_message(dest, MSG_DEBUG, &v, sizeof(int), False);
break; break;
case MSG_PROFILE: case MSG_PROFILE:
@ -195,7 +195,7 @@ static BOOL do_command(char *dest, char *msg_name, char *params)
"MSG_PROFILE parameter must be off, count, on, or flush\n"); "MSG_PROFILE parameter must be off, count, on, or flush\n");
return(False); return(False);
} }
send_message(dest, MSG_PROFILE, &v, sizeof(int)); send_message(dest, MSG_PROFILE, &v, sizeof(int), False);
break; break;
case MSG_FORCE_ELECTION: case MSG_FORCE_ELECTION:
@ -203,7 +203,7 @@ static BOOL do_command(char *dest, char *msg_name, char *params)
fprintf(stderr,"force-election can only be sent to nmbd\n"); fprintf(stderr,"force-election can only be sent to nmbd\n");
return(False); return(False);
} }
send_message(dest, MSG_FORCE_ELECTION, NULL, 0); send_message(dest, MSG_FORCE_ELECTION, NULL, 0, False);
break; break;
case MSG_REQ_PROFILELEVEL: case MSG_REQ_PROFILELEVEL:
@ -212,7 +212,7 @@ static BOOL do_command(char *dest, char *msg_name, char *params)
profilelevel_registered = True; profilelevel_registered = True;
} }
got_level = False; got_level = False;
retval = send_message(dest, MSG_REQ_PROFILELEVEL, NULL, 0); retval = send_message(dest, MSG_REQ_PROFILELEVEL, NULL, 0, True);
if (retval) { if (retval) {
timeout_start = time(NULL); timeout_start = time(NULL);
while (!got_level) { while (!got_level) {
@ -231,7 +231,7 @@ static BOOL do_command(char *dest, char *msg_name, char *params)
debuglevel_registered = True; debuglevel_registered = True;
} }
got_level = False; got_level = False;
retval = send_message(dest, MSG_REQ_DEBUGLEVEL, NULL, 0); retval = send_message(dest, MSG_REQ_DEBUGLEVEL, NULL, 0, True);
if (retval) { if (retval) {
timeout_start = time(NULL); timeout_start = time(NULL);
while (!got_level) { while (!got_level) {
@ -254,7 +254,7 @@ static BOOL do_command(char *dest, char *msg_name, char *params)
return (False); return (False);
} }
retval = send_message(dest, MSG_PRINTER_NOTIFY, params, retval = send_message(dest, MSG_PRINTER_NOTIFY, params,
strlen(params) + 1); strlen(params) + 1, False);
break; break;
case MSG_PING: case MSG_PING:
@ -269,7 +269,7 @@ static BOOL do_command(char *dest, char *msg_name, char *params)
n = atoi(params); n = atoi(params);
pong_count = 0; pong_count = 0;
for (i=0;i<n;i++) { for (i=0;i<n;i++) {
retval = send_message(dest, MSG_PING, NULL, 0); retval = send_message(dest, MSG_PING, NULL, 0, True);
if (retval == False) break; if (retval == False) break;
} }
if (retval) { if (retval) {