1
0
mirror of git://sourceware.org/git/lvm2.git synced 2024-12-31 21:18:26 +03:00
lvm2/daemons/dmeventd/dmeventd.c
2006-05-11 19:08:02 +00:00

1340 lines
30 KiB
C

/*
* Copyright (C) 2005 Red Hat, Inc. All rights reserved.
*
* This file is part of the device-mapper userspace tools.
*
* This copyrighted material is made available to anyone wishing to use,
* modify, copy, or redistribute it subject to the terms and conditions
* of the GNU Lesser General Public License v.2.1.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
/*
* dmeventd - dm event daemon to monitor active mapped devices
*/
#define _GNU_SOURCE
#define _FILE_OFFSET_BITS 64
#include "libdevmapper.h"
#include "libdevmapper-event.h"
#include "list.h"
#include "dmeventd.h"
//#include "libmultilog.h"
#include "log.h"
#include <dlfcn.h>
#include <errno.h>
#include <fcntl.h>
#include <libgen.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>
#include <stdarg.h>
#ifdef linux
#include <malloc.h>
#endif
/* List (un)link macros. */
#define LINK(x, head) list_add(head, &(x)->list)
#define LINK_DSO(dso) LINK(dso, &dso_registry)
#define LINK_THREAD(thread) LINK(thread, &thread_registry)
#define UNLINK(x) list_del(&(x)->list)
#define UNLINK_DSO(x) UNLINK(x)
#define UNLINK_THREAD(x) UNLINK(x)
#define DAEMON_NAME "dmeventd"
/* Global mutex for list accesses. */
static pthread_mutex_t mutex;
/* Data kept about a DSO. */
struct dso_data {
struct list list;
char *dso_name; /* DSO name (eg, "evms", "dmraid", "lvm2"). */
void *dso_handle; /* Opaque handle as returned from dlopen(). */
unsigned int ref_count; /* Library reference count. */
/*
* Event processing.
*
* The DSO can do whatever appropriate steps if an event happens
* such as changing the mapping in case a mirror fails, update
* the application metadata etc.
*/
void (*process_event)(const char *device, enum dm_event_type event);
/*
* Device registration.
*
* When an application registers a device for an event, the DSO
* can carry out appropriate steps so that a later call to
* the process_event() function is sane (eg, read metadata
* and activate a mapping).
*/
int (*register_device)(const char *device);
/*
* Device unregistration.
*
* In case all devices of a mapping (eg, RAID10) are unregistered
* for events, the DSO can recognize this and carry out appropriate
* steps (eg, deactivate mapping, metadata update).
*/
int (*unregister_device)(const char *device);
};
static LIST_INIT(dso_registry);
/* Structure to keep parsed register variables from client message. */
struct message_data {
char *dso_name; /* Name of DSO. */
char *device_path; /* Mapped device path. */
union {
char *str; /* Events string as fetched from message. */
enum dm_event_type field; /* Events bitfield. */
} events;
union {
char *str;
uint32_t secs;
} timeout;
struct dm_event_daemon_message *msg; /* Pointer to message buffer. */
};
/*
* Housekeeping of thread+device states.
*
* One thread per mapped device which can block on it until an event
* occurs and the event processing function of the DSO gets called.
*/
struct thread_status {
struct list list;
pthread_t thread;
struct dso_data *dso_data;/* DSO this thread accesses. */
char *device_path; /* Mapped device path. */
uint32_t event_nr; /* event number */
int processing; /* Set when event is being processed */
enum dm_event_type events; /* bitfield for event filter. */
enum dm_event_type current_events;/* bitfield for occured events. */
enum dm_event_type processed_events;/* bitfield for processed events. */
time_t next_time;
uint32_t timeout;
struct list timeout_list;
};
static LIST_INIT(thread_registry);
static LIST_INIT(thread_registry_unused);
static int timeout_running;
static LIST_INIT(timeout_registry);
static pthread_mutex_t timeout_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t timeout_cond = PTHREAD_COND_INITIALIZER;
/* Allocate/free the status structure for a monitoring thread. */
static struct thread_status *alloc_thread_status(struct message_data *data,
struct dso_data *dso_data)
{
struct thread_status *ret = (typeof(ret)) dm_malloc(sizeof(*ret));
if (ret) {
if (!memset(ret, 0, sizeof(*ret)) ||
!(ret->device_path = dm_strdup(data->device_path))) {
dm_free(ret);
ret = NULL;
} else {
ret->dso_data = dso_data;
ret->events = data->events.field;
ret->timeout = data->timeout.secs;
list_init(&ret->timeout_list);
}
}
return ret;
}
static void free_thread_status(struct thread_status *thread)
{
dm_free(thread->device_path);
dm_free(thread);
}
/* Allocate/free DSO data. */
static struct dso_data *alloc_dso_data(struct message_data *data)
{
struct dso_data *ret = (typeof(ret)) dm_malloc(sizeof(*ret));
if (ret) {
if (!memset(ret, 0, sizeof(*ret)) ||
!(ret->dso_name = dm_strdup(data->dso_name))) {
dm_free(ret);
ret = NULL;
}
}
return ret;
}
static void free_dso_data(struct dso_data *data)
{
dm_free(data->dso_name);
dm_free(data);
}
/* FIXME: Factor out. */
static char *dm_basename(char *str)
{
char *p = strrchr(str, '/');
return p ? p + 1 : str;
}
/*
* Fetch a string off src and duplicate it into *ptr.
* Pay attention to 0 lenght strings.
*/
/* FIXME: move to separate module to share with the client lib. */
static const char delimiter = ' ';
static int fetch_string(char **ptr, char **src)
{
int ret = 0;
char *p;
size_t len;
if ((p = strchr(*src, delimiter)))
*p = 0;
if ((*ptr = dm_strdup(*src))) {
if ((len = strlen(*ptr)))
*src += len;
else {
dm_free(*ptr);
*ptr = NULL;
}
(*src)++;
ret = 1;
}
if (p)
*p = delimiter;
return ret;
}
/* Free message memory. */
static void free_message(struct message_data *message_data)
{
if (message_data->dso_name)
dm_free(message_data->dso_name);
if (message_data->device_path)
dm_free(message_data->device_path);
}
/* Parse a register message from the client. */
static int parse_message(struct message_data *message_data)
{
char *p = message_data->msg->msg;
/*
* Retrieve application identifier, mapped device
* path and events # string from message.
*/
if (fetch_string(&message_data->dso_name, &p) &&
fetch_string(&message_data->device_path, &p) &&
fetch_string(&message_data->events.str, &p) &&
fetch_string(&message_data->timeout.str, &p)) {
if (message_data->events.str) {
enum dm_event_type i = atoi(message_data->events.str);
/*
* Free string representaion of events.
* Not needed an more.
*/
dm_free(message_data->events.str);
message_data->events.field = i;
}
if (message_data->timeout.str) {
uint32_t secs = atoi(message_data->timeout.str);
dm_free(message_data->timeout.str);
message_data->timeout.secs = secs ? secs :
DM_EVENT_DEFAULT_TIMEOUT;
}
return 1;
}
return 0;
};
/* Global mutex to lock access to lists et al. */
static int lock_mutex(void)
{
return pthread_mutex_lock(&mutex);
}
static int unlock_mutex(void)
{
return pthread_mutex_unlock(&mutex);
}
/* Store pid in pidfile. */
static int storepid(int lf)
{
int len;
char pid[8];
if ((len = snprintf(pid, sizeof(pid), "%u\n", getpid())) < 0)
return 0;
if (len > (int) sizeof(pid))
len = (int) sizeof(pid);
if (write(lf, pid, (size_t) len) != len)
return 0;
fsync(lf);
return 1;
}
/* FIXME This is unreliable: should use DM_DEVICE_INFO ioctl instead. */
/* Check, if a device exists. */
static int device_exists(char *device)
{
struct stat st_buf;
char path2[PATH_MAX];
if (!device || !*device)
return 0;
if (device[0] == '/') /* absolute path */
return !stat(device, &st_buf) && S_ISBLK(st_buf.st_mode);
if (PATH_MAX <= snprintf(path2, PATH_MAX, "%s/%s", dm_dir(), device))
return 0;
return !stat(path2, &st_buf) && S_ISBLK(st_buf.st_mode);
}
/*
* Find an existing thread for a device.
*
* Mutex must be held when calling this.
*/
static struct thread_status *lookup_thread_status(struct message_data *data)
{
struct thread_status *thread;
list_iterate_items(thread, &thread_registry) {
if (!strcmp(data->device_path, thread->device_path))
return thread;
}
return NULL;
}
/* Cleanup at exit. */
static void exit_dm_lib(void)
{
dm_lib_release();
dm_lib_exit();
}
/* Derive error case from target parameter string. */
/* FIXME Remove? */
static int error_detected(struct thread_status *thread, char *params) __attribute__ ((unused));
static int error_detected(struct thread_status *thread, char *params)
{
size_t len;
/*
Leave it to the DSO to decide how to interpret the status info
if ((len = strlen(params)) &&
params[len - 1] == 'F') {
*/
if (params && (len = strlen(params))) {
thread->current_events |= DM_EVENT_DEVICE_ERROR;
return 1;
}
return 0;
}
static void exit_timeout(void *unused)
{
timeout_running = 0;
pthread_mutex_unlock(&timeout_mutex);
}
/* Wake up monitor threads every so often. */
static void *timeout_thread(void *unused)
{
struct timespec timeout;
time_t curr_time;
timeout.tv_nsec = 0;
pthread_cleanup_push(exit_timeout, NULL);
pthread_mutex_lock(&timeout_mutex);
while (!list_empty(&timeout_registry)) {
struct thread_status *thread;
timeout.tv_sec = (time_t)-1;
curr_time = time(NULL);
list_iterate_items_gen(thread, &timeout_registry,
timeout_list) {
if (thread->next_time < curr_time) {
thread->next_time = curr_time + thread->timeout;
pthread_kill(thread->thread, SIGALRM);
}
if (thread->next_time < timeout.tv_sec)
timeout.tv_sec = thread->next_time;
}
pthread_cond_timedwait(&timeout_cond, &timeout_mutex, &timeout);
}
pthread_cleanup_pop(1);
return NULL;
}
static int register_for_timeout(struct thread_status *thread)
{
int ret = 0;
pthread_mutex_lock(&timeout_mutex);
thread->next_time = time(NULL) + thread->timeout;
if (list_empty(&thread->timeout_list)) {
list_add(&timeout_registry, &thread->timeout_list);
if (timeout_running)
pthread_cond_signal(&timeout_cond);
}
if (!timeout_running) {
pthread_t timeout_id;
if (!(ret = -pthread_create(&timeout_id, NULL,
timeout_thread, NULL)))
timeout_running = 1;
}
pthread_mutex_unlock(&timeout_mutex);
return ret;
}
static void unregister_for_timeout(struct thread_status *thread)
{
pthread_mutex_lock(&timeout_mutex);
if (!list_empty(&thread->timeout_list)) {
list_del(&thread->timeout_list);
list_init(&thread->timeout_list);
}
pthread_mutex_unlock(&timeout_mutex);
}
static void no_intr_log(int level, const char *file, int line,
const char *f, ...)
{
va_list ap;
if (errno == EINTR)
return;
if (level > _LOG_WARN)
return;
va_start(ap, f);
if (level < _LOG_WARN)
vfprintf(stderr, f, ap);
else
vprintf(f, ap);
va_end(ap);
if (level < _LOG_WARN)
fprintf(stderr, "\n");
else
fprintf(stdout, "\n");
}
static sigset_t unblock_sigalrm(void)
{
sigset_t set, old;
sigemptyset(&set);
sigaddset(&set, SIGALRM);
pthread_sigmask(SIG_UNBLOCK, &set, &old);
return old;
}
/* Wait on a device until an event occurs. */
static int event_wait(struct thread_status *thread)
{
sigset_t set;
int ret = 0;
/*
void *next = NULL;
char *params, *target_type;
uint64_t start, length;
*/
struct dm_task *dmt;
struct dm_info info;
if (!(dmt = dm_task_create(DM_DEVICE_WAITEVENT)))
return 0;
if (!(ret = dm_task_set_name(dmt, dm_basename(thread->device_path))) ||
!(ret = dm_task_set_event_nr(dmt, thread->event_nr)))
goto out;
/*
* This is so that you can break out of waiting on an event,
* either for a timeout event, or to cancel the thread.
*/
set = unblock_sigalrm();
dm_log_init(no_intr_log);
errno = 0;
if ((ret = dm_task_run(dmt))) {
/*
do {
params = NULL;
next = dm_get_next_target(dmt, next, &start, &length,
&target_type, &params);
log_error("%s: %s\n", __func__, params);
if ((ret = error_detected(thread, params)))
break;
} while(next);
*/
thread->current_events |= DM_EVENT_DEVICE_ERROR;
ret = 1;
/*
* FIXME: I am setting processed_events to zero here
* because it is causing problems. for example, the
* mirror target emits a signal for INSYNC, then
* subsequent events (device failures) are not handled
*/
thread->processed_events = 0;
if ((ret = dm_task_get_info(dmt, &info)))
thread->event_nr = info.event_nr;
} else if (thread->events & DM_EVENT_TIMEOUT && errno == EINTR) {
thread->current_events |= DM_EVENT_TIMEOUT;
ret = 1;
thread->processed_events = 0;
}
pthread_sigmask(SIG_SETMASK, &set, NULL);
dm_log_init(NULL);
out:
dm_task_destroy(dmt);
return ret;
}
/* Register a device with the DSO. */
static int do_register_device(struct thread_status *thread)
{
return thread->dso_data->register_device(thread->device_path);
}
/* Unregister a device with the DSO. */
static int do_unregister_device(struct thread_status *thread)
{
return thread->dso_data->unregister_device(thread->device_path);
}
/* Process an event in the DSO. */
static void do_process_event(struct thread_status *thread)
{
thread->dso_data->process_event(thread->device_path,
thread->current_events);
}
/* Thread cleanup handler to unregister device. */
static void monitor_unregister(void *arg)
{
struct thread_status *thread = arg;
if (!do_unregister_device(thread))
log_error("%s: %s unregister failed\n", __func__,
thread->device_path);
}
/* Device monitoring thread. */
static void *monitor_thread(void *arg)
__attribute((noreturn));
static void *monitor_thread(void *arg)
{
struct thread_status *thread = arg;
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
pthread_cleanup_push(monitor_unregister, thread);
/* Wait for do_process_request() to finish its task. */
lock_mutex();
unlock_mutex();
/* Loop forever awaiting/analyzing device events. */
while (1) {
thread->current_events = 0;
/*
* FIXME: if unrecoverable error (ENODEV) happens,
* we loop indefinitely. event_wait should return
* more than 0/1.
*/
if (!event_wait(thread))
continue;
/*
* Check against filter.
*
* If there's current events delivered from event_wait() AND
* the device got registered for those events AND
* those events haven't been processed yet, call
* the DSO's process_event() handler.
*
* FIXME: when does processed_events get cleared? What if
* the same type of event happens later... after the first
* was handled properly?
*/
if (thread->events &
thread->current_events &
~thread->processed_events) {
lock_mutex();
thread->processing = 1;
unlock_mutex();
do_process_event(thread);
thread->processed_events |= thread->current_events;
lock_mutex();
thread->processing = 0;
unlock_mutex();
}
}
pthread_cleanup_pop(0);
}
/* Create a device monitoring thread. */
/* FIXME: call this with mutex hold ? */
static int create_thread(struct thread_status *thread)
{
return pthread_create(&thread->thread, NULL, monitor_thread, thread);
}
static int terminate_thread(struct thread_status *thread)
{
int ret;
if ((ret = pthread_cancel(thread->thread)))
return ret;
return pthread_kill(thread->thread, SIGALRM);
}
/* DSO reference counting. */
static void lib_get(struct dso_data *data)
{
data->ref_count++;
}
static void lib_put(struct dso_data *data)
{
if (!--data->ref_count) {
dlclose(data->dso_handle);
UNLINK_DSO(data);
free_dso_data(data);
}
}
/* Find DSO data. */
static struct dso_data *lookup_dso(struct message_data *data)
{
struct dso_data *dso_data, *ret = NULL;
lock_mutex();
list_iterate_items(dso_data, &dso_registry) {
if (!strcmp(data->dso_name, dso_data->dso_name)) {
lib_get(dso_data);
ret = dso_data;
break;
}
}
unlock_mutex();
return ret;
}
/* Lookup DSO symbols we need. */
static int lookup_symbol(void *dl, struct dso_data *data,
void **symbol, const char *name)
{
if ((*symbol = dlsym(dl, name)))
return 1;
log_error("looking up %s symbol in %s\n", name, data->dso_name);
return 0;
}
static int lookup_symbols(void *dl, struct dso_data *data)
{
return lookup_symbol(dl, data, (void*) &data->process_event,
"process_event") &&
lookup_symbol(dl, data, (void*) &data->register_device,
"register_device") &&
lookup_symbol(dl, data, (void*) &data->unregister_device,
"unregister_device");
}
/* Load an application specific DSO. */
static struct dso_data *load_dso(struct message_data *data)
{
void *dl;
struct dso_data *ret = NULL;
log_very_verbose("Opening shared library %s", data->dso_name);
if (!(dl = dlopen(data->dso_name, RTLD_NOW))){
log_error("dmeventd %s dlopen failed: %s", data->dso_name,
dlerror());
return NULL;
}
if (!(ret = alloc_dso_data(data))) {
dlclose(dl);
return NULL;
}
if (!(lookup_symbols(dl, ret))) {
free_dso_data(ret);
dlclose(dl);
return NULL;
}
/*
* Keep handle to close the library once
* we've got no references to it any more.
*/
ret->dso_handle = dl;
lib_get(ret);
lock_mutex();
LINK_DSO(ret);
unlock_mutex();
return ret;
}
/* Return success on daemon active check. */
static int active(struct message_data *message_data)
{
return 0;
}
/*
* Register for an event.
*
* Only one caller at a time here, because we use
* a FIFO and lock it against multiple accesses.
*/
static int register_for_event(struct message_data *message_data)
{
int ret = 0;
struct thread_status *thread, *thread_new = NULL;
struct dso_data *dso_data;
if (!device_exists(message_data->device_path)) {
stack;
ret = -ENODEV;
goto out;
}
if (!(dso_data = lookup_dso(message_data)) &&
!(dso_data = load_dso(message_data))) {
stack;
/* FIXME */
#ifdef ELIBACC
ret = -ELIBACC;
#else
ret = -ENODEV;
#endif
goto out;
}
/* Preallocate thread status struct to avoid deadlock. */
if (!(thread_new = alloc_thread_status(message_data, dso_data))) {
stack;
ret = -ENOMEM;
goto out;
}
lock_mutex();
if (!(thread = lookup_thread_status(message_data))) {
unlock_mutex();
/*
* FIXME: better do this asynchronously in the
* monitoring thread ?
*/
if (!(ret = do_register_device(thread_new)))
goto out;
thread = thread_new;
thread_new = NULL;
/* Try to create the monitoring thread for this device. */
lock_mutex();
if ((ret = -create_thread(thread))) {
unlock_mutex();
do_unregister_device(thread);
free_thread_status(thread);
goto out;
} else
LINK_THREAD(thread);
}
/* Or event # into events bitfield. */
thread->events |= message_data->events.field;
unlock_mutex();
/* FIXME - If you fail to register for timeout events, you
still monitor all the other events. Is this the right
action for newly created devices? Also, you are still
on the timeout registry, so if a timeout thread is
successfully started up later, you will start receiving
DM_EVENT_TIMEOUT events */
if (thread->events & DM_EVENT_TIMEOUT)
ret = -register_for_timeout(thread);
out:
/*
* Deallocate thread status after releasing
* the lock in case we haven't used it.
*/
if (thread_new)
free_thread_status(thread_new);
return ret;
}
/*
* Unregister for an event.
*
* Only one caller at a time here as with register_for_event().
*/
static int unregister_for_event(struct message_data *message_data)
{
int ret = 0;
struct thread_status *thread;
/*
* Clear event in bitfield and deactivate
* monitoring thread in case bitfield is 0.
*/
lock_mutex();
if (!(thread = lookup_thread_status(message_data))) {
unlock_mutex();
ret = -ENODEV;
goto out;
}
thread->events &= ~message_data->events.field;
if (!(thread->events & DM_EVENT_TIMEOUT))
unregister_for_timeout(thread);
/*
* In case there's no events to monitor on this device ->
* unlink and terminate its monitoring thread.
*/
if (!thread->events) {
UNLINK_THREAD(thread);
LINK(thread, &thread_registry_unused);
}
unlock_mutex();
out:
return ret;
}
/*
* Get registered device.
*
* Only one caller at a time here as with register_for_event().
*/
static int registered_device(struct message_data *message_data,
struct thread_status *thread)
{
struct dm_event_daemon_message *msg = message_data->msg;
snprintf(msg->msg, sizeof(msg->msg), "%s %s %u",
thread->dso_data->dso_name, thread->device_path,
thread->events);
unlock_mutex();
return 0;
}
static int want_registered_device(char *dso_name, char *device_path,
struct thread_status *thread)
{
/* If DSO names and device paths are equal. */
if (dso_name && device_path)
return !strcmp(dso_name, thread->dso_data->dso_name) &&
!strcmp(device_path, thread->device_path);
/* If DSO names are equal. */
if (dso_name)
return !strcmp(dso_name, thread->dso_data->dso_name);
/* If device paths are equal. */
if (device_path)
return !strcmp(device_path, thread->device_path);
return 1;
}
static int _get_registered_device(struct message_data *message_data, int next)
{
int hit = 0;
struct thread_status *thread;
lock_mutex();
/* Iterate list of threads checking if we want a particular one. */
list_iterate_items(thread, &thread_registry) {
if ((hit = want_registered_device(message_data->dso_name,
message_data->device_path,
thread)))
break;
}
/*
* If we got a registered device and want the next one ->
* fetch next conforming element off the list.
*/
if (hit) {
if (next) {
do {
if (list_end(&thread_registry, &thread->list))
goto out;
thread = list_item(thread->list.n,
struct thread_status);
} while (!want_registered_device(message_data->dso_name,
NULL, thread));
}
return registered_device(message_data, thread);
}
out:
unlock_mutex();
return -ENOENT;
}
static int get_registered_device(struct message_data *message_data)
{
return _get_registered_device(message_data, 0);
}
static int get_next_registered_device(struct message_data *message_data)
{
return _get_registered_device(message_data, 1);
}
static int set_timeout(struct message_data *message_data)
{
struct thread_status *thread;
lock_mutex();
if ((thread = lookup_thread_status(message_data)))
thread->timeout = message_data->timeout.secs;
unlock_mutex();
return thread ? 0 : -ENODEV;
}
static int get_timeout(struct message_data *message_data)
{
struct thread_status *thread;
struct dm_event_daemon_message *msg = message_data->msg;
lock_mutex();
if ((thread = lookup_thread_status(message_data)))
snprintf(msg->msg, sizeof(msg->msg),
"%"PRIu32, thread->timeout);
unlock_mutex();
return thread ? 0 : -ENODEV;
}
/* Initialize a fifos structure with path names. */
static int init_fifos(struct dm_event_fifos *fifos)
{
if (memset(fifos, 0, sizeof(*fifos))) {
fifos->client_path = DM_EVENT_FIFO_CLIENT;
fifos->server_path = DM_EVENT_FIFO_SERVER;
return 0;
}
return -ENOMEM;
}
/* Open fifos used for client communication. */
static int open_fifos(struct dm_event_fifos *fifos)
{
/* Blocks until client is ready to write. */
if ((fifos->server = open(fifos->server_path, O_WRONLY)) < 0) {
stack;
return -EXIT_FIFO_FAILURE;
}
/* Need to open read+write for select() to work. */
if ((fifos->client = open(fifos->client_path, O_RDWR)) < 0) {
stack;
close(fifos->server);
return -EXIT_FIFO_FAILURE;
}
return 0;
}
/*
* Read message from client making sure that data is available
* and a complete message is read. Must not block indefinitely.
*/
static int client_read(struct dm_event_fifos *fifos, struct dm_event_daemon_message *msg)
{
struct timeval t;
unsigned bytes = 0;
int ret = 0;
fd_set fds;
errno = 0;
while (bytes < sizeof(*msg) && errno != EOF) {
/* Watch client read FIFO for input. */
FD_ZERO(&fds);
FD_SET(fifos->client, &fds);
t.tv_sec = 1;
t.tv_usec = 0;
ret = select(fifos->client+1, &fds, NULL, NULL, &t);
if (!ret && !bytes) /* nothing to read */
return 0;
if (!ret) /* trying to finish read */
continue;
if (ret < 0) /* error */
return 0;
ret = read(fifos->client, msg, sizeof(*msg) - bytes);
bytes += ret > 0 ? ret : 0;
}
return bytes == sizeof(*msg);
}
/*
* Write a message to the client making sure that it is ready to write.
*/
static int client_write(struct dm_event_fifos *fifos, struct dm_event_daemon_message *msg)
{
unsigned bytes = 0;
int ret = 0;
fd_set fds;
errno = 0;
while (bytes < sizeof(*msg) && errno != EIO) {
do {
/* Watch client write FIFO to be ready for output. */
FD_ZERO(&fds);
FD_SET(fifos->server, &fds);
} while (select(fifos->server +1, NULL, &fds, NULL, NULL) != 1);
ret = write(fifos->server, msg, sizeof(*msg) - bytes);
bytes += ret > 0 ? ret : 0;
}
return bytes == sizeof(*msg);
}
/*
* Handle a client request.
*
* We put the request handling functions into
* a list because of the growing number.
*/
static int handle_request(struct dm_event_daemon_message *msg,
struct message_data *message_data)
{
static struct {
unsigned int cmd;
int (*f)(struct message_data*);
} requests[] = {
{ DM_EVENT_CMD_REGISTER_FOR_EVENT, register_for_event },
{ DM_EVENT_CMD_UNREGISTER_FOR_EVENT, unregister_for_event },
{ DM_EVENT_CMD_GET_REGISTERED_DEVICE, get_registered_device },
{ DM_EVENT_CMD_GET_NEXT_REGISTERED_DEVICE, get_next_registered_device },
{ DM_EVENT_CMD_SET_TIMEOUT, set_timeout },
{ DM_EVENT_CMD_GET_TIMEOUT, get_timeout },
{ DM_EVENT_CMD_ACTIVE, active },
}, *req;
for (req = requests; req < requests + sizeof(requests); req++) {
if (req->cmd == msg->opcode.cmd)
return req->f(message_data);
}
return -EINVAL;
}
/* Process a request passed from the communication thread. */
static int do_process_request(struct dm_event_daemon_message *msg)
{
int ret;
static struct message_data message_data;
/* Parse the message. */
memset(&message_data, 0, sizeof(message_data));
message_data.msg = msg;
if (msg->opcode.cmd != DM_EVENT_CMD_ACTIVE &&
!parse_message(&message_data)) {
stack;
ret = -EINVAL;
} else {
ret = handle_request(msg, &message_data);
}
free_message(&message_data);
return ret;
}
/* Only one caller at a time. */
static void process_request(struct dm_event_fifos *fifos)
{
struct dm_event_daemon_message msg;
/* FIXME: better error handling */
memset(&msg, 0, sizeof(msg));
/*
* Read the request from the client.
* Of course, it's tough to tell what to do when
* we use fucking retarded return codes like
* 0 for error.
*/
if (!client_read(fifos, &msg))
return;
msg.opcode.status = do_process_request(&msg);
if (!client_write(fifos, &msg))
stack;
}
static void cleanup_unused_threads(void)
{
int ret;
struct list *l;
struct thread_status *thread;
lock_mutex();
while ((l = list_first(&thread_registry_unused))) {
thread = list_item(l, struct thread_status);
if (thread->processing) {
goto out; /* cleanup on the next round */
}
list_del(l);
if (!thread->events) {
/* turn codes negative -- should we be returning this? */
if ((ret = -terminate_thread(thread)))
stack;
else {
pthread_join(thread->thread, NULL);
lib_put(thread->dso_data);
free_thread_status(thread);
}
} else {
log_error("thread can't be on unused list unless !thread->events");
LINK_THREAD(thread);
}
}
out:
unlock_mutex();
}
static void sig_alarm(int signum)
{
pthread_testcancel();
}
/* Init thread signal handling. */
static void init_thread_signals(void)
{
sigset_t my_sigset;
struct sigaction act;
memset(&act, 0, sizeof(act));
act.sa_handler = sig_alarm;
sigaction(SIGALRM, &act, NULL);
sigfillset(&my_sigset);
pthread_sigmask(SIG_BLOCK, &my_sigset, NULL);
}
static int daemonize(void)
{
setsid();
if (chdir("/"))
return -EXIT_CHDIR_FAILURE;
/* FIXME: activate again after we're done with tracing.
if ((close(STDIN_FILENO) < 0) ||
(close(STDOUT_FILENO) < 0) ||
(close(STDERR_FILENO) < 0))
return -EXIT_DESC_CLOSE_FAILURE;
*/
return 0;
}
static int lock_pidfile(void)
{
int lf;
char pidfile[] = "/var/run/dmeventd.pid";
if ((lf = open(pidfile, O_CREAT | O_RDWR, 0644)) < 0)
return -EXIT_OPEN_PID_FAILURE;
if (flock(lf, LOCK_EX | LOCK_NB) < 0)
return -EXIT_LOCKFILE_INUSE;
if (!storepid(lf))
return -EXIT_FAILURE;
return 0;
}
void dmeventd(void)
{
int ret;
struct dm_event_fifos fifos;
// struct sys_log logdata = {DAEMON_NAME, LOG_DAEMON};
if ((ret = daemonize()))
exit(-ret);
/* FIXME: set daemon name. */
// set_name();
if ((ret = lock_pidfile()))
exit(-ret);
init_thread_signals();
//multilog_clear_logging();
//multilog_add_type(std_syslog, &logdata);
//multilog_init_verbose(std_syslog, _LOG_DEBUG);
//multilog_async(1);
if ((ret = init_fifos(&fifos)))
exit(-ret);
pthread_mutex_init(&mutex, NULL);
#ifdef MCL_CURRENT
if (mlockall(MCL_CURRENT | MCL_FUTURE) == -1)
exit(EXIT_FAILURE);
#endif
if ((ret = open_fifos(&fifos)))
exit(-ret);
/* Signal parent, letting them know we are ready to go. */
kill(getppid(), SIGUSR1);
/*
* We exit when there are no more devices to watch.
* That is, when the last unregister happens.
*
* We must be careful though. One of our threads which is
* watching a device may receive an event and:
* 1) Alter the device and unregister it
* or
* 2) Alter the device, unregister, [alter again,] and reregister
*
* We must be capable of answering a request to unregister
* that comes from the very thread that must be unregistered.
* Additionally, if that thread unregisters itself and it was the
* only thread being monitored, we must also handle the case where
* that thread may perform a register before exiting. (In other
* words, we can not simply exit if all threads have been unregistered
* unless all threads are done processing.
*/
do {
process_request(&fifos);
cleanup_unused_threads();
} while(!list_empty(&thread_registry) || !list_empty(&thread_registry_unused));
exit_dm_lib();
#ifdef MCL_CURRENT
munlockall();
#endif
pthread_mutex_destroy(&mutex);
exit(EXIT_SUCCESS);
}
/*
* Overrides for Emacs so that we follow Linus's tabbing style.
* Emacs will notice this stuff at the end of the file and automatically
* adjust the settings for this buffer only. This must remain at the end
* of the file.
* ---------------------------------------------------------------------------
* Local variables:
* c-file-style: "linux"
* End:
*/