2007-06-10 17:02:09 +00:00
/*
Unix SMB / CIFS implementation .
Database interface wrapper around ctdbd
2009-12-11 10:35:50 +01:00
Copyright ( C ) Volker Lendecke 2007 - 2009
Copyright ( C ) Michael Adam 2009
2008-08-24 12:46:26 +02:00
2007-06-10 17:02:09 +00:00
This program is free software ; you can redistribute it and / or modify
it under the terms of the GNU General Public License as published by
2007-07-09 19:25:36 +00:00
the Free Software Foundation ; either version 3 of the License , or
2007-06-10 17:02:09 +00:00
( at your option ) any later version .
2008-08-24 12:46:26 +02:00
2007-06-10 17:02:09 +00:00
This program is distributed in the hope that it will be useful ,
but WITHOUT ANY WARRANTY ; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the
GNU General Public License for more details .
2008-08-24 12:46:26 +02:00
2007-06-10 17:02:09 +00:00
You should have received a copy of the GNU General Public License
2007-07-10 00:52:41 +00:00
along with this program . If not , see < http : //www.gnu.org/licenses/>.
2007-06-10 17:02:09 +00:00
*/
# include "includes.h"
2011-02-25 23:20:06 +01:00
# include "system/filesys.h"
2012-03-10 21:33:11 +01:00
# include "lib/tdb_wrap/tdb_wrap.h"
2011-05-05 11:25:29 +02:00
# include "util_tdb.h"
2012-05-11 21:36:48 +02:00
# include "dbwrap/dbwrap.h"
2011-10-25 16:32:12 +02:00
# include "dbwrap/dbwrap_ctdb.h"
2011-10-12 11:48:55 +02:00
# include "dbwrap/dbwrap_rbt.h"
2011-10-13 16:50:57 +02:00
# include "lib/param/param.h"
2011-10-12 11:48:55 +02:00
2007-06-10 17:02:09 +00:00
# ifdef CLUSTER_SUPPORT
2011-08-30 17:02:54 +02:00
/*
* It is not possible to include ctdb . h and tdb_compat . h ( included via
* some other include above ) without warnings . This fixes those
* warnings .
*/
# ifdef typesafe_cb
# undef typesafe_cb
# endif
# ifdef typesafe_cb_preargs
# undef typesafe_cb_preargs
# endif
# ifdef typesafe_cb_postargs
# undef typesafe_cb_postargs
# endif
2007-06-10 17:02:09 +00:00
# include "ctdb.h"
# include "ctdb_private.h"
2008-01-16 12:09:48 +03:00
# include "ctdbd_conn.h"
2011-07-07 17:42:08 +02:00
# include "dbwrap/dbwrap.h"
2011-08-24 14:53:42 +02:00
# include "dbwrap/dbwrap_private.h"
2011-08-16 15:51:58 +02:00
# include "dbwrap/dbwrap_ctdb.h"
2009-12-03 17:29:54 +01:00
# include "g_lock.h"
2011-03-24 15:31:06 +01:00
# include "messages.h"
2007-06-10 17:02:09 +00:00
2008-08-07 16:20:05 +10:00
/*
 * State of one open transaction on a ctdb-backed database.
 */
struct db_ctdb_transaction_handle {
	/* database context this transaction was started on */
	struct db_ctdb_ctx *ctx;

	/*
	 * we store the writes done under a transaction:
	 */
	struct ctdb_marshall_buffer *m_write;

	/* bumped by each nested transaction_start, dropped by commit/cancel */
	uint32_t nesting;

	/* set when a nested level was cancelled; the final commit then fails */
	bool nested_cancel;

	/* name of the g_lock that is held while the transaction is open */
	char *lock_name;
};
2007-06-10 17:02:09 +00:00
/*
 * Per-database private context of the ctdb dbwrap backend.
 */
struct db_ctdb_ctx {
	/* back pointer to the dbwrap database owning this context */
	struct db_context *db;

	/* wrapped handle of the local tdb copy */
	struct tdb_wrap *wtdb;

	/* database id, as used in controls and log messages */
	uint32_t db_id;

	/* the currently open transaction, NULL when there is none */
	struct db_ctdb_transaction_handle *transaction;

	/* g_lock context used to take the transaction lock */
	struct g_lock_ctx *lock_ctx;
};
struct db_ctdb_rec {
struct db_ctdb_ctx * ctdb_ctx ;
struct ctdb_ltdb_header header ;
2010-03-05 16:46:36 +01:00
struct timeval lock_time ;
2007-06-10 17:02:09 +00:00
} ;
2008-08-06 22:22:23 +02:00
/*
 * Map the current error state of a tdb handle to an NTSTATUS code.
 */
static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
{
	return map_nt_error_from_tdb(tdb_error(tdb));
}
2008-08-07 16:20:05 +10:00
2009-11-03 00:55:41 +01:00
/**
 * Read a record from the local tdb and split it into the ctdb record
 * header and the record body.
 *
 * Both "header" and "data" are optional and may be NULL.
 *
 * A record that is missing or shorter than a ctdb header yields
 * NT_STATUS_NOT_FOUND; in that case *data is zeroed and *header gets
 * dmaster = (uint32_t)-1 and rsn = 0.
 *
 * On success the body is copied onto mem_ctx (data->dptr is NULL for an
 * empty body). NT_STATUS_NO_MEMORY is returned if that copy fails.
 */
static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
				   TDB_DATA key,
				   struct ctdb_ltdb_header *header,
				   TALLOC_CTX *mem_ctx,
				   TDB_DATA *data)
{
	const size_t hdr_len = sizeof(struct ctdb_ltdb_header);
	NTSTATUS status = NT_STATUS_OK;
	TDB_DATA raw;

	raw = tdb_fetch_compat(db->wtdb->tdb, key);

	if (raw.dsize < hdr_len) {
		/* nothing usable stored under this key */
		if (data != NULL) {
			ZERO_STRUCTP(data);
		}
		if (header != NULL) {
			header->dmaster = (uint32_t)-1;
			header->rsn = 0;
		}
		status = NT_STATUS_NOT_FOUND;
		goto out;
	}

	if (header != NULL) {
		*header = *(struct ctdb_ltdb_header *)raw.dptr;
	}

	if (data != NULL) {
		data->dsize = raw.dsize - hdr_len;
		data->dptr = NULL;

		if (data->dsize != 0) {
			data->dptr = (unsigned char *)talloc_memdup(
				mem_ctx, raw.dptr + hdr_len, data->dsize);
			if (data->dptr == NULL) {
				status = NT_STATUS_NO_MEMORY;
			}
		}
	}

out:
	/* the raw record was malloc'ed by tdb */
	SAFE_FREE(raw.dptr);
	return status;
}
2009-10-22 16:27:45 +02:00
/*
 * Write a record into the local copy of the database, prefixing the
 * payload with the given ctdb record header.
 *
 * Returns NT_STATUS_NO_MEMORY if the combined buffer cannot be
 * allocated, otherwise the mapped tdb result of the store.
 */
static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
				   TDB_DATA key,
				   struct ctdb_ltdb_header *header,
				   TDB_DATA data)
{
	const size_t hdr_len = sizeof(struct ctdb_ltdb_header);
	TALLOC_CTX *frame = talloc_stackframe();
	TDB_DATA blob;
	int ret;

	blob.dsize = data.dsize + hdr_len;
	blob.dptr = (uint8_t *)talloc_size(frame, blob.dsize);
	if (blob.dptr == NULL) {
		talloc_free(frame);
		return NT_STATUS_NO_MEMORY;
	}

	/* layout on disk: header immediately followed by the payload */
	memcpy(blob.dptr, header, hdr_len);
	memcpy(blob.dptr + hdr_len, data.dptr, data.dsize);

	ret = tdb_store(db->wtdb->tdb, key, blob, TDB_REPLACE);

	talloc_free(frame);

	if (ret != 0) {
		return tdb_error_to_ntstatus(db->wtdb->tdb);
	}
	return NT_STATUS_OK;
}
2008-08-07 16:20:05 +10:00
/*
  build a ctdb_rec_data record out of a key / data pair

  The payload of the result is laid out as: key, then the header (only
  if header is not NULL), then the data. When a header is given, its
  size is counted in the datalen field.

  Returns NULL if the allocation on mem_ctx fails.
*/
static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
						     TDB_DATA key,
						     struct ctdb_ltdb_header *header,
						     TDB_DATA data)
{
	const size_t hdr_len = (header != NULL) ? sizeof(*header) : 0;
	struct ctdb_rec_data *rec;
	size_t total;
	size_t off;

	total = offsetof(struct ctdb_rec_data, data) + key.dsize +
		data.dsize + hdr_len;

	rec = (struct ctdb_rec_data *)talloc_size(mem_ctx, total);
	if (rec == NULL) {
		return NULL;
	}

	rec->length = total;
	rec->reqid = reqid;
	rec->keylen = key.dsize;
	rec->datalen = data.dsize + hdr_len;

	off = 0;
	memcpy(&rec->data[off], key.dptr, key.dsize);
	off += key.dsize;

	if (header != NULL) {
		memcpy(&rec->data[off], header, sizeof(*header));
		off += sizeof(*header);
	}

	memcpy(&rec->data[off], data.dptr, data.dsize);

	return rec;
}
/*
 * helper function for marshalling multiple records
 *
 * Appends one record to the marshall buffer "m". If "m" is NULL a new,
 * zeroed buffer is created on mem_ctx and tagged with db_id first.
 *
 * Returns the (reallocated) buffer with its record count bumped, or
 * NULL on allocation failure. In the failure case a buffer that was
 * passed in has been freed.
 */
