mirror of
https://github.com/samba-team/samba.git
synced 2025-01-26 10:04:02 +03:00
564049fbc9
We don't need any substitution for elasticsearch options. Signed-off-by: Stefan Metzmacher <metze@samba.org> Reviewed-by: Ralph Boehme <slow@samba.org>
838 lines
20 KiB
C
838 lines
20 KiB
C
/*
   Unix SMB/CIFS implementation.
   Main metadata server / Spotlight routines / ES backend

   Copyright (C) Ralph Boehme 2019

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
|
|
|
|
#include "includes.h"
|
|
#include "system/filesys.h"
|
|
#include "lib/util/time_basic.h"
|
|
#include "lib/tls/tls.h"
|
|
#include "lib/util/tevent_ntstatus.h"
|
|
#include "libcli/http/http.h"
|
|
#include "lib/util/tevent_unix.h"
|
|
#include "credentials.h"
|
|
#include "mdssvc.h"
|
|
#include "mdssvc_es.h"
|
|
#include "rpc_server/mdssvc/es_parser.tab.h"
|
|
|
|
#include <jansson.h>
|
|
|
|
#undef DBGC_CLASS
|
|
#define DBGC_CLASS DBGC_RPC_SRV
|
|
|
|
/*
 * printf-style template for the JSON request body sent with each search.
 *
 * Conversion arguments, in order:
 *   %zu - "from":    offset of the first hit to return
 *   %zu - "size":    number of hits to return
 *   %s  - "_source": comma separated list of quoted field names
 *   %s  - "query":   the query_string expression
 */
#define MDSSVC_ELASTIC_QUERY_TEMPLATE		\
	"{"					\
	" \"from\": %zu,"			\
	" \"size\": %zu,"			\
	" \"_source\": [%s],"			\
	" \"query\": {"				\
	" \"query_string\": {"			\
	" \"query\": \"%s\""			\
	" }"					\
	" }"					\
	"}"
|
|
|
|
/*
 * Value inserted into the "_source" array of the query template: the
 * only document field requested from the server is "path.real".
 */
#define MDSSVC_ELASTIC_SOURCES	\
	"\"path.real\""
|
|
|
|
static bool mdssvc_es_init(struct mdssvc_ctx *mdssvc_ctx)
|
|
{
|
|
struct mdssvc_es_ctx *mdssvc_es_ctx = NULL;
|
|
json_error_t json_error;
|
|
char *default_path = NULL;
|
|
const char *path = NULL;
|
|
|
|
mdssvc_es_ctx = talloc_zero(mdssvc_ctx, struct mdssvc_es_ctx);
|
|
if (mdssvc_es_ctx == NULL) {
|
|
return false;
|
|
}
|
|
mdssvc_es_ctx->mdssvc_ctx = mdssvc_ctx;
|
|
|
|
mdssvc_es_ctx->creds = cli_credentials_init_anon(mdssvc_es_ctx);
|
|
if (mdssvc_es_ctx->creds == NULL) {
|
|
TALLOC_FREE(mdssvc_es_ctx);
|
|
return false;
|
|
}
|
|
|
|
default_path = talloc_asprintf(
|
|
mdssvc_es_ctx,
|
|
"%s/mdssvc/elasticsearch_mappings.json",
|
|
get_dyn_SAMBA_DATADIR());
|
|
if (default_path == NULL) {
|
|
TALLOC_FREE(mdssvc_es_ctx);
|
|
return false;
|
|
}
|
|
|
|
path = lp_parm_const_string(GLOBAL_SECTION_SNUM,
|
|
"elasticsearch",
|
|
"mappings",
|
|
default_path);
|
|
if (path == NULL) {
|
|
TALLOC_FREE(mdssvc_es_ctx);
|
|
return false;
|
|
}
|
|
|
|
mdssvc_es_ctx->mappings = json_load_file(path, 0, &json_error);
|
|
if (mdssvc_es_ctx->mappings == NULL) {
|
|
DBG_ERR("Opening mapping file [%s] failed: %s\n",
|
|
path, json_error.text);
|
|
TALLOC_FREE(mdssvc_es_ctx);
|
|
return false;
|
|
}
|
|
TALLOC_FREE(default_path);
|
|
|
|
mdssvc_ctx->backend_private = mdssvc_es_ctx;
|
|
return true;
|
|
}
|
|
|
|
/*
 * Backend "shutdown" hook. Performs no work and always reports success.
 */
