mirror of https://github.com/OpenNebula/one.git synced 2024-12-22 13:33:52 +03:00

F #4396: Replace pthread with std::thread. Require C++14

Co-authored-by: Pavel Czerny <pczerny@opennebula.systems>
Ruben S. Montero 2020-09-10 13:32:52 +02:00
parent 94198be481
commit 0471e486b0
No known key found for this signature in database
GPG Key ID: A0CEA6FA880A1D87
35 changed files with 416 additions and 708 deletions

View File

@ -118,7 +118,7 @@ main_env.Append(LIBPATH=[
main_env.Append(CPPFLAGS=[
"-g",
"-Wall",
"-std=c++11"
"-std=c++14"
])
# Linking flags & common libraries

View File

@ -17,7 +17,7 @@
#ifndef CACHE_POOL_H_
#define CACHE_POOL_H_
#include <pthread.h>
#include <mutex>
#include <map>
@ -27,30 +27,23 @@
template<typename T> class CachePool
{
public:
CachePool()
{
pthread_mutex_init(&resource_lock, 0);
}
CachePool() = default;
~CachePool()
{
pthread_mutex_lock(&resource_lock);
std::lock_guard<std::mutex> lock(resource_lock);
for (auto it = resources.begin(); it != resources.end() ; ++it)
{
delete it->second;
}
pthread_mutex_unlock(&resource_lock);
pthread_mutex_destroy(&resource_lock);
};
}
T * get_resource(int oid)
{
T * res;
pthread_mutex_lock(&resource_lock);
std::lock_guard<std::mutex> lock(resource_lock);
auto it = resources.find(oid);
@ -65,15 +58,13 @@ public:
res = it->second;
}
pthread_mutex_unlock(&resource_lock);
return res;
}
void delete_resource(int oid)
{
pthread_mutex_lock(&resource_lock);
std::lock_guard<std::mutex> lock(resource_lock);
auto it = resources.find(oid);
@ -83,13 +74,11 @@ public:
resources.erase(it);
}
pthread_mutex_unlock(&resource_lock);
}
private:
pthread_mutex_t resource_lock;
std::mutex resource_lock;
std::map<int, T *> resources;
};
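
This conversion sets the pattern for the whole commit: paired pthread_mutex_lock()/pthread_mutex_unlock() calls become a scoped std::lock_guard, and the explicit init/destroy calls disappear because std::mutex handles both in its constructor and destructor. A reduced, self-contained sketch of the resulting shape (illustrative names, not the actual OpenNebula class):

#include <map>
#include <mutex>

template<typename T>
class SimpleCache
{
public:
    SimpleCache() = default;

    ~SimpleCache()
    {
        std::lock_guard<std::mutex> lock(resource_lock);

        for (auto& entry : resources)
        {
            delete entry.second;    // the cache owns its objects
        }
    }   // guard releases the lock on every exit path

    T * get(int oid)
    {
        std::lock_guard<std::mutex> lock(resource_lock);

        auto it = resources.find(oid);

        if (it == resources.end())
        {
            it = resources.insert(std::make_pair(oid, new T())).first;
        }

        return it->second;
    }

private:
    std::mutex resource_lock;       // no init/destroy boilerplate needed

    std::map<int, T *> resources;
};

Because the guard unlocks in its destructor, early returns and exceptions can no longer leak a held mutex, which is the main correctness win over the manual lock/unlock pairs.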

View File

@ -17,7 +17,7 @@
#ifndef CALLBACKABLE_H_
#define CALLBACKABLE_H_
#include <pthread.h>
#include <mutex>
#include <sstream>
#include <set>
#include <vector>
@ -31,15 +31,14 @@ class Callbackable
{
public:
Callbackable():cb(0),arg(0),affected_rows(0)
Callbackable()
: cb(nullptr)
, arg(nullptr)
, affected_rows(0)
{
pthread_mutex_init(&mutex,0);
};
}
virtual ~Callbackable()
{
pthread_mutex_destroy(&mutex);
};
virtual ~Callbackable() = default;
/**
* Datatype for call back pointers
@ -54,11 +53,11 @@ public:
*/
void set_callback(Callback _cb, void * _arg = nullptr)
{
pthread_mutex_lock(&mutex);
_mutex.lock();
cb = _cb;
arg = _arg;
};
}
/**
* Test if the CallBack is set for the object.
@ -66,8 +65,8 @@ public:
*/
virtual bool isCallBackSet()
{
return (cb != 0);
};
return (cb != nullptr);
}
/**
* Call the callback function set. This method must be called only if
@ -79,17 +78,17 @@ public:
++affected_rows;
return (this->*cb)(arg, num, values, names);
};
}
/**
* Unset the callback function.
*/
void unset_callback()
{
cb = 0;
arg = 0;
cb = nullptr;
arg = nullptr;
pthread_mutex_unlock(&mutex);
_mutex.unlock();
}
/**
@ -128,7 +127,7 @@ private:
/**
* Mutex for locking the callback function.
*/
pthread_mutex_t mutex;
std::mutex _mutex;
};
/* -------------------------------------------------------------------------- */
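
One subtlety in this class: set_callback() takes the lock and unset_callback() releases it, so the critical section deliberately spans the two calls and every callback invocation in between. A std::lock_guard cannot express a region that crosses member-function boundaries, which is why the conversion keeps explicit lock()/unlock() calls on the std::mutex. A reduced sketch of the idiom (illustrative names):

#include <mutex>

class CallbackHolder
{
public:
    void set_callback(int (*cb)(void *), void *arg)
    {
        m.lock();           // held until unset_callback()

        cb_  = cb;
        arg_ = arg;
    }

    void unset_callback()
    {
        cb_  = nullptr;
        arg_ = nullptr;

        m.unlock();         // releases the lock taken in set_callback()
    }

private:
    std::mutex m;

    int  (*cb_)(void *) = nullptr;
    void * arg_         = nullptr;
};

As with the pthread version, every set_callback() must be paired with an unset_callback() or the mutex stays locked.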

View File

@ -20,17 +20,6 @@
#define EXECUTE_HOOK_MAX_ARG 50
#include <string>
#include <sstream>
#include <iostream>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <pthread.h>
extern "C" void * execute_thread(void *arg);
class ExecuteHook
{
@ -38,13 +27,11 @@ public:
ExecuteHook(const std::string& _name, const std::string& _cmd,
const std::string& _arg, const std::string& rl);
virtual ~ExecuteHook() = default;
~ExecuteHook() = default;
void execute();
private:
friend void * execute_thread(void *arg);
/**
* Name of the Hook
*/

View File

@ -135,26 +135,21 @@ public:
std::ios_base::openmode mode = std::ios_base::app)
:FileLog(file_name,level,mode)
{
pthread_mutex_init(&log_mutex,0);
}
~FileLogTS()
{
pthread_mutex_destroy(&log_mutex);
}
~FileLogTS() = default;
void log(
const char * module,
const MessageType type,
const char * message)
{
pthread_mutex_lock(&log_mutex);
std::lock_guard <std::mutex> lock(log_mutex);
FileLog::log(module,type,message);
pthread_mutex_unlock(&log_mutex);
}
private:
pthread_mutex_t log_mutex;
std::mutex log_mutex;
};

View File

@ -276,7 +276,7 @@ protected:
};
private:
pthread_mutex_t mutex;
std::mutex _mutex;
/**
* The Database was started in solo mode (no server_id defined)

View File

@ -21,6 +21,7 @@
#include <sstream>
#include <stdexcept>
#include <queue>
#include <condition_variable>
#include <sys/time.h>
#include <sys/types.h>
@ -120,12 +121,12 @@ private:
/**
* Fine-grain mutex for DB access (pool of DB connections)
*/
pthread_mutex_t mutex;
std::mutex _mutex;
/**
* Conditional variable to wake-up waiting threads.
*/
pthread_cond_t cond;
std::condition_variable cond;
/**
* Gets a free DB connection from the pool.

View File

@ -23,6 +23,7 @@
#include <set>
#include <algorithm>
#include <random>
#include <mutex>
#include <openssl/crypto.h>
@ -91,19 +92,17 @@ namespace one_util
template<typename Integer, typename std::enable_if<std::is_integral<Integer>::value>::type* = nullptr>
Integer random(Integer min = 0, Integer max = std::numeric_limits<Integer>::max())
{
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static std::mutex _mutex;
static std::random_device rd;
static std::mt19937_64 rng(rd());
std::uniform_int_distribution<Integer> distribution(min, max);
pthread_mutex_lock(&mutex);
std::lock_guard<std::mutex> lock(_mutex);
Integer i = distribution(rng);
pthread_mutex_unlock(&mutex);
return i;
}
@ -116,19 +115,17 @@ namespace one_util
template<typename Floating, typename std::enable_if<std::is_floating_point<Floating>::value>::type* = nullptr>
Floating random(Floating min = 0, Floating max = std::numeric_limits<Floating>::max())
{
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static std::mutex _mutex;
static std::random_device rd;
static std::mt19937_64 rng(rd());
std::uniform_real_distribution<Floating> distribution(min, max);
pthread_mutex_lock(&mutex);
std::lock_guard<std::mutex> lock(_mutex);
Floating f = distribution(rng);
pthread_mutex_unlock(&mutex);
return f;
}
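
Both overloads share the same structure; a standalone, compilable version of the integer case (simplified, dropping the enable_if guard) looks roughly like this:

#include <limits>
#include <mutex>
#include <random>

template<typename Integer>
Integer random_int(Integer min = 0,
                   Integer max = std::numeric_limits<Integer>::max())
{
    // Function-local statics: initialized once, thread-safe since C++11.
    static std::mutex         mtx;
    static std::random_device rd;
    static std::mt19937_64    rng(rd());

    std::uniform_int_distribution<Integer> distribution(min, max);

    // The engine is shared state, so drawing from it must be serialized.
    std::lock_guard<std::mutex> lock(mtx);

    return distribution(rng);
}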

View File

@ -21,7 +21,6 @@
#include "ObjectXML.h"
#include "Template.h"
#include <pthread.h>
#include <string>
class PoolObjectAuth;
@ -182,16 +181,16 @@ public:
lock_req_id(-1),
lock_time(0),
ro(false),
mutex(0),
_mutex(nullptr),
table(_table)
{
};
virtual ~PoolObjectSQL()
{
if (!ro && mutex != 0)
if (!ro && _mutex != nullptr)
{
pthread_mutex_unlock(mutex);
_mutex->unlock();
}
delete obj_template;
@ -825,7 +824,7 @@ private:
* The mutex for the PoolObject. This implementation assumes that the mutex
* IS LOCKED when the class destructor is called.
*/
pthread_mutex_t * mutex;
std::mutex * _mutex;
/**
* Pointer to the SQL table for the PoolObjectSQL
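
The pointer-to-mutex member makes PoolObjectSQL act as a scope guard for a lock it does not own: the pool locks a cache-line mutex, stores its address in the object, and the object's destructor releases it. A reduced sketch of the handoff (illustrative; it assumes, as the comment above states, that the mutex is already locked when the destructor runs):

#include <mutex>

class LockedObject
{
public:
    explicit LockedObject(std::mutex *m) : _mutex(m) {}

    ~LockedObject()
    {
        if (_mutex != nullptr)
        {
            _mutex->unlock();   // assumed locked by the pool before handoff
        }
    }

private:
    std::mutex * _mutex;        // not owned; locked by the caller
};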

View File

@ -70,7 +70,7 @@ public:
return nullptr;
}
pthread_mutex_t * object_lock = cache.lock_line(oid);
std::mutex * object_lock = cache.lock_line(oid);
std::unique_ptr<T> objectsql(static_cast<T *>(create()));
@ -78,7 +78,7 @@ public:
objectsql->ro = false;
objectsql->mutex = object_lock;
objectsql->_mutex = object_lock;
int rc = objectsql->select(db);
@ -455,7 +455,7 @@ protected:
private:
pthread_mutex_t mutex;
std::mutex _mutex;
/**
* Tablename for this pool
@ -474,21 +474,6 @@ private:
*/
virtual PoolObjectSQL * create() = 0;
/**
* Function to lock the pool
*/
void lock()
{
pthread_mutex_lock(&mutex);
};
/**
* Function to unlock the pool
*/
void unlock()
{
pthread_mutex_unlock(&mutex);
};
};
#endif /*POOL_SQL_H_*/

View File

@ -18,9 +18,7 @@
#define POOL_SQL_CACHE_H_
#include <map>
#include <string>
#include <queue>
#include <pthread.h>
#include <mutex>
#include "PoolObjectSQL.h"
@ -36,10 +34,7 @@ public:
PoolSQLCache();
virtual ~PoolSQLCache()
{
pthread_mutex_destroy(&mutex);
};
~PoolSQLCache() = default;
/**
* Allocates a new cache line to hold an active pool object. If the line
@ -50,7 +45,7 @@ public:
*
* @param oid of the object
*/
pthread_mutex_t * lock_line(int oid);
std::mutex * lock_line(int oid);
private:
/**
@ -61,33 +56,29 @@ private:
{
CacheLine():active(0)
{
pthread_mutex_init(&mutex, 0);
};
~CacheLine()
{
pthread_mutex_destroy(&mutex);
}
~CacheLine() = default;
void lock()
{
pthread_mutex_lock(&mutex);
};
_mutex.lock();
}
void unlock()
{
pthread_mutex_unlock(&mutex);
_mutex.unlock();
}
int trylock()
bool trylock()
{
return pthread_mutex_trylock(&mutex);
return _mutex.try_lock();
}
/**
* Concurrent access to object
*/
pthread_mutex_t mutex;
std::mutex _mutex;
/**
* Number of threads waiting on the line mutex
@ -113,17 +104,8 @@ private:
/**
* Controls concurrent access to the cache map.
*/
pthread_mutex_t mutex;
std::mutex _mutex;
void lock()
{
pthread_mutex_lock(&mutex);
};
void unlock()
{
pthread_mutex_unlock(&mutex);
}
};
#endif /*POOL_SQL_CACHE_H_*/
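
The return type of trylock() changes from int to bool because the two APIs report success in opposite ways: pthread_mutex_trylock() returns 0 on success and EBUSY when the lock is held, while std::mutex::try_lock() returns true on success and false otherwise. The matching caller-side inversion (rc == EBUSY becoming !rc) appears in PoolSQLCache.cc further below. A one-function illustration:

#include <mutex>

bool try_enter(std::mutex& m)
{
    if (m.try_lock())       // true: we acquired the lock
    {
        // ... critical section ...
        m.unlock();
        return true;
    }

    return false;           // lock held elsewhere (old code: rc == EBUSY)
}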

View File

@ -18,12 +18,11 @@
#define POSTGRESQL_DB_H_
#include <string>
#include <stdexcept>
#include <queue>
#include <mutex>
#include <condition_variable>
#include "NebulaLog.h"
#include "SqlDB.h"
#include "ObjectSQL.h"
#ifdef POSTGRESQL_DB
@ -121,12 +120,12 @@ private:
/**
* Fine-grain mutex for DB access (pool of DB connections)
*/
pthread_mutex_t mutex;
std::mutex _mutex;
/**
* Conditional variable to wake-up waiting threads.
*/
pthread_cond_t cond;
std::condition_variable cond;
/**
* Gets a free DB connection from the pool.

View File

@ -104,21 +104,15 @@ private:
/**
* This class represents a map of replication requests. It syncs access between
* RaftManager and DB writer threads. A DB writer allocates and sets the
* request and then it waits on it for completion.
*/
class ReplicaRequestMap
{
public:
ReplicaRequestMap()
{
pthread_mutex_init(&mutex, 0);
};
ReplicaRequestMap() = default;
virtual ~ReplicaRequestMap()
{
pthread_mutex_destroy(&mutex);
}
virtual ~ReplicaRequestMap() = default;
/**
* Increments the number of replicas of this request. If it can be
@ -131,7 +125,7 @@ public:
{
int to_commit = -1;
pthread_mutex_lock(&mutex);
std::lock_guard<std::mutex> lock(_mutex);
std::map<uint64_t, ReplicaRequest *>::iterator it = requests.find(rindex);
@ -147,8 +141,6 @@ public:
}
}
pthread_mutex_unlock(&mutex);
return to_commit;
}
@ -159,11 +151,9 @@ public:
*/
void allocate(uint64_t rindex)
{
pthread_mutex_lock(&mutex);
std::lock_guard<std::mutex> lock(_mutex);
requests.insert(std::make_pair(rindex, (ReplicaRequest*) 0));
pthread_mutex_unlock(&mutex);
}
/**
@ -174,20 +164,9 @@ public:
*/
void set(uint64_t rindex, ReplicaRequest * rr)
{
pthread_mutex_lock(&mutex);
std::lock_guard<std::mutex> lock(_mutex);
std::map<uint64_t, ReplicaRequest *>::iterator it = requests.find(rindex);
if ( it == requests.end() )
{
requests.insert(std::make_pair(rindex, rr));
}
else if ( it->second == 0 )
{
it->second = rr;
}
pthread_mutex_unlock(&mutex);
requests[rindex] = rr;
}
/**
@ -196,16 +175,9 @@ public:
*/
void remove(uint64_t rindex)
{
pthread_mutex_lock(&mutex);
std::lock_guard<std::mutex> lock(_mutex);
std::map<uint64_t, ReplicaRequest *>::iterator it = requests.find(rindex);
if ( it != requests.end() )
{
requests.erase(it);
}
pthread_mutex_unlock(&mutex);
requests.erase(rindex);
}
/**
@ -213,11 +185,9 @@ public:
*/
void clear()
{
pthread_mutex_lock(&mutex);
std::lock_guard<std::mutex> lock(_mutex);
std::map<uint64_t, ReplicaRequest *>::iterator it;
for ( it = requests.begin() ; it != requests.end() ; ++it )
for ( auto it = requests.begin() ; it != requests.end() ; ++it )
{
if ( it->second == 0 )
{
@ -232,8 +202,6 @@ public:
}
requests.clear();
pthread_mutex_unlock(&mutex);
}
/**
@ -241,21 +209,16 @@ public:
*/
bool is_replicable(uint64_t rindex)
{
pthread_mutex_lock(&mutex);
std::lock_guard<std::mutex> lock(_mutex);
std::map<uint64_t, ReplicaRequest *>::iterator it = requests.find(rindex);
auto it = requests.find(rindex);
bool rc = it == requests.end() ||
(it != requests.end() && it->second != 0);
pthread_mutex_unlock(&mutex);
return rc;
return (it == requests.end()) || (it->second != nullptr);
}
private:
pthread_mutex_t mutex;
std::mutex _mutex;
/**
* Clients waiting for a log replication
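
The shorter set() and remove() bodies lean on std::map semantics that the old code spelled out by hand: operator[] default-constructs a missing entry (a null pointer here) before assigning, and erase(key) is a no-op when the key is absent. One behavioral nuance: the old set() left an already non-null entry untouched, whereas operator[] overwrites unconditionally, so the simplification assumes set() runs at most once per index. A reduced sketch:

#include <cstdint>
#include <map>

struct ReplicaRequest;          // illustrative stand-in

std::map<uint64_t, ReplicaRequest *> requests;

void set(uint64_t rindex, ReplicaRequest *rr)
{
    requests[rindex] = rr;      // inserts (as nullptr first) or overwrites
}

void remove(uint64_t rindex)
{
    requests.erase(rindex);     // safe no-op if rindex is absent
}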

View File

@ -17,9 +17,8 @@
#ifndef REPLICA_THREAD_H_
#define REPLICA_THREAD_H_
#include <pthread.h>
extern "C" void * replication_thread(void *arg);
#include <mutex>
#include <condition_variable>
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
@ -31,16 +30,16 @@ extern "C" void * replication_thread(void *arg);
class ReplicaThread
{
public:
ReplicaThread(int _follower_id):follower_id(_follower_id), _finalize(false),
_pending_requests(false), retry_timeout(1e8)
ReplicaThread(int _follower_id)
: follower_id(_follower_id)
, _finalize(false)
, _pending_requests(false)
, retry_timeout(1e8)
{
pthread_mutex_init(&mutex, 0);
pthread_cond_init(&cond, 0);
};
}
virtual ~ReplicaThread(){};
virtual ~ReplicaThread() = default;
/**
* Notify this replica thread that are new records in the log to replicate
@ -52,13 +51,6 @@ public:
*/
void finalize();
/**
* @return the ID of the thread
*/
pthread_t thread_id() const
{
return _thread_id;
}
protected:
/**
* Specific logic for the replicate process
@ -78,20 +70,14 @@ private:
*/
void do_replication();
/**
* C linkage function to start the thread
* @param arg pointer to "this"
*/
friend void * replication_thread(void *arg);
friend class ReplicaManager;
// -------------------------------------------------------------------------
// pthread synchronization variables
// -------------------------------------------------------------------------
pthread_t _thread_id;
std::mutex _mutex;
pthread_mutex_t mutex;
pthread_cond_t cond;
std::condition_variable cond;
bool _finalize;

View File

@ -22,9 +22,11 @@
#include <xmlrpc-c/server_abyss.hpp>
#include <set>
#include <thread>
#include <atomic>
#include <mutex>
extern "C" void * rm_xml_server_loop(void *arg);
class ConnectionManager;
class RequestManager
{
@ -42,7 +44,7 @@ public:
const std::string& _listen_address,
int message_size);
~RequestManager() = default;
~RequestManager();
/**
* This functions starts the associated listener thread (XML server), and
@ -83,16 +85,28 @@ private:
}
};
//--------------------------------------------------------------------------
// Friends, thread functions require C-linkage
//--------------------------------------------------------------------------
friend void * rm_xml_server_loop(void *arg);
/**
* XML Server main thread loop. Waits for client connections and starts
* a new thread to handle the request.
*/
void xml_server_loop();
/**
* Thread id for the XML Server
*/
pthread_t rm_xml_server_thread;
std::thread xml_server_thread;
/**
* Manage the number of connections to the RM
*/
std::unique_ptr<ConnectionManager> cm;
/**
* Flag to end the main server loop
*/
std::mutex end_lock;
std::atomic<bool> end;
/**
* Port number where the connection will be open

View File

@ -18,7 +18,7 @@
#define REQUEST_MANAGER_CONNECTION_H_
#include "RequestManager.h"
#include <condition_variable>
/**
* The connection manager class synchronizes the connection and manager threads
@ -26,84 +26,84 @@
class ConnectionManager
{
public:
ConnectionManager(RequestManager *_rm, int mc):rm(_rm), connections(0),
max_connections(mc)
ConnectionManager(int mc)
: connections(0)
, max_connections(mc)
, end(false)
{
pthread_mutex_init(&mutex,0);
}
pthread_cond_init(&cond,0);
};
~ConnectionManager()
{
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
};
~ConnectionManager() = default;
/**
* Increments number of active connections
*/
int add()
{
pthread_mutex_lock(&mutex);
std::lock_guard<std::mutex> lock(_mutex);
int temp_connections = ++connections;
pthread_mutex_unlock(&mutex);
return temp_connections;
};
return ++connections;
}
/**
* Decrements number of active connections and signals management thread
*/
void del()
{
pthread_mutex_lock(&mutex);
std::lock_guard<std::mutex> lock(_mutex);
--connections;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
};
cond.notify_one();
}
/**
* Waits for active connections to be under the max_connection threshold
*/
void wait()
{
pthread_mutex_lock(&mutex);
std::unique_lock<std::mutex> lock(_mutex);
while ( connections >= max_connections )
if (end)
{
pthread_cond_wait(&cond, &mutex);
return;
}
pthread_mutex_unlock(&mutex);
};
cond.wait(lock, [&]{
return (connections < max_connections) || end; });
}
/**
* Interrupts and prevents wait() operations
*/
void terminate()
{
std::lock_guard<std::mutex> lock(_mutex);
end = true;
cond.notify_one();
}
/**
* Run an xmlrpc connection
* @param fd connected socket
*/
void run_connection(int fd)
void run_connection(RequestManager *rm, int fd)
{
xmlrpc_c::serverAbyss * as = rm->create_abyss();
as->runConn(fd);
delete as;
};
}
private:
/**
* Synchronization for connection threads and listener thread
*/
pthread_mutex_t mutex;
pthread_cond_t cond;
std::mutex _mutex;
std::condition_variable cond;
/**
* RequestManager to create an AbyssSever class to handle each request
@ -119,6 +119,11 @@ private:
* Max number of active connections
*/
int max_connections;
/**
* Terminate wait
*/
std::atomic<bool> end;
};
#endif
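
The wait() rewrite is the canonical condition-variable conversion: the manual while loop around pthread_cond_wait() becomes the predicate overload of std::condition_variable::wait(), which re-checks the condition after every (possibly spurious) wakeup, and the new end flag lets terminate() break a blocked listener out during shutdown. A self-contained sketch of the gate (simplified; the real class also creates the Abyss server for each connection):

#include <condition_variable>
#include <mutex>

class Gate
{
public:
    explicit Gate(int max) : max_connections(max) {}

    void add()
    {
        std::lock_guard<std::mutex> lock(m);
        ++connections;
    }

    void del()
    {
        std::lock_guard<std::mutex> lock(m);
        --connections;
        cv.notify_one();
    }

    void wait()
    {
        std::unique_lock<std::mutex> lock(m);

        // Predicate form: loops internally, so spurious wakeups and the
        // shutdown flag are handled in one place.
        cv.wait(lock, [this] { return connections < max_connections || end; });
    }

    void terminate()
    {
        std::lock_guard<std::mutex> lock(m);
        end = true;
        cv.notify_all();
    }

private:
    std::mutex              m;
    std::condition_variable cv;

    int  connections     = 0;
    int  max_connections = 0;
    bool end             = false;
};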

View File

@ -50,10 +50,8 @@ protected:
const std::string& params = "A:sis")
:Request(method_name,params,help)
{
pthread_mutex_init(&mutex, 0);
auth_op = AuthRequest::MANAGE;
};
}
~RequestManagerRename() = default;
@ -79,13 +77,9 @@ protected:
*/
bool test_and_set_rename(int oid)
{
std::pair<std::set<int>::iterator,bool> rc;
std::lock_guard<std::mutex> lock(_mutex);
pthread_mutex_lock(&mutex);
rc = rename_ids.insert(oid);
pthread_mutex_unlock(&mutex);
auto rc = rename_ids.insert(oid);
return rc.second == true;
}
@ -95,11 +89,9 @@ protected:
*/
void clear_rename(int oid)
{
pthread_mutex_lock(&mutex);
std::lock_guard<std::mutex> lock(_mutex);
rename_ids.erase(oid);
pthread_mutex_unlock(&mutex);
}
/**
@ -111,13 +103,13 @@ protected:
virtual int extra_updates(PoolObjectSQL * obj)
{
return 0;
};
}
private:
/**
* Mutex to control concurrent access to the ongoing rename operations
*/
pthread_mutex_t mutex;
std::mutex _mutex;
/**
* Set of IDs being renamed;

View File

@ -19,6 +19,8 @@
#include <string>
#include <vector>
#include <mutex>
#include <memory>
namespace ssl_util
{
@ -91,10 +93,11 @@ namespace ssl_util
extern "C" void sslmutex_lock_callback(int mode, int type, char *file,
int line);
extern "C" unsigned long sslmutex_id_callback();
// Needed for openssl version < 1.1.0
class SSLMutex
{
public:
@ -112,7 +115,7 @@ namespace ssl_util
static SSLMutex * ssl_mutex;
static std::vector<pthread_mutex_t *> vmutex;
static std::vector<std::unique_ptr<std::mutex>> vmutex;
};
} // namespace ssl_util

View File

@ -70,28 +70,13 @@ private:
/**
* Fine-grain mutex for DB access
*/
pthread_mutex_t mutex;
std::mutex _mutex;
/**
* Pointer to the database.
*/
sqlite3 * db;
/**
* Function to lock the DB
*/
void lock()
{
pthread_mutex_lock(&mutex);
};
/**
* Function to unlock the DB
*/
void unlock()
{
pthread_mutex_unlock(&mutex);
};
};
#else
//Class stub

View File

@ -364,15 +364,15 @@ namespace ssl_util
extern "C" void sslmutex_lock_callback(int mode, int type, char *file,
int line)
{
pthread_mutex_t * pm = SSLMutex::ssl_mutex->vmutex[type];
const auto& pm = SSLMutex::ssl_mutex->vmutex[type];
if (mode & CRYPTO_LOCK)
{
pthread_mutex_lock(pm);
pm->lock();
}
else
{
pthread_mutex_unlock(pm);
pm->unlock();
}
}
@ -384,9 +384,9 @@ namespace ssl_util
return (unsigned long) pthread_self();
}
SSLMutex * SSLMutex::ssl_mutex;
SSLMutex * SSLMutex::ssl_mutex = nullptr;
std::vector<pthread_mutex_t *> SSLMutex::vmutex;
std::vector<std::unique_ptr<std::mutex>> SSLMutex::vmutex;
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
@ -412,13 +412,9 @@ namespace ssl_util
SSLMutex::SSLMutex()
{
pthread_mutex_t * pm;
for (int i=0; i<CRYPTO_num_locks(); i++)
{
pm = (pthread_mutex_t *) malloc(sizeof(pthread_mutex_t));
pthread_mutex_init(pm, NULL);
vmutex.push_back(pm);
vmutex.push_back(std::make_unique<std::mutex>());
}
CRYPTO_set_id_callback((unsigned long (*)()) sslmutex_id_callback);
@ -432,12 +428,6 @@ namespace ssl_util
SSLMutex::~SSLMutex()
{
for (int i=0; i<CRYPTO_num_locks(); i++)
{
pthread_mutex_destroy(vmutex[i]);
free(vmutex[i]);
}
CRYPTO_set_locking_callback(NULL);
}
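
The unique_ptr wrapper is not incidental: std::mutex is neither copyable nor movable, so a std::vector<std::mutex> cannot grow through push_back. Holding each mutex behind a std::unique_ptr keeps the element type movable, and std::make_unique (one of the C++14 features this commit starts relying on) replaces both the malloc/init pair and the destroy/free loop that the old destructor needed. A minimal sketch:

#include <memory>
#include <mutex>
#include <vector>

std::vector<std::unique_ptr<std::mutex>> make_locks(int n)
{
    std::vector<std::unique_ptr<std::mutex>> v;
    v.reserve(n);

    for (int i = 0; i < n; i++)
    {
        v.push_back(std::make_unique<std::mutex>());    // C++14
    }

    return v;   // destroying the vector frees every mutex
}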

View File

@ -15,133 +15,16 @@
/* -------------------------------------------------------------------------- */
#include "ExecuteHook.h"
#include "Nebula.h"
#include "NebulaLog.h"
#include <string>
#include <sstream>
#include <iostream>
#include <thread>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <pthread.h>
void * execute_thread(void *arg)
{
int out[2];
int err[2];
char buffer[500];
int rcode;
std::string s_out;
std::string s_err;
ExecuteHook * eh = static_cast<ExecuteHook *>(arg);
if ( pipe(out) == -1 )
{
NebulaLog::log("HKM", Log::ERROR, "Executing Hook: " + eh->name);
return 0;
}
if ( pipe(err) == -1 )
{
NebulaLog::log("HKM", Log::ERROR, "Executing Hook: " + eh->name);
return 0;
}
pid_t pid = fork();
switch (pid)
{
case 0: //child
{
int dev_null = open("/dev/null", O_RDWR);
if ( dev_null == -1 )
{
exit(-1);
}
close(out[0]);
close(err[0]);
dup2(dev_null, 0);
dup2(out[1], 1);
dup2(err[1], 2);
close(out[1]);
close(err[1]);
close(dev_null);
execvp(eh->cmd.c_str(), (char * const *) eh->c_args);
exit(-1);
}
break;
case -1: //failure
NebulaLog::log("HKM", Log::ERROR, "Executing Hook: " + eh->name);
break;
default: //parent
{
int nbytes;
close(out[1]);
close(err[1]);
while ( (nbytes = read(out[0], buffer, 500)) > 0 )
{
s_out.append(buffer, nbytes);
}
while ( (nbytes = read(err[0], buffer, 500)) > 0 )
{
s_err.append(buffer, nbytes);
}
close(out[0]);
close(err[0]);
waitpid(pid, &rcode, 0);
rcode = WEXITSTATUS(rcode);
}
break;
}
std::ostringstream oss;
oss << "Hook: " << eh->name << ", ";
if ( rcode == 0 )
{
oss << "successfully executed.\n";
}
else
{
oss << "execution error (" << rcode << ").\n";
}
if (!s_out.empty())
{
oss << "Hook stdout: " << s_out << "\n";
}
if (!s_err.empty())
{
oss << "Hook stderr: " << s_err << "\n";
}
NebulaLog::log("HKM", Log::INFO, oss);
return 0;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
@ -178,14 +61,117 @@ ExecuteHook::ExecuteHook(const std::string& _name, const std::string& _cmd,
void ExecuteHook::execute()
{
pthread_attr_t pattr;
pthread_t thid;
std::thread thr([this]{
int out[2];
int err[2];
pthread_attr_init(&pattr);
pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED);
char buffer[500];
pthread_create(&thid, &pattr, execute_thread, (void *) this);
int rcode;
std::string s_out;
std::string s_err;
pthread_attr_destroy(&pattr);
if ( pipe(out) == -1 )
{
NebulaLog::log("HKM", Log::ERROR, "Executing Hook: " + name);
return 0;
}
if ( pipe(err) == -1 )
{
NebulaLog::log("HKM", Log::ERROR, "Executing Hook: " + name);
return 0;
}
pid_t pid = fork();
switch (pid)
{
case 0: //child
{
int dev_null = open("/dev/null", O_RDWR);
if ( dev_null == -1 )
{
exit(-1);
}
close(out[0]);
close(err[0]);
dup2(dev_null, 0);
dup2(out[1], 1);
dup2(err[1], 2);
close(out[1]);
close(err[1]);
close(dev_null);
execvp(cmd.c_str(), (char * const *) c_args);
exit(-1);
}
break;
case -1: //failure
NebulaLog::log("HKM", Log::ERROR, "Executing Hook: " + name);
break;
default: //parent
{
int nbytes;
close(out[1]);
close(err[1]);
while ( (nbytes = read(out[0], buffer, 500)) > 0 )
{
s_out.append(buffer, nbytes);
}
while ( (nbytes = read(err[0], buffer, 500)) > 0 )
{
s_err.append(buffer, nbytes);
}
close(out[0]);
close(err[0]);
waitpid(pid, &rcode, 0);
rcode = WEXITSTATUS(rcode);
}
break;
}
std::ostringstream oss;
oss << "Hook: " << name << ", ";
if ( rcode == 0 )
{
oss << "successfully executed.\n";
}
else
{
oss << "execution error (" << rcode << ").\n";
}
if (!s_out.empty())
{
oss << "Hook stdout: " << s_out << "\n";
}
if (!s_err.empty())
{
oss << "Hook stderr: " << s_err << "\n";
}
NebulaLog::log("HKM", Log::INFO, oss);
return 0;
});
thr.detach();
}
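
The shape of this conversion is worth isolating: the extern "C" trampoline and the detached pthread attribute collapse into a capturing lambda plus std::thread::detach(). The usual pthread caveat still applies, though: the captured this must outlive the detached thread. A minimal sketch (Hook is an illustrative stand-in for ExecuteHook):

#include <iostream>
#include <thread>

struct Hook
{
    void run() { std::cout << "hook body\n"; }

    void execute()
    {
        // The lambda replaces the extern "C" trampoline; detach() replaces
        // PTHREAD_CREATE_DETACHED. *this must outlive the detached thread.
        std::thread thr([this] { run(); });
        thr.detach();
    }
};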

View File

@ -745,7 +745,6 @@ static const flex_int32_t yy_rule_can_match_eol[13] =
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include "template_syntax.h"
#include "mem_collector.h"

View File

@ -18,7 +18,6 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include "template_syntax.h"
#include "mem_collector.h"

View File

@ -704,7 +704,6 @@ static const flex_int32_t yy_rule_can_match_eol[13] =
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include "vm_var_syntax.h"
#include "vm_file_var_syntax.h"

View File

@ -18,7 +18,6 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include "vm_var_syntax.h"
#include "vm_file_var_syntax.h"

View File

@ -54,12 +54,10 @@ int PoolSQL::get_lastOID()
{
int _last_oid;
lock();
lock_guard<mutex> lock(_mutex);
_last_oid = _get_lastOID(db, table);
unlock();
return _last_oid;
}
@ -77,31 +75,25 @@ static int _set_lastOID(int _last_oid, SqlDB * db, const string& table)
void PoolSQL::set_lastOID(int _last_oid)
{
lock();
lock_guard<mutex> lock(_mutex);
_set_lastOID(_last_oid, db, table);
unlock();
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
PoolSQL::PoolSQL(SqlDB * _db, const char * _table):db(_db), table(_table)
PoolSQL::PoolSQL(SqlDB * _db, const char * _table)
: db(_db)
, table(_table)
{
pthread_mutex_init(&mutex,0);
};
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
PoolSQL::~PoolSQL()
{
vector<PoolObjectSQL *>::iterator it;
pthread_mutex_lock(&mutex);
pthread_mutex_destroy(&mutex);
}
/* -------------------------------------------------------------------------- */
@ -114,7 +106,7 @@ int PoolSQL::allocate(PoolObjectSQL *objsql, string& error_str)
int rc;
int lastOID;
lock();
lock_guard<mutex> lock(_mutex);
lastOID = _get_lastOID(db, table);
@ -127,8 +119,6 @@ int PoolSQL::allocate(PoolObjectSQL *objsql, string& error_str)
if ( _set_lastOID(lastOID, db, table) == -1 )
{
unlock();
return -1;
}
@ -157,8 +147,6 @@ int PoolSQL::allocate(PoolObjectSQL *objsql, string& error_str)
_set_lastOID(lastOID, db, table);
}
unlock();
return rc;
}

View File

@ -21,13 +21,12 @@ unsigned int PoolSQLCache::MAX_ELEMENTS = 10000;
PoolSQLCache::PoolSQLCache()
{
pthread_mutex_init(&mutex, 0);
};
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
pthread_mutex_t * PoolSQLCache::lock_line(int oid)
std::mutex * PoolSQLCache::lock_line(int oid)
{
static unsigned int num_locks = 0;
@ -35,28 +34,28 @@ pthread_mutex_t * PoolSQLCache::lock_line(int oid)
CacheLine * cl;
lock();
it = cache.find(oid);
if ( it == cache.end() )
{
cl = new CacheLine();
std::lock_guard<std::mutex> lock(_mutex);
cache.insert(std::make_pair(oid, cl));
it = cache.find(oid);
if ( it == cache.end() )
{
cl = new CacheLine();
cache.insert(std::make_pair(oid, cl));
}
else
{
cl = it->second;
}
cl->active++;
}
else
{
cl = it->second;
}
cl->active++;
unlock();
cl->lock();
lock();
std::lock_guard<std::mutex> lock(_mutex);
cl->active--;
@ -70,9 +69,7 @@ pthread_mutex_t * PoolSQLCache::lock_line(int oid)
}
}
unlock();
return &(cl->mutex);
return &(cl->_mutex);
}
/* -------------------------------------------------------------------------- */
@ -84,9 +81,9 @@ void PoolSQLCache::flush_cache_lines()
{
CacheLine * cl = it->second;
int rc = cl->trylock();
bool rc = cl->trylock();
if ( rc == EBUSY ) // cache line locked
if ( !rc ) // cache line locked
{
++it;
continue;
@ -104,5 +101,5 @@ void PoolSQLCache::flush_cache_lines()
it = cache.erase(it);
}
};
}

View File

@ -114,9 +114,6 @@ void ReplicaManager::delete_replica_thread(int follower_id)
void ReplicaManager::add_replica_thread(int follower_id)
{
pthread_attr_t pattr;
pthread_t thid;
Nebula& nd = Nebula::instance();
int this_id = nd.get_server_id();
@ -129,13 +126,14 @@ void ReplicaManager::add_replica_thread(int follower_id)
thread_pool.insert(std::make_pair(follower_id, rthread));
pthread_attr_init (&pattr);
pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED);
std::thread replica_thread([rthread] {
rthread->do_replication();
pthread_create(&thid, &pattr, replication_thread, (void *) rthread);
delete rthread;
});
pthread_attr_destroy(&pattr);
};
replica_thread.detach();
}
// -----------------------------------------------------------------------------

View File

@ -37,50 +37,6 @@ const time_t ReplicaThread::max_retry_timeout = 2.5e9;
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
extern "C" void * replication_thread(void *arg)
{
ReplicaThread * rt;
int oldstate;
if ( arg == 0 )
{
return 0;
}
rt = static_cast<ReplicaThread *>(arg);
rt->_thread_id = pthread_self();
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldstate);
rt->do_replication();
NebulaLog::log("RCM", Log::INFO, "Replication thread stopped");
delete rt;
return 0;
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
static void set_timeout(struct timespec& timeout, time_t nsec )
{
clock_gettime(CLOCK_REALTIME, &timeout);
timeout.tv_nsec += nsec;
while ( timeout.tv_nsec >= 1000000000 )
{
timeout.tv_sec += 1;
timeout.tv_nsec -= 1000000000;
}
}
void ReplicaThread::do_replication()
{
int rc;
@ -89,29 +45,26 @@ void ReplicaThread::do_replication()
while ( _finalize == false )
{
pthread_mutex_lock(&mutex);
while ( _pending_requests == false )
{
struct timespec timeout;
std::unique_lock<std::mutex> lock(_mutex);
set_timeout(timeout, retry_timeout);
if ( pthread_cond_timedwait(&cond, &mutex, &timeout) == ETIMEDOUT )
while ( _pending_requests == false )
{
_pending_requests = retry_request || _pending_requests;
if (cond.wait_for(lock, chrono::nanoseconds(retry_timeout))
== std::cv_status::timeout)
{
_pending_requests = retry_request || _pending_requests;
}
if ( _finalize )
{
return;
}
}
if ( _finalize )
{
return;
}
_pending_requests = false;
}
_pending_requests = false;
pthread_mutex_unlock(&mutex);
rc = replicate();
if ( rc == -1 )
@ -129,6 +82,8 @@ void ReplicaThread::do_replication()
retry_request = false;
}
}
NebulaLog::log("RCM", Log::INFO, "Replication thread stopped");
}
// -----------------------------------------------------------------------------
@ -136,15 +91,13 @@ void ReplicaThread::do_replication()
void ReplicaThread::finalize()
{
pthread_mutex_lock(&mutex);
std::lock_guard<std::mutex> lock(_mutex);
_finalize = true;
_pending_requests = false;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
cond.notify_one();
}
// -----------------------------------------------------------------------------
@ -152,13 +105,11 @@ void ReplicaThread::finalize()
void ReplicaThread::add_request()
{
pthread_mutex_lock(&mutex);
std::lock_guard<std::mutex> lock(_mutex);
_pending_requests = true;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
cond.notify_one();
}
// -----------------------------------------------------------------------------
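
The timed wait loses its helper as well: the clock_gettime()/timespec carry handling in the deleted set_timeout() and the ETIMEDOUT check are replaced by wait_for() with a relative std::chrono duration and a std::cv_status::timeout comparison. A reduced sketch of the wait loop (illustrative names):

#include <chrono>
#include <condition_variable>
#include <mutex>

// Returns true when work is pending, false when the timeout expired.
bool wait_for_work(std::mutex& m, std::condition_variable& cv,
                   bool& pending, long timeout_ns)
{
    std::unique_lock<std::mutex> lock(m);

    while (!pending)
    {
        if (cv.wait_for(lock, std::chrono::nanoseconds(timeout_ns)) ==
            std::cv_status::timeout)
        {
            return false;   // timed out; the caller decides whether to retry
        }
    }

    pending = false;
    return true;
}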

View File

@ -80,6 +80,7 @@ RequestManager::RequestManager(
const string& call_log_format,
const string& _listen_address,
int message_size):
end(false),
port(_port),
socket_fd(-1),
max_conn(_max_conn),
@ -95,74 +96,31 @@ RequestManager::RequestManager(
xmlrpc_limit_set(XMLRPC_XML_SIZE_LIMIT_ID, message_size);
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
/**
* Connection class is used to pass arguments to connection threads.
*/
struct Connection
RequestManager::~RequestManager()
{
Connection(int c, ConnectionManager * cm):conn_fd(c), conn_manager(cm){};
int conn_fd;
ConnectionManager * conn_manager;
};
/**
* Connection Thread runs a xml-rpc method for a connected client
*/
extern "C" void * rm_do_connection(void *arg)
{
Connection * rc = static_cast<Connection *>(arg);
rc->conn_manager->run_connection(rc->conn_fd);
rc->conn_manager->del();
close(rc->conn_fd);
delete rc;
pthread_exit(0);
return 0;
};
/**
* Connection Manager Thread waits for client connections and starts a new
* thread to handle the request.
*/
extern "C" void * rm_xml_server_loop(void *arg)
{
if ( arg == 0 )
if (xml_server_thread.joinable())
{
return 0;
xml_server_thread.join();
}
}
RequestManager * rm = static_cast<RequestManager *>(arg);
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void RequestManager::xml_server_loop()
{
// -------------------------------------------------------------------------
// Set cancel state for the thread
// -------------------------------------------------------------------------
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,0);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,0);
listen(rm->socket_fd, rm->max_conn_backlog);
pthread_attr_t pattr;
pthread_t thread_id;
pthread_attr_init (&pattr);
pthread_attr_setdetachstate (&pattr, PTHREAD_CREATE_DETACHED);
listen(socket_fd, max_conn_backlog);
// -------------------------------------------------------------------------
// Main connection loop
// -------------------------------------------------------------------------
std::unique_ptr<ConnectionManager> cm{new ConnectionManager(rm, rm->max_conn)};
cm = make_unique<ConnectionManager>(max_conn);
while (true)
{
@ -170,12 +128,25 @@ extern "C" void * rm_xml_server_loop(void *arg)
cm->wait();
{
std::lock_guard<std::mutex> lock(end_lock);
if (end)
{
break;
}
}
struct sockaddr_storage addr;
socklen_t addr_len = sizeof(struct sockaddr_storage);
int client_fd = accept(rm->socket_fd, (struct sockaddr*) &addr,
&addr_len);
int client_fd = accept(socket_fd, (struct sockaddr*) &addr, &addr_len);
if (client_fd == -1)
{
break;
}
int nc = cm->add();
@ -183,12 +154,20 @@ extern "C" void * rm_xml_server_loop(void *arg)
NebulaLog::log("ReM", Log::DDEBUG, oss);
Connection * rc = new Connection(client_fd, cm.get());
thread conn_thread([client_fd, this]{
cm->run_connection(this, client_fd);
pthread_create(&thread_id, &pattr, rm_do_connection, (void *) rc);
cm->del();
close(client_fd);
return;
});
conn_thread.detach();
}
return 0;
NebulaLog::log("ReM",Log::INFO,"XML-RPC server stopped.");
}
/* -------------------------------------------------------------------------- */
@ -311,14 +290,10 @@ int RequestManager::start()
register_xml_methods();
pthread_attr_t pattr;
pthread_attr_init(&pattr);
pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_JOINABLE);
oss << "Starting XML-RPC server, port " << port << " ...";
NebulaLog::log("ReM",Log::INFO,oss);
pthread_create(&rm_xml_server_thread,&pattr,rm_xml_server_loop,(void *)this);
xml_server_thread = thread(&RequestManager::xml_server_loop, this);
NebulaLog::log("ReM",Log::INFO,"Request Manager started");
@ -1217,11 +1192,19 @@ void RequestManager::register_xml_methods()
void RequestManager::finalize()
{
pthread_cancel(rm_xml_server_thread);
NebulaLog::log("ReM",Log::INFO,"Stopping XML-RPC server...");
pthread_join(rm_xml_server_thread,0);
{
std::lock_guard<std::mutex> lock(end_lock);
end = true;
}
NebulaLog::log("ReM",Log::INFO,"XML-RPC server stopped.");
if (cm)
{
cm->terminate();
}
shutdown(socket_fd, SHUT_RDWR);
if (socket_fd != -1)
{

View File

@ -21,6 +21,8 @@
#include "FedReplicaManager.h"
#include "RaftManager.h"
#include <unistd.h>
using namespace std;
/* -------------------------------------------------------------------------- */

View File

@ -112,8 +112,6 @@ LogDB::LogDB(SqlDB * _db, bool _solo, bool _cache, uint64_t _lret, uint64_t _lp)
{
uint64_t r, i;
pthread_mutex_init(&mutex, 0);
LogDBRecord lr;
if ( get_log_record(0, 0, lr) != 0 )
@ -149,7 +147,7 @@ int LogDB::setup_index(uint64_t& _last_applied, uint64_t& _last_index)
_last_applied = 0;
_last_index = UINT64_MAX;
pthread_mutex_lock(&mutex);
lock_guard<mutex> lock(_mutex);
cb.set_callback(&_last_index);
@ -189,8 +187,6 @@ int LogDB::setup_index(uint64_t& _last_applied, uint64_t& _last_index)
build_federated_index();
pthread_mutex_unlock(&mutex);
return rc;
}
@ -230,12 +226,10 @@ int LogDB::get_log_record(uint64_t index, uint64_t prev_index, LogDBRecord& lr)
void LogDB::get_last_record_index(uint64_t& _i, unsigned int& _t)
{
pthread_mutex_lock(&mutex);
lock_guard<mutex> lock(_mutex);
_i = last_index;
_t = last_term;
pthread_mutex_unlock(&mutex);
}
/* -------------------------------------------------------------------------- */
@ -403,7 +397,7 @@ int LogDB::apply_log_record(LogDBRecord * lr)
uint64_t LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
time_t timestamp, uint64_t fed_index)
{
pthread_mutex_lock(&mutex);
lock_guard<mutex> lock(_mutex);
uint64_t index = next_index;
@ -422,8 +416,6 @@ uint64_t LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
{
NebulaLog::log("DBM", Log::ERROR, "Cannot insert log record in DB");
pthread_mutex_unlock(&mutex);
return UINT64_MAX;
}
@ -444,8 +436,6 @@ uint64_t LogDB::insert_log_record(unsigned int term, std::ostringstream& sql,
fed_log.insert(_fed_index);
}
pthread_mutex_unlock(&mutex);
return index;
}
@ -456,11 +446,9 @@ int LogDB::insert_log_record(uint64_t index, unsigned int term,
std::ostringstream& sql, time_t timestamp, uint64_t fed_index,
bool replace)
{
int rc;
lock_guard<mutex> lock(_mutex);
pthread_mutex_lock(&mutex);
rc = insert(index, term, sql.str(), timestamp, fed_index, replace);
int rc = insert(index, term, sql.str(), timestamp, fed_index, replace);
if ( rc == 0 )
{
@ -479,8 +467,6 @@ int LogDB::insert_log_record(uint64_t index, unsigned int term,
}
}
pthread_mutex_unlock(&mutex);
return rc;
}
@ -504,11 +490,9 @@ int LogDB::_exec_wr(ostringstream& cmd, uint64_t federated)
{
insert_log_record(0, cmd, time(0), federated);
pthread_mutex_lock(&mutex);
lock_guard<mutex> lock(_mutex);
last_applied = last_index;
pthread_mutex_unlock(&mutex);
}
return rc;
@ -545,7 +529,7 @@ int LogDB::delete_log_records(uint64_t start_index)
std::ostringstream oss;
int rc;
pthread_mutex_lock(&mutex);
lock_guard<mutex> lock(_mutex);
oss << "DELETE FROM " << one_db::log_table
<< " WHERE log_index >= " << start_index;
@ -566,8 +550,6 @@ int LogDB::delete_log_records(uint64_t start_index)
}
}
pthread_mutex_unlock(&mutex);
return rc;
}
@ -576,7 +558,7 @@ int LogDB::delete_log_records(uint64_t start_index)
int LogDB::apply_log_records(uint64_t commit_index)
{
pthread_mutex_lock(&mutex);
lock_guard<mutex> lock(_mutex);
while (last_applied < commit_index )
{
@ -584,19 +566,15 @@ int LogDB::apply_log_records(uint64_t commit_index)
if ( get_log_record(last_applied + 1, last_applied, lr) != 0 )
{
pthread_mutex_unlock(&mutex);
return -1;
}
if ( apply_log_record(&lr) != 0 )
{
pthread_mutex_unlock(&mutex);
return -1;
}
}
pthread_mutex_unlock(&mutex);
return 0;
}
@ -619,7 +597,7 @@ int LogDB::purge_log()
int frc = 0;
uint64_t fed_min, fed_max = UINT64_MAX;
pthread_mutex_lock(&mutex);
lock_guard<mutex> lock(_mutex);
/* ---------------- Record log state -------------------- */
@ -709,8 +687,6 @@ int LogDB::purge_log()
NebulaLog::log("DBM", Log::INFO, foss);
pthread_mutex_unlock(&mutex);
return rc;
}
@ -753,8 +729,6 @@ int LogDB::purge_log()
NebulaLog::log("DBM", Log::INFO, foss);
pthread_mutex_unlock(&mutex);
return rc;
}
@ -824,7 +798,7 @@ void LogDB::build_federated_index()
uint64_t LogDB::last_federated()
{
pthread_mutex_lock(&mutex);
lock_guard<mutex> lock(_mutex);
uint64_t findex = UINT64_MAX;
@ -837,8 +811,6 @@ uint64_t LogDB::last_federated()
findex = *rit;
}
pthread_mutex_unlock(&mutex);
return findex;
}
@ -848,7 +820,7 @@ uint64_t LogDB::previous_federated(uint64_t i)
{
set<uint64_t>::iterator it;
pthread_mutex_lock(&mutex);
lock_guard<mutex> lock(_mutex);
uint64_t findex = UINT64_MAX;
@ -859,8 +831,6 @@ uint64_t LogDB::previous_federated(uint64_t i)
findex = *(--it);
}
pthread_mutex_unlock(&mutex);
return findex;
}
@ -870,7 +840,7 @@ uint64_t LogDB::next_federated(uint64_t i)
{
set<uint64_t>::iterator it;
pthread_mutex_lock(&mutex);
lock_guard<mutex> lock(_mutex);
uint64_t findex = UINT64_MAX;
@ -881,8 +851,6 @@ uint64_t LogDB::next_federated(uint64_t i)
findex = *(++it);
}
pthread_mutex_unlock(&mutex);
return findex;
}

View File

@ -313,10 +313,6 @@ MySqlDB::MySqlDB(const string& s, int p, const string& u, const string& _p,
{SqlFeature::FTS, version >= min_fts_version},
{SqlFeature::COMPARE_BINARY, one_util::toupper(_compare_binary) == "YES"}
};
pthread_mutex_init(&mutex,0);
pthread_cond_init(&cond,0);
}
/* -------------------------------------------------------------------------- */
@ -336,10 +332,6 @@ MySqlDB::~MySqlDB()
// End use of the MySQL library
mysql_library_end();
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
}
/* -------------------------------------------------------------------------- */
@ -510,19 +502,14 @@ MYSQL * MySqlDB::get_db_connection()
{
MYSQL *db;
pthread_mutex_lock(&mutex);
unique_lock<mutex> lock(_mutex);
while ( db_connect.empty() == true )
{
pthread_cond_wait(&cond, &mutex);
}
cond.wait(lock, [&]{ return !db_connect.empty(); });
db = db_connect.front();
db_connect.pop();
pthread_mutex_unlock(&mutex);
return db;
}
@ -530,12 +517,10 @@ MYSQL * MySqlDB::get_db_connection()
void MySqlDB::free_db_connection(MYSQL * db)
{
pthread_mutex_lock(&mutex);
lock_guard<mutex> lock(_mutex);
db_connect.push(db);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
cond.notify_one();
}
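
MySqlDB here and PostgreSqlDB below convert the same blocking connection pool: on the consumer side a unique_lock plus the predicate overload of wait() replaces the explicit while/pthread_cond_wait loop, and notify_one() wakes a single waiter when a connection is returned. A generic sketch (Conn stands in for MYSQL* or PGconn*):

#include <condition_variable>
#include <mutex>
#include <queue>

template<typename Conn>
class ConnPool
{
public:
    Conn get()
    {
        std::unique_lock<std::mutex> lock(m);

        // Blocks until the pool is non-empty; the predicate is re-checked
        // after every wakeup.
        cv.wait(lock, [this] { return !pool.empty(); });

        Conn c = pool.front();
        pool.pop();

        return c;
    }

    void put(Conn c)
    {
        std::lock_guard<std::mutex> lock(m);

        pool.push(c);
        cv.notify_one();
    }

private:
    std::mutex              m;
    std::condition_variable cv;
    std::queue<Conn>        pool;
};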

View File

@ -14,8 +14,9 @@
/* limitations under the License. */
/* -------------------------------------------------------------------------- */
#include "NebulaUtil.h"
#include "PostgreSqlDB.h"
#include "NebulaUtil.h"
#include "NebulaLog.h"
#include <libpq-fe.h>
@ -97,10 +98,6 @@ PostgreSqlDB::PostgreSqlDB(
{SqlFeature::FTS, false},
{SqlFeature::COMPARE_BINARY, false}
};
pthread_mutex_init(&mutex, 0);
pthread_cond_init(&cond, 0);
}
/* -------------------------------------------------------------------------- */
@ -117,9 +114,6 @@ PostgreSqlDB::~PostgreSqlDB()
}
PQfinish(db_escape_connect);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
}
/* -------------------------------------------------------------------------- */
@ -248,18 +242,13 @@ PGconn * PostgreSqlDB::get_db_connection()
{
PGconn* conn;
pthread_mutex_lock(&mutex);
unique_lock<mutex> lock(_mutex);
while (db_connect.empty() == true)
{
pthread_cond_wait(&cond, &mutex);
}
cond.wait(lock, [&]{ return !db_connect.empty(); });
conn = db_connect.front();
db_connect.pop();
pthread_mutex_unlock(&mutex);
return conn;
}
@ -267,13 +256,11 @@ PGconn * PostgreSqlDB::get_db_connection()
void PostgreSqlDB::free_db_connection(PGconn * db)
{
pthread_mutex_lock(&mutex);
lock_guard<mutex> lock(_mutex);
db_connect.push(db);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
cond.notify_one();
}
/* -------------------------------------------------------------------------- */

View File

@ -42,8 +42,6 @@ extern "C" int sqlite_callback (
SqliteDB::SqliteDB(const string& db_name, int timeout)
{
pthread_mutex_init(&mutex,0);
int rc = sqlite3_open(db_name.c_str(), &db);
if ( rc != SQLITE_OK )
@ -75,8 +73,6 @@ SqliteDB::SqliteDB(const string& db_name, int timeout)
SqliteDB::~SqliteDB()
{
pthread_mutex_destroy(&mutex);
sqlite3_close(db);
}
@ -110,22 +106,22 @@ int SqliteDB::exec_ext(std::ostringstream& cmd, Callbackable *obj, bool quiet)
arg = static_cast<void *>(obj);
}
lock();
rc = sqlite3_exec(db, c_str, callback, arg, &err_msg);
if (obj != 0 && obj->get_affected_rows() == 0)
{
int num_rows = sqlite3_changes(db);
lock_guard<mutex> lock(_mutex);
if (num_rows > 0)
rc = sqlite3_exec(db, c_str, callback, arg, &err_msg);
if (obj != 0 && obj->get_affected_rows() == 0)
{
obj->set_affected_rows(num_rows);
int num_rows = sqlite3_changes(db);
if (num_rows > 0)
{
obj->set_affected_rows(num_rows);
}
}
}
unlock();
switch(rc)
{
case SQLITE_BUSY: