1
0
mirror of https://github.com/OpenNebula/one.git synced 2025-01-03 01:17:41 +03:00

F #6653: Remove Postgresql dependencies

This commit is contained in:
Ruben S. Montero 2024-07-17 12:58:37 +02:00
parent cbec93f7b3
commit 033ac01567
No known key found for this signature in database
GPG Key ID: A0CEA6FA880A1D87
15 changed files with 23 additions and 912 deletions

View File

@ -136,7 +136,6 @@ vars = Variables('custom.py')
vars.Add('sqlite_dir', 'Path to sqlite directory', '')
vars.Add('sqlite', 'Build with SQLite support', 'yes')
vars.Add('mysql', 'Build with MySQL support', 'no')
vars.Add('postgresql', 'Build with PostgreSQL support', 'no')
vars.Add('parsers', 'Obsolete. Rebuild flex/bison files', 'no')
vars.Add('xmlrpc', 'Path to xmlrpc directory', '')
vars.Add('new_xmlrpc', 'Use xmlrpc-c version >=1.31', 'no')
@ -173,16 +172,6 @@ if mysql == 'yes':
else:
main_env.Append(mysql='no')
# PostgreSql
postgresql = ARGUMENTS.get('postgresql', 'no')
if postgresql == 'yes':
main_env.Append(postgresql='yes')
main_env.Append(CPPPATH=['/usr/include/postgresql'])
main_env.Append(CPPFLAGS=["-DPOSTGRESQL_DB"])
main_env.Append(LIBS=['libpq'])
else:
main_env.Append(postgresql='no')
# Flag to compile with xmlrpc-c versions prior to 1.31 (September 2012)
new_xmlrpc = ARGUMENTS.get('new_xmlrpc', 'no')
if new_xmlrpc == 'yes':

View File

@ -1,195 +0,0 @@
/* -------------------------------------------------------------------------- */
/* Copyright 2002-2023, OpenNebula Project, OpenNebula Systems */
/* */
/* Licensed under the Apache License, Version 2.0 (the "License"); you may */
/* not use this file except in compliance with the License. You may obtain */
/* a copy of the License at */
/* */
/* http://www.apache.org/licenses/LICENSE-2.0 */
/* */
/* Unless required by applicable law or agreed to in writing, software */
/* distributed under the License is distributed on an "AS IS" BASIS, */
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
/* See the License for the specific language governing permissions and */
/* limitations under the License. */
/* -------------------------------------------------------------------------- */
#ifndef POSTGRESQL_DB_H_
#define POSTGRESQL_DB_H_
#include <string>
#include <queue>
#include <mutex>
#include <condition_variable>
#include "SqlDB.h"
#ifdef POSTGRESQL_DB
#include <libpq-fe.h>
/**
* PostgreSqlDB class. Provides a wrapper to the PostgreSQL database interface.
*/
class PostgreSqlDB : public SqlDB
{
public:
PostgreSqlDB(
const std::string& _server,
int _port,
const std::string& _user,
const std::string& _password,
const std::string& _database,
int _connections);
~PostgreSqlDB();
/**
* This function returns a legal SQL string that can be used in an SQL
* statement.
* @param str the string to be escaped
* @return a valid SQL string or NULL in case of failure
*/
char * escape_str(const std::string& str) const override;
/**
* Frees a previously scaped string
* @param str pointer to the str
*/
void free_str(char * str) const override;
/**
* @param sid the offset
* @param eid the rowcount
* @return string with compatible LIMIT clause syntax
* LIMIT row_count OFFSET offset
*
* +---+---+---+---+---+---+---+---+--
* | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |...
* +---+---+---+---+---+---+---+---+--
* | |
* /-------------------/
* LIMIT 5 OFFSET 3
*/
std::string limit_string(int sid, int eid) const override
{
std::ostringstream oss;
oss << "LIMIT " << eid << " OFFSET " << sid;
return oss.str();
}
std::string limit_string(int sid) const override
{
std::ostringstream oss;
oss << "LIMIT " << sid << " OFFSET 0";
return oss.str();
}
protected:
int exec_ext(std::ostringstream& cmd, Callbackable *obj, bool quiet) override;
private:
/**
* Number of concurrent DB connections.
*/
int max_connections;
/**
* Connection pool
*/
std::queue<PGconn *> db_connect;
/**
* DB connection to escape strings
*/
PGconn * db_escape_connect;
/**
* Connection parameters
*/
std::string server;
int port;
std::string user;
std::string password;
std::string database;
/**
* Fine-grain mutex for DB access (pool of DB connections)
*/
std::mutex _mutex;
/**
* Conditional variable to wake-up waiting threads.
*/
std::condition_variable cond;
/**
* Gets a free DB connection from the pool.
*/
PGconn * get_db_connection();
/**
* Returns the connection to the pool.
*/
void free_db_connection(PGconn * db);
/**
* Preprocesses the query to be compatible with PostgreSQL syntax
*
* Any change to this method should be reflected in BackEndPostgreSQL class
* in src/onedb/onedb_backend.rb
*
* This method alters to queries:
* - CREATE TABLE to adjust type names
* . MEDIUMTEXT -> TEXT
* . LONGTEXT -> TEXT
* . BIGINT UNSIGNED -> NUMERIC
*
* - REPLACE INTO into PostgreSQL INSERT INTO query with ON CONFLICT
* clause. For example:
* REPLACE INTO pool_control (tablename, last_oid) VALUES ('acl',0)
* changes to:
* INSERT INTO pool_control (tablename, last_oid) VALUES ('acl',0)
* ON CONFLICT (tablename) DO UPDATE SET last_oid = EXCLUDED.last_oid
***************************************************************************
* Any change to this method should be reflected in BackEndPostgreSQL class
* in src/onedb/onedb_backend.rb
***************************************************************************
*/
static std::string preprocess_query(std::ostringstream& cmd);
};
#else
// Class stub
class PostgreSqlDB : public SqlDB
{
public:
PostgreSqlDB(
const std::string& _server,
int _port,
const std::string& _user,
const std::string& _password,
const std::string& _database,
int _connections)
{
throw std::runtime_error("Aborting oned, PostgreSQL support not compiled!");
}
~PostgreSqlDB() {}
char * escape_str(const std::string& str) const override {return 0;};
void free_str(char * str) const override {};
protected:
int exec_ext(std::ostringstream& c, Callbackable *o, bool q) override
{
return -1;
};
};
#endif
#endif /*POSTGRESQL_DB_H*/

View File

@ -50,7 +50,6 @@
<xs:element name="BACKEND" minOccurs="0" maxOccurs="1" >
<xs:simpleType>
<xs:restriction base="xs:string">
<xs:enumeration value="postgresql"/>
<xs:enumeration value="mysql"/>
<xs:enumeration value="sqlite"/>
</xs:restriction>

View File

@ -63,7 +63,6 @@ case "${TARGET}" in
ruby-dev make gcc libsqlite3-dev libcurl4-openssl-dev \
rake libxml2-dev libxslt1-dev patch g++ build-essential \
libssl-dev libaugeas-dev pkgconf \
postgresql-server-dev-all \
>/dev/null
# default-libmysqlclient-dev OR libmysqlclient-dev
@ -79,7 +78,7 @@ case "${TARGET}" in
yum -y install ruby-devel make gcc sqlite-devel mysql-devel \
openssl-devel curl-devel rubygem-rake libxml2-devel \
libxslt-devel patch expat-devel gcc-c++ rpm-build augeas-devel \
postgresql-devel rubygems \
rubygems \
>/dev/null
;;
*)

View File

