From 6e88639ed99564b92da9a069ae6c42d1ebe09678 Mon Sep 17 00:00:00 2001 From: Stefan Metzmacher Date: Thu, 23 Jul 2015 12:08:42 +0200 Subject: [PATCH] s4:ldap_server: add support for async notification requests This is a simplified version that works with the current dsdb_notification module that requires the caller to retry periodically. We do that every 5 seconds or 100 microseconds if we're forcing a retry. Signed-off-by: Stefan Metzmacher Reviewed-by: Garming Sam Reviewed-by: Andrew Bartlett --- source4/ldap_server/ldap_backend.c | 82 ++++++++++++++++- source4/ldap_server/ldap_bind.c | 22 ++++- source4/ldap_server/ldap_extended.c | 8 +- source4/ldap_server/ldap_server.c | 138 ++++++++++++++++++++++++++-- source4/ldap_server/ldap_server.h | 15 +++ 5 files changed, 248 insertions(+), 17 deletions(-) diff --git a/source4/ldap_server/ldap_backend.c b/source4/ldap_server/ldap_backend.c index 7efb7ed4521..6a8a0cf5494 100644 --- a/source4/ldap_server/ldap_backend.c +++ b/source4/ldap_server/ldap_backend.c @@ -513,6 +513,7 @@ static NTSTATUS ldapsrv_SearchRequest(struct ldapsrv_call *call) struct ldb_search_options_control *search_options; struct ldb_control *extended_dn_control; struct ldb_extended_dn_control *extended_dn_decoded = NULL; + struct ldb_control *notification_control = NULL; enum ldb_scope scope = LDB_SCOPE_DEFAULT; const char **attrs = NULL; const char *scope_str, *errstr = NULL; @@ -617,6 +618,31 @@ static NTSTATUS ldapsrv_SearchRequest(struct ldapsrv_call *call) } } + notification_control = ldb_request_get_control(lreq, LDB_CONTROL_NOTIFICATION_OID); + if (notification_control != NULL) { + const struct ldapsrv_call *pc = NULL; + size_t count = 0; + + for (pc = call->conn->pending_calls; pc != NULL; pc = pc->next) { + count += 1; + } + + if (count >= call->conn->limits.max_notifications) { + DEBUG(10,("SearchRequest: error MaxNotificationPerConn\n")); + result = map_ldb_error(local_ctx, + LDB_ERR_ADMIN_LIMIT_EXCEEDED, + "MaxNotificationPerConn reached", + &errstr); + goto reply; + } + + /* + * For now we need to do periodic retries on our own. + * As the dsdb_notification module will return after each run. + */ + call->notification.busy = true; + } + ldb_set_timeout(samdb, lreq, req->timelimit); if (!call->conn->is_privileged) { @@ -667,6 +693,22 @@ queue_reply: ldapsrv_queue_reply(call, ent_r); } + if (call->notification.busy) { + /* Move/Add it to the end */ + DLIST_DEMOTE(call->conn->pending_calls, call); + call->notification.generation = + call->conn->service->notification.generation; + + if (res->count != 0) { + call->notification.generation += 1; + ldapsrv_notification_retry_setup(call->conn->service, + true); + } + + talloc_free(local_ctx); + return NT_STATUS_OK; + } + /* Send back referrals if they do exist (search operations) */ if (res->refs != NULL) { char **ref; @@ -691,6 +733,9 @@ queue_reply: } reply: + DLIST_REMOVE(call->conn->pending_calls, call); + call->notification.busy = false; + done_r = ldapsrv_init_reply(call, LDAP_TAG_SearchResultDone); NT_STATUS_HAVE_NO_MEMORY(done_r); @@ -1157,8 +1202,23 @@ static NTSTATUS ldapsrv_CompareRequest(struct ldapsrv_call *call) static NTSTATUS ldapsrv_AbandonRequest(struct ldapsrv_call *call) { -/* struct ldap_AbandonRequest *req = &call->request.r.AbandonRequest;*/ + struct ldap_AbandonRequest *req = &call->request->r.AbandonRequest; + struct ldapsrv_call *c = NULL; + struct ldapsrv_call *n = NULL; + DEBUG(10, ("AbandonRequest\n")); + + for (c = call->conn->pending_calls; c != NULL; c = n) { + n = c->next; + + if (c->request->messageid != req->messageid) { + continue; + } + + DLIST_REMOVE(call->conn->pending_calls, c); + TALLOC_FREE(c); + } + return NT_STATUS_OK; } @@ -1169,6 +1229,8 @@ NTSTATUS ldapsrv_do_call(struct ldapsrv_call *call) struct ldb_context *samdb = call->conn->ldb; NTSTATUS status; time_t *lastts; + bool recheck_schema = false; + /* Check for undecoded critical extensions */ for (i=0; msg->controls && msg->controls[i]; i++) { if (!msg->controls_decoded[i] && @@ -1187,26 +1249,31 @@ NTSTATUS ldapsrv_do_call(struct ldapsrv_call *call) case LDAP_TAG_SearchRequest: return ldapsrv_SearchRequest(call); case LDAP_TAG_ModifyRequest: + recheck_schema = true; status = ldapsrv_ModifyRequest(call); break; case LDAP_TAG_AddRequest: + recheck_schema = true; status = ldapsrv_AddRequest(call); break; case LDAP_TAG_DelRequest: - return ldapsrv_DelRequest(call); + status = ldapsrv_DelRequest(call); + break; case LDAP_TAG_ModifyDNRequest: - return ldapsrv_ModifyDNRequest(call); + status = ldapsrv_ModifyDNRequest(call); + break; case LDAP_TAG_CompareRequest: return ldapsrv_CompareRequest(call); case LDAP_TAG_AbandonRequest: return ldapsrv_AbandonRequest(call); case LDAP_TAG_ExtendedRequest: - return ldapsrv_ExtendedRequest(call); + status = ldapsrv_ExtendedRequest(call); + break; default: return ldapsrv_unwilling(call, LDAP_PROTOCOL_ERROR); } - if (NT_STATUS_IS_OK(status)) { + if (NT_STATUS_IS_OK(status) && recheck_schema) { lastts = (time_t *)ldb_get_opaque(samdb, DSDB_OPAQUE_LAST_SCHEMA_UPDATE_MSG_OPAQUE_NAME); if (lastts && !*lastts) { DEBUG(10, ("Schema update now was requested, " @@ -1222,5 +1289,10 @@ NTSTATUS ldapsrv_do_call(struct ldapsrv_call *call) ldb_set_opaque(samdb, DSDB_OPAQUE_LAST_SCHEMA_UPDATE_MSG_OPAQUE_NAME, lastts); } } + + if (NT_STATUS_IS_OK(status)) { + ldapsrv_notification_retry_setup(call->conn->service, true); + } + return status; } diff --git a/source4/ldap_server/ldap_bind.c b/source4/ldap_server/ldap_bind.c index fcbdadf52ee..77dfe902fdf 100644 --- a/source4/ldap_server/ldap_bind.c +++ b/source4/ldap_server/ldap_bind.c @@ -333,11 +333,25 @@ NTSTATUS ldapsrv_BindRequest(struct ldapsrv_call *call) struct ldapsrv_reply *reply; struct ldap_BindResponse *resp; + if (call->conn->pending_calls != NULL) { + reply = ldapsrv_init_reply(call, LDAP_TAG_BindResponse); + if (!reply) { + return NT_STATUS_NO_MEMORY; + } + + resp = &reply->msg->r.BindResponse; + resp->response.resultcode = LDAP_BUSY; + resp->response.dn = NULL; + resp->response.errormessage = talloc_asprintf(reply, "Pending requests on this LDAP session"); + resp->response.referral = NULL; + resp->SASL.secblob = NULL; + + ldapsrv_queue_reply(call, reply); + return NT_STATUS_OK; + } + /* - * TODO: we should fail the bind request - * if there're any pending requests. - * - * also a simple bind should cancel an + * TODO: a simple bind should cancel an * inprogress SASL bind. * (see RFC 4513) */ diff --git a/source4/ldap_server/ldap_extended.c b/source4/ldap_server/ldap_extended.c index 338858f0347..2d4a5345460 100644 --- a/source4/ldap_server/ldap_extended.c +++ b/source4/ldap_server/ldap_extended.c @@ -112,8 +112,7 @@ static NTSTATUS ldapsrv_StartTLS(struct ldapsrv_call *call, /* * TODO: give LDAP_OPERATIONS_ERROR also when - * there're pending requests or there's - * a SASL bind in progress + * there's a SASL bind in progress * (see rfc4513 section 3.1.1) */ if (call->conn->sockets.tls) { @@ -126,6 +125,11 @@ static NTSTATUS ldapsrv_StartTLS(struct ldapsrv_call *call, return NT_STATUS_LDAP(LDAP_OPERATIONS_ERROR); } + if (call->conn->pending_calls != NULL) { + (*errstr) = talloc_asprintf(reply, "START-TLS: pending requests on this LDAP session"); + return NT_STATUS_LDAP(LDAP_BUSY); + } + context = talloc(call, struct ldapsrv_starttls_postprocess_context); NT_STATUS_HAVE_NO_MEMORY(context); diff --git a/source4/ldap_server/ldap_server.c b/source4/ldap_server/ldap_server.c index 02c3c952920..9b2f18595fc 100644 --- a/source4/ldap_server/ldap_server.c +++ b/source4/ldap_server/ldap_server.c @@ -62,6 +62,8 @@ static void ldapsrv_terminate_connection(struct ldapsrv_connection *conn, return; } + DLIST_REMOVE(conn->service->connections, conn); + conn->limits.endtime = timeval_current_ofs(0, 500); tevent_queue_stop(conn->sockets.send_queue); @@ -167,6 +169,7 @@ static int ldapsrv_load_limits(struct ldapsrv_connection *conn) conn->limits.initial_timeout = 120; conn->limits.conn_idle_time = 900; conn->limits.max_page_size = 1000; + conn->limits.max_notifications = 5; conn->limits.search_timeout = 120; @@ -233,6 +236,10 @@ static int ldapsrv_load_limits(struct ldapsrv_connection *conn) conn->limits.max_page_size = policy_value; continue; } + if (strcasecmp("MaxNotificationPerConn", policy_name) == 0) { + conn->limits.max_notifications = policy_value; + continue; + } if (strcasecmp("MaxQueryDuration", policy_name) == 0) { conn->limits.search_timeout = policy_value; continue; @@ -347,6 +354,8 @@ static void ldapsrv_accept(struct stream_connection *c, /* register the server */ irpc_add_name(c->msg_ctx, "ldap_server"); + DLIST_ADD_END(ldapsrv_service->connections, conn); + if (port != 636 && port != 3269) { ldapsrv_call_read_next(conn); return; @@ -405,7 +414,11 @@ static bool ldapsrv_call_read_next(struct ldapsrv_connection *conn) { struct tevent_req *subreq; - if (timeval_is_zero(&conn->limits.endtime)) { + if (conn->pending_calls != NULL) { + conn->limits.endtime = timeval_zero(); + + ldapsrv_notification_retry_setup(conn->service, false); + } else if (timeval_is_zero(&conn->limits.endtime)) { conn->limits.endtime = timeval_current_ofs(conn->limits.initial_timeout, 0); } else { @@ -456,9 +469,11 @@ static bool ldapsrv_call_read_next(struct ldapsrv_connection *conn) "no memory for tstream_read_pdu_blob_send"); return false; } - tevent_req_set_endtime(subreq, - conn->connection->event.ctx, - conn->limits.endtime); + if (!timeval_is_zero(&conn->limits.endtime)) { + tevent_req_set_endtime(subreq, + conn->connection->event.ctx, + conn->limits.endtime); + } tevent_req_set_callback(subreq, ldapsrv_call_read_done, conn); conn->sockets.read_req = subreq; return true; @@ -544,6 +559,7 @@ static void ldapsrv_call_read_done(struct tevent_req *subreq) conn->active_call = subreq; } + static void ldapsrv_call_writev_done(struct tevent_req *subreq); static void ldapsrv_call_process_done(struct tevent_req *subreq) @@ -590,7 +606,9 @@ static void ldapsrv_call_process_done(struct tevent_req *subreq) } if (blob.length == 0) { - TALLOC_FREE(call); + if (!call->notification.busy) { + TALLOC_FREE(call); + } ldapsrv_call_read_next(conn); return; @@ -654,7 +672,9 @@ static void ldapsrv_call_writev_done(struct tevent_req *subreq) return; } - TALLOC_FREE(call); + if (!call->notification.busy) { + TALLOC_FREE(call); + } ldapsrv_call_read_next(conn); } @@ -688,6 +708,112 @@ static void ldapsrv_call_postprocess_done(struct tevent_req *subreq) ldapsrv_call_read_next(conn); } +static void ldapsrv_notification_retry_done(struct tevent_req *subreq); + +void ldapsrv_notification_retry_setup(struct ldapsrv_service *service, bool force) +{ + struct ldapsrv_connection *conn = NULL; + struct timeval retry; + size_t num_pending = 0; + size_t num_active = 0; + + if (force) { + TALLOC_FREE(service->notification.retry); + service->notification.generation += 1; + } + + if (service->notification.retry != NULL) { + return; + } + + for (conn = service->connections; conn != NULL; conn = conn->next) { + if (conn->pending_calls == NULL) { + continue; + } + + num_pending += 1; + + if (conn->pending_calls->notification.generation != + service->notification.generation) + { + num_active += 1; + } + } + + if (num_pending == 0) { + return; + } + + if (num_active != 0) { + retry = timeval_current_ofs(0, 100); + } else { + retry = timeval_current_ofs(5, 0); + } + + service->notification.retry = tevent_wakeup_send(service, + service->task->event_ctx, + retry); + if (service->notification.retry == NULL) { + /* retry later */ + return; + } + + tevent_req_set_callback(service->notification.retry, + ldapsrv_notification_retry_done, + service); +} + +static void ldapsrv_notification_retry_done(struct tevent_req *subreq) +{ + struct ldapsrv_service *service = + tevent_req_callback_data(subreq, + struct ldapsrv_service); + struct ldapsrv_connection *conn = NULL; + struct ldapsrv_connection *conn_next = NULL; + bool ok; + + service->notification.retry = NULL; + + ok = tevent_wakeup_recv(subreq); + TALLOC_FREE(subreq); + if (!ok) { + /* ignore */ + } + + for (conn = service->connections; conn != NULL; conn = conn_next) { + struct ldapsrv_call *call = conn->pending_calls; + + conn_next = conn->next; + + if (conn->pending_calls == NULL) { + continue; + } + + if (conn->active_call != NULL) { + continue; + } + + DLIST_DEMOTE(conn->pending_calls, call); + call->notification.generation = + service->notification.generation; + + /* queue the call in the global queue */ + subreq = ldapsrv_process_call_send(call, + conn->connection->event.ctx, + conn->service->call_queue, + call); + if (subreq == NULL) { + ldapsrv_terminate_connection(conn, + "ldapsrv_process_call_send failed"); + continue; + } + tevent_req_set_callback(subreq, ldapsrv_call_process_done, call); + conn->active_call = subreq; + } + + ldapsrv_notification_retry_setup(service, false); +} + struct ldapsrv_process_call_state { struct ldapsrv_call *call; }; diff --git a/source4/ldap_server/ldap_server.h b/source4/ldap_server/ldap_server.h index bfd95c05c18..27e0f1322bb 100644 --- a/source4/ldap_server/ldap_server.h +++ b/source4/ldap_server/ldap_server.h @@ -24,6 +24,7 @@ #include "system/network.h" struct ldapsrv_connection { + struct ldapsrv_connection *next, *prev; struct loadparm_context *lp_ctx; struct stream_connection *connection; struct gensec_security *gensec; @@ -48,15 +49,19 @@ struct ldapsrv_connection { int initial_timeout; int conn_idle_time; int max_page_size; + int max_notifications; int search_timeout; struct timeval endtime; const char *reason; } limits; struct tevent_req *active_call; + + struct ldapsrv_call *pending_calls; }; struct ldapsrv_call { + struct ldapsrv_call *prev, *next; struct ldapsrv_connection *conn; struct ldap_message *request; struct ldapsrv_reply { @@ -70,12 +75,22 @@ struct ldapsrv_call { void *private_data); NTSTATUS (*postprocess_recv)(struct tevent_req *req); void *postprocess_private; + + struct { + bool busy; + uint64_t generation; + } notification; }; struct ldapsrv_service { struct tstream_tls_params *tls_params; struct task_server *task; struct tevent_queue *call_queue; + struct ldapsrv_connection *connections; + struct { + uint64_t generation; + struct tevent_req *retry; + } notification; }; #include "ldap_server/proto.h"