eventsapi: Gluster Eventing Feature implementation

[Depends on http://review.gluster.org/14627]

Design is available in `glusterfs-specs`, A change from the design
is support of webhook instead of Websockets as discussed in the design

http://review.gluster.org/13115

Since Websocket support depends on REST APIs, I will add Websocket support
once REST APIs patch gets merged

Usage:
Run following command to start/stop Eventsapi server in all Peers,
which will collect the notifications from any Gluster daemon and emits
to configured client.

    gluster-eventsapi start|stop|restart|reload

Status of running services can be checked using,

    gluster-eventsapi status

Events listener is a HTTP(S) server which listens to events emited by
the Gluster. Create a HTTP Server to listen on POST and register that
URL using,

    gluster-eventsapi webhook-add <URL> [--bearer-token <TOKEN>]

For example, if HTTP Server running in `http://192.168.122.188:9000`
then add that URL using,

    gluster-eventsapi webhook-add http://192.168.122.188:9000

If it expects a Token then specify it using `--bearer-token` or `-t`

We can also test Webhook if all peer nodes can send message or not
using,

    gluster-eventsapi webhook-test <URL> [--bearer-token <TOKEN>]

Configurations can be viewed/updated using,

    gluster-eventsapi config-get [--name]
    gluster-eventsapi config-set <NAME> <VALUE>
    gluster-eventsapi config-reset <NAME|all>

If any one peer node was down during config-set/reset or webhook
modifications, Run sync command from good node when a peer node comes
back. Automatic update is not yet implemented.

    gluster-eventsapi sync

Basic Events Client(HTTP Server) is included with the code, Start
running the client with required port and start listening to the
events.

    /usr/share/glusterfs/scripts/eventsdash.py --port 8080

Default port is 9000, if no port is specified, once it started running
then configure gluster-eventsapi to send events to that client.

Eventsapi Client can be outside of the Cluster, it can be run event on
Windows. But only requirement is the client URL should be accessible
by all peer nodes.(Or ngrok(https://ngrok.com) like tools can be used)

Events implemented with this patch,
- Volume Create
- Volume Start
- Volume Stop
- Volume Delete
- Peer Attach
- Peer Detach

It is easy to add/support more events, since it touches Gluster cmd
code and to avoid merge conflicts I will add support for more events
once this patch merges.

BUG: 1334044
Change-Id: I316827ac9dd1443454df7deffe4f54835f7f6a08
Signed-off-by: Aravinda VK <avishwan@redhat.com>
Reviewed-on: http://review.gluster.org/14248
Smoke: Gluster Build System <jenkins@build.gluster.org>
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
Reviewed-by: Jeff Darcy <jdarcy@redhat.com>
This commit is contained in:
Aravinda VK 2016-05-05 18:34:41 +05:30 committed by Jeff Darcy
parent 3863409679
commit 5ed781ecf5
27 changed files with 1354 additions and 6 deletions

4
.gitignore vendored
View File

@ -104,3 +104,7 @@ xlators/experimental/fdl/src/libfdl.c
xlators/experimental/fdl/src/librecon.c
xlators/experimental/jbr-client/src/jbrc-cg.c
xlators/experimental/jbr-server/src/jbr-cg.c
# Eventing
events/src/eventsapiconf.py
libglusterfs/src/events.h
extras/systemd/glustereventsd.service

View File

@ -209,6 +209,14 @@ S: Maintained
F: xlators/mgmt/glusterd/src/glusterd-snap*
F: extras/snap-scheduler.py
Events APIs
M: Aravinda VK <avishwan@redhat.com>
S: Maintained
F: events/
F: libglusterfs/src/events*
F: libglusterfs/src/eventtypes*
F: extras/systemd/glustereventsd*
Distribution Specific:
----------------------
Build:

View File

@ -12,7 +12,7 @@ EXTRA_DIST = autogen.sh \
SUBDIRS = $(ARGP_STANDALONE_DIR) libglusterfs rpc api xlators glusterfsd \
$(FUSERMOUNT_SUBDIR) doc extras cli heal @SYNCDAEMON_SUBDIR@ \
@UMOUNTD_SUBDIR@ tools
@UMOUNTD_SUBDIR@ tools @EVENTS_SUBDIR@
pkgconfigdir = @pkgconfigdir@
pkgconfig_DATA = glusterfs-api.pc libgfchangelog.pc

View File

@ -90,6 +90,12 @@ out:
CLI_STACK_DESTROY (frame);
#if (USE_EVENTS)
if (ret == 0) {
gf_event (EVENT_PEER_ATTACH, "host=%s", (char *)words[2]);
}
#endif
return ret;
}
@ -160,6 +166,12 @@ out:
CLI_STACK_DESTROY (frame);
#if (USE_EVENTS)
if (ret == 0) {
gf_event (EVENT_PEER_DETACH, "host=%s", (char *)words[2]);
}
#endif
return ret;
}

View File

@ -243,7 +243,11 @@ out:
}
CLI_STACK_DESTROY (frame);
#if (USE_EVENTS)
if (ret == 0) {
gf_event (EVENT_VOLUME_CREATE, "name=%s", (char *)words[2]);
}
#endif
return ret;
}
@ -318,6 +322,12 @@ out:
CLI_STACK_DESTROY (frame);
#if (USE_EVENTS)
if (ret == 0) {
gf_event (EVENT_VOLUME_DELETE, "name=%s", (char *)words[2]);
}
#endif
return ret;
}
@ -392,6 +402,12 @@ out:
CLI_STACK_DESTROY (frame);
#if (USE_EVENTS)
if (ret == 0) {
gf_event (EVENT_VOLUME_START, "name=%s", (char *)words[2]);
}
#endif
return ret;
}
@ -524,6 +540,12 @@ out:
CLI_STACK_DESTROY (frame);
#if (USE_EVENTS)
if (ret == 0) {
gf_event (EVENT_VOLUME_STOP, "name=%s", (char *)words[2]);
}
#endif
return ret;
}

