1
0
mirror of https://github.com/samba-team/samba.git synced 2025-02-26 21:57:41 +03:00

s3-mdssvc: use tevent_glib_glue in mdssvc RPC service

Signed-off-by: Ralph Boehme <slow@samba.org>
Reviewed-by: Noel Power <npower@samba.org>
This commit is contained in:
Ralph Boehme 2016-01-27 13:23:51 +01:00
parent bc053abdd1
commit 55b2cca14a
3 changed files with 77 additions and 148 deletions

View File

@ -28,6 +28,7 @@
#include "libcli/security/dom_sid.h"
#include "mdssvc.h"
#include "rpc_server/mdssvc/sparql_parser.tab.h"
#include "lib/tevent_glib_glue.h"
#undef DBGC_CLASS
#define DBGC_CLASS DBGC_RPC_SRV
@ -64,6 +65,15 @@ struct slq_destroy_state {
struct sl_query *slq;
};
/*
* This is a static global because we may be called multiple times and
* we only want one mdssvc_ctx per connection to Tracker.
*
* The client will bind multiple times to the mdssvc RPC service, once
* for every tree connect.
*/
static struct mdssvc_ctx *mdssvc_ctx = NULL;
/*
* If these functions return an error, they hit something like a non
* recoverable talloc error. Most errors are dealt with by returning
@ -731,7 +741,6 @@ static void tracker_con_cb(GObject *object,
}
DEBUG(10, ("connected to Tracker\n"));
g_main_loop_quit(mds_ctx->gmainloop);
}
static void tracker_cursor_cb_destroy_done(struct tevent_req *subreq);
@ -770,7 +779,6 @@ static void tracker_cursor_cb(GObject *object,
* we return.
*/
SLQ_DEBUG(10, slq, "closed");
g_main_loop_quit(slq->mds_ctx->gmainloop);
req = slq_destroy_send(slq, global_event_context(), &slq);
if (req == NULL) {
@ -785,13 +793,11 @@ static void tracker_cursor_cb(GObject *object,
DEBUG(1, ("Tracker cursor: %s\n", error->message));
g_error_free(error);
slq->state = SLQ_STATE_ERROR;
g_main_loop_quit(slq->mds_ctx->gmainloop);
return;
}
if (!more_results) {
slq->state = SLQ_STATE_DONE;
g_main_loop_quit(slq->mds_ctx->gmainloop);
return;
}
@ -799,14 +805,12 @@ static void tracker_cursor_cb(GObject *object,
if (uri == NULL) {
DEBUG(1, ("error fetching Tracker URI\n"));
slq->state = SLQ_STATE_ERROR;
g_main_loop_quit(slq->mds_ctx->gmainloop);
return;
}
path = tracker_to_unix_path(slq->query_results, uri);
if (path == NULL) {
DEBUG(1, ("error converting Tracker URI to path: %s\n", uri));
slq->state = SLQ_STATE_ERROR;
g_main_loop_quit(slq->mds_ctx->gmainloop);
return;
}
@ -866,7 +870,6 @@ static void tracker_cursor_cb(GObject *object,
if (result != 0) {
DEBUG(1, ("dalloc error\n"));
slq->state = SLQ_STATE_ERROR;
g_main_loop_quit(slq->mds_ctx->gmainloop);
return;
}
ok = add_filemeta(slq->reqinfo, slq->query_results->fm_array,
@ -874,7 +877,6 @@ static void tracker_cursor_cb(GObject *object,
if (!ok) {
DEBUG(1, ("add_filemeta error\n"));
slq->state = SLQ_STATE_ERROR;
g_main_loop_quit(slq->mds_ctx->gmainloop);
return;
}
@ -882,7 +884,6 @@ static void tracker_cursor_cb(GObject *object,
if (!ok) {
DEBUG(1, ("inode_map_add error\n"));
slq->state = SLQ_STATE_ERROR;
g_main_loop_quit(slq->mds_ctx->gmainloop);
return;
}
@ -892,7 +893,6 @@ done:
if (slq->query_results->num_results >= MAX_SL_RESULTS) {
slq->state = SLQ_STATE_FULL;
SLQ_DEBUG(10, slq, "full");
g_main_loop_quit(slq->mds_ctx->gmainloop);
return;
}
@ -929,13 +929,11 @@ static void tracker_query_cb(GObject *object,
slq->state = SLQ_STATE_ERROR;
DEBUG(1, ("Tracker query error: %s\n", error->message));
g_error_free(error);
g_main_loop_quit(slq->mds_ctx->gmainloop);
return;
}
if (slq->state == SLQ_STATE_DONE) {
SLQ_DEBUG(10, slq, "done");
g_main_loop_quit(slq->mds_ctx->gmainloop);
talloc_free(slq);
return;
}
@ -1302,13 +1300,11 @@ static bool slrpc_open_query(struct mds_ctx *mds_ctx,
DEBUG(10, ("SPARQL query: \"%s\"\n", slq->sparql_query));
g_main_context_push_thread_default(mds_ctx->gcontext);
tracker_sparql_connection_query_async(mds_ctx->tracker_con,
slq->sparql_query,
slq->gcancellable,
tracker_query_cb,
slq);
g_main_context_pop_thread_default(mds_ctx->gcontext);
slq->state = SLQ_STATE_RUNNING;
sl_result = 0;
@ -1401,13 +1397,11 @@ static bool slrpc_fetch_query_results(struct mds_ctx *mds_ctx,
}
if (slq->state == SLQ_STATE_FULL) {
slq->state = SLQ_STATE_RESULTS;
g_main_context_push_thread_default(mds_ctx->gcontext);
tracker_sparql_cursor_next_async(
slq->tracker_cursor,
slq->gcancellable,
tracker_cursor_cb,
slq);
g_main_context_pop_thread_default(mds_ctx->gcontext);
}
break;
@ -1811,6 +1805,42 @@ done:
return true;
}
static struct mdssvc_ctx *mdssvc_init(struct tevent_context *ev)
{
if (mdssvc_ctx != NULL) {
return mdssvc_ctx;
}
mdssvc_ctx = talloc_zero(ev, struct mdssvc_ctx);
if (mdssvc_ctx == NULL) {
return NULL;
}
mdssvc_ctx->ev_ctx = ev;
mdssvc_ctx->gmain_ctx = g_main_context_new();
if (mdssvc_ctx->gmain_ctx == NULL) {
DBG_ERR("error from g_main_context_new\n");
return NULL;
}
/*
* This ensures all glib threads, especially gioi worker threads
* dispatch their async callbacks via our gmain_ctx.
*/
g_main_context_push_thread_default(mdssvc_ctx->gmain_ctx);
mdssvc_ctx->glue = samba_tevent_glib_glue_create(ev,
mdssvc_ctx->ev_ctx,
mdssvc_ctx->gmain_ctx);
if (mdssvc_ctx->glue == NULL) {
DBG_ERR("samba_tevent_glib_glue_create failed\n");
return NULL;
}
return mdssvc_ctx;
}
/**
* Init callbacks at startup
**/
@ -1824,21 +1854,25 @@ bool mds_init(struct messaging_context *msg_ctx)
bool mds_shutdown(void)
{
if (mdssvc_ctx == NULL) {
return false;
}
samba_tevent_glib_glue_quit(mdssvc_ctx->glue);
TALLOC_FREE(mdssvc_ctx->glue);
g_main_context_pop_thread_default(mdssvc_ctx->gmain_ctx);
TALLOC_FREE(mdssvc_ctx);
return true;
}
static gboolean gmainloop_timer(gpointer user_data)
{
struct mds_ctx *ctx = talloc_get_type_abort(user_data, struct mds_ctx);
DEBUG(10,("%s\n", __func__));
g_main_loop_quit(ctx->gmainloop);
return G_SOURCE_CONTINUE;
}
/**
* Initialise a context per share handle
* Initialise a context per RPC bind
*
* This ends up being called for every tcon, because the client does a
* RPC bind for every tcon, so this is acually a per tcon context.
**/
struct mds_ctx *mds_init_ctx(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
@ -1853,6 +1887,11 @@ struct mds_ctx *mds_init_ctx(TALLOC_CTX *mem_ctx,
}
talloc_set_destructor(mds_ctx, mds_ctx_destructor_cb);
mds_ctx->mdssvc_ctx = mdssvc_init(ev);
if (mds_ctx->mdssvc_ctx == NULL) {
goto error;
}
mds_ctx->spath = talloc_strdup(mds_ctx, path);
if (mds_ctx->spath == NULL) {
goto error;
@ -1872,22 +1911,8 @@ struct mds_ctx *mds_init_ctx(TALLOC_CTX *mem_ctx,
goto error;
}
mds_ctx->gcontext = g_main_context_new();
if (mds_ctx->gcontext == NULL) {
DEBUG(1,("error from g_main_context_new\n"));
goto error;
}
mds_ctx->gmainloop = g_main_loop_new(mds_ctx->gcontext, false);
if (mds_ctx->gmainloop == NULL) {
DEBUG(1,("error from g_main_loop_new\n"));
goto error;
}
g_main_context_push_thread_default(mds_ctx->gcontext);
tracker_sparql_connection_get_async(mds_ctx->gcancellable,
tracker_con_cb, mds_ctx);
g_main_context_pop_thread_default(mds_ctx->gcontext);
return mds_ctx;
@ -1920,76 +1945,12 @@ int mds_ctx_destructor_cb(struct mds_ctx *mds_ctx)
g_cancellable_cancel(mds_ctx->gcancellable);
g_object_unref(mds_ctx->gcancellable);
}
if (mds_ctx->gmainloop != NULL) {
g_main_loop_unref(mds_ctx->gmainloop);
}
if (mds_ctx->gcontext != NULL) {
g_main_context_unref(mds_ctx->gcontext);
}
ZERO_STRUCTP(mds_ctx);
return 0;
}
static bool mds_run_gmainloop(struct mds_ctx *mds_ctx, guint timeout)
{
guint timer_id;
GSource *timer;
/*
* It seems the event processing of the libtracker-sparql
* async subsystem defers callbacks until *all* events are
* processes by the async subsystem main processing loop.
*
* g_main_context_iteration(may_block=FALSE) can't be used,
* because a search that produces a few thousand matches
* generates as many events that must be processed in either
* g_main_context_iteration() or g_main_loop_run() before
* callbacks are called.
*
* Unfortunately g_main_context_iteration() only processes a
* small subset of these event (1-30) at a time when run in
* mds_dispatch(), which happens once a second while the
* client polls for results.
*
* Carefully using the blocking g_main_loop_run() fixes
* this. It processes events until we exit from the loop at
* defined exit points. By adding a 1 ms timeout we at least
* try to get as close as possible to non-blocking behaviour.
*/
if (!g_main_context_pending(mds_ctx->gcontext)) {
return true;
}
g_main_context_push_thread_default(mds_ctx->gcontext);
timer = g_timeout_source_new(timeout);
if (timer == NULL) {
DEBUG(1,("g_timeout_source_new_seconds\n"));
g_main_context_pop_thread_default(mds_ctx->gcontext);
return false;
}
timer_id = g_source_attach(timer, mds_ctx->gcontext);
if (timer_id == 0) {
DEBUG(1,("g_timeout_add failed\n"));
g_source_destroy(timer);
g_main_context_pop_thread_default(mds_ctx->gcontext);
return false;
}
g_source_set_callback(timer, gmainloop_timer, mds_ctx, NULL);
g_main_loop_run(mds_ctx->gmainloop);
g_source_destroy(timer);
g_main_context_pop_thread_default(mds_ctx->gcontext);
return true;
}
/**
* Dispatch a Spotlight RPC command
**/
@ -2014,34 +1975,6 @@ bool mds_dispatch(struct mds_ctx *mds_ctx,
response_blob->length = 0;
/*
* Process finished glib events.
*
* FIXME: integrate with tevent instead of piggy packing it
* onto the processing of new requests.
*
* mds_dispatch() is called by the client a few times in a row:
*
* - first in order to open/start a search query
*
* - later in order to fetch results asynchronously, typically
* once a second. If no results have been retrieved from the
* search store (Tracker) yet, we return no results.
* The client asks for more results every second as long
* as the "Search Window" in the client gui is open.
*
* - at some point the query is closed
*
* This means we try to iterate through the glib event loop
* before processing the request in order to get result
* from tracker which can be returned to the client.
*/
ok = mds_run_gmainloop(mds_ctx, MDS_TRACKER_ASYNC_TIMEOUT_MS);
if (!ok) {
goto cleanup;
}
DEBUG(10, ("share path: %s\n", mds_ctx->spath));
query = dalloc_new(mds_ctx);
@ -2102,17 +2035,6 @@ bool mds_dispatch(struct mds_ctx *mds_ctx,
goto cleanup;
}
/*
* Run g_main_loop a second time in order to dispatch events
* that may have been queued at the libtracker-sparql level.
* As we only want to dispatch (write out requests) but not
* wait for anything, we use a much shorter timeout here.
*/
ok = mds_run_gmainloop(mds_ctx, MDS_TRACKER_ASYNC_TIMEOUT_MS / 10);
if (!ok) {
goto cleanup;
}
response_blob->length = len;
cleanup:

View File

@ -96,15 +96,22 @@ struct sl_inode_path_map {
char *path;
};
/* Per process state */
struct mdssvc_ctx {
struct tevent_context *ev_ctx;
GMainContext *gmain_ctx;
struct tevent_glib_glue *glue;
};
/* Per tree connect state */
struct mds_ctx {
struct mdssvc_ctx *mdssvc_ctx;
struct auth_session_info *pipe_session_info;
struct dom_sid sid;
uid_t uid;
const char *spath;
GCancellable *gcancellable;
TrackerSparqlConnection *tracker_con;
GMainContext *gcontext;
GMainLoop *gmainloop;
struct sl_query *query_list; /* list of active queries */
struct db_context *ino_path_map; /* dbwrap rbt for storing inode->path mappings */
};

View File

@ -150,7 +150,7 @@ bld.SAMBA3_MODULE('rpc_mdssvc_module',
mdssvc/srv_mdssvc_nt.c
../../librpc/gen_ndr/srv_mdssvc.c''',
init_function='',
deps='samba-util ' + bld.env['libtracker'],
deps='samba-util tevent-glib-glue ' + bld.env['libtracker'],
internal_module=bld.SAMBA3_IS_STATIC_MODULE('rpc_mdssvc_module'),
enabled=bld.SAMBA3_IS_ENABLED_MODULE('rpc_mdssvc_module'))
@ -205,5 +205,5 @@ bld.SAMBA3_SUBSYSTEM('FSSD',
bld.SAMBA3_SUBSYSTEM('MDSSD',
source='mdssd.c',
deps='RPC_SOCK_HELPER RPC_MODULES samba-util',
deps='RPC_SOCK_HELPER RPC_MODULES samba-util tevent-glib-glue',
enabled=bld.env.with_spotlight)