swift: Gluster object storage plugin for Openstack Swift.

Change-Id: I5e07339064b1f3bb2aa5b04740ed870e114db4f9
BUG: 811430
Signed-off-by: Mohammed Junaid <junaid@redhat.com>
Reviewed-on: http://review.gluster.com/3118
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Vijay Bellur <vijay@gluster.com>
This commit is contained in:
Mohammed Junaid 2012-04-11 08:55:46 +05:30 committed by Vijay Bellur
parent 3651c7425e
commit 530a44a7c9
21 changed files with 2593 additions and 0 deletions

22
swift/1.4.8/README Normal file
View File

@ -0,0 +1,22 @@
Gluster Unified File and Object Storage allows files and directories created
via gluster-native/nfs mount to be accessed as containers and objects. It is
a plugin for OpenStack Swift project.
Install
* Clone the swift repo from git://github.com/openstack/swift.git
* Apply the swift.diff present in glusterfs.git/swift/1.4.8 to the swift repo.
* Create a directory named "plugins" under swift.git/swift directory.
* Copy the contents of glusterfs.git/swift/1.4.8/plugins/ under swift.git/swift/
except the conf directory.
* Copy the contents of glusterfs.git/swift/1.4.8/plugins/conf under /etc/swift/.
* Run python setup.py install
Once this is done, you can access the GlusterFS volumes as Swift accounts.
Add the Volume names with the user-name and its corresponding password to the
/etc/swift/proxy-server.conf (follow the syntax used in the sample conf file).
Command to start the servers
swift-init main start
Command to stop the servers
swift-init main stop

View File

@ -0,0 +1,484 @@
# Copyright (c) 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from swift.plugins.utils import clean_metadata, dir_empty, rmdirs, mkdirs, \
validate_account, validate_container, check_valid_account, is_marker, \
get_container_details, get_account_details, create_container_metadata, \
create_account_metadata, DEFAULT_GID, DEFAULT_UID, get_account_details, \
validate_object, create_object_metadata, read_metadata, write_metadata
from swift.common.constraints import CONTAINER_LISTING_LIMIT, \
check_mount
from swift.plugins.utils import X_CONTENT_TYPE, X_CONTENT_LENGTH, X_TIMESTAMP,\
X_PUT_TIMESTAMP, X_TYPE, X_ETAG, X_OBJECTS_COUNT, X_BYTES_USED, \
X_CONTAINER_COUNT, CONTAINER
from swift import plugins
def strip_obj_storage_path(path, string='/mnt/gluster-object'):
    """
    Remove every occurrence of *string* (the object-storage mount root,
    '/mnt/gluster-object' by default) from *path*, then strip any leading
    and trailing slashes from what remains.

    :param path: filesystem path to clean up
    :param string: substring to remove from the path
    :returns: the relative path without the mount prefix or edge slashes
    """
    trimmed = path.replace(string, '')
    return trimmed.strip('/')
DATADIR = 'containers'
class DiskCommon(object):
    """
    Helpers shared by the account/container on-disk abstractions:
    existence checks, metadata persistence, and the filters applied to
    sorted name listings (prefix/delimiter/marker/end_marker/limit).
    """

    def is_deleted(self):
        """Return True when the backing directory no longer exists."""
        return not os.path.exists(self.datadir)

    def filter_prefix(self, objects, prefix):
        """
        Return the names that start with *prefix*.

        :param objects: sorted list of names
        :param prefix: required leading string
        """
        filtered_objs = []
        for object_name in objects:
            if object_name.startswith(prefix):
                filtered_objs.append(object_name)
            elif filtered_objs:
                # Input is sorted: once a non-match follows a match,
                # nothing later can match.
                break
        return filtered_objs

    def filter_delimiter(self, objects, delimiter, prefix):
        """
        Collapse names to the portion up to (and including) the first
        *delimiter* found after *prefix*, de-duplicated.

        :param objects: sorted list of names, each starting with *prefix*
        :param delimiter: delimiter string (e.g. '/')
        :param prefix: common name prefix
        """
        filtered_objs = []
        for object_name in objects:
            remainder = object_name.replace(prefix, '', 1)
            suffix = remainder.split(delimiter, 1)
            new_obj = prefix + suffix[0]
            if new_obj and new_obj not in filtered_objs:
                filtered_objs.append(new_obj)
        return filtered_objs

    def filter_marker(self, objects, marker):
        """
        Return the names strictly greater than *marker*.

        :param objects: sorted list of names
        TODO: We can traverse in reverse order to optimize.
        """
        # Guard the empty list: the previous implementation indexed
        # objects[-1] unconditionally and raised IndexError on [].
        if not objects or objects[-1] <= marker:
            return []
        return [object_name for object_name in objects
                if object_name > marker]

    def filter_end_marker(self, objects, end_marker):
        """
        Return the names strictly less than *end_marker*.

        :param objects: sorted list of names
        """
        filtered_objs = []
        for object_name in objects:
            if object_name < end_marker:
                filtered_objs.append(object_name)
            else:
                break
        return filtered_objs

    def filter_limit(self, objects, limit):
        """
        Return at most the first *limit* names.  Slicing is also safe when
        limit exceeds len(objects); the previous index loop raised
        IndexError in that case.
        """
        return objects[:limit]

    def update_account(self, metadata):
        """Persist *metadata* on the account directory and cache it."""
        acc_path = self.datadir
        write_metadata(acc_path, metadata)
        self.metadata = metadata
class DiskDir(DiskCommon):
    """
    Manage a container or account directory on a Gluster volume.

    When ``container`` is given, the instance represents the container
    directory ``<path>/<account>/<container>``; otherwise it represents
    the account directory ``<path>/<account>``.  Metadata is persisted
    via the read_metadata/write_metadata helpers.

    :param path: path to the devices (mount root) on the node
    :param device: device name (ignored -- overwritten with *account*)
    :param partition: partition on the device (unused here)
    :param account: account name
    :param container: container name, or None for an account directory
    :param logger: logger object for writing out log file messages
    :param uid: numeric user id for directories this object creates
    :param gid: numeric group id for directories this object creates
    :param fs_object: filesystem plugin object used to validate/mount the
                      account volume when it is not mounted yet
    """

    def __init__(self, path, device, partition, account, container, logger,
                 uid=DEFAULT_UID, gid=DEFAULT_GID, fs_object=None):
        self.root = path
        # One gluster volume per account: the account name replaces the
        # ring-supplied device name.
        device = account
        if container:
            self.name = container
        else:
            self.name = None
        if self.name:
            self.datadir = os.path.join(path, account, self.name)
        else:
            self.datadir = os.path.join(path, device)
        self.account = account
        self.device_path = os.path.join(path, device)
        if not check_mount(path, device):
            check_valid_account(account, fs_object)
        self.logger = logger
        self.metadata = {}
        self.uid = int(uid)
        self.gid = int(gid)
        # Create a dummy db_file in /etc/swift so code paths that expect a
        # sqlite db path still have a file to point at.
        # NOTE(review): file() is Python 2 only, and the handle is never
        # closed.
        self.db_file = '/etc/swift/db_file.db'
        if not os.path.exists(self.db_file):
            file(self.db_file, 'w+')
        self.dir_exists = os.path.exists(self.datadir)
        if self.dir_exists:
            try:
                self.metadata = read_metadata(self.datadir)
            except EOFError:
                # Stored metadata was truncated/corrupt; rebuild it.
                create_container_metadata(self.datadir)
        else:
            # Directory absent: leave self.metadata empty and stop here.
            return
        if container:
            # Container directory: (re)create metadata when missing or
            # failing validation.
            if not self.metadata:
                create_container_metadata(self.datadir)
                self.metadata = read_metadata(self.datadir)
            else:
                if not validate_container(self.metadata):
                    create_container_metadata(self.datadir)
                    self.metadata = read_metadata(self.datadir)
        else:
            # Account directory: same dance with account metadata.
            if not self.metadata:
                create_account_metadata(self.datadir)
                self.metadata = read_metadata(self.datadir)
            else:
                if not validate_account(self.metadata):
                    create_account_metadata(self.datadir)
                    self.metadata = read_metadata(self.datadir)

    def empty(self):
        """Return True when the backing directory holds no entries."""
        return dir_empty(self.datadir)

    def delete(self):
        """
        Delete the directory if it is empty.  A mounted account root is
        never removed -- only its metadata is cleaned.
        """
        if self.empty():
            #For delete account.
            if os.path.ismount(self.datadir):
                clean_metadata(self.datadir)
            else:
                rmdirs(self.datadir)
            self.dir_exists = False

    def put_metadata(self, metadata):
        """
        Write metadata to directory/container.
        """
        write_metadata(self.datadir, metadata)
        self.metadata = metadata

    def put(self, metadata):
        """
        Create the directory (if needed) and write metadata to it.

        :param metadata: Metadata to write.
        """
        if not self.dir_exists:
            mkdirs(self.datadir)
        os.chown(self.datadir, self.uid, self.gid)
        write_metadata(self.datadir, metadata)
        self.metadata = metadata
        self.dir_exists = True

    def put_obj(self, content_length, timestamp):
        """Account for one new object in the container's counters."""
        self.metadata[X_OBJECTS_COUNT] = int(self.metadata[X_OBJECTS_COUNT]) + 1
        self.metadata[X_PUT_TIMESTAMP] = timestamp
        self.metadata[X_BYTES_USED] = int(self.metadata[X_BYTES_USED]) + int(content_length)
        #TODO: define update_metadata instead of writing whole metadata again.
        self.put_metadata(self.metadata)

    def delete_obj(self, content_length):
        """Account for one removed object in the container's counters."""
        self.metadata[X_OBJECTS_COUNT] = int(self.metadata[X_OBJECTS_COUNT]) - 1
        self.metadata[X_BYTES_USED] = int(self.metadata[X_BYTES_USED]) - int(content_length)
        self.put_metadata(self.metadata)

    def put_container(self, timestamp, object_count, bytes_used):
        """
        For account server.  Registers a new container on the account.
        NOTE(review): the object_count/bytes_used parameters are ignored
        and the counters reset to 0 -- confirm this is intentional.
        """
        self.metadata[X_OBJECTS_COUNT] = 0
        self.metadata[X_BYTES_USED] = 0
        self.metadata[X_CONTAINER_COUNT] = int(self.metadata[X_CONTAINER_COUNT]) + 1
        self.metadata[X_PUT_TIMESTAMP] = timestamp
        self.put_metadata(self.metadata)

    def delete_container(self, object_count, bytes_used):
        """
        For account server.  Unregisters a container from the account.
        """
        self.metadata[X_OBJECTS_COUNT] = 0
        self.metadata[X_BYTES_USED] = 0
        self.metadata[X_CONTAINER_COUNT] = int(self.metadata[X_CONTAINER_COUNT]) - 1
        self.put_metadata(self.metadata)

    def unlink(self):
        """
        Remove directory/container if empty.
        """
        if dir_empty(self.datadir):
            rmdirs(self.datadir)

    def list_objects_iter(self, limit, marker, end_marker,
                          prefix, delimiter, path):
        """
        List objects in this container, applying Swift listing semantics.

        :returns: list of [name, created_at, size, content_type, etag]
                  items (name only for entries whose metadata could not be
                  obtained)
        """
        if path:
            # A path query implies delimiter '/' and a prefix of path + '/'.
            prefix = path = path.rstrip('/') + '/'
            delimiter = '/'
        if delimiter and not prefix:
            prefix = ''
        objects = []
        object_count = 0
        bytes_used = 0
        container_list = []
        objects, object_count, bytes_used = get_container_details(self.datadir)
        # Re-sync cached counters with what is actually on disk.
        if int(self.metadata[X_OBJECTS_COUNT]) != object_count or \
                int(self.metadata[X_BYTES_USED]) != bytes_used:
            self.metadata[X_OBJECTS_COUNT] = object_count
            self.metadata[X_BYTES_USED] = bytes_used
            self.update_container(self.metadata)
        if objects:
            objects.sort()
        # Apply the listing filters in Swift's canonical order.
        if objects and prefix:
            objects = self.filter_prefix(objects, prefix)
        if objects and delimiter:
            objects = self.filter_delimiter(objects, delimiter, prefix)
        if objects and marker:
            objects = self.filter_marker(objects, marker)
        if objects and end_marker:
            objects = self.filter_end_marker(objects, end_marker)
        if objects and limit:
            if len(objects) > limit:
                objects = self.filter_limit(objects, limit)
        if objects:
            for obj in objects:
                list_item = []
                list_item.append(obj)
                metadata = read_metadata(self.datadir + '/' + obj)
                if not metadata or not validate_object(metadata):
                    metadata = create_object_metadata(self.datadir + '/' + obj)
                if metadata:
                    list_item.append(metadata[X_TIMESTAMP])
                    list_item.append(int(metadata[X_CONTENT_LENGTH]))
                    list_item.append(metadata[X_CONTENT_TYPE])
                    list_item.append(metadata[X_ETAG])
                container_list.append(list_item)
        return container_list

    def update_container(self, metadata):
        """Persist *metadata* on the container directory and cache it."""
        cont_path = self.datadir
        write_metadata(cont_path, metadata)
        self.metadata = metadata

    def update_object_count(self):
        """Refresh cached object count/bytes used from the directory."""
        objects = []
        object_count = 0
        bytes_used = 0
        objects, object_count, bytes_used = get_container_details(self.datadir)
        if int(self.metadata[X_OBJECTS_COUNT]) != object_count or \
                int(self.metadata[X_BYTES_USED]) != bytes_used:
            self.metadata[X_OBJECTS_COUNT] = object_count
            self.metadata[X_BYTES_USED] = bytes_used
            self.update_container(self.metadata)

    def update_container_count(self):
        """Refresh cached container count from the account directory."""
        containers = []
        container_count = 0
        containers, container_count = get_account_details(self.datadir)
        if int(self.metadata[X_CONTAINER_COUNT]) != container_count:
            self.metadata[X_CONTAINER_COUNT] = container_count
            self.update_account(self.metadata)

    def get_info(self, include_metadata=False):
        """
        Get global data for the container.

        :returns: dict with keys: account, container, created_at,
                  put_timestamp, delete_timestamp, object_count,
                  bytes_used, reported_put_timestamp,
                  reported_delete_timestamp, reported_object_count,
                  reported_bytes_used, hash, id.  If include_metadata is
                  set, the raw metadata dict is added under 'metadata'.
                  NOTE(review): unlike upstream Swift, no
                  x_container_sync_point keys are returned; most timestamp
                  fields are placeholder constants.
        """
        # TODO: delete_timestamp, reported_put_timestamp
        #       reported_delete_timestamp, reported_object_count,
        #       reported_bytes_used, created_at
        metadata = {}
        if os.path.exists(self.datadir):
            metadata = read_metadata(self.datadir)
        data = {'account' : self.account, 'container' : self.name,
                'object_count' : metadata.get(X_OBJECTS_COUNT, '0'),
                'bytes_used' : metadata.get(X_BYTES_USED, '0'),
                'hash': '', 'id' : '', 'created_at' : '1',
                'put_timestamp' : metadata.get(X_PUT_TIMESTAMP, '0'),
                'delete_timestamp' : '1',
                'reported_put_timestamp' : '1', 'reported_delete_timestamp' : '1',
                'reported_object_count' : '1', 'reported_bytes_used' : '1'}
        if include_metadata:
            data['metadata'] = metadata
        return data

    def put_object(self, name, timestamp, size, content_type,
                   etag, deleted=0):
        # TODO: Implement the specifics of this func.
        pass

    def initialize(self, timestamp):
        # No-op: directories are created lazily by put()/mkdirs().
        pass

    def update_put_timestamp(self, timestamp):
        """
        Create the container if it doesn't exist.
        NOTE(review): despite the name, *timestamp* is never written here.
        """
        if not os.path.exists(self.datadir):
            self.put(self.metadata)

    def delete_object(self, name, timestamp):
        # TODO: Implement the delete object
        pass

    def delete_db(self, timestamp):
        """
        Delete the container (directory), if it is empty.
        """
        self.unlink()

    def update_metadata(self, metadata):
        """Merge *metadata* into the cached dict and persist the result."""
        self.metadata.update(metadata)
        write_metadata(self.datadir, self.metadata)