static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
							  struct ctdb_marshall_buffer *m,
							  uint64_t db_id,
							  uint32_t reqid,
							  TDB_DATA key,
							  struct ctdb_ltdb_header *header,
							  TDB_DATA data)
{
	struct ctdb_marshall_buffer *grown = NULL;
	struct ctdb_rec_data *rec;
	size_t old_len;
	size_t rec_len;

	/* the single record is only a temporary, build it on talloc_tos() */
	rec = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
	if (rec == NULL) {
		talloc_free(m);
		return NULL;
	}

	if (m == NULL) {
		m = (struct ctdb_marshall_buffer *)talloc_zero_size(
			mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
		if (m == NULL) {
			goto out;
		}
		m->db_id = db_id;
	}

	old_len = talloc_get_size(m);
	rec_len = talloc_get_size(rec);

	grown = (struct ctdb_marshall_buffer *)talloc_realloc_size(
		mem_ctx, m, old_len + rec_len);
	if (grown == NULL) {
		talloc_free(m);
		goto out;
	}

	/* the new record goes right behind the existing content */
	memcpy((uint8_t *)grown + old_len, rec, rec_len);
	grown->count++;

out:
	talloc_free(rec);
	return grown;
}
/*
 * we've finished marshalling: wrap the buffer into a data blob.
 *
 * The blob points at the buffer itself (no copy is made); its size is
 * the talloc size of the buffer.
 */
static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
{
	TDB_DATA blob;

	blob.dsize = talloc_get_size(m);
	blob.dptr = (uint8_t *)m;

	return blob;
}
/*
   loop over a marshalling buffer

     - pass r == NULL to start
     - loop the number of times indicated by m->count

   reqid, header, key and data are optional outputs (may be NULL).
   key and data point into the marshall buffer, nothing is copied.
   If header is requested, the header is stripped from the front of
   the returned data.

   Returns the current record, or NULL if a header was requested but
   the record is too short to contain one. In that case none of the
   outputs are written.
*/
static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
							 uint32_t *reqid,
							 struct ctdb_ltdb_header *header,
							 TDB_DATA *key, TDB_DATA *data)
{
	if (r == NULL) {
		r = (struct ctdb_rec_data *)&m->data[0];
	} else {
		r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
	}

	/*
	 * Validate before touching any output, so that the size
	 * arithmetic below cannot wrap around.
	 */
	if ((header != NULL) && (r->datalen < sizeof(*header))) {
		return NULL;
	}

	if (reqid != NULL) {
		*reqid = r->reqid;
	}

	if (key != NULL) {
		key->dptr = &r->data[0];
		key->dsize = r->keylen;
	}

	if (data != NULL) {
		data->dptr = &r->data[r->keylen];
		data->dsize = r->datalen;
		if (header != NULL) {
			data->dptr += sizeof(*header);
			data->dsize -= sizeof(*header);
		}
	}

	if (header != NULL) {
		/*
		 * The header sits behind a key of arbitrary length, so it
		 * is not necessarily aligned: copy it bytewise.
		 */
		memcpy(header, &r->data[r->keylen], sizeof(*header));
	}

	return r;
}
2009-05-25 21:59:40 +02:00
/**
* CTDB transaction destructor
*/
2008-08-07 16:20:05 +10:00
static int db_ctdb_transaction_destructor ( struct db_ctdb_transaction_handle * h )
{
2009-10-28 01:50:15 +01:00
NTSTATUS status ;
2008-08-07 16:20:05 +10:00
2009-12-03 17:29:54 +01:00
status = g_lock_unlock ( h - > ctx - > lock_ctx , h - > lock_name ) ;
2009-10-28 01:54:04 +01:00
if ( ! NT_STATUS_IS_OK ( status ) ) {
2011-09-05 17:07:37 +02:00
DEBUG ( 0 , ( " g_lock_unlock failed for %s: %s \n " , h - > lock_name ,
nt_errstr ( status ) ) ) ;
2009-10-28 01:54:04 +01:00
return - 1 ;
}
2008-08-07 16:20:05 +10:00
return 0 ;
}
2009-05-25 21:59:40 +02:00
/**
* CTDB dbwrap API : transaction_start function
* starts a transaction on a persistent database
*/
2008-08-07 16:20:05 +10:00
static int db_ctdb_transaction_start ( struct db_context * db )
{
struct db_ctdb_transaction_handle * h ;
2009-12-03 17:29:54 +01:00
NTSTATUS status ;
2008-08-07 16:20:05 +10:00
struct db_ctdb_ctx * ctx = talloc_get_type_abort ( db - > private_data ,
struct db_ctdb_ctx ) ;
if ( ! db - > persistent ) {
DEBUG ( 0 , ( " transactions not supported on non-persistent database 0x%08x \n " ,
ctx - > db_id ) ) ;
return - 1 ;
}
if ( ctx - > transaction ) {
2008-08-08 16:44:24 +10:00
ctx - > transaction - > nesting + + ;
2011-08-14 23:47:47 +02:00
DEBUG ( 5 , ( __location__ " transaction start on db 0x%08x: nesting %d -> %d \n " ,
ctx - > db_id , ctx - > transaction - > nesting - 1 , ctx - > transaction - > nesting ) ) ;
2008-08-08 16:44:24 +10:00
return 0 ;
2008-08-07 16:20:05 +10:00
}
h = talloc_zero ( db , struct db_ctdb_transaction_handle ) ;
if ( h = = NULL ) {
DEBUG ( 0 , ( __location__ " oom for transaction handle \n " ) ) ;
return - 1 ;
}
h - > ctx = ctx ;
2009-12-03 17:29:54 +01:00
h - > lock_name = talloc_asprintf ( h , " transaction_db_0x%08x " ,
( unsigned int ) ctx - > db_id ) ;
if ( h - > lock_name = = NULL ) {
DEBUG ( 0 , ( " talloc_asprintf failed \n " ) ) ;
TALLOC_FREE ( h ) ;
return - 1 ;
}
/*
* Wait a day , i . e . forever . . .
*/
status = g_lock_lock ( ctx - > lock_ctx , h - > lock_name , G_LOCK_WRITE ,
timeval_set ( 86400 , 0 ) ) ;
if ( ! NT_STATUS_IS_OK ( status ) ) {
DEBUG ( 0 , ( " g_lock_lock failed: %s \n " , nt_errstr ( status ) ) ) ;
TALLOC_FREE ( h ) ;
2008-08-07 16:20:05 +10:00
return - 1 ;
}
talloc_set_destructor ( h , db_ctdb_transaction_destructor ) ;
ctx - > transaction = h ;
2011-08-14 23:47:47 +02:00
DEBUG ( 5 , ( __location__ " transaction started on db 0x%08x \n " , ctx - > db_id ) ) ;
2008-08-07 16:56:47 +10:00
2008-08-07 16:20:05 +10:00
return 0 ;
}
2009-12-03 17:29:54 +01:00
/*
 * Look up "key" among the records written so far in this transaction
 * and hand out the most recently written version.
 *
 * pheader and pdata are optional outputs. The data is copied onto
 * mem_ctx; the header is returned by value.
 *
 * Returns false if buf is NULL, if the key is not in the buffer, if a
 * record in the buffer cannot be parsed, or if copying the data fails.
 */
