1
0
mirror of https://github.com/samba-team/samba.git synced 2024-12-22 13:34:15 +03:00

dbwrap_ctdb: implement parse_record_send()/recv()

This mainly works like the sync version, but calls ctdbd_parse_send/recv
instead.

We use one global ctdb connection that is used exclusively for async
requests.

Signed-off-by: Ralph Boehme <slow@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
This commit is contained in:
Ralph Boehme 2016-12-21 08:38:25 +01:00 committed by Jeremy Allison
parent 6a1734ebfd
commit 2813b53e1a
4 changed files with 179 additions and 0 deletions

View File

@ -94,3 +94,8 @@ struct ctdbd_connection *messaging_ctdbd_connection(void)
{
return NULL;
}
int ctdb_async_ctx_reinit(TALLOC_CTX *mem_ctx, struct tevent_context *ev)
{
return ENOSYS;
}

View File

@ -35,6 +35,7 @@
#include "g_lock.h"
#include "messages.h"
#include "lib/cluster_support.h"
#include "lib/util/tevent_ntstatus.h"
struct db_ctdb_transaction_handle {
struct db_ctdb_ctx *ctx;
@ -68,6 +69,59 @@ struct db_ctdb_rec {
struct timeval lock_time;
};
struct ctdb_async_ctx {
bool initialized;
struct ctdbd_connection *async_conn;
};
static struct ctdb_async_ctx ctdb_async_ctx;
static int ctdb_async_ctx_init_internal(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
bool reinit)
{
int ret;
if (reinit) {
TALLOC_FREE(ctdb_async_ctx.async_conn);
ctdb_async_ctx.initialized = false;
}
if (ctdb_async_ctx.initialized) {
return 0;
}
become_root();
ret = ctdbd_init_connection(mem_ctx,
lp_ctdbd_socket(),
lp_ctdb_timeout(),
&ctdb_async_ctx.async_conn);
unbecome_root();
if (ctdb_async_ctx.async_conn == NULL) {
DBG_ERR("ctdbd_init_connection failed\n");
return EIO;
}
ret = ctdbd_setup_fde(ctdb_async_ctx.async_conn, ev);
if (ret != 0) {
DBG_ERR("ctdbd_setup_ev failed\n");
return ret;
}
return 0;
}
static int ctdb_async_ctx_init(TALLOC_CTX *mem_ctx, struct tevent_context *ev)
{
return ctdb_async_ctx_init_internal(mem_ctx, ev, false);
}
int ctdb_async_ctx_reinit(TALLOC_CTX *mem_ctx, struct tevent_context *ev)
{
return ctdb_async_ctx_init_internal(mem_ctx, ev, true);
}
static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
{
enum TDB_ERROR tret = tdb_error(tdb);
@ -1350,6 +1404,102 @@ static NTSTATUS db_ctdb_parse_record(struct db_context *db, TDB_DATA key,
return NT_STATUS_OK;
}
static void db_ctdb_parse_record_done(struct tevent_req *subreq);
static struct tevent_req *db_ctdb_parse_record_send(
TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct db_context *db,
TDB_DATA key,
void (*parser)(TDB_DATA key,
TDB_DATA data,
void *private_data),
void *private_data,
enum dbwrap_req_state *req_state)
{
struct db_ctdb_ctx *ctx = talloc_get_type_abort(
db->private_data, struct db_ctdb_ctx);
struct tevent_req *req = NULL;
struct tevent_req *subreq = NULL;
struct db_ctdb_parse_record_state *state = NULL;
NTSTATUS status;
req = tevent_req_create(mem_ctx, &state,
struct db_ctdb_parse_record_state);
if (req == NULL) {
*req_state = DBWRAP_REQ_ERROR;
return NULL;
}
*state = (struct db_ctdb_parse_record_state) {
.parser = parser,
.private_data = private_data,
.my_vnn = ctdbd_vnn(ctx->conn),
.empty_record = false,
};
status = db_ctdb_try_parse_local_record(ctx, key, state);
if (!NT_STATUS_EQUAL(status, NT_STATUS_MORE_PROCESSING_REQUIRED)) {
if (tevent_req_nterror(req, status)) {
*req_state = DBWRAP_REQ_ERROR;
return tevent_req_post(req, ev);
}
*req_state = DBWRAP_REQ_DONE;
tevent_req_done(req);
return tevent_req_post(req, ev);
}
subreq = ctdbd_parse_send(state,
ev,
ctdb_async_ctx.async_conn,
ctx->db_id,
key,
state->ask_for_readonly_copy,
parser,
private_data,
req_state);
if (tevent_req_nomem(subreq, req)) {
*req_state = DBWRAP_REQ_ERROR;
return tevent_req_post(req, ev);
}
tevent_req_set_callback(subreq, db_ctdb_parse_record_done, req);
return req;
}
static void db_ctdb_parse_record_done(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
int ret;
ret = ctdbd_parse_recv(subreq);
TALLOC_FREE(subreq);
if (ret != 0) {
if (ret == ENOENT) {
/*
* This maps to NT_STATUS_OBJECT_NAME_NOT_FOUND. Our
* upper layers expect NT_STATUS_NOT_FOUND for "no
* record around". We need to convert dbwrap to 0/errno
* away from NTSTATUS ... :-)
*/
tevent_req_nterror(req, NT_STATUS_NOT_FOUND);
return;
}
tevent_req_nterror(req, map_nt_error_from_unix(ret));
return;
}
tevent_req_done(req);
return;
}
static NTSTATUS db_ctdb_parse_record_recv(struct tevent_req *req)
{
return tevent_req_simple_recv_ntstatus(req);
}
struct traverse_state {
struct db_context *db;
int (*fn)(struct db_record *rec, void *private_data);
@ -1675,6 +1825,15 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
tdb_flags &= TDB_SEQNUM|TDB_VOLATILE|
TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST;
if (!result->persistent) {
ret = ctdb_async_ctx_init(NULL, messaging_tevent_context(msg_ctx));
if (ret != 0) {
DBG_ERR("ctdb_async_ctx_init failed: %s\n", strerror(ret));
TALLOC_FREE(result);
return NULL;
}
}
if (!result->persistent &&
(dbwrap_flags & DBWRAP_FLAG_OPTIMIZE_READONLY_ACCESS))
{
@ -1745,6 +1904,8 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
result->fetch_locked = db_ctdb_fetch_locked;
result->try_fetch_locked = db_ctdb_try_fetch_locked;
result->parse_record = db_ctdb_parse_record;
result->parse_record_send = db_ctdb_parse_record_send;
result->parse_record_recv = db_ctdb_parse_record_recv;
result->traverse = db_ctdb_traverse;
result->traverse_read = db_ctdb_traverse_read;
result->get_seqnum = db_ctdb_get_seqnum;

View File

@ -36,5 +36,6 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
int open_flags, mode_t mode,
enum dbwrap_lock_order lock_order,
uint64_t dbwrap_flags);
int ctdb_async_ctx_reinit(TALLOC_CTX *mem_ctx, struct tevent_context *ev);
#endif /* __DBWRAP_CTDB_H__ */

View File

@ -35,6 +35,7 @@
#include "lib/util/sys_rw.h"
#include "lib/util/sys_rw_data.h"
#include "lib/util/util_process.h"
#include "lib/dbwrap/dbwrap_ctdb.h"
#ifdef HAVE_SYS_PRCTL_H
#include <sys/prctl.h>
@ -437,6 +438,7 @@ NTSTATUS reinit_after_fork(struct messaging_context *msg_ctx,
const char *comment)
{
NTSTATUS status = NT_STATUS_OK;
int ret;
if (reinit_after_fork_pipe[1] != -1) {
close(reinit_after_fork_pipe[1]);
@ -478,6 +480,16 @@ NTSTATUS reinit_after_fork(struct messaging_context *msg_ctx,
DEBUG(0,("messaging_reinit() failed: %s\n",
nt_errstr(status)));
}
if (lp_clustering()) {
ret = ctdb_async_ctx_reinit(
NULL, messaging_tevent_context(msg_ctx));
if (ret != 0) {
DBG_ERR("db_ctdb_async_ctx_reinit failed: %s\n",
strerror(errno));
return map_nt_error_from_unix(ret);
}
}
}
if (comment) {