class DiskAccount(DiskDir):
    """
    Manage an account directory on a Gluster volume (used by the account
    server).  Inherits the shared plumbing from DiskDir but performs its
    own, simpler initialization -- DiskDir.__init__ is deliberately not
    called.
    """

    def __init__(self, root, account, fs_object = None):
        self.root = root
        self.account = account
        self.datadir = os.path.join(self.root, self.account)
        # If the account volume is not mounted, validate/mount it now.
        if not check_mount(root, account):
            check_valid_account(account, fs_object)
        self.metadata = read_metadata(self.datadir)
        if not self.metadata or not validate_account(self.metadata):
            self.metadata = create_account_metadata(self.datadir)

    def list_containers_iter(self, limit, marker, end_marker,
                             prefix, delimiter):
        """
        Return tuple of name, object_count, bytes_used, 0(is_subdir).
        Used by account server.
        """
        if delimiter and not prefix:
            prefix = ''
        containers = []
        container_count = 0
        account_list = []
        containers, container_count = get_account_details(self.datadir)
        # Re-sync the cached container count with what is on disk.
        if int(self.metadata[X_CONTAINER_COUNT]) != container_count:
            self.metadata[X_CONTAINER_COUNT] = container_count
            self.update_account(self.metadata)
        if containers:
            containers.sort()
        # Apply the listing filters in Swift's canonical order.
        if containers and prefix:
            containers = self.filter_prefix(containers, prefix)
        if containers and delimiter:
            containers = self.filter_delimiter(containers, delimiter, prefix)
        if containers and marker:
            containers = self.filter_marker(containers, marker)
        if containers and end_marker:
            containers = self.filter_end_marker(containers, end_marker)
        if containers and limit:
            if len(containers) > limit:
                containers = self.filter_limit(containers, limit)
        if containers:
            for cont in containers:
                list_item = []
                metadata = None
                list_item.append(cont)
                metadata = read_metadata(self.datadir + '/' + cont)
                if not metadata or not validate_container(metadata):
                    metadata = create_container_metadata(self.datadir + '/' + cont)
                if metadata:
                    list_item.append(metadata[X_OBJECTS_COUNT])
                    list_item.append(metadata[X_BYTES_USED])
                    list_item.append(0)
                account_list.append(list_item)
        return account_list

    def get_info(self, include_metadata=False):
        """
        Get global data for the account.

        :returns: dict with keys: account, created_at, put_timestamp,
                  delete_timestamp, container_count, object_count,
                  bytes_used, hash, id.  Timestamp fields are placeholder
                  constants.  If include_metadata is set, the raw metadata
                  dict is added under 'metadata'.
        """
        metadata = {}
        if (os.path.exists(self.datadir)):
            metadata = read_metadata(self.datadir)
            if not metadata:
                metadata = create_account_metadata(self.datadir)
        data = {'account' : self.account, 'created_at' : '1',
                'put_timestamp' : '1', 'delete_timestamp' : '1',
                'container_count' : metadata.get(X_CONTAINER_COUNT, 0),
                'object_count' : metadata.get(X_OBJECTS_COUNT, 0),
                'bytes_used' : metadata.get(X_BYTES_USED, 0),
                'hash' : '', 'id' : ''}
        if include_metadata:
            data['metadata'] = metadata
        return data

View File