static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer *buf,
					     TDB_DATA key,
					     struct ctdb_ltdb_header *pheader,
					     TALLOC_CTX *mem_ctx,
					     TDB_DATA *pdata)
{
	struct ctdb_rec_data *cur = NULL;
	struct ctdb_ltdb_header newest_hdr;
	TDB_DATA newest_data;
	bool found = false;
	int i;

	if (buf == NULL) {
		return false;
	}

	ZERO_STRUCT(newest_hdr);
	ZERO_STRUCT(newest_data);

	/*
	 * Walk the list of records written during this
	 * transaction. If we want to read one we have already
	 * written, return the last written sample. Thus we do not
	 * stop at the first hit, this record might have been
	 * overwritten later.
	 */
	for (i = 0; i < buf->count; i++) {
		struct ctdb_ltdb_header hdr;
		TDB_DATA rkey, rdata;
		uint32_t reqid;

		ZERO_STRUCT(hdr);

		cur = db_ctdb_marshall_loop_next(buf, cur, &reqid, &hdr,
						 &rkey, &rdata);
		if (cur == NULL) {
			return false;
		}

		if (!tdb_data_equal(key, rkey)) {
			continue;
		}

		found = true;
		newest_data = rdata;
		newest_hdr = hdr;
	}

	if (!found) {
		return false;
	}

	if (pdata != NULL) {
		/* hand out a private copy, the buffer may be reallocated */
		newest_data.dptr = (uint8_t *)talloc_memdup(
			mem_ctx, newest_data.dptr, newest_data.dsize);
		if ((newest_data.dsize != 0) && (newest_data.dptr == NULL)) {
			return false;
		}
		*pdata = newest_data;
	}

	if (pheader != NULL) {
		*pheader = newest_hdr;
	}

	return true;
}
2008-08-07 16:20:05 +10:00
/*
fetch a record inside a transaction
*/
2011-11-11 00:49:11 +01:00
static NTSTATUS db_ctdb_transaction_fetch ( struct db_ctdb_ctx * db ,
TALLOC_CTX * mem_ctx ,
TDB_DATA key , TDB_DATA * data )
2008-08-07 16:20:05 +10:00
{
struct db_ctdb_transaction_handle * h = db - > transaction ;
2009-10-28 01:28:38 +01:00
NTSTATUS status ;
2009-12-03 17:29:54 +01:00
bool found ;
found = pull_newest_from_marshall_buffer ( h - > m_write , key , NULL ,
mem_ctx , data ) ;
if ( found ) {
2011-11-11 00:49:11 +01:00
return NT_STATUS_OK ;
2009-12-03 17:29:54 +01:00
}
2008-08-07 16:20:05 +10:00
2009-10-28 01:28:38 +01:00
status = db_ctdb_ltdb_fetch ( h - > ctx , key , NULL , mem_ctx , data ) ;
2008-08-07 16:20:05 +10:00
2009-10-28 01:28:38 +01:00
if ( NT_STATUS_EQUAL ( status , NT_STATUS_NOT_FOUND ) ) {
* data = tdb_null ;
2008-08-07 16:20:05 +10:00
}
2011-11-11 00:49:11 +01:00
return status ;
2008-08-07 16:20:05 +10:00
}
2011-03-25 00:29:42 +01:00
/**
* Fetch a record from a persistent database
* without record locking and without an active transaction .
*
* This just fetches from the local database copy .
* Since the databases are kept in syc cluster - wide ,
* there is no point in doing a ctdb call to fetch the
* record from the lmaster . It does even harm since migration
* of records bump their RSN and hence render the persistent
* database inconsistent .
*/
2011-11-11 00:49:11 +01:00
static NTSTATUS db_ctdb_fetch_persistent ( struct db_ctdb_ctx * db ,
TALLOC_CTX * mem_ctx ,
TDB_DATA key , TDB_DATA * data )
2011-03-25 00:29:42 +01:00
{
NTSTATUS status ;
status = db_ctdb_ltdb_fetch ( db , key , NULL , mem_ctx , data ) ;
if ( NT_STATUS_EQUAL ( status , NT_STATUS_NOT_FOUND ) ) {
* data = tdb_null ;
}
2011-11-11 00:49:11 +01:00
return status ;
2011-03-25 00:29:42 +01:00
}
2008-08-07 16:20:05 +10:00
static NTSTATUS db_ctdb_store_transaction ( struct db_record * rec , TDB_DATA data , int flag ) ;
static NTSTATUS db_ctdb_delete_transaction ( struct db_record * rec ) ;
/*
 * Create a db_record for "key" inside the currently open transaction.
 *
 * The record's store/delete callbacks queue their changes in the
 * transaction. The value is taken from the writes already queued in the
 * transaction if the key was written there, otherwise from the local
 * tdb. A key that is missing in the tdb, or whose stored record is too
 * short to hold a ctdb header, yields an empty value (tdb_null), so
 * that the record can be created.
 *
 * Returns NULL on allocation failure.
 */