static bool mdssvc_es_shutdown(struct mdssvc_ctx *mdssvc_ctx)
{
	const bool ok = true;

	return ok;
}
|
|
|
|
static struct tevent_req *mds_es_connect_send(
|
|
TALLOC_CTX *mem_ctx,
|
|
struct tevent_context *ev,
|
|
struct mds_es_ctx *mds_es_ctx);
|
|
static int mds_es_connect_recv(struct tevent_req *req);
|
|
static void mds_es_connected(struct tevent_req *subreq);
|
|
static bool mds_es_next_search_trigger(struct mds_es_ctx *mds_es_ctx);
|
|
|
|
static bool mds_es_connect(struct mds_ctx *mds_ctx)
|
|
{
|
|
struct mdssvc_es_ctx *mdssvc_es_ctx = talloc_get_type_abort(
|
|
mds_ctx->mdssvc_ctx->backend_private, struct mdssvc_es_ctx);
|
|
struct mds_es_ctx *mds_es_ctx = NULL;
|
|
struct tevent_req *subreq = NULL;
|
|
|
|
mds_es_ctx = talloc_zero(mds_ctx, struct mds_es_ctx);
|
|
if (mds_es_ctx == NULL) {
|
|
return false;
|
|
}
|
|
*mds_es_ctx = (struct mds_es_ctx) {
|
|
.mdssvc_es_ctx = mdssvc_es_ctx,
|
|
.mds_ctx = mds_ctx,
|
|
};
|
|
|
|
mds_ctx->backend_private = mds_es_ctx;
|
|
|
|
subreq = mds_es_connect_send(
|
|
mds_es_ctx,
|
|
mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
|
|
mds_es_ctx);
|
|
if (subreq == NULL) {
|
|
TALLOC_FREE(mds_es_ctx);
|
|
return false;
|
|
}
|
|
tevent_req_set_callback(subreq, mds_es_connected, mds_es_ctx);
|
|
return true;
|
|
}
|
|
|
|
static void mds_es_connected(struct tevent_req *subreq)
|
|
{
|
|
struct mds_es_ctx *mds_es_ctx = tevent_req_callback_data(
|
|
subreq, struct mds_es_ctx);
|
|
int ret;
|
|
bool ok;
|
|
|
|
ret = mds_es_connect_recv(subreq);
|
|
TALLOC_FREE(subreq);
|
|
if (ret != 0) {
|
|
DBG_ERR("HTTP connect failed\n");
|
|
return;
|
|
}
|
|
|
|
ok = mds_es_next_search_trigger(mds_es_ctx);
|
|
if (!ok) {
|
|
DBG_ERR("mds_es_next_search_trigger failed\n");
|
|
}
|
|
return;
|
|
}
|
|
|
|
/*
 * Per-request state of mds_es_connect_send().
 */
