diff --git a/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/DataIngestionPipelineBuilder.java b/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/DataIngestionPipelineBuilder.java index 62588f8..981931c 100644 --- a/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/DataIngestionPipelineBuilder.java +++ b/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/DataIngestionPipelineBuilder.java @@ -15,10 +15,14 @@ public class DataIngestionPipelineBuilder implements FluxtionGraphBuilder { @Override public void buildGraph(EventProcessorConfig eventProcessorConfig) { - //flows Csv String -> HouseInputRecord -> x_formed(HouseInputRecord) -> houseRecordValidator(validate) - var csvFlow = DataFlow.subscribe(String.class).map(new CsvToHouseRecord()::marshall); - //flows HouseInputRecord -> x_formed(HouseInputRecord) -> houseRecordValidator(validate) - var validXformedFlow = csvFlow.map(CsvToHouseRecord::getHouseRecord) + //flow: Csv String -> HouseInputRecord + var csv2HouseRecordFlow = DataFlow + .subscribe(String.class) + .map(new CsvToHouseRecord()::marshall); + + //flow: HouseInputRecord -> x_formed(HouseInputRecord) -> validated(HouseInputRecord) + var validTransformedFlow = csv2HouseRecordFlow + .map(CsvToHouseRecord::getHouseRecord) .map(new HouseRecordTransformer()::transform) .map(new HouseRecordValidator()::validate); @@ -28,19 +32,20 @@ public void buildGraph(EventProcessorConfig eventProcessorConfig) { var stats = new ProcessingStats(); var invalidLog = new InvalidLog(); - //write validated output to [stats, csv, binary] - validXformedFlow.map(HouseRecordValidator::getValidHouseRecord) + //write validated output push to [stats, csv, binary] + validTransformedFlow + .map(HouseRecordValidator::getValidHouseRecord) .push(stats::validHouseRecord, csvWriter::validHouseRecord, binaryWriter::validHouseRecord); - //invalid csv parsing output to [invalid log, stats] - var invalidCsv = csvFlow.filter(CsvToHouseRecord::isInValidRecord); - invalidCsv.push(invalidLog::badCsvRecord); - invalidCsv.push(stats::badCsvRecord); + //invalid csv parsing output push to [invalid log, stats] + csv2HouseRecordFlow + .filter(CsvToHouseRecord::isBadCsvMessage) + .push(invalidLog::badCsvRecord, stats::badCsvRecord); - //invalid transform output to [invalid log, stats] - var invalidHouseRecord = validXformedFlow.filter(HouseRecordValidator::isInValidRecord); - invalidHouseRecord.push(invalidLog::invalidHouseRecord); - invalidHouseRecord.push(stats::invalidHouseRecord); + //invalid transform output push to [invalid log, stats] + validTransformedFlow + .filter(HouseRecordValidator::isInValidRecord) + .push(invalidLog::invalidHouseRecord, stats::invalidHouseRecord); } @Override diff --git a/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/Main.java b/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/Main.java index 8441e44..976eb4d 100644 --- a/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/Main.java +++ b/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/Main.java @@ -17,8 +17,6 @@ public static void main(String[] args) throws IOException { reader.forEach(dataIngest::onEvent); } - System.out.println("finished"); - dataIngest.tearDown(); } diff --git a/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/function/CsvToHouseRecord.java b/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/function/CsvToHouseRecord.java index 36c31ce..d5fc6d5 100644 --- a/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/function/CsvToHouseRecord.java +++ b/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/function/CsvToHouseRecord.java @@ -44,7 +44,7 @@ public CsvToHouseRecord marshall(String inputData) { return this; } - public boolean isInValidRecord() { + public boolean isBadCsvMessage() { return !validRecord; } } diff --git a/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/generated/DataIngestionPipeline.java b/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/generated/DataIngestionPipeline.java index 1f3e94a..6d622e1 100644 --- a/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/generated/DataIngestionPipeline.java +++ b/cookbook/src/main/java/com/fluxtion/example/cookbook/dataingestion/generated/DataIngestionPipeline.java @@ -58,8 +58,8 @@ * *
  * generation time                 : Not available
- * eventProcessorGenerator version : 9.2.22
- * api version                     : 9.2.22
+ * eventProcessorGenerator version : 9.2.23
+ * api version                     : 9.2.23
  * 
* * Event classes supported: @@ -101,7 +101,7 @@ public class DataIngestionPipeline private final MapRef2RefFlowFunction mapRef2RefFlowFunction_1 = new MapRef2RefFlowFunction<>(handlerString, csvToHouseRecord_0::marshall); private final FilterFlowFunction filterFlowFunction_11 = - new FilterFlowFunction<>(mapRef2RefFlowFunction_1, CsvToHouseRecord::isInValidRecord); + new FilterFlowFunction<>(mapRef2RefFlowFunction_1, CsvToHouseRecord::isBadCsvMessage); private final MapRef2RefFlowFunction mapRef2RefFlowFunction_2 = new MapRef2RefFlowFunction<>(mapRef2RefFlowFunction_1, CsvToHouseRecord::getHouseRecord); private final MapRef2RefFlowFunction mapRef2RefFlowFunction_4 = diff --git a/cookbook/src/main/java/com/fluxtion/example/cookbook/ml/linearregression/generated/OpportunityMlProcessor.java b/cookbook/src/main/java/com/fluxtion/example/cookbook/ml/linearregression/generated/OpportunityMlProcessor.java index 78f6030..1fcef09 100644 --- a/cookbook/src/main/java/com/fluxtion/example/cookbook/ml/linearregression/generated/OpportunityMlProcessor.java +++ b/cookbook/src/main/java/com/fluxtion/example/cookbook/ml/linearregression/generated/OpportunityMlProcessor.java @@ -61,8 +61,8 @@ * *
  * generation time                 : Not available
- * eventProcessorGenerator version : 9.2.22
- * api version                     : 9.2.22
+ * eventProcessorGenerator version : 9.2.23
+ * api version                     : 9.2.23
  * 
* * Event classes supported: diff --git a/imperative-helloworld/src/main/java/com/fluxtion/example/imperative/helloworld/generated/BreachNotifierProcessor.java b/imperative-helloworld/src/main/java/com/fluxtion/example/imperative/helloworld/generated/BreachNotifierProcessor.java index fddb557..f418844 100644 --- a/imperative-helloworld/src/main/java/com/fluxtion/example/imperative/helloworld/generated/BreachNotifierProcessor.java +++ b/imperative-helloworld/src/main/java/com/fluxtion/example/imperative/helloworld/generated/BreachNotifierProcessor.java @@ -51,8 +51,8 @@ * *
  * generation time                 : Not available
- * eventProcessorGenerator version : 9.2.22
- * api version                     : 9.2.22
+ * eventProcessorGenerator version : 9.2.23
+ * api version                     : 9.2.23
  * 
* * Event classes supported: diff --git a/pom.xml b/pom.xml index 821f2d9..c45f7a4 100644 --- a/pom.xml +++ b/pom.xml @@ -43,7 +43,7 @@ along with this program. If not, see UTF-8 - 9.2.22 + 9.2.23 3.0.14 21 21 diff --git a/reference-examples/racing-aot/src/main/java/com/fluxtion/example/reference/racing/generated/RaceCalculatorProcessor.java b/reference-examples/racing-aot/src/main/java/com/fluxtion/example/reference/racing/generated/RaceCalculatorProcessor.java index 168163d..21a7cf2 100644 --- a/reference-examples/racing-aot/src/main/java/com/fluxtion/example/reference/racing/generated/RaceCalculatorProcessor.java +++ b/reference-examples/racing-aot/src/main/java/com/fluxtion/example/reference/racing/generated/RaceCalculatorProcessor.java @@ -50,8 +50,8 @@ * *
  * generation time                 : Not available
- * eventProcessorGenerator version : 9.2.22
- * api version                     : 9.2.22
+ * eventProcessorGenerator version : 9.2.23
+ * api version                     : 9.2.23
  * 
* * Event classes supported: