
Reuse storage clients and destroy them when no longer needed (fixes s…
GabrielLomba committed Dec 15, 2023
1 parent 49d0e7b commit f1b38a9
Showing 5 changed files with 86 additions and 33 deletions.
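
The change applies one pattern across the agent, the storage utils, and their tests: create the cloud storage client once per transfer, share it with the per-file utils through meta['client'], and destroy it when the transfer is done. A minimal sketch of that lifecycle, using illustrative helper names rather than the connector's actual function signatures:

// Sketch only: createClient/uploadOneFile stand in for the storage-util
// methods; the point is the create-once / reuse / destroy-in-finally shape.
async function transferFiles(storageUtil, stageInfo, fileMetas) {
  const client = storageUtil.createClient(stageInfo); // one client for the whole transfer
  try {
    for (const meta of fileMetas) {
      meta['client'] = client; // every file reuses the same client
      await storageUtil.uploadOneFile(meta);
    }
  } finally {
    if (client.destroy) { // guard: not every storage client implements destroy()
      client.destroy();
    }
  }
}
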
6 changes: 3 additions & 3 deletions lib/file_transfer_agent/azure_util.js
@@ -89,7 +89,7 @@ function azure_util(azure, filestream) {
*/
this.getFileHeader = async function (meta, filename) {
const stageInfo = meta['stageInfo'];
- const client = this.createClient(stageInfo);
+ const client = meta['client'];
const azureLocation = this.extractContainerNameAndPath(stageInfo['location']);

const containerClient = client.getContainerClient(azureLocation.containerName);
@@ -189,7 +189,7 @@ function azure_util(azure, filestream) {
}

const stageInfo = meta['stageInfo'];
- const client = this.createClient(stageInfo);
+ const client = meta['client'];
const azureLocation = this.extractContainerNameAndPath(stageInfo['location']);
const blobName = azureLocation.path + meta['dstFileName'];

@@ -233,7 +233,7 @@ function azure_util(azure, filestream) {
*/
this.nativeDownloadFile = async function (meta, fullDstPath, maxConcurrency) {
const stageInfo = meta['stageInfo'];
- const client = this.createClient(stageInfo);
+ const client = meta['client'];
const azureLocation = this.extractContainerNameAndPath(stageInfo['location']);
const blobName = azureLocation.path + meta['srcFileName'];

34 changes: 25 additions & 9 deletions lib/file_transfer_agent/file_transfer_agent.js
@@ -150,7 +150,13 @@ function file_transfer_agent(context) {
meta['dstFileName'] = meta['srcFileName'];

var storageClient = getStorageClient(meta['stageLocationType']);
- await storageClient.uploadOneFileStream(meta);
+ try {
+   await storageClient.uploadOneFileStream(meta);
+ } finally {
+   if (client.destroy) {
+     client.destroy();
+   }
+ }
} else {
parseCommand();
initFileMetadata();
@@ -312,6 +318,9 @@ function file_transfer_agent(context) {
if (largeFileMetas.length > 0) {
await uploadFilesinSequential(largeFileMetas);
}
+ if (client.destroy) {
+   client.destroy();
+ }
}

/**
@@ -426,6 +435,9 @@ function file_transfer_agent(context) {
if (largeFileMetas.length > 0) {
await downloadFilesinSequential(largeFileMetas);
}
+ if (client.destroy) {
+   client.destroy();
+ }
}

/**
@@ -503,14 +515,18 @@ function file_transfer_agent(context) {
var client = SnowflakeRemoteStorageUtil.createClient(stageInfo, false);
var s3location = SnowflakeS3Util.extractBucketNameAndPath(stageInfo['location']);

- await client.getBucketAccelerateConfiguration({ Bucket: s3location.bucketName })
-   .then(function (data) {
-     useAccelerateEndpoint = data['Status'] === 'Enabled';
-   }).catch(function (err) {
-     if (err['code'] === 'AccessDenied') {
-       return;
-     }
-   });
+ try {
+   await client.getBucketAccelerateConfiguration({ Bucket: s3location.bucketName })
+     .then(function (data) {
+       useAccelerateEndpoint = data['Status'] === 'Enabled';
+     }).catch(function (err) {
+       if (err['code'] === 'AccessDenied') {
+         return;
+       }
+     });
+ } finally {
+   client.destroy();
+ }
}
}

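
The last hunk above wraps the accelerate-endpoint probe in try/finally so the short-lived S3 client created for it is always destroyed, even when the call rejects. A standalone sketch of the same shape, assuming the AWS SDK v3 aggregated S3 client (the region and bucket name are placeholders, not values from the connector):

// Sketch of the probe-and-destroy shape, not the connector's own code.
const { S3 } = require('@aws-sdk/client-s3');

async function probeTransferAcceleration(bucketName) {
  const client = new S3({ region: 'us-east-1' }); // placeholder region
  let useAccelerateEndpoint = false;
  try {
    const data = await client.getBucketAccelerateConfiguration({ Bucket: bucketName });
    useAccelerateEndpoint = data.Status === 'Enabled';
  } catch (err) {
    // Treat AccessDenied (and, in this sketch, any other failure) as "not enabled",
    // mirroring how the connector swallows the AccessDenied case above.
  } finally {
    client.destroy(); // release the client's open connections
  }
  return useAccelerateEndpoint;
}
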
12 changes: 7 additions & 5 deletions lib/file_transfer_agent/s3_util.js
@@ -114,7 +114,7 @@ function s3_util(s3, filestream) {
*/
this.getFileHeader = async function (meta, filename) {
const stageInfo = meta['stageInfo'];
- const client = this.createClient(stageInfo);
+ const client = meta['client'];
const s3location = this.extractBucketNameAndPath(stageInfo['location']);

const params = {
@@ -194,8 +194,7 @@ function s3_util(s3, filestream) {
s3Metadata[AMZ_MATDESC] = encryptionMetadata.matDesc;
}

- const stageInfo = meta['stageInfo'];
- const client = this.createClient(stageInfo);
+ const client = meta['client'];

const s3location = this.extractBucketNameAndPath(meta['stageInfo']['location']);

@@ -221,6 +220,8 @@
}
}
return;
+ } finally {
+   client.destroy();
}

meta['dstFileSize'] = meta['uploadSize'];
@@ -235,8 +236,7 @@
* @param {Object} encryptionMetadata
*/
this.nativeDownloadFile = async function (meta, fullDstPath) {
- const stageInfo = meta['stageInfo'];
- const client = this.createClient(stageInfo);
+ const client = meta['client'];

const s3location = this.extractBucketNameAndPath(meta['stageInfo']['location']);

@@ -271,6 +271,8 @@
}
}
return;
+ } finally {
+   client.destroy();
}
meta['resultStatus'] = resultStatus.DOWNLOADED;
};
28 changes: 20 additions & 8 deletions test/unit/file_transfer_agent/azure_test.js
@@ -20,15 +20,8 @@ describe('Azure client', function () {
let Azure = null;
let client = null;
let filestream = null;
+ let meta = null;
const dataFile = mockDataFile;
- const meta = {
-   stageInfo: {
-     location: mockLocation,
-     path: mockTable + '/' + mockPath + '/',
-     creds: {}
-   },
-   SHA256_DIGEST: mockDigest,
- };
const encryptionMetadata = {
key: mockKey,
iv: mockIv,
@@ -108,6 +101,18 @@
filestream = require('filestream');
Azure = new SnowflakeAzureUtil(client, filestream);
});
+ beforeEach(function () {
+   const stageInfo = {
+     location: mockLocation,
+     path: mockTable + '/' + mockPath + '/',
+     creds: {}
+   };
+   meta = {
+     stageInfo,
+     SHA256_DIGEST: mockDigest,
+     client: Azure.createClient(stageInfo),
+   };
+ });

it('extract bucket name and path', async function () {
verifyNameAndPath('sfc-eng-regression/test_sub_dir/', 'sfc-eng-regression', 'test_sub_dir/');
@@ -132,6 +137,7 @@

client = require('client');
Azure = new SnowflakeAzureUtil(client);
+ meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN);
@@ -147,6 +153,7 @@

client = require('client');
const Azure = new SnowflakeAzureUtil(client);
+ meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.NOT_FOUND_FILE);
@@ -162,6 +169,7 @@

client = require('client');
Azure = new SnowflakeAzureUtil(client);
+ meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN);
@@ -177,6 +185,7 @@

client = require('client');
Azure = new SnowflakeAzureUtil(client);
+ meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.ERROR);
@@ -193,6 +202,7 @@
client = require('client');
filestream = require('filestream');
Azure = new SnowflakeAzureUtil(client, filestream);
+ meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.uploadFile(dataFile, meta, encryptionMetadata);
assert.strictEqual(meta['resultStatus'], resultStatus.UPLOADED);
@@ -213,6 +223,7 @@
client = require('client');
filestream = require('filestream');
Azure = new SnowflakeAzureUtil(client, filestream);
+ meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.uploadFile(dataFile, meta, encryptionMetadata);
assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN);
@@ -233,6 +244,7 @@
client = require('client');
filestream = require('filestream');
Azure = new SnowflakeAzureUtil(client, filestream);
+ meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.uploadFile(dataFile, meta, encryptionMetadata);
assert.strictEqual(meta['resultStatus'], resultStatus.NEED_RETRY);
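
Because the utils now read the client from meta['client'], every test has to supply one: the Azure tests above rebuild meta in a beforeEach with a client created from the fresh stageInfo, and re-attach a new client whenever a test swaps in a different mock. The S3 tests below do the same and additionally stub destroy() on the mock client, since s3_util and the afterEach hook call it unconditionally. Roughly, the mock shape looks like this (a simplified sketch, not the exact mocks in these files):

// Simplified mock: the real ones in s3_test.js return request objects with
// richer per-test behavior; only getObject/destroy are sketched here.
function MockS3Client() {
  this.getObject = function () {
    return Promise.resolve({ ETag: 'mock-etag' }); // placeholder response
  };
  this.destroy = function () {}; // no-op so cleanup code can call client.destroy()
}
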
39 changes: 31 additions & 8 deletions test/unit/file_transfer_agent/s3_test.js
@@ -20,15 +20,8 @@ describe('S3 client', function () {
let AWS;
let s3;
let filesystem;
+ let meta;
const dataFile = mockDataFile;
- const meta = {
-   stageInfo: {
-     location: mockLocation,
-     path: mockTable + '/' + mockPath + '/',
-     creds: {}
-   },
-   SHA256_DIGEST: mockDigest,
- };
const encryptionMetadata = {
key: mockKey,
iv: mockIv,
@@ -59,6 +52,7 @@ describe('S3 client', function () {

return new putObject;
};
+ this.destroy = function () {};
}

return new S3;
@@ -74,6 +68,21 @@

AWS = new SnowflakeS3Util(s3, filesystem);
});
+ beforeEach(function () {
+   const stageInfo = {
+     location: mockLocation,
+     path: mockTable + '/' + mockPath + '/',
+     creds: {}
+   };
+   meta = {
+     stageInfo,
+     SHA256_DIGEST: mockDigest,
+     client: AWS.createClient(stageInfo),
+   };
+ });
+ this.afterEach(function () {
+   meta.client.destroy();
+ });

it('extract bucket name and path', async function () {
var result = AWS.extractBucketNameAndPath('sfc-eng-regression/test_sub_dir/');
@@ -117,13 +126,15 @@

return new getObject;
};
+ this.destroy = function () {};
}

return new S3;
}
});
s3 = require('s3');
const AWS = new SnowflakeS3Util(s3);
+ meta['client'] = AWS.createClient(meta['stageInfo']);

await AWS.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN);
@@ -144,13 +155,15 @@

return new getObject;
};
+ this.destroy = function () {};
}

return new S3;
}
});
s3 = require('s3');
const AWS = new SnowflakeS3Util(s3);
+ meta['client'] = AWS.createClient(meta['stageInfo']);

await AWS.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.NOT_FOUND_FILE);
@@ -171,13 +184,15 @@

return new getObject;
};
+ this.destroy = function () {};
}

return new S3;
}
});
s3 = require('s3');
const AWS = new SnowflakeS3Util(s3);
+ meta['client'] = AWS.createClient(meta['stageInfo']);

await AWS.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN);
@@ -198,13 +213,15 @@

return new getObject;
};
+ this.destroy = function () {};
}

return new S3;
}
});
s3 = require('s3');
const AWS = new SnowflakeS3Util(s3);
+ meta['client'] = AWS.createClient(meta['stageInfo']);

await AWS.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.ERROR);
@@ -230,6 +247,7 @@

return new putObject;
};
+ this.destroy = function () {};
}

return new S3;
@@ -243,6 +261,7 @@
s3 = require('s3');
filesystem = require('filesystem');
const AWS = new SnowflakeS3Util(s3, filesystem);
+ meta['client'] = AWS.createClient(meta['stageInfo']);

await AWS.uploadFile(dataFile, meta, encryptionMetadata);
assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN);
@@ -263,6 +282,7 @@

return new putObject;
};
+ this.destroy = function () {};
}

return new S3;
@@ -276,6 +296,7 @@
s3 = require('s3');
filesystem = require('filesystem');
const AWS = new SnowflakeS3Util(s3, filesystem);
+ meta['client'] = AWS.createClient(meta['stageInfo']);

await AWS.uploadFile(dataFile, meta, encryptionMetadata);
assert.strictEqual(meta['resultStatus'], resultStatus.NEED_RETRY_WITH_LOWER_CONCURRENCY);
@@ -296,6 +317,7 @@

return new putObject;
};
+ this.destroy = function () {};
}

return new S3;
@@ -309,6 +331,7 @@
s3 = require('s3');
filesystem = require('filesystem');
const AWS = new SnowflakeS3Util(s3, filesystem);
+ meta['client'] = AWS.createClient(meta['stageInfo']);

await AWS.uploadFile(dataFile, meta, encryptionMetadata);
assert.strictEqual(meta['resultStatus'], resultStatus.NEED_RETRY);