struct mds_es_connect_state {
	/* event context the request runs on */
	struct tevent_context *ev;
	/* context that receives the resulting HTTP connection */
	struct mds_es_ctx *mds_es_ctx;
	/* not referenced by the code visible in this file */
	struct tevent_queue_entry *qe;
	/* talloc copy of the "elasticsearch:address" setting */
	const char *server_addr;
	/* value of the "elasticsearch:port" setting */
	uint16_t server_port;
	/* TLS parameters, stays NULL for plain HTTP */
	struct tstream_tls_params *tls_params;
};
|
|
|
|
/* Callbacks used by mds_es_connect_send(), defined below. */
static void mds_es_http_connect_done(struct tevent_req *subreq);
static void mds_es_http_waited(struct tevent_req *subreq);
|
|
|
|
static struct tevent_req *mds_es_connect_send(
|
|
TALLOC_CTX *mem_ctx,
|
|
struct tevent_context *ev,
|
|
struct mds_es_ctx *mds_es_ctx)
|
|
{
|
|
struct tevent_req *req = NULL;
|
|
struct tevent_req *subreq = NULL;
|
|
struct mds_es_connect_state *state = NULL;
|
|
const char *server_addr = NULL;
|
|
bool use_tls;
|
|
NTSTATUS status;
|
|
|
|
req = tevent_req_create(mem_ctx, &state, struct mds_es_connect_state);
|
|
if (req == NULL) {
|
|
return NULL;
|
|
}
|
|
*state = (struct mds_es_connect_state) {
|
|
.ev = ev,
|
|
.mds_es_ctx = mds_es_ctx,
|
|
};
|
|
|
|
server_addr = lp_parm_const_string(
|
|
mds_es_ctx->mds_ctx->snum,
|
|
"elasticsearch",
|
|
"address",
|
|
"localhost");
|
|
state->server_addr = talloc_strdup(state, server_addr);
|
|
if (tevent_req_nomem(state->server_addr, req)) {
|
|
return tevent_req_post(req, ev);
|
|
}
|
|
|
|
state->server_port = lp_parm_int(
|
|
mds_es_ctx->mds_ctx->snum,
|
|
"elasticsearch",
|
|
"port",
|
|
9200);
|
|
|
|
use_tls = lp_parm_bool(
|
|
mds_es_ctx->mds_ctx->snum,
|
|
"elasticsearch",
|
|
"use tls",
|
|
false);
|
|
|
|
DBG_DEBUG("Connecting to HTTP%s [%s] port [%"PRIu16"]\n",
|
|
use_tls ? "S" : "", state->server_addr, state->server_port);
|
|
|
|
if (use_tls) {
|
|
const char *ca_file = lp__tls_cafile();
|
|
const char *crl_file = lp__tls_crlfile();
|
|
const char *tls_priority = lp_tls_priority();
|
|
enum tls_verify_peer_state verify_peer = lp_tls_verify_peer();
|
|
|
|
status = tstream_tls_params_client(state,
|
|
ca_file,
|
|
crl_file,
|
|
tls_priority,
|
|
verify_peer,
|
|
state->server_addr,
|
|
&state->tls_params);
|
|
if (!NT_STATUS_IS_OK(status)) {
|
|
DBG_ERR("Failed tstream_tls_params_client - %s\n",
|
|
nt_errstr(status));
|
|
tevent_req_nterror(req, status);
|
|
return tevent_req_post(req, ev);
|
|
}
|
|
}
|
|
|
|
subreq = http_connect_send(state,
|
|
state->ev,
|
|
state->server_addr,
|
|
state->server_port,
|
|
mds_es_ctx->mdssvc_es_ctx->creds,
|
|
state->tls_params);
|
|
if (tevent_req_nomem(subreq, req)) {
|
|
return tevent_req_post(req, ev);
|
|
}
|
|
tevent_req_set_callback(subreq, mds_es_http_connect_done, req);
|
|
return req;
|
|
}
|
|
|
|
static void mds_es_http_connect_done(struct tevent_req *subreq)
|
|
{
|
|
struct tevent_req *req = tevent_req_callback_data(
|
|
subreq, struct tevent_req);
|
|
struct mds_es_connect_state *state = tevent_req_data(
|
|
req, struct mds_es_connect_state);
|
|
int error;
|
|
|
|
error = http_connect_recv(subreq,
|
|
state->mds_es_ctx,
|
|
&state->mds_es_ctx->http_conn);
|
|
TALLOC_FREE(subreq);
|
|
if (error != 0) {
|
|
DBG_ERR("HTTP connect failed, retrying...\n");
|
|
|
|
subreq = tevent_wakeup_send(
|
|
state->mds_es_ctx,
|
|
state->mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
|
|
tevent_timeval_current_ofs(10, 0));
|
|
if (tevent_req_nomem(subreq, req)) {
|
|
return;
|
|
}
|
|
tevent_req_set_callback(subreq,
|
|
mds_es_http_waited,
|
|
req);
|
|
return;
|
|
}
|
|
|
|
DBG_DEBUG("Connected to HTTP%s [%s] port [%"PRIu16"]\n",
|
|
state->tls_params ? "S" : "",
|
|
state->server_addr, state->server_port);
|
|
|
|
tevent_req_done(req);
|
|
return;
|
|
}
|
|
|
|
static void mds_es_http_waited(struct tevent_req *subreq)
|
|
{
|
|
struct tevent_req *req = tevent_req_callback_data(
|
|
subreq, struct tevent_req);
|
|
struct mds_es_connect_state *state = tevent_req_data(
|
|
req, struct mds_es_connect_state);
|
|
bool ok;
|
|
|
|
ok = tevent_wakeup_recv(subreq);
|
|
TALLOC_FREE(subreq);
|
|
if (!ok) {
|
|
tevent_req_error(req, ETIMEDOUT);
|
|
return;
|
|
}
|
|
|
|
subreq = mds_es_connect_send(
|
|
state->mds_es_ctx,
|
|
state->mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
|
|
state->mds_es_ctx);
|
|
if (tevent_req_nomem(subreq, req)) {
|
|
return;
|
|
}
|
|
tevent_req_set_callback(subreq, mds_es_connected, state->mds_es_ctx);
|
|
}
|
|
|
|
/*
 * Collect the result of mds_es_connect_send(). Returns whatever
 * tevent_req_simple_recv_unix() reports for the request; callers in this
 * file treat 0 as success.
 */
