mirror of
https://github.com/samba-team/samba.git
synced 2024-12-22 13:34:15 +03:00
make sure we don't increment rx_cnt for redirected packets, or for packets that have been requeued after a lockwait
(This used to be ctdb commit 92e5569407dba173a27e9645b4339ce3e2c00520)
This commit is contained in:
parent
0f957f6c23
commit
a14fd9d29c
@ -234,15 +234,13 @@ uint32_t ctdb_get_num_connected_nodes(struct ctdb_context *ctdb)
|
||||
|
||||
|
||||
/*
|
||||
called by the transport layer when a packet comes in
|
||||
called when we need to process a packet. This can be a requeued packet
|
||||
after a lockwait, or a real packet from another node
|
||||
*/
|
||||
void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
|
||||
void ctdb_input_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
|
||||
{
|
||||
struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
|
||||
TALLOC_CTX *tmp_ctx;
|
||||
|
||||
ctdb->status.node_packets_recv++;
|
||||
|
||||
/* place the packet as a child of the tmp_ctx. We then use
|
||||
talloc_free() below to free it. If any of the calls want
|
||||
to keep it, then they will steal it somewhere else, and the
|
||||
@ -250,35 +248,10 @@ void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
|
||||
tmp_ctx = talloc_new(ctdb);
|
||||
talloc_steal(tmp_ctx, hdr);
|
||||
|
||||
if (length < sizeof(*hdr)) {
|
||||
ctdb_set_error(ctdb, "Bad packet length %d\n", length);
|
||||
goto done;
|
||||
}
|
||||
if (length != hdr->length) {
|
||||
ctdb_set_error(ctdb, "Bad header length %d expected %d\n",
|
||||
hdr->length, length);
|
||||
goto done;
|
||||
}
|
||||
|
||||
if (hdr->ctdb_magic != CTDB_MAGIC) {
|
||||
ctdb_set_error(ctdb, "Non CTDB packet rejected\n");
|
||||
goto done;
|
||||
}
|
||||
|
||||
if (hdr->ctdb_version != CTDB_VERSION) {
|
||||
ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
|
||||
goto done;
|
||||
}
|
||||
|
||||
DEBUG(3,(__location__ " ctdb request %d of type %d length %d from "
|
||||
"node %d to %d\n", hdr->reqid, hdr->operation, hdr->length,
|
||||
hdr->srcnode, hdr->destnode));
|
||||
|
||||
/* up the counter for this source node, so we know its alive */
|
||||
if (ctdb_validate_vnn(ctdb, hdr->srcnode)) {
|
||||
ctdb->nodes[hdr->srcnode]->rx_cnt++;
|
||||
}
|
||||
|
||||
switch (hdr->operation) {
|
||||
case CTDB_REQ_CALL:
|
||||
case CTDB_REPLY_CALL:
|
||||
@ -361,15 +334,49 @@ done:
|
||||
talloc_free(tmp_ctx);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
called by the transport layer when a packet comes in
|
||||
*/
|
||||
void ctdb_recv_raw_pkt(void *p, uint8_t *data, uint32_t length)
|
||||
static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
|
||||
{
|
||||
struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
|
||||
ctdb_recv_pkt(ctdb, data, length);
|
||||
struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
|
||||
|
||||
ctdb->status.node_packets_recv++;
|
||||
|
||||
if (length < sizeof(*hdr)) {
|
||||
ctdb_set_error(ctdb, "Bad packet length %d\n", length);
|
||||
return;
|
||||
}
|
||||
if (length != hdr->length) {
|
||||
ctdb_set_error(ctdb, "Bad header length %d expected %d\n",
|
||||
hdr->length, length);
|
||||
return;
|
||||
}
|
||||
|
||||
if (hdr->ctdb_magic != CTDB_MAGIC) {
|
||||
ctdb_set_error(ctdb, "Non CTDB packet rejected\n");
|
||||
return;
|
||||
}
|
||||
|
||||
if (hdr->ctdb_version != CTDB_VERSION) {
|
||||
ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
|
||||
return;
|
||||
}
|
||||
|
||||
/* up the counter for this source node, so we know its alive */
|
||||
if (ctdb_validate_vnn(ctdb, hdr->srcnode)) {
|
||||
/* as a special case, redirected calls don't increment the rx_cnt */
|
||||
if (hdr->operation != CTDB_REQ_CALL ||
|
||||
((struct ctdb_req_call *)hdr)->hopcount == 0) {
|
||||
ctdb->nodes[hdr->srcnode]->rx_cnt++;
|
||||
}
|
||||
}
|
||||
|
||||
ctdb_input_pkt(ctdb, hdr);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
called by the transport layer when a node is dead
|
||||
*/
|
||||
@ -423,8 +430,7 @@ static void queue_next_trigger(struct event_context *ev, struct timed_event *te,
|
||||
struct timeval t, void *private_data)
|
||||
{
|
||||
struct queue_next *q = talloc_get_type(private_data, struct queue_next);
|
||||
ctdb_recv_pkt(q->ctdb, (uint8_t *)q->hdr, q->hdr->length);
|
||||
talloc_free(q);
|
||||
ctdb_input_pkt(q->ctdb, q->hdr);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -447,8 +453,7 @@ static void ctdb_defer_packet(struct ctdb_context *ctdb, struct ctdb_req_header
|
||||
}
|
||||
#if 0
|
||||
/* use this to put packets directly into our recv function */
|
||||
ctdb_recv_pkt(q->ctdb, (uint8_t *)q->hdr, q->hdr->length);
|
||||
talloc_free(q);
|
||||
ctdb_input_pkt(q->ctdb, q->hdr);
|
||||
#else
|
||||
event_add_timed(ctdb->ev, q, timeval_zero(), queue_next_trigger, q);
|
||||
#endif
|
||||
|
@ -44,6 +44,16 @@
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
a varient of input packet that can be used in lock requeue
|
||||
*/
|
||||
void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
|
||||
{
|
||||
struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
|
||||
ctdb_input_pkt(ctdb, hdr);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
local version of ctdb_call
|
||||
*/
|
||||
@ -363,7 +373,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
|
||||
|
||||
/* fetch the current record */
|
||||
ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
|
||||
ctdb_recv_raw_pkt, ctdb, False);
|
||||
ctdb_call_input_pkt, ctdb, False);
|
||||
if (ret == -1) {
|
||||
ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
|
||||
return;
|
||||
@ -436,7 +446,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
|
||||
if the call will be answered locally */
|
||||
|
||||
ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call.key, &header, hdr, &data,
|
||||
ctdb_recv_raw_pkt, ctdb, False);
|
||||
ctdb_call_input_pkt, ctdb, False);
|
||||
if (ret == -1) {
|
||||
ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
|
||||
return;
|
||||
@ -558,7 +568,7 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
|
||||
data.dsize = c->datalen;
|
||||
|
||||
ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
|
||||
ctdb_recv_raw_pkt, ctdb, False);
|
||||
ctdb_call_input_pkt, ctdb, False);
|
||||
if (ret == -2) {
|
||||
return;
|
||||
}
|
||||
|
@ -40,7 +40,7 @@ struct ctdb_client {
|
||||
};
|
||||
|
||||
|
||||
static void daemon_incoming_packet(void *, uint8_t *, uint32_t );
|
||||
static void daemon_incoming_packet(void *, struct ctdb_req_header *);
|
||||
|
||||
static void ctdb_main_loop(struct ctdb_context *ctdb)
|
||||
{
|
||||
@ -438,9 +438,8 @@ static void daemon_request_control_from_client(struct ctdb_client *client,
|
||||
struct ctdb_req_control *c);
|
||||
|
||||
/* data contains a packet from the client */
|
||||
static void daemon_incoming_packet(void *p, uint8_t *data, uint32_t nread)
|
||||
static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
|
||||
{
|
||||
struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
|
||||
struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
|
||||
TALLOC_CTX *tmp_ctx;
|
||||
struct ctdb_context *ctdb = client->ctdb;
|
||||
@ -539,7 +538,7 @@ static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
|
||||
hdr->srcnode, hdr->destnode));
|
||||
|
||||
/* it is the responsibility of the incoming packet function to free 'data' */
|
||||
daemon_incoming_packet(client, data, cnt);
|
||||
daemon_incoming_packet(client, hdr);
|
||||
}
|
||||
|
||||
static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
|
||||
|
@ -192,7 +192,7 @@ int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
|
||||
|
||||
struct lock_fetch_state {
|
||||
struct ctdb_context *ctdb;
|
||||
void (*recv_pkt)(void *, uint8_t *, uint32_t);
|
||||
void (*recv_pkt)(void *, struct ctdb_req_header *);
|
||||
void *recv_context;
|
||||
struct ctdb_req_header *hdr;
|
||||
uint32_t generation;
|
||||
@ -211,7 +211,7 @@ static void lock_fetch_callback(void *p)
|
||||
talloc_free(state->hdr);
|
||||
return;
|
||||
}
|
||||
state->recv_pkt(state->recv_context, (uint8_t *)state->hdr, state->hdr->length);
|
||||
state->recv_pkt(state->recv_context, state->hdr);
|
||||
DEBUG(2,(__location__ " PACKET REQUEUED\n"));
|
||||
}
|
||||
|
||||
@ -242,7 +242,7 @@ static void lock_fetch_callback(void *p)
|
||||
*/
|
||||
int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
|
||||
TDB_DATA key, struct ctdb_req_header *hdr,
|
||||
void (*recv_pkt)(void *, uint8_t *, uint32_t ),
|
||||
void (*recv_pkt)(void *, struct ctdb_req_header *),
|
||||
void *recv_context, bool ignore_generation)
|
||||
{
|
||||
int ret;
|
||||
@ -285,7 +285,7 @@ int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* we need to move the packet off the temporary context in ctdb_recv_pkt(),
|
||||
/* we need to move the packet off the temporary context in ctdb_input_pkt(),
|
||||
so it won't be freed yet */
|
||||
talloc_steal(state, hdr);
|
||||
talloc_steal(state, h);
|
||||
@ -300,7 +300,7 @@ int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
|
||||
int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
|
||||
TDB_DATA key, struct ctdb_ltdb_header *header,
|
||||
struct ctdb_req_header *hdr, TDB_DATA *data,
|
||||
void (*recv_pkt)(void *, uint8_t *, uint32_t ),
|
||||
void (*recv_pkt)(void *, struct ctdb_req_header *),
|
||||
void *recv_context, bool ignore_generation)
|
||||
{
|
||||
int ret;
|
||||
|
@ -576,14 +576,14 @@ int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key,
|
||||
void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
|
||||
int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
|
||||
TDB_DATA key, struct ctdb_req_header *hdr,
|
||||
void (*recv_pkt)(void *, uint8_t *, uint32_t ),
|
||||
void (*recv_pkt)(void *, struct ctdb_req_header *),
|
||||
void *recv_context, bool ignore_generation);
|
||||
int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
|
||||
TDB_DATA key, struct ctdb_ltdb_header *header,
|
||||
struct ctdb_req_header *hdr, TDB_DATA *data,
|
||||
void (*recv_pkt)(void *, uint8_t *, uint32_t ),
|
||||
void (*recv_pkt)(void *, struct ctdb_req_header *),
|
||||
void *recv_context, bool ignore_generation);
|
||||
void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length);
|
||||
void ctdb_input_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *);
|
||||
|
||||
struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
|
||||
struct ctdb_call *call,
|
||||
|
Loading…
Reference in New Issue
Block a user