From fa1a5660bb2ef7fabd72ad9b993dd91d59ac409e Mon Sep 17 00:00:00 2001 From: Stefan Metzmacher Date: Wed, 14 Dec 2005 19:19:43 +0000 Subject: [PATCH] r12242: - make the push notifications triggered by the change count - for now we fake the change count to '1', so we'll still have periodicly triggered push notifies, the interval is the 'wreplsrv:periodic_interval=60' - add the 'pushUseInform' attribute to the wreplPartner objectClass to configure if we'll use WREPL_REPL_INFORM notifies metze --- source/wrepl_server/config.mk | 3 +- source/wrepl_server/wrepl_out_connection.c | 78 ------------ source/wrepl_server/wrepl_out_push.c | 134 +++++++++++++++++++++ source/wrepl_server/wrepl_periodic.c | 3 + source/wrepl_server/wrepl_server.c | 1 + source/wrepl_server/wrepl_server.h | 19 ++- 6 files changed, 155 insertions(+), 83 deletions(-) create mode 100644 source/wrepl_server/wrepl_out_push.c diff --git a/source/wrepl_server/config.mk b/source/wrepl_server/config.mk index 6a6ce7185aa..53172864d50 100644 --- a/source/wrepl_server/config.mk +++ b/source/wrepl_server/config.mk @@ -10,7 +10,8 @@ INIT_OBJ_FILES = \ wrepl_out_connection.o \ wrepl_out_helpers.o \ wrepl_apply_records.o \ - wrepl_periodic.o + wrepl_periodic.o \ + wrepl_out_push.o REQUIRED_SUBSYSTEMS = \ LIBCLI_WREPL WINSDB # End SUBSYSTEM WREPL_SRV diff --git a/source/wrepl_server/wrepl_out_connection.c b/source/wrepl_server/wrepl_out_connection.c index e1bade5f877..31a72307d70 100644 --- a/source/wrepl_server/wrepl_out_connection.c +++ b/source/wrepl_server/wrepl_out_connection.c @@ -158,79 +158,6 @@ requeue: return NT_STATUS_OK; } -static void wreplsrv_push_handler_te(struct event_context *ev, struct timed_event *te, - struct timeval t, void *ptr); - -static void wreplsrv_push_handler_creq(struct composite_context *creq) -{ - struct wreplsrv_partner *partner = talloc_get_type(creq->async.private_data, struct wreplsrv_partner); - uint32_t interval; - - partner->push.last_status = wreplsrv_push_notify_recv(partner->push.creq); - partner->push.creq = NULL; - talloc_free(partner->push.notify_io); - partner->push.notify_io = NULL; - - if (!NT_STATUS_IS_OK(partner->push.last_status)) { - interval = 15; - - DEBUG(1,("wreplsrv_push_notify(%s): %s: next: %us\n", - partner->address, nt_errstr(partner->push.last_status), - interval)); - } else { - interval = 100; - - DEBUG(2,("wreplsrv_push_notify(%s): %s: next: %us\n", - partner->address, nt_errstr(partner->push.last_status), - interval)); - } - - partner->push.te = event_add_timed(partner->service->task->event_ctx, partner, - timeval_current_ofs(interval, 0), - wreplsrv_push_handler_te, partner); - if (!partner->push.te) { - DEBUG(0,("wreplsrv_push_handler_creq: event_add_timed() failed! no memory!\n")); - } -} - -static void wreplsrv_push_handler_te(struct event_context *ev, struct timed_event *te, - struct timeval t, void *ptr) -{ - struct wreplsrv_partner *partner = talloc_get_type(ptr, struct wreplsrv_partner); - - partner->push.te = NULL; - - partner->push.notify_io = talloc(partner, struct wreplsrv_push_notify_io); - if (!partner->push.notify_io) { - goto requeue; - } - - partner->push.notify_io->in.partner = partner; - partner->push.notify_io->in.inform = False; - partner->push.notify_io->in.propagate = False; - partner->push.creq = wreplsrv_push_notify_send(partner->push.notify_io, partner->push.notify_io); - if (!partner->push.creq) { - DEBUG(1,("wreplsrv_push_notify_send(%s) failed\n", - partner->address)); - goto requeue; - } - - partner->push.creq->async.fn = wreplsrv_push_handler_creq; - partner->push.creq->async.private_data = partner; - - return; -requeue: - talloc_free(partner->push.notify_io); - partner->push.notify_io = NULL; - /* retry later */ - partner->push.te = event_add_timed(partner->service->task->event_ctx, partner, - timeval_add(&t, 5, 0), - wreplsrv_push_handler_te, partner); - if (!partner->push.te) { - DEBUG(0,("wreplsrv_push_handler_te: event_add_timed() failed! no memory!\n")); - } -} - NTSTATUS wreplsrv_setup_out_connections(struct wreplsrv_service *service) { struct wreplsrv_partner *cur; @@ -241,11 +168,6 @@ NTSTATUS wreplsrv_setup_out_connections(struct wreplsrv_service *service) timeval_zero(), wreplsrv_pull_handler_te, cur); NT_STATUS_HAVE_NO_MEMORY(cur->pull.te); } - if ((cur->type & WINSREPL_PARTNER_PUSH) && cur->push.change_count) { - cur->push.te = event_add_timed(service->task->event_ctx, cur, - timeval_zero(), wreplsrv_push_handler_te, cur); - NT_STATUS_HAVE_NO_MEMORY(cur->push.te); - } } return NT_STATUS_OK; diff --git a/source/wrepl_server/wrepl_out_push.c b/source/wrepl_server/wrepl_out_push.c new file mode 100644 index 00000000000..d3e4bdb44b6 --- /dev/null +++ b/source/wrepl_server/wrepl_out_push.c @@ -0,0 +1,134 @@ +/* + Unix SMB/CIFS implementation. + + WINS Replication server + + Copyright (C) Stefan Metzmacher 2005 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +*/ + +#include "includes.h" +#include "dlinklist.h" +#include "lib/events/events.h" +#include "lib/socket/socket.h" +#include "smbd/service_task.h" +#include "smbd/service_stream.h" +#include "lib/messaging/irpc.h" +#include "librpc/gen_ndr/ndr_winsrepl.h" +#include "wrepl_server/wrepl_server.h" +#include "nbt_server/wins/winsdb.h" +#include "ldb/include/ldb.h" +#include "libcli/composite/composite.h" +#include "libcli/wrepl/winsrepl.h" +#include "wrepl_server/wrepl_out_helpers.h" + +static void wreplsrv_out_partner_push(struct wreplsrv_partner *partner, BOOL propagate); + +static void wreplsrv_push_handler_creq(struct composite_context *creq) +{ + struct wreplsrv_partner *partner = talloc_get_type(creq->async.private_data, struct wreplsrv_partner); + + partner->push.last_status = wreplsrv_push_notify_recv(partner->push.creq); + partner->push.creq = NULL; + talloc_free(partner->push.notify_io); + partner->push.notify_io = NULL; + + partner->push.last_run = timeval_current(); + + if (NT_STATUS_IS_OK(partner->push.last_status)) { + partner->push.error_count = 0; + DEBUG(2,("wreplsrv_push_notify(%s): %s\n", + partner->address, nt_errstr(partner->push.last_status))); + return; + } + + partner->push.error_count++; + + if (partner->push.error_count > 1) { + DEBUG(1,("wreplsrv_push_notify(%s): %s: error_count: %u: giving up\n", + partner->address, nt_errstr(partner->push.last_status), + partner->push.error_count)); + return; + } + + DEBUG(1,("wreplsrv_push_notify(%s): %s: error_count: %u: retry\n", + partner->address, nt_errstr(partner->push.last_status), + partner->push.error_count)); + wreplsrv_out_partner_push(partner, partner->push.notify_io->in.propagate); +} + +static void wreplsrv_out_partner_push(struct wreplsrv_partner *partner, BOOL propagate) +{ + /* a push for this partner is currently in progress, so we're done */ + if (partner->push.creq) return; + + /* now prepare the push notify */ + partner->push.notify_io = talloc(partner, struct wreplsrv_push_notify_io); + if (!partner->push.notify_io) { + goto nomem; + } + + partner->push.notify_io->in.partner = partner; + partner->push.notify_io->in.inform = partner->push.use_inform; + partner->push.notify_io->in.propagate = propagate; + partner->push.creq = wreplsrv_push_notify_send(partner->push.notify_io, partner->push.notify_io); + if (!partner->push.creq) { + DEBUG(1,("wreplsrv_push_notify_send(%s) failed\n", + partner->address)); + goto nomem; + } + + partner->push.creq->async.fn = wreplsrv_push_handler_creq; + partner->push.creq->async.private_data = partner; + + return; +nomem: + talloc_free(partner->push.notify_io); + partner->push.notify_io = NULL; + DEBUG(1,("wreplsrv_push_notify_send(%s) failed nomem? (ignoring)\n", + partner->address)); + return; +} + +static uint32_t wreplsrv_calc_change_count(struct wreplsrv_partner *partner) +{ + /* TODO: add a real implementation here */ + return (uint32_t)-1; +} + +uint32_t wreplsrv_out_push_run(struct wreplsrv_service *service, uint32_t next_interval) +{ + struct wreplsrv_partner *partner; + uint32_t change_count; + + for (partner = service->partners; partner; partner = partner->next) { + /* if it's not a push partner, go to the next partner */ + if (!(partner->type & WINSREPL_PARTNER_PUSH)) continue; + + /* if push notifies are disabled for this partner, go to the next partner */ + if (partner->push.change_count == 0) continue; + + /* get the actual change count for the partner */ + change_count = wreplsrv_calc_change_count(partner); + + /* if the configured change count isn't reached, go to the next partner */ + if (change_count < partner->push.change_count) continue; + + wreplsrv_out_partner_push(partner, False); + } + + return next_interval; +} diff --git a/source/wrepl_server/wrepl_periodic.c b/source/wrepl_server/wrepl_periodic.c index 4ba047d3a00..12e1448139c 100644 --- a/source/wrepl_server/wrepl_periodic.c +++ b/source/wrepl_server/wrepl_periodic.c @@ -37,6 +37,8 @@ static uint32_t wreplsrv_periodic_run(struct wreplsrv_service *service, uint32_t next_interval) { + next_interval = wreplsrv_out_push_run(service, next_interval); + DEBUG(2,("wreplsrv_periodic_run: next in %u secs\n", next_interval)); return next_interval; } @@ -48,6 +50,7 @@ static void wreplsrv_periodic_handler_te(struct event_context *ev, struct timed_ uint32_t next_interval; service->periodic.te = NULL; + service->periodic.current_event = t; next_interval = wreplsrv_periodic_run(service, service->config.periodic_interval); diff --git a/source/wrepl_server/wrepl_server.c b/source/wrepl_server/wrepl_server.c index 5a7ad57f867..157a39ec854 100644 --- a/source/wrepl_server/wrepl_server.c +++ b/source/wrepl_server/wrepl_server.c @@ -109,6 +109,7 @@ static NTSTATUS wreplsrv_load_partners(struct wreplsrv_service *service) partner->our_address = ldb_msg_find_string(res->msgs[i], "ourAddress", NULL); partner->push.change_count = ldb_msg_find_uint(res->msgs[i], "pushChangeCount", WINSREPL_DEFAULT_PUSH_CHANGE_COUNT); + partner->push.use_inform = ldb_msg_find_uint(res->msgs[i], "pushUseInform", False); talloc_steal(partner, partner->address); talloc_steal(partner, partner->name); diff --git a/source/wrepl_server/wrepl_server.h b/source/wrepl_server/wrepl_server.h index 0e0ed35e946..e781d85ab85 100644 --- a/source/wrepl_server/wrepl_server.h +++ b/source/wrepl_server/wrepl_server.h @@ -169,9 +169,18 @@ struct wreplsrv_partner { /* change count till push notification */ uint32_t change_count; + /* we should use WREPL_REPL_INFORM* messages to this partner */ + BOOL use_inform; + + /* the error count till the last success */ + uint32_t error_count; + /* the status of the last push cycle */ NTSTATUS last_status; + /* the timestamp of the last run */ + struct timeval last_run; + /* the outgoing connection to the partner */ struct wreplsrv_out_connection *wreplconn; @@ -180,9 +189,6 @@ struct wreplsrv_partner { /* the pull cycle io params */ struct wreplsrv_push_notify_io *notify_io; - - /* the current timed_event to the next push notify */ - struct timed_event *te; } push; }; @@ -252,7 +258,12 @@ struct wreplsrv_service { /* some stuff for periodic processing */ struct { /* - * the timestamp for the current or next event, + * the timestamp for the current event, + */ + struct timeval current_event; + + /* + * the timestamp for the next event, * this is the timstamp passed to event_add_timed() */ struct timeval next_event;