static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
							   TALLOC_CTX *mem_ctx,
							   TDB_DATA key)
{
	const size_t hdr_len = sizeof(struct ctdb_ltdb_header);
	struct db_record *result;
	TDB_DATA ctdb_data;

	result = talloc(mem_ctx, struct db_record);
	if (result == NULL) {
		DEBUG(0, (" talloc failed \n "));
		return NULL;
	}

	result->private_data = ctx->transaction;

	result->key.dsize = key.dsize;
	result->key.dptr = (uint8_t *)talloc_memdup(result, key.dptr,
						    key.dsize);
	if (result->key.dptr == NULL) {
		DEBUG(0, (" talloc failed \n "));
		TALLOC_FREE(result);
		return NULL;
	}

	result->store = db_ctdb_store_transaction;
	result->delete_rec = db_ctdb_delete_transaction;

	/* a value written earlier in this transaction wins */
	if (pull_newest_from_marshall_buffer(ctx->transaction->m_write, key,
					     NULL, result, &result->value)) {
		return result;
	}

	ctdb_data = tdb_fetch_compat(ctx->wtdb->tdb, key);

	if ((ctdb_data.dptr == NULL) || (ctdb_data.dsize < hdr_len)) {
		/*
		 * create the record
		 *
		 * A record shorter than the ctdb header is handled like
		 * a missing one, the same way db_ctdb_ltdb_fetch() does.
		 * Without this check the size computation below would
		 * wrap around and the copy would read out of bounds.
		 */
		SAFE_FREE(ctdb_data.dptr);
		result->value = tdb_null;
		return result;
	}

	result->value.dsize = ctdb_data.dsize - hdr_len;
	result->value.dptr = NULL;

	if (result->value.dsize != 0) {
		result->value.dptr = (uint8_t *)talloc_memdup(
			result, ctdb_data.dptr + hdr_len,
			result->value.dsize);
		if (result->value.dptr == NULL) {
			DEBUG(0, (" talloc failed \n "));
			/* sets result to NULL, which is what we return */
			TALLOC_FREE(result);
		}
	}

	SAFE_FREE(ctdb_data.dptr);

	return result;
}
2008-09-15 14:27:50 +10:00
static int db_ctdb_record_destructor ( struct db_record * * recp )
2008-08-07 19:14:16 +10:00
{
2008-09-15 14:27:50 +10:00
struct db_record * rec = talloc_get_type_abort ( * recp , struct db_record ) ;
2008-08-07 19:14:16 +10:00
struct db_ctdb_transaction_handle * h = talloc_get_type_abort (
rec - > private_data , struct db_ctdb_transaction_handle ) ;
2008-08-07 21:33:00 +10:00
int ret = h - > ctx - > db - > transaction_commit ( h - > ctx - > db ) ;
if ( ret ! = 0 ) {
DEBUG ( 0 , ( __location__ " transaction_commit failed \n " ) ) ;
}
2008-08-07 19:14:16 +10:00
return 0 ;
}
/*
  auto-create a transaction for persistent databases

  Starts a transaction, fetches the record inside it and attaches a
  child object to the record whose destructor commits the transaction.
  Returns NULL (with the transaction cancelled again) on failure.
*/
static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
							  TALLOC_CTX *mem_ctx,
							  TDB_DATA key)
{
	struct db_record **commit_hook;
	struct db_record *rec;

	if (db_ctdb_transaction_start(ctx->db) == -1) {
		return NULL;
	}

	rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
	if (rec == NULL) {
		ctx->db->transaction_cancel(ctx->db);
		return NULL;
	}

	/* destroy this transaction when we release the lock */
	commit_hook = talloc(rec, struct db_record *);
	if (commit_hook == NULL) {
		ctx->db->transaction_cancel(ctx->db);
		talloc_free(rec);
		return NULL;
	}

	*commit_hook = rec;
	talloc_set_destructor(commit_hook, db_ctdb_record_destructor);

	return rec;
}
2008-08-07 16:20:05 +10:00
/*
stores a record inside a transaction
*/
2009-12-11 12:30:57 +01:00
static NTSTATUS db_ctdb_transaction_store ( struct db_ctdb_transaction_handle * h ,
TDB_DATA key , TDB_DATA data )
2008-08-07 16:20:05 +10:00
{
TALLOC_CTX * tmp_ctx = talloc_new ( h ) ;
TDB_DATA rec ;
struct ctdb_ltdb_header header ;
2009-12-03 17:29:54 +01:00
ZERO_STRUCT ( header ) ;
2008-08-07 16:20:05 +10:00
/* we need the header so we can update the RSN */
2009-12-03 17:29:54 +01:00
if ( ! pull_newest_from_marshall_buffer ( h - > m_write , key , & header ,
NULL , NULL ) ) {
2011-06-20 18:40:31 +09:30
rec = tdb_fetch_compat ( h - > ctx - > wtdb - > tdb , key ) ;
2009-12-03 17:29:54 +01:00
if ( rec . dptr ! = NULL ) {
memcpy ( & header , rec . dptr ,
sizeof ( struct ctdb_ltdb_header ) ) ;
rec . dsize - = sizeof ( struct ctdb_ltdb_header ) ;
/*
* a special case , we are writing the same
* data that is there now
*/
if ( data . dsize = = rec . dsize & &
memcmp ( data . dptr ,
rec . dptr + sizeof ( struct ctdb_ltdb_header ) ,
data . dsize ) = = 0 ) {
SAFE_FREE ( rec . dptr ) ;
talloc_free ( tmp_ctx ) ;
2009-12-11 12:30:57 +01:00
return NT_STATUS_OK ;
2009-12-03 17:29:54 +01:00
}
2008-08-08 09:58:15 +10:00
}
2008-08-07 16:20:05 +10:00
SAFE_FREE ( rec . dptr ) ;
}
2009-09-11 13:23:34 +02:00
header . dmaster = get_my_vnn ( ) ;
2008-08-07 16:20:05 +10:00
header . rsn + + ;
2008-08-08 09:58:15 +10:00
h - > m_write = db_ctdb_marshall_add ( h , h - > m_write , h - > ctx - > db_id , 0 , key , & header , data ) ;
if ( h - > m_write = = NULL ) {
DEBUG ( 0 , ( __location__ " Failed to add to marshalling record \n " ) ) ;
talloc_free ( tmp_ctx ) ;
2009-12-11 12:30:57 +01:00
return NT_STATUS_NO_MEMORY ;
2008-08-07 16:20:05 +10:00
}
2008-08-24 12:46:26 +02:00
2008-08-07 16:20:05 +10:00
talloc_free ( tmp_ctx ) ;
2009-12-11 12:30:57 +01:00
return NT_STATUS_OK ;
2008-08-07 16:20:05 +10:00
}
/*
  a record store inside a transaction

  The flag argument is not used; the write is queued in the
  transaction the record belongs to.
*/
static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
{
	struct db_ctdb_transaction_handle *handle = talloc_get_type_abort(
		rec->private_data, struct db_ctdb_transaction_handle);

	return db_ctdb_transaction_store(handle, rec->key, data);
}
/*
a record delete inside a transaction
*/
static NTSTATUS db_ctdb_delete_transaction ( struct db_record * rec )
{
struct db_ctdb_transaction_handle * h = talloc_get_type_abort (
rec - > private_data , struct db_ctdb_transaction_handle ) ;
2009-12-11 12:30:57 +01:00
NTSTATUS status ;
2008-08-07 16:20:05 +10:00
2009-12-11 12:30:57 +01:00
status = db_ctdb_transaction_store ( h , rec - > key , tdb_null ) ;
return status ;
2008-08-07 16:20:05 +10:00
}
2009-12-11 14:07:28 +01:00
/**
* Fetch the db sequence number of a persistent db directly from the db .
*/
static NTSTATUS db_ctdb_fetch_db_seqnum_from_db ( struct db_ctdb_ctx * db ,
uint64_t * seqnum )
{
NTSTATUS status ;
const char * keyname = CTDB_DB_SEQNUM_KEY ;
TDB_DATA key ;
TDB_DATA data ;
struct ctdb_ltdb_header header ;
TALLOC_CTX * mem_ctx = talloc_stackframe ( ) ;
if ( seqnum = = NULL ) {
return NT_STATUS_INVALID_PARAMETER ;
}
2010-01-18 17:26:04 +01:00
key = string_term_tdb_data ( keyname ) ;
2009-12-11 14:07:28 +01:00
status = db_ctdb_ltdb_fetch ( db , key , & header , mem_ctx , & data ) ;
2009-12-12 00:30:37 +01:00
if ( ! NT_STATUS_IS_OK ( status ) & &
! NT_STATUS_EQUAL ( status , NT_STATUS_NOT_FOUND ) )
{
2009-12-11 14:07:28 +01:00
goto done ;
}
2009-12-12 00:30:37 +01:00
status = NT_STATUS_OK ;
2009-12-11 14:07:28 +01:00
if ( data . dsize ! = sizeof ( uint64_t ) ) {
* seqnum = 0 ;
goto done ;
}
* seqnum = * ( uint64_t * ) data . dptr ;
done :
TALLOC_FREE ( mem_ctx ) ;
return status ;
}
/**
* Store the database sequence number inside a transaction .
*/
static NTSTATUS db_ctdb_store_db_seqnum ( struct db_ctdb_transaction_handle * h ,
uint64_t seqnum )
{
NTSTATUS status ;
const char * keyname = CTDB_DB_SEQNUM_KEY ;
TDB_DATA key ;
TDB_DATA data ;
2010-01-18 17:26:04 +01:00
key = string_term_tdb_data ( keyname ) ;
2009-12-11 14:07:28 +01:00
data . dptr = ( uint8_t * ) & seqnum ;
data . dsize = sizeof ( uint64_t ) ;
status = db_ctdb_transaction_store ( h , key , data ) ;
return status ;
}
2008-08-07 16:20:05 +10:00
/*
commit a transaction
*/
static int db_ctdb_transaction_commit ( struct db_context * db )
{
struct db_ctdb_ctx * ctx = talloc_get_type_abort ( db - > private_data ,
struct db_ctdb_ctx ) ;
NTSTATUS rets ;
int status ;
struct db_ctdb_transaction_handle * h = ctx - > transaction ;
2009-12-11 14:07:28 +01:00
uint64_t old_seqnum , new_seqnum ;
int ret ;
2008-08-07 16:20:05 +10:00
if ( h = = NULL ) {
DEBUG ( 0 , ( __location__ " transaction commit with no open transaction on db 0x%08x \n " , ctx - > db_id ) ) ;
return - 1 ;
}
2008-08-08 16:44:24 +10:00
if ( h - > nested_cancel ) {
db - > transaction_cancel ( db ) ;
DEBUG ( 5 , ( __location__ " Failed transaction commit after nested cancel \n " ) ) ;
return - 1 ;
}
if ( h - > nesting ! = 0 ) {
h - > nesting - - ;
2011-08-14 23:47:47 +02:00
DEBUG ( 5 , ( __location__ " transaction commit on db 0x%08x: nesting %d -> %d \n " ,
ctx - > db_id , ctx - > transaction - > nesting + 1 , ctx - > transaction - > nesting ) ) ;
2008-08-08 16:44:24 +10:00
return 0 ;
}
2010-01-13 23:53:54 +01:00
if ( h - > m_write = = NULL ) {
/*
* No changes were made , so don ' t change the seqnum ,
* don ' t push to other node , just exit with success .
*/
ret = 0 ;
goto done ;
}
2011-08-14 23:47:47 +02:00
DEBUG ( 5 , ( __location__ " transaction commit on db 0x%08x \n " , ctx - > db_id ) ) ;
2008-08-07 16:20:05 +10:00
2009-12-11 14:07:28 +01:00
/*
* As the last db action before committing , bump the database sequence
* number . Note that this undoes all changes to the seqnum records
* performed under the transaction . This record is not meant to be
* modified by user interaction . It is for internal use only . . .
*/
rets = db_ctdb_fetch_db_seqnum_from_db ( ctx , & old_seqnum ) ;
if ( ! NT_STATUS_IS_OK ( rets ) ) {
DEBUG ( 1 , ( __location__ " failed to fetch the db sequence number "
" in transaction commit on db 0x%08x \n " , ctx - > db_id ) ) ;
ret = - 1 ;
goto done ;
}
new_seqnum = old_seqnum + 1 ;
rets = db_ctdb_store_db_seqnum ( h , new_seqnum ) ;
if ( ! NT_STATUS_IS_OK ( rets ) ) {
DEBUG ( 1 , ( __location__ " failed to store the db sequence number "
" in transaction commit on db 0x%08x \n " , ctx - > db_id ) ) ;
ret = - 1 ;
goto done ;
}
2008-08-07 16:20:05 +10:00
again :
/* tell ctdbd to commit to the other nodes */
2010-08-31 16:52:56 +02:00
rets = ctdbd_control_local ( messaging_ctdbd_connection ( ) ,
2009-12-03 17:29:54 +01:00
CTDB_CONTROL_TRANS3_COMMIT ,
2008-08-08 13:12:16 +10:00
h - > ctx - > db_id , 0 ,
2009-12-03 17:29:54 +01:00
db_ctdb_marshall_finish ( h - > m_write ) ,
NULL , NULL , & status ) ;
2008-08-07 16:20:05 +10:00
if ( ! NT_STATUS_IS_OK ( rets ) | | status ! = 0 ) {
2009-12-03 17:29:54 +01:00
/*
2009-12-11 14:07:28 +01:00
* The TRANS3_COMMIT control should only possibly fail when a
* recovery has been running concurrently . In any case , the db
* will be the same on all nodes , either the new copy or the
* old copy . This can be detected by comparing the old and new
* local sequence numbers .
*/
rets = db_ctdb_fetch_db_seqnum_from_db ( ctx , & new_seqnum ) ;
if ( ! NT_STATUS_IS_OK ( rets ) ) {
DEBUG ( 1 , ( __location__ " failed to refetch db sequence "
" number after failed TRANS3_COMMIT \n " ) ) ;
ret = - 1 ;
goto done ;
}
if ( new_seqnum = = old_seqnum ) {
/* Recovery prevented all our changes: retry. */
goto again ;
} else if ( new_seqnum ! = ( old_seqnum + 1 ) ) {
DEBUG ( 0 , ( __location__ " ERROR: new_seqnum[%lu] != "
" old_seqnum[%lu] + (0 or 1) after failed "
" TRANS3_COMMIT - this should not happen! \n " ,
( unsigned long ) new_seqnum ,
( unsigned long ) old_seqnum ) ) ;
ret = - 1 ;
goto done ;
}
/*
* Recovery propagated our changes to all nodes , completing
* our commit for us - succeed .
2009-12-03 17:29:54 +01:00
*/
2008-08-07 16:20:05 +10:00
}
2009-12-11 14:07:28 +01:00
ret = 0 ;
2009-12-03 17:29:54 +01:00
done :
2008-08-07 16:20:05 +10:00
h - > ctx - > transaction = NULL ;
talloc_free ( h ) ;
2010-01-13 23:51:34 +01:00
return ret ;
2008-08-07 16:20:05 +10:00
}
/*
cancel a transaction
*/
static int db_ctdb_transaction_cancel ( struct db_context * db )
{
struct db_ctdb_ctx * ctx = talloc_get_type_abort ( db - > private_data ,
struct db_ctdb_ctx ) ;
struct db_ctdb_transaction_handle * h = ctx - > transaction ;
if ( h = = NULL ) {
DEBUG ( 0 , ( __location__ " transaction cancel with no open transaction on db 0x%08x \n " , ctx - > db_id ) ) ;
return - 1 ;
}
2008-08-08 16:44:24 +10:00
if ( h - > nesting ! = 0 ) {
h - > nesting - - ;
h - > nested_cancel = true ;
2011-08-14 23:47:47 +02:00
DEBUG ( 5 , ( __location__ " transaction cancel on db 0x%08x: nesting %d -> %d \n " ,
ctx - > db_id , ctx - > transaction - > nesting + 1 , ctx - > transaction - > nesting ) ) ;
2008-08-08 16:44:24 +10:00
return 0 ;
}
2008-08-07 16:56:47 +10:00
DEBUG ( 5 , ( __location__ " Cancel transaction on db 0x%08x \n " , ctx - > db_id ) ) ;
2008-08-07 16:20:05 +10:00
ctx - > transaction = NULL ;
talloc_free ( h ) ;
return 0 ;
}
2007-06-10 17:02:09 +00:00
/*
 * Store callback for records fetched outside a transaction: writes the
 * data, together with the record's ctdb header, into the local tdb.
 * The flag argument is not used.
 */