static int mds_es_connect_recv(struct tevent_req *req)
{
	int err = tevent_req_simple_recv_unix(req);

	return err;
}
|
|
|
|
static void mds_es_reconnect_on_error(struct sl_es_search *s)
|
|
{
|
|
struct mds_es_ctx *mds_es_ctx = s->mds_es_ctx;
|
|
struct tevent_req *subreq = NULL;
|
|
|
|
if (s->slq != NULL) {
|
|
s->slq->state = SLQ_STATE_ERROR;
|
|
}
|
|
|
|
DBG_WARNING("Reconnecting HTTP...\n");
|
|
TALLOC_FREE(mds_es_ctx->http_conn);
|
|
|
|
subreq = mds_es_connect_send(
|
|
mds_es_ctx,
|
|
mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
|
|
mds_es_ctx);
|
|
if (subreq == NULL) {
|
|
DBG_ERR("mds_es_connect_send failed\n");
|
|
return;
|
|
}
|
|
tevent_req_set_callback(subreq, mds_es_connected, mds_es_ctx);
|
|
}
|
|
|
|
static int search_destructor(struct sl_es_search *s)
|
|
{
|
|
DLIST_REMOVE(s->mds_es_ctx->searches, s);
|
|
return 0;
|
|
}
|
|
|
|
/* Forward declarations for the asynchronous search request. */
static struct tevent_req *mds_es_search_send(
	TALLOC_CTX *mem_ctx,
	struct tevent_context *ev,
	struct sl_es_search *s);
