mirror of
https://github.com/systemd/systemd.git
synced 2024-12-22 17:35:35 +03:00
sd-netlink: reimplement received message queue
By using OrderedSet and Hashmap, we can drop all memmove() calls. No functional changes, just refactoring.
This commit is contained in:
parent
2ea465ef62
commit
e417c4ac44
@ -7,6 +7,7 @@
|
||||
|
||||
#include "list.h"
|
||||
#include "netlink-types.h"
|
||||
#include "ordered-set.h"
|
||||
#include "prioq.h"
|
||||
#include "time-util.h"
|
||||
|
||||
@ -72,11 +73,8 @@ struct sd_netlink {
|
||||
Hashmap *broadcast_group_refs;
|
||||
bool broadcast_group_dont_leave:1; /* until we can rely on 4.2 */
|
||||
|
||||
sd_netlink_message **rqueue;
|
||||
unsigned rqueue_size;
|
||||
|
||||
sd_netlink_message **rqueue_partial;
|
||||
unsigned rqueue_partial_size;
|
||||
OrderedSet *rqueue;
|
||||
Hashmap *rqueue_partial_by_serial;
|
||||
|
||||
struct nlmsghdr *rbuffer;
|
||||
|
||||
|
@ -241,57 +241,50 @@ static int socket_recv_message(int fd, void *buf, size_t buf_size, uint32_t *ret
|
||||
return (int) n;
|
||||
}
|
||||
|
||||
DEFINE_PRIVATE_HASH_OPS_WITH_VALUE_DESTRUCTOR(
|
||||
netlink_message_hash_ops,
|
||||
void, trivial_hash_func, trivial_compare_func,
|
||||
sd_netlink_message, sd_netlink_message_unref);
|
||||
|
||||
static int netlink_queue_received_message(sd_netlink *nl, sd_netlink_message *m) {
|
||||
int r;
|
||||
|
||||
assert(nl);
|
||||
assert(m);
|
||||
|
||||
if (nl->rqueue_size >= NETLINK_RQUEUE_MAX)
|
||||
if (ordered_set_size(nl->rqueue) >= NETLINK_RQUEUE_MAX)
|
||||
return log_debug_errno(SYNTHETIC_ERRNO(ENOBUFS),
|
||||
"sd-netlink: exhausted the read queue size (%d)",
|
||||
NETLINK_RQUEUE_MAX);
|
||||
"sd-netlink: exhausted the read queue size (%d)", NETLINK_RQUEUE_MAX);
|
||||
|
||||
if (!GREEDY_REALLOC(nl->rqueue, nl->rqueue_size + 1))
|
||||
return -ENOMEM;
|
||||
r = ordered_set_ensure_put(&nl->rqueue, &netlink_message_hash_ops, m);
|
||||
if (r < 0)
|
||||
return r;
|
||||
|
||||
nl->rqueue[nl->rqueue_size++] = sd_netlink_message_ref(m);
|
||||
sd_netlink_message_ref(m);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int netlink_queue_partially_received_message(sd_netlink *nl, sd_netlink_message *m) {
|
||||
uint32_t serial;
|
||||
int r;
|
||||
|
||||
assert(nl);
|
||||
assert(m);
|
||||
assert(m->hdr->nlmsg_flags & NLM_F_MULTI);
|
||||
|
||||
if (nl->rqueue_partial_size >= NETLINK_RQUEUE_MAX)
|
||||
if (hashmap_size(nl->rqueue_partial_by_serial) >= NETLINK_RQUEUE_MAX)
|
||||
return log_debug_errno(SYNTHETIC_ERRNO(ENOBUFS),
|
||||
"sd-netlink: exhausted the partial read queue size (%d)",
|
||||
NETLINK_RQUEUE_MAX);
|
||||
"sd-netlink: exhausted the partial read queue size (%d)", NETLINK_RQUEUE_MAX);
|
||||
|
||||
if (!GREEDY_REALLOC(nl->rqueue_partial, nl->rqueue_partial_size + 1))
|
||||
return -ENOMEM;
|
||||
serial = message_get_serial(m);
|
||||
r = hashmap_ensure_put(&nl->rqueue_partial_by_serial, &netlink_message_hash_ops, UINT32_TO_PTR(serial), m);
|
||||
if (r < 0)
|
||||
return r;
|
||||
|
||||
nl->rqueue_partial[nl->rqueue_partial_size++] = sd_netlink_message_ref(m);
|
||||
sd_netlink_message_ref(m);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static sd_netlink_message *netlink_take_partial_message(sd_netlink *nl, uint32_t seqnum) {
|
||||
assert(nl);
|
||||
|
||||
for (unsigned i = 0; i < nl->rqueue_partial_size; i++)
|
||||
if (message_get_serial(nl->rqueue_partial[i]) == seqnum) {
|
||||
sd_netlink_message *found = nl->rqueue_partial[i];
|
||||
|
||||
/* remove the message form the partial read queue */
|
||||
memmove(nl->rqueue_partial + i, nl->rqueue_partial + i + 1,
|
||||
sizeof(sd_netlink_message*) * (nl->rqueue_partial_size - i - 1));
|
||||
nl->rqueue_partial_size--;
|
||||
|
||||
return found;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* On success, the number of bytes received is returned and *ret points to the received message
|
||||
* which has a valid header and the correct size.
|
||||
* If nothing useful was received 0 is returned.
|
||||
@ -329,7 +322,7 @@ int socket_read_message(sd_netlink *nl) {
|
||||
|
||||
if (nl->rbuffer->nlmsg_flags & NLM_F_MULTI) {
|
||||
multi_part = true;
|
||||
first = netlink_take_partial_message(nl, nl->rbuffer->nlmsg_seq);
|
||||
first = hashmap_remove(nl->rqueue_partial_by_serial, UINT32_TO_PTR(nl->rbuffer->nlmsg_seq));
|
||||
}
|
||||
|
||||
for (struct nlmsghdr *new_msg = nl->rbuffer; NLMSG_OK(new_msg, len) && !done; new_msg = NLMSG_NEXT(new_msg, len)) {
|
||||
|
@ -116,18 +116,11 @@ int sd_netlink_increase_rxbuf(sd_netlink *nl, size_t size) {
|
||||
|
||||
static sd_netlink *netlink_free(sd_netlink *nl) {
|
||||
sd_netlink_slot *s;
|
||||
unsigned i;
|
||||
|
||||
assert(nl);
|
||||
|
||||
for (i = 0; i < nl->rqueue_size; i++)
|
||||
sd_netlink_message_unref(nl->rqueue[i]);
|
||||
free(nl->rqueue);
|
||||
|
||||
for (i = 0; i < nl->rqueue_partial_size; i++)
|
||||
sd_netlink_message_unref(nl->rqueue_partial[i]);
|
||||
free(nl->rqueue_partial);
|
||||
|
||||
ordered_set_free(nl->rqueue);
|
||||
hashmap_free(nl->rqueue_partial_by_serial);
|
||||
free(nl->rbuffer);
|
||||
|
||||
while ((s = nl->slots)) {
|
||||
@ -175,29 +168,26 @@ int sd_netlink_send(
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int dispatch_rqueue(sd_netlink *nl, sd_netlink_message **message) {
|
||||
static int dispatch_rqueue(sd_netlink *nl, sd_netlink_message **ret) {
|
||||
sd_netlink_message *m;
|
||||
int r;
|
||||
|
||||
assert(nl);
|
||||
assert(message);
|
||||
assert(ret);
|
||||
|
||||
if (nl->rqueue_size <= 0) {
|
||||
if (ordered_set_size(nl->rqueue) <= 0) {
|
||||
/* Try to read a new message */
|
||||
r = socket_read_message(nl);
|
||||
if (r == -ENOBUFS) { /* FIXME: ignore buffer overruns for now */
|
||||
if (r == -ENOBUFS) /* FIXME: ignore buffer overruns for now */
|
||||
log_debug_errno(r, "sd-netlink: Got ENOBUFS from netlink socket, ignoring.");
|
||||
return 1;
|
||||
}
|
||||
if (r <= 0)
|
||||
else if (r < 0)
|
||||
return r;
|
||||
}
|
||||
|
||||
/* Dispatch a queued message */
|
||||
*message = nl->rqueue[0];
|
||||
nl->rqueue_size--;
|
||||
memmove(nl->rqueue, nl->rqueue + 1, sizeof(sd_netlink_message*) * nl->rqueue_size);
|
||||
|
||||
return 1;
|
||||
m = ordered_set_steal_first(nl->rqueue);
|
||||
*ret = m;
|
||||
return !!m;
|
||||
}
|
||||
|
||||
static int process_timeout(sd_netlink *nl) {
|
||||
@ -437,7 +427,7 @@ int sd_netlink_wait(sd_netlink *nl, uint64_t timeout_usec) {
|
||||
assert_return(nl, -EINVAL);
|
||||
assert_return(!netlink_pid_changed(nl), -ECHILD);
|
||||
|
||||
if (nl->rqueue_size > 0)
|
||||
if (ordered_set_size(nl->rqueue) > 0)
|
||||
return 0;
|
||||
|
||||
r = netlink_poll(nl, false, timeout_usec);
|
||||
@ -538,23 +528,21 @@ int sd_netlink_read(
|
||||
timeout = calc_elapse(usec);
|
||||
|
||||
for (;;) {
|
||||
sd_netlink_message *m;
|
||||
usec_t left;
|
||||
|
||||
for (unsigned i = 0; i < nl->rqueue_size; i++) {
|
||||
ORDERED_SET_FOREACH(m, nl->rqueue) {
|
||||
_cleanup_(sd_netlink_message_unrefp) sd_netlink_message *incoming = NULL;
|
||||
uint32_t received_serial;
|
||||
uint16_t type;
|
||||
|
||||
received_serial = message_get_serial(nl->rqueue[i]);
|
||||
received_serial = message_get_serial(m);
|
||||
if (received_serial != serial)
|
||||
continue;
|
||||
|
||||
incoming = nl->rqueue[i];
|
||||
|
||||
/* found a match, remove from rqueue and return it */
|
||||
memmove(nl->rqueue + i, nl->rqueue + i + 1,
|
||||
sizeof(sd_netlink_message*) * (nl->rqueue_size - i - 1));
|
||||
nl->rqueue_size--;
|
||||
ordered_set_remove(nl->rqueue, m);
|
||||
incoming = TAKE_PTR(m);
|
||||
|
||||
r = sd_netlink_message_get_errno(incoming);
|
||||
if (r < 0)
|
||||
@ -625,7 +613,7 @@ int sd_netlink_get_events(sd_netlink *nl) {
|
||||
assert_return(nl, -EINVAL);
|
||||
assert_return(!netlink_pid_changed(nl), -ECHILD);
|
||||
|
||||
return nl->rqueue_size == 0 ? POLLIN : 0;
|
||||
return ordered_set_size(nl->rqueue) == 0 ? POLLIN : 0;
|
||||
}
|
||||
|
||||
int sd_netlink_get_timeout(sd_netlink *nl, uint64_t *timeout_usec) {
|
||||
@ -635,7 +623,7 @@ int sd_netlink_get_timeout(sd_netlink *nl, uint64_t *timeout_usec) {
|
||||
assert_return(timeout_usec, -EINVAL);
|
||||
assert_return(!netlink_pid_changed(nl), -ECHILD);
|
||||
|
||||
if (nl->rqueue_size > 0) {
|
||||
if (ordered_set_size(nl->rqueue) > 0) {
|
||||
*timeout_usec = 0;
|
||||
return 1;
|
||||
}
|
||||
@ -647,7 +635,6 @@ int sd_netlink_get_timeout(sd_netlink *nl, uint64_t *timeout_usec) {
|
||||
}
|
||||
|
||||
*timeout_usec = c->timeout;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user