static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
{
	struct db_ctdb_rec *crec = talloc_get_type_abort(
		rec->private_data, struct db_ctdb_rec);
	NTSTATUS status;

	status = db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key,
				    &crec->header, data);
	return status;
}
2008-01-16 12:09:48 +03:00
2011-07-12 17:32:55 +02:00
# ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
2010-12-22 14:16:07 +01:00
static NTSTATUS db_ctdb_send_schedule_for_deletion ( struct db_record * rec )
{
NTSTATUS status ;
struct ctdb_control_schedule_for_deletion * dd ;
TDB_DATA indata ;
int cstatus ;
struct db_ctdb_rec * crec = talloc_get_type_abort (
rec - > private_data , struct db_ctdb_rec ) ;
indata . dsize = offsetof ( struct ctdb_control_schedule_for_deletion , key ) + rec - > key . dsize ;
indata . dptr = talloc_zero_array ( crec , uint8_t , indata . dsize ) ;
if ( indata . dptr = = NULL ) {
DEBUG ( 0 , ( __location__ " talloc failed! \n " ) ) ;
return NT_STATUS_NO_MEMORY ;
}
dd = ( struct ctdb_control_schedule_for_deletion * ) ( void * ) indata . dptr ;
dd - > db_id = crec - > ctdb_ctx - > db_id ;
dd - > hdr = crec - > header ;
dd - > keylen = rec - > key . dsize ;
memcpy ( dd - > key , rec - > key . dptr , rec - > key . dsize ) ;
status = ctdbd_control_local ( messaging_ctdbd_connection ( ) ,
CTDB_CONTROL_SCHEDULE_FOR_DELETION ,
crec - > ctdb_ctx - > db_id ,
CTDB_CTRL_FLAG_NOREPLY , /* flags */
indata ,
NULL , /* outdata */
NULL , /* errmsg */
& cstatus ) ;
talloc_free ( indata . dptr ) ;
if ( ! NT_STATUS_IS_OK ( status ) | | cstatus ! = 0 ) {
DEBUG ( 1 , ( __location__ " Error sending local control "
" SCHEDULE_FOR_DELETION: %s, cstatus = %d \n " ,
nt_errstr ( status ) , cstatus ) ) ;
if ( NT_STATUS_IS_OK ( status ) ) {
status = NT_STATUS_UNSUCCESSFUL ;
}
}
return status ;
}
# endif
2007-06-10 17:02:09 +00:00
/*
 * Delete callback for records fetched outside a transaction.
 *
 * The record is not removed from the tdb; it is overwritten with its
 * header and empty data. Where the SCHEDULE_FOR_DELETION control is
 * available, it is sent for the record afterwards.
 */
static NTSTATUS db_ctdb_delete(struct db_record *rec)
{
	TDB_DATA empty;
	NTSTATUS status;

	/*
	 * We have to store the header with empty data. TODO: Fix the
	 * tdb-level cleanup
	 */
	ZERO_STRUCT(empty);

	status = db_ctdb_store(rec, empty, 0);
	if (!NT_STATUS_IS_OK(status)) {
		return status;
	}

#ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
	status = db_ctdb_send_schedule_for_deletion(rec);
#endif

	return status;
}
static int db_ctdb_record_destr ( struct db_record * data )
{
struct db_ctdb_rec * crec = talloc_get_type_abort (
data - > private_data , struct db_ctdb_rec ) ;
2010-03-05 16:46:36 +01:00
int threshold ;
2007-06-10 17:02:09 +00:00
2007-08-29 11:46:44 +00:00
DEBUG ( 10 , ( DEBUGLEVEL > 10
2007-11-07 19:06:30 +01:00
? " Unlocking db %u key %s \n "
: " Unlocking db %u key %.20s \n " ,
2007-08-29 11:46:44 +00:00
( int ) crec - > ctdb_ctx - > db_id ,
2008-10-18 16:16:57 +02:00
hex_encode_talloc ( data , ( unsigned char * ) data - > key . dptr ,
2007-06-10 17:02:09 +00:00
data - > key . dsize ) ) ) ;
2011-06-20 18:40:31 +09:30
tdb_chainunlock ( crec - > ctdb_ctx - > wtdb - > tdb , data - > key ) ;
2007-06-10 17:02:09 +00:00
2010-03-05 16:46:36 +01:00
threshold = lp_ctdb_locktime_warn_threshold ( ) ;
if ( threshold ! = 0 ) {
double timediff = timeval_elapsed ( & crec - > lock_time ) ;
if ( ( timediff * 1000 ) > threshold ) {
2012-08-30 13:16:24 -07:00
const char * key ;
key = hex_encode_talloc ( data ,
( unsigned char * ) data - > key . dptr ,
data - > key . dsize ) ;
DEBUG ( 0 , ( " Held tdb lock on db %s, key %s %f seconds \n " ,
tdb_name ( crec - > ctdb_ctx - > wtdb - > tdb ) , key ,
timediff ) ) ;
2010-03-05 16:46:36 +01:00
}
}
2007-06-10 17:02:09 +00:00
return 0 ;
}
2012-06-29 10:51:37 +02:00
/**
* Check whether we have a valid local copy of the given record ,
* either for reading or for writing .
*/
2012-06-29 10:47:56 +02:00
static bool db_ctdb_can_use_local_copy ( TDB_DATA ctdb_data , bool read_only )
2012-02-03 10:53:27 +11:00
{
struct ctdb_ltdb_header * hdr ;
if ( ctdb_data . dptr = = NULL )
return false ;
if ( ctdb_data . dsize < sizeof ( struct ctdb_ltdb_header ) )
return false ;
hdr = ( struct ctdb_ltdb_header * ) ctdb_data . dptr ;
2012-06-29 10:39:07 +02:00
# ifdef HAVE_CTDB_WANT_READONLY_DECL
2012-02-03 10:53:27 +11:00
if ( hdr - > dmaster ! = get_my_vnn ( ) ) {
/* If we're not dmaster, it must be r/o copy. */
return read_only & & ( hdr - > flags & CTDB_REC_RO_HAVE_READONLY ) ;
}
2012-06-29 10:55:32 +02:00
/*
* If we want write access , no one may have r / o copies .
*/
2012-02-03 10:53:27 +11:00
return read_only | | ! ( hdr - > flags & CTDB_REC_RO_HAVE_DELEGATIONS ) ;
2012-02-21 17:30:53 +11:00
# else
2012-06-29 10:39:07 +02:00
return ( hdr - > dmaster = = get_my_vnn ( ) ) ;
2012-02-21 17:30:53 +11:00
# endif
2012-02-03 10:53:27 +11:00
}
2008-08-05 11:32:20 +02:00
static struct db_record * fetch_locked_internal ( struct db_ctdb_ctx * ctx ,
TALLOC_CTX * mem_ctx ,
2012-03-27 14:31:04 +02:00
TDB_DATA key ,
bool tryonly )
2007-06-10 17:02:09 +00:00
{
struct db_record * result ;
struct db_ctdb_rec * crec ;
NTSTATUS status ;
TDB_DATA ctdb_data ;
2008-01-16 12:09:48 +03:00
int migrate_attempts = 0 ;
2012-03-27 14:31:04 +02:00
int lockret ;
2007-06-10 17:02:09 +00:00
if ( ! ( result = talloc ( mem_ctx , struct db_record ) ) ) {
DEBUG ( 0 , ( " talloc failed \n " ) ) ;
return NULL ;
}
2011-06-07 11:44:43 +10:00
if ( ! ( crec = talloc_zero ( result , struct db_ctdb_rec ) ) ) {
2007-06-10 17:02:09 +00:00
DEBUG ( 0 , ( " talloc failed \n " ) ) ;
TALLOC_FREE ( result ) ;
return NULL ;
}
2012-07-31 09:30:29 +02:00
result - > db = ctx - > db ;
2007-06-10 17:02:09 +00:00
result - > private_data = ( void * ) crec ;
crec - > ctdb_ctx = ctx ;
result - > key . dsize = key . dsize ;
2012-05-11 21:53:13 +02:00
result - > key . dptr = ( uint8_t * ) talloc_memdup ( result , key . dptr ,
key . dsize ) ;
2007-06-10 17:02:09 +00:00
if ( result - > key . dptr = = NULL ) {
DEBUG ( 0 , ( " talloc failed \n " ) ) ;
TALLOC_FREE ( result ) ;
return NULL ;
}
/*
* Do a blocking lock on the record
*/
again :
2007-08-29 11:46:44 +00:00
if ( DEBUGLEVEL > = 10 ) {
2008-10-18 16:16:57 +02:00
char * keystr = hex_encode_talloc ( result , key . dptr , key . dsize ) ;
2007-08-29 11:46:44 +00:00
DEBUG ( 10 , ( DEBUGLEVEL > 10
? " Locking db %u key %s \n "
2007-11-07 19:06:30 +01:00
: " Locking db %u key %.20s \n " ,
2007-08-29 11:46:44 +00:00
( int ) crec - > ctdb_ctx - > db_id , keystr ) ) ;
TALLOC_FREE ( keystr ) ;
}
2008-08-24 12:46:26 +02:00
2012-03-27 14:31:04 +02:00
lockret = tryonly
? tdb_chainlock_nonblock ( ctx - > wtdb - > tdb , key )
: tdb_chainlock ( ctx - > wtdb - > tdb , key ) ;
if ( lockret ! = 0 ) {
2007-06-10 17:02:09 +00:00
DEBUG ( 3 , ( " tdb_chainlock failed \n " ) ) ;
TALLOC_FREE ( result ) ;
return NULL ;
}
2008-08-07 19:14:16 +10:00
result - > store = db_ctdb_store ;
result - > delete_rec = db_ctdb_delete ;
2007-06-10 17:02:09 +00:00
talloc_set_destructor ( result , db_ctdb_record_destr ) ;
2011-06-20 18:40:31 +09:30
ctdb_data = tdb_fetch_compat ( ctx - > wtdb - > tdb , key ) ;
2007-06-10 17:02:09 +00:00
/*
* See if we have a valid record and we are the dmaster . If so , we can
* take the shortcut and just return it .
*/
2012-06-29 10:47:56 +02:00
if ( ! db_ctdb_can_use_local_copy ( ctdb_data , false ) ) {
2007-06-10 17:02:09 +00:00
SAFE_FREE ( ctdb_data . dptr ) ;
tdb_chainunlock ( ctx - > wtdb - > tdb , key ) ;
talloc_set_destructor ( result , NULL ) ;
2012-03-27 14:31:04 +02:00
if ( tryonly & & ( migrate_attempts ! = 0 ) ) {
DEBUG ( 5 , ( " record migrated away again \n " ) ) ;
TALLOC_FREE ( result ) ;
return NULL ;
}
2008-01-16 12:09:48 +03:00
migrate_attempts + = 1 ;
2012-02-03 10:53:27 +11:00
DEBUG ( 10 , ( " ctdb_data.dptr = %p, dmaster = %u (%u) %u \n " ,
2007-06-10 17:02:09 +00:00
ctdb_data . dptr , ctdb_data . dptr ?
( ( struct ctdb_ltdb_header * ) ctdb_data . dptr ) - > dmaster : - 1 ,
2012-02-03 10:53:27 +11:00
get_my_vnn ( ) ,
2012-03-30 15:15:29 +02:00
ctdb_data . dptr ?
( ( struct ctdb_ltdb_header * ) ctdb_data . dptr ) - > flags : 0 ) ) ;
2007-06-10 17:02:09 +00:00
2010-08-31 16:52:56 +02:00
status = ctdbd_migrate ( messaging_ctdbd_connection ( ) , ctx - > db_id ,
key ) ;
2007-06-10 17:02:09 +00:00
if ( ! NT_STATUS_IS_OK ( status ) ) {
DEBUG ( 5 , ( " ctdb_migrate failed: %s \n " ,
nt_errstr ( status ) ) ) ;
TALLOC_FREE ( result ) ;
return NULL ;
}
/* now its migrated, try again */
goto again ;
}
2008-01-16 12:09:48 +03:00
if ( migrate_attempts > 10 ) {
2012-04-03 13:20:39 +02:00
DEBUG ( 0 , ( " db_ctdb_fetch_locked for %s key %s needed %d "
" attempts \n " , tdb_name ( ctx - > wtdb - > tdb ) ,
hex_encode_talloc ( talloc_tos ( ) ,
( unsigned char * ) key . dptr ,
key . dsize ) ,
2008-01-16 12:09:48 +03:00
migrate_attempts ) ) ;
}
2010-03-05 16:46:36 +01:00
GetTimeOfDay ( & crec - > lock_time ) ;
2007-06-10 17:02:09 +00:00
memcpy ( & crec - > header , ctdb_data . dptr , sizeof ( crec - > header ) ) ;
result - > value . dsize = ctdb_data . dsize - sizeof ( crec - > header ) ;
result - > value . dptr = NULL ;
if ( ( result - > value . dsize ! = 0 )
2012-05-11 21:53:13 +02:00
& & ! ( result - > value . dptr = ( uint8_t * ) talloc_memdup (
2007-06-10 17:02:09 +00:00
result , ctdb_data . dptr + sizeof ( crec - > header ) ,
result - > value . dsize ) ) ) {
DEBUG ( 0 , ( " talloc failed \n " ) ) ;
TALLOC_FREE ( result ) ;
}
SAFE_FREE ( ctdb_data . dptr ) ;
return result ;
}
2008-08-05 11:32:20 +02:00
/*
 * fetch_locked entry point: dispatch to the transaction, persistent or
 * plain (blocking) implementation, in that order of precedence.
 */
