
process s3 event messages from SQS #215


Open · wants to merge 3 commits into base: master
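
This change lets the loader consume S3 notifications that are delivered through an SQS queue instead of directly from S3. When Lambda polls SQS, each record's body field is a JSON string containing the original S3 event, so it must be parsed before the existing S3 handling can run. A minimal sketch of the wrapped shape (field values are illustrative, not taken from this PR):

    // Illustrative only: an SQS-delivered Lambda event wrapping an S3 notification
    var sqsWrappedEvent = {
        Records : [ {
            eventSource : "aws:sqs",
            // the body arrives as a string and must be JSON.parse'd
            body : JSON.stringify({
                Records : [ {
                    eventSource : "aws:s3",
                    eventName : "ObjectCreated:Put",
                    s3 : {
                        s3SchemaVersion : "1.0",
                        bucket : { name : "example-bucket" },
                        object : { key : "input/myfile.csv", size : 1024 }
                    }
                } ]
            })
        } ]
    };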
196 changes: 116 additions & 80 deletions index.js
@@ -480,7 +480,7 @@ function handler(event, context) {
     /*
      * process what happened if the iterative request to
      * write to the open pending batch timed out
-     * 
+     *
      * TODO Can we force a rotation of the current batch
      * at this point?
      */
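
The next hunk routes incoming S3 records through prefix resolution. The key's hive-style partition segments are generalised by transformHiveStylePrefix, a String helper defined elsewhere in the loader and not shown in this diff. Assuming it replaces each "name=value" path segment with "name=*", the effect is roughly:

    // Assumed behaviour of a helper from the loader's common module (not in this diff):
    "input/year=2019/month=07".transformHiveStylePrefix();
    // -> "input/year=*/month=*", so dated input prefixes resolve to a single config entry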
@@ -1490,102 +1490,138 @@ function handler(event, context) {
         });
     }
 
-    /* end of runtime functions */
-
-    try {
-        logger.debug(JSON.stringify(event));
+    exports.processS3EventRecord = function (event) {
+        logger.info("processS3EventRecord " + JSON.stringify(event));
 
         if (!event.Records) {
-            // filter out unsupported events
-            logger.error("Event type unsupported by Lambda Redshift Loader");
-            logger.info(JSON.stringify(event));
-            context.done(null, null);
+            logger.error("The provided s3 event was not wellformed or was generated as a test event, this will be ignored.");
+        } else if (event.Records.length > 1) {
+            context.done(error, "Unable to process multi-record events");
         } else {
-            if (event.Records.length > 1) {
-                context.done(error, "Unable to process multi-record events");
-            } else {
-                var r = event.Records[0];
+            var r = event.Records[0];
 
-                // ensure that we can process this event based on a variety
-                // of criteria
-                var noProcessReason;
-                if (r.eventSource !== "aws:s3") {
-                    noProcessReason = "Invalid Event Source " + r.eventSource;
-                }
-                if (!(r.eventName === "ObjectCreated:Copy" || r.eventName === "ObjectCreated:Put" || r.eventName === 'ObjectCreated:CompleteMultipartUpload')) {
-                    noProcessReason = "Invalid Event Name " + r.eventName;
-                }
-                if (r.s3.s3SchemaVersion !== "1.0") {
-                    noProcessReason = "Unknown S3 Schema Version " + r.s3.s3SchemaVersion;
-                }
+            // ensure that we can process this event based on a variety
+            // of criteria
+            var noProcessReason;
+            if (r.eventSource !== "aws:s3") {
+                noProcessReason = "Invalid Event Source " + r.eventSource;
+            }
+            if (!(r.eventName === "ObjectCreated:Copy" || r.eventName === "ObjectCreated:Put" || r.eventName === 'ObjectCreated:CompleteMultipartUpload')) {
+                noProcessReason = "Invalid Event Name " + r.eventName;
+            }
+            if (r.s3.s3SchemaVersion !== "1.0") {
+                noProcessReason = "Unknown S3 Schema Version " + r.s3.s3SchemaVersion;
+            }
 
-                if (noProcessReason) {
-                    logger.error(noProcessReason);
-                    context.done(error, noProcessReason);
-                } else {
-                    // extract the s3 details from the event
-                    var inputInfo = {
-                        bucket : undefined,
-                        key : undefined,
-                        prefix : undefined,
-                        inputFilename : undefined
-                    };
+            if (noProcessReason) {
+                logger.error(noProcessReason);
+                context.done(error, noProcessReason);
+            } else {
+                // extract the s3 details from the event
+                var inputInfo = {
+                    bucket : undefined,
+                    key : undefined,
+                    prefix : undefined,
+                    inputFilename : undefined
+                };
 
-                    inputInfo.bucket = r.s3.bucket.name;
-                    inputInfo.key = decodeURIComponent(r.s3.object.key);
+                inputInfo.bucket = r.s3.bucket.name;
+                inputInfo.key = decodeURIComponent(r.s3.object.key);
 
-                    // remove the bucket name from the key, if we have
-                    // received it - this happens on object copy
-                    inputInfo.key = inputInfo.key.replace(inputInfo.bucket + "/", "");
+                // remove the bucket name from the key, if we have
+                // received it - this happens on object copy
+                inputInfo.key = inputInfo.key.replace(inputInfo.bucket + "/", "");
 
-                    var keyComponents = inputInfo.key.split('/');
-                    inputInfo.inputFilename = keyComponents[keyComponents.length - 1];
+                var keyComponents = inputInfo.key.split('/');
+                inputInfo.inputFilename = keyComponents[keyComponents.length - 1];
 
-                    // remove the filename from the prefix value
-                    var searchKey = inputInfo.key.replace(inputInfo.inputFilename, '').replace(/\/$/, '');
+                // remove the filename from the prefix value
+                var searchKey = inputInfo.key.replace(inputInfo.inputFilename, '').replace(/\/$/, '');
 
-                    // transform hive style dynamic prefixes into static
-                    // match prefixes and set the prefix in inputInfo
-                    inputInfo.prefix = inputInfo.bucket + '/' + searchKey.transformHiveStylePrefix();
+                // transform hive style dynamic prefixes into static
+                // match prefixes and set the prefix in inputInfo
+                inputInfo.prefix = inputInfo.bucket + '/' + searchKey.transformHiveStylePrefix();
 
-                    // add the object size to inputInfo
-                    inputInfo.size = r.s3.object.size;
+                // add the object size to inputInfo
+                inputInfo.size = r.s3.object.size;
 
-                    logger.debug(JSON.stringify(inputInfo));
-
-                    resolveConfig(inputInfo.prefix, function (err, configData) {
-                        /*
-                         * we did get a configuration found by the resolveConfig
-                         * method
-                         */
-                        if (err) {
-                            logger.error(JSON.stringify(err));
-                            context.done(err, JSON.stringify(err));
-                        } else {
-                            // update the inputInfo prefix to match the
-                            // resolved config entry
-                            inputInfo.prefix = configData.Item.s3Prefix.S;
-
-                            // call the foundConfig method with the data item
-                            foundConfig(inputInfo, null, configData);
-                        }
-                    }, function (err) {
-                        // finish with no exception - where this file sits
-                        // in the S3 structure is not configured for redshift
-                        // loads, or there was an access issue that prevented us
-                        // querying DDB
-                        logger.error("No Configuration Found for " + inputInfo.prefix);
-                        if (err) {
-                            logger.error(err);
-                        }
-
-                        context.done(err, JSON.stringify(err));
-                    });
-                }
-            }
-        }
+                resolveConfig(inputInfo.prefix, function (err, configData) {
+                    /*
+                     * we did get a configuration found by the resolveConfig
+                     * method
+                     */
+                    if (err) {
+                        logger.error(JSON.stringify(err));
+                        context.done(err, JSON.stringify(err));
+                    } else {
+                        // update the inputInfo prefix to match the
+                        // resolved config entry
+                        inputInfo.prefix = configData.Item.s3Prefix.S;
+
+                        // call the foundConfig method with the data item
+                        foundConfig(inputInfo, null, configData);
+                    }
+                }, function (err) {
+                    // finish with no exception - where this file sits
+                    // in the S3 structure is not configured for redshift
+                    // loads, or there was an access issue that prevented us
+                    // querying DDB
+                    logger.error("No Configuration Found for " + inputInfo.prefix);
+                    if (err) {
+                        logger.error(err);
+                    }
+
+                    logger.debug(JSON.stringify(inputInfo));
+                    context.done(err, JSON.stringify(err));
+                });
+            }
+        }
+    }
+
+    /* end of runtime functions */
+
+    try {
+        logger.debug(JSON.stringify(event));
+
+        if (!event.Records || event.Records.length == 0) {
+            // filter out unsupported events
+            logger.error("Event type unsupported by Lambda Redshift Loader");
+            logger.info(JSON.stringify(event));
+            return;
+        }
+
+        // obtain the first record in order to establish the eventSource
+        var record = event.Records[0];
+
+        var noProcessReason;
+
+        if (record.eventSource == "aws:s3") {
+            // process the s3 event
+            logger.info("Processing message from S3 event source");
+
+            exports.processS3EventRecord(event);
+        } else if (record.eventSource == "aws:sqs") {
+            // process the sqs message
+            logger.info("Processing " + event.Records.length + " message(s) from SQS event source.");
+
+            // process the s3 event contained in the body of each sqs message
+            event.Records.forEach(function (record) {
+                if (!record.body) {
+                    noProcessReason = "Unable to process message body for event received via sqs event source, body was not present.";
+                    logger.error(noProcessReason);
+                    context.done(error, noProcessReason);
+                    // skip this record - there is no body to parse
+                    return;
+                }
+
+                var messageBody = JSON.parse(record.body);
+
+                exports.processS3EventRecord(messageBody);
+            });
+        } else {
+            noProcessReason = "Invalid Event Source " + record.eventSource;
+            logger.error(noProcessReason);
+            context.done(error, noProcessReason);
+        }
     } catch (e) {
         logger.error("Unhandled Exception");
@@ -1595,4 +1631,4 @@
     }
 }
 
-exports.handler = handler;
+exports.handler = handler;
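
A rough local smoke test for the new SQS path, assuming the module is loaded as ./index.js and that a stub context exposing only a done callback is sufficient (this harness is not part of the PR):

    // Hypothetical harness - not part of this change
    var lambda = require("./index.js");

    var s3Event = {
        Records : [ {
            eventSource : "aws:s3",
            eventName : "ObjectCreated:Put",
            s3 : {
                s3SchemaVersion : "1.0",
                bucket : { name : "example-bucket" },
                object : { key : "input/myfile.csv", size : 1024 }
            }
        } ]
    };

    // wrap the s3 notification the way SQS would deliver it
    var sqsEvent = {
        Records : [ { eventSource : "aws:sqs", body : JSON.stringify(s3Event) } ]
    };

    lambda.handler(sqsEvent, {
        done : function (err, msg) {
            console.log(err || msg);
        }
    });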