MINOR: peers: Make peers protocol support new "server_name" data type.

Make usage of the APIs implemented for dictionaries (dict.c) and their LRU caches (struct dcache)
so that to send/receive server names used for the server by name stickiness. These
names are sent over the network as follows:

 - in every case we send the encode length of the data (STD_T_DICT), then
 - if the server names is not present in the cache used upon transmission (struct dcache_tx)
   we cache it and we the ID of this TX cache entry followed the encode length of the
   server name, and finally the sever name itseft (non NULL terminated string).
 - if the server name is present, we repead these operations but we only send the TX cache
   entry ID.

Upon receipt, the couple of (cache IDs, server name) are stored the LRU cache used
only upon receipt (struct dcache_rx). As the peers protocol is symetrical, the fact
that the server name is present in the received data (resp. or not) denotes if
the entry is absent (resp. or not).
This commit is contained in:
Frdric Lcaille 2019-05-20 18:22:52 +02:00 committed by Willy Tarreau
parent 03cdf55e69
commit 8d78fa7def
4 changed files with 114 additions and 2 deletions

View File

@ -55,6 +55,7 @@ static inline struct xprt_ops *peer_xprt(struct peer *p)
#endif
int peers_init_sync(struct peers *peers);
int peers_alloc_dcache(struct peers *peers);
void peers_register_table(struct peers *, struct stktable *table);
void peers_setup_frontend(struct proxy *fe);

View File

@ -71,6 +71,7 @@ struct peer {
struct shared_table *last_local_table;
struct shared_table *tables;
struct server *srv;
struct dcache *dcache; /* dictionary cache */
__decl_hathreads(HA_SPINLOCK_T lock); /* lock used to handle this peer section */
struct peer *next; /* next peer in the list */
};

View File

@ -3960,7 +3960,7 @@ out_uri_auth_compat:
bind_conf->xprt->prepare_bind_conf(bind_conf) < 0)
cfgerr++;
}
if (!peers_init_sync(curpeers)) {
if (!peers_init_sync(curpeers) || !peers_alloc_dcache(curpeers)) {
ha_alert("Peers section '%s': out of memory, giving up on peers.\n",
curpeers->id);
cfgerr++;

View File

@ -36,6 +36,7 @@
#include <proto/applet.h>
#include <proto/channel.h>
#include <proto/cli.h>
#include <proto/dict.h>
#include <proto/fd.h>
#include <proto/frontend.h>
#include <proto/log.h>
@ -139,6 +140,7 @@ struct peer_prep_params {
unsigned int updateid;
int use_identifier;
int use_timed;
struct peer *peer;
} updt;
struct {
struct shared_table *shared_table;
@ -172,6 +174,8 @@ struct peer_prep_params {
#define PEER_MSG_HEADER_LEN 2
#define PEER_STKT_CACHE_MAX_ENTRIES 128
/**********************************/
/* Peer Session IO handler states */
/**********************************/
@ -217,6 +221,10 @@ static size_t proto_len = sizeof(PEER_SESSION_PROTO_NAME) - 1;
struct peers *cfg_peers = NULL;
static void peer_session_forceshutdown(struct peer *peer);
struct dcache_tx_entry *dcache_tx_insert(struct dcache *dc,
struct dcache_tx_entry *i);
static inline void flush_dcache(struct peer *peer);
static const char *statuscode_str(int statuscode)
{
switch (statuscode) {
@ -399,12 +407,14 @@ static int peer_prepare_updatemsg(char *msg, size_t size, struct peer_prep_param
unsigned int updateid;
int use_identifier;
int use_timed;
struct peer *peer;
ts = p->updt.stksess;
st = p->updt.shared_table;
updateid = p->updt.updateid;
use_identifier = p->updt.use_identifier;
use_timed = p->updt.use_timed;
peer = p->updt.peer;
cursor = datamsg = msg + PEER_MSG_HEADER_LEN + PEER_MSG_ENC_LENGTH_MAXLEN;
@ -483,6 +493,43 @@ static int peer_prepare_updatemsg(char *msg, size_t size, struct peer_prep_param
intencode(frqp->prev_ctr, &cursor);
break;
}
case STD_T_DICT: {
struct dict_entry *de;
struct dcache_tx_entry *cached_de;
struct dcache_tx_entry cde = {0};
char *beg, *end;
size_t value_len, data_len;
struct dcache *dc;
de = stktable_data_cast(data_ptr, std_t_dict);
if (!de)
break;
dc = peer->dcache;
cde.value.key = de;
cached_de = dcache_tx_insert(dc, &cde);
if (!cached_de)
break;
/* Leave enough room to encode the remaining data length. */
end = beg = cursor + PEER_MSG_ENC_LENGTH_MAXLEN;
/* Encode the dictionary entry key */
intencode(cached_de->key.key + 1, &end);
if (cached_de != &cde) {
/* Encode the length of the dictionary entry data */
value_len = strlen(de->value.key);
intencode(value_len, &end);
/* Copy the data */
memcpy(end, de->value.key, value_len);
end += value_len;
}
/* Encode the length of the data */
data_len = end - beg;
intencode(data_len, &cursor);
memmove(cursor, beg, data_len);
cursor += data_len;
break;
}
}
}
}
@ -550,6 +597,7 @@ static int peer_prepare_switchmsg(char *msg, size_t size, struct peer_prep_param
case STD_T_SINT:
case STD_T_UINT:
case STD_T_ULL:
case STD_T_DICT:
data |= 1 << data_type;
break;
case STD_T_FRQP:
@ -654,6 +702,8 @@ void __peer_session_deinit(struct peer *peer)
HA_ATOMIC_SUB(&active_peers, 1);
flush_dcache(peer);
/* Re-init current table pointers to force announcement on re-connect */
peer->remote_table = peer->last_local_table = NULL;
peer->appctx = NULL;
@ -877,6 +927,7 @@ static inline int peer_send_updatemsg(struct shared_table *st, struct appctx *ap
.updt.updateid = updateid,
.updt.use_identifier = use_identifier,
.updt.use_timed = use_timed,
.updt.peer = appctx->ctx.peers.ptr,
};
return peer_send_msg(appctx, peer_prepare_updatemsg, &p);
@ -1354,6 +1405,48 @@ static int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt,
stktable_data_cast(data_ptr, std_t_frqp) = data;
break;
}
case STD_T_DICT: {
struct buffer *chunk;
size_t data_len, value_len;
unsigned int id;
struct dict_entry *de;
struct dcache *dc;
data_len = decoded_int;
if (*msg_cur + data_len > msg_end)
goto malformed_unlock;
id = intdecode(msg_cur, msg_end);
if (!*msg_cur || !id)
goto malformed_unlock;
dc = p->dcache;
if (*msg_cur == msg_end) {
/* Dictionary entry key without value. */
if (id > dc->max_entries)
break;
/* IDs sent over the network are numbered from 1. */
de = dc->rx[id - 1].de;
}
else {
chunk = get_trash_chunk();
value_len = intdecode(msg_cur, msg_end);
if (!*msg_cur || *msg_cur + value_len > msg_end ||
unlikely(value_len + 1 >= chunk->size))
goto malformed_unlock;
chunk_memcpy(chunk, *msg_cur, value_len);
chunk->area[chunk->data] = '\0';
de = dict_insert(&server_name_dict, chunk->area);
dc->rx[id - 1].de = de;
}
if (de) {
data_ptr = stktable_data_ptr(st->table, ts, data_type);
if (data_ptr)
stktable_data_cast(data_ptr, std_t_dict) = de;
}
break;
}
}
}
/* Force new expiration */
@ -2834,7 +2927,7 @@ struct dcache_tx_entry *new_dcache_tx_entry(unsigned int k, struct dict_entry *d
static inline void flush_dcache(struct peer *peer)
{
int i;
struct dcache *dc = peer_stkt_dict_cache(peer, st);
struct dcache *dc = peer->dcache;
if (!eb_is_empty(&dc->tx->keys)) {
struct eb32_node *node, *next;
@ -2902,6 +2995,23 @@ struct dcache_tx_entry *dcache_tx_insert(struct dcache *dc,
return o;
}
/*
* Allocate a dictionary cache for each peer of <peers> section.
* Return 1 if succeeded, 0 if not.
*/
int peers_alloc_dcache(struct peers *peers)
{
struct peer *p;
for (p = peers->remote; p; p = p->next) {
p->dcache = new_dcache(PEER_STKT_CACHE_MAX_ENTRIES);
if (!p->dcache)
return 0;
}
return 1;
}
/*
* Function used to register a table for sync on a group of peers
*