1
0
mirror of https://github.com/OpenNebula/one.git synced 2024-12-22 13:33:52 +03:00

B #1589: Better parsing for collectd arguments. Get rid of buffering (#1592)

This commit is contained in:
Ruben S. Montero 2017-12-21 17:12:52 +01:00 committed by GitHub
parent 3e454bf9fd
commit 6c6956be5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 75 additions and 149 deletions

View File

@ -23,29 +23,8 @@
/* -------------------------------------------------------------------------- */
const size_t ListenerThread::MESSAGE_SIZE = 100000;
const size_t ListenerThread::BUFFER_SIZE = 100;
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void ListenerThread::flush_buffer(int fd)
{
lock();
std::vector<std::string>::iterator it;
for(it = monitor_data.begin() ; it != monitor_data.end(); ++it)
{
size_t size = (*it).size();
const char * message = (*it).c_str();
write(fd, message, size);
}
monitor_data.clear();
unlock();
}
pthread_mutex_t ListenerThread::mutex = PTHREAD_MUTEX_INITIALIZER;
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
@ -64,11 +43,9 @@ void ListenerThread::monitor_loop()
if (rc > 0 && rc < MESSAGE_SIZE)
{
std::string message(buffer, rc);
lock();
monitor_data.push_back(message);
write(fd, buffer, rc);
unlock();
}
@ -126,12 +103,3 @@ void ListenerPool::start_pool()
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void ListenerPool::flush_pool()
{
std::vector<ListenerThread>::iterator it;
for(it = listeners.begin() ; it != listeners.end(); ++it)
{
(*it).flush_buffer(out_fd);
}
}

View File

