Skip to content

Commit

Permalink
data ingest example work
Browse files Browse the repository at this point in the history
  • Loading branch information
greg-higgins committed Apr 14, 2024
1 parent ce42bf7 commit 0257f60
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.builder.dataflow.DataFlow;
import com.fluxtion.compiler.builder.dataflow.FlowBuilder;
import com.fluxtion.runtime.audit.EventLogControlEvent;

import java.io.IOException;

public class Builder {

public static void main(String[] args) {
var dataIngestor =Fluxtion.interpret(c ->{
public static void main(String[] args) throws IOException {

var dataIngest = Fluxtion.interpret(c -> {
var csvRecordValidator = new CsvRecordValidator();
var x_Former = new RecordTransformer();
//outputs
Expand All @@ -20,32 +23,42 @@ public static void main(String[] args) {
var recordFlow = DataFlow.subscribe(String.class);
recordFlow
.map(csvRecordValidator::marshall)
.peek(r -> {
if (!csvRecordValidator.isValidRecord()) {
System.out.println("PUSH bad csv record");
invalidLog.badCsvInput(csvRecordValidator);
stats.badCsvInput(csvRecordValidator);
}
})
.map(CsvRecordValidator::getHouseInputRecord)
.map(x_Former::transform)
.peek(r -> {
if (!x_Former.isValidRecord()) {
System.out.println("PUSH bad x-former record");
invalidLog.badHousingRecord(x_Former);
stats.badHousingRecord(x_Former);
}
})
.map(RecordTransformer::getRecord)
.push(stats::validHousingRecord)
.push(csvWriter::validHousingRecord)
.push(binaryWriter::validHousingRecord);

//invalid flow - csv
recordFlow
.filter(csvRecordValidator::isInValidRecord)
.peek(i -> {
invalidLog.badCsvInput(csvRecordValidator);
stats.badCsvInput(csvRecordValidator);
});
DataFlow.subscribeToNode(csvRecordValidator);

//invalid flow - x-former
recordFlow
.filter(x_Former::isInValidRecord)
.peek(i -> {
invalidLog.badCsvInput(csvRecordValidator);
stats.badCsvInput(csvRecordValidator);
});
c.addEventAudit(EventLogControlEvent.LogLevel.INFO);
});

//
dataIngestor.init();
dataIngestor.onEvent("");
dataIngestor.onEvent("");
dataIngestor.onEvent("MyString");
//Send some data
dataIngest.init();
dataIngest.setAuditLogLevel(EventLogControlEvent.LogLevel.DEBUG);

dataIngest.onEvent("");
System.out.println();

dataIngest.onEvent("good");
System.out.println();

dataIngest.onEvent("BAD");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,22 @@

public class CsvRecordValidator {

// @Getter
private boolean inValidRecord = false;
@Getter
private boolean validRecord = false;
@Getter
private HouseInputRecord houseInputRecord;

public HouseInputRecord marshall(String inputData) {
inValidRecord = inputData == null || inputData.isBlank();
System.out.println("CsvRecordValidator::marshall inputData: " + inputData + ", inValidRecord: " + inValidRecord);
return inValidRecord ? null: new HouseInputRecord();
public CsvRecordValidator marshall(String inputData) {
validRecord = inputData != null && !inputData.isBlank();
System.out.println("CsvRecordValidator::marshall inputData: " + inputData + ", validRecord: " + validRecord);
houseInputRecord = validRecord ? new HouseInputRecord() : null;
if(validRecord) {
houseInputRecord.setHouseId(inputData);
}
return this;
}

public boolean isInValidRecord() {
return inValidRecord;
}
// public boolean isInValidRecord() {
// return inValidRecord;
// }
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
package com.fluxtion.example.cookbook.dataingestion;

import lombok.Data;

@Data
public class HouseInputRecord {
private String houseId;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ public void badCsvInput(CsvRecordValidator message){
}

public void badHousingRecord(RecordTransformer message){
System.out.println("InvalidLog::badCsvRecord: ");
System.out.println("InvalidLog::badHousingRecord: ");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public void badCsvInput(CsvRecordValidator message){
}

public void badHousingRecord(RecordTransformer message){
System.out.println("ProcessingStats::badCsvRecord: ");
System.out.println("ProcessingStats::badHousingRecord: ");
}

public void validHousingRecord(HouseInputRecord message){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ public class RecordTransformer {


@Getter
private boolean inValidRecord = false;
private boolean validRecord = false;
@Getter
private HouseInputRecord record;

public HouseInputRecord transform(HouseInputRecord record) {
public RecordTransformer transform(HouseInputRecord record) {
System.out.println("RecordTransformer::transform: " + record);
return record;
validRecord = !record.getHouseId().equalsIgnoreCase("BAD");
this.record = validRecord ? record : null;
return this;
}

}

0 comments on commit 0257f60

Please sign in to comment.