@ -0,0 +1,296 @@
# Copyright (c) 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from eventlet import tpool
from swift.common.utils import normalize_timestamp, renamer
from swift.plugins.utils import mkdirs, rmdirs, validate_object, \
check_valid_account, create_object_metadata, do_open, \
do_close, do_unlink, do_chown, do_stat, do_listdir, read_metadata,\
write_metadata
from swift.common.constraints import check_mount
from swift.plugins.utils import X_CONTENT_TYPE, X_CONTENT_LENGTH, X_TIMESTAMP,\
X_PUT_TIMESTAMP, X_TYPE, X_ETAG, X_OBJECTS_COUNT, X_BYTES_USED, \
X_OBJECT_TYPE, FILE, DIR, MARKER_DIR, OBJECT, \
DIR_TYPE, FILE_TYPE, DEFAULT_UID, DEFAULT_GID
import logging
from swift.obj.server import DiskFile
DATADIR = 'objects'
ASYNCDIR = 'async_pending'
KEEP_CACHE_SIZE = (5 * 1024 * 1024)
# keep these lower-case
DISALLOWED_HEADERS = set('content-length content-type deleted etag'.split())
class Gluster_DiskFile(DiskFile):
    """
    Manage object files on disk (GlusterFS-backed variant of upstream
    swift.obj.server.DiskFile).  Objects live at
    ``<path>/<account>/<container>/<obj>``; directories can themselves be
    objects ("marker dirs").  Metadata is read/written via the xattr-backed
    helpers from swift.plugins.utils.

    NOTE(review): DiskFile.__init__ is never invoked; this class relies on
    the base class only for methods such as drop_cache().

    :param path: path to devices on the node/mount path for UFO.
    :param device: device name/account_name for UFO (overwritten with
                   *account* below).
    :param partition: partition on the device the object lives in
    :param account: account name for the object
    :param container: container name for the object
    :param obj: object name for the object
    :param logger: logger object for writing out log file messages
    :param keep_data_fp: if True, don't close the fp, otherwise close it
    :param disk_chunk_size: size of chunks on file reads
    :param uid: numeric user id for created files/directories
    :param gid: numeric group id for created files/directories
    :param fs_object: filesystem plugin object
    """

    def __init__(self, path, device, partition, account, container, obj,
                 logger, keep_data_fp=False, disk_chunk_size=65536,
                 uid=DEFAULT_UID, gid=DEFAULT_GID, fs_object = None):
        self.disk_chunk_size = disk_chunk_size
        # One gluster volume per account: account name replaces device.
        device = account
        # Don't support obj names ending/beginning with '/', like /a, a/,
        # /a/b/ etc.
        obj = obj.strip('/')
        if '/' in obj:
            # Split into the parent "directory" part and the leaf name.
            self.obj_path, self.obj = obj.rsplit('/', 1)
        else:
            self.obj_path = ''
            self.obj = obj
        if self.obj_path:
            self.name = '/'.join((container, self.obj_path))
        else:
            self.name = container
        # Absolute path for the obj directory.
        self.datadir = os.path.join(path, device, self.name)
        self.device_path = os.path.join(path, device)
        if not check_mount(path, device):
            check_valid_account(account, fs_object)
        self.container_path = os.path.join(path, device, container)
        self.tmpdir = os.path.join(path, device, 'tmp')
        self.logger = logger
        self.metadata = {}
        self.meta_file = None
        self.data_file = None
        self.fp = None
        self.iter_etag = None
        self.started_at_0 = False
        self.read_to_eof = False
        self.quarantined_dir = None
        self.keep_cache = False
        self.is_dir = False
        self.is_valid = True
        self.uid = int(uid)
        self.gid = int(gid)
        # Object does not exist yet: leave data_file None (is_deleted()
        # reports True in that case).
        if not os.path.exists(self.datadir + '/' + self.obj):
            return
        self.data_file = os.path.join(self.datadir, self.obj)
        self.metadata = read_metadata(self.datadir + '/' + self.obj)
        if not self.metadata:
            create_object_metadata(self.datadir + '/' + self.obj)
            self.metadata = read_metadata(self.datadir + '/' + self.obj)
        if not validate_object(self.metadata):
            create_object_metadata(self.datadir + '/' + self.obj)
            self.metadata = read_metadata(self.datadir + '/' +
                                          self.obj)
        self.filter_metadata()
        if os.path.isdir(self.datadir + '/' + self.obj):
            # Marker directory object.
            self.is_dir = True
        else:
            self.fp = do_open(self.data_file, 'rb')
            if not keep_data_fp:
                self.close(verify_file=False)

    def close(self, verify_file=True):
        """
        Close the file. Will handle quarantining file if necessary.

        :param verify_file: Defaults to True. If false, will not check
                            file to see if it needs quarantining.
        """
        # Marker directory objects have no file handle to close.
        if self.is_dir:
            return
        if self.fp:
            do_close(self.fp)
            self.fp = None

    def is_deleted(self):
        """
        Check if the file is deleted.

        :returns: True if the file doesn't exist or has been flagged as
                  deleted.
        """
        return not self.data_file

    def create_dir_object(self, dir_path):
        """
        Ensure *dir_path* exists as a marker-directory object with fresh
        metadata, replacing any plain file in the way.
        """
        #TODO: if object already exists???
        if os.path.exists(dir_path) and not os.path.isdir(dir_path):
            self.logger.error("Deleting file %s", dir_path)
            do_unlink(dir_path)
        # If dir already exists, just override metadata.
        mkdirs(dir_path)
        do_chown(dir_path, self.uid, self.gid)
        create_object_metadata(dir_path)
        return True

    def put_metadata(self, metadata):
        """Write *metadata* on the object's path and cache it."""
        obj_path = self.datadir + '/' + self.obj
        write_metadata(obj_path, metadata)
        self.metadata = metadata

    def put(self, fd, tmppath, metadata, extension=''):
        """
        Finalize writing the file on disk, and renames it from the temp
        file to the real location.  This should be called after the data
        has been written to the temp file.

        :param fd: file descriptor of the temp file
        :param tmppath: path to the temporary file being used
        :param metadata: dictionary of metadata to be written
        :param extension: extension to be used when making the file
        :returns: True on success, False when a directory object blocks
                  the write or an intermediate directory cannot be made
        """
        # '.ts' (tombstone) writes are a no-op here; '.meta' writes only
        # update metadata.
        if extension == '.ts':
            return True
        if extension == '.meta':
            self.put_metadata(metadata)
            return True
        else:
            extension = ''
        if metadata[X_OBJECT_TYPE] == MARKER_DIR:
            # Marker directory object: create the dir, no data file.
            self.create_dir_object(os.path.join(self.datadir, self.obj))
            self.put_metadata(metadata)
            self.data_file = self.datadir + '/' + self.obj
            return True
        # Check if directory already exists.
        if self.is_dir:
            self.logger.error('Directory already exists %s/%s' % \
                              (self.datadir , self.obj))
            return False
        #metadata['name'] = self.name
        timestamp = normalize_timestamp(metadata[X_TIMESTAMP])
        write_metadata(tmppath, metadata)
        if X_CONTENT_LENGTH in metadata:
            self.drop_cache(fd, 0, int(metadata[X_CONTENT_LENGTH]))
        # fsync through the thread pool so the event loop is not blocked.
        tpool.execute(os.fsync, fd)
        if self.obj_path:
            # Materialize every intermediate path component as a
            # directory object.
            dir_objs = self.obj_path.split('/')
            tmp_path = ''
            if len(dir_objs):
                for dir_name in dir_objs:
                    if tmp_path:
                        tmp_path = tmp_path + '/' + dir_name
                    else:
                        tmp_path = dir_name
                    if not self.create_dir_object(os.path.join(self.container_path,
                            tmp_path)):
                        self.logger.error("Failed in subdir %s",\
                                          os.path.join(self.container_path,tmp_path))
                        return False
        renamer(tmppath, os.path.join(self.datadir,
                                      self.obj + extension))
        do_chown(os.path.join(self.datadir, self.obj + extension), \
                 self.uid, self.gid)
        self.metadata = metadata
        #self.logger.error("Meta %s", self.metadata)
        self.data_file = self.datadir + '/' + self.obj + extension
        return True

    def unlinkold(self, timestamp):
        """
        Remove any older versions of the object file. Any file that has an
        older timestamp than timestamp will be deleted.

        :param timestamp: timestamp to compare with each file
        NOTE(review): any differing timestamp (older OR newer) triggers
        removal -- the comparison is !=, not <.
        """
        if self.metadata and self.metadata['X-Timestamp'] != timestamp:
            self.unlink()

    def unlink(self):
        """
        Remove the file (or the marker directory).
        """
        if self.is_dir:
            # Marker directory object.
            rmdirs(os.path.join(self.datadir, self.obj))
            if not os.path.isdir(os.path.join(self.datadir, self.obj)):
                self.metadata = {}
                self.data_file = None
            else:
                logging.error('Unable to delete dir %s' % os.path.join(self.datadir, self.obj))
            return
        for fname in do_listdir(self.datadir):
            if fname == self.obj:
                try:
                    do_unlink(os.path.join(self.datadir, fname))
                except OSError, err:
                    # NOTE(review): errno is never imported in this file;
                    # this branch would raise NameError if reached.
                    if err.errno != errno.ENOENT:
                        raise
        # Remove entire path for object.
        #remove_dir_path(self.obj_path, self.container_path)
        self.metadata = {}
        self.data_file = None

    def get_data_file_size(self):
        """
        Returns the os.path.getsize for the file. If the size differs from
        the Content-Length stored in the metadata, the metadata is updated
        to the on-disk size (unlike upstream, which raises DiskFileError).

        :returns: file size as an int (0 for marker directories)
        :raises DiskFileNotExist: on file not existing (including deleted)
        NOTE(review): neither DiskFileNotExist nor errno is imported in
        this file; both raise paths would fail with NameError.
        """
        if self.is_dir:
            # Marker directory.
            return 0
        try:
            file_size = 0
            if self.data_file:
                file_size = os.path.getsize(self.data_file)
                if X_CONTENT_LENGTH in self.metadata:
                    metadata_size = int(self.metadata[X_CONTENT_LENGTH])
                    if file_size != metadata_size:
                        self.metadata[X_CONTENT_LENGTH] = file_size
                        self.update_object(self.metadata)
                return file_size
        except OSError, err:
            if err.errno != errno.ENOENT:
                raise
        raise DiskFileNotExist('Data File does not exist.')

    def update_object(self, metadata):
        """Write *metadata* on the object's path and cache it."""
        obj_path = self.datadir + '/' + self.obj
        write_metadata(obj_path, metadata)
        self.metadata = metadata

    def filter_metadata(self):
        """Drop internal type-tracking keys from the cached metadata."""
        if X_TYPE in self.metadata:
            self.metadata.pop(X_TYPE)
        if X_OBJECT_TYPE in self.metadata:
            self.metadata.pop(X_OBJECT_TYPE)

View File

@ -0,0 +1,104 @@
# Copyright (c) 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from ConfigParser import ConfigParser
from swift.common.utils import TRUE_VALUES
from hashlib import md5
class Glusterfs(object):
    """
    Filesystem plugin that maps Swift accounts onto GlusterFS volumes.

    Settings come from the [DEFAULT] section of /etc/swift/fs.conf:
    mount_path, auth_account, mount_ip and remote_cluster.  The plugin can
    mount/unmount volumes and enumerate the exports of a local or remote
    gluster cluster.
    """

    def __init__(self):
        self.name = 'glusterfs'
        self.fs_conf = ConfigParser()
        self.fs_conf.read(os.path.join('/etc/swift', 'fs.conf'))
        # BUG FIX: ConfigParser.get() has no "default" positional
        # parameter -- its third positional argument is the `raw`
        # interpolation flag -- so the original calls never applied their
        # intended fallbacks and raised NoOptionError when an option was
        # missing.  Read the [DEFAULT] option dict instead, which supports
        # real defaults.
        defaults = self.fs_conf.defaults()
        self.mount_path = defaults.get('mount_path', '/mnt/gluster-object')
        self.auth_account = defaults.get('auth_account', 'auth')
        self.mount_ip = defaults.get('mount_ip', 'localhost')
        self.remote_cluster = \
            defaults.get('remote_cluster', False) in TRUE_VALUES

    def mount(self, account):
        """
        Mount the gluster volume backing *account* under mount_path.

        :returns: True on success
        :raises Exception: when the mount command fails
        """
        export = self.get_export_from_account_id(account)
        mount_path = os.path.join(self.mount_path, account)
        mnt_cmd = 'mount -t glusterfs %s:%s %s' % (self.mount_ip, export,
                                                   mount_path)
        if os.system(mnt_cmd) or \
                not os.path.exists(os.path.join(mount_path)):
            # (dead "return False" after this raise removed)
            raise Exception('Mount failed %s: %s' % (self.name, mnt_cmd))
        return True

    def unmount(self, mount_path):
        """Unmount *mount_path*; failures are logged, not raised."""
        umnt_cmd = 'umount %s 2>> /dev/null' % mount_path
        if os.system(umnt_cmd):
            logging.error('Unable to unmount %s %s' % (mount_path, self.name))

    def _parse_export_list(self, cmnd):
        """Run *cmnd* and collect the values of its 'Volume Name:' lines."""
        export_list = []
        fp = os.popen(cmnd)
        while True:
            item = fp.readline()
            if not item:
                break
            item = item.strip('\n').strip(' ')
            if item.lower().startswith('volume name:'):
                export_list.append(item.split(':')[1].strip(' '))
        return export_list

    def get_export_list_local(self):
        """Return the gluster volume names on the local machine."""
        cmnd = 'gluster volume info'
        if os.system(cmnd + ' >> /dev/null'):
            # BUG FIX: original passed self.name as a second Exception arg,
            # so the %s placeholder was never substituted.
            raise Exception('Getting volume failed %s' % self.name)
        return self._parse_export_list(cmnd)

    def get_export_list_remote(self):
        """Return the gluster volume names on the remote mount_ip host."""
        cmnd = 'ssh %s gluster volume info' % self.mount_ip
        # (debug "print 'Remote'" replaced with logging)
        logging.debug('Fetching volume info from remote cluster %s',
                      self.mount_ip)
        if os.system(cmnd + ' >> /dev/null'):
            raise Exception('Getting volume info failed %s, make sure to '
                            'have passwordless ssh on %s'
                            % (self.name, self.mount_ip))
        return self._parse_export_list(cmnd)

    def get_export_list(self):
        """Return exports from the remote or local cluster per fs.conf."""
        if self.remote_cluster:
            return self.get_export_list_remote()
        else:
            return self.get_export_list_local()

    def get_export_from_account_id(self, account):
        """
        Map an account name ('AUTH_<volume>') to its gluster volume name.

        :raises AttributeError: if *account* is empty/None
        :raises Exception: if no matching export exists
        """
        if not account:
            logging.error('account is none, returning')
            raise AttributeError
        for export in self.get_export_list():
            if account == 'AUTH_' + export:
                return export
        raise Exception('No export found %s %s' % (account, self.name))

View File

@ -0,0 +1,16 @@
# Copyright (c) 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from Glusterfs import Glusterfs

View File

@ -0,0 +1,22 @@
[DEFAULT]
devices = /srv/1/node
mount_check = false
bind_port = 6012
user = root
log_facility = LOG_LOCAL2
[pipeline:main]
pipeline = gluster account-server
[app:account-server]
use = egg:swift#account
[filter:gluster]
use = egg:swift#gluster
[account-replicator]
vm_test_mode = yes
[account-auditor]
[account-reaper]

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,24 @@
[DEFAULT]
devices = /srv/1/node
mount_check = false
bind_port = 6011
user = root
log_facility = LOG_LOCAL2
[pipeline:main]
pipeline = gluster container-server
[app:container-server]
use = egg:swift#container
[filter:gluster]
use = egg:swift#gluster
[container-replicator]
vm_test_mode = yes
[container-updater]
[container-auditor]
[container-sync]

Binary file not shown.

Binary file not shown.

View File

View File

@ -0,0 +1,8 @@
[DEFAULT]
mount_path = /mnt/gluster-object
auth_account = auth
#ip of the fs server.
mount_ip = localhost
#fs server need not be local, remote server can also be used,
#set remote_cluster=yes for using remote server.
remote_cluster = no

View File

@ -0,0 +1,22 @@
[DEFAULT]
devices = /srv/1/node
mount_check = false
bind_port = 6010
user = root
log_facility = LOG_LOCAL2
[pipeline:main]
pipeline = gluster object-server
[app:object-server]
use = egg:swift#object
[filter:gluster]
use = egg:swift#gluster
[object-replicator]
vm_test_mode = yes
[object-updater]
[object-auditor]

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,25 @@
[DEFAULT]
bind_port = 8080
user = root
log_facility = LOG_LOCAL1
[pipeline:main]
pipeline = healthcheck cache tempauth proxy-server
[app:proxy-server]
use = egg:swift#proxy
allow_account_management = true
account_autocreate = true
[filter:tempauth]
use = egg:swift#tempauth
user_admin_admin = admin .admin .reseller_admin
user_test_tester = testing .admin
user_test2_tester2 = testing2 .admin
user_test_tester3 = testing3
[filter:healthcheck]
use = egg:swift#healthcheck
[filter:cache]
use = egg:swift#memcache