@ -21,12 +21,9 @@
/**
* This class implements a listener thread for the IM collector. It receives
* messages from a UDP port and stores it in a BUFFER. The class is controlled
* by two parameters
* - BUFFER_SIZE the number of pre-allocated capacity to store monitor
* messages. It should be roughly equal to the number of hosts per thread
* - MESSAGE_SIZE the size of each monitor message (100K by default). Each VM
* needs ~100bytes so ~1000VMs per host
* messages from a UDP port and sends them to oned. The class is controlled
* by the MESSAGE_SIZE the size of each monitor message (100K by default).
* Each VM needs ~100bytes so ~1000VMs per host
*/
class ListenerThread
{
@ -34,24 +31,9 @@ public:
/**
* @param _socket descriptor to listen for messages
*/
ListenerThread(int _socket):socket(_socket)
{
pthread_mutex_init(&mutex,0);
ListenerThread(int _socket, int _fd):socket(_socket), fd(_fd){};
monitor_data.reserve(BUFFER_SIZE);
};
~ListenerThread()
{
pthread_mutex_destroy(&mutex);
};
/**
* Write the contents of the message buffer to a descriptor. Buffer is
* cleared
* @param fd file descriptor to send monitor data.
*/
void flush_buffer(int fd);
~ListenerThread(){};
/**
* Waits for UDP messages in a loop and store them in a buffer
@ -76,14 +58,13 @@ public:
private:
static const size_t MESSAGE_SIZE; /**< Monitor message size */
static const size_t BUFFER_SIZE; /**< Pre-allocated capacoty */
pthread_mutex_t mutex;
pthread_t _thread_id;
static pthread_mutex_t mutex; /**< stream lock for writes */
std::vector<std::string> monitor_data;
pthread_t _thread_id;
int socket;
int fd;
void lock()
{
@ -105,8 +86,7 @@ private:
extern "C" void * listener_main(void *arg);
/**
* Represents a pool of listener threads, it should be periodically flushed to
* a file descriptor
* Represents a pool of listener threads
*/
class ListenerPool
{
@ -117,17 +97,13 @@ public:
* @param num number of threads in the pool
*/
ListenerPool(int fd, int sock, size_t num)
:listeners(num, ListenerThread(sock)), out_fd(fd), socket(sock){};
:listeners(num, ListenerThread(sock, fd)){};
~ListenerPool();
void start_pool();
void flush_pool();
private:
std::vector<ListenerThread> listeners;
int out_fd;
int socket;
};

View File

@ -158,43 +158,11 @@ int IMCollectorDriver::init_collector()
void IMCollectorDriver::start_collector()
{
pthread_attr_t attr;
pthread_t id;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&id, &attr, flush_thread, (void *)this);
pool->start_pool();
start();
pthread_attr_destroy(&attr);
pthread_cancel(id);
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
extern "C" void * flush_thread(void *arg)
{
IMCollectorDriver * collectd = static_cast<IMCollectorDriver *>(arg);
collectd->flush_loop();
return 0;
}
/* -------------------------------------------------------------------------- */
void IMCollectorDriver::flush_loop()
{
while(true)
{
sleep(_flush_period);
pool->flush_pool();
}
};

View File

@ -78,17 +78,13 @@ class IMCollectorDriver: public OpenNebulaDriver
{
public:
IMCollectorDriver(std::string address, int port, int threads,
int flush_period)
:OpenNebulaDriver(),_address(address),_port(port),_threads(threads),
_flush_period(flush_period){};
IMCollectorDriver(std::string a, int p, int t)
:OpenNebulaDriver(),_address(a),_port(p),_threads(t){};
virtual ~IMCollectorDriver(){};
int init_collector();
void flush_loop();
void start_collector();
private:
@ -100,8 +96,6 @@ private:
int _threads;
int _flush_period;
ListenerPool *pool;
};

View File

@ -36,6 +36,7 @@ env.StaticLibrary(lib_name, source_files)
# Build daemon
env.Prepend(LIBS=[
'im_collectd',
'nebula_common',
'util'
])

View File

@ -15,6 +15,7 @@
/* -------------------------------------------------------------------------- */
#include "OpenNebulaDriver.h"
#include "NebulaUtil.h"
#include <unistd.h>
#include <string>
#include <sstream>
@ -27,13 +28,14 @@
static const char * usage =
"\n collectd [-h] [-a address] [-p port] [-t threads] [-f flush]\n\n"
"SYNOPSIS\n"
" Information Collector for OpenNebula. It should not be started directly\n\n"
" Information Collector for OpenNebula. It should not be started directly.\n"
" All arguments MUST be passed as a **single** string \n\n"
"OPTIONS\n"
"\t-h\tprints this help.\n"
"\t-a\tAddress to bind the collectd sockect\n"
"\t-p\tUDP port to listen for monitor information\n"
"\t-f\tInterval in seconds to flush collected information\n"
"\t-t\tNumber of threads for the server\n";
"\t-t\tNumber of threads for the server\n"
"\t-f and -i\tOptions are ignored\n";
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
@ -64,49 +66,66 @@ int main(int argc, char ** argv)
std::string address = "0.0.0.0";
int port = 4124;
int threads = 50;
int flush = 5;
std::istringstream iss;
int opt;
while((opt = getopt(argc,argv,":ha:p:t:f:")) != -1)
switch(opt)
if ( argv[1] != 0 )
{
std::string argv_1 = argv[1];
std::vector<std::string> _argv = one_util::split(argv_1, ' ');
int _argc = _argv.size() + 1;
char ** _argv_c = (char **) malloc(sizeof(char *) * (_argc + 1));
_argv_c[0] = argv[0];
for (int i=1 ; i < _argc ; ++i)
{
case 'h':
std::cout << usage;
return 0;
break;
case 'a':
address = optarg;
break;
case 'p':
iss.clear();
iss.str(optarg);
iss >> port;
break;
case 't':
iss.clear();
iss.str(optarg);
iss >> threads;
break;
case 'f':
iss.clear();
iss.str(optarg);
iss >> flush;
break;
default:
std::cerr << usage;
return -1;
break;
_argv_c[i] = const_cast<char *>(_argv[i-1].c_str());
}
_argv_c[_argc] = 0;
while((opt = getopt(_argc, _argv_c, ":ha:p:t:f:i:")) != -1)
switch(opt)
{
case 'h':
std::cout << usage;
return 0;
break;
case 'a':
address = optarg;
break;
case 'p':
iss.clear();
iss.str(optarg);
iss >> port;
break;
case 't':
iss.clear();
iss.str(optarg);
iss >> threads;
break;
case 'f': //Compatibility with previous releases
case 'i': //Argument for collectd client
break;
default:
std::cerr << usage;
return -1;
break;
}
free(_argv_c);
}
//--------------------------------------------------------------------------
// Block all signals before creating server threads
//--------------------------------------------------------------------------
@ -130,7 +149,7 @@ int main(int argc, char ** argv)
// -------------------------------------------------------------------------
// Start the collector and server threads
// -------------------------------------------------------------------------
IMCollectorDriver collectd(address, port, threads, flush);
IMCollectorDriver collectd(address, port, threads);
if ( collectd.init_collector() != 0 )
{