1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-01-08 21:17:43 +03:00

im-collectd: Collect driver that listens UDP async connections from hosts. Collectd, collects the hosts monitoring data and periodically flushes to OpenNebula core. Needs configuration, error checking and logs

This commit is contained in:
Ruben S. Montero 2013-09-29 23:08:08 +02:00
parent 71bec54fb3
commit 577a721873
8 changed files with 632 additions and 1 deletions

View File

@ -246,7 +246,8 @@ build_scripts=[
'share/man/SConstruct',
'src/sunstone/locale/languages/SConstruct',
'share/scripts/context-packages/SConstruct',
'share/rubygems/SConstruct'
'share/rubygems/SConstruct',
'src/im_mad/collectd/SConstruct'
]
# Testing

View File

@ -755,6 +755,7 @@ MADS_LIB_FILES="src/mad/sh/madcommon.sh \
src/im_mad/ec2/one_im_ec2 \
src/im_mad/dummy/one_im_dummy.rb \
src/im_mad/dummy/one_im_dummy \
src/im_mad/collectd/collectd \
src/tm_mad/one_tm \
src/tm_mad/one_tm.rb \
src/hm_mad/one_hm.rb \

View File

@ -0,0 +1,137 @@
/* -------------------------------------------------------------------------- */
/* Copyright 2002-2013, OpenNebula Project (OpenNebula.org), C12G Labs */
/* */
/* Licensed under the Apache License, Version 2.0 (the "License"); you may */
/* not use this file except in compliance with the License. You may obtain */
/* a copy of the License at */
/* */
/* http://www.apache.org/licenses/LICENSE-2.0 */
/* */
/* Unless required by applicable law or agreed to in writing, software */
/* distributed under the License is distributed on an "AS IS" BASIS, */
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
/* See the License for the specific language governing permissions and */
/* limitations under the License. */
/* -------------------------------------------------------------------------- */
#include "ListenerThread.h"
#include <unistd.h>
#include <sys/socket.h>
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
const size_t ListenerThread::MESSAGE_SIZE = 100000;
const size_t ListenerThread::BUFFER_SIZE = 100;
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void ListenerThread::flush_buffer(int fd)
{
lock();
std::vector<std::string>::iterator it;
for(it = monitor_data.begin() ; it != monitor_data.end(); ++it)
{
size_t size = (*it).size();
const char * message = (*it).c_str();
write(fd, message, size);
}
monitor_data.clear();
unlock();
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void ListenerThread::monitor_loop()
{
char buffer[MESSAGE_SIZE];
size_t rc;
struct sockaddr addr;
socklen_t addr_size = sizeof(struct sockaddr);
while(true)
{
rc = recvfrom(socket, buffer, MESSAGE_SIZE, 0, &addr, &addr_size);
if (rc > 0 && rc < MESSAGE_SIZE)
{
std::string message(buffer, rc);
lock();
monitor_data.push_back(message);
unlock();
}
}
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
extern "C" void * listener_main(void *arg)
{
ListenerThread * listener = static_cast<ListenerThread *>(arg);
listener->monitor_loop();
return 0;
};
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
ListenerPool::~ListenerPool()
{
std::vector<ListenerThread>::iterator it;
for(it = listeners.begin() ; it != listeners.end(); ++it)
{
pthread_cancel((*it).thread_id());
}
};
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void ListenerPool::start_pool()
{
pthread_attr_t attr;
pthread_t id;
std::vector<ListenerThread>::iterator it;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
for(it = listeners.begin() ; it != listeners.end(); ++it)
{
pthread_create(&id, &attr, listener_main, (void *)&(*it));
(*it).thread_id(id);
}
pthread_attr_destroy(&attr);
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void ListenerPool::flush_pool()
{
std::vector<ListenerThread>::iterator it;
for(it = listeners.begin() ; it != listeners.end(); ++it)
{
(*it).flush_buffer(out_fd);
}
}

View File

@ -0,0 +1,133 @@
/* -------------------------------------------------------------------------- */
/* Copyright 2002-2013, OpenNebula Project (OpenNebula.org), C12G Labs */
/* */
/* Licensed under the Apache License, Version 2.0 (the "License"); you may */
/* not use this file except in compliance with the License. You may obtain */
/* a copy of the License at */
/* */
/* http://www.apache.org/licenses/LICENSE-2.0 */
/* */
/* Unless required by applicable law or agreed to in writing, software */
/* distributed under the License is distributed on an "AS IS" BASIS, */
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
/* See the License for the specific language governing permissions and */
/* limitations under the License. */
/* -------------------------------------------------------------------------- */
#include <string>
#include <vector>
#include <pthread.h>
/**
* This class implements a listener thread for the IM collector. It receives
* messages from a UDP port and stores it in a BUFFER. The class is controlled
* by two parameters
* - BUFFER_SIZE the number of pre-allocated capacity to store monitor
* messages. It should be roughly equal to the number of hosts per thread
* - MESSAGE_SIZE the size of each monitor message (100K by default). Each VM
* needs ~100bytes so ~1000VMs per host
*/
class ListenerThread
{
public:
/**
* @param _socket descriptor to listen for messages
*/
ListenerThread(int _socket):socket(_socket)
{
pthread_mutex_init(&mutex,0);
monitor_data.reserve(BUFFER_SIZE);
};
~ListenerThread()
{
pthread_mutex_destroy(&mutex);
};
/**
* Write the contents of the message buffer to a descriptor. Buffer is
* cleared
* @param fd file descriptor to send monitor data.
*/
void flush_buffer(int fd);
/**
* Waits for UDP messages in a loop and store them in a buffer
*/
void monitor_loop();
/**
* Set the thread ID for the listener
*/
void thread_id(pthread_t id)
{
_thread_id = id;
}
/**
* Get the thread ID of the listener
*/
pthread_t thread_id()
{
return _thread_id;
}
private:
static const size_t MESSAGE_SIZE; /**< Monitor message size */
static const size_t BUFFER_SIZE; /**< Pre-allocated capacoty */
pthread_mutex_t mutex;
pthread_t _thread_id;
std::vector<std::string> monitor_data;
int socket;
void lock()
{
pthread_mutex_lock(&mutex);
}
void unlock()
{
pthread_mutex_unlock(&mutex);
}
};
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
/**
* Main function for each listener, starts the monitor loop
*/
extern "C" void * listener_main(void *arg);
/**
* Represents a pool of listener threads, it should be periodically flushed to
* a file descriptor
*/
class ListenerPool
{
public:
/**
* @param fd descriptor to flush the data
* @param sock socket for the UDP connections
* @param num number of threads in the pool
*/
ListenerPool(int fd, int sock, size_t num)
:listeners(num, ListenerThread(sock)), out_fd(fd), socket(sock){};
~ListenerPool();
void start_pool();
void flush_pool();
private:
std::vector<ListenerThread> listeners;
int out_fd;
int socket;
};

View File

@ -0,0 +1,184 @@
/* -------------------------------------------------------------------------- */
/* Copyright 2002-2013, OpenNebula Project (OpenNebula.org), C12G Labs */
/* */
/* Licensed under the Apache License, Version 2.0 (the "License"); you may */
/* not use this file except in compliance with the License. You may obtain */
/* a copy of the License at */
/* */
/* http://www.apache.org/licenses/LICENSE-2.0 */
/* */
/* Unless required by applicable law or agreed to in writing, software */
/* distributed under the License is distributed on an "AS IS" BASIS, */
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
/* See the License for the specific language governing permissions and */
/* limitations under the License. */
/* -------------------------------------------------------------------------- */
#include <sstream>
#include <unistd.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "OpenNebulaDriver.h"
#include "ListenerThread.h"
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int OpenNebulaDriver::read_one(std::string& message)
{
fd_set in_pipes;
std::ostringstream oss;
char c;
int rc;
FD_ZERO(&in_pipes);
FD_SET (0,&in_pipes);
rc = select(1, &in_pipes, NULL, NULL, NULL);
if (rc == -1)
{
return -1;
}
do
{
rc = read(0, (void *) &c, sizeof(char));
oss << c;
}
while ( rc > 0 && c != '\n' );
if (rc < 0)
{
return -1;
}
message = oss.str();
return 0;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void OpenNebulaDriver::driver_loop()
{
while (true)
{
std::string message;
if (read_one(message) == 0)
{
std::istringstream is(message);
std::string action;
if ( is.good() )
{
is >> action >> std::ws;
}
else
{
continue;
}
if (action == "INIT")
{
write2one("INIT SUCCESS\n",13);
}
else if (action == "FINALIZE")
{
break;
}
else
{
driver_action(action, is);
}
}
}
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int IMCollectorDriver::init_collector()
{
struct sockaddr_in im_server;
int rc;
int sock = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
if ( sock < 0 )
{
return -1;
}
im_server.sin_family = AF_INET;
im_server.sin_port = htons(_port);
if ( inet_pton(AF_INET, _address.c_str(), &im_server.sin_addr.s_addr) < 0 )
{
return -1;
}
rc = bind(sock, (struct sockaddr *) &im_server, sizeof(struct sockaddr_in));
if ( rc < 0 )
{
return -1;
}
pool = new ListenerPool(1, sock, _threads);
return 0;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void IMCollectorDriver::start_collector()
{
pthread_attr_t attr;
pthread_t id;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&id, &attr, flush_thread, (void *)this);
pool->start_pool();
start();
pthread_attr_destroy(&attr);
pthread_cancel(id);
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
extern "C" void * flush_thread(void *arg)
{
IMCollectorDriver * collectd = static_cast<IMCollectorDriver *>(arg);
collectd->flush_loop();
return 0;
}
/* -------------------------------------------------------------------------- */
void IMCollectorDriver::flush_loop()
{
while(true)
{
sleep(_flush_period);
pool->flush_pool();
}
};

View File

@ -0,0 +1,108 @@
/* -------------------------------------------------------------------------- */
/* Copyright 2002-2013, OpenNebula Project (OpenNebula.org), C12G Labs */
/* */
/* Licensed under the Apache License, Version 2.0 (the "License"); you may */
/* not use this file except in compliance with the License. You may obtain */
/* a copy of the License at */
/* */
/* http://www.apache.org/licenses/LICENSE-2.0 */
/* */
/* Unless required by applicable law or agreed to in writing, software */
/* distributed under the License is distributed on an "AS IS" BASIS, */
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
/* See the License for the specific language governing permissions and */
/* limitations under the License. */
/* -------------------------------------------------------------------------- */
#ifndef _OPENNEBULA_DRIVER_H
#define _OPENNEBULA_DRIVER_H
#include <unistd.h>
#include <string>
class OpenNebulaDriver
{
public:
OpenNebulaDriver(){};
virtual ~OpenNebulaDriver(){};
void start()
{
driver_loop();
};
protected:
void write2one(const char * buf, size_t bsize) const
{
write(1, buf, bsize);
};
void write2one(const std::string& buf) const
{
write2one(buf.c_str(), buf.size());
}
private:
/**
* Main driver loop. reads actions from OpenNebula core and deals with them
*/
void driver_loop();
/**
* Read OpenNebula message
* @param message from OpenNebula
* @retuen 0 on success
*/
int read_one(std::string& message);
/**
* Generic Driver Action
*/
virtual void driver_action(const std::string& action,
std::istringstream &is) = 0;
};
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
extern "C" void * flush_thread(void *arg);
class ListenerPool;
class IMCollectorDriver: public OpenNebulaDriver
{
public:
IMCollectorDriver(std::string address, int port, int threads,
int flush_period)
:OpenNebulaDriver(),_address(address),_port(port),_threads(threads),
_flush_period(flush_period){};
virtual ~IMCollectorDriver(){};
int init_collector();
void flush_loop();
void start_collector();
private:
void driver_action(const std::string& action, std::istringstream &is){};
std::string _address;
int _port;
int _threads;
int _flush_period;
ListenerPool *pool;
};
#endif

View File

@ -0,0 +1,42 @@
# ---------------------------------------------------------------------------- #
# Copyright 2010-2013, C12G Labs S.L #
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may #
# not use this file except in compliance with the License. You may obtain #
# a copy of the License at #
# #
# http://www.apache.org/licenses/LICENSE-2.0 #
# #
# Unless required by applicable law or agreed to in writing, software #
# distributed under the License is distributed on an "AS IS" BASIS, #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
# See the License for the specific language governing permissions and #
# limitations under the License. #
# ---------------------------------------------------------------------------- #
import os
Import('env')
cwd=os.getcwd()
env.Append(LIBPATH=[
cwd
])
lib_name='im_collectd'
source_files=[
'OpenNebulaDriver.cc',
'ListenerThread.cc'
]
# Build library
env.StaticLibrary(lib_name, source_files)
# Build daemon
env.Prepend(LIBS=[
'im_collectd',
'util'
])
env.Program('collectd.cc')

View File

@ -0,0 +1,25 @@
/* -------------------------------------------------------------------------- */
/* Copyright 2002-2013, OpenNebula Project (OpenNebula.org), C12G Labs */
/* */
/* Licensed under the Apache License, Version 2.0 (the "License"); you may */
/* not use this file except in compliance with the License. You may obtain */
/* a copy of the License at */
/* */
/* http://www.apache.org/licenses/LICENSE-2.0 */
/* */
/* Unless required by applicable law or agreed to in writing, software */
/* distributed under the License is distributed on an "AS IS" BASIS, */
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
/* See the License for the specific language governing permissions and */
/* limitations under the License. */
/* -------------------------------------------------------------------------- */
#include "OpenNebulaDriver.h"
int main()
{
IMCollectorDriver collectd("127.0.0.1", 9876, 1, 1);
collectd.init_collector();
collectd.start_collector();
}