View File

@ -38,6 +38,7 @@ AC_CONFIG_HEADERS([config.h])
AC_CONFIG_FILES([Makefile
libglusterfs/Makefile
libglusterfs/src/Makefile
libglusterfs/src/events.h
libglusterfs/src/gfdb/Makefile
geo-replication/src/peer_gsec_create
geo-replication/src/peer_mountbroker
@ -225,6 +226,7 @@ AC_CONFIG_FILES([Makefile
extras/ganesha/ocf/Makefile
extras/systemd/Makefile
extras/systemd/glusterd.service
extras/systemd/glustereventsd.service
extras/run-gluster.tmpfiles
extras/benchmarking/Makefile
extras/hook-scripts/Makefile
@ -248,6 +250,10 @@ AC_CONFIG_FILES([Makefile
extras/hook-scripts/reset/post/Makefile
extras/hook-scripts/reset/pre/Makefile
extras/snap_scheduler/Makefile
events/Makefile
events/src/Makefile
events/src/eventsapiconf.py
events/tools/Makefile
contrib/fuse-util/Makefile
contrib/umountd/Makefile
contrib/uuid/uuid_types.h
@ -718,6 +724,43 @@ fi
AC_SUBST(GEOREP_EXTRAS_SUBDIR)
AM_CONDITIONAL(USE_GEOREP, test "x$enable_georeplication" != "xno")
# Events section
AC_ARG_ENABLE([events],
AC_HELP_STRING([--disable-events],
[Do not install Events components]))
BUILD_EVENTS=no
EVENTS_ENABLED=0
EVENTS_SUBDIR=
have_python2=no
if test "x$enable_events" != "xno"; then
EVENTS_SUBDIR=events
EVENTS_ENABLED=1
BUILD_EVENTS="yes"
AM_PATH_PYTHON()
dnl Check if version matches that we require
if echo $PYTHON_VERSION | grep ^2; then
have_python2=yes
fi
if test "x$have_python2" = "xno"; then
if test "x$enable_events" = "xyes"; then
AC_MSG_ERROR([python 2.x packages required. exiting..])
fi
AC_MSG_WARN([python 2.x not found, disabling events])
EVENTS_SUBDIR=
EVENTS_ENABLED=0
BUILD_EVENTS="no"
else
AC_DEFINE(USE_EVENTS, 1, [define if events enabled])
fi
fi
AC_SUBST(EVENTS_ENABLED)
AC_SUBST(EVENTS_SUBDIR)
AM_CONDITIONAL([BUILD_EVENTS], [test x$BUILD_EVENTS = xyes])
# end Events section
# CDC xlator - check if libz is present if so enable HAVE_LIB_Z
BUILD_CDC=yes
PKG_CHECK_MODULES([ZLIB], [zlib >= 1.2.0],,
@ -1097,10 +1140,15 @@ eval sbintemp=\"${sbintemp}\"
eval sbintemp=\"${sbintemp}\"
SBIN_DIR=${sbintemp}
sysconfdirtemp="${sysconfdir}"
eval sysconfdirtemp=\"${sysconfdirtemp}\"
SYSCONF_DIR=${sysconfdirtemp}
prefix=$prefix_temp
exec_prefix=$exec_prefix_temp
AC_SUBST(SBIN_DIR)
AC_SUBST(SYSCONF_DIR)
# lazy umount emulation
UMOUNTD_SUBDIR=""
@ -1377,4 +1425,5 @@ echo "POSIX ACLs : $BUILD_POSIX_ACLS"
echo "Data Classification : $BUILD_GFDB"
echo "firewalld-config : $BUILD_FIREWALLD"
echo "Experimental xlators : $BUILD_EXPERIMENTAL"
echo "Events : $BUILD_EVENTS"
echo

6
events/Makefile.am Normal file
View File

@ -0,0 +1,6 @@
SUBDIRS = src tools
noinst_PYTHON = eventskeygen.py
install-data-hook:
$(INSTALL) -d -m 755 $(DESTDIR)@GLUSTERD_WORKDIR@/events

65
events/eventskeygen.py Normal file
View File

@ -0,0 +1,65 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
# This file is part of GlusterFS.
#
# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.
#
import os
GLUSTER_SRC_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
eventtypes_h = os.path.join(GLUSTER_SRC_ROOT, "libglusterfs/src/eventtypes.h")
eventtypes_py = os.path.join(GLUSTER_SRC_ROOT, "events/src/eventtypes.py")
# When adding new keys add it to the END
keys = (
"EVENT_PEER_ATTACH",
"EVENT_PEER_DETACH",
"EVENT_VOLUME_CREATE",
"EVENT_VOLUME_START",
"EVENT_VOLUME_STOP",
"EVENT_VOLUME_DELETE",
)
LAST_EVENT = "EVENT_LAST"
ERRORS = (
"EVENT_SEND_OK",
"EVENT_ERROR_INVALID_INPUTS",
"EVENT_ERROR_SOCKET",
"EVENT_ERROR_CONNECT",
"EVENT_ERROR_SEND"
)
# Generate eventtypes.h
with open(eventtypes_h, "w") as f:
f.write("#ifndef __EVENTTYPES_H__\n")
f.write("#define __EVENTTYPES_H__\n\n")
f.write("typedef enum {\n")
for k in ERRORS:
f.write(" {0},\n".format(k))
f.write("} event_errors_t;\n")
f.write("\n")
f.write("typedef enum {\n")
for k in keys:
f.write(" {0},\n".format(k))
f.write(" {0}\n".format(LAST_EVENT))
f.write("} eventtypes_t;\n")
f.write("\n#endif /* __EVENTTYPES_H__ */\n")
# Generate eventtypes.py
with open(eventtypes_py, "w") as f:
f.write("# -*- coding: utf-8 -*-\n")
f.write("all_events = [\n")
for ev in keys:
f.write(' "{0}",\n'.format(ev))
f.write("]\n")

24
events/src/Makefile.am Normal file
View File

@ -0,0 +1,24 @@
EXTRA_DIST = glustereventsd.py __init__.py eventsapiconf.py.in eventtypes.py \
handlers.py utils.py peer_eventsapi.py eventsconfig.json
eventsdir = $(libexecdir)/glusterfs/events
eventspeerscriptdir = $(libexecdir)/glusterfs
eventsconfdir = $(sysconfdir)/glusterfs
eventsconf_DATA = eventsconfig.json
events_PYTHON = __init__.py eventsapiconf.py eventtypes.py handlers.py utils.py
events_SCRIPTS = glustereventsd.py
eventspeerscript_SCRIPTS = peer_eventsapi.py
install-exec-hook:
$(mkdir_p) $(DESTDIR)$(sbindir)
rm -f $(DESTDIR)$(sbindir)/glustereventsd
ln -s $(libexecdir)/glusterfs/events/glustereventsd.py \
$(DESTDIR)$(sbindir)/glustereventsd
rm -f $(DESTDIR)$(sbindir)/gluster-eventing
ln -s $(libexecdir)/glusterfs/peer_eventsapi.py \
$(DESTDIR)$(sbindir)/gluster-eventsapi
uninstall-hook:
rm -f $(DESTDIR)$(sbindir)/glustereventsd
rm -f $(DESTDIR)$(sbindir)/gluster-eventsapi

10
events/src/__init__.py Normal file
View File

@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
# This file is part of GlusterFS.
#
# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.
#

View File

@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
# This file is part of GlusterFS.
#
# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.
#
SERVER_ADDRESS = "@localstatedir@/run/gluster/events.sock"
DEFAULT_CONFIG_FILE = "@SYSCONF_DIR@/glusterfs/eventsconfig.json"
CUSTOM_CONFIG_FILE_TO_SYNC = "/events/config.json"
CUSTOM_CONFIG_FILE = "@GLUSTERD_WORKDIR@" + CUSTOM_CONFIG_FILE_TO_SYNC
WEBHOOKS_FILE_TO_SYNC = "/events/webhooks.json"
WEBHOOKS_FILE = "@GLUSTERD_WORKDIR@" + WEBHOOKS_FILE_TO_SYNC
LOG_FILE = "@localstatedir@/log/glusterfs/events.log"
EVENTSD = "glustereventsd"
CONFIG_KEYS = ["log_level"]
BOOL_CONFIGS = []
RESTART_CONFIGS = []

View File

@ -0,0 +1,3 @@
{
"log_level": "INFO"
}

9
events/src/eventtypes.py Normal file
View File

@ -0,0 +1,9 @@
# -*- coding: utf-8 -*-
all_events = [
"EVENT_PEER_ATTACH",
"EVENT_PEER_DETACH",
"EVENT_VOLUME_CREATE",
"EVENT_VOLUME_START",
"EVENT_VOLUME_STOP",
"EVENT_VOLUME_DELETE",
]

View File

@ -0,0 +1,151 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
# This file is part of GlusterFS.
#
# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.
#
from __future__ import print_function
import asyncore
import socket
import os
from multiprocessing import Process, Queue
import sys
import signal
from eventtypes import all_events
import handlers
import utils
from eventsapiconf import SERVER_ADDRESS
from utils import logger
# Global Queue, EventsHandler will add items to the queue
# and process_event will gets each item and handles it
events_queue = Queue()
events_server_pid = None
def process_event():
"""
Seperate process which handles all the incoming events from Gluster
processes.
"""
while True:
data = events_queue.get()
logger.debug("EVENT: {0}".format(repr(data)))
try:
# Event Format <TIMESTAMP> <TYPE> <DETAIL>
ts, key, value = data.split(" ", 2)
except ValueError:
logger.warn("Invalid Event Format {0}".format(data))
continue
data_dict = {}
try:
# Format key=value;key=value
data_dict = dict(x.split('=') for x in value.split(';'))
except ValueError:
logger.warn("Unable to parse Event {0}".format(data))
continue
try:
# Event Type to Function Map, Recieved event data will be in
# the form <TIMESTAMP> <TYPE> <DETAIL>, Get Event name for the
# recieved Type/Key and construct a function name starting with
# handle_ For example: handle_event_volume_create
func_name = "handle_" + all_events[int(key)].lower()
except IndexError:
# This type of Event is not handled?
logger.warn("Unhandled Event: {0}".format(key))
func_name = None
if func_name is not None:
# Get function from handlers module
func = getattr(handlers, func_name, None)
# If func is None, then handler unimplemented for that event.
if func is not None:
func(ts, int(key), data_dict)
else:
# Generic handler, broadcast whatever received
handlers.generic_handler(ts, int(key), data_dict)
def process_event_wrapper():
try:
process_event()
except KeyboardInterrupt:
return
class GlusterEventsHandler(asyncore.dispatcher_with_send):
def handle_read(self):
data = self.recv(8192)
if data:
events_queue.put(data)
self.send(data)
class GlusterEventsServer(asyncore.dispatcher):
def __init__(self):
global events_server_pid
asyncore.dispatcher.__init__(self)
# Start the Events listener process which listens to
# the global queue
p = Process(target=process_event_wrapper)
p.start()
events_server_pid = p.pid
# Create UNIX Domain Socket, bind to path
self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.bind(SERVER_ADDRESS)
self.listen(5)
def handle_accept(self):
pair = self.accept()
if pair is not None:
sock, addr = pair
GlusterEventsHandler(sock)
def signal_handler_sigusr2(sig, frame):
if events_server_pid is not None:
os.kill(events_server_pid, signal.SIGUSR2)
utils.load_all()
def init_event_server():
utils.setup_logger()
# Delete Socket file if Exists
try:
os.unlink(SERVER_ADDRESS)
except OSError:
if os.path.exists(SERVER_ADDRESS):
print ("Failed to cleanup socket file {0}".format(SERVER_ADDRESS),
file=sys.stderr)
sys.exit(1)
utils.load_all()
# Start the Eventing Server, UNIX DOMAIN SOCKET Server
GlusterEventsServer()
asyncore.loop()
def main():
try:
init_event_server()
except KeyboardInterrupt:
sys.exit(1)
if __name__ == "__main__":
signal.signal(signal.SIGUSR2, signal_handler_sigusr2)
main()

21
events/src/handlers.py Normal file
View File

@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
# This file is part of GlusterFS.
#
# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.
#
import utils
def generic_handler(ts, key, data):
"""
Generic handler to broadcast message to all peers, custom handlers
can be created by func name handler_<event_name>
Ex: handle_event_volume_create(ts, key, data)
"""
utils.publish(ts, key, data)

View File

@ -0,0 +1,521 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
# This file is part of GlusterFS.
#
# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.
#
from __future__ import print_function
import os
import json
from errno import EEXIST
import requests
import fasteners
from prettytable import PrettyTable
from gluster.cliutils import (Cmd, execute, node_output_ok, node_output_notok,
sync_file_to_peers, GlusterCmdException,
output_error, execute_in_peers, runcli)
from events.eventsapiconf import (WEBHOOKS_FILE_TO_SYNC,
WEBHOOKS_FILE,
DEFAULT_CONFIG_FILE,
CUSTOM_CONFIG_FILE,
CUSTOM_CONFIG_FILE_TO_SYNC,
EVENTSD,
CONFIG_KEYS,
BOOL_CONFIGS,
RESTART_CONFIGS)
def file_content_overwrite(fname, data):
with open(fname + ".tmp", "w") as f:
f.write(json.dumps(data))
os.rename(fname + ".tmp", fname)
def create_custom_config_file_if_not_exists():
mkdirp(os.path.dirname(CUSTOM_CONFIG_FILE))
if not os.path.exists(CUSTOM_CONFIG_FILE):
with open(CUSTOM_CONFIG_FILE, "w") as f:
f.write("{}")
def create_webhooks_file_if_not_exists():
mkdirp(os.path.dirname(WEBHOOKS_FILE))
if not os.path.exists(WEBHOOKS_FILE):
with open(WEBHOOKS_FILE, "w") as f:
f.write("{}")
def boolify(value):
val = False
if value.lower() in ["enabled", "true", "on", "yes"]:
val = True
return val
def mkdirp(path, exit_on_err=False, logger=None):
"""
Try creating required directory structure
ignore EEXIST and raise exception for rest of the errors.
Print error in stderr and exit
"""
try:
os.makedirs(path)
except (OSError, IOError) as e:
if e.errno == EEXIST and os.path.isdir(path):
pass
else:
output_error("Fail to create dir %s: %s" % (path, e))
def is_enabled(service):
rc, out, err = execute(["systemctl", "is-enabled", service])
return rc == 0
def is_active(service):
rc, out, err = execute(["systemctl", "is-active", service])
return rc == 0
def enable_service(service):
if not is_enabled(service):
cmd = ["systemctl", "enable", service]
return execute(cmd)
return (0, "", "")
def disable_service(service):
if is_enabled(service):
cmd = ["systemctl", "disable", service]
return execute(cmd)
return (0, "", "")
def start_service(service):
rc, out, err = enable_service(service)
if rc != 0:
return (rc, out, err)
cmd = ["systemctl", "start", service]
return execute(cmd)
def stop_service(service):
rc, out, err = disable_service(service)
if rc != 0:
return (rc, out, err)
cmd = ["systemctl", "stop", service]
return execute(cmd)
def restart_service(service):
rc, out, err = stop_service(service)
if rc != 0:
return (rc, out, err)
return start_service(service)
def reload_service(service):
if is_active(service):
cmd = ["systemctl", "reload", service]
return execute(cmd)
return (0, "", "")
def sync_to_peers(restart=False):
if os.path.exists(WEBHOOKS_FILE):
try:
sync_file_to_peers(WEBHOOKS_FILE_TO_SYNC)
except GlusterCmdException as e:
output_error("Failed to sync Webhooks file: [Error: {0}]"
"{1}".format(e[0], e[2]))
if os.path.exists(CUSTOM_CONFIG_FILE):
try:
sync_file_to_peers(CUSTOM_CONFIG_FILE_TO_SYNC)
except GlusterCmdException as e:
output_error("Failed to sync Config file: [Error: {0}]"
"{1}".format(e[0], e[2]))
action = "node-reload"
if restart:
action = "node-restart"
out = execute_in_peers(action)
table = PrettyTable(["NODE", "NODE STATUS", "SYNC STATUS"])
table.align["NODE STATUS"] = "r"
table.align["SYNC STATUS"] = "r"
for p in out:
table.add_row([p.hostname,
"UP" if p.node_up else "DOWN",
"OK" if p.ok else "NOT OK: {0}".format(
p.error)])
print (table)
def node_output_handle(resp):
rc, out, err = resp
if rc == 0:
node_output_ok(out)
else:
node_output_notok(err)
def action_handle(action):
out = execute_in_peers("node-" + action)
column_name = action.upper()
if action == "status":
column_name = EVENTSD.upper()
table = PrettyTable(["NODE", "NODE STATUS", column_name + " STATUS"])
table.align["NODE STATUS"] = "r"
table.align[column_name + " STATUS"] = "r"
for p in out:
status_col_val = "OK" if p.ok else "NOT OK: {0}".format(
p.error)
if action == "status":
status_col_val = "DOWN"
if p.ok:
status_col_val = p.output
table.add_row([p.hostname,
"UP" if p.node_up else "DOWN",
status_col_val])
print (table)
class NodeStart(Cmd):
name = "node-start"
def run(self, args):
node_output_handle(start_service(EVENTSD))
class StartCmd(Cmd):
name = "start"
def run(self, args):
action_handle("start")
class NodeStop(Cmd):
name = "node-stop"
def run(self, args):
node_output_handle(stop_service(EVENTSD))
class StopCmd(Cmd):
name = "stop"
def run(self, args):
action_handle("stop")
class NodeRestart(Cmd):
name = "node-restart"
def run(self, args):
node_output_handle(restart_service(EVENTSD))
class RestartCmd(Cmd):
name = "restart"
def run(self, args):
action_handle("restart")
class NodeReload(Cmd):
name = "node-reload"
def run(self, args):
node_output_handle(reload_service(EVENTSD))
class ReloadCmd(Cmd):
name = "reload"
def run(self, args):
action_handle("reload")
class NodeStatus(Cmd):
name = "node-status"
def run(self, args):
node_output_ok("UP" if is_active(EVENTSD) else "DOWN")
class StatusCmd(Cmd):
name = "status"
def run(self, args):
webhooks = {}
if os.path.exists(WEBHOOKS_FILE):
webhooks = json.load(open(WEBHOOKS_FILE))
print ("Webhooks: " + ("" if webhooks else "None"))
for w in webhooks:
print (w)
print ()
action_handle("status")
class WebhookAddCmd(Cmd):
name = "webhook-add"
def args(self, parser):
parser.add_argument("url", help="URL of Webhook")
parser.add_argument("--bearer_token", "-t", help="Bearer Token",
default="")
def run(self, args):
create_webhooks_file_if_not_exists()
with fasteners.InterProcessLock(WEBHOOKS_FILE):
data = json.load(open(WEBHOOKS_FILE))
if data.get(args.url, None) is not None:
output_error("Webhook already exists")
data[args.url] = args.bearer_token
file_content_overwrite(WEBHOOKS_FILE, data)
sync_to_peers()
class WebhookModCmd(Cmd):
name = "webhook-mod"
def args(self, parser):
parser.add_argument("url", help="URL of Webhook")
parser.add_argument("--bearer_token", "-t", help="Bearer Token",
default="")
def run(self, args):
create_webhooks_file_if_not_exists()
with fasteners.InterProcessLock(WEBHOOKS_FILE):
data = json.load(open(WEBHOOKS_FILE))
if data.get(args.url, None) is None:
output_error("Webhook does not exists")
data[args.url] = args.bearer_token
file_content_overwrite(WEBHOOKS_FILE, data)
sync_to_peers()
class WebhookDelCmd(Cmd):
name = "webhook-del"
def args(self, parser):
parser.add_argument("url", help="URL of Webhook")
def run(self, args):
create_webhooks_file_if_not_exists()
with fasteners.InterProcessLock(WEBHOOKS_FILE):
data = json.load(open(WEBHOOKS_FILE))
if data.get(args.url, None) is None:
output_error("Webhook does not exists")
del data[args.url]
file_content_overwrite(WEBHOOKS_FILE, data)
sync_to_peers()
class NodeWebhookTestCmd(Cmd):
name = "node-webhook-test"
def args(self, parser):
parser.add_argument("url")
parser.add_argument("bearer_token")
def run(self, args):
http_headers = {}
if args.bearer_token != ".":
http_headers["Authorization"] = "Bearer " + args.bearer_token
try:
resp = requests.post(args.url, headers=http_headers)
except requests.ConnectionError as e:
node_output_notok("{0}".format(e))
if resp.status_code != 200:
node_output_notok("{0}".format(resp.status_code))
node_output_ok()
class WebhookTestCmd(Cmd):
name = "webhook-test"
def args(self, parser):
parser.add_argument("url", help="URL of Webhook")
parser.add_argument("--bearer_token", "-t", help="Bearer Token")
def run(self, args):
url = args.url
bearer_token = args.bearer_token
if not args.url:
url = "."
if not args.bearer_token:
bearer_token = "."
out = execute_in_peers("node-webhook-test", [url, bearer_token])
table = PrettyTable(["NODE", "NODE STATUS", "WEBHOOK STATUS"])
table.align["NODE STATUS"] = "r"
table.align["WEBHOOK STATUS"] = "r"
for p in out:
table.add_row([p.hostname,
"UP" if p.node_up else "DOWN",
"OK" if p.ok else "NOT OK: {0}".format(
p.error)])
print (table)
class ConfigGetCmd(Cmd):
name = "config-get"
def args(self, parser):
parser.add_argument("--name", help="Config Name")
def run(self, args):
data = json.load(open(DEFAULT_CONFIG_FILE))
if os.path.exists(CUSTOM_CONFIG_FILE):
data.update(json.load(open(CUSTOM_CONFIG_FILE)))
if args.name is not None and args.name not in CONFIG_KEYS:
output_error("Invalid Config item")
table = PrettyTable(["NAME", "VALUE"])
if args.name is None:
for k, v in data.items():
table.add_row([k, v])
else:
table.add_row([args.name, data[args.name]])
print (table)
def read_file_content_json(fname):
content = "{}"
with open(fname) as f:
content = f.read()
if content.strip() == "":
content = "{}"
return json.loads(content)
class ConfigSetCmd(Cmd):
name = "config-set"
def args(self, parser):
parser.add_argument("name", help="Config Name")
parser.add_argument("value", help="Config Value")
def run(self, args):
if args.name not in CONFIG_KEYS:
output_error("Invalid Config item")
with fasteners.InterProcessLock(CUSTOM_CONFIG_FILE):
data = json.load(open(DEFAULT_CONFIG_FILE))
if os.path.exists(CUSTOM_CONFIG_FILE):
config_json = read_file_content_json(CUSTOM_CONFIG_FILE)
data.update(config_json)
# Do Nothing if same as previous value
if data[args.name] == args.value:
return
# TODO: Validate Value
create_custom_config_file_if_not_exists()
new_data = read_file_content_json(CUSTOM_CONFIG_FILE)
v = args.value
if args.name in BOOL_CONFIGS:
v = boolify(args.value)
new_data[args.name] = v
file_content_overwrite(CUSTOM_CONFIG_FILE, new_data)
# If any value changed which requires restart of REST server
restart = False
if args.name in RESTART_CONFIGS:
restart = True
sync_to_peers(restart=restart)
class ConfigResetCmd(Cmd):
name = "config-reset"
def args(self, parser):
parser.add_argument("name", help="Config Name or all")
def run(self, args):
with fasteners.InterProcessLock(CUSTOM_CONFIG_FILE):
changed_keys = []
data = {}
if os.path.exists(CUSTOM_CONFIG_FILE):
data = read_file_content_json(CUSTOM_CONFIG_FILE)
if not data:
return
if args.name.lower() == "all":
for k, v in data.items():
changed_keys.append(k)
# Reset all keys
file_content_overwrite(CUSTOM_CONFIG_FILE, {})
else:
changed_keys.append(args.name)
del data[args.name]
file_content_overwrite(CUSTOM_CONFIG_FILE, data)
# If any value changed which requires restart of REST server
restart = False
for key in changed_keys:
if key in RESTART_CONFIGS:
restart = True
break
sync_to_peers(restart=restart)
class SyncCmd(Cmd):
name = "sync"
def run(self, args):
sync_to_peers()
if __name__ == "__main__":
runcli()

150
events/src/utils.py Normal file
View File

@ -0,0 +1,150 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
# This file is part of GlusterFS.
#
# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.
#
import json
import os
import logging
import requests
from eventsapiconf import (LOG_FILE,
WEBHOOKS_FILE,
DEFAULT_CONFIG_FILE,
CUSTOM_CONFIG_FILE)
import eventtypes
from gluster.cliutils import get_node_uuid
# Webhooks list
_webhooks = {}
# Default Log Level
_log_level = "INFO"
# Config Object
_config = {}
# Init Logger instance
logger = logging.getLogger(__name__)
def get_event_type_name(idx):
"""
Returns Event Type text from the index. For example, VOLUME_CREATE
"""
return eventtypes.all_events[idx].replace("EVENT_", "")
def setup_logger():
"""
Logging initialization, Log level by default will be INFO, once config
file is read, respective log_level will be set.
"""
global logger
logger.setLevel(logging.INFO)
# create the logging file handler
fh = logging.FileHandler(LOG_FILE)
formatter = logging.Formatter("[%(asctime)s] %(levelname)s "
"[%(module)s - %(lineno)s:%(funcName)s] "
"- %(message)s")
fh.setFormatter(formatter)
# add handler to logger object
logger.addHandler(fh)
def load_config():
"""
Load/Reload the config from REST Config files. This function will
be triggered during init and when SIGUSR2.
"""
global _config
_config = {}
if os.path.exists(DEFAULT_CONFIG_FILE):
_config = json.load(open(DEFAULT_CONFIG_FILE))
if os.path.exists(CUSTOM_CONFIG_FILE):
_config.update(json.load(open(CUSTOM_CONFIG_FILE)))
def load_log_level():
"""
Reads log_level from Config file and sets accordingly. This function will
be triggered during init and when SIGUSR2.
"""
global logger, _log_level
new_log_level = _config.get("log_level", "INFO")
if _log_level != new_log_level:
logger.setLevel(getattr(logging, new_log_level.upper()))
_log_level = new_log_level.upper()
def load_webhooks():
"""
Load/Reload the webhooks list. This function will
be triggered during init and when SIGUSR2.
"""
global _webhooks
_webhooks = {}
if os.path.exists(WEBHOOKS_FILE):
_webhooks = json.load(open(WEBHOOKS_FILE))
def load_all():
"""
Wrapper function to call all load/reload functions. This function will
be triggered during init and when SIGUSR2.
"""
load_config()
load_webhooks()
load_log_level()
def publish(ts, event_key, data):
message = {
"nodeid": get_node_uuid(),
"ts": int(ts),
"event": get_event_type_name(event_key),
"message": data
}
if _webhooks:
plugin_webhook(message)
else:
# TODO: Default action?
pass
def plugin_webhook(message):
message_json = json.dumps(message, sort_keys=True)
logger.debug("EVENT: {0}".format(message_json))
for url, token in _webhooks.items():
http_headers = {"Content-Type": "application/json"}
if token != "" and token is not None:
http_headers["Authorization"] = "Bearer " + token
try:
resp = requests.post(url, headers=http_headers, data=message_json)
except requests.ConnectionError as e:
logger.warn("Event push failed to URL: {url}, "
"Event: {event}, "
"Status: {error}".format(
url=url,
event=message_json,
error=e))
continue
if resp.status_code != 200:
logger.warn("Event push failed to URL: {url}, "
"Event: {event}, "
"Status Code: {status_code}".format(
url=url,
event=message_json,
status_code=resp.status_code))

3
events/tools/Makefile.am Normal file
View File

@ -0,0 +1,3 @@
scriptsdir = $(datadir)/glusterfs/scripts
scripts_SCRIPTS = eventsdash.py
EXTRA_DIST = eventsdash.py

View File

@ -0,0 +1,74 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
# This file is part of GlusterFS.
#
# This file is licensed to you under your choice of the GNU Lesser
# General Public License, version 3 or any later version (LGPLv3 or
# later), or the GNU General Public License, version 2 (GPLv2), in all
# cases as published by the Free Software Foundation.
#
from argparse import ArgumentParser, RawDescriptionHelpFormatter
import logging
from datetime import datetime
from flask import Flask, request
app = Flask(__name__)
app.logger.disabled = True
log = logging.getLogger('werkzeug')
log.disabled = True
def human_time(ts):
return datetime.fromtimestamp(float(ts)).strftime("%Y-%m-%d %H:%M:%S")
@app.route("/")
def home():
return "OK"
@app.route("/listen", methods=["POST"])
def listen():
data = request.json
if data is None:
return "OK"
message = []
for k, v in data.get("message", {}).items():
message.append("{0}={1}".format(k, v))
print ("{0:20s} {1:20s} {2:36} {3}".format(
human_time(data.get("ts")),
data.get("event"),
data.get("nodeid"),
" ".join(message)))
return "OK"
def main():
parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
description=__doc__)
parser.add_argument("--port", type=int, help="Port", default=9000)
parser.add_argument("--debug", help="Run Server in debug mode",
action="store_true")
args = parser.parse_args()
print ("{0:20s} {1:20s} {2:36} {3}".format(
"TIMESTAMP", "EVENT", "NODE ID", "MESSAGE"
))
print ("{0:20s} {1:20s} {2:36} {3}".format(
"-"*20, "-"*20, "-"*36, "-"*20
))
if args.debug:
app.debug = True
app.run(host="0.0.0.0", port=args.port)
if __name__ == "__main__":
main()

View File

@ -1,5 +1,5 @@
CLEANFILES =
CLEANFILES = glustereventsd.service
EXTRA_DIST = glustereventsd.service.in
SYSTEMD_DIR = @systemddir@
@ -8,4 +8,7 @@ install-exec-local:
$(mkdir_p) $(DESTDIR)$(SYSTEMD_DIR); \
$(INSTALL_PROGRAM) glusterd.service $(DESTDIR)$(SYSTEMD_DIR)/; \
fi
@if [ @EVENTS_ENABLED@ = 1 ] && [ -d $(SYSTEMD_DIR) ]; then \
$(mkdir_p) $(DESTDIR)$(SYSTEMD_DIR); \
$(INSTALL_PROGRAM) glustereventsd.service $(DESTDIR)$(SYSTEMD_DIR)/; \
fi

View File

@ -0,0 +1,12 @@
[Unit]
Description=Gluster Events Notifier
After=syslog.target network.target
[Service]
Type=simple
ExecStart=@SBIN_DIR@/glustereventsd
ExecReload=/bin/kill -SIGUSR2 $MAINPID
KillMode=control-group
[Install]
WantedBy=multi-user.target

View File

@ -90,6 +90,11 @@
%global _with_tmpfilesdir --without-tmpfilesdir
%endif
# Eventing
%if ( 0%{?rhel} && 0%{?rhel} < 6 )
%global _without_events --disable-events
%endif
# From https://fedoraproject.org/wiki/Packaging:Python#Macros
%if ( 0%{?rhel} && 0%{?rhel} <= 5 )
%{!?python_sitelib: %global python_sitelib %(python -c "from distutils.sysconfig import get_python_lib; print(get_python_lib())")}
@ -569,6 +574,23 @@ is in user space and easily manageable.
This package provides the translators needed on any GlusterFS client.
%if ( 0%{!?_without_events:1} )
%package events
Summary: GlusterFS Events
Group: Applications/File
Requires: %{name}-server%{?_isa} = %{version}-%{release}
Requires: python python-fasteners python-requests python-flask
Requires: python-prettytable
Requires: python-gluster = %{version}-%{release}
%if ( 0%{?rhel} && 0%{?rhel} <= 6 )
Requires: python-argparse
%endif
%description events
GlusterFS Events
%endif
%prep
%setup -q -n %{name}-%{version}%{?prereltag}
@ -595,7 +617,8 @@ export CFLAGS
%{?_without_ocf} \
%{?_without_rdma} \
%{?_without_syslog} \
%{?_without_tiering}
%{?_without_tiering} \
%{?_without_events}
# fix hardening and remove rpath in shlibs
%if ( 0%{?fedora} && 0%{?fedora} > 17 ) || ( 0%{?rhel} && 0%{?rhel} > 6 )
@ -1195,7 +1218,25 @@ exit 0
%{_sbindir}/gf_logdump
%{_sbindir}/gf_recon
# Events
%if ( 0%{!?_without_events:1} )
%files events
%config %attr(0600, root, root) %{_sysconfdir}/glusterfs/eventsconfig.json
%dir %attr(0755,-,-) %{_sharedstatedir}/glusterd/events
%{_libexecdir}/glusterfs/events
%{_libexecdir}/glusterfs/peer_eventsapi.py*
%{_sbindir}/glustereventsd
%{_sbindir}/gluster-eventsapi
%{_datadir}/glusterfs/scripts/eventsdash.py*
%if ( 0%{?_with_systemd:1} )
%{_unitdir}/glustereventsd.service
%endif
%endif
%changelog
* Wed Jul 15 2016 Aravinda VK <avishwan@redhat.com>
- Added new subpackage events(glusterfs-events) (#1334044)
* Fri Jul 15 2016 Aravinda VK <avishwan@redhat.com>
- Removed ".py" extension from symlink(S57glusterfind-delete-post)(#1356868)

View File

@ -71,6 +71,12 @@ libglusterfs_la_SOURCES += $(CONTRIBDIR)/uuid/clear.c \
$(CONTRIBDIR)/uuid/unpack.c
endif
if BUILD_EVENTS
libglusterfs_la_SOURCES += events.c
libglusterfs_la_HEADERS += events.h eventtypes.h
endif
libgfchangelog_HEADERS = changelog.h
EXTRA_DIST = graph.l graph.y defaults-tmpl.c

83
libglusterfs/src/events.c Normal file
View File

@ -0,0 +1,83 @@
/*
Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
General Public License, version 3 or any later version (LGPLv3 or
later), or the GNU General Public License, version 2 (GPLv2), in all
cases as published by the Free Software Foundation.
*/
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <stdio.h>
#include <unistd.h>
#include <time.h>
#include <stdarg.h>
#include <string.h>
#include "syscall.h"
#include "mem-pool.h"
#include "events.h"
int
gf_event (int event, char *fmt, ...)
{
int sock = -1;
char eventstr[EVENTS_MSG_MAX] = "";
struct sockaddr_un server;
va_list arguments;
char *msg = NULL;
int ret = 0;
size_t eventstr_size = 0;
if (event < 0 || event >= EVENT_LAST) {
ret = EVENT_ERROR_INVALID_INPUTS;
goto out;
}
sock = socket(AF_UNIX, SOCK_STREAM, 0);
if (sock < 0) {
ret = EVENT_ERROR_SOCKET;
goto out;
}
server.sun_family = AF_UNIX;
strcpy(server.sun_path, EVENT_PATH);
if (connect(sock,
(struct sockaddr *) &server,
sizeof(struct sockaddr_un)) < 0) {
ret = EVENT_ERROR_CONNECT;
goto out;
}
va_start (arguments, fmt);
ret = gf_vasprintf (&msg, fmt, arguments);
va_end (arguments);
if (ret < 0) {
ret = EVENT_ERROR_INVALID_INPUTS;
goto out;
}
eventstr_size = snprintf(NULL, 0, "%u %d %s", (unsigned)time(NULL),
event, msg);
if (eventstr_size + 1 > EVENTS_MSG_MAX) {
eventstr_size = EVENTS_MSG_MAX - 1;
}
snprintf(eventstr, eventstr_size+1, "%u %d %s",
(unsigned)time(NULL), event, msg);
if (sys_write(sock, eventstr, strlen(eventstr)) <= 0) {
ret = EVENT_ERROR_SEND;
goto out;
}
ret = EVENT_SEND_OK;
out:
sys_close(sock);
GF_FREE(msg);
return ret;
}

View File

@ -0,0 +1,23 @@
/*
Copyright (c) 2016 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
General Public License, version 3 or any later version (LGPLv3 or
later), or the GNU General Public License, version 2 (GPLv2), in all
cases as published by the Free Software Foundation.
*/
#ifndef __EVENTS_H__
#define __EVENTS_H__
#include <stdio.h>
#include "eventtypes.h"
#define EVENT_PATH "@localstatedir@/run/gluster/events.sock"
#define EVENTS_MSG_MAX 2048
extern int gf_event(int key, char *fmt, ...);
#endif /* __EVENTS_H__ */

View File

@ -0,0 +1,22 @@
#ifndef __EVENTTYPES_H__
#define __EVENTTYPES_H__
typedef enum {
EVENT_SEND_OK,
EVENT_ERROR_INVALID_INPUTS,
EVENT_ERROR_SOCKET,
EVENT_ERROR_CONNECT,
EVENT_ERROR_SEND,
} event_errors_t;
typedef enum {
EVENT_PEER_ATTACH,
EVENT_PEER_DETACH,
EVENT_VOLUME_CREATE,
EVENT_VOLUME_START,
EVENT_VOLUME_STOP,
EVENT_VOLUME_DELETE,
EVENT_LAST
} eventtypes_t;
#endif /* __EVENTTYPES_H__ */

View File

@ -37,6 +37,10 @@
#include "lkowner.h"
#include "compat-uuid.h"
#if (USE_EVENTS)
#include "events.h"
#endif
#define GF_YES 1
#define GF_NO 0