diff --git a/include/proto/peers.h b/include/proto/peers.h index ce4feaa4c..74e20dadc 100644 --- a/include/proto/peers.h +++ b/include/proto/peers.h @@ -55,6 +55,7 @@ static inline struct xprt_ops *peer_xprt(struct peer *p) #endif int peers_init_sync(struct peers *peers); +int peers_alloc_dcache(struct peers *peers); void peers_register_table(struct peers *, struct stktable *table); void peers_setup_frontend(struct proxy *fe); diff --git a/include/types/peers.h b/include/types/peers.h index 0139edc05..8a7a4de4f 100644 --- a/include/types/peers.h +++ b/include/types/peers.h @@ -71,6 +71,7 @@ struct peer { struct shared_table *last_local_table; struct shared_table *tables; struct server *srv; + struct dcache *dcache; /* dictionary cache */ __decl_hathreads(HA_SPINLOCK_T lock); /* lock used to handle this peer section */ struct peer *next; /* next peer in the list */ }; diff --git a/src/cfgparse.c b/src/cfgparse.c index bec5b01f8..ff20de137 100644 --- a/src/cfgparse.c +++ b/src/cfgparse.c @@ -3960,7 +3960,7 @@ out_uri_auth_compat: bind_conf->xprt->prepare_bind_conf(bind_conf) < 0) cfgerr++; } - if (!peers_init_sync(curpeers)) { + if (!peers_init_sync(curpeers) || !peers_alloc_dcache(curpeers)) { ha_alert("Peers section '%s': out of memory, giving up on peers.\n", curpeers->id); cfgerr++; diff --git a/src/peers.c b/src/peers.c index 7ccd0c5b6..76e94c15e 100644 --- a/src/peers.c +++ b/src/peers.c @@ -36,6 +36,7 @@ #include #include #include +#include #include #include #include @@ -139,6 +140,7 @@ struct peer_prep_params { unsigned int updateid; int use_identifier; int use_timed; + struct peer *peer; } updt; struct { struct shared_table *shared_table; @@ -172,6 +174,8 @@ struct peer_prep_params { #define PEER_MSG_HEADER_LEN 2 +#define PEER_STKT_CACHE_MAX_ENTRIES 128 + /**********************************/ /* Peer Session IO handler states */ /**********************************/ @@ -217,6 +221,10 @@ static size_t proto_len = sizeof(PEER_SESSION_PROTO_NAME) - 1; struct peers *cfg_peers = NULL; static void peer_session_forceshutdown(struct peer *peer); +struct dcache_tx_entry *dcache_tx_insert(struct dcache *dc, + struct dcache_tx_entry *i); +static inline void flush_dcache(struct peer *peer); + static const char *statuscode_str(int statuscode) { switch (statuscode) { @@ -399,12 +407,14 @@ static int peer_prepare_updatemsg(char *msg, size_t size, struct peer_prep_param unsigned int updateid; int use_identifier; int use_timed; + struct peer *peer; ts = p->updt.stksess; st = p->updt.shared_table; updateid = p->updt.updateid; use_identifier = p->updt.use_identifier; use_timed = p->updt.use_timed; + peer = p->updt.peer; cursor = datamsg = msg + PEER_MSG_HEADER_LEN + PEER_MSG_ENC_LENGTH_MAXLEN; @@ -483,6 +493,43 @@ static int peer_prepare_updatemsg(char *msg, size_t size, struct peer_prep_param intencode(frqp->prev_ctr, &cursor); break; } + case STD_T_DICT: { + struct dict_entry *de; + struct dcache_tx_entry *cached_de; + struct dcache_tx_entry cde = {0}; + char *beg, *end; + size_t value_len, data_len; + struct dcache *dc; + + de = stktable_data_cast(data_ptr, std_t_dict); + if (!de) + break; + + dc = peer->dcache; + cde.value.key = de; + cached_de = dcache_tx_insert(dc, &cde); + if (!cached_de) + break; + + /* Leave enough room to encode the remaining data length. */ + end = beg = cursor + PEER_MSG_ENC_LENGTH_MAXLEN; + /* Encode the dictionary entry key */ + intencode(cached_de->key.key + 1, &end); + if (cached_de != &cde) { + /* Encode the length of the dictionary entry data */ + value_len = strlen(de->value.key); + intencode(value_len, &end); + /* Copy the data */ + memcpy(end, de->value.key, value_len); + end += value_len; + } + /* Encode the length of the data */ + data_len = end - beg; + intencode(data_len, &cursor); + memmove(cursor, beg, data_len); + cursor += data_len; + break; + } } } } @@ -550,6 +597,7 @@ static int peer_prepare_switchmsg(char *msg, size_t size, struct peer_prep_param case STD_T_SINT: case STD_T_UINT: case STD_T_ULL: + case STD_T_DICT: data |= 1 << data_type; break; case STD_T_FRQP: @@ -654,6 +702,8 @@ void __peer_session_deinit(struct peer *peer) HA_ATOMIC_SUB(&active_peers, 1); + flush_dcache(peer); + /* Re-init current table pointers to force announcement on re-connect */ peer->remote_table = peer->last_local_table = NULL; peer->appctx = NULL; @@ -877,6 +927,7 @@ static inline int peer_send_updatemsg(struct shared_table *st, struct appctx *ap .updt.updateid = updateid, .updt.use_identifier = use_identifier, .updt.use_timed = use_timed, + .updt.peer = appctx->ctx.peers.ptr, }; return peer_send_msg(appctx, peer_prepare_updatemsg, &p); @@ -1354,6 +1405,48 @@ static int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt, stktable_data_cast(data_ptr, std_t_frqp) = data; break; } + case STD_T_DICT: { + struct buffer *chunk; + size_t data_len, value_len; + unsigned int id; + struct dict_entry *de; + struct dcache *dc; + + data_len = decoded_int; + if (*msg_cur + data_len > msg_end) + goto malformed_unlock; + + id = intdecode(msg_cur, msg_end); + if (!*msg_cur || !id) + goto malformed_unlock; + + dc = p->dcache; + if (*msg_cur == msg_end) { + /* Dictionary entry key without value. */ + if (id > dc->max_entries) + break; + /* IDs sent over the network are numbered from 1. */ + de = dc->rx[id - 1].de; + } + else { + chunk = get_trash_chunk(); + value_len = intdecode(msg_cur, msg_end); + if (!*msg_cur || *msg_cur + value_len > msg_end || + unlikely(value_len + 1 >= chunk->size)) + goto malformed_unlock; + + chunk_memcpy(chunk, *msg_cur, value_len); + chunk->area[chunk->data] = '\0'; + de = dict_insert(&server_name_dict, chunk->area); + dc->rx[id - 1].de = de; + } + if (de) { + data_ptr = stktable_data_ptr(st->table, ts, data_type); + if (data_ptr) + stktable_data_cast(data_ptr, std_t_dict) = de; + } + break; + } } } /* Force new expiration */ @@ -2834,7 +2927,7 @@ struct dcache_tx_entry *new_dcache_tx_entry(unsigned int k, struct dict_entry *d static inline void flush_dcache(struct peer *peer) { int i; - struct dcache *dc = peer_stkt_dict_cache(peer, st); + struct dcache *dc = peer->dcache; if (!eb_is_empty(&dc->tx->keys)) { struct eb32_node *node, *next; @@ -2902,6 +2995,23 @@ struct dcache_tx_entry *dcache_tx_insert(struct dcache *dc, return o; } +/* + * Allocate a dictionary cache for each peer of section. + * Return 1 if succeeded, 0 if not. + */ +int peers_alloc_dcache(struct peers *peers) +{ + struct peer *p; + + for (p = peers->remote; p; p = p->next) { + p->dcache = new_dcache(PEER_STKT_CACHE_MAX_ENTRIES); + if (!p->dcache) + return 0; + } + + return 1; +} + /* * Function used to register a table for sync on a group of peers *