static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
					      TALLOC_CTX *mem_ctx,
					      TDB_DATA key)
{
	struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
							struct db_ctdb_ctx);
	struct db_record *rec;

	if (ctx->transaction != NULL) {
		rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
	} else if (db->persistent) {
		rec = db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
	} else {
		rec = fetch_locked_internal(ctx, mem_ctx, key, false);
	}

	return rec;
}
/*
 * try_fetch_locked entry point: same dispatch as db_ctdb_fetch_locked(),
 * but the plain case asks fetch_locked_internal() for a non-blocking
 * attempt (tryonly = true).
 */
static struct db_record *db_ctdb_try_fetch_locked(struct db_context *db,
						  TALLOC_CTX *mem_ctx,
						  TDB_DATA key)
{
	struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
							struct db_ctdb_ctx);
	struct db_record *rec;

	if (ctx->transaction != NULL) {
		rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
	} else if (db->persistent) {
		rec = db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
	} else {
		rec = fetch_locked_internal(ctx, mem_ctx, key, true);
	}

	return rec;
}
2007-06-10 17:02:09 +00:00
/*
fetch ( unlocked , no migration ) operation on ctdb
*/
2011-11-11 00:49:11 +01:00
static NTSTATUS db_ctdb_fetch ( struct db_context * db , TALLOC_CTX * mem_ctx ,
TDB_DATA key , TDB_DATA * data )
2007-06-10 17:02:09 +00:00
{
struct db_ctdb_ctx * ctx = talloc_get_type_abort ( db - > private_data ,
struct db_ctdb_ctx ) ;
NTSTATUS status ;
TDB_DATA ctdb_data ;
2008-08-07 16:20:05 +10:00
if ( ctx - > transaction ) {
return db_ctdb_transaction_fetch ( ctx , mem_ctx , key , data ) ;
}
2011-03-25 00:29:42 +01:00
if ( db - > persistent ) {
return db_ctdb_fetch_persistent ( ctx , mem_ctx , key , data ) ;
}
2007-06-10 17:02:09 +00:00
/* try a direct fetch */
2011-06-20 18:40:31 +09:30
ctdb_data = tdb_fetch_compat ( ctx - > wtdb - > tdb , key ) ;
2007-06-10 17:02:09 +00:00
/*
* See if we have a valid record and we are the dmaster . If so , we can
* take the shortcut and just return it .
2008-01-16 12:09:48 +03:00
* we bypass the dmaster check for persistent databases
2007-06-10 17:02:09 +00:00
*/
2012-06-29 10:47:56 +02:00
if ( db_ctdb_can_use_local_copy ( ctdb_data , true ) ) {
2012-06-29 10:52:33 +02:00
/*
* We have a valid local copy - avoid the ctdb protocol op
*/
2007-06-10 17:02:09 +00:00
data - > dsize = ctdb_data . dsize - sizeof ( struct ctdb_ltdb_header ) ;
2012-05-11 21:53:13 +02:00
data - > dptr = ( uint8_t * ) talloc_memdup (
2007-06-10 17:02:09 +00:00
mem_ctx , ctdb_data . dptr + sizeof ( struct ctdb_ltdb_header ) ,
data - > dsize ) ;
SAFE_FREE ( ctdb_data . dptr ) ;
if ( data - > dptr = = NULL ) {
2011-11-11 00:49:11 +01:00
return NT_STATUS_NO_MEMORY ;
2007-06-10 17:02:09 +00:00
}
2011-11-11 00:49:11 +01:00
return NT_STATUS_OK ;
2007-06-10 17:02:09 +00:00
}
SAFE_FREE ( ctdb_data . dptr ) ;
2012-02-03 11:00:54 +11:00
/*
* We weren ' t able to get it locally - ask ctdb to fetch it for us .
* If we already had * something * , it ' s probably worth making a local
* read - only copy .
*/
2010-08-31 16:52:56 +02:00
status = ctdbd_fetch ( messaging_ctdbd_connection ( ) , ctx - > db_id , key ,
2012-02-03 11:00:54 +11:00
mem_ctx , data ,
ctdb_data . dsize > = sizeof ( struct ctdb_ltdb_header ) ) ;
2007-06-10 17:02:09 +00:00
if ( ! NT_STATUS_IS_OK ( status ) ) {
DEBUG ( 5 , ( " ctdbd_fetch failed: %s \n " , nt_errstr ( status ) ) ) ;
}
2011-11-11 00:49:11 +01:00
return status ;
2007-06-10 17:02:09 +00:00
}
2011-12-08 15:56:35 +01:00
/*
 * Fetch the record for "key" via db_ctdb_fetch() into a temporary buffer
 * on the talloc stackframe, hand it to "parser" and release the buffer.
 *
 * Returns the fetch error if the fetch fails (parser is not called in
 * that case), NT_STATUS_OK otherwise.
 */
