diff --git a/pom.xml b/pom.xml
index ee04442..35fceeb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
org.sherzberg.graylog.plugins
graylog-plugin-s3
- 2.4.1-SNAPSHOT
+ 2.5.0
jar
${project.artifactId}
diff --git a/src/main/java/org/sherzberg/graylog/aws/inputs/s3/S3Subscriber.java b/src/main/java/org/sherzberg/graylog/aws/inputs/s3/S3Subscriber.java
index b1c15f6..df59f8e 100644
--- a/src/main/java/org/sherzberg/graylog/aws/inputs/s3/S3Subscriber.java
+++ b/src/main/java/org/sherzberg/graylog/aws/inputs/s3/S3Subscriber.java
@@ -113,6 +113,7 @@ public void run() {
String message;
while ((message = reader.readLine()) != null) {
+ LOG.debug("S3 Event message : {}", message);
S3Record s3Record = new S3Record();
s3Record.s3Bucket = n.getS3Bucket();
s3Record.s3ObjectKey = n.getS3ObjectKey();
diff --git a/src/main/java/org/sherzberg/graylog/aws/inputs/s3/notifications/S3SNSNotification.java b/src/main/java/org/sherzberg/graylog/aws/inputs/s3/notifications/S3SNSNotification.java
index b17116d..fc57000 100644
--- a/src/main/java/org/sherzberg/graylog/aws/inputs/s3/notifications/S3SNSNotification.java
+++ b/src/main/java/org/sherzberg/graylog/aws/inputs/s3/notifications/S3SNSNotification.java
@@ -23,4 +23,12 @@ public String getS3ObjectKey() {
return s3ObjectKey;
}
+ @Override
+ public String toString() {
+ return "S3SNSNotification{" +
+ "receiptHandle='" + receiptHandle + '\'' +
+ ", s3Bucket='" + s3Bucket + '\'' +
+ ", s3ObjectKey='" + s3ObjectKey + '\'' +
+ '}';
+ }
}
diff --git a/src/main/java/org/sherzberg/graylog/aws/inputs/s3/notifications/S3SNSNotificationParser.java b/src/main/java/org/sherzberg/graylog/aws/inputs/s3/notifications/S3SNSNotificationParser.java
index 64ec409..f819b0c 100644
--- a/src/main/java/org/sherzberg/graylog/aws/inputs/s3/notifications/S3SNSNotificationParser.java
+++ b/src/main/java/org/sherzberg/graylog/aws/inputs/s3/notifications/S3SNSNotificationParser.java
@@ -22,24 +22,33 @@ public S3SNSNotificationParser() {
public List parse(Message message) {
List notifications = Lists.newArrayList();
+ String messageBody = message.getBody();
try {
- SQSMessage envelope = om.readValue(message.getBody(), SQSMessage.class);
+ SQSMessage envelope = om.readValue(messageBody, SQSMessage.class);
if (envelope.message == null) {
return Collections.emptyList();
}
S3EventNotification s3EventNotification = S3EventNotification.parseJson(envelope.message);
+ List records = s3EventNotification.getRecords();
- notifications.addAll(s3EventNotification.getRecords().stream().map(record -> new S3SNSNotification(
+ if (records == null) {
+ if (!envelope.message.contains("s3:TestEvent")){
+ LOG.warn("Consumed SNS notification does not have any record: {}", envelope.message);
+ }
+ return Collections.emptyList();
+ }
+
+ notifications.addAll(records.stream().map(record -> new S3SNSNotification(
message.getReceiptHandle(),
record.getS3().getBucket().getName(),
record.getS3().getObject().getUrlDecodedKey()
)).collect(Collectors.toList()));
} catch (Exception e) {
- LOG.error("Could not parse SNS notification: " + message.getBody(), e);
- throw new RuntimeException("Could not parse SNS notification: " + message.getBody(), e);
+ LOG.error("Could not parse SNS notification: " + messageBody, e);
+ throw new RuntimeException("Could not parse SNS notification: " + messageBody, e);
}
return notifications;