Skip to content

Commit

Permalink
bumps fluxtion to version 9.2.23 data ingestion example
Browse files Browse the repository at this point in the history
  • Loading branch information
greg-higgins committed Apr 18, 2024
1 parent ce84ed8 commit f52776c
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@ public class DataIngestionPipelineBuilder implements FluxtionGraphBuilder {
@Override
public void buildGraph(EventProcessorConfig eventProcessorConfig) {

//flows Csv String -> HouseInputRecord -> x_formed(HouseInputRecord) -> houseRecordValidator(validate)
var csvFlow = DataFlow.subscribe(String.class).map(new CsvToHouseRecord()::marshall);
//flows HouseInputRecord -> x_formed(HouseInputRecord) -> houseRecordValidator(validate)
var validXformedFlow = csvFlow.map(CsvToHouseRecord::getHouseRecord)
//flow: Csv String -> HouseInputRecord
var csv2HouseRecordFlow = DataFlow
.subscribe(String.class)
.map(new CsvToHouseRecord()::marshall);

//flow: HouseInputRecord -> x_formed(HouseInputRecord) -> validated(HouseInputRecord)
var validTransformedFlow = csv2HouseRecordFlow
.map(CsvToHouseRecord::getHouseRecord)
.map(new HouseRecordTransformer()::transform)
.map(new HouseRecordValidator()::validate);

Expand All @@ -28,19 +32,20 @@ public void buildGraph(EventProcessorConfig eventProcessorConfig) {
var stats = new ProcessingStats();
var invalidLog = new InvalidLog();

//write validated output to [stats, csv, binary]
validXformedFlow.map(HouseRecordValidator::getValidHouseRecord)
//write validated output push to [stats, csv, binary]
validTransformedFlow
.map(HouseRecordValidator::getValidHouseRecord)
.push(stats::validHouseRecord, csvWriter::validHouseRecord, binaryWriter::validHouseRecord);

//invalid csv parsing output to [invalid log, stats]
var invalidCsv = csvFlow.filter(CsvToHouseRecord::isInValidRecord);
invalidCsv.push(invalidLog::badCsvRecord);
invalidCsv.push(stats::badCsvRecord);
//invalid csv parsing output push to [invalid log, stats]
csv2HouseRecordFlow
.filter(CsvToHouseRecord::isBadCsvMessage)
.push(invalidLog::badCsvRecord, stats::badCsvRecord);

//invalid transform output to [invalid log, stats]
var invalidHouseRecord = validXformedFlow.filter(HouseRecordValidator::isInValidRecord);
invalidHouseRecord.push(invalidLog::invalidHouseRecord);
invalidHouseRecord.push(stats::invalidHouseRecord);
//invalid transform output push to [invalid log, stats]
validTransformedFlow
.filter(HouseRecordValidator::isInValidRecord)
.push(invalidLog::invalidHouseRecord, stats::invalidHouseRecord);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ public static void main(String[] args) throws IOException {
reader.forEach(dataIngest::onEvent);
}

System.out.println("finished");

dataIngest.tearDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public CsvToHouseRecord marshall(String inputData) {
return this;
}

public boolean isInValidRecord() {
public boolean isBadCsvMessage() {
return !validRecord;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@
*
* <pre>
* generation time : Not available
* eventProcessorGenerator version : 9.2.22
* api version : 9.2.22
* eventProcessorGenerator version : 9.2.23
* api version : 9.2.23
* </pre>
*
* Event classes supported:
Expand Down Expand Up @@ -101,7 +101,7 @@ public class DataIngestionPipeline
private final MapRef2RefFlowFunction mapRef2RefFlowFunction_1 =
new MapRef2RefFlowFunction<>(handlerString, csvToHouseRecord_0::marshall);
private final FilterFlowFunction filterFlowFunction_11 =
new FilterFlowFunction<>(mapRef2RefFlowFunction_1, CsvToHouseRecord::isInValidRecord);
new FilterFlowFunction<>(mapRef2RefFlowFunction_1, CsvToHouseRecord::isBadCsvMessage);
private final MapRef2RefFlowFunction mapRef2RefFlowFunction_2 =
new MapRef2RefFlowFunction<>(mapRef2RefFlowFunction_1, CsvToHouseRecord::getHouseRecord);
private final MapRef2RefFlowFunction mapRef2RefFlowFunction_4 =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@
*
* <pre>
* generation time : Not available
* eventProcessorGenerator version : 9.2.22
* api version : 9.2.22
* eventProcessorGenerator version : 9.2.23
* api version : 9.2.23
* </pre>
*
* Event classes supported:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
*
* <pre>
* generation time : Not available
* eventProcessorGenerator version : 9.2.22
* api version : 9.2.22
* eventProcessorGenerator version : 9.2.23
* api version : 9.2.23
* </pre>
*
* Event classes supported:
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ along with this program. If not, see

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<fluxtion.version>9.2.22</fluxtion.version>
<fluxtion.version>9.2.23</fluxtion.version>
<fluxtion.mavenplugin.version>3.0.14</fluxtion.mavenplugin.version>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
*
* <pre>
* generation time : Not available
* eventProcessorGenerator version : 9.2.22
* api version : 9.2.22
* eventProcessorGenerator version : 9.2.23
* api version : 9.2.23
* </pre>
*
* Event classes supported:
Expand Down

0 comments on commit f52776c

Please sign in to comment.