View File

@ -0,0 +1,7 @@
[DEFAULT]
Enable_plugin = yes
[swift-hash]
# random unique string that can never change (DO NOT LOSE)
swift_hash_path_suffix = gluster

View File

@ -0,0 +1,97 @@
# Copyright (c) 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import logging
from swift.common.constraints import check_utf8, check_metadata
from webob.exc import HTTPBadRequest, HTTPLengthRequired, \
HTTPRequestEntityTooLarge
#: Max file size allowed for objects
MAX_FILE_SIZE = 0xffffffffffffffff
#: Max length of the name of a key for metadata
MAX_META_NAME_LENGTH = 128
#: Max length of the value of a key for metadata
MAX_META_VALUE_LENGTH = 256
#: Max number of metadata items
MAX_META_COUNT = 90
#: Max overall size of metadata
MAX_META_OVERALL_SIZE = 4096
#: Max object name length
MAX_OBJECT_NAME_LENGTH = 255
#: Max object list length of a get request for a container
CONTAINER_LISTING_LIMIT = 10000
#: Max container list length of a get request for an account
ACCOUNT_LISTING_LIMIT = 10000
#: Max account and container name lengths
MAX_ACCOUNT_NAME_LENGTH = 255
MAX_CONTAINER_NAME_LENGTH = 255


def validate_obj_name(obj):
    """
    Check that a single path component is usable as an object name.

    Rejects names longer than MAX_OBJECT_NAME_LENGTH and the special
    directory entries '.' and '..'; failures are logged.

    :param obj: candidate object name (one path segment)
    :returns: True when the name is acceptable, False otherwise
    """
    if len(obj) > MAX_OBJECT_NAME_LENGTH:
        logging.error('Object name too long %s' % obj)
        return False
    if obj in ('.', '..'):
        logging.error('Object name cannot be . or .. %s' % obj)
        return False
    return True
def check_object_creation(req, object_name):
    """
    Check to ensure that everything is alright about an object to be
    created.  Despite the ":raises" wording below, each failure is
    RETURNED as a webob error response (and None is returned implicitly
    on success); nothing is raised.

    :param req: HTTP request object
    :param object_name: name of object to be created
    :returns: HTTPRequestEntityTooLarge -- the object is too large;
              HTTPLengthRequired -- missing content-length header and not
              a chunked request;
              HTTPBadRequest -- missing or bad content-type header, bad
              object name, bad manifest, or bad metadata;
              otherwise whatever check_metadata() returns
    """
    if req.content_length and req.content_length > MAX_FILE_SIZE:
        return HTTPRequestEntityTooLarge(body='Your request is too large.',
                                         request=req, content_type='text/plain')
    if req.content_length is None and \
            req.headers.get('transfer-encoding') != 'chunked':
        return HTTPLengthRequired(request=req)
    if 'X-Copy-From' in req.headers and req.content_length:
        return HTTPBadRequest(body='Copy requests require a zero byte body',
                              request=req, content_type='text/plain')
    # Validate every path segment of the object name individually.
    for obj in object_name.split('/'):
        if not validate_obj_name(obj):
            return HTTPBadRequest(body='Invalid object name %s' %
                                  (obj), request=req,
                                  content_type='text/plain')
    if 'Content-Type' not in req.headers:
        return HTTPBadRequest(request=req, content_type='text/plain',
                              body='No content type')
    if not check_utf8(req.headers['Content-Type']):
        return HTTPBadRequest(request=req, body='Invalid Content-Type',
                              content_type='text/plain')
    if 'x-object-manifest' in req.headers:
        # Manifest must look like "container/prefix" with no query chars.
        value = req.headers['x-object-manifest']
        container = prefix = None
        try:
            container, prefix = value.split('/', 1)
        except ValueError:
            pass
        if not container or not prefix or '?' in value or '&' in value or \
                prefix[0] == '/':
            return HTTPBadRequest(request=req,
                body='X-Object-Manifest must in the format container/prefix')
    # Delegate header-metadata validation to the common checker.
    return check_metadata(req, 'object')

View File

@ -0,0 +1,692 @@
# Copyright (c) 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import errno
from hashlib import md5
from swift.common.utils import normalize_timestamp
from xattr import setxattr, removexattr, getxattr, removexattr
import cPickle as pickle
# Canonical metadata header names stored with each account/container/object.
X_CONTENT_TYPE = 'Content-Type'
X_CONTENT_LENGTH = 'Content-Length'
X_TIMESTAMP = 'X-Timestamp'
X_PUT_TIMESTAMP = 'X-PUT-Timestamp'
X_TYPE = 'X-Type'
X_ETAG = 'ETag'
X_OBJECTS_COUNT = 'X-Object-Count'
X_BYTES_USED = 'X-Bytes-Used'
X_CONTAINER_COUNT = 'X-Container-Count'
X_OBJECT_TYPE = 'X-Object-Type'
DIR_TYPE = 'application/directory'
ACCOUNT = 'Account'
# Default GlusterFS mount root; may be overridden via fs_object.mount_path.
MOUNT_PATH = '/mnt/gluster-object'
# xattr key prefix under which pickled metadata chunks are stored.
METADATA_KEY = 'user.swift.metadata'
CONTAINER = 'container'
# Discriminator values used for X_OBJECT_TYPE / X_TYPE.
DIR = 'dir'
MARKER_DIR = 'marker_dir'
FILE = 'file'
# NOTE(review): DIR_TYPE is re-assigned here with the same value as above --
# one of the two definitions is redundant.
DIR_TYPE = 'application/directory'
FILE_TYPE = 'application/octet-stream'
OBJECT = 'Object'
OBJECT_TYPE = 'application/octet-stream'
# -1 tells chown() to leave the respective id unchanged.
DEFAULT_UID = -1
DEFAULT_GID = -1
# Pickle protocol 2: compact binary format.
PICKLE_PROTOCOL = 2
# Read granularity (bytes) used when checksumming file contents.
CHUNK_SIZE = 65536
def mkdirs(path):
    """
    Ensures the path is a directory or makes it if not. Errors if the path
    exists but is a file or on permissions failure.
    :param path: path to create
    """
    if not os.path.isdir(path):
        try:
            do_makedirs(path)
        except OSError, err:
            #TODO: check, isdir will fail if mounted and volume stopped.
            #if err.errno != errno.EEXIST or not os.path.isdir(path)
            # EEXIST is benign: another worker created the directory first.
            if err.errno != errno.EEXIST:
                raise
def rmdirs(path):
    """Remove *path* when it is an existing, empty directory.

    Logs an error and returns False when *path* is not a directory or is
    not empty; on success falls through returning None (falsy), matching
    the historical contract.
    """
    if not (os.path.isdir(path) and dir_empty(path)):
        logging.error("rmdirs failed dir may not be empty or not valid dir")
        return False
    do_rmdir(path)
def strip_obj_storage_path(path, string='/mnt/gluster-object'):
    """Drop the mount prefix *string* from *path* and trim slashes.

    :param path: absolute backend path
    :param string: prefix to remove (every occurrence, like str.replace)
    :returns: relative object-storage path with no leading/trailing '/'
    """
    stripped = path.replace(string, '')
    return stripped.strip('/')
def do_mkdir(path):
    """Create a single directory; tolerate it already existing.

    Logs and re-raises any failure other than EEXIST.
    :returns: True on success (or when the directory already exists)
    """
    try:
        os.mkdir(path)
    except Exception, err:
        logging.exception("Mkdir failed on %s err: %s", path, str(err))
        if err.errno != errno.EEXIST:
            raise
    return True
def do_makedirs(path):
    """Recursively create *path*; tolerate it already existing.

    Logs and re-raises any failure other than EEXIST.
    :returns: True on success (or when the directory already exists)
    """
    try:
        os.makedirs(path)
    except Exception, err:
        logging.exception("Makedirs failed on %s err: %s", path, str(err))
        if err.errno != errno.EEXIST:
            raise
    return True
def do_listdir(path):
    """os.listdir wrapper that logs and re-raises any failure.

    :returns: list of entry names in *path*
    """
    try:
        buf = os.listdir(path)
    except Exception, err:
        logging.exception("Listdir failed on %s err: %s", path, str(err))
        raise
    return buf
def do_chown(path, uid, gid):
    """os.chown wrapper that logs and re-raises any failure.

    Pass -1 (DEFAULT_UID/DEFAULT_GID) to leave an id unchanged.
    :returns: True on success
    """
    try:
        os.chown(path, uid, gid)
    except Exception, err:
        logging.exception("Chown failed on %s err: %s", path, str(err))
        raise
    return True
def do_stat(path):
    """Stat a path or an open file descriptor.

    An int argument is treated as a file descriptor (fstat); anything
    else is stat'ed by path. Logs and re-raises any failure.
    :returns: os.stat_result
    """
    try:
        #Check for fd.
        if isinstance(path, int):
            buf = os.fstat(path)
        else:
            buf = os.stat(path)
    except Exception, err:
        logging.exception("Stat failed on %s err: %s", path, str(err))
        raise
    return buf
def do_open(path, mode):
    """open() wrapper that logs and re-raises any failure.

    :returns: the opened file object
    """
    try:
        fd = open(path, mode)
    except Exception, err:
        logging.exception("Open failed on %s err: %s", path, str(err))
        raise
    return fd
def do_close(fd):
    """Close a file object or a raw file descriptor.

    Ints are closed via os.close(); anything else via its close() method.
    Logs and re-raises any failure.
    :returns: True on success
    """
    #fd could be file or int type.
    try:
        if isinstance(fd, int):
            os.close(fd)
        else:
            fd.close()
    except Exception, err:
        logging.exception("Close failed on %s err: %s", fd, str(err))
        raise
    return True
def do_unlink(path, log = True):
    """Unlink *path*, tolerating it already being gone (ENOENT).

    :param log: set False to suppress the error log when a missing file
                is expected
    :returns: True on success or when the file did not exist
    """
    try:
        os.unlink(path)
    except Exception, err:
        if log:
            logging.exception("Unlink failed on %s err: %s", path, str(err))
        if err.errno != errno.ENOENT:
            raise
    return True
def do_rmdir(path):
    """Remove directory *path*, tolerating it already being gone (ENOENT).

    Logs and re-raises any other failure (e.g. ENOTEMPTY).
    :returns: True on success or when the directory did not exist
    """
    try:
        os.rmdir(path)
    except Exception, err:
        logging.exception("Rmdir failed on %s err: %s", path, str(err))
        if err.errno != errno.ENOENT:
            raise
    return True
def do_rename(old_path, new_path):
    """os.rename wrapper that logs and re-raises any failure.

    :returns: True on success
    """
    try:
        os.rename(old_path, new_path)
    except Exception, err:
        logging.exception("Rename failed on %s to %s err: %s", old_path, new_path, \
                          str(err))
        raise
    return True
def do_setxattr(path, key, value):
    """Set extended attribute *key* = *value* on *path*.

    Regular files are opened and the file object handed to setxattr();
    directories are passed through by path. Logs and re-raises failures.
    :returns: True on success, False if the file could not be opened
    """
    fd = None
    if not os.path.isdir(path):
        fd = do_open(path, 'rb')
    else:
        # xattr accepts a path string directly for directories.
        fd = path
    if fd or os.path.isdir(path):
        try:
            setxattr(fd, key, value)
        except Exception, err:
            logging.exception("setxattr failed on %s key %s err: %s", path, key, str(err))
            raise
        finally:
            # Only close what we opened ourselves (regular-file case).
            if fd and not os.path.isdir(path):
                do_close(fd)
    else:
        logging.error("Open failed path %s", path)
        return False
    return True
def do_getxattr(path, key, log = True):
    """Read extended attribute *key* from *path*.

    Regular files are opened and the file object handed to getxattr();
    directories are passed through by path. Failures are re-raised.
    :param log: set False to suppress the error log when a missing key is
                expected (e.g. probing for the next metadata chunk)
    :returns: attribute value, or False if the file could not be opened
    """
    fd = None
    if not os.path.isdir(path):
        fd = do_open(path, 'rb')
    else:
        fd = path
    if fd or os.path.isdir(path):
        try:
            value = getxattr(fd, key)
        except Exception, err:
            if log:
                logging.exception("getxattr failed on %s key %s err: %s", path, key, str(err))
            raise
        finally:
            # Only close what we opened ourselves (regular-file case).
            if fd and not os.path.isdir(path):
                do_close(fd)
    else:
        logging.error("Open failed path %s", path)
        return False
    return value
