Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ ErrorOr<SharedPtr<Aws::S3::S3Client>> IBaseInstance::getClient(
Aws::Client::ClientConfiguration clientCfg;
clientCfg.connectTimeoutMs = 30'000;
clientCfg.requestTimeoutMs = 120'000;
// The client is shared by the scanner threads, so the connection pool
// must be large enough to avoid serializing them. Keep this in sync with
// the thread-count cap in store/core/scan.cpp (m_threadCount <= 64).
clientCfg.maxConnections = 64;

if (!config.region)
config.region = string::lowerCase(Aws::Region::US_EAST_1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,7 @@ class IBaseEndpoint : public IServiceEndpoint {
virtual Error scanObjects(Path &path,
const ScanAddObject &callback) noexcept override;
virtual Error processEntry(const S3Object &s3Object, Entry &object,
const ScanAddObject &addObject,
const SharedPtr<Aws::S3::S3Client> &client,
const Text &bucket) noexcept;
const ScanAddObject &addObject) noexcept;
virtual Error processBuckets(const SharedPtr<Aws::S3::S3Client> &client,
const ScanAddObject &callback) noexcept;

Expand All @@ -266,9 +264,17 @@ class IBaseEndpoint : public IServiceEndpoint {
Text m_type;

protected:
Text getContentType(const Text &key,
const SharedPtr<Aws::S3::S3Client> &client,
const Text &bucket) noexcept;
ErrorOr<SharedPtr<Aws::S3::S3Client>> getScanClient() noexcept;
void resetScanClient() noexcept;

private:
//-----------------------------------------------------------------
/// @details
/// Cached client shared by the scanner threads, reset on error
/// so the next scan re-connects
//-----------------------------------------------------------------
SharedPtr<Aws::S3::S3Client> m_scanClient;
mutable async::MutexLock m_scanClientLock;
};

//-------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,28 @@
namespace engine::store::filter::baseObjectStore {
//---------------------------------------------------------------------
/// @details
/// get Content-Type
/// @param[in] key
/// S3 object's key (name)
/// Get the cached scan client, creating it on first use. The
/// client is shared by all scanner threads; the AWS client is
/// thread safe.
/// @returns
/// Text
/// Error or shared_ptr to the client
//---------------------------------------------------------------------
Text IBaseEndpoint::getContentType(const Text &key,
const SharedPtr<Aws::S3::S3Client> &client,
const Text &bucket) noexcept {
// Define the request to read the file
const auto objectsReq =
Aws::S3::Model::HeadObjectRequest().WithBucket(bucket).WithKey(key);

// get the object from the bucket
auto objectsResp = client->HeadObject(objectsReq);
if (objectsResp.IsSuccess())
return objectsResp.GetResultWithOwnership().GetContentType();

return {};
ErrorOr<SharedPtr<Aws::S3::S3Client>> IBaseEndpoint::getScanClient() noexcept {
auto guard = m_scanClientLock.acquire();
if (!m_scanClient) {
auto client = IBaseInstance::getClient(m_storeConfig);
if (!client) return client.ccode();
m_scanClient = _mv(*client);
}
return m_scanClient;
}
//---------------------------------------------------------------------
/// @details
/// Drop the cached scan client so the next scan re-connects
//---------------------------------------------------------------------
void IBaseEndpoint::resetScanClient() noexcept {
auto guard = m_scanClientLock.acquire();
m_scanClient.reset();
}
//---------------------------------------------------------------------
/// @details
Expand All @@ -66,10 +69,7 @@ Text IBaseEndpoint::getContentType(const Text &key,
/// Error
//---------------------------------------------------------------------
Error IBaseEndpoint::processEntry(const S3Object &s3Object, Entry &object,
const ScanAddObject &addObject,
const SharedPtr<Aws::S3::S3Client> &client,
const Text &bucket) noexcept {
Error ccode;
const ScanAddObject &addObject) noexcept {
json::Value metadata;
Text key, param;

Expand Down Expand Up @@ -116,10 +116,6 @@ Error IBaseEndpoint::processEntry(const S3Object &s3Object, Entry &object,
}
}

key = "Content-Type";
param = getContentType(s3Object.GetKey(), client, bucket);
metadata[key] = param;

if (!metadata.empty()) object.metadata(_mv(metadata));

// Add the object
Expand All @@ -134,51 +130,54 @@ Error IBaseEndpoint::processEntry(const S3Object &s3Object, Entry &object,
//-----------------------------------------------------------------
Error IBaseEndpoint::scanObjects(Path &path,
const ScanAddObject &callback) noexcept {
static auto start = time::now();
const auto start = time::now();
Text delimiter = "/";
Text tempPath = path.gen() + delimiter;
Error ccode;

// Get a new aws client
auto client = IBaseInstance::getClient(m_storeConfig);
if (!client) return client.ccode();
// Get the cached aws client shared by the scanner threads
auto clientOr = getScanClient();
if (!clientOr) return clientOr.ccode();
auto client = _mv(*clientOr);

if (tempPath == delimiter) {
if (ccode = processBuckets(client.value(), callback)) return ccode;
if (ccode = processBuckets(client, callback)) return ccode;
return {};
}

Text bucket, prefix;
extractBucketAndKeyFromPath(tempPath, bucket, prefix);

static uint64_t count = 0;
Aws::String lastObjectInChunk;
Aws::Vector<Aws::S3::Model::CommonPrefix> prevPrefixes;
Aws::Vector<Aws::S3::Model::Object> prevObjects;
uint64_t count = 0;
Aws::String continuationToken;

_forever() {
// Define the request to list the segments. Default max keys to retrieve
// is 1000
// Define the request to list one page of objects. Default max keys to
// retrieve is 1000. FetchOwner is needed for the owner info below
auto listObjectsReq = Aws::S3::Model::ListObjectsV2Request()
.WithBucket(bucket)
.WithStartAfter(lastObjectInChunk)
.WithPrefix(prefix)
.WithDelimiter(delimiter);
.WithDelimiter(delimiter)
.WithFetchOwner(true);
if (!continuationToken.empty())
listObjectsReq.SetContinuationToken(continuationToken);

// Just list all segments in the bucket
auto listObjectsResp = client->ListObjectsV2(listObjectsReq);
if (!listObjectsResp.IsSuccess())
return errorFromS3Error(*client, _location,
if (!listObjectsResp.IsSuccess()) {
resetScanClient();
return errorFromS3Error(client, _location,
listObjectsResp.GetError(), bucket);
}

auto objects = listObjectsResp.GetResult().GetContents();
auto prefixes =
listObjectsResp.GetResultWithOwnership().GetCommonPrefixes();
auto result = listObjectsResp.GetResultWithOwnership();
const auto &objects = result.GetContents();
const auto &prefixes = result.GetCommonPrefixes();

LOGT("{} segment{} listed", objects.size(),
objects.size() != 1 ? "s" : "");

if (objects.empty() && prefixes.empty()) {
if (objects.empty() && prefixes.empty() && continuationToken.empty()) {
if (!prefix.empty() && prefix.starts_with(delimiter.c_str())) {
prefix.erase(0, 1);
continue;
Expand All @@ -187,50 +186,32 @@ Error IBaseEndpoint::scanObjects(Path &path,
break;
}

if (!prevPrefixes.empty() && prefixes.size() == prevPrefixes.size() &&
std::equal(prefixes.begin(), prefixes.end(), prevPrefixes.begin(),
[](const Aws::S3::Model::CommonPrefix &lhs,
const Aws::S3::Model::CommonPrefix &rhs) {
return lhs.GetPrefix() == rhs.GetPrefix();
}))
break;

for (const auto &prefix : prefixes) {
for (const auto &commonPrefix : prefixes) {
Entry folderObject;
Path prefixPath{prefix.GetPrefix()};
Path prefixPath{commonPrefix.GetPrefix()};
folderObject.name(prefixPath.back());
folderObject.isContainer(true);
if (ccode = callback(folderObject)) {
MONERR(error, ccode, "Scanning on", prefix.GetPrefix(),
MONERR(error, ccode, "Scanning on", commonPrefix.GetPrefix(),
"failed");
folderObject.completionCode(ccode);
}
lastObjectInChunk = prefix.GetPrefix();
}
prevPrefixes = _mv(prefixes);

if (!prevObjects.empty() && objects.size() == prevObjects.size() &&
std::equal(objects.begin(), objects.end(), prevObjects.begin(),
[](const Aws::S3::Model::Object &lhs,
const Aws::S3::Model::Object &rhs) {
return lhs.GetKey() == rhs.GetKey();
}))
break;

for (const auto &s3Object : objects) {
// if it's a folder -> don't process it
if (s3Object.GetKey().ends_with(delimiter.c_str())) continue;
lastObjectInChunk = s3Object.GetKey();
Entry fileObject;
if (ccode = processEntry(s3Object, fileObject, callback,
client.value(), bucket)) {
if (ccode = processEntry(s3Object, fileObject, callback)) {
MONERR(error, ccode, "Scanning on", s3Object.GetKey(),
"failed");
fileObject.completionCode(ccode);
}
++count;
}
prevObjects = _mv(objects);

if (!result.GetIsTruncated()) break;
continuationToken = result.GetNextContinuationToken();
}

LOGT("Scan elapsed {}, completed {} objects", time::now() - start, count);
Expand Down
Loading