static NTSTATUS db_ctdb_parse_record(struct db_context *db, TDB_DATA key,
				     void (*parser)(TDB_DATA key,
						    TDB_DATA data,
						    void *private_data),
				     void *private_data)
{
	TDB_DATA value;
	NTSTATUS status = db_ctdb_fetch(db, talloc_tos(), key, &value);

	if (NT_STATUS_IS_OK(status)) {
		parser(key, value, private_data);
		TALLOC_FREE(value.dptr);
		status = NT_STATUS_OK;
	}

	return status;
}
2007-06-10 17:02:09 +00:00
/*
 * State handed as private_data to the traverse callbacks in this file.
 */
struct traverse_state {
/* database being traversed */
struct db_context * db ;
/* caller-supplied function invoked for the records */
int ( * fn ) ( struct db_record * rec , void * private_data ) ;
/* opaque pointer passed through to fn */
void * private_data ;
2012-06-12 10:10:36 +02:00
/*
 * record counter: reset to 0 by the traverse entry points, which return
 * it after a ctdbd traverse
 */
int count ;
2007-06-10 17:02:09 +00:00
} ;
/*
 * Per-record callback for the read-write ctdbd traverse started by
 * db_ctdb_traverse().
 *
 * The record is re-fetched locked so that the user callback works on a
 * locked record; the "data" passed in by the traverse is not used.
 * Records with an empty value are skipped. Every record handed to the
 * user callback is counted in state->count, which db_ctdb_traverse()
 * returns to its caller.
 */
static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
{
	struct traverse_state *state = (struct traverse_state *)private_data;
	struct db_record *rec;
	TALLOC_CTX *tmp_ctx;

	tmp_ctx = talloc_new(state->db);
	if (tmp_ctx == NULL) {
		/*
		 * Without a parent context the locked record would never
		 * be freed, so do not even try to fetch it.
		 */
		DEBUG(0, ("talloc_new failed\n"));
		return;
	}

	/* we have to give them a locked record to prevent races */
	rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
	if (rec && rec->value.dsize > 0) {
		state->fn(rec, state->private_data);
		state->count++;
	}

	/* frees rec as well, which releases its lock */
	talloc_free(tmp_ctx);
}
2008-01-16 12:09:48 +03:00
/*
 * Per-record callback for the local read-write traverse of a persistent
 * database (see db_ctdb_traverse()).
 *
 * Only kbuf is used; the record is re-fetched locked before the user
 * callback runs. Records with an empty value are skipped.
 *
 * Returns the user callback's return value, 0 for skipped records and
 * -1 if no temporary talloc context could be allocated.
 */
static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
					void *private_data)
{
	struct traverse_state *state = (struct traverse_state *)private_data;
	struct db_record *rec;
	TALLOC_CTX *tmp_ctx;
	int ret = 0;

	/*
	 * Skip the __db_sequence_number__ key:
	 * This is used for persistent transactions internally.
	 */
	if (kbuf.dsize == strlen(CTDB_DB_SEQNUM_KEY) + 1 &&
	    strcmp((const char *)kbuf.dptr, CTDB_DB_SEQNUM_KEY) == 0)
	{
		return 0;
	}

	tmp_ctx = talloc_new(state->db);
	if (tmp_ctx == NULL) {
		/*
		 * Without a parent context the locked record would never
		 * be freed, so give up on the traverse instead.
		 */
		DEBUG(0, ("talloc_new failed\n"));
		return -1;
	}

	/* we have to give them a locked record to prevent races */
	rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
	if (rec && rec->value.dsize > 0) {
		ret = state->fn(rec, state->private_data);
	}

	/* frees rec as well */
	talloc_free(tmp_ctx);
	return ret;
}
2011-09-22 13:58:24 +02:00
/* wrapper to use traverse_persistent_callback with dbwrap */
static int traverse_persistent_callback_dbwrap ( struct db_record * rec , void * data )
{
return traverse_persistent_callback ( NULL , rec - > key , rec - > value , data ) ;
}
2007-06-10 17:02:09 +00:00
static int db_ctdb_traverse ( struct db_context * db ,
int ( * fn ) ( struct db_record * rec ,
void * private_data ) ,
void * private_data )
{
2012-06-12 10:10:36 +02:00
NTSTATUS status ;
2007-06-10 17:02:09 +00:00
struct db_ctdb_ctx * ctx = talloc_get_type_abort ( db - > private_data ,
struct db_ctdb_ctx ) ;
struct traverse_state state ;
state . db = db ;
state . fn = fn ;
state . private_data = private_data ;
2012-06-12 10:10:36 +02:00
state . count = 0 ;
2007-06-10 17:02:09 +00:00
2008-01-16 12:09:48 +03:00
if ( db - > persistent ) {
2011-09-22 13:58:24 +02:00
struct tdb_context * ltdb = ctx - > wtdb - > tdb ;
int ret ;
2008-01-16 12:09:48 +03:00
/* for persistent databases we don't need to do a ctdb traverse,
we can do a faster local traverse */
2011-09-22 13:58:24 +02:00
ret = tdb_traverse ( ltdb , traverse_persistent_callback , & state ) ;
if ( ret < 0 ) {
return ret ;
}
if ( ctx - > transaction & & ctx - > transaction - > m_write ) {
2011-10-14 16:11:06 +02:00
/*
* we now have to handle keys not yet
* present at transaction start
*/
2011-09-22 13:58:24 +02:00
struct db_context * newkeys = db_open_rbt ( talloc_tos ( ) ) ;
struct ctdb_marshall_buffer * mbuf = ctx - > transaction - > m_write ;
struct ctdb_rec_data * rec = NULL ;
int i ;
2011-10-14 16:11:06 +02:00
int count = 0 ;
if ( newkeys = = NULL ) {
return - 1 ;
}
2011-09-22 13:58:24 +02:00
for ( i = 0 ; i < mbuf - > count ; i + + ) {
TDB_DATA key ;
rec = db_ctdb_marshall_loop_next ( mbuf , rec ,
NULL , NULL ,
& key , NULL ) ;
SMB_ASSERT ( rec ! = NULL ) ;
if ( ! tdb_exists ( ltdb , key ) ) {
dbwrap_store ( newkeys , key , tdb_null , 0 ) ;
}
}
status = dbwrap_traverse ( newkeys ,
traverse_persistent_callback_dbwrap ,
2011-10-12 11:48:55 +02:00
& state ,
2011-10-14 16:11:06 +02:00
& count ) ;
2011-09-22 13:58:24 +02:00
talloc_free ( newkeys ) ;
2011-10-14 16:11:06 +02:00
if ( ! NT_STATUS_IS_OK ( status ) ) {
return - 1 ;
}
ret + = count ;
2011-09-22 13:58:24 +02:00
}
return ret ;
2008-01-16 12:09:48 +03:00
}
2012-06-12 10:10:36 +02:00
status = ctdbd_traverse ( ctx - > db_id , traverse_callback , & state ) ;
if ( ! NT_STATUS_IS_OK ( status ) ) {
return - 1 ;
}
return state . count ;
2007-06-10 17:02:09 +00:00
}
/*
 * store method for records handed out by the read-only traverses:
 * refuses every write, whatever the arguments.
 */
static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
{
	const NTSTATUS denied = NT_STATUS_MEDIA_WRITE_PROTECTED;

	return denied;
}
/*
 * delete_rec method for records handed out by the read-only traverses:
 * refuses every delete.
 */
static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
{
	const NTSTATUS denied = NT_STATUS_MEDIA_WRITE_PROTECTED;

	return denied;
}
/*
 * Per-record callback for the read-only ctdbd traverse started by
 * db_ctdb_traverse_read().
 *
 * Builds a temporary record on the stack that refers to the key and data
 * passed in, with store and delete methods that refuse modification,
 * hands it to the user callback and counts it in state->count.
 */
static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
{
	struct traverse_state *state = (struct traverse_state *)private_data;
	struct db_record rec;

	/*
	 * Start from a fully defined record so that no field (in
	 * particular rec.db) is handed to the callback uninitialized.
	 */
	ZERO_STRUCT(rec);
	rec.db = state->db;
	rec.key = key;
	rec.value = data;
	rec.store = db_ctdb_store_deny;
	rec.delete_rec = db_ctdb_delete_deny;
	rec.private_data = state->db;

	state->fn(&rec, state->private_data);
	state->count++;
}
2008-01-16 12:09:48 +03:00
/*
 * Per-record callback for the local read-only traverse of a persistent
 * database (see db_ctdb_traverse_read()).
 *
 * dbuf is the raw tdb record including the ltdb header; the user
 * callback gets a temporary, write-protected record whose value starts
 * behind that header. The sequence number key and records that consist
 * of at most a header (deleted records) are skipped.
 *
 * Returns the user callback's return value, or 0 for skipped records.
 */