static int mds_es_search_recv(struct tevent_req *req);
static void mds_es_search_done(struct tevent_req *subreq);
|
|
|
|
static bool mds_es_search(struct sl_query *slq)
|
|
{
|
|
struct mds_es_ctx *mds_es_ctx = talloc_get_type_abort(
|
|
slq->mds_ctx->backend_private, struct mds_es_ctx);
|
|
struct sl_es_search *s = NULL;
|
|
bool ok;
|
|
|
|
s = talloc_zero(slq, struct sl_es_search);
|
|
if (s == NULL) {
|
|
return false;
|
|
}
|
|
*s = (struct sl_es_search) {
|
|
.ev = mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
|
|
.mds_es_ctx = mds_es_ctx,
|
|
.slq = slq,
|
|
.size = MAX_SL_RESULTS,
|
|
};
|
|
|
|
/* 0 would mean no limit */
|
|
s->max = lp_parm_ulonglong(s->slq->mds_ctx->snum,
|
|
"elasticsearch",
|
|
"max results",
|
|
MAX_SL_RESULTS);
|
|
|
|
DBG_DEBUG("Spotlight query: '%s'\n", slq->query_string);
|
|
|
|
ok = map_spotlight_to_es_query(
|
|
s,
|
|
mds_es_ctx->mdssvc_es_ctx->mappings,
|
|
slq->path_scope,
|
|
slq->query_string,
|
|
&s->es_query);
|
|
if (!ok) {
|
|
TALLOC_FREE(s);
|
|
return false;
|
|
}
|
|
DBG_DEBUG("Elasticsearch query: '%s'\n", s->es_query);
|
|
|
|
slq->backend_private = s;
|
|
slq->state = SLQ_STATE_RUNNING;
|
|
DLIST_ADD_END(mds_es_ctx->searches, s);
|
|
talloc_set_destructor(s, search_destructor);
|
|
|
|
return mds_es_next_search_trigger(mds_es_ctx);
|
|
}
|
|
|
|
static bool mds_es_next_search_trigger(struct mds_es_ctx *mds_es_ctx)
|
|
{
|
|
struct tevent_req *subreq = NULL;
|
|
struct sl_es_search *s = mds_es_ctx->searches;
|
|
|
|
if (mds_es_ctx->http_conn == NULL) {
|
|
DBG_DEBUG("Waiting for HTTP connection...\n");
|
|
return true;
|
|
}
|
|
if (s == NULL) {
|
|
DBG_DEBUG("No pending searches, idling...\n");
|
|
return true;
|
|
}
|
|
if (s->pending) {
|
|
DBG_DEBUG("Search pending [%p]\n", s);
|
|
return true;
|
|
}
|
|
|
|
subreq = mds_es_search_send(s, s->ev, s);
|
|
if (subreq == NULL) {
|
|
return false;
|
|
}
|
|
tevent_req_set_callback(subreq, mds_es_search_done, s);
|
|
return true;
|
|
}
|
|
|
|
static void mds_es_search_done(struct tevent_req *subreq)
|
|
{
|
|
struct sl_es_search *s = tevent_req_callback_data(
|
|
subreq, struct sl_es_search);
|
|
struct mds_es_ctx *mds_es_ctx = s->mds_es_ctx;
|
|
struct sl_query *slq = s->slq;
|
|
int ret;
|
|
bool ok;
|
|
|
|
DBG_DEBUG("Search done for search [%p]\n", s);
|
|
|
|
DLIST_REMOVE(mds_es_ctx->searches, s);
|
|
|
|
ret = mds_es_search_recv(subreq);
|
|
TALLOC_FREE(subreq);
|
|
if (ret != 0) {
|
|
mds_es_reconnect_on_error(s);
|
|
return;
|
|
}
|
|
|
|
if (slq == NULL) {
|
|
/*
|
|
* Closed by the user. This is the only place where we free "s"
|
|
* explicitly because the talloc parent slq is already gone.
|
|
* Everywhere else we rely on the destructor of slq to free s"."
|
|
*/
|
|
TALLOC_FREE(s);
|
|
goto trigger;
|
|
}
|
|
|
|
SLQ_DEBUG(10, slq, "search done");
|
|
|
|
if (s->total == 0 || s->from >= s->max) {
|
|
slq->state = SLQ_STATE_DONE;
|
|
goto trigger;
|
|
}
|
|
|
|
if (slq->query_results->num_results >= MAX_SL_RESULTS) {
|
|
slq->state = SLQ_STATE_FULL;
|
|
goto trigger;
|
|
}
|
|
|
|
/*
|
|
* Reschedule this query as there are more results waiting in the
|
|
* Elasticsearch server and the client result queue has room as
|
|
* well. But put it at the end of the list of active queries as a simple
|
|
* heuristic that should ensure all client queries are dispatched to the
|
|
* server.
|
|
*/
|
|
DLIST_ADD_END(mds_es_ctx->searches, s);
|
|
|
|
trigger:
|
|
ok = mds_es_next_search_trigger(mds_es_ctx);
|
|
if (!ok) {
|
|
DBG_ERR("mds_es_next_search_trigger failed\n");
|
|
}
|
|
}
|
|
|
|
/* Callbacks used by mds_es_search_send(), defined below. */
static void mds_es_search_http_send_done(struct tevent_req *subreq);
static void mds_es_search_http_read_done(struct tevent_req *subreq);
|
|
|
|
struct mds_es_search_state {
|
|
struct tevent_context *ev;
|
|
struct sl_es_search *s;
|
|
struct tevent_queue_entry *qe;
|
|
struct http_request http_request;
|
|
struct http_request *http_response;
|
|
};
|
|
|
|
static int mds_es_search_pending_destructor(struct sl_es_search *s)
|
|
{
|
|
/*
|
|
* s is a child of slq which may get freed when a user closes a
|
|
* query. To maintain the HTTP request/response sequence on the HTTP
|
|
* channel, we keep processing pending requests and free s when we
|
|
* receive the HTTP response for pending requests.
|
|
*/
|
|
DBG_DEBUG("Preserving pending search [%p]\n", s);
|
|
s->slq = NULL;
|
|
return -1;
|
|
}
|
|
|
|
static void mds_es_search_set_pending(struct sl_es_search *s)
|
|
{
|
|
DBG_DEBUG("Set pending [%p]\n", s);
|
|
SLQ_DEBUG(10, s->slq, "pending");
|
|
|
|
s->pending = true;
|
|
talloc_set_destructor(s, mds_es_search_pending_destructor);
|
|
}
|
|
|
|
static void mds_es_search_unset_pending(struct sl_es_search *s)
|
|
{
|
|
DBG_DEBUG("Unset pending [%p]\n", s);
|
|
if (s->slq != NULL) {
|
|
SLQ_DEBUG(10, s->slq, "unset pending");
|
|
}
|
|
|
|
s->pending = false;
|
|
talloc_set_destructor(s, NULL);
|
|
}
|
|
|
|
/*
 * Start an asynchronous search request for "s".
 *
 * Builds the "/<index>/_search" URI (index from the per-share
 * "elasticsearch:index" parameter, default "_all"; "?pretty" is appended
 * at debug level 10), renders the JSON body from the query template and
 * sends it as an HTTP/1.1 POST on the current connection. The request is
 * given a 60 second end time. Once the send has been started the search
 * is marked pending.
 *
 * Returns NULL if the request cannot be allocated. Later failures are
 * reported through the returned, already posted request. Collect the
 * result with mds_es_search_recv().
 */