def do_removexattr(path, key):
    """Remove extended attribute *key* from *path*.

    Regular files are opened and the file object handed to removexattr();
    directories are passed through by path. Logs and re-raises failures.
    :returns: True on success, False if the file could not be opened
    """
    fd = None
    if not os.path.isdir(path):
        fd = do_open(path, 'rb')
    else:
        fd = path
    if fd or os.path.isdir(path):
        try:
            removexattr(fd, key)
        except Exception, err:
            logging.exception("removexattr failed on %s key %s err: %s", path, key, str(err))
            raise
        finally:
            # Only close what we opened ourselves (regular-file case).
            if fd and not os.path.isdir(path):
                do_close(fd)
    else:
        logging.error("Open failed path %s", path)
        return False
    return True
def read_metadata(path):
    """
    Helper function to read the pickled metadata from a File/Directory.

    Metadata is stored split across numbered xattr keys (METADATA_KEY,
    METADATA_KEY1, ...); chunks are concatenated until a key is missing.
    :param path: File/Directory to read metadata from.
    :returns: dictionary of metadata (empty dict when none is stored)
    """
    serialized = ''
    index = 0
    while True:
        try:
            chunk = do_getxattr(path, '%s%s' % (METADATA_KEY, (index or '')),
                                log = False)
        except Exception:
            # First missing key marks the end of the stored chunks.
            break
        serialized += chunk
        index += 1
    return pickle.loads(serialized) if serialized else {}
def write_metadata(path, metadata):
    """
    Helper function to write pickled metadata for a File/Directory.

    The pickle is split into 254-byte chunks stored under numbered xattr
    keys (METADATA_KEY, METADATA_KEY1, ...) so large metadata fits within
    per-attribute size limits.
    :param path: File/Directory path to write the metadata
    :param metadata: metadata to write
    """
    serialized = pickle.dumps(metadata, PICKLE_PROTOCOL)
    index = 0
    while serialized:
        chunk, serialized = serialized[:254], serialized[254:]
        do_setxattr(path, '%s%s' % (METADATA_KEY, (index or '')), chunk)
        index += 1
def clean_metadata(path):
    """
    Remove every pickled-metadata xattr chunk from a File/Directory.

    Probes the numbered METADATA_KEY attributes in order and deletes each
    one found; the first missing key marks the end of the stored chunks.
    The original version let the missing-key exception escape to the
    caller (and logged it as an error); now it terminates normally, the
    same way read_metadata() detects the end of the chunk list.
    """
    key = 0
    while True:
        try:
            do_getxattr(path, '%s%s' % (METADATA_KEY, (key or '')), log = False)
        except Exception:
            # No more chunks stored -- done.
            break
        do_removexattr(path, '%s%s' % (METADATA_KEY, (key or '')))
        key += 1
def dir_empty(path):
    """
    Return true if directory/container is empty.
    :param path: Directory path.
    :returns: True/False.
    NOTE(review): when *path* exists but is not a directory (a regular
    file), no branch matches and the function implicitly returns None
    (falsy) -- confirm callers rely only on truthiness.
    """
    if os.path.isdir(path):
        try:
            files = do_listdir(path)
        except Exception, err:
            logging.exception("listdir failed on %s err: %s", path, str(err))
            raise
        if not files:
            return True
        else:
            return False
    else:
        # A missing path is treated as an (already) empty directory.
        if not os.path.exists(path):
            return True
def get_device_from_account(account):
    """Map a reseller account name to its backing device name.

    Strips the leading RESELLER_PREFIX from *account*. Returns None
    (implicitly, as before) when the prefix is absent.
    """
    if not account.startswith(RESELLER_PREFIX):
        return None
    return account.replace(RESELLER_PREFIX, '', 1)
def check_user_xattr(path):
    """Probe whether the filesystem at *path* supports user xattrs.

    Writes and removes a throwaway 'user.test.key1' attribute. Returns
    False when *path* does not exist; the setxattr failure (unsupported
    backend) propagates from do_setxattr.
    :returns: True when user xattrs work, False when *path* is missing
    """
    if not os.path.exists(path):
        return False
    do_setxattr(path, 'user.test.key1', 'value1')
    try:
        removexattr(path, 'user.test.key1')
    except Exception, err:
        logging.exception("removexattr failed on %s err: %s", path, str(err))
        #Remove xattr may fail in case of concurrent remove.
    return True
def _check_valid_account(account, fs_object):
    """Validate that *account* is an exported volume and make it usable.

    Steps (order matters): verify the export exists, unmount any stale
    non-mounted directory, (re)mount via *fs_object*, confirm user-xattr
    support, then open up permissions on the mount path.
    :param account: account (volume) name
    :param fs_object: filesystem plugin providing mount()/unmount()
    :returns: True when the account is valid and mounted, else False
    """
    mount_path = getattr(fs_object, 'mount_path', MOUNT_PATH)
    if not check_account_exists(fs_object.get_export_from_account_id(account), \
                                fs_object):
        logging.error('Account not present %s', account)
        return False
    if not os.path.ismount(os.path.join(mount_path, account)):
        if not os.path.isdir(os.path.join(mount_path, account)):
            mkdirs(os.path.join(mount_path, account))
        # Clear any stale mount state before remounting below.
        fs_object.unmount(os.path.join(mount_path, account))
    if fs_object:
        if not fs_object.mount(account):
            return False
    if not check_user_xattr(os.path.join(mount_path, account)):
        logging.error('Error: No support for user.xattr on backend %s' % account)
        return False
    # NOTE(review): os.system with an interpolated path is shell-injection
    # prone if account/mount_path ever contain shell metacharacters, and
    # chmod 777 is world-writable -- consider subprocess.call([...]) and a
    # tighter mode.
    chmod_cmd = ['chmod 777 %s' % (mount_path), \
                 'chmod 777 %s/%s' % (mount_path, account)]
    for cmd in chmod_cmd:
        if os.system(cmd):
            logging.error('Chmod failed: %s' % (cmd))
            return False
    return True
def check_valid_account(account, fs_object):
    """Public wrapper around _check_valid_account(); see that docstring."""
    return _check_valid_account(account, fs_object)
def validate_container(metadata):
    """Return True when *metadata* carries every required container field
    and is typed as a container; log and return False otherwise."""
    if not metadata:
        logging.error('No metadata')
        return False
    required = (X_TYPE, X_TIMESTAMP, X_PUT_TIMESTAMP,
                X_OBJECTS_COUNT, X_BYTES_USED)
    if not all(field in metadata for field in required):
        #logging.error('Container error %s' % metadata)
        return False
    if metadata[X_TYPE] != CONTAINER:
        logging.error('Container error %s' % metadata)
        return False
    return True
def validate_account(metadata):
    """Return True when *metadata* carries every required account field
    and is typed as an account; log and return False otherwise."""
    if not metadata:
        logging.error('No metadata')
        return False
    required = (X_TYPE, X_TIMESTAMP, X_PUT_TIMESTAMP,
                X_OBJECTS_COUNT, X_BYTES_USED, X_CONTAINER_COUNT)
    if not all(field in metadata for field in required):
        #logging.error('Account error %s' % metadata)
        return False
    if metadata[X_TYPE] != ACCOUNT:
        logging.error('Account error %s' % metadata)
        return False
    return True
def validate_object(metadata):
    """Return True when *metadata* carries every required object field
    and is typed as an object; log and return False otherwise."""
    if not metadata:
        logging.error('No metadata')
        return False
    required = (X_TIMESTAMP, X_CONTENT_TYPE, X_ETAG,
                X_CONTENT_LENGTH, X_TYPE, X_OBJECT_TYPE)
    if not all(field in metadata for field in required):
        #logging.error('Object error %s' % metadata)
        return False
    if metadata[X_TYPE] != OBJECT:
        logging.error('Object error %s' % metadata)
        return False
    return True
def is_marker(metadata):
    """Return True when *metadata* marks a directory-placeholder object
    (X_OBJECT_TYPE == MARKER_DIR); log and return False otherwise."""
    if not metadata:
        logging.error('No metadata')
        return False
    if X_OBJECT_TYPE not in metadata:
        logging.error('X_OBJECT_TYPE missing %s' % metadata)
        return False
    return metadata[X_OBJECT_TYPE] == MARKER_DIR
def _update_list(path, const_path, src_list, reg_file=True, object_count=0,
                 bytes_used=0, obj_list=None):
    """
    Record the entries of *src_list* (found under *path*) into *obj_list*
    as object names relative to *const_path*, accumulating totals.

    :param path: directory the entries were listed from
    :param const_path: container root used to relativize the names
    :param src_list: entry names to record
    :param reg_file: when True, entries are regular files whose sizes are
                     added to *bytes_used*
    :param obj_list: list mutated in place; a fresh list is created when
                     omitted (the original used a shared mutable default,
                     which accumulated entries across calls)
    :returns: (object_count, bytes_used) updated totals
    """
    if obj_list is None:
        obj_list = []
    obj_path = strip_obj_storage_path(path, const_path)
    for i in src_list:
        if obj_path:
            obj_list.append(os.path.join(obj_path, i))
        else:
            obj_list.append(i)
        object_count += 1
        if reg_file:
            bytes_used += os.path.getsize(path + '/' + i)
    return object_count, bytes_used
def update_list(path, const_path, dirs=None, files=None, object_count=0,
                bytes_used=0, obj_list=None):
    """
    Record *files* (with sizes) and *dirs* from *path* into *obj_list*.

    Thin driver over _update_list(): files first (sizes counted), then
    directories. Mutable defaults replaced with None to avoid the shared
    default-list bug; semantics for explicit arguments are unchanged.
    :returns: (object_count, bytes_used) updated totals
    """
    if dirs is None:
        dirs = []
    if files is None:
        files = []
    if obj_list is None:
        obj_list = []
    object_count, bytes_used = _update_list(path, const_path, files, True,
                                            object_count, bytes_used,
                                            obj_list)
    object_count, bytes_used = _update_list(path, const_path, dirs, False,
                                            object_count, bytes_used,
                                            obj_list)
    return object_count, bytes_used
def get_container_details_from_fs(cont_path, const_path,
                                  memcache=None):
    """
    get container details by traversing the filesystem

    Walks *cont_path*, collecting every file/dir as an object name and
    accumulating counts/sizes. When *memcache* is given, the results are
    cached along with a per-directory "path:mtime" list used later for
    staleness checks.
    :returns: (obj_list, object_count, bytes_used)
    """
    bytes_used = 0
    object_count = 0
    obj_list=[]
    dir_list = []
    if os.path.isdir(cont_path):
        for (path, dirs, files) in os.walk(cont_path):
            object_count, bytes_used = update_list(path, const_path, dirs, files,
                                                   object_count, bytes_used,
                                                   obj_list)
            # Remember each directory's mtime for cache invalidation.
            dir_list.append(path + ':' + str(do_stat(path).st_mtime))
    if memcache:
        memcache.set(strip_obj_storage_path(cont_path), obj_list)
        memcache.set(strip_obj_storage_path(cont_path) + '-dir_list',
                     ','.join(dir_list))
        memcache.set(strip_obj_storage_path(cont_path) + '-cont_meta',
                     [object_count, bytes_used])
    return obj_list, object_count, bytes_used
def get_container_details_from_memcache(cont_path, const_path,
                                        memcache):
    """
    get container details stored in memcache

    Falls back to the filesystem walk when the cache is empty or when any
    cached directory mtime no longer matches the on-disk mtime.
    NOTE(review): the "path:mtime" encoding splits on ':' -- paths that
    contain a colon would break the parse; confirm backend paths cannot.
    :returns: (obj_list, object_count, bytes_used)
    """
    bytes_used = 0
    object_count = 0
    obj_list=[]
    dir_contents = memcache.get(strip_obj_storage_path(cont_path) + '-dir_list')
    if not dir_contents:
        return get_container_details_from_fs(cont_path, const_path,
                                             memcache=memcache)
    for i in dir_contents.split(','):
        path, mtime = i.split(':')
        # Any mtime drift means the cached listing is stale.
        if mtime != str(do_stat(path).st_mtime):
            return get_container_details_from_fs(cont_path, const_path,
                                                 memcache=memcache)
    obj_list = memcache.get(strip_obj_storage_path(cont_path))
    object_count, bytes_used = memcache.get(strip_obj_storage_path(cont_path) + '-cont_meta')
    return obj_list, object_count, bytes_used