static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
					     void *private_data)
{
	const size_t hdr_len = sizeof(struct ctdb_ltdb_header);
	struct traverse_state *state = (struct traverse_state *)private_data;
	struct db_record rec;

	/*
	 * Skip the __db_sequence_number__ key:
	 * This is used for persistent transactions internally.
	 */
	if (kbuf.dsize == strlen(CTDB_DB_SEQNUM_KEY) + 1 &&
	    strcmp((const char *)kbuf.dptr, CTDB_DB_SEQNUM_KEY) == 0)
	{
		return 0;
	}

	if (dbuf.dsize <= hdr_len) {
		/* a deleted record */
		return 0;
	}

	/*
	 * Start from a fully defined record so that no field (in
	 * particular rec.db) is handed to the callback uninitialized.
	 */
	ZERO_STRUCT(rec);
	rec.db = state->db;
	rec.key = kbuf;
	rec.value.dptr = dbuf.dptr + hdr_len;
	rec.value.dsize = dbuf.dsize - hdr_len;
	rec.store = db_ctdb_store_deny;
	rec.delete_rec = db_ctdb_delete_deny;
	rec.private_data = state->db;

	state->count++;
	return state->fn(&rec, state->private_data);
}
2007-06-10 17:02:09 +00:00
static int db_ctdb_traverse_read ( struct db_context * db ,
int ( * fn ) ( struct db_record * rec ,
void * private_data ) ,
void * private_data )
{
2012-06-12 10:10:36 +02:00
NTSTATUS status ;
2007-06-10 17:02:09 +00:00
struct db_ctdb_ctx * ctx = talloc_get_type_abort ( db - > private_data ,
struct db_ctdb_ctx ) ;
struct traverse_state state ;
state . db = db ;
state . fn = fn ;
state . private_data = private_data ;
2012-06-12 10:10:36 +02:00
state . count = 0 ;
2007-06-10 17:02:09 +00:00
2008-01-16 12:09:48 +03:00
if ( db - > persistent ) {
/* for persistent databases we don't need to do a ctdb traverse,
we can do a faster local traverse */
return tdb_traverse_read ( ctx - > wtdb - > tdb , traverse_persistent_callback_read , & state ) ;
}
2012-06-12 10:10:36 +02:00
status = ctdbd_traverse ( ctx - > db_id , traverse_read_callback , & state ) ;
if ( ! NT_STATUS_IS_OK ( status ) ) {
return - 1 ;
}
return state . count ;
2007-06-10 17:02:09 +00:00
}
static int db_ctdb_get_seqnum ( struct db_context * db )
{
struct db_ctdb_ctx * ctx = talloc_get_type_abort ( db - > private_data ,
struct db_ctdb_ctx ) ;
return tdb_get_seqnum ( ctx - > wtdb - > tdb ) ;
}
2012-02-15 14:57:01 +01:00
static void db_ctdb_id ( struct db_context * db , const uint8_t * * id ,
size_t * idlen )
{
struct db_ctdb_ctx * ctx = talloc_get_type_abort (
db - > private_data , struct db_ctdb_ctx ) ;
* id = ( uint8_t * ) & ctx - > db_id ;
* idlen = sizeof ( ctx - > db_id ) ;
}
2007-06-10 17:02:09 +00:00
/*
 * Open a database through ctdb.
 *
 * Attaches to the database "name" via ctdbd, sets its priority from
 * lock_order, opens the local tdb file reported by ctdbd and fills in
 * the dbwrap method table.
 *
 * A database is treated as persistent when TDB_CLEAR_IF_FIRST is not
 * set in tdb_flags. Of tdb_flags only TDB_SEQNUM is passed on to the
 * local tdb open. If open_flags contains O_CREAT, "mode" is applied to
 * the database file with chmod().
 *
 * Returns a db_context that is a talloc child of mem_ctx, or NULL if
 * clustering is disabled or any step fails.
 */
struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
				const char *name,
				int hash_size, int tdb_flags,
				int open_flags, mode_t mode,
				enum dbwrap_lock_order lock_order)
{
	struct db_context *result;
	struct db_ctdb_ctx *db_ctdb;
	char *db_path;
	struct ctdbd_connection *conn;
	struct loadparm_context *lp_ctx;
	struct ctdb_db_priority prio;
	NTSTATUS status;
	int cstatus;

	if (!lp_clustering()) {
		DEBUG(10, (" Clustering disabled -- no ctdb \n "));
		return NULL;
	}

	if (!(result = talloc_zero(mem_ctx, struct db_context))) {
		DEBUG(0, (" talloc failed \n "));
		return NULL;
	}

	/*
	 * Zeroed so that members which are only set for some database
	 * types (lock_ctx) never hold garbage.
	 */
	if (!(db_ctdb = talloc_zero(result, struct db_ctdb_ctx))) {
		DEBUG(0, (" talloc failed \n "));
		TALLOC_FREE(result);
		return NULL;
	}

	db_ctdb->transaction = NULL;
	db_ctdb->db = result;

	conn = messaging_ctdbd_connection();
	if (conn == NULL) {
		DEBUG(1, (" Could not connect to ctdb \n "));
		TALLOC_FREE(result);
		return NULL;
	}

	if (!NT_STATUS_IS_OK(ctdbd_db_attach(conn, name, &db_ctdb->db_id, tdb_flags))) {
		DEBUG(0, (" ctdbd_db_attach failed for %s \n ", name));
		TALLOC_FREE(result);
		return NULL;
	}

	db_path = ctdbd_dbpath(conn, db_ctdb, db_ctdb->db_id);
	if (db_path == NULL) {
		/* the path is needed for chmod() and for opening the tdb */
		DEBUG(0, ("ctdbd_dbpath failed for %s\n", name));
		TALLOC_FREE(result);
		return NULL;
	}

	result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
	result->lock_order = lock_order;

	/* only pass through specific flags */
	tdb_flags &= TDB_SEQNUM;

	/* honor permissions if user has specified O_CREAT */
	if (open_flags & O_CREAT) {
		/* best effort: a failure is reported but not fatal */
		if (chmod(db_path, mode) != 0) {
			DEBUG(3, ("chmod failed for %s: %s\n",
				  db_path, strerror(errno)));
		}
	}

	prio.db_id = db_ctdb->db_id;
	prio.priority = lock_order;

	status = ctdbd_control_local(
		conn, CTDB_CONTROL_SET_DB_PRIORITY, 0, 0,
		make_tdb_data((uint8_t *)&prio, sizeof(prio)),
		NULL, NULL, &cstatus);

	if (!NT_STATUS_IS_OK(status) || (cstatus != 0)) {
		DEBUG(1, (" CTDB_CONTROL_SET_DB_PRIORITY failed: %s, %d \n ",
			  nt_errstr(status), cstatus));
		TALLOC_FREE(result);
		return NULL;
	}

	/*
	 * lp_ctx is created below db_path and detached from it again
	 * right after the open.
	 */
	lp_ctx = loadparm_init_s3(db_path, loadparm_s3_helpers());

	db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags,
				      O_RDWR, 0, lp_ctx);
	talloc_unlink(db_path, lp_ctx);

	if (db_ctdb->wtdb == NULL) {
		DEBUG(0, (" Could not open tdb %s: %s \n ", db_path, strerror(errno)));
		/* db_path hangs below db_ctdb and goes away with result */
		TALLOC_FREE(result);
		return NULL;
	}
	talloc_free(db_path);

	if (result->persistent) {
		db_ctdb->lock_ctx = g_lock_ctx_init(db_ctdb,
						    ctdb_conn_msg_ctx(conn));
		if (db_ctdb->lock_ctx == NULL) {
			DEBUG(0, (" g_lock_ctx_init failed \n "));
			TALLOC_FREE(result);
			return NULL;
		}
	}

	result->private_data = (void *)db_ctdb;
	result->fetch_locked = db_ctdb_fetch_locked;
	result->try_fetch_locked = db_ctdb_try_fetch_locked;
	result->parse_record = db_ctdb_parse_record;
	result->traverse = db_ctdb_traverse;
	result->traverse_read = db_ctdb_traverse_read;
	result->get_seqnum = db_ctdb_get_seqnum;
	result->transaction_start = db_ctdb_transaction_start;
	result->transaction_commit = db_ctdb_transaction_commit;
	result->transaction_cancel = db_ctdb_transaction_cancel;
	result->id = db_ctdb_id;
	result->stored_callback = NULL;

	DEBUG(3, (" db_open_ctdb: opened database '%s' with dbid 0x%x \n ",
		  name, db_ctdb->db_id));

	return result;
}
2011-10-25 16:32:12 +02:00
# else /* CLUSTER_SUPPORT */
/*
 * Stand-in used when the build has no cluster support: logs that fact
 * and never opens anything. All arguments are ignored.
 */
struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
				const char *name,
				int hash_size, int tdb_flags,
				int open_flags, mode_t mode,
				enum dbwrap_lock_order lock_order)
{
	struct db_context *none = NULL;

	DEBUG(3, (" db_open_ctdb: no cluster support! \n "));

	return none;
}
2007-06-10 17:02:09 +00:00
# endif