static struct tevent_req *mds_es_search_send(TALLOC_CTX *mem_ctx,
					     struct tevent_context *ev,
					     struct sl_es_search *s)
{
	struct mds_es_search_state *state = NULL;
	struct tevent_req *req = NULL;
	struct tevent_req *http_req = NULL;
	const char *es_index = NULL;
	char *request_uri = NULL;
	char *body = NULL;
	char *body_len_str = NULL;
	char *myname = NULL;
	size_t body_len;

	req = tevent_req_create(mem_ctx, &state, struct mds_es_search_state);
	if (req == NULL) {
		return NULL;
	}
	*state = (struct mds_es_search_state) {
		.ev = ev,
		.s = s,
	};

	if (!tevent_req_set_endtime(req, ev, timeval_current_ofs(60, 0))) {
		return tevent_req_post(req, s->ev);
	}

	es_index = lp_parm_const_string(s->slq->mds_ctx->snum,
					"elasticsearch",
					"index",
					"_all");
	if (tevent_req_nomem(es_index, req)) {
		return tevent_req_post(req, ev);
	}

	/* Ask for human readable JSON when debugging at level 10 */
	request_uri = talloc_asprintf(state,
				      "/%s/_search%s",
				      es_index,
				      DEBUGLVL(10) ? "?pretty" : "");
	if (tevent_req_nomem(request_uri, req)) {
		return tevent_req_post(req, ev);
	}

	body = talloc_asprintf(state,
			       MDSSVC_ELASTIC_QUERY_TEMPLATE,
			       s->from,
			       s->size,
			       MDSSVC_ELASTIC_SOURCES,
			       s->es_query);
	if (tevent_req_nomem(body, req)) {
		return tevent_req_post(req, ev);
	}
	DBG_DEBUG("Elastic query: '%s'\n", body);

	body_len = strlen(body);

	state->http_request = (struct http_request) {
		.type = HTTP_REQ_POST,
		.uri = request_uri,
		.body = data_blob_const(body, body_len),
		.major = '1',
		.minor = '1',
	};

	body_len_str = talloc_asprintf(state, "%zu", body_len);
	if (tevent_req_nomem(body_len_str, req)) {
		return tevent_req_post(req, ev);
	}

	myname = get_myname(state);
	if (tevent_req_nomem(myname, req)) {
		return tevent_req_post(req, ev);
	}

