eventsapi: Add support for Client side Events
Client side gf_event uses ctx->cmd_args.volfile_server to push notifications to the eventsd. Socket server changed from Unix domain socket to UDP to support external events. Following to be addressed in different patch - Port used for eventsd is 24009. Make it configurable Already configurable in Server side. Configurable in gf_event API is required. - Auth Token yet to be added as discussed in https://www.gluster.org/pipermail/gluster-devel/2016-August/050324.html Change-Id: I159acf80b681d10b82d52cfb3ffdf85cb896542d BUG: 1367774 Signed-off-by: Aravinda VK <avishwan@redhat.com> Reviewed-on: http://review.gluster.org/15189 Smoke: Gluster Build System <jenkins@build.gluster.org> Reviewed-by: Prashanth Pai <ppai@redhat.com> Reviewed-by: Atin Mukherjee <amukherj@redhat.com> CentOS-regression: Gluster Build System <jenkins@build.gluster.org> NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
This commit is contained in:
parent
c1f5cf0bda
commit
b71ae7d77d
@ -102,7 +102,9 @@ ERRORS = (
|
||||
"EVENT_ERROR_INVALID_INPUTS",
|
||||
"EVENT_ERROR_SOCKET",
|
||||
"EVENT_ERROR_CONNECT",
|
||||
"EVENT_ERROR_SEND"
|
||||
"EVENT_ERROR_SEND",
|
||||
"EVENT_ERROR_RESOLVE",
|
||||
"EVENT_ERROR_MSG_FORMAT",
|
||||
)
|
||||
|
||||
if gen_header_type == "C_HEADER":
|
||||
|
@ -9,7 +9,7 @@
|
||||
# cases as published by the Free Software Foundation.
|
||||
#
|
||||
|
||||
SERVER_ADDRESS = "@localstatedir@/run/gluster/events.sock"
|
||||
SERVER_ADDRESS = "0.0.0.0"
|
||||
DEFAULT_CONFIG_FILE = "@SYSCONF_DIR@/glusterfs/eventsconfig.json"
|
||||
CUSTOM_CONFIG_FILE_TO_SYNC = "/events/config.json"
|
||||
CUSTOM_CONFIG_FILE = "@GLUSTERD_WORKDIR@" + CUSTOM_CONFIG_FILE_TO_SYNC
|
||||
@ -17,7 +17,9 @@ WEBHOOKS_FILE_TO_SYNC = "/events/webhooks.json"
|
||||
WEBHOOKS_FILE = "@GLUSTERD_WORKDIR@" + WEBHOOKS_FILE_TO_SYNC
|
||||
LOG_FILE = "@localstatedir@/log/glusterfs/events.log"
|
||||
EVENTSD = "glustereventsd"
|
||||
CONFIG_KEYS = ["log_level"]
|
||||
CONFIG_KEYS = ["log_level", "port"]
|
||||
BOOL_CONFIGS = []
|
||||
RESTART_CONFIGS = []
|
||||
INT_CONFIGS = ["port"]
|
||||
RESTART_CONFIGS = ["port"]
|
||||
EVENTS_ENABLED = @EVENTS_ENABLED@
|
||||
UUID_FILE = "@GLUSTERD_WORKDIR@/glusterd.info"
|
||||
|
@ -1,3 +1,4 @@
|
||||
{
|
||||
"log_level": "INFO"
|
||||
"log_level": "INFO",
|
||||
"port": 24009
|
||||
}
|
||||
|
@ -16,7 +16,7 @@ import time
|
||||
from eventsapiconf import SERVER_ADDRESS, EVENTS_ENABLED
|
||||
from eventtypes import all_events
|
||||
|
||||
from utils import logger, setup_logger
|
||||
from utils import logger, setup_logger, get_config
|
||||
|
||||
# Run this when this lib loads
|
||||
setup_logger()
|
||||
@ -31,10 +31,9 @@ def gf_event(event_type, **kwargs):
|
||||
return
|
||||
|
||||
try:
|
||||
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
client.connect(SERVER_ADDRESS)
|
||||
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
except socket.error as e:
|
||||
logger.error("Unable to connect to events.sock: {0}".format(e))
|
||||
logger.error("Unable to connect to events Server: {0}".format(e))
|
||||
return
|
||||
|
||||
# Convert key value args into KEY1=VALUE1;KEY2=VALUE2;..
|
||||
@ -45,7 +44,18 @@ def gf_event(event_type, **kwargs):
|
||||
# <TIMESTAMP> <EVENT_TYPE> <MSG>
|
||||
msg = "{0} {1} {2}".format(int(time.time()), event_type, msg.strip(";"))
|
||||
|
||||
port = get_config("port")
|
||||
if port is None:
|
||||
logger.error("Unable to get eventsd port details")
|
||||
return
|
||||
|
||||
try:
|
||||
client.sendall(msg)
|
||||
sent = client.sendto(msg, (SERVER_ADDRESS, port))
|
||||
assert sent == len(msg)
|
||||
except socket.error as e:
|
||||
logger.error("Unable to Send message: {0}".format(e))
|
||||
except AssertionError:
|
||||
logger.error("Unable to send message. Sent: {0}, Actual: {1}".format(
|
||||
sent, len(msg)))
|
||||
finally:
|
||||
client.close()
|
||||
|
@ -11,12 +11,10 @@
|
||||
#
|
||||
|
||||
from __future__ import print_function
|
||||
import asyncore
|
||||
import socket
|
||||
import os
|
||||
from multiprocessing import Process, Queue
|
||||
import sys
|
||||
import signal
|
||||
import SocketServer
|
||||
import socket
|
||||
|
||||
from eventtypes import all_events
|
||||
import handlers
|
||||
@ -24,26 +22,19 @@ import utils
|
||||
from eventsapiconf import SERVER_ADDRESS
|
||||
from utils import logger
|
||||
|
||||
# Global Queue, EventsHandler will add items to the queue
|
||||
# and process_event will gets each item and handles it
|
||||
events_queue = Queue()
|
||||
events_server_pid = None
|
||||
|
||||
class GlusterEventsRequestHandler(SocketServer.BaseRequestHandler):
|
||||
|
||||
def process_event():
|
||||
"""
|
||||
Seperate process which handles all the incoming events from Gluster
|
||||
processes.
|
||||
"""
|
||||
while True:
|
||||
data = events_queue.get()
|
||||
logger.debug("EVENT: {0}".format(repr(data)))
|
||||
def handle(self):
|
||||
data = self.request[0].strip()
|
||||
logger.debug("EVENT: {0} from {1}".format(repr(data),
|
||||
self.client_address[0]))
|
||||
try:
|
||||
# Event Format <TIMESTAMP> <TYPE> <DETAIL>
|
||||
ts, key, value = data.split(" ", 2)
|
||||
except ValueError:
|
||||
logger.warn("Invalid Event Format {0}".format(data))
|
||||
continue
|
||||
return
|
||||
|
||||
data_dict = {}
|
||||
try:
|
||||
@ -51,7 +42,7 @@ def process_event():
|
||||
data_dict = dict(x.split('=') for x in value.split(';'))
|
||||
except ValueError:
|
||||
logger.warn("Unable to parse Event {0}".format(data))
|
||||
continue
|
||||
return
|
||||
|
||||
try:
|
||||
# Event Type to Function Map, Recieved event data will be in
|
||||
@ -75,68 +66,28 @@ def process_event():
|
||||
handlers.generic_handler(ts, int(key), data_dict)
|
||||
|
||||
|
||||
def process_event_wrapper():
|
||||
try:
|
||||
process_event()
|
||||
except KeyboardInterrupt:
|
||||
return
|
||||
|
||||
|
||||
class GlusterEventsHandler(asyncore.dispatcher_with_send):
|
||||
|
||||
def handle_read(self):
|
||||
data = self.recv(8192)
|
||||
if data:
|
||||
events_queue.put(data)
|
||||
self.send(data)
|
||||
|
||||
|
||||
class GlusterEventsServer(asyncore.dispatcher):
|
||||
|
||||
def __init__(self):
|
||||
global events_server_pid
|
||||
asyncore.dispatcher.__init__(self)
|
||||
# Start the Events listener process which listens to
|
||||
# the global queue
|
||||
p = Process(target=process_event_wrapper)
|
||||
p.start()
|
||||
events_server_pid = p.pid
|
||||
|
||||
# Create UNIX Domain Socket, bind to path
|
||||
self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
self.bind(SERVER_ADDRESS)
|
||||
self.listen(5)
|
||||
|
||||
def handle_accept(self):
|
||||
pair = self.accept()
|
||||
if pair is not None:
|
||||
sock, addr = pair
|
||||
GlusterEventsHandler(sock)
|
||||
|
||||
|
||||
def signal_handler_sigusr2(sig, frame):
|
||||
if events_server_pid is not None:
|
||||
os.kill(events_server_pid, signal.SIGUSR2)
|
||||
utils.load_all()
|
||||
|
||||
|
||||
def init_event_server():
|
||||
utils.setup_logger()
|
||||
|
||||
# Delete Socket file if Exists
|
||||
try:
|
||||
os.unlink(SERVER_ADDRESS)
|
||||
except OSError:
|
||||
if os.path.exists(SERVER_ADDRESS):
|
||||
print ("Failed to cleanup socket file {0}".format(SERVER_ADDRESS),
|
||||
file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
utils.load_all()
|
||||
|
||||
# Start the Eventing Server, UNIX DOMAIN SOCKET Server
|
||||
GlusterEventsServer()
|
||||
asyncore.loop()
|
||||
port = utils.get_config("port")
|
||||
if port is None:
|
||||
sys.stderr.write("Unable to get Port details from Config\n")
|
||||
sys.exit(1)
|
||||
|
||||
# Start the Eventing Server, UDP Server
|
||||
try:
|
||||
server = SocketServer.ThreadingUDPServer(
|
||||
(SERVER_ADDRESS, port),
|
||||
GlusterEventsRequestHandler)
|
||||
except socket.error as e:
|
||||
sys.stderr.write("Failed to start Eventsd: {0}\n".format(e))
|
||||
sys.exit(1)
|
||||
server.serve_forever()
|
||||
|
||||
|
||||
def main():
|
||||
|
@ -31,6 +31,7 @@ from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC,
|
||||
EVENTSD,
|
||||
CONFIG_KEYS,
|
||||
BOOL_CONFIGS,
|
||||
INT_CONFIGS,
|
||||
RESTART_CONFIGS)
|
||||
|
||||
|
||||
@ -462,6 +463,9 @@ class ConfigSetCmd(Cmd):
|
||||
if args.name in BOOL_CONFIGS:
|
||||
v = boolify(args.value)
|
||||
|
||||
if args.name in INT_CONFIGS:
|
||||
v = int(args.value)
|
||||
|
||||
new_data[args.name] = v
|
||||
file_content_overwrite(CUSTOM_CONFIG_FILE, new_data)
|
||||
|
||||
|
@ -17,11 +17,10 @@ import requests
|
||||
from eventsapiconf import (LOG_FILE,
|
||||
WEBHOOKS_FILE,
|
||||
DEFAULT_CONFIG_FILE,
|
||||
CUSTOM_CONFIG_FILE)
|
||||
CUSTOM_CONFIG_FILE,
|
||||
UUID_FILE)
|
||||
import eventtypes
|
||||
|
||||
from gluster.cliutils import get_node_uuid
|
||||
|
||||
|
||||
# Webhooks list
|
||||
_webhooks = {}
|
||||
@ -32,6 +31,23 @@ _config = {}
|
||||
|
||||
# Init Logger instance
|
||||
logger = logging.getLogger(__name__)
|
||||
NodeID = None
|
||||
|
||||
|
||||
def get_node_uuid():
|
||||
val = None
|
||||
with open(UUID_FILE) as f:
|
||||
for line in f:
|
||||
if line.startswith("UUID="):
|
||||
val = line.strip().split("=")[-1]
|
||||
break
|
||||
return val
|
||||
|
||||
|
||||
def get_config(key):
|
||||
if not _config:
|
||||
load_config()
|
||||
return _config.get(key, None)
|
||||
|
||||
|
||||
def get_event_type_name(idx):
|
||||
@ -109,8 +125,12 @@ def load_all():
|
||||
|
||||
|
||||
def publish(ts, event_key, data):
|
||||
global NodeID
|
||||
if NodeID is None:
|
||||
NodeID = get_node_uuid()
|
||||
|
||||
message = {
|
||||
"nodeid": get_node_uuid(),
|
||||
"nodeid": NodeID,
|
||||
"ts": int(ts),
|
||||
"event": get_event_type_name(event_key),
|
||||
"message": data
|
||||
|
@ -4,6 +4,7 @@
|
||||
<description>Default ports for gluster-distributed storage</description>
|
||||
<port protocol="tcp" port="24007"/> <!--For glusterd -->
|
||||
<port protocol="tcp" port="24008"/> <!--For glusterd RDMA port management -->
|
||||
<port protocol="tcp" port="24009"/> <!--For glustereventsd -->
|
||||
<port protocol="tcp" port="38465"/> <!--Gluster NFS service -->
|
||||
<port protocol="tcp" port="38466"/> <!--Gluster NFS service -->
|
||||
<port protocol="tcp" port="38467"/> <!--Gluster NFS service -->
|
||||
|
@ -16,73 +16,107 @@
|
||||
#include <time.h>
|
||||
#include <stdarg.h>
|
||||
#include <string.h>
|
||||
#include <netinet/in.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <netdb.h>
|
||||
|
||||
#include "syscall.h"
|
||||
#include "mem-pool.h"
|
||||
#include "glusterfs.h"
|
||||
#include "globals.h"
|
||||
#include "events.h"
|
||||
|
||||
|
||||
#define EVENT_PATH DATADIR "/run/gluster/events.sock"
|
||||
#define EVENTS_MSG_MAX 2048
|
||||
#define EVENT_HOST "127.0.0.1"
|
||||
#define EVENT_PORT 24009
|
||||
|
||||
|
||||
int
|
||||
gf_event (eventtypes_t event, char *fmt, ...)
|
||||
{
|
||||
int ret = 0;
|
||||
int sock = -1;
|
||||
char eventstr[EVENTS_MSG_MAX] = "";
|
||||
struct sockaddr_un server;
|
||||
va_list arguments;
|
||||
char *msg = NULL;
|
||||
size_t eventstr_size = 0;
|
||||
int ret = 0;
|
||||
int sock = -1;
|
||||
char *eventstr = NULL;
|
||||
struct sockaddr_in server;
|
||||
va_list arguments;
|
||||
char *msg = NULL;
|
||||
glusterfs_ctx_t *ctx = NULL;
|
||||
struct hostent *host_data;
|
||||
char *host = NULL;
|
||||
|
||||
/* Global context */
|
||||
ctx = THIS->ctx;
|
||||
|
||||
if (event < 0 || event >= EVENT_LAST) {
|
||||
ret = EVENT_ERROR_INVALID_INPUTS;
|
||||
goto out;
|
||||
}
|
||||
|
||||
sock = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
/* Initialize UDP socket */
|
||||
sock = socket (AF_INET, SOCK_DGRAM, 0);
|
||||
if (sock < 0) {
|
||||
ret = EVENT_ERROR_SOCKET;
|
||||
goto out;
|
||||
}
|
||||
server.sun_family = AF_UNIX;
|
||||
strcpy(server.sun_path, EVENT_PATH);
|
||||
|
||||
if (connect(sock,
|
||||
(struct sockaddr *) &server,
|
||||
sizeof(struct sockaddr_un)) < 0) {
|
||||
ret = EVENT_ERROR_CONNECT;
|
||||
goto out;
|
||||
/* Get Host name to send message */
|
||||
if (ctx && ctx->cmd_args.volfile_server) {
|
||||
/* If it is client code then volfile_server is set
|
||||
use that information to push the events. */
|
||||
host_data = gethostbyname (ctx->cmd_args.volfile_server);
|
||||
if (host_data == NULL) {
|
||||
ret = EVENT_ERROR_RESOLVE;
|
||||
goto out;
|
||||
}
|
||||
host = inet_ntoa (*(struct in_addr *)(host_data->h_addr));
|
||||
} else {
|
||||
/* Localhost, Use the defined IP for localhost */
|
||||
host = EVENT_HOST;
|
||||
}
|
||||
|
||||
/* Socket Configurations */
|
||||
server.sin_family = AF_INET;
|
||||
server.sin_port = htons (EVENT_PORT);
|
||||
server.sin_addr.s_addr = inet_addr (host);
|
||||
memset (&server.sin_zero, '\0', sizeof (server.sin_zero));
|
||||
|
||||
va_start (arguments, fmt);
|
||||
ret = gf_vasprintf (&msg, fmt, arguments);
|
||||
va_end (arguments);
|
||||
|
||||
if (ret < 0) {
|
||||
ret = EVENT_ERROR_INVALID_INPUTS;
|
||||
goto out;
|
||||
}
|
||||
|
||||
eventstr_size = snprintf(NULL, 0, "%u %d %s", (unsigned)time(NULL),
|
||||
event, msg);
|
||||
ret = gf_asprintf (&eventstr, "%u %d %s",
|
||||
(unsigned)time(NULL), event, msg);
|
||||
|
||||
if (eventstr_size + 1 > EVENTS_MSG_MAX) {
|
||||
eventstr_size = EVENTS_MSG_MAX - 1;
|
||||
if (ret <= 0) {
|
||||
ret = EVENT_ERROR_MSG_FORMAT;
|
||||
goto out;
|
||||
}
|
||||
|
||||
snprintf(eventstr, eventstr_size+1, "%u %d %s",
|
||||
(unsigned)time(NULL), event, msg);
|
||||
|
||||
if (sys_write(sock, eventstr, strlen(eventstr)) <= 0) {
|
||||
/* Send Message */
|
||||
if (sendto (sock, eventstr, strlen (eventstr),
|
||||
0, (struct sockaddr *)&server, sizeof (server)) <= 0) {
|
||||
ret = EVENT_ERROR_SEND;
|
||||
goto out;
|
||||
}
|
||||
|
||||
ret = EVENT_SEND_OK;
|
||||
|
||||
out:
|
||||
sys_close(sock);
|
||||
GF_FREE(msg);
|
||||
if (sock >= 0) {
|
||||
sys_close (sock);
|
||||
}
|
||||
|
||||
/* Allocated by gf_vasprintf */
|
||||
if (msg)
|
||||
GF_FREE (msg);
|
||||
|
||||
/* Allocated by gf_asprintf */
|
||||
if (eventstr)
|
||||
GF_FREE (eventstr);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user