@ -27,10 +27,8 @@ end
if !defined?(RUBY_VERSION) || RUBY_VERSION < '2.2.0'
RACK = 'rack --version "< 2.0.0"'
PG = 'pg --version "< 1.2.0"'
else
RACK = 'rack'
PG = 'pg'
end
TREETOP = 'treetop --version ">= 1.6.3"'
@ -57,7 +55,7 @@ GROUPS={
:ec2_hybrid => 'aws-sdk --version "~> 2.5"',
:oca => 'ox',
:market => 'aws-sdk',
:onedb => ['mysql2', PG],
:onedb => ['mysql2'],
:hooks => %w[zeromq ffi-rzmq],
:serversync => "augeas",
:gnuplot => "gnuplot"
@ -77,7 +75,6 @@ DISTRIBUTIONS={
:dependencies => {
SQLITE => ['gcc', 'libsqlite3-dev'],
'mysql2' => ['gcc', 'libssl-dev', ['default-libmysqlclient-dev', 'libmysqlclient-dev']],
PG => ['gcc', 'postgresql-server-dev-all'],
'curb' => ['gcc', 'libcurl4-openssl-dev'],
$nokogiri => %w{gcc rake libxml2-dev libxslt1-dev patch},
'xmlparser' => ['gcc', 'libexpat1-dev'],
@ -100,7 +97,6 @@ DISTRIBUTIONS={
:dependencies => {
SQLITE => ['gcc', 'sqlite-devel'],
'mysql2' => ['gcc', 'mysql-devel', 'openssl-devel'],
PG => ['gcc', 'postgresql-devel'],
'curb' => ['gcc', 'curl-devel'],
$nokogiri => %w{gcc rubygem-rake libxml2-devel libxslt-devel patch},
'xmlparser' => ['gcc', 'expat-devel'],

View File

@ -15,7 +15,7 @@
SOURCES="src"
INCLUDES="-I include -I src/monitor/include -I src/scheduler/include"
DEFINES="-DSQLITE_DB -DMYSQL_DB -DPOSTGRESQL_DB -DSYSTEMD"
DEFINES="-DSQLITE_DB -DMYSQL_DB -DSYSTEMD"
ENABLE="--enable=performance,information,warning,portability,style"
IGNORE="-i .xmlrpc_test/ -i src/sunstone/ -i src/svncterm_server/ -i src/fireedge -i src/parsers -i src/vmm/LibVirtDriverKVM.cc"
SUPRESS="--suppress-xml=share/smoke_tests/config/cppcheck-suppressions.xml"

View File

@ -135,7 +135,7 @@ class Replicator
def fetch_db_config(configs)
configs.store(:backend, configs[:raw]['/OPENNEBULA_CONFIGURATION/DB/BACKEND'])
if configs[:backend] == 'mysql' || configs[:backend] == 'postgresql'
if configs[:backend] == 'mysql'
configs.store(:server, configs[:raw]['/OPENNEBULA_CONFIGURATION/DB/SERVER'])
configs.store(:user, configs[:raw]['/OPENNEBULA_CONFIGURATION/DB/USER'])
configs.store(:password, configs[:raw]['/OPENNEBULA_CONFIGURATION/DB/PASSWD'])
@ -143,7 +143,7 @@ class Replicator
configs.store(:port, configs[:raw]['/OPENNEBULA_CONFIGURATION/DB/PORT'])
configs[:port] = '3306' if configs[:port] == '0'
else
STDERR.puts 'No mysql or postgresql backend configuration found'
STDERR.puts 'No mysql backend configuration found'
exit(-1)
end
end

View File

@ -22,7 +22,6 @@
#include "StreamManager.h"
#include "SqliteDB.h"
#include "MySqlDB.h"
#include "PostgreSqlDB.h"
#include <fcntl.h>
#include <unistd.h>
@ -106,7 +105,7 @@ void Monitor::start()
sqlDB = make_unique<SqliteDB>(get_var_location() + "one.db", timeout);
}
else
else if ( db_backend == "mysql" )
{
string server;
int port;
@ -127,20 +126,12 @@ void Monitor::start()
_db_m->vector_value("CONNECTIONS", connections, 15);
if ( db_backend == "postgresql" )
{
sqlDB = make_unique<PostgreSqlDB>(server, port, user, passwd, db_name,
connections);
}
else if ( db_backend == "mysql" )
{
sqlDB = make_unique<MySqlDB>(server, port, user, passwd, db_name,
encoding, connections, compare_binary);
}
else
{
throw runtime_error("Unknown DB backend " + db_backend);
}
sqlDB = make_unique<MySqlDB>(server, port, user, passwd, db_name,
encoding, connections, compare_binary);
}
else
{
throw runtime_error("DB BACKEND must be sqlite or mysql.");
}
// -------------------------------------------------------------------------

View File

@ -19,7 +19,6 @@
#include "VirtualMachine.h"
#include "SqliteDB.h"
#include "MySqlDB.h"
#include "PostgreSqlDB.h"
#include "Client.h"
#include "LogDB.h"
#include "SystemDB.h"
@ -424,14 +423,9 @@ void Nebula::start(bool bootstrap_only)
db_backend = new MySqlDB(server, port, user, passwd, db_name,
encoding, connections, compare_binary);
}
else if ( db_backend_type == "postgresql" )
{
db_backend = new PostgreSqlDB(server, port, user, passwd, db_name,
connections);
}
else
{
throw runtime_error("DB BACKEND must be one of sqlite, mysql or postgresql.");
throw runtime_error("DB BACKEND must be sqlite or mysql.");
}
// ---------------------------------------------------------------------

View File

@ -120,7 +120,7 @@ SQLITE={
}
###############################################################################
# MySQL and PostgreSQL options
# MySQL options
###############################################################################
TYPE={
:name => 'type',
@ -138,8 +138,7 @@ SERVER={
:short => '-S host',
:large => '--server host',
:format => String,
:description => 'MySQL or PostgreSQL server hostname or IP. ' \
'Defaults to localhost',
:description => 'MySQL server hostname or IP. Defaults to localhost',
:proc => lambda {|o, options|
options[:server] = o
}
@ -150,8 +149,7 @@ PORT={
:short => '-P port',
:large => '--port port',
:format => String,
:description => 'MySQL or PostgreSQL server port. Defaults to ' \
'3306 for MySQL and 5432 for PostgreSQL',
:description => 'MySQL server port. Defaults to 3306',
:proc => lambda {|o, options|
options[:port] = o
}
@ -162,7 +160,7 @@ USERNAME={
:short => '-u user',
:large => '--username user',
:format => String,
:description => 'MySQL or PostgreSQL username',
:description => 'MySQL username',
:proc => lambda {|o, options|
options[:user] = o
}
@ -173,8 +171,7 @@ PASSWORD={
:short => '-p pass',
:large => '--password pass',
:format => String,
:description => 'MySQL or PostgreSQL password. Leave unset ' \
'to be prompted for it',
:description => 'MySQL password. Leave unset to be prompted for it',
:proc => lambda {|o, options|
options[:passwd] = o
}
@ -185,7 +182,7 @@ DBNAME={
:short => '-d dbname',
:large => '--dbname dbname',
:format => String,
:description => 'MySQL or PostgreSQL DB name for OpenNebula',
:description => 'MySQL DB name for OpenNebula',
:proc => lambda {|o, options|
options[:db_name] = o
}
@ -195,7 +192,7 @@ ENCODING={
:name => 'encoding',
:large => '--encoding charset',
:format => String,
:description => 'MySQL or PostgreSQL encoding to use for the connection',
:description => 'MySQL encoding to use for the connection',
:proc => lambda {|o, options|
options[:encoding] = o
}

View File

@ -87,30 +87,8 @@ class OneDB
:db_name => ops[:db_name],
:encoding=> ops[:encoding]
)
elsif ops[:backend] == :postgresql
begin
require 'pg'
rescue
STDERR.puts "Ruby gem pg is needed for this operation:"
STDERR.puts " $ sudo gem install pg"
exit -1
end
passwd = ops[:passwd]
passwd = ENV['ONE_DB_PASSWORD'] unless passwd
passwd = get_password("PostgreSQL Password: ") unless passwd
ops[:port] = 5432 if ops[:port] == 0
@backend = BackEndPostgreSQL.new(
:server => ops[:server],
:port => ops[:port],
:user => ops[:user],
:passwd => passwd,
:db_name => ops[:db_name],
:encoding=> ops[:encoding]
)
else
raise "You need to specify the SQLite, MySQL or PostgreSQL connection options."
raise "DB BACKEND must be sqlite or mysql."
end
end

View File

@ -611,289 +611,3 @@ class BackEndSQLite < OneDBBacKEnd
end
end
end
class BackEndPostgreSQL < OneDBBacKEnd
def initialize(opts={})
@server = opts[:server]
@port = opts[:port]
@user = opts[:user]
@passwd = opts[:passwd]
@db_name = opts[:db_name]
@encoding= opts[:encoding]
# Check for errors:
error = false
(error = true; missing = "USER" ) if @user == nil
(error = true; missing = "DBNAME") if @db_name == nil
if error
raise "PostgreSQL option #{missing} is needed"
end
# Check for defaults:
@server = "localhost" if @server.nil?
@port = 5432 if @port.nil?
encoding if @encoding.nil?
# Clean leading and trailing quotes, if any
@server = @server [1..-2] if @server [0] == ?"
@port = @port [1..-2] if @port [0] == ?"
@user = @user [1..-2] if @user [0] == ?"
@passwd = @passwd [1..-2] if @passwd [0] == ?"
@db_name = @db_name[1..-2] if @db_name[0] == ?"
end
def bck_file(federated = false)
t = Time.now
bck_name = "#{VAR_LOCATION}/postgresql_#{@server}_#{@db_name}_"
bck_name << "federated_" if federated
bck_name << "#{t.year}-#{t.month}-#{t.day}_"
bck_name << "#{t.hour}:#{t.min}:#{t.sec}.sql"
bck_name
end
def backup(bck_file, federated = false)
cmd = "PGPASSWORD=\"#{@passwd}\" pg_dump -U #{@user} -h #{@server} -p #{@port} -b #{@db_name} -Fp -f #{bck_file} "
if federated
connect_db
@db.drop_table?(:logdb_tmp)
@db.run 'CREATE TABLE logdb_tmp (LIKE logdb INCLUDING ALL)'
@db[:logdb_tmp].insert(@db[:logdb].where { fed_index != -1 })
FEDERATED_TABLES.each do |table|
cmd << " -t " << table
end
cmd << " -t logdb_tmp"
rc = system(cmd)
if !rc
raise "Unknown error running '#{cmd}'"
end
@db.drop_table(:logdb_tmp)
File.write("#{bck_file}",File.open("#{bck_file}",&:read).gsub("logdb_tmp","logdb"))
else
rc = system(cmd)
if !rc
raise "Unknown error running '#{cmd}'"
end
end
File.write("#{bck_file}",File.open("#{bck_file}",&:read).gsub("COMMENT ON","-- COMMENT ON"))
puts "PostgreSQL dump stored in #{bck_file}"
puts "Use 'onedb restore' to restore the DB"
puts
end
def get_db_encoding
db_enc = ''
@db.fetch("SELECT pg_encoding_to_char(encoding) FROM pg_database WHERE datname = \'#{@db_name}\'") do |row|
db_enc = row[:pg_encoding_to_char]
db_enc ||= row[:PG_ENCODING_TO_CHAR]
end
db_enc
end
def encoding
@encoding = ''
connect_db
@encoding = get_db_encoding
end
def get_table_enconding(table = nil)
encoding = get_db_encoding
table_to_nk(encoding)
end
def table_to_nk(encoding)
case encoding
when 'UTF8'
'UTF-8'
when 'ISO_8859_5'
'ISO-8859-5'
when 'ISO_8859_6'
'ISO-8859-6'
when 'ISO_8859_7'
'ISO-8859-7'
when 'ISO_8859_8'
'ISO-8859-8'
when 'LATIN1'
'ISO-8859-1'
when 'LATIN2'
'ISO-8859-2'
when 'LATIN3'
'ISO-8859-3'
when 'LATIN4'
'ISO-8859-4'
when 'LATIN5'
'ISO-8859-9'
when 'SJIS'
'SHIFT-JIS'
when 'EUC_JP'
'EUC-JP'
when 'SQL_ASCII'
'ASCII'
else
NOKOGIRI_ENCODING
end
end
def restore(bck_file, force=nil, federated=false)
if !federated && !force && db_exists?
raise "PostgreSQL database #{@db_name} at #{@server} exists," <<
" use -f to overwrite."
end
connect_db
if federated
FEDERATED_TABLES.each do |table|
@db.drop_table?(table)
end
@db.drop_table?(:logdb)
else
@db.tables.each do |table|
@db.drop_table(table)
end
end
rc = system("PGPASSWORD=\"#{@passwd}\" psql -U #{@user} -h #{@server} -p #{@port} -d #{@db_name} -f #{bck_file} --set ON_ERROR_STOP=on --quiet -o /dev/null")
if !rc
raise "Error while restoring PostgreSQL DB #{@db_name} at #{@server}."
end
puts "PostgreSQL DB #{@db_name} at #{@server} restored."
end
def idx?(idx)
query = "SELECT * FROM pg_indexes WHERE indexname = '#{idx[:name]}'"
!@db.fetch(query).first.nil?
end
def create_idx(version = nil)
schema = get_schema(:index_sqlite, version)
schema.each do |idx|
next if idx? idx
query = 'CREATE INDEX '
query << idx[:type] if idx[:type]
query << " #{idx[:name]} ON #{idx[:table]} #{idx[:columns]};"
@db.run query
end
end
def delete_idx(version = nil)
schema = get_schema(:index_sqlite, version)
return unless schema
schema.each do |idx|
next unless idx? idx
query = "DROP INDEX #{idx[:name]};"
@db.run query
end
end
private
def connect_db
passwd = CGI.escape(@passwd)
endpoint = "postgres://#{@user}:#{passwd}@#{@server}:#{@port}/#{@db_name}"
begin
options = {}
options[:encoding] = @encoding unless @encoding.empty?
@db = Sequel.connect(endpoint, options)
rescue Exception => e
raise "Error connecting to DB: " + e.message
end
redefine_db_methods
end
def redefine_db_methods
def @db.fetch(query)
preprocessed = BackEndPostgreSQL.preprocess(query)
super(preprocessed)
end
def @db.run(query)
preprocessed = BackEndPostgreSQL.preprocess(query)
super(preprocessed)
end
end
# Any change to this method should be reflected in PostgreSqlDB class
# in src/sql/PostgreSqlDB.cc
def self.preprocess(query)
pp_query = query.dup
if pp_query.upcase.start_with?('CREATE TABLE')
pp_query = replace_type(pp_query, 'MEDIUMTEXT', 'TEXT')
pp_query = replace_type(pp_query, 'LONGTEXT', 'TEXT')
pp_query = replace_type(pp_query, 'BIGINT UNSIGNED', 'NUMERIC(20)')
end
preprocess_query(pp_query)
end
def self.replace_type(query, type, replacement)
query = query.gsub(type.upcase, replacement.upcase)
query = query.gsub(type.downcase, replacement.downcase)
return query
end
# This method changes MySQL/SQLite REPLACE INTO into PostgreSQL
# INSERT INTO query with ON CONFLICT clause.
#
# For more information look into include/PostgreSQL.h
def self.preprocess_query(query)
return query unless query.upcase.start_with?('REPLACE')
query[0, 7] = 'INSERT'
table_start = query.index('INTO ', 7) + 5
names_start = query.index('(', table_start) + 1
names_end = query.index(')', names_start)
table = query[table_start, names_start - 2 - table_start ]
db_names = query[names_start, names_end - names_start]
splits = db_names.split(',')
query += " ON CONFLICT ON CONSTRAINT #{table}_pkey DO UPDATE SET"
sep = " "
splits.each do |split|
query += "#{sep}#{split.strip} = EXCLUDED.#{split.strip}"
sep = ", "
end
query
end
end

View File

@ -158,14 +158,8 @@ void PoolSQL::exist(const string& id_str, std::set<int>& id_list)
std::vector<int> existing_items;
one_util::split_unique(id_str, ',', id_list);
if (Nebula::instance().get_db_backend() == "postgresql")
{
search(existing_items, table.c_str(), "true order by 1 ASC");
}
else
{
search(existing_items, table.c_str(), "1 order by 1 ASC");
}
search(existing_items, table.c_str(), "1 order by 1 ASC");
for (auto iterator = id_list.begin(); iterator != id_list.end();)
{

View File

@ -1,342 +0,0 @@
/* -------------------------------------------------------------------------- */
/* Copyright 2002-2023, OpenNebula Project, OpenNebula Systems */
/* */
/* Licensed under the Apache License, Version 2.0 (the "License"); you may */
/* not use this file except in compliance with the License. You may obtain */
/* a copy of the License at */
/* */
/* http://www.apache.org/licenses/LICENSE-2.0 */
/* */
/* Unless required by applicable law or agreed to in writing, software */
/* distributed under the License is distributed on an "AS IS" BASIS, */
/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
/* See the License for the specific language governing permissions and */
/* limitations under the License. */
/* -------------------------------------------------------------------------- */
#include "PostgreSqlDB.h"
#include "NebulaUtil.h"
#include "NebulaLog.h"
#include <libpq-fe.h>
#include <iostream>
/*********
* Doc: https://www.postgresql.org/docs/current/libpq.html
********/
#define PG_DEFAULT_PORT 5432
using namespace std;
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
PostgreSqlDB::PostgreSqlDB(const string& _server,
int _port,
const string& _user,
const string& _password,
const string& _database,
int _connections)
: server(_server)
, port(_port)
, user(_user)
, password(_password)
, database(_database)
, max_connections(_connections)
{
PGconn* conn;
if ( port == 0 )
{
port = PG_DEFAULT_PORT;
}
// Set up connection parameters
string params = "host=" + _server
+ " port=" + to_string(port)
+ " user=" + user
+ " password=" + password
+ " dbname=" + database;
// Create connection pool
for (int i = 0; i < max_connections; i++)
{
conn = PQconnectdb(params.c_str());
if ( PQstatus(conn) == CONNECTION_BAD )
{
ostringstream oss;
oss << "Could not open connect to database server: "
<< PQerrorMessage(conn);
throw runtime_error(oss.str());
}
db_connect.push(conn);
}
db_escape_connect = PQconnectdb(params.c_str());
if ( PQserverVersion(db_escape_connect) < 90500 )
{
std::string error = "PostgreSQL version error: must be 9.5 or higher.";
NebulaLog::log("ONE", Log::ERROR, error);
throw runtime_error(error);
}
features =
{
{SqlFeature::MULTIPLE_VALUE, PQlibVersion() > 80200},
{SqlFeature::LIMIT, false},
{SqlFeature::JSON_QUERY, false},
{SqlFeature::COMPARE_BINARY, false}
};
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
PostgreSqlDB::~PostgreSqlDB()
{
while (!db_connect.empty())
{
PGconn* conn = db_connect.front();
db_connect.pop();
PQfinish(conn);
}
PQfinish(db_escape_connect);
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
char * PostgreSqlDB::escape_str(const string& str) const
{
char* buf = new char[str.size() * 2 + 1];
int err;
PQescapeStringConn(db_escape_connect, buf, str.c_str(), str.length(), &err);
if ( err != 0 )
{
delete[] buf;
return nullptr;
}
return buf;
}
/* -------------------------------------------------------------------------- */
void PostgreSqlDB::free_str(char * str) const
{
delete[] str;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
int PostgreSqlDB::exec_ext(std::ostringstream& cmd, Callbackable *obj, bool quiet)
{
string str = preprocess_query(cmd);
const char* c_str = str.c_str();
Log::MessageType error_type = quiet ? Log::DDEBUG : Log::ERROR;
PGconn* conn = get_db_connection();
PGresult* res = PQexec(conn, c_str);
if ( PQstatus(conn) == CONNECTION_BAD )
{
PQreset(conn);
if ( PQstatus(conn) == CONNECTION_BAD )
{
NebulaLog::error("ONE", "Lost connection to DB, unable to reconnect");
PQclear(res);
free_db_connection(conn);
return SqlDB::CONNECTION;
}
else
{
NebulaLog::info("ONE", "Succesfully reconnected to DB");
// Re-execute the query
PQclear(res);
res = PQexec(conn, c_str);
}
}
if ( PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK )
{
const char* err_msg = PQerrorMessage(conn);
ostringstream oss;
oss << "SQL command was: " << c_str;
oss << ", error " << err_msg;
NebulaLog::log("ONE", error_type, oss);
PQclear(res);
free_db_connection(conn);
return SqlDB::SQL;
}
if ( obj == 0 )
{
// Free the result and db connection
PQclear(res);
free_db_connection(conn);
return SqlDB::SUCCESS;
}
int ec = SqlDB::SUCCESS;
// Get number of fields and rows of the result
int n_fields = PQnfields(res);
int n_rows = PQntuples(res);
if ( obj->isCallBackSet() && PQresultStatus(res) == PGRES_TUPLES_OK )
{
char** names = new char*[n_fields];
char** values = new char*[n_fields];
// Get column names
for (int i = 0; i < n_fields; i++)
{
names[i] = PQfname(res, i);
}
// For each row
for (int row = 0; row < n_rows; row++)
{
// get values in that row
for (int field = 0; field < n_fields; field++)
{
values[field] = PQgetvalue(res, row, field);
}
// and do a callback on them
if ( obj->do_callback(n_fields, values, names) != 0 )
{
ec = SqlDB::SQL;
break;
}
}
delete[] names;
delete[] values;
}
if ( obj->get_affected_rows() == 0 && n_rows )
{
obj->set_affected_rows(n_rows);
}
// Free the result and db connection
PQclear(res);
free_db_connection(conn);
return ec;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
PGconn * PostgreSqlDB::get_db_connection()
{
PGconn* conn;
unique_lock<mutex> lock(_mutex);
cond.wait(lock, [&] { return !db_connect.empty(); });
conn = db_connect.front();
db_connect.pop();
return conn;
}
/* -------------------------------------------------------------------------- */
void PostgreSqlDB::free_db_connection(PGconn * db)
{
lock_guard<mutex> lock(_mutex);
db_connect.push(db);
cond.notify_one();
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
static void replace_substring(std::string& cmd, const std::string& s1,
const std::string& s2)
{
size_t pos = cmd.find(s1);
while (pos != std::string::npos)
{
cmd.replace(pos, s1.length(), s2);
pos = cmd.find(s1, pos + s2.length());
}
}
std::string PostgreSqlDB::preprocess_query(std::ostringstream& cmd)
{
std::string query = cmd.str();
// Both CREATE TABLE and REPLACE should be at the start
// so we don't change user data
if (query.find("CREATE TABLE") == 0)
{
replace_substring(query, "MEDIUMTEXT", "TEXT");
replace_substring(query, "LONGTEXT", "TEXT");
replace_substring(query, "BIGINT UNSIGNED", "NUMERIC(20)");
}
else if (query.find("REPLACE") == 0)
{
query.replace(0, 7, "INSERT");
size_t table_start = query.find("INTO ", 7) + 5;
size_t names_start = query.find("(", table_start) + 1;
size_t names_end = query.find(")", names_start);
std::string db_names = query.substr(names_start, names_end - names_start);
std::string table = query.substr(table_start, names_start - 2 - table_start);
std::vector<std::string> splits;
one_util::split(db_names, ',', splits);
query += " ON CONFLICT ON CONSTRAINT " + table + "_pkey DO UPDATE SET ";
const char* sep = "";
for (auto &s: splits)
{
query += sep + s + " = EXCLUDED." + s;
sep = ", ";
}
}
return query;
}

View File

@ -29,9 +29,6 @@ if env['sqlite']=='yes':
if env['mysql']=='yes':
source_files.append('MySqlDB.cc')
if env['postgresql']=='yes':
source_files.append('PostgreSqlDB.cc')
# Build library
env.StaticLibrary(lib_name, source_files)