def get_container_details(cont_path, memcache=None):
    """
    Return object_list, object_count and bytes_used.

    Prefers the memcache-backed fast path when a cache is supplied,
    otherwise walks the filesystem directly.
    """
    if memcache:
        return get_container_details_from_memcache(cont_path, cont_path,
                                                   memcache=memcache)
    return get_container_details_from_fs(cont_path, cont_path)
def get_account_details_from_fs(acc_path, memcache=None):
    """Scan the account directory for containers.

    Each immediate subdirectory of *acc_path* (except 'tmp') counts as a
    container. When *memcache* is given, the listing, the account mtime,
    and the count are cached for the fast path.
    :returns: (container_list, container_count)
    """
    container_list = []
    container_count = 0
    if os.path.isdir(acc_path):
        for name in do_listdir(acc_path):
            # Skip the scratch directory and anything that is not a dir.
            if name.lower() == 'tmp' or \
                    not os.path.isdir(acc_path + '/' + name):
                continue
            container_count += 1
            container_list.append(name)
    if memcache:
        memcache.set(strip_obj_storage_path(acc_path) + '_container_list', container_list)
        memcache.set(strip_obj_storage_path(acc_path)+'_mtime', str(do_stat(acc_path).st_mtime))
        memcache.set(strip_obj_storage_path(acc_path)+'_container_count', container_count)
    return container_list, container_count
def get_account_details_from_memcache(acc_path, memcache=None):
    """Return cached (container_list, container_count) for an account.

    Falls back to the filesystem scan when the cached account mtime is
    missing or stale.
    NOTE(review): when *memcache* is falsy this implicitly returns None --
    callers (get_account_details) only invoke it with a cache; confirm no
    other caller exists.
    """
    if memcache:
        mtime = memcache.get(strip_obj_storage_path(acc_path)+'_mtime')
        if not mtime or mtime != str(do_stat(acc_path).st_mtime):
            return get_account_details_from_fs(acc_path, memcache)
        container_list = memcache.get(strip_obj_storage_path(acc_path) + '_container_list')
        container_count = memcache.get(strip_obj_storage_path(acc_path)+'_container_count')
        return container_list, container_count
def get_account_details(acc_path, memcache=None):
    """
    Return container_list and container_count.

    Uses the memcache fast path when a cache is supplied, otherwise scans
    the filesystem.
    """
    if not memcache:
        return get_account_details_from_fs(acc_path, memcache)
    return get_account_details_from_memcache(acc_path, memcache)
def get_etag(path):
    """
    Compute the object ETag for *path*.

    :returns: md5 hexdigest of the file contents; the digest of the empty
              string for a directory; None when *path* does not exist
    """
    if not os.path.exists(path):
        return None
    checksum = md5()
    if not os.path.isdir(path):
        # 'with' guarantees the handle is released even when a read
        # raises; the original leaked the open file on error.
        with open(path, 'rb') as fp:
            while True:
                chunk = fp.read(CHUNK_SIZE)
                if not chunk:
                    break
                checksum.update(chunk)
    return checksum.hexdigest()
def get_object_metadata(obj_path):
    """
    Return metadata of object.

    Builds the standard object metadata dict from the on-disk entry:
    directories get DIR_TYPE/DIR with zero length, files get
    FILE_TYPE/FILE with their real size. Missing paths yield {}.
    """
    if not os.path.exists(obj_path):
        return {}
    is_dir = os.path.isdir(obj_path)
    return {
        X_TIMESTAMP: normalize_timestamp(os.path.getctime(obj_path)),
        X_CONTENT_TYPE: DIR_TYPE if is_dir else FILE_TYPE,
        X_ETAG: get_etag(obj_path),
        X_CONTENT_LENGTH: 0 if is_dir else os.path.getsize(obj_path),
        X_TYPE: OBJECT,
        X_OBJECT_TYPE: DIR if is_dir else FILE,
    }
def get_container_metadata(cont_path, memcache=None):
    """Build the container metadata dict from on-disk (or cached) details."""
    _objects, object_count, bytes_used = get_container_details(
        cont_path, memcache=memcache)
    return {X_TYPE: CONTAINER,
            X_TIMESTAMP: normalize_timestamp(os.path.getctime(cont_path)),
            X_PUT_TIMESTAMP: normalize_timestamp(os.path.getmtime(cont_path)),
            X_OBJECTS_COUNT: object_count,
            X_BYTES_USED: bytes_used}
def get_account_metadata(acc_path, memcache=None):
    """Build the account metadata dict from on-disk (or cached) details.

    Object count and bytes used are reported as 0 at the account level,
    as in the original implementation.
    """
    _containers, container_count = get_account_details(acc_path, memcache)
    return {X_TYPE: ACCOUNT,
            X_TIMESTAMP: normalize_timestamp(os.path.getctime(acc_path)),
            X_PUT_TIMESTAMP: normalize_timestamp(os.path.getmtime(acc_path)),
            X_OBJECTS_COUNT: 0,
            X_BYTES_USED: 0,
            X_CONTAINER_COUNT: container_count}
def restore_object(obj_path, metadata):
    """Merge *metadata* over any metadata already stored on the object
    and persist the result to its xattrs."""
    stored = read_metadata(obj_path)
    stored.update(metadata)
    write_metadata(obj_path, stored)
def restore_container(cont_path, metadata):
    """Merge *metadata* over any metadata already stored on the container
    and persist the result to its xattrs."""
    stored = read_metadata(cont_path)
    stored.update(metadata)
    write_metadata(cont_path, stored)
def restore_account(acc_path, metadata):
    """Merge *metadata* over any metadata already stored on the account
    and persist the result to its xattrs."""
    stored = read_metadata(acc_path)
    stored.update(metadata)
    write_metadata(acc_path, stored)
def create_object_metadata(obj_path):
    """Derive fresh object metadata from disk, persist it, return it."""
    meta = get_object_metadata(obj_path)
    restore_object(obj_path, meta)
    return meta
def create_container_metadata(cont_path, memcache=None):
    """Derive fresh container metadata from disk, persist it, return it."""
    meta = get_container_metadata(cont_path, memcache)
    restore_container(cont_path, meta)
    return meta
def create_account_metadata(acc_path, memcache=None):
    """Derive fresh account metadata from disk, persist it, return it."""
    meta = get_account_metadata(acc_path, memcache)
    restore_account(acc_path, meta)
    return meta
def check_account_exists(account, fs_object):
    """Return True when *account* is one of the filesystem's exports;
    log and return False otherwise."""
    exists = account in get_account_list(fs_object)
    if not exists:
        logging.error('Account not exists %s' % account)
    return exists
def get_account_list(fs_object):
    """Return the export list from *fs_object*, or [] when it is falsy."""
    if not fs_object:
        return []
    return fs_object.get_export_list()
def get_account_id(account):
    # Derive the reseller-prefixed account id: md5 of the account name
    # salted with the cluster hash suffix.
    # NOTE(review): RESELLER_PREFIX and HASH_PATH_SUFFIX are neither defined
    # nor imported in this module as shown -- confirm they are injected
    # elsewhere, otherwise calling this raises NameError.
    return RESELLER_PREFIX + md5(account + HASH_PATH_SUFFIX).hexdigest()

774
swift/1.4.8/swift.diff Normal file
View File

@ -0,0 +1,774 @@
diff --git a/setup.py b/setup.py
index d195d34..b5b5ca2 100644
--- a/setup.py
+++ b/setup.py
@@ -1,5 +1,6 @@
#!/usr/bin/python
# Copyright (c) 2010-2012 OpenStack, LLC.
+# Copyright (c) 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -94,6 +95,7 @@ setup(
'tempurl=swift.common.middleware.tempurl:filter_factory',
'formpost=swift.common.middleware.formpost:filter_factory',
'name_check=swift.common.middleware.name_check:filter_factory',
+ 'gluster=swift.common.middleware.gluster:filter_factory',
],
},
)
diff --git a/swift/account/server.py b/swift/account/server.py
index 800b3c0..99f5de3 100644
--- a/swift/account/server.py
+++ b/swift/account/server.py
@@ -1,4 +1,5 @@
# Copyright (c) 2010-2012 OpenStack, LLC.
+# Copyright (c) 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -31,7 +32,7 @@ import simplejson

from swift.common.db import AccountBroker
from swift.common.utils import get_logger, get_param, hash_path, \
- normalize_timestamp, split_path, storage_directory
+ normalize_timestamp, split_path, storage_directory, plugin_enabled
from swift.common.constraints import ACCOUNT_LISTING_LIMIT, \
check_mount, check_float, check_utf8
from swift.common.db_replicator import ReplicatorRpc
@@ -39,6 +40,8 @@ from swift.common.db_replicator import ReplicatorRpc

DATADIR = 'accounts'

+if plugin_enabled():
+ from swift.plugins.DiskDir import DiskAccount

class AccountController(object):
"""WSGI controller for the account server."""
@@ -52,8 +55,12 @@ class AccountController(object):
self.mount_check, logger=self.logger)
self.auto_create_account_prefix = \
conf.get('auto_create_account_prefix') or '.'
+ self.fs_object = None

def _get_account_broker(self, drive, part, account):
+ if self.fs_object:
+ return DiskAccount(self.root, account, self.fs_object);
+
hsh = hash_path(account)
db_dir = storage_directory(DATADIR, part, hsh)
db_path = os.path.join(self.root, drive, db_dir, hsh + '.db')
@@ -121,9 +128,15 @@ class AccountController(object):
if broker.is_deleted():
return HTTPConflict(request=req)
metadata = {}
- metadata.update((key, (value, timestamp))
- for key, value in req.headers.iteritems()
- if key.lower().startswith('x-account-meta-'))
+ if not self.fs_object:
+ metadata.update((key, (value, timestamp))
+ for key, value in req.headers.iteritems()
+ if key.lower().startswith('x-account-meta-'))
+ else:
+ metadata.update((key, value)
+ for key, value in req.headers.iteritems()
+ if key.lower().startswith('x-account-meta-'))
+
if metadata:
broker.update_metadata(metadata)
if created:
@@ -153,6 +166,9 @@ class AccountController(object):
broker.stale_reads_ok = True
if broker.is_deleted():
return HTTPNotFound(request=req)
+ if self.fs_object:
+ broker.list_containers_iter(None, None,None,
+ None, None)
info = broker.get_info()
headers = {
'X-Account-Container-Count': info['container_count'],
@@ -164,9 +180,16 @@ class AccountController(object):
container_ts = broker.get_container_timestamp(container)
if container_ts is not None:
headers['X-Container-Timestamp'] = container_ts
- headers.update((key, value)
- for key, (value, timestamp) in broker.metadata.iteritems()
- if value != '')
+ if not self.fs_object:
+ headers.update((key, value)
+ for key, (value, timestamp) in broker.metadata.iteritems()
+ if value != '')
+ else:
+ headers.update((key, value)
+ for key, value in broker.metadata.iteritems()
+ if value != '')
+
+
return HTTPNoContent(request=req, headers=headers)

def GET(self, req):
@@ -190,9 +213,15 @@ class AccountController(object):
'X-Account-Bytes-Used': info['bytes_used'],
'X-Timestamp': info['created_at'],
'X-PUT-Timestamp': info['put_timestamp']}
- resp_headers.update((key, value)
- for key, (value, timestamp) in broker.metadata.iteritems()
- if value != '')
+ if not self.fs_object:
+ resp_headers.update((key, value)
+ for key, (value, timestamp) in broker.metadata.iteritems()
+ if value != '')
+ else:
+ resp_headers.update((key, value)
+ for key, value in broker.metadata.iteritems()
+ if value != '')
+
try:
prefix = get_param(req, 'prefix')
delimiter = get_param(req, 'delimiter')
@@ -224,6 +253,7 @@ class AccountController(object):
content_type='text/plain', request=req)
account_list = broker.list_containers_iter(limit, marker, end_marker,
prefix, delimiter)
+
if out_content_type == 'application/json':
json_pattern = ['"name":%s', '"count":%s', '"bytes":%s']
json_pattern = '{' + ','.join(json_pattern) + '}'
@@ -298,15 +328,29 @@ class AccountController(object):
return HTTPNotFound(request=req)
timestamp = normalize_timestamp(req.headers['x-timestamp'])
metadata = {}
- metadata.update((key, (value, timestamp))
- for key, value in req.headers.iteritems()
- if key.lower().startswith('x-account-meta-'))
+ if not self.fs_object:
+ metadata.update((key, (value, timestamp))
+ for key, value in req.headers.iteritems()
+ if key.lower().startswith('x-account-meta-'))
+ else:
+ metadata.update((key, value)
+ for key, value in req.headers.iteritems()
+ if key.lower().startswith('x-account-meta-'))
if metadata:
broker.update_metadata(metadata)
return HTTPNoContent(request=req)