	http_add_header(state, &state->http_request.headers,
			"Content-Type", "application/json");
	http_add_header(state, &state->http_request.headers,
			"Accept", "application/json");
	http_add_header(state, &state->http_request.headers,
			"User-Agent", "Samba/mdssvc");
	http_add_header(state, &state->http_request.headers,
			"Host", myname);
	http_add_header(state, &state->http_request.headers,
			"Content-Length", body_len_str);

	http_req = http_send_request_send(state,
					  ev,
					  s->mds_es_ctx->http_conn,
					  &state->http_request);
	if (tevent_req_nomem(http_req, req)) {
		return tevent_req_post(req, ev);
	}

	/* From here on a response is outstanding for this search */
	mds_es_search_set_pending(s);
	tevent_req_set_callback(http_req, mds_es_search_http_send_done, req);
	return req;
}
|
|
|
|
static void mds_es_search_http_send_done(struct tevent_req *subreq)
|
|
{
|
|
struct tevent_req *req = tevent_req_callback_data(
|
|
subreq, struct tevent_req);
|
|
struct mds_es_search_state *state = tevent_req_data(
|
|
req, struct mds_es_search_state);
|
|
NTSTATUS status;
|
|
|
|
DBG_DEBUG("Sent out search [%p]\n", state->s);
|
|
|
|
status = http_send_request_recv(subreq);
|
|
TALLOC_FREE(subreq);
|
|
if (!NT_STATUS_IS_OK(status)) {
|
|
tevent_req_error(req, map_errno_from_nt_status(status));
|
|
return;
|
|
}
|
|
|
|
if (state->s->mds_es_ctx->mds_ctx == NULL) {
|
|
mds_es_search_unset_pending(state->s);
|
|
tevent_req_error(req, ECANCELED);
|
|
return;
|
|
}
|
|
|
|
subreq = http_read_response_send(state,
|
|
state->ev,
|
|
state->s->mds_es_ctx->http_conn,
|
|
MAX_SL_RESULTS * 8192);
|
|
if (tevent_req_nomem(subreq, req)) {
|
|
return;
|
|
}
|
|
tevent_req_set_callback(subreq, mds_es_search_http_read_done, req);
|
|
}
|
|
|
|
static void mds_es_search_http_read_done(struct tevent_req *subreq)
|
|
{
|
|
struct tevent_req *req = tevent_req_callback_data(
|
|
subreq, struct tevent_req);
|
|
struct mds_es_search_state *state = tevent_req_data(
|
|
req, struct mds_es_search_state);
|
|
struct sl_es_search *s = state->s;
|
|
struct sl_query *slq = s->slq;
|
|
json_t *root = NULL;
|
|
json_t *matches = NULL;
|
|
json_t *match = NULL;
|
|
size_t i;
|
|
json_error_t error;
|
|
int hits;
|
|
NTSTATUS status;
|
|
int ret;
|
|
bool ok;
|
|
|
|
DBG_DEBUG("Got response for search [%p]\n", s);
|
|
|
|
mds_es_search_unset_pending(s);
|
|
|
|
status = http_read_response_recv(subreq, state, &state->http_response);
|
|
TALLOC_FREE(subreq);
|
|
if (!NT_STATUS_IS_OK(status)) {
|
|
DBG_DEBUG("HTTP response failed: %s\n", nt_errstr(status));
|
|
tevent_req_error(req, map_errno_from_nt_status(status));
|
|
return;
|
|
}
|
|
|
|
if (slq == NULL) {
|
|
tevent_req_done(req);
|
|
return;
|
|
}
|
|
if (s->mds_es_ctx->mds_ctx == NULL) {
|
|
tevent_req_error(req, ECANCELED);
|
|
return;
|
|
}
|
|
|
|
switch (state->http_response->response_code) {
|
|
case 200:
|
|
break;
|
|
default:
|
|
DBG_ERR("HTTP server response: %u\n",
|
|
state->http_response->response_code);
|
|
goto fail;
|
|
}
|
|
|
|
DBG_DEBUG("JSON response:\n%s\n",
|
|
talloc_strndup(talloc_tos(),
|
|
(char *)state->http_response->body.data,
|
|
state->http_response->body.length));
|
|
|
|
root = json_loadb((char *)state->http_response->body.data,
|
|
state->http_response->body.length,
|
|
0,
|
|
&error);
|
|
if (root == NULL) {
|
|
DBG_ERR("json_loadb failed\n");
|
|
goto fail;
|
|
}
|
|
|
|
if (s->total == 0) {
|
|
/*
|
|
* Get the total number of results the first time, format
|
|
* used by Elasticsearch 7.0 or newer
|
|
*/
|
|
ret = json_unpack(root, "{s: {s: {s: i}}}",
|
|
"hits", "total", "value", &s->total);
|
|
if (ret != 0) {
|
|
/* Format used before 7.0 */
|
|
ret = json_unpack(root, "{s: {s: i}}",
|
|
"hits", "total", &s->total);
|
|
if (ret != 0) {
|
|
DBG_ERR("json_unpack failed\n");
|
|
goto fail;
|
|
}
|
|
}
|
|
|
|
DBG_DEBUG("Total: %zu\n", s->total);
|
|
|
|
if (s->total == 0) {
|
|
json_decref(root);
|
|
tevent_req_done(req);
|
|
return;
|
|
}
|
|
}
|
|
|
|
if (s->max == 0 || s->max > s->total) {
|
|
s->max = s->total;
|
|
}
|
|
|
|
ret = json_unpack(root, "{s: {s:o}}",
|
|
"hits", "hits", &matches);
|
|
if (ret != 0 || matches == NULL) {
|
|
DBG_ERR("json_unpack hits failed\n");
|
|
goto fail;
|
|
}
|
|
|
|
hits = json_array_size(matches);
|
|
if (hits == 0) {
|
|
DBG_ERR("Hu?! No results?\n");
|
|
goto fail;
|
|
}
|
|
DBG_DEBUG("Hits: %d\n", hits);
|
|
|
|
for (i = 0; i < hits; i++) {
|
|
const char *path = NULL;
|
|
|
|
match = json_array_get(matches, i);
|
|
if (match == NULL) {
|
|
DBG_ERR("Hu?! No value for index %zu\n", i);
|
|
goto fail;
|
|
}
|
|
ret = json_unpack(match,
|
|
"{s: {s: {s: s}}}",
|
|
"_source",
|
|
"path",
|
|
"real",
|
|
&path);
|
|
if (ret != 0) {
|
|
DBG_ERR("Missing path.real in JSON result\n");
|
|
goto fail;
|
|
}
|
|
|
|
ok = mds_add_result(slq, path);
|
|
if (!ok) {
|
|
DBG_ERR("error adding result for path: %s\n", path);
|
|
goto fail;
|
|
}
|
|
}
|
|
json_decref(root);
|
|
|
|
s->from += hits;
|
|
slq->state = SLQ_STATE_RESULTS;
|
|
tevent_req_done(req);
|
|
return;
|
|
|
|
fail:
|
|
if (root != NULL) {
|
|
json_decref(root);
|
|
}
|
|
slq->state = SLQ_STATE_ERROR;
|
|
tevent_req_error(req, EINVAL);
|
|
return;
|
|
}
|
|
|
|
/*
 * Collect the result of mds_es_search_send(). Returns whatever
 * tevent_req_simple_recv_unix() reports for the request; the caller in
 * this file treats 0 as success.
 */
static int mds_es_search_recv(struct tevent_req *req)
{
	int err = tevent_req_simple_recv_unix(req);

	return err;
}
|
|
|
|
static bool mds_es_search_cont(struct sl_query *slq)
|
|
{
|
|
struct sl_es_search *s = talloc_get_type_abort(
|
|
slq->backend_private, struct sl_es_search);
|
|
|
|
SLQ_DEBUG(10, slq, "continue");
|
|
DLIST_ADD_END(s->mds_es_ctx->searches, s);
|
|
return mds_es_next_search_trigger(s->mds_es_ctx);
|
|
}
|
|
|
|
struct mdssvc_backend mdsscv_backend_es = {
|
|
.init = mdssvc_es_init,
|
|
.shutdown = mdssvc_es_shutdown,
|
|
.connect = mds_es_connect,
|
|
.search_start = mds_es_search,
|
|
.search_cont = mds_es_search_cont,
|
|
};
|