Skip to content

Commit

Permalink
data ingest example - more work. Bump to Fluxtion 9.2.21. Move @Nopro…
Browse files Browse the repository at this point in the history
…pagte Function and use @ExportService(propagate = false), possible bug in Fluxtion.

Added .jmvm config to cookbook build so google code formatter works when running maven in this project
  • Loading branch information
greg committed Apr 17, 2024
1 parent 5f82f0e commit 698e832
Show file tree
Hide file tree
Showing 18 changed files with 117 additions and 30 deletions.
10 changes: 10 additions & 0 deletions cookbook/.mvn/jvm.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.main=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.model=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.processing=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
--add-opens=jdk.compiler/com.sun.tools.javac.code=ALL-UNNAMED
--add-opens=jdk.compiler/com.sun.tools.javac.comp=ALL-UNNAMED

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.fluxtion.example.cookbook.dataingestion.api;

public record DataIngestConfig(String name) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.fluxtion.example.cookbook.dataingestion.api;

public interface DataIngestConfigListener {

boolean configUpdate(DataIngestConfig config);
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.fluxtion.example.cookbook.dataingestion;
package com.fluxtion.example.cookbook.dataingestion.api;

import com.fluxtion.extension.csvcompiler.annotations.ColumnMapping;
import com.fluxtion.extension.csvcompiler.annotations.CsvMarshaller;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,33 @@
package com.fluxtion.example.cookbook.dataingestion.builder;

import com.fluxtion.compiler.EventProcessorConfig;
import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.compiler.FluxtionCompilerConfig;
import com.fluxtion.compiler.FluxtionGraphBuilder;
import com.fluxtion.compiler.builder.dataflow.DataFlow;
import com.fluxtion.compiler.builder.dataflow.FlowBuilder;
import com.fluxtion.example.cookbook.dataingestion.api.HouseData;
import com.fluxtion.example.cookbook.dataingestion.node.*;

//@Disabled
public class DataIngestionBuilder implements FluxtionGraphBuilder {

public static void main(String[] args) {
Fluxtion.interpret(c -> {
FlowBuilder<CsvHouseDataValidator> csvFlow = DataFlow.subscribe(String.class).map(new CsvHouseDataValidator()::marshall);
FlowBuilder<HouseData> validXformedFlow = csvFlow.map(CsvHouseDataValidator::getHouseData)
.map(new HouseDataRecordTransformer()::transform);
});
}

@Override
public void buildGraph(EventProcessorConfig eventProcessorConfig) {
//flows Csv String -> HouseInputRecord -> x_formed(HouseInputRecord)
var csvFlow = DataFlow.subscribe(String.class).map(new CsvHouseDataValidator()::marshall);
var validXformedFlow = csvFlow.map(CsvHouseDataValidator::getHouseData)
.map(new HouseDataRecordTransformer()::transform)
.map(new HouseDataRecordValidator()::validate);
.map(new HouseDataRecordValidator()::validate)
;

//outputs
var csvWriter = new HouseDataRecordCsvWriter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.fluxtion.runtime.lifecycle.Lifecycle;
import com.fluxtion.runtime.EventProcessor;
import com.fluxtion.runtime.callback.InternalEventProcessor;
import com.fluxtion.example.cookbook.dataingestion.api.DataIngestConfigListener;
import com.fluxtion.example.cookbook.dataingestion.node.CsvHouseDataValidator;
import com.fluxtion.example.cookbook.dataingestion.node.HouseDataRecordBinaryWriter;
import com.fluxtion.example.cookbook.dataingestion.node.HouseDataRecordCsvWriter;
Expand Down Expand Up @@ -57,13 +58,14 @@
*
* <pre>
* generation time : Not available
* eventProcessorGenerator version : 9.2.18
* api version : 9.2.18
* eventProcessorGenerator version : 9.2.21
* api version : 9.2.21
* </pre>
*
* Event classes supported:
*
* <ul>
* <li>com.fluxtion.compiler.generation.model.ExportFunctionMarker
* <li>com.fluxtion.runtime.time.ClockStrategy.ClockStrategyEvent
* <li>java.lang.String
* </ul>
Expand All @@ -76,7 +78,8 @@ public class DataIngestion
StaticEventProcessor,
InternalEventProcessor,
BatchHandler,
Lifecycle {
Lifecycle,
DataIngestConfigListener {

// Node declarations
private final CallbackDispatcherImpl callbackDispatcher = new CallbackDispatcherImpl();
Expand Down Expand Up @@ -380,6 +383,22 @@ public void handleEvent(String typedEvent) {
}
// EVENT DISPATCH - END

// EXPORTED SERVICE FUNCTIONS - START
@Override
public boolean configUpdate(
com.fluxtion.example.cookbook.dataingestion.api.DataIngestConfig arg0) {
beforeServiceCall(
"public boolean com.fluxtion.example.cookbook.dataingestion.node.HouseDataRecordTransformer.configUpdate(com.fluxtion.example.cookbook.dataingestion.api.DataIngestConfig)");
ExportFunctionAuditEvent typedEvent = functionAudit;
houseDataRecordTransformer_3.configUpdate(arg0);
houseDataRecordCsvWriter_37.configUpdate(arg0);
houseDataRecordBinaryWriter_36.configUpdate(arg0);
invalidLog_56.configUpdate(arg0);
afterServiceCall();
return true;
}
// EXPORTED SERVICE FUNCTIONS - END

public void bufferEvent(Object event) {
buffering = true;
if (event instanceof com.fluxtion.runtime.time.ClockStrategy.ClockStrategyEvent) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.fluxtion.example.cookbook.dataingestion.node;

import com.fluxtion.example.cookbook.dataingestion.HouseData;
import com.fluxtion.example.cookbook.dataingestion.api.HouseData;
import lombok.Getter;

@Getter
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
package com.fluxtion.example.cookbook.dataingestion.node;

import com.fluxtion.example.cookbook.dataingestion.HouseData;
import com.fluxtion.example.cookbook.dataingestion.api.DataIngestConfig;
import com.fluxtion.example.cookbook.dataingestion.api.DataIngestConfigListener;
import com.fluxtion.example.cookbook.dataingestion.api.HouseData;
import com.fluxtion.runtime.annotations.ExportService;
import com.fluxtion.runtime.annotations.NoPropagateFunction;

public class HouseDataRecordBinaryWriter {
public class HouseDataRecordBinaryWriter implements @ExportService DataIngestConfigListener {

public void validHouseDataRecord(HouseData message){
System.out.println("RecordBinaryWriter::validHousingRecord - " + message);
}

@Override
@NoPropagateFunction
public boolean configUpdate(DataIngestConfig config) {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
package com.fluxtion.example.cookbook.dataingestion.node;

import com.fluxtion.example.cookbook.dataingestion.HouseData;
import com.fluxtion.example.cookbook.dataingestion.api.DataIngestConfig;
import com.fluxtion.example.cookbook.dataingestion.api.DataIngestConfigListener;
import com.fluxtion.example.cookbook.dataingestion.api.HouseData;
import com.fluxtion.runtime.annotations.ExportService;
import com.fluxtion.runtime.annotations.NoPropagateFunction;

public class HouseDataRecordCsvWriter {
public class HouseDataRecordCsvWriter implements @ExportService DataIngestConfigListener {

public void validHouseDataRecord(HouseData message){
System.out.println("RecordCsvWriter::validHousingRecord - " + message);
}

@Override
@NoPropagateFunction
public boolean configUpdate(DataIngestConfig config) {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package com.fluxtion.example.cookbook.dataingestion.node;

import com.fluxtion.example.cookbook.dataingestion.HouseData;
import com.fluxtion.example.cookbook.dataingestion.api.DataIngestConfig;
import com.fluxtion.example.cookbook.dataingestion.api.DataIngestConfigListener;
import com.fluxtion.example.cookbook.dataingestion.api.HouseData;
import com.fluxtion.runtime.annotations.ExportService;
import com.fluxtion.runtime.annotations.NoPropagateFunction;
import lombok.Getter;

@Getter
public class HouseDataRecordTransformer {
public class HouseDataRecordTransformer implements @ExportService(propagate = false) DataIngestConfigListener {

private HouseData record;

Expand All @@ -13,4 +17,10 @@ public HouseData transform(HouseData record) {
this.record = record;
return this.record;
}

@Override
// @NoPropagateFunction
public boolean configUpdate(DataIngestConfig config) {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package com.fluxtion.example.cookbook.dataingestion.node;

import com.fluxtion.example.cookbook.dataingestion.HouseData;
import com.fluxtion.example.cookbook.dataingestion.api.HouseData;
import lombok.Getter;

@Getter
public class HouseDataRecordValidator {

@Getter
private boolean inValidRecord = false;
@Getter
private HouseData record;

public HouseDataRecordValidator validate(HouseData record){
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.fluxtion.example.cookbook.dataingestion.node;

import lombok.RequiredArgsConstructor;
import com.fluxtion.example.cookbook.dataingestion.api.DataIngestConfig;
import com.fluxtion.example.cookbook.dataingestion.api.DataIngestConfigListener;
import com.fluxtion.runtime.annotations.ExportService;
import com.fluxtion.runtime.annotations.NoPropagateFunction;

@RequiredArgsConstructor
public class InvalidLog {
public class InvalidLog implements @ExportService DataIngestConfigListener {

public void badCsvRecord(CsvHouseDataValidator message){
System.out.println("InvalidLog::badCsvInput - " + message.getHouseData());
Expand All @@ -12,4 +14,10 @@ public void badCsvRecord(CsvHouseDataValidator message){
public void badHouseDataRecord(HouseDataRecordValidator message){
System.out.println("InvalidLog::badHousingRecord - " + message.getRecord());
}

@Override
@NoPropagateFunction
public boolean configUpdate(DataIngestConfig config) {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.fluxtion.example.cookbook.dataingestion.node;

import com.fluxtion.example.cookbook.dataingestion.HouseData;
import com.fluxtion.example.cookbook.dataingestion.api.DataIngestConfig;
import com.fluxtion.example.cookbook.dataingestion.api.HouseData;

public class ProcessingStats {

Expand All @@ -15,4 +16,5 @@ public void badHouseDataRecord(HouseDataRecordValidator message){
public void validHousingRecord(HouseData message){
System.out.println("ProcessingStats::validHousingRecord - " + message);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@
*
* <pre>
* generation time : Not available
* eventProcessorGenerator version : 9.2.18
* api version : 9.2.18
* eventProcessorGenerator version : 9.2.21
* api version : 9.2.21
* </pre>
*
* Event classes supported:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
*
* <pre>
* generation time : Not available
* eventProcessorGenerator version : 9.2.18
* api version : 9.2.18
* eventProcessorGenerator version : 9.2.21
* api version : 9.2.21
* </pre>
*
* Event classes supported:
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ along with this program. If not, see

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<fluxtion.version>9.2.18</fluxtion.version>
<fluxtion.version>9.2.21</fluxtion.version>
<fluxtion.mavenplugin.version>3.0.14</fluxtion.mavenplugin.version>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
*
* <pre>
* generation time : Not available
* eventProcessorGenerator version : 9.2.18
* api version : 9.2.18
* eventProcessorGenerator version : 9.2.21
* api version : 9.2.21
* </pre>
*
* Event classes supported:
Expand Down

0 comments on commit 698e832

Please sign in to comment.