Commit aa07acfa authored by Antoine de Torcy

Merge pull request #1832 from earlephilhower/master

Multipart, parallel copy of objects
parents 450785e2 38102d20
@@ -89,6 +89,9 @@ const std::string s3_mpu_threads = "S3_MPU_THREADS";
const std::string s3_enable_md5 = "S3_ENABLE_MD5";
const std::string s3_server_encrypt = "S3_SERVER_ENCRYPT";
// For s3PutCopyFile to identify the real source type
typedef enum { S3_PUTFILE, S3_COPYOBJECT } s3_putcopy;
size_t g_retry_count = 10;
size_t g_retry_wait = 1;
@@ -832,7 +835,7 @@ extern "C" {
S3_get_object (&bucketContext, key.c_str(), NULL, 0, _fileSize, 0, &getObjectHandler, &data);
unsigned long long usEnd = usNow();
double bw = (_fileSize / (1024.0*1024.0)) / ( (usEnd - usStart) / 1000000.0 );
rodsLog( LOG_NOTICE, "GETBW=%lf", bw);
rodsLog( LOG_DEBUG, "GETBW=%lf", bw);
if (data.status != S3StatusOK) s3_sleep( g_retry_wait, 0 );
} while ( (data.status != S3StatusOK) && S3_status_is_retryable(data.status) && (++retry_cnt < g_retry_count) );
if (data.status != S3StatusOK) {
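Every network call touched by this diff is wrapped in the same bounded retry loop, governed by the `g_retry_count` and `g_retry_wait` globals introduced above. A minimal sketch of that idiom, with `attempt` standing in for one full libs3 round trip (the helper name and template shape are assumptions, not plugin code):

```cpp
#include <libs3.h>
#include <unistd.h>
#include <cstddef>

// Sketch of the bounded retry idiom used throughout this diff (assumed
// shape, not plugin code). `attempt` performs one complete request such as
// S3_get_object or S3_put_object and returns the final S3Status;
// retryCount and retryWait mirror the g_retry_count and g_retry_wait
// globals introduced above.
template <typename Attempt>
S3Status retryS3Call(Attempt attempt, size_t retryCount, unsigned retryWait)
{
    size_t tries = 0;
    S3Status status;
    do {
        status = attempt();   // one complete request, callbacks included
        if (status != S3StatusOK)
            sleep(retryWait); // the plugin itself calls s3_sleep(g_retry_wait, 0)
    } while (status != S3StatusOK
             && S3_status_is_retryable(status)
             && ++tries < retryCount);
    return status;
}
```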
@@ -901,7 +904,7 @@ extern "C" {
}
unsigned long long usEnd = usNow();
double bw = (_fileSize / (1024.0*1024.0)) / ( (usEnd - usStart) / 1000000.0 );
rodsLog( LOG_STATUS, "MultirangeBW=%lf", bw);
rodsLog( LOG_DEBUG, "MultirangeBW=%lf", bw);
if (!g_mrdResult.ok()) {
// Someone aborted after we started, delete the partial object on S3
@@ -1139,7 +1142,22 @@ extern "C" {
putProps->useServerSideEncryption = true;
unsigned long long usStart = usNow();
bucketContext.hostName = s3GetHostname(); // Safe to do, this is a local copy of the data structure
S3_upload_part(&bucketContext, g_mpuKey, putProps, &putObjectHandler, seq, g_mpuUploadId, partData.put_object_data.contentLength, 0, &partData);
if (partData.mode == S3_COPYOBJECT) {
unsigned long long startOffset = partData.put_object_data.offset;
unsigned long long count = partData.put_object_data.contentLength;
S3ResponseHandler copyResponseHandler = {mpuInitRespPropCB /*Do nothing*/, mpuPartRespCompCB};
int64_t lastModified;
// The default copy callback tries to set this for us, need to allocate here
partData.manager->etags[seq-1] = (char *)malloc(512); // TBD - magic #! Is there a max etag defined?
S3_copy_object_range(partData.pSrcCtx, partData.srcKey, bucketContext.bucketName, g_mpuKey,
seq, g_mpuUploadId,
startOffset, count,
putProps,
&lastModified, 512 /*TBD - magic # */, partData.manager->etags[seq-1], 0,
&copyResponseHandler, &partData);
} else {
S3_upload_part(&bucketContext, g_mpuKey, putProps, &putObjectHandler, seq, g_mpuUploadId, partData.put_object_data.contentLength, 0, &partData);
}
unsigned long long usEnd = usNow();
double bw = (g_mpuData[seq-1].put_object_data.contentLength / (1024.0 * 1024.0)) / ( (usEnd - usStart) / 1000000.0 );
// Clear up the S3PutProperties, if it exists
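This hunk is the heart of the change: each part now dispatches on `partData.mode`, issuing a server-side `S3_copy_object_range` for copies instead of reading data locally and re-uploading it through `S3_upload_part`. The byte range for a part comes from `put_object_data`, which the driver loop further down populates; one plausible reading of that arithmetic, as a standalone helper (illustrative, not code from this commit):

```cpp
#include <cstdint>

// Illustrative helper: how a part's byte range appears to relate to its
// sequence number, given the chunking in the driver loop below.
struct PartRange { uint64_t offset; uint64_t length; };

PartRange partRange(int seq, uint64_t chunkSize, uint64_t fileSize)
{
    PartRange r;
    r.offset = static_cast<uint64_t>(seq - 1) * chunkSize;
    r.length = (r.offset + chunkSize > fileSize)
                   ? fileSize - r.offset // only the final part can be short
                   : chunkSize;
    return r;
}
```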
@@ -1164,7 +1182,8 @@ extern "C" {
}
}
irods::error s3PutFile(
irods::error s3PutCopyFile(
const s3_putcopy _mode,
const std::string& _filename,
const std::string& _s3ObjName,
rodsLong_t _fileSize,
@@ -1177,6 +1196,8 @@ extern "C" {
int cache_fd = -1;
std::string bucket;
std::string key;
std::string srcBucket;
std::string srcKey;
int err_status = 0;
long chunksize = s3GetMPUChunksize( _prop_map );
size_t retry_cnt = 0;
@@ -1191,8 +1212,18 @@ extern "C" {
ret = s3Init( _prop_map );
if((result = ASSERT_PASS(ret, "Failed to initialize the S3 system.")).ok()) {
cache_fd = open(_filename.c_str(), O_RDONLY);
err_status = UNIX_FILE_OPEN_ERR - errno;
if (_mode == S3_PUTFILE) {
cache_fd = open(_filename.c_str(), O_RDONLY);
err_status = UNIX_FILE_OPEN_ERR - errno;
} else if (_mode == S3_COPYOBJECT && _fileSize > s3GetMPUChunksize( _prop_map )) {
// Multipart copy, don't open anything
cache_fd = 0;
err_status = 0;
} else {
// Singlepart copy is NOT implemented here!
cache_fd = -1;
err_status = UNIX_FILE_OPEN_ERR;
}
if((result = ASSERT_ERROR(cache_fd != -1, err_status, "Failed to open the cache file: \"%s\".",
_filename.c_str())).ok()) {
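The cache-file handling now depends on the mode: a PUT still opens the local cache file, a large copy uses `cache_fd = 0` purely as a "no error" sentinel (a server-side copy reads nothing locally), and a small copy is rejected because the single-part path lives in `s3CopyFile`, not here. A condensed, compilable sketch of that decision (function and parameter names are illustrative):

```cpp
#include <fcntl.h>

// Condensed sketch of the mode-dependent cache handling above. Returns the
// fd convention the hunk relies on: a real descriptor for PUTs, 0 as a
// "no local file needed" sentinel for large copies, and -1 to reject
// single-part copies, which belong in s3CopyFile.
int openCacheForMode(bool isPut, bool isLargeCopy, const char* filename)
{
    if (isPut)
        return open(filename, O_RDONLY); // PUT streams the local cache file
    if (isLargeCopy)
        return 0;  // server-side copy: nothing to read locally
    return -1;     // single-part copy not implemented here
}
```

The matching hunk near the end of the function skips the `close()` for copies, since descriptor 0 was never actually opened.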
@@ -1207,6 +1238,7 @@ extern "C" {
bucketContext.accessKeyId = _key_id.c_str();
bucketContext.secretAccessKey = _access_key.c_str();
S3PutProperties *putProps = NULL;
putProps = (S3PutProperties*)calloc( sizeof(S3PutProperties), 1 );
if ( putProps && enable_md5 )
@@ -1231,7 +1263,7 @@ extern "C" {
S3_put_object (&bucketContext, key.c_str(), _fileSize, putProps, 0, &putObjectHandler, &data);
unsigned long long usEnd = usNow();
double bw = (_fileSize / (1024.0*1024.0)) / ( (usEnd - usStart) / 1000000.0 );
rodsLog( LOG_NOTICE, "BW=%lf", bw);
rodsLog( LOG_DEBUG, "BW=%lf", bw);
if (data.status != S3StatusOK) s3_sleep( g_retry_wait, 0 );
} while ( (data.status != S3StatusOK) && S3_status_is_retryable(data.status) && (++retry_cnt < g_retry_count) );
if (data.status != S3StatusOK) {
@@ -1249,7 +1281,7 @@ extern "C" {
free( putProps );
}
} else {
// Multi-part upload
// Multi-part upload or copy
upload_manager_t manager;
memset(&manager, 0, sizeof(manager));
@@ -1339,6 +1371,23 @@ extern "C" {
return result; // Abort early
}
// Following used by S3_COPYOBJECT only
S3BucketContext srcBucketContext;
if (_mode == S3_COPYOBJECT) {
ret = parseS3Path(_filename, srcBucket, srcKey);
if(!(result = ASSERT_PASS(ret, "Failed parsing the S3 bucket and key from the physical path: \"%s\".",
_filename.c_str())).ok()) {
return result; // Abort early
}
bzero (&srcBucketContext, sizeof (srcBucketContext));
srcBucketContext.bucketName = srcBucket.c_str();
srcBucketContext.protocol = s3GetProto(_prop_map);
srcBucketContext.stsDate = s3GetSTSDate(_prop_map);
srcBucketContext.uriStyle = S3UriStylePath;
srcBucketContext.accessKeyId = _key_id.c_str();
srcBucketContext.secretAccessKey = _access_key.c_str();
}
g_mpuNext = 0;
g_mpuLast = totalSeq;
g_mpuUploadId = manager.upload_id;
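The new source context duplicates the field-by-field setup already done for the destination bucket earlier in this function. A hypothetical helper (not part of this commit) that captures the shared shape, including the lifetime caveat implied by storing raw `c_str()` pointers:

```cpp
#include <libs3.h>
#include <strings.h> // bzero
#include <string>

// Hypothetical helper (not in this commit) for the repeated S3BucketContext
// setup used for both the destination and the new source bucket. The
// std::string arguments must outlive the context, since libs3 keeps only
// the raw pointers.
static void initBucketCtx(S3BucketContext& ctx, const std::string& bucket,
                          S3Protocol proto, S3STSDate stsDate,
                          const std::string& keyId, const std::string& secret)
{
    bzero(&ctx, sizeof(ctx));
    ctx.bucketName      = bucket.c_str();
    ctx.protocol        = proto;
    ctx.stsDate         = stsDate;
    ctx.uriStyle        = S3UriStylePath;
    ctx.accessKeyId     = keyId.c_str();
    ctx.secretAccessKey = secret.c_str();
}
```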
@@ -1347,6 +1396,11 @@ extern "C" {
memset(&partData, 0, sizeof(partData));
partData.manager = &manager;
partData.seq = seq;
partData.mode = _mode;
if (_mode == S3_COPYOBJECT) {
partData.pSrcCtx = &srcBucketContext;
partData.srcKey = srcKey.c_str();
}
partData.put_object_data = data;
partContentLength = (data.contentLength > chunksize)?chunksize:data.contentLength;
partData.put_object_data.contentLength = partContentLength;
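The ternary clamps each part to the configured chunk size, so only the final part can be short. A worked example with illustrative numbers (how `totalSeq` is derived is an assumption based on this clamping, not shown in the diff):

```cpp
#include <cstdint>

// Worked example of the clamping above (numbers are illustrative):
// a 100 MiB object copied with a 32 MiB chunk size.
int main()
{
    const int64_t fileSize  = 100LL * 1024 * 1024;
    const int64_t chunksize =  32LL * 1024 * 1024;
    const int64_t totalSeq  = (fileSize + chunksize - 1) / chunksize; // 4 parts
    // seq 1-3 get contentLength == chunksize (32 MiB each);
    // seq 4 gets the remainder, 100 - 96 = 4 MiB, via the ternary clamp.
    return totalSeq == 4 ? 0 : 1;
}
```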
@@ -1378,7 +1432,7 @@ extern "C" {
unsigned long long usEnd = usNow();
double bw = (_fileSize / (1024.0*1024.0)) / ( (usEnd - usStart) / 1000000.0 );
rodsLog( LOG_STATUS, "MultipartBW=%lf", bw);
rodsLog( LOG_DEBUG, "MultipartBW=%lf", bw);
manager.remaining = 0;
manager.offset = 0;
@@ -1444,15 +1498,19 @@ extern "C" {
}
}
close(cache_fd);
if (_mode != S3_COPYOBJECT) close(cache_fd);
}
}
}
return result;
}
// Define interface so we can use object stat to get the size we're copying
irods::error s3FileStatPlugin( irods::resource_plugin_context& _ctx, struct stat* _statbuf );
/// @brief Function to copy the specified src file to the specified dest file
irods::error s3CopyFile(
irods::resource_plugin_context& _src_ctx,
const std::string& _src_file,
const std::string& _dest_file,
const std::string& _key_id,
@@ -1467,53 +1525,63 @@ extern "C" {
std::string dest_bucket;
std::string dest_key;
// Parse the src file
ret = parseS3Path(_src_file, src_bucket, src_key);
if((result = ASSERT_PASS(ret, "Failed to parse the source file name: \"%s\".",
_src_file.c_str())).ok()) {
// Parse the dest file
ret = parseS3Path(_dest_file, dest_bucket, dest_key);
if((result = ASSERT_PASS(ret, "Failed to parse the destination file name: \"%s\".",
_dest_file.c_str())).ok()) {
callback_data_t data;
S3BucketContext bucketContext;
int64_t lastModified;
char eTag[256];
bzero (&bucketContext, sizeof (bucketContext));
bucketContext.bucketName = src_bucket.c_str();
bucketContext.protocol = _proto;
bucketContext.stsDate = _stsDate;
bucketContext.uriStyle = S3UriStylePath;
bucketContext.accessKeyId = _key_id.c_str();
bucketContext.secretAccessKey = _access_key.c_str();
S3ResponseHandler responseHandler = {
&responsePropertiesCallback,
&responseCompleteCallback
};
size_t retry_cnt = 0;
do {
bzero (&data, sizeof (data));
bucketContext.hostName = s3GetHostname();
data.pCtx = &bucketContext;
S3_copy_object(&bucketContext, src_key.c_str(), dest_bucket.c_str(), dest_key.c_str(), NULL, &lastModified, sizeof(eTag), eTag, 0,
&responseHandler, &data);
if (data.status != S3StatusOK) s3_sleep( g_retry_wait, 0 );
} while ( (data.status != S3StatusOK) && S3_status_is_retryable(data.status) && (++retry_cnt < g_retry_count) );
if (data.status != S3StatusOK) {
std::stringstream msg;
msg << __FUNCTION__;
msg << " - Error copying the S3 object: \"" << _src_file << "\" to S3 object: \"" << _dest_file << "\"";
if (data.status >= 0) {
msg << " - \"" << S3_get_status_name((S3Status)data.status) << "\"";
}
result = ERROR(S3_FILE_COPY_ERR, msg.str());
}
// Check the size, and if too large punt to the multipart copy/put routine
struct stat statbuf;
ret = s3FileStatPlugin( _src_ctx, &statbuf );
if (( result = ASSERT_PASS(ret, "Unable to get original object size for source file name: \"%s\".",
_src_file.c_str())).ok()) {
if ( statbuf.st_size > s3GetMPUChunksize(_src_ctx.prop_map()) ) {
// Early return for cleaner code...
return s3PutCopyFile( S3_COPYOBJECT, _src_file, _dest_file, statbuf.st_size, _key_id, _access_key, _src_ctx.prop_map() );
}
// Parse the src file
ret = parseS3Path(_src_file, src_bucket, src_key);
if((result = ASSERT_PASS(ret, "Failed to parse the source file name: \"%s\".",
_src_file.c_str())).ok()) {
// Parse the dest file
ret = parseS3Path(_dest_file, dest_bucket, dest_key);
if((result = ASSERT_PASS(ret, "Failed to parse the destination file name: \"%s\".",
_dest_file.c_str())).ok()) {
callback_data_t data;
S3BucketContext bucketContext;
int64_t lastModified;
char eTag[256];
bzero (&bucketContext, sizeof (bucketContext));
bucketContext.bucketName = src_bucket.c_str();
bucketContext.protocol = _proto;
bucketContext.stsDate = _stsDate;
bucketContext.uriStyle = S3UriStylePath;
bucketContext.accessKeyId = _key_id.c_str();
bucketContext.secretAccessKey = _access_key.c_str();
S3ResponseHandler responseHandler = {
&responsePropertiesCallback,
&responseCompleteCallback
};
size_t retry_cnt = 0;
do {
bzero (&data, sizeof (data));
bucketContext.hostName = s3GetHostname();
data.pCtx = &bucketContext;
S3_copy_object(&bucketContext, src_key.c_str(), dest_bucket.c_str(), dest_key.c_str(), NULL, &lastModified, sizeof(eTag), eTag, 0,
&responseHandler, &data);
if (data.status != S3StatusOK) s3_sleep( g_retry_wait, 0 );
} while ( (data.status != S3StatusOK) && S3_status_is_retryable(data.status) && (++retry_cnt < g_retry_count) );
if (data.status != S3StatusOK) {
std::stringstream msg;
msg << __FUNCTION__;
msg << " - Error copying the S3 object: \"" << _src_file << "\" to S3 object: \"" << _dest_file << "\"";
if (data.status >= 0) {
msg << " - \"" << S3_get_status_name((S3Status)data.status) << "\"";
}
result = ERROR(S3_FILE_COPY_ERR, msg.str());
}
}
}
}
return result;
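The reworked `s3CopyFile` now stats the source first and returns early into the multipart machinery for large objects, keeping the single `S3_copy_object` path for everything at or below one chunk. The decision, reduced to a predicate (a sketch; the names follow the hunk above):

```cpp
// Sketch of the size check that picks the copy strategy. Objects larger
// than one MPU chunk are delegated to s3PutCopyFile(S3_COPYOBJECT, ...);
// smaller ones use a single S3_copy_object call in the usual retry loop.
bool shouldMultipartCopy(long long objectSize, long long mpuChunksize)
{
    return objectSize > mpuChunksize;
}
```

Note that the single-part path keeps a 256-byte `eTag` buffer while the multipart path allocates 512 bytes per part; the 512 is flagged as a magic number in the commit itself.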
@@ -1940,8 +2008,8 @@ extern "C" {
if((result = ASSERT_PASS(ret, "Failed to get S3 credential properties.")).ok()) {
// copy the file to the new location
ret = s3CopyFile(object->physical_path(), _new_file_name, key_id, access_key,
s3GetProto(_ctx.prop_map()),s3GetSTSDate(_ctx.prop_map()));
ret = s3CopyFile(_ctx, object->physical_path(), _new_file_name, key_id, access_key,
s3GetProto(_ctx.prop_map()), s3GetSTSDate(_ctx.prop_map()));
if((result = ASSERT_PASS(ret, "Failed to copy file from: \"%s\" to \"%s\".",
object->physical_path().c_str(), _new_file_name)).ok()) {
// delete the old file
@@ -2091,7 +2159,7 @@ extern "C" {
object->physical_path(s3_key_name.str());
}
ret = s3PutFile(_cache_file_name, object->physical_path(), statbuf.st_size, key_id, access_key, _ctx.prop_map());
ret = s3PutCopyFile(S3_PUTFILE, _cache_file_name, object->physical_path(), statbuf.st_size, key_id, access_key, _ctx.prop_map());
result = ASSERT_PASS(ret, "Failed to copy the cache file: \"%s\" to the S3 object: \"%s\".",
_cache_file_name, object->physical_path().c_str());
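After this change both entry points funnel into the same function. The two call sites, excerpted from the hunks in this diff:

```cpp
// PUT path (cache-to-S3 hunk above):
ret = s3PutCopyFile(S3_PUTFILE, _cache_file_name, object->physical_path(),
                    statbuf.st_size, key_id, access_key, _ctx.prop_map());

// COPY path (s3CopyFile hunk above):
return s3PutCopyFile(S3_COPYOBJECT, _src_file, _dest_file, statbuf.st_size,
                     _key_id, _access_key, _src_ctx.prop_map());
```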
@@ -54,6 +54,9 @@ typedef struct upload_manager
typedef struct multipart_data
{
int seq; /* Sequence number, i.e. which part */
int mode; /* PUT or COPY */
S3BucketContext *pSrcCtx; /* Source bucket context, ignored in a PUT */
const char *srcKey; /* Source key, ignored in a PUT */
callback_data put_object_data; /* File being uploaded */
upload_manager_t *manager; /* To update w/the MD5 returned */
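The three new `multipart_data` fields carry everything a worker needs to issue a ranged copy; they are populated once per part in `s3PutCopyFile` (hunk above), mirrored here with the lifetime notes the raw pointers imply:

```cpp
// Mirrors the per-part setup from the s3PutCopyFile hunk above.
memset(&partData, 0, sizeof(partData));
partData.manager = &manager;
partData.seq     = seq;
partData.mode    = _mode;                  // S3_PUTFILE or S3_COPYOBJECT
if (_mode == S3_COPYOBJECT) {
    partData.pSrcCtx = &srcBucketContext;  // one context shared by all parts
    partData.srcKey  = srcKey.c_str();     // srcKey must outlive every part
}
```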