
Reuse storage clients + destroy when no longer needed (fixes #734) (#735)

Co-authored-by: Dawid Heyman <[email protected]>
GabrielLomba and sfc-gh-dheyman authored Dec 21, 2023
1 parent 65f8670 commit b092fb8
Showing 7 changed files with 119 additions and 45 deletions.
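
Every diff below follows the same pattern: the file transfer agent creates one storage client per command, shares it with the provider utilities through meta['client'], and destroys it in a finally block once the transfer finishes. The sketch below condenses that lifecycle from the file_transfer_agent.js hunks further down; the names come from the diff, but the loop and control flow are simplified for illustration and are not the exact committed code.

  // Create one client for the whole command, hand it to every file's metadata,
  // then release it whether the transfer succeeds or throws.
  var storageClient = getStorageClient(meta['stageLocationType']);
  var client = storageClient.createClient(stageInfo);
  for (var i = 0; i < fileMetas.length; i++) {
    fileMetas[i]['client'] = client; // utils read meta['client'] instead of building their own client
  }
  try {
    await uploadFilesinSequential(fileMetas); // or downloadFilesinSequential on the download path
  } finally {
    storageClient.destroyClient(stageInfo, client); // for S3 this ends up calling client.destroy(); providers without a destroyClient are skipped
  }
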
6 changes: 3 additions & 3 deletions lib/file_transfer_agent/azure_util.js
@@ -89,7 +89,7 @@ function azure_util(azure, filestream) {
*/
this.getFileHeader = async function (meta, filename) {
const stageInfo = meta['stageInfo'];
const client = this.createClient(stageInfo);
const client = meta['client'];
const azureLocation = this.extractContainerNameAndPath(stageInfo['location']);

const containerClient = client.getContainerClient(azureLocation.containerName);
@@ -189,7 +189,7 @@ function azure_util(azure, filestream) {
}

const stageInfo = meta['stageInfo'];
const client = this.createClient(stageInfo);
const client = meta['client'];
const azureLocation = this.extractContainerNameAndPath(stageInfo['location']);
const blobName = azureLocation.path + meta['dstFileName'];

@@ -233,7 +233,7 @@ function azure_util(azure, filestream) {
*/
this.nativeDownloadFile = async function (meta, fullDstPath, maxConcurrency) {
const stageInfo = meta['stageInfo'];
const client = this.createClient(stageInfo);
const client = meta['client'];
const azureLocation = this.extractContainerNameAndPath(stageInfo['location']);
const blobName = azureLocation.path + meta['srcFileName'];

58 changes: 37 additions & 21 deletions lib/file_transfer_agent/file_transfer_agent.js
@@ -150,7 +150,11 @@ function file_transfer_agent(context) {
meta['dstFileName'] = meta['srcFileName'];

var storageClient = getStorageClient(meta['stageLocationType']);
await storageClient.uploadOneFileStream(meta);
try {
await storageClient.uploadOneFileStream(meta);
} finally {
storageClient.destroyClient(stageInfo, client);
}
} else {
parseCommand();
initFileMetadata();
@@ -305,12 +309,16 @@ function file_transfer_agent(context) {
meta['client'] = client;
}

if (smallFileMetas.length > 0) {
//await uploadFilesinParallel(smallFileMetas);
await uploadFilesinSequential(smallFileMetas);
}
if (largeFileMetas.length > 0) {
await uploadFilesinSequential(largeFileMetas);
try {
if (smallFileMetas.length > 0) {
//await uploadFilesinParallel(smallFileMetas);
await uploadFilesinSequential(smallFileMetas);
}
if (largeFileMetas.length > 0) {
await uploadFilesinSequential(largeFileMetas);
}
} finally {
storageClient.destroyClient(stageInfo, client);
}
}

@@ -419,12 +427,16 @@ function file_transfer_agent(context) {
meta['client'] = client;
}

if (smallFileMetas.length > 0) {
//await downloadFilesinParallel(smallFileMetas);
await downloadFilesinSequential(smallFileMetas);
}
if (largeFileMetas.length > 0) {
await downloadFilesinSequential(largeFileMetas);
try {
if (smallFileMetas.length > 0) {
//await downloadFilesinParallel(smallFileMetas);
await downloadFilesinSequential(smallFileMetas);
}
if (largeFileMetas.length > 0) {
await downloadFilesinSequential(largeFileMetas);
}
} finally {
storageClient.destroyClient(stageInfo, client);
}
}

@@ -503,14 +515,18 @@ function file_transfer_agent(context) {
var client = SnowflakeRemoteStorageUtil.createClient(stageInfo, false);
var s3location = SnowflakeS3Util.extractBucketNameAndPath(stageInfo['location']);

await client.getBucketAccelerateConfiguration({ Bucket: s3location.bucketName })
.then(function (data) {
useAccelerateEndpoint = data['Status'] === 'Enabled';
}).catch(function (err) {
if (err['code'] === 'AccessDenied') {
return;
}
});
try {
await client.getBucketAccelerateConfiguration({ Bucket: s3location.bucketName })
.then(function (data) {
useAccelerateEndpoint = data['Status'] === 'Enabled';
}).catch(function (err) {
if (err['code'] === 'AccessDenied') {
return;
}
});
} finally {
SnowflakeRemoteStorageUtil.destroyClient(stageInfo, client);
}
}
}

3 changes: 3 additions & 0 deletions lib/file_transfer_agent/local_util.js
@@ -18,6 +18,9 @@ function local_util() {
return null;
};

this.destroyClient = function (stageInfo, client) {
};

/**
* Write file to upload.
*
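local_util gains an intentionally empty destroyClient so the agent can call the same cleanup method no matter which storage backend getStorageClient returned: for LOCAL_FS stages there is no real client to tear down. A minimal sketch of that shared call site, condensed from the uploadOneFileStream hunk above:

  // storageClient is either local_util (LOCAL_FS stage) or remote_storage_util.
  var storageClient = getStorageClient(meta['stageLocationType']);
  try {
    await storageClient.uploadOneFileStream(meta);
  } finally {
    storageClient.destroyClient(stageInfo, client); // a no-op when the stage is the local file system
  }
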
13 changes: 13 additions & 0 deletions lib/file_transfer_agent/remote_storage_util.js
@@ -69,6 +69,19 @@ function remote_storage_util() {
return utilClass.createClient(stageInfo, useAccelerateEndpoint);
};

/**
* Destroys a client based on the location type.
*
* @param {Object} stageInfo
* @param {Object} client
*/
this.destroyClient = function (stageInfo, client) {
var utilClass = this.getForStorageType(stageInfo['locationType']);
if (utilClass.destroyClient) {
utilClass.destroyClient(client);
}
};

/**
* Encrypt then upload one file stream.
*
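remote_storage_util.destroyClient is the dispatcher the agent calls: it looks up the provider util for stageInfo['locationType'] and delegates only when that util actually defines destroyClient, so providers without explicit cleanup (for example the Azure util in this commit) are skipped rather than crashing. A short usage sketch mirroring how file_transfer_agent.js already uses SnowflakeRemoteStorageUtil above:

  const client = SnowflakeRemoteStorageUtil.createClient(stageInfo, false);
  try {
    // ... run transfers that read the client from meta['client'] ...
  } finally {
    // Delegates to the matching provider util's destroyClient when it exists; otherwise does nothing.
    SnowflakeRemoteStorageUtil.destroyClient(stageInfo, client);
  }
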
17 changes: 12 additions & 5 deletions lib/file_transfer_agent/s3_util.js
@@ -76,6 +76,15 @@ function s3_util(s3, filestream) {
return new AWS.S3(config);
};

/**
* Destroys an AWS S3 client.
*
* @param {AWS.S3} client
*/
this.destroyClient = function (client) {
client.destroy();
};

/**
* Extract the bucket name and path from the metadata's stage location.
*
@@ -114,7 +123,7 @@ function s3_util(s3, filestream) {
*/
this.getFileHeader = async function (meta, filename) {
const stageInfo = meta['stageInfo'];
const client = this.createClient(stageInfo);
const client = meta['client'];
const s3location = this.extractBucketNameAndPath(stageInfo['location']);

const params = {
@@ -194,8 +203,7 @@ function s3_util(s3, filestream) {
s3Metadata[AMZ_MATDESC] = encryptionMetadata.matDesc;
}

const stageInfo = meta['stageInfo'];
const client = this.createClient(stageInfo);
const client = meta['client'];

const s3location = this.extractBucketNameAndPath(meta['stageInfo']['location']);

@@ -235,8 +243,7 @@ function s3_util(s3, filestream) {
* @param {Object} encryptionMetadata
*/
this.nativeDownloadFile = async function (meta, fullDstPath) {
const stageInfo = meta['stageInfo'];
const client = this.createClient(stageInfo);
const client = meta['client'];

const s3location = this.extractBucketNameAndPath(meta['stageInfo']['location']);

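With these changes the S3 util (like the Azure util in the first diff) no longer builds its own client inside getFileHeader, the upload path, or nativeDownloadFile; it expects the caller to have placed one on meta['client'], and destroyClient simply calls client.destroy() on it. A hedged sketch of that calling convention, assuming an already constructed s3_util instance named s3Util and a filename variable (both names are illustrative, not from the diff):

  const client = s3Util.createClient(stageInfo); // one AWS S3 client for the whole transfer
  meta['client'] = client;                       // getFileHeader / upload / download read it from here
  try {
    await s3Util.getFileHeader(meta, filename);
  } finally {
    s3Util.destroyClient(client);                // just calls client.destroy()
  }
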
28 changes: 20 additions & 8 deletions test/unit/file_transfer_agent/azure_test.js
@@ -20,15 +20,8 @@ describe('Azure client', function () {
let Azure = null;
let client = null;
let filestream = null;
let meta = null;
const dataFile = mockDataFile;
const meta = {
stageInfo: {
location: mockLocation,
path: mockTable + '/' + mockPath + '/',
creds: {}
},
SHA256_DIGEST: mockDigest,
};
const encryptionMetadata = {
key: mockKey,
iv: mockIv,
@@ -108,6 +101,18 @@ describe('Azure client', function () {
filestream = require('filestream');
Azure = new SnowflakeAzureUtil(client, filestream);
});
beforeEach(function () {
const stageInfo = {
location: mockLocation,
path: mockTable + '/' + mockPath + '/',
creds: {}
};
meta = {
stageInfo,
SHA256_DIGEST: mockDigest,
client: Azure.createClient(stageInfo),
};
});

it('extract bucket name and path', async function () {
verifyNameAndPath('sfc-eng-regression/test_sub_dir/', 'sfc-eng-regression', 'test_sub_dir/');
@@ -132,6 +137,7 @@ describe('Azure client', function () {

client = require('client');
Azure = new SnowflakeAzureUtil(client);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN);
@@ -147,6 +153,7 @@ describe('Azure client', function () {

client = require('client');
const Azure = new SnowflakeAzureUtil(client);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.NOT_FOUND_FILE);
@@ -162,6 +169,7 @@ describe('Azure client', function () {

client = require('client');
Azure = new SnowflakeAzureUtil(client);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN);
@@ -177,6 +185,7 @@ describe('Azure client', function () {

client = require('client');
Azure = new SnowflakeAzureUtil(client);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.getFileHeader(meta, dataFile);
assert.strictEqual(meta['resultStatus'], resultStatus.ERROR);
@@ -193,6 +202,7 @@ describe('Azure client', function () {
client = require('client');
filestream = require('filestream');
Azure = new SnowflakeAzureUtil(client, filestream);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.uploadFile(dataFile, meta, encryptionMetadata);
assert.strictEqual(meta['resultStatus'], resultStatus.UPLOADED);
@@ -213,6 +223,7 @@ describe('Azure client', function () {
client = require('client');
filestream = require('filestream');
Azure = new SnowflakeAzureUtil(client, filestream);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.uploadFile(dataFile, meta, encryptionMetadata);
assert.strictEqual(meta['resultStatus'], resultStatus.RENEW_TOKEN);
Expand All @@ -233,6 +244,7 @@ describe('Azure client', function () {
client = require('client');
filestream = require('filestream');
Azure = new SnowflakeAzureUtil(client, filestream);
meta['client'] = Azure.createClient(meta['stageInfo']);

await Azure.uploadFile(dataFile, meta, encryptionMetadata);
assert.strictEqual(meta['resultStatus'], resultStatus.NEED_RETRY);
