bumps fluxtion to version 9.2.23 - data ingestion example
greg-higgins committed Apr 19, 2024
1 parent ac4bd3d commit 2e2b288
Showing 18 changed files with 3,383 additions and 156 deletions.
2,931 changes: 2,931 additions & 0 deletions cookbook/data/dataingest/input/AmesHousing.csv

Large diffs are not rendered by default.

@@ -1,23 +1,96 @@
package com.fluxtion.example.cookbook.dataingestion;

import com.fluxtion.example.cookbook.dataingestion.api.DataIngestComponent;
import com.fluxtion.example.cookbook.dataingestion.api.DataIngestConfig;
import com.fluxtion.example.cookbook.dataingestion.api.HouseRecord;
import com.fluxtion.example.cookbook.dataingestion.pipeline.DataIngestionPipeline;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

/**
* Executes a {@link DataIngestionPipeline} with data from Kaggle's AmesHousing.csv data file. The pipeline behaviour:
* <ul>
* <li>Subscribes to String events</li>
* <li>Tries to marshal the String from csv into a {@link HouseRecord} </li>
* <li>Transforms the {@link HouseRecord} by applying a user transform function</li>
* <li>Validates the transformed {@link HouseRecord} with a user supplied {@link java.util.function.Predicate}</li>
* <li>Writes the valid {@link HouseRecord} to a user supplied {@link java.io.Writer} as CSV</li>
* <li>Writes the valid {@link HouseRecord} to a user supplied {@link java.io.OutputStream} in a binary format</li>
* <li>Processing stats are updated with each valid transformed {@link HouseRecord}</li>
* </ul>
*
* Any processing errors are recorded as:
* <ul>
* <li>An entry in the invalid log that writes to a user supplied {@link java.io.Writer}</li>
* <li>Processing stats are updated with each csv error</li>
* <li>Processing stats are updated with each {@link HouseRecord} validation failure</li>
* </ul>
*
* Dynamic configuration is supplied in an instance of {@link DataIngestConfig} for:
* <ul>
* <li>{@link HouseRecord} validation {@link java.util.function.Predicate}</li>
* <li>{@link HouseRecord} transformer as a {@link java.util.function.UnaryOperator}</li>
* <li>Post process Csv output - {@link java.io.Writer}</li>
* <li>Post process binary output - {@link java.io.OutputStream}</li>
* <li>Statistics output - {@link java.io.Writer}</li>
* <li>Invalid log output - {@link java.io.Writer}</li>
* </ul>
*
*
*/
public class Main {

public static void main(String[] args) throws IOException {
//set up pipeline
var dataIngest = new DataIngestionPipeline();
//lifecycle call to init the pipeline, user components that implement DataIngestLifecycle receive the init callback
dataIngest.init();

try (Stream<String> reader = Files.lines(Path.of("data/ml/linear_regression/AmesHousing.csv"))) {
//get the exported DataIngestComponent service, used to set configuration as an api call
DataIngestComponent dataIngestComponent = dataIngest.getExportedService();

//set up a config for pipeline - can be changed dynamically during the run
Path dataPath = Path.of("data/dataingest/");
Path dataOutPath = Path.of("data/dataingest/output/");
Files.createDirectories(dataOutPath);
DataIngestConfig dataIngestConfig = DataIngestConfig.builder()
.houseRecordValidator(houseRecord -> houseRecord.MS_Zoning().equalsIgnoreCase("FV"))
.houseTransformer(Main::transformInputHouseRecord)
.csvWriter(Files.newBufferedWriter(dataOutPath.resolve("postProcessHouse.csv")))
.binaryWriter(new BufferedOutputStream(Files.newOutputStream(dataOutPath.resolve("postProcessHouse.binary"))))
.statsWriter(Files.newBufferedWriter(dataOutPath.resolve("processStats.rpt")))
.invalidLogWriter(Files.newBufferedWriter(dataOutPath.resolve("processingErrors.log")))
.build();

//update the config for the pipeline
dataIngestComponent.configUpdate(dataIngestConfig);

//send some data as individual events
try (Stream<String> reader = Files.lines(dataPath.resolve("input/AmesHousing.csv"))) {
reader.forEach(dataIngest::onEvent);
}

//lifecycle call to close the pipeline, user components that implement DataIngestLifecycle receive the tearDown callback
dataIngest.tearDown();
}

//User supplied function to transform a HouseRecord for post process output
public static HouseRecord transformInputHouseRecord(HouseRecord houseRecord) {
int lotFrontage = houseRecord.Lot_Frontage();
houseRecord.Lot_Frontage_Squared(lotFrontage * lotFrontage);

switch (houseRecord.MS_Zoning()) {
case "A" -> houseRecord.ms_zone_category(1);
case "FV" -> houseRecord.ms_zone_category(2);
case "RL" -> houseRecord.ms_zone_category(3);
case "RM" -> houseRecord.ms_zone_category(4);
default -> houseRecord.ms_zone_category(-1);
}
return houseRecord;
}

}
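The comments above note that the DataIngestConfig can be changed dynamically during the run, which main() itself does not demonstrate. Below is a minimal sketch of a mid-run reconfiguration, written as a hypothetical extra method alongside main() that reuses its imports; only the validator is set here, and in practice the writer fields would be supplied as in main().

//Sketch only - hypothetical mid-run reconfiguration, reusing the API already shown in main()
public static void reconfigureMidRun() throws IOException {
    var dataIngest = new DataIngestionPipeline();
    dataIngest.init();
    DataIngestComponent dataIngestComponent = dataIngest.getExportedService();

    //initial config - accept only FV zoned records, writer fields omitted for brevity
    dataIngestComponent.configUpdate(DataIngestConfig.builder()
            .houseRecordValidator(houseRecord -> houseRecord.MS_Zoning().equalsIgnoreCase("FV"))
            .build());

    try (Stream<String> reader = Files.lines(Path.of("data/dataingest/input/AmesHousing.csv"))) {
        reader.forEach(dataIngest::onEvent);
    }

    //later in the same run - swap the validator without rebuilding or restarting the pipeline
    dataIngestComponent.configUpdate(DataIngestConfig.builder()
            .houseRecordValidator(houseRecord -> houseRecord.MS_Zoning().equalsIgnoreCase("RM"))
            .build());

    try (Stream<String> reader = Files.lines(Path.of("data/dataingest/input/AmesHousing.csv"))) {
        reader.forEach(dataIngest::onEvent);
    }

    dataIngest.tearDown();
}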
@@ -9,28 +9,43 @@
/**
* Builds the data ingestion processing graph, invoked by the Fluxtion maven plugin to generate the pipeline AOT as
* part of the build.
* <br>
* <br>
* This example uses the Fluxtion {@link DataFlow} api to manage event subscription and notification
* to user supplied functions.
* <br>
* <br>
* The actual processing logic is encapsulated in user classes and functions. The goal is to have no Fluxtion api calls
* in the business logic, only pure vanilla Java. The advantages of this approach:
* <ul>
* <li>Business logic components are re-usable and testable</li>
* <li>Business code is not tied to a library api</li>
* <li>There is a clear separation between event notification and business logic</li>
* <li>The AOT generated source file {@link com.fluxtion.example.cookbook.dataingestion.pipeline.DataIngestionPipeline} simplifies debugging</li>
* </ul>
*
*/
public class DataIngestionPipelineBuilder implements FluxtionGraphBuilder {
public class PipelineBuilder implements FluxtionGraphBuilder {

@Override
public void buildGraph(EventProcessorConfig eventProcessorConfig) {

//flow: Csv String -> HouseInputRecord
var csv2HouseRecordFlow = DataFlow
.subscribe(String.class)
.map(new CsvToHouseRecord()::marshall);
.map(new CsvToHouseRecordSerializer()::marshall);

//flow: HouseInputRecord -> x_formed(HouseInputRecord) -> validated(HouseInputRecord)
var validTransformedFlow = csv2HouseRecordFlow
.map(CsvToHouseRecord::getHouseRecord)
.map(CsvToHouseRecordSerializer::getHouseRecord)
.map(new HouseRecordTransformer()::transform)
.map(new HouseRecordValidator()::validate);

//outputs
var csvWriter = new HouseRecordCsvWriter();
var binaryWriter = new HouseRecordBinaryWriter();
var csvWriter = new PostProcessCsvWriter();
var binaryWriter = new PostProcessBinaryWriter();
var stats = new ProcessingStats();
var invalidLog = new InvalidLog();
var invalidLog = new InvalidLogWriter();

//write validated output push to [stats, csv, binary]
validTransformedFlow
@@ -39,7 +54,7 @@ public void buildGraph(EventProcessorConfig eventProcessorConfig) {

//invalid csv parsing output push to [invalid log, stats]
csv2HouseRecordFlow
.filter(CsvToHouseRecord::isBadCsvMessage)
.filter(CsvToHouseRecordSerializer::isBadCsvMessage)
.push(invalidLog::badCsvRecord, stats::badCsvRecord);

//invalid transform output push to [invalid log, stats]
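The builder above is normally picked up by the Fluxtion maven plugin and used to generate the DataIngestionPipeline source ahead of time, as its javadoc describes. As a rough sketch only, the same graph definition could also be built in-process for quick experiments, assuming Fluxtion's Fluxtion.interpret(...) factory from the compiler module accepts the same EventProcessorConfig consumer - an assumption, since this commit only shows the AOT path. The event string is a placeholder, not real AmesHousing data.

//Sketch only - in-process alternative to the AOT build, assuming the Fluxtion.interpret(...) factory
var processor = Fluxtion.interpret(cfg -> new PipelineBuilder().buildGraph(cfg));
processor.init();
processor.onEvent("a,raw,csv,line");
processor.tearDown();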
@@ -1,4 +1,21 @@
package com.fluxtion.example.cookbook.dataingestion.api;

public record DataIngestConfig(String name) {
import lombok.Builder;
import lombok.Data;

import java.io.OutputStream;
import java.io.Writer;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

@Data
@Builder
public final class DataIngestConfig {
private final Predicate<HouseRecord> houseRecordValidator;
private final UnaryOperator<HouseRecord> houseTransformer;
private final OutputStream binaryWriter;
private final Writer csvWriter;
private final Writer statsWriter;
private final Writer invalidLogWriter;

}
@@ -0,0 +1,12 @@
package com.fluxtion.example.cookbook.dataingestion.api;

import java.util.function.Consumer;

public interface DataIngestStats extends DataIngestComponent {

void publishStats();

void currentStats(Consumer<String> consumer);

void clearStats();
}
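Only the statistics contract is declared here; the ProcessingStats class that PipelineBuilder wires in is not part of this diff. Below is a minimal sketch of one possible implementation - the class name, the counter fields and the validHouseRecord/badCsvRecord push-target signatures are assumptions for illustration, not the repository's code.

package com.fluxtion.example.cookbook.dataingestion.function;

import com.fluxtion.example.cookbook.dataingestion.api.DataIngestConfig;
import com.fluxtion.example.cookbook.dataingestion.api.DataIngestStats;
import com.fluxtion.example.cookbook.dataingestion.api.HouseRecord;

import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.function.Consumer;

//Sketch only - a hypothetical DataIngestStats implementation, not the ProcessingStats used in the pipeline
public class SimpleIngestStats implements DataIngestStats {

    private int validRecordCount;
    private int badCsvRecordCount;
    private Writer statsWriter = new StringWriter();

    //assumed push-target methods, mirroring the PipelineBuilder wiring
    public void validHouseRecord(HouseRecord houseRecord) {
        validRecordCount++;
    }

    public void badCsvRecord(CsvToHouseRecordSerializer message) {
        badCsvRecordCount++;
    }

    @Override
    public boolean configUpdate(DataIngestConfig config) {
        if (config.getStatsWriter() != null) {
            statsWriter = config.getStatsWriter();
        }
        return false;
    }

    @Override
    public void publishStats() {
        currentStats(stats -> {
            try {
                statsWriter.write(stats);
                statsWriter.flush();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    @Override
    public void currentStats(Consumer<String> consumer) {
        consumer.accept("valid records: " + validRecordCount + ", bad csv records: " + badCsvRecordCount);
    }

    @Override
    public void clearStats() {
        validRecordCount = 0;
        badCsvRecordCount = 0;
    }
}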
@@ -10,6 +10,7 @@
@Accessors(fluent = true)
public class HouseRecord {

//input only - use @ColumnMapping(outputField = false)
@ColumnMapping(outputField = false)
private int Order;
@ColumnMapping(outputField = false, columnName = "Lot Frontage", defaultValue = "-1")
@@ -22,7 +23,7 @@ public class HouseRecord {
@ColumnMapping(columnName = "MS SubClass")
private int MS_SubClass;

//derived
//derived output
@ColumnMapping(optionalField = true)
private int Lot_Frontage_Squared;
@ColumnMapping(optionalField = true)
@@ -10,7 +10,7 @@


@Getter
public class CsvToHouseRecord implements DataIngestLifecycle {
public class CsvToHouseRecordSerializer implements DataIngestLifecycle, ValidationLogger {

private SingleRowMarshaller<HouseRecord> houseDataCsvMarshaller;
private boolean validRecord = false;
@@ -21,23 +21,11 @@ public class CsvToHouseRecord implements DataIngestLifecycle {
@Override
public void init() {
houseDataCsvMarshaller = RowMarshaller.load(HouseRecord.class)
.setValidationLogger(new ValidationLogger() {
@Override
public void logFatal(CsvProcessingException e) {
validRecord = false;
processingException = e;
}

@Override
public void logWarning(CsvProcessingException e) {
validRecord = false;
processingException = e;
}
})
.setValidationLogger(this)
.parser();
}

public CsvToHouseRecord marshall(String inputData) {
public CsvToHouseRecordSerializer marshall(String inputData) {
validRecord = true;
this.inputString = inputData + "\n";
houseRecord = houseDataCsvMarshaller.parse(inputString);
@@ -47,4 +35,16 @@ public CsvToHouseRecord marshall(String inputData) {
public boolean isBadCsvMessage() {
return !validRecord;
}

@Override
public void logFatal(CsvProcessingException e) {
validRecord = false;
processingException = e;
}

@Override
public void logWarning(CsvProcessingException e) {
validRecord = false;
processingException = e;
}
}
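For reference, a short sketch of using CsvToHouseRecordSerializer on its own, outside the generated pipeline. It assumes the header row of AmesHousing.csv is pushed through marshall before the data rows, exactly as happens when main() streams the raw file lines, and that the Files/Path/Stream imports shown in Main are available.

//Sketch only - standalone use of the serializer; the header row is parsed first to map the columns
CsvToHouseRecordSerializer serializer = new CsvToHouseRecordSerializer();
serializer.init();
try (Stream<String> lines = Files.lines(Path.of("data/dataingest/input/AmesHousing.csv"))) {
    lines.forEach(line -> {
        serializer.marshall(line);
        if (serializer.isBadCsvMessage()) {
            System.err.println("bad csv row: " + serializer.getProcessingException().getMessage());
        } else if (serializer.getHouseRecord() != null) {
            HouseRecord houseRecord = serializer.getHouseRecord();
            //houseRecord is ready for the transform/validate stages
        }
    });
}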

This file was deleted.

This file was deleted.

@@ -6,18 +6,22 @@
import com.fluxtion.runtime.annotations.ExportService;
import lombok.Getter;

import java.util.function.UnaryOperator;

@Getter
public class HouseRecordTransformer implements @ExportService(propagate = false) DataIngestComponent {

private HouseRecord record;
private UnaryOperator<HouseRecord> transformer = UnaryOperator.identity();

public HouseRecord transform(HouseRecord record) {
this.record = record;
public HouseRecord transform(HouseRecord houseRecord) {
this.record = transformer.apply(houseRecord);
return this.record;
}

@Override
public boolean configUpdate(DataIngestConfig config) {
transformer = config.getHouseTransformer() == null ? UnaryOperator.identity() : config.getHouseTransformer();
return false;
}
}
@@ -1,16 +1,37 @@
package com.fluxtion.example.cookbook.dataingestion.function;

import com.fluxtion.example.cookbook.dataingestion.api.DataIngestComponent;
import com.fluxtion.example.cookbook.dataingestion.api.DataIngestConfig;
import com.fluxtion.example.cookbook.dataingestion.api.DataIngestLifecycle;
import com.fluxtion.example.cookbook.dataingestion.api.HouseRecord;
import com.fluxtion.runtime.annotations.ExportService;
import lombok.Getter;

import java.util.function.Predicate;

@Getter
public class HouseRecordValidator {
public class HouseRecordValidator
implements
DataIngestLifecycle,
@ExportService(propagate = false) DataIngestComponent {

private boolean inValidRecord = false;
private HouseRecord validHouseRecord;
private HouseRecord inValidHouseRecord;
private Predicate<HouseRecord> validator;

@Override
public void init() {
validator = h -> true;
}

@Override
public boolean configUpdate(DataIngestConfig config) {
validator = config.getHouseRecordValidator() == null ? h -> true : config.getHouseRecordValidator();
return false;
}

public HouseRecordValidator validate(HouseRecord record){
public HouseRecordValidator validate(HouseRecord record) {
inValidRecord = record.MS_Zoning().equalsIgnoreCase("FV");
this.validHouseRecord = inValidRecord ? null : record;
this.inValidHouseRecord = inValidRecord ? record : null;
