mirror of
https://github.com/samba-team/samba.git
synced 2025-02-02 09:47:23 +03:00
common/io: Limit the queue buffer size for fair scheduling via tevent
If we process all the data available in a socket buffer, CTDB can stay busy processing lots of packets via immediate event mechanism in tevent. After processing an immediate event, tevent returns without epoll_wait. So as long as there are immediate events, tevent will never poll other FDs. CTDB will report this as "Event handling took xx seconds" warning. This is misleading since CTDB is very busy processing packets, but never gets to the point of polling FDs. The improvement in socket handling made it worse when handling traverse control. There were lots of packets filled in the socket buffer quickly and CTDB stayed busy processing those packets and not polling other FDs and timer events. This can lead to controls timing out and in worse case other nodes marking busy node as disconnected. Signed-off-by: Amitay Isaacs <amitay@gmail.com> (This used to be ctdb commit 92939c1178d04116d842708bc2d6a9c2950e36cc)
This commit is contained in:
parent
cfb7f74fa2
commit
a61a4b1254
@ -29,11 +29,14 @@
|
||||
#include "../include/ctdb_client.h"
|
||||
#include <stdarg.h>
|
||||
|
||||
#define QUEUE_BUFFER_SIZE (16*1024)
|
||||
|
||||
/* structures for packet queueing - see common/ctdb_io.c */
|
||||
struct ctdb_buffer {
|
||||
uint8_t *data;
|
||||
uint32_t length;
|
||||
uint32_t size;
|
||||
uint32_t extend;
|
||||
};
|
||||
|
||||
struct ctdb_queue_pkt {
|
||||
@ -98,7 +101,9 @@ static void queue_process(struct ctdb_queue *queue)
|
||||
}
|
||||
|
||||
if (queue->buffer.length < pkt_size) {
|
||||
DEBUG(DEBUG_DEBUG, ("Partial packet data read\n"));
|
||||
if (pkt_size > QUEUE_BUFFER_SIZE) {
|
||||
queue->buffer.extend = pkt_size;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@ -122,6 +127,11 @@ static void queue_process(struct ctdb_queue *queue)
|
||||
/* There is more data to be processed, schedule an event */
|
||||
tevent_schedule_immediate(queue->im, queue->ctdb->ev,
|
||||
queue_process_event, queue);
|
||||
} else {
|
||||
if (queue->buffer.size > QUEUE_BUFFER_SIZE) {
|
||||
TALLOC_FREE(queue->buffer.data);
|
||||
queue->buffer.size = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* It is the responsibility of the callback to free 'data' */
|
||||
@ -143,6 +153,7 @@ static void queue_io_read(struct ctdb_queue *queue)
|
||||
int num_ready = 0;
|
||||
ssize_t nread;
|
||||
uint8_t *data;
|
||||
int navail;
|
||||
|
||||
/* check how much data is available on the socket for immediately
|
||||
guaranteed nonblocking access.
|
||||
@ -160,29 +171,37 @@ static void queue_io_read(struct ctdb_queue *queue)
|
||||
|
||||
if (queue->buffer.data == NULL) {
|
||||
/* starting fresh, allocate buf to read data */
|
||||
queue->buffer.data = talloc_size(queue, num_ready);
|
||||
queue->buffer.data = talloc_size(queue, QUEUE_BUFFER_SIZE);
|
||||
if (queue->buffer.data == NULL) {
|
||||
DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready));
|
||||
goto failed;
|
||||
}
|
||||
queue->buffer.size = num_ready;
|
||||
} else if (queue->buffer.length + num_ready > queue->buffer.size) {
|
||||
queue->buffer.size = QUEUE_BUFFER_SIZE;
|
||||
} else if (queue->buffer.extend > 0) {
|
||||
/* extending buffer */
|
||||
data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.length + num_ready);
|
||||
data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.extend);
|
||||
if (data == NULL) {
|
||||
DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n", queue->buffer.length + num_ready));
|
||||
DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n", queue->buffer.extend));
|
||||
goto failed;
|
||||
}
|
||||
queue->buffer.data = data;
|
||||
queue->buffer.size = queue->buffer.length + num_ready;
|
||||
queue->buffer.size = queue->buffer.extend;
|
||||
queue->buffer.extend = 0;
|
||||
}
|
||||
|
||||
nread = read(queue->fd, queue->buffer.data + queue->buffer.length, num_ready);
|
||||
if (nread <= 0) {
|
||||
DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
|
||||
goto failed;
|
||||
navail = queue->buffer.size - queue->buffer.length;
|
||||
if (num_ready > navail) {
|
||||
num_ready = navail;
|
||||
}
|
||||
|
||||
if (num_ready > 0) {
|
||||
nread = read(queue->fd, queue->buffer.data + queue->buffer.length, num_ready);
|
||||
if (nread <= 0) {
|
||||
DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
|
||||
goto failed;
|
||||
}
|
||||
queue->buffer.length += nread;
|
||||
}
|
||||
queue->buffer.length += nread;
|
||||
|
||||
queue_process(queue);
|
||||
return;
|
||||
|
Loading…
x
Reference in New Issue
Block a user