diff --git a/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/Builder.java b/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/Builder.java index cc4ee5a..a259c3a 100644 --- a/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/Builder.java +++ b/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/Builder.java @@ -2,12 +2,15 @@ import com.fluxtion.compiler.Fluxtion; import com.fluxtion.compiler.builder.dataflow.DataFlow; -import com.fluxtion.compiler.builder.dataflow.FlowBuilder; +import com.fluxtion.runtime.audit.EventLogControlEvent; + +import java.io.IOException; public class Builder { - public static void main(String[] args) { - var dataIngestor =Fluxtion.interpret(c ->{ + public static void main(String[] args) throws IOException { + + var dataIngest = Fluxtion.interpret(c -> { var csvRecordValidator = new CsvRecordValidator(); var x_Former = new RecordTransformer(); //outputs @@ -20,32 +23,42 @@ public static void main(String[] args) { var recordFlow = DataFlow.subscribe(String.class); recordFlow .map(csvRecordValidator::marshall) + .peek(r -> { + if (!csvRecordValidator.isValidRecord()) { + System.out.println("PUSH bad csv record"); + invalidLog.badCsvInput(csvRecordValidator); + stats.badCsvInput(csvRecordValidator); + } + }) + .map(CsvRecordValidator::getHouseInputRecord) .map(x_Former::transform) + .peek(r -> { + if (!x_Former.isValidRecord()) { + System.out.println("PUSH bad x-former record"); + invalidLog.badHousingRecord(x_Former); + stats.badHousingRecord(x_Former); + } + }) + .map(RecordTransformer::getRecord) .push(stats::validHousingRecord) .push(csvWriter::validHousingRecord) .push(binaryWriter::validHousingRecord); - //invalid flow - csv - recordFlow - .filter(csvRecordValidator::isInValidRecord) - .peek(i -> { - invalidLog.badCsvInput(csvRecordValidator); - stats.badCsvInput(csvRecordValidator); - }); + DataFlow.subscribeToNode(csvRecordValidator); - //invalid flow - x-former - recordFlow - .filter(x_Former::isInValidRecord) - .peek(i -> { - invalidLog.badCsvInput(csvRecordValidator); - stats.badCsvInput(csvRecordValidator); - }); + c.addEventAudit(EventLogControlEvent.LogLevel.INFO); }); - // - dataIngestor.init(); - dataIngestor.onEvent(""); - dataIngestor.onEvent(""); - dataIngestor.onEvent("MyString"); + //Send some data + dataIngest.init(); + dataIngest.setAuditLogLevel(EventLogControlEvent.LogLevel.DEBUG); + + dataIngest.onEvent(""); + System.out.println(); + + dataIngest.onEvent("good"); + System.out.println(); + + dataIngest.onEvent("BAD"); } } diff --git a/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/CsvRecordValidator.java b/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/CsvRecordValidator.java index a1fba05..2863076 100644 --- a/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/CsvRecordValidator.java +++ b/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/CsvRecordValidator.java @@ -4,16 +4,22 @@ public class CsvRecordValidator { -// @Getter - private boolean inValidRecord = false; + @Getter + private boolean validRecord = false; + @Getter + private HouseInputRecord houseInputRecord; - public HouseInputRecord marshall(String inputData) { - inValidRecord = inputData == null || inputData.isBlank(); - System.out.println("CsvRecordValidator::marshall inputData: " + inputData + ", inValidRecord: " + inValidRecord); - return inValidRecord ? null: new HouseInputRecord(); + public CsvRecordValidator marshall(String inputData) { + validRecord = inputData != null && !inputData.isBlank(); + System.out.println("CsvRecordValidator::marshall inputData: " + inputData + ", validRecord: " + validRecord); + houseInputRecord = validRecord ? new HouseInputRecord() : null; + if(validRecord) { + houseInputRecord.setHouseId(inputData); + } + return this; } - public boolean isInValidRecord() { - return inValidRecord; - } +// public boolean isInValidRecord() { +// return inValidRecord; +// } } diff --git a/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/HouseInputRecord.java b/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/HouseInputRecord.java index 0821219..87f7ba6 100644 --- a/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/HouseInputRecord.java +++ b/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/HouseInputRecord.java @@ -1,4 +1,8 @@ package com.fluxtion.example.cookbook.dataingestion; +import lombok.Data; + +@Data public class HouseInputRecord { + private String houseId; } diff --git a/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/InvalidLog.java b/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/InvalidLog.java index fd6e789..a7b30a7 100644 --- a/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/InvalidLog.java +++ b/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/InvalidLog.java @@ -10,6 +10,6 @@ public void badCsvInput(CsvRecordValidator message){ } public void badHousingRecord(RecordTransformer message){ - System.out.println("InvalidLog::badCsvRecord: "); + System.out.println("InvalidLog::badHousingRecord: "); } } diff --git a/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/ProcessingStats.java b/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/ProcessingStats.java index 4570925..c64cd29 100644 --- a/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/ProcessingStats.java +++ b/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/ProcessingStats.java @@ -8,7 +8,7 @@ public void badCsvInput(CsvRecordValidator message){ } public void badHousingRecord(RecordTransformer message){ - System.out.println("ProcessingStats::badCsvRecord: "); + System.out.println("ProcessingStats::badHousingRecord: "); } public void validHousingRecord(HouseInputRecord message){ diff --git a/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/RecordTransformer.java b/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/RecordTransformer.java index e7c88a9..f1c2036 100644 --- a/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/RecordTransformer.java +++ b/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/RecordTransformer.java @@ -6,11 +6,15 @@ public class RecordTransformer { @Getter - private boolean inValidRecord = false; + private boolean validRecord = false; + @Getter + private HouseInputRecord record; - public HouseInputRecord transform(HouseInputRecord record) { + public RecordTransformer transform(HouseInputRecord record) { System.out.println("RecordTransformer::transform: " + record); - return record; + validRecord = !record.getHouseId().equalsIgnoreCase("BAD"); + this.record = validRecord ? record : null; + return this; } }