+ def plugin(self, env):
+ if env.get('Gluster_enabled', False):
+ self.fs_object = env.get('fs_object')
+ self.root = env.get('root')
+ self.mount_check = False
+ else:
+ self.fs_object = None
+
def __call__(self, env, start_response):
start_time = time.time()
+ self.plugin(env)
req = Request(env)
self.logger.txn_id = req.headers.get('x-trans-id', None)
if not check_utf8(req.path_info):
diff --git a/swift/common/middleware/gluster.py b/swift/common/middleware/gluster.py
new file mode 100644
index 0000000..341285d
--- /dev/null
+++ b/swift/common/middleware/gluster.py
@@ -0,0 +1,55 @@
+# Copyright (c) 2011 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from swift.common.utils import get_logger, plugin_enabled
+from swift import plugins
+from ConfigParser import ConfigParser
+
+class Gluster_plugin(object):
+ """
+ Update the environment with keys that reflect Gluster_plugin enabled
+ """
+
+ def __init__(self, app, conf):
+ self.app = app
+ self.conf = conf
+ self.fs_name = 'Glusterfs'
+ self.logger = get_logger(conf, log_route='gluster')
+
+ def __call__(self, env, start_response):
+ if not plugin_enabled():
+ return self.app(env, start_response)
+ env['Gluster_enabled'] =True
+ fs_object = getattr(plugins, self.fs_name, False)
+ if not fs_object:
+ raise Exception('%s plugin not found', self.fs_name)
+
+ env['fs_object'] = fs_object()
+ fs_conf = ConfigParser()
+ if fs_conf.read('/etc/swift/fs.conf'):
+ try:
+ env['root'] = fs_conf.get ('DEFAULT', 'mount_path')
+ except NoSectionError, NoOptionError:
+ self.logger.exception(_('ERROR mount_path not present'))
+ return self.app(env, start_response)
+
+def filter_factory(global_conf, **local_conf):
+ """Returns a WSGI filter app for use with paste.deploy."""
+ conf = global_conf.copy()
+ conf.update(local_conf)
+
+ def gluster_filter(app):
+ return Gluster_plugin(app, conf)
+ return gluster_filter
diff --git a/swift/common/utils.py b/swift/common/utils.py
index 47edce8..afc356c 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -1,4 +1,5 @@
# Copyright (c) 2010-2012 OpenStack, LLC.
+# Copyright (c) 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -1138,3 +1139,12 @@ def streq_const_time(s1, s2):
for (a, b) in zip(s1, s2):
result |= ord(a) ^ ord(b)
return result == 0
+
+def plugin_enabled():
+ swift_conf = ConfigParser()
+ swift_conf.read(os.path.join('/etc/swift', 'swift.conf'))
+ try:
+ return swift_conf.get('DEFAULT', 'Enable_plugin', 'no') in TRUE_VALUES
+ except NoOptionError, NoSectionError:
+ return False
+
diff --git a/swift/container/server.py b/swift/container/server.py
index 8a18cfd..741c81a 100644
--- a/swift/container/server.py
+++ b/swift/container/server.py
@@ -1,4 +1,5 @@
# Copyright (c) 2010-2012 OpenStack, LLC.
+# Copyright (c) 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -31,7 +32,8 @@ from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPConflict, \

from swift.common.db import ContainerBroker
from swift.common.utils import get_logger, get_param, hash_path, \
- normalize_timestamp, storage_directory, split_path, validate_sync_to
+ normalize_timestamp, storage_directory, split_path, validate_sync_to, \
+ plugin_enabled
from swift.common.constraints import CONTAINER_LISTING_LIMIT, \
check_mount, check_float, check_utf8
from swift.common.bufferedhttp import http_connect
@@ -40,6 +42,9 @@ from swift.common.db_replicator import ReplicatorRpc

DATADIR = 'containers'

+if plugin_enabled():
+ from swift.plugins.DiskDir import DiskDir
+

class ContainerController(object):
"""WSGI Controller for the container server."""
@@ -62,6 +67,7 @@ class ContainerController(object):
ContainerBroker, self.mount_check, logger=self.logger)
self.auto_create_account_prefix = \
conf.get('auto_create_account_prefix') or '.'
+ self.fs_object = None

def _get_container_broker(self, drive, part, account, container):
"""
@@ -73,6 +79,11 @@ class ContainerController(object):
:param container: container name
:returns: ContainerBroker object
"""
+ if self.fs_object:
+ return DiskDir(self.root, drive, part, account,
+ container, self.logger,
+ fs_object = self.fs_object)
+
hsh = hash_path(account, container)
db_dir = storage_directory(DATADIR, part, hsh)
db_path = os.path.join(self.root, drive, db_dir, hsh + '.db')
@@ -211,10 +222,18 @@ class ContainerController(object):
if broker.is_deleted():
return HTTPConflict(request=req)
metadata = {}
- metadata.update((key, (value, timestamp))
- for key, value in req.headers.iteritems()
- if key.lower() in self.save_headers or
- key.lower().startswith('x-container-meta-'))
+ #Note: check the structure of req.headers
+ if not self.fs_object:
+ metadata.update((key, (value, timestamp))
+ for key, value in req.headers.iteritems()
+ if key.lower() in self.save_headers or
+ key.lower().startswith('x-container-meta-'))
+ else:
+ metadata.update((key, value)
+ for key, value in req.headers.iteritems()
+ if key.lower() in self.save_headers or
+ key.lower().startswith('x-container-meta-'))
+
if metadata:
if 'X-Container-Sync-To' in metadata:
if 'X-Container-Sync-To' not in broker.metadata or \
@@ -222,6 +241,7 @@ class ContainerController(object):
broker.metadata['X-Container-Sync-To'][0]:
broker.set_x_container_sync_points(-1, -1)
broker.update_metadata(metadata)
+
resp = self.account_update(req, account, container, broker)
if resp:
return resp
@@ -252,10 +272,17 @@ class ContainerController(object):
'X-Timestamp': info['created_at'],
'X-PUT-Timestamp': info['put_timestamp'],
}
- headers.update((key, value)
- for key, (value, timestamp) in broker.metadata.iteritems()
- if value != '' and (key.lower() in self.save_headers or
- key.lower().startswith('x-container-meta-')))
+ if not self.fs_object:
+ headers.update((key, value)
+ for key, (value, timestamp) in broker.metadata.iteritems()
+ if value != '' and (key.lower() in self.save_headers or
+ key.lower().startswith('x-container-meta-')))
+ else:
+ headers.update((key, value)
+ for key, value in broker.metadata.iteritems()
+ if value != '' and (key.lower() in self.save_headers or
+ key.lower().startswith('x-container-meta-')))
+
return HTTPNoContent(request=req, headers=headers)

def GET(self, req):
@@ -268,6 +295,7 @@ class ContainerController(object):
request=req)
if self.mount_check and not check_mount(self.root, drive):
return Response(status='507 %s is not mounted' % drive)
+
broker = self._get_container_broker(drive, part, account, container)
broker.pending_timeout = 0.1
broker.stale_reads_ok = True
@@ -280,10 +308,17 @@ class ContainerController(object):
'X-Timestamp': info['created_at'],
'X-PUT-Timestamp': info['put_timestamp'],
}
- resp_headers.update((key, value)
- for key, (value, timestamp) in broker.metadata.iteritems()
- if value != '' and (key.lower() in self.save_headers or
- key.lower().startswith('x-container-meta-')))
+ if not self.fs_object:
+ resp_headers.update((key, value)
+ for key, (value, timestamp) in broker.metadata.iteritems()
+ if value != '' and (key.lower() in self.save_headers or
+ key.lower().startswith('x-container-meta-')))
+ else:
+ resp_headers.update((key, value)
+ for key, value in broker.metadata.iteritems()
+ if value != '' and (key.lower() in self.save_headers or
+ key.lower().startswith('x-container-meta-')))
+
try:
path = get_param(req, 'path')
prefix = get_param(req, 'prefix')
@@ -414,10 +449,17 @@ class ContainerController(object):
return HTTPNotFound(request=req)
timestamp = normalize_timestamp(req.headers['x-timestamp'])
metadata = {}
- metadata.update((key, (value, timestamp))
- for key, value in req.headers.iteritems()
- if key.lower() in self.save_headers or
- key.lower().startswith('x-container-meta-'))
+ if not self.fs_object:
+ metadata.update((key, (value, timestamp))
+ for key, value in req.headers.iteritems()
+ if key.lower() in self.save_headers or
+ key.lower().startswith('x-container-meta-'))
+ else:
+ metadata.update((key, value)
+ for key, value in req.headers.iteritems()
+ if key.lower() in self.save_headers or
+ key.lower().startswith('x-container-meta-'))
+
if metadata:
if 'X-Container-Sync-To' in metadata:
if 'X-Container-Sync-To' not in broker.metadata or \
@@ -427,8 +469,19 @@ class ContainerController(object):
broker.update_metadata(metadata)
return HTTPNoContent(request=req)

+ def plugin(self, env):
+ if env.get('Gluster_enabled', False):
+ self.fs_object = env.get('fs_object')
+ if not self.fs_object:
+ raise NoneTypeError
+ self.root = env.get('root')
+ self.mount_check = False
+ else:
+ self.fs_object = None
+
def __call__(self, env, start_response):
start_time = time.time()
+ self.plugin(env)
req = Request(env)
self.logger.txn_id = req.headers.get('x-trans-id', None)
if not check_utf8(req.path_info):
diff --git a/swift/obj/server.py b/swift/obj/server.py
index 9cca16b..37730ff 100644
--- a/swift/obj/server.py
+++ b/swift/obj/server.py
@@ -1,4 +1,5 @@
# Copyright (c) 2010-2012 OpenStack, LLC.
+# Copyright (c) 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -26,6 +27,7 @@ from hashlib import md5
from tempfile import mkstemp
from urllib import unquote
from contextlib import contextmanager
+from ConfigParser import ConfigParser

from webob import Request, Response, UTC
from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
@@ -37,16 +39,23 @@ from eventlet import sleep, Timeout, tpool

from swift.common.utils import mkdirs, normalize_timestamp, \
storage_directory, hash_path, renamer, fallocate, \
- split_path, drop_buffer_cache, get_logger, write_pickle
+ split_path, drop_buffer_cache, get_logger, write_pickle, \
+ plugin_enabled
from swift.common.bufferedhttp import http_connect
-from swift.common.constraints import check_object_creation, check_mount, \
- check_float, check_utf8
+if plugin_enabled():
+ from swift.plugins.constraints import check_object_creation
+ from swift.plugins.utils import X_TYPE, X_OBJECT_TYPE, FILE, DIR, MARKER_DIR, \
+ OBJECT, DIR_TYPE, FILE_TYPE
+else:
+ from swift.common.constraints import check_object_creation
+
+from swift.common.constraints import check_mount, check_float, check_utf8
+
from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
DiskFileNotExist
from swift.obj.replicator import tpooled_get_hashes, invalidate_hash, \
quarantine_renamer

-
DATADIR = 'objects'
ASYNCDIR = 'async_pending'
PICKLE_PROTOCOL = 2
@@ -339,6 +348,9 @@ class DiskFile(object):
raise
raise DiskFileNotExist('Data File does not exist.')

+if plugin_enabled():
+ from swift.plugins.DiskFile import Gluster_DiskFile
+

class ObjectController(object):
"""Implements the WSGI application for the Swift Object Server."""
@@ -377,6 +389,17 @@ class ObjectController(object):
'expiring_objects'
self.expiring_objects_container_divisor = \
int(conf.get('expiring_objects_container_divisor') or 86400)
+ self.fs_object = None
+
+ def get_DiskFile_obj(self, path, device, partition, account, container, obj,
+ logger, keep_data_fp=False, disk_chunk_size=65536):
+ if self.fs_object:
+ return Gluster_DiskFile(path, device, partition, account, container,
+ obj, logger, keep_data_fp,
+                        disk_chunk_size, fs_object=self.fs_object)
+ else:
+ return DiskFile(path, device, partition, account, container,
+ obj, logger, keep_data_fp, disk_chunk_size)

def async_update(self, op, account, container, obj, host, partition,
contdevice, headers_out, objdevice):
@@ -493,7 +516,7 @@ class ObjectController(object):
content_type='text/plain')
if self.mount_check and not check_mount(self.devices, device):
return Response(status='507 %s is not mounted' % device)
- file = DiskFile(self.devices, device, partition, account, container,
+ file = self.get_DiskFile_obj(self.devices, device, partition, account, container,
obj, self.logger, disk_chunk_size=self.disk_chunk_size)

if 'X-Delete-At' in file.metadata and \
@@ -548,7 +571,7 @@ class ObjectController(object):
if new_delete_at and new_delete_at < time.time():
return HTTPBadRequest(body='X-Delete-At in past', request=request,
content_type='text/plain')
- file = DiskFile(self.devices, device, partition, account, container,
+ file = self.get_DiskFile_obj(self.devices, device, partition, account, container,
obj, self.logger, disk_chunk_size=self.disk_chunk_size)
orig_timestamp = file.metadata.get('X-Timestamp')
upload_expiration = time.time() + self.max_upload_time
@@ -580,12 +603,29 @@ class ObjectController(object):
if 'etag' in request.headers and \
request.headers['etag'].lower() != etag:
return HTTPUnprocessableEntity(request=request)
- metadata = {
- 'X-Timestamp': request.headers['x-timestamp'],
- 'Content-Type': request.headers['content-type'],
- 'ETag': etag,
- 'Content-Length': str(os.fstat(fd).st_size),
- }
+ content_type = request.headers['content-type']
+ if self.fs_object and not content_type:
+ content_type = FILE_TYPE
+ if not self.fs_object:
+ metadata = {
+ 'X-Timestamp': request.headers['x-timestamp'],
+ 'Content-Type': request.headers['content-type'],
+ 'ETag': etag,
+ 'Content-Length': str(os.fstat(fd).st_size),
+ }
+ else:
+ metadata = {
+ 'X-Timestamp': request.headers['x-timestamp'],
+                'Content-Type': content_type,
+ 'ETag': etag,
+ 'Content-Length': str(os.fstat(fd).st_size),
+ X_TYPE: OBJECT,
+ X_OBJECT_TYPE: FILE,
+ }
+
+ if self.fs_object and \
+                content_type.lower() == DIR_TYPE:
+ metadata.update({X_OBJECT_TYPE: MARKER_DIR})
metadata.update(val for val in request.headers.iteritems()
if val[0].lower().startswith('x-object-meta-') and
len(val[0]) > 14)
@@ -626,9 +666,9 @@ class ObjectController(object):
content_type='text/plain')
if self.mount_check and not check_mount(self.devices, device):
return Response(status='507 %s is not mounted' % device)
- file = DiskFile(self.devices, device, partition, account, container,
- obj, self.logger, keep_data_fp=True,
- disk_chunk_size=self.disk_chunk_size)
+ file = self.get_DiskFile_obj(self.devices, device, partition, account, container,
+ obj, self.logger, keep_data_fp=True,
+ disk_chunk_size=self.disk_chunk_size)
if file.is_deleted() or ('X-Delete-At' in file.metadata and
int(file.metadata['X-Delete-At']) <= time.time()):
if request.headers.get('if-match') == '*':
@@ -702,7 +742,7 @@ class ObjectController(object):
return resp
if self.mount_check and not check_mount(self.devices, device):
return Response(status='507 %s is not mounted' % device)
- file = DiskFile(self.devices, device, partition, account, container,
+ file = self.get_DiskFile_obj(self.devices, device, partition, account, container,
obj, self.logger, disk_chunk_size=self.disk_chunk_size)
if file.is_deleted() or ('X-Delete-At' in file.metadata and
int(file.metadata['X-Delete-At']) <= time.time()):
@@ -744,7 +784,7 @@ class ObjectController(object):
if self.mount_check and not check_mount(self.devices, device):
return Response(status='507 %s is not mounted' % device)
response_class = HTTPNoContent
- file = DiskFile(self.devices, device, partition, account, container,
+ file = self.get_DiskFile_obj(self.devices, device, partition, account, container,
obj, self.logger, disk_chunk_size=self.disk_chunk_size)
if 'x-if-delete-at' in request.headers and \
int(request.headers['x-if-delete-at']) != \
@@ -797,9 +837,18 @@ class ObjectController(object):
raise hashes
return Response(body=pickle.dumps(hashes))

+ def plugin(self, env):
+ if env.get('Gluster_enabled', False):
+ self.fs_object = env.get('fs_object')
+ self.devices = env.get('root')
+ self.mount_check = False
+ else:
+ self.fs_object = None
+
def __call__(self, env, start_response):
"""WSGI Application entry point for the Swift Object Server."""
start_time = time.time()
+ self.plugin(env)
req = Request(env)
self.logger.txn_id = req.headers.get('x-trans-id', None)
if not check_utf8(req.path_info):
diff --git a/swift/proxy/server.py b/swift/proxy/server.py
index 17613b8..af1ef25 100644
--- a/swift/proxy/server.py
+++ b/swift/proxy/server.py
@@ -1,4 +1,5 @@
# Copyright (c) 2010-2012 OpenStack, LLC.
+# Copyright (c) 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -53,8 +54,16 @@
 
 from swift.common.ring import Ring
 from swift.common.utils import cache_from_env, ContextPool, get_logger, \
-    get_remote_client, normalize_timestamp, split_path, TRUE_VALUES
+    get_remote_client, normalize_timestamp, split_path, TRUE_VALUES, \
+    plugin_enabled
 from swift.common.bufferedhttp import http_connect
-from swift.common.constraints import check_metadata, check_object_creation, \
-    check_utf8, CONTAINER_LISTING_LIMIT, MAX_ACCOUNT_NAME_LENGTH, \
-    MAX_CONTAINER_NAME_LENGTH, MAX_FILE_SIZE
+
+if plugin_enabled():
+    from swift.plugins.constraints import check_object_creation, \
+        MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, MAX_FILE_SIZE
+else:
+    from swift.common.constraints import check_object_creation, \
+        MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, MAX_FILE_SIZE
+
+from swift.common.constraints import check_metadata, check_utf8, \
+    CONTAINER_LISTING_LIMIT
diff --git a/test/__init__.py b/test/__init__.py
index ef2ce31..363a051 100644
--- a/test/__init__.py
+++ b/test/__init__.py
@@ -6,8 +6,16 @@ import sys
import os
from ConfigParser import MissingSectionHeaderError
from StringIO import StringIO
-
from swift.common.utils import readconf
+from swift.common.utils import plugin_enabled
+if plugin_enabled():
+ from swift.plugins.constraints import MAX_OBJECT_NAME_LENGTH, \
+ MAX_CONTAINER_NAME_LENGTH, MAX_ACCOUNT_NAME_LENGTH, \
+ MAX_FILE_SIZE
+else:
+ from swift.common.constraints import MAX_OBJECT_NAME_LENGTH, \
+ MAX_CONTAINER_NAME_LENGTH, MAX_ACCOUNT_NAME_LENGTH, \
+ MAX_FILE_SIZE

setattr(__builtin__, '_', lambda x: x)

diff --git a/test/functional/tests.py b/test/functional/tests.py
index b25b4fd..8d12f58 100644
--- a/test/functional/tests.py
+++ b/test/functional/tests.py
@@ -31,6 +31,10 @@ import urllib
 from test import get_config
 from swift import Account, AuthenticationFailed, Connection, Container, \
     File, ResponseError
+from test import MAX_OBJECT_NAME_LENGTH, \
+    MAX_CONTAINER_NAME_LENGTH, MAX_ACCOUNT_NAME_LENGTH, \
+    MAX_FILE_SIZE
+

config = get_config()

@@ -361,7 +371,7 @@ class TestContainer(Base):
set_up = False

def testContainerNameLimit(self):
- limit = 256
+ limit = MAX_CONTAINER_NAME_LENGTH

for l in (limit-100, limit-10, limit-1, limit,
limit+1, limit+10, limit+100):
@@ -949,7 +959,7 @@ class TestFile(Base):
self.assert_status(404)

def testNameLimit(self):
- limit = 1024
+ limit = MAX_OBJECT_NAME_LENGTH

for l in (1, 10, limit/2, limit-1, limit, limit+1, limit*2):
file = self.env.container.file('a'*l)
@@ -1093,7 +1103,7 @@ class TestFile(Base):
self.assert_(file.read(hdrs={'Range': r}) == data[0:1000])

def testFileSizeLimit(self):
- limit = 5*2**30 + 2
+ limit = MAX_FILE_SIZE
tsecs = 3

for i in (limit-100, limit-10, limit-1, limit, limit+1, limit+10,
diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py
index 075700e..5b6f32d 100644
--- a/test/unit/obj/test_server.py
+++ b/test/unit/obj/test_server.py
@@ -1355,7 +1355,7 @@ class TestObjectController(unittest.TestCase):

def test_max_object_name_length(self):
timestamp = normalize_timestamp(time())
- req = Request.blank('/sda1/p/a/c/' + ('1' * 1024),
+ req = Request.blank('/sda1/p/a/c/' + ('1' * MAX_OBJECT_NAME_LENGTH),
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': timestamp,
'Content-Length': '4',
diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py
index 364370e..c17fe59 100644
--- a/test/unit/proxy/test_server.py
+++ b/test/unit/proxy/test_server.py
@@ -21,7 +21,6 @@ import os
import sys
import unittest
from nose import SkipTest
-from ConfigParser import ConfigParser
from contextlib import contextmanager
from cStringIO import StringIO
from gzip import GzipFile
@@ -44,8 +43,18 @@ from swift.account import server as account_server
from swift.container import server as container_server
from swift.obj import server as object_server
from swift.common import ring
-from swift.common.constraints import MAX_META_NAME_LENGTH, \
- MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE, MAX_FILE_SIZE
+from swift.common.utils import plugin_enabled
+if plugin_enabled():
+ from swift.plugins.constraints import MAX_META_NAME_LENGTH, \
+ MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE, \
+ MAX_FILE_SIZE, MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, \
+ MAX_OBJECT_NAME_LENGTH
+else:
+    from swift.common.constraints import MAX_META_NAME_LENGTH, \
+ MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE, \
+ MAX_FILE_SIZE, MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, \
+ MAX_OBJECT_NAME_LENGTH
+
from swift.common.utils import mkdirs, normalize_timestamp, NullLogger
from swift.common.wsgi import monkey_patch_mimetools

@@ -3207,7 +3216,8 @@ class TestContainerController(unittest.TestCase):
def test_PUT_max_container_name_length(self):
with save_globals():
controller = proxy_server.ContainerController(self.app, 'account',
- '1' * 256)
+ '1' *
+ MAX_CONTAINER_NAME_LENGTH,)
self.assert_status_map(controller.PUT,
(200, 200, 200, 201, 201, 201), 201,
missing_container=True)
@@ -3813,7 +3823,8 @@ class TestAccountController(unittest.TestCase):
def test_PUT_max_account_name_length(self):
with save_globals():
self.app.allow_account_management = True
- controller = proxy_server.AccountController(self.app, '1' * 256)
+ controller = proxy_server.AccountController(self.app, '1' *
+ MAX_ACCOUNT_NAME_LENGTH)
self.assert_status_map(controller.PUT, (201, 201, 201), 201)
controller = proxy_server.AccountController(self.app, '2' * 257)
self.assert_status_map(controller.PUT, (201, 201, 201), 400)