Skip to content

Commit

Permalink
Add imported service direct callback to a node.
Browse files Browse the repository at this point in the history
  • Loading branch information
greg-higgins committed Jul 26, 2024
1 parent fc1ee1b commit 9d5192a
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.fluxtion.example.cookbook.nodecallback;

import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.runtime.callback.CallBackNode;

import java.util.List;

Expand Down Expand Up @@ -32,7 +31,7 @@
public class Main {

public static void main(String[] args) throws NoSuchFieldException {
var voteProcessor = Fluxtion.compileAot(c -> c.addNode(new ElectionTracker(List.of(
var voteProcessor = Fluxtion.compile(c -> c.addNode(new ElectionTracker(List.of(
new CandidateVoteHandler("Red_party"),
new CandidateVoteHandler("Blue_party"),
new CandidateVoteHandler("Green_party")
Expand Down
10 changes: 10 additions & 0 deletions reference-examples/runtime-execution/.mvn/jvm.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.main=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.model=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.processing=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
--add-opens=jdk.compiler/com.sun.tools.javac.code=ALL-UNNAMED
--add-opens=jdk.compiler/com.sun.tools.javac.comp=ALL-UNNAMED
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@

/**
* Imports a TimeService into the event processor, any node method annotated with {@code @ServiceRegistered} will receive
* the service if the types match.
*
* the service if the types match. The imported service can then be used within the node like any normal java class. In
* this case we are register an external TimeService that will be used inside a node in the graph.
* <p>
* running the example:
*
* <pre>
Expand All @@ -23,7 +24,7 @@
public class ImportService {

public static void main(String[] args) {
EventProcessor processor = Fluxtion.interpret(new StopWatchNode());
EventProcessor<?> processor = Fluxtion.interpret(new StopWatchNode());
processor.init();

//register a time service, specifying the service interface TimeService
Expand All @@ -44,12 +45,12 @@ public static void main(String[] args) {
}

@FunctionalInterface
public interface TimeService{
public interface TimeService {
long getTime();
}

//Stopwatch that uses the TimeService supplied at runtime
public static class StopWatchNode{
public static class StopWatchNode {

private TimeService timeService;
private long startTime;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package com.fluxtion.example.reference.serviceimport;

import com.fluxtion.compiler.Fluxtion;
import com.fluxtion.runtime.EventProcessor;
import com.fluxtion.runtime.annotations.OnTrigger;
import com.fluxtion.runtime.annotations.runtime.ServiceRegistered;
import com.fluxtion.runtime.callback.CallBackNode;

import java.util.concurrent.atomic.AtomicReference;

/**
* Registers a callback node listener method with an externally imported service. The callback will be to the node directly
* and not the parent event processor. This one to one callback to the node instance bypasses the event processor dispatch.
*
* <ul>
* <li>Create a MarketDataSubscriberNode that implements and exports the listener call back interface, MarketDataSubscriber and extends CallBackNode</li>
* <li>Annotate a node method with {@code @ServiceRegistered} to access the external MarketDataPublisher service</li>
* <li>Register a MarketDataPublisher service with the event processor</li>
* <li>Register the node instance as a market data callback with the MarketDataPublisher</li>
* </ul>
*
* <p>
* The MarketDataSubscriberNode implements listener interface, MarketDataSubscriber, and receives the market
* data update callbacks directly and not via the parent event processor. The node decides whether to trigger dependent
* nodes by calling triggerGraphCycle from the super class {@link CallBackNode}, in this case MidLoggerNode
*
*
* <pre>
*
* running the example:
*
* MarketDataSubscriberNode - triggerGraphCycle mid changed AAA 23.24
* MidLoggerNode - NEW mid: 23.24
*
* MarketDataSubscriberNode - ignore marketUpdate: AAA 23.24
* MarketDataSubscriberNode - triggerGraphCycle mid changed AAA 19.99
* MidLoggerNode - NEW mid: 19.99
*
* MarketDataSubscriberNode - ignore marketUpdate: AAA 19.99
* MarketDataSubscriberNode - ignore marketUpdate: AAA 19.99
* MarketDataSubscriberNode - ignore marketUpdate: AAA 19.99
* MarketDataSubscriberNode - triggerGraphCycle mid changed AAA 15.67
* MidLoggerNode - NEW mid: 15.67
* MarketDataSubscriberNode - triggerGraphCycle mid changed AAA 23.24
* MidLoggerNode - NEW mid: 23.24
*
* MarketDataSubscriberNode - ignore marketUpdate: AAA 23.24
* MarketDataSubscriberNode - triggerGraphCycle mid changed AAA 19.99
* MidLoggerNode - NEW mid: 19.99
*
* MarketDataSubscriberNode - ignore marketUpdate: AAA 19.99
* MarketDataSubscriberNode - ignore marketUpdate: AAA 19.99
* MarketDataSubscriberNode - ignore marketUpdate: AAA 19.99
* MarketDataSubscriberNode - triggerGraphCycle mid changed AAA 15.67
* MidLoggerNode - NEW mid: 15.67
* marketUpdate: AAA 15.67
* </pre>
*/
public class ImportedServiceCallbackToNode {

public static void main(String[] args) {
EventProcessor<?> processor = Fluxtion.interpret(c -> c.addNode(new MidLoggerNode(new MarketDataSubscriberNode())));
processor.init();

//create a simple market data publisher service and register with the processor
AtomicReference<MarketDataSubscriber> subscriber = new AtomicReference<>();
processor.registerService(
(symbol, callback) -> subscriber.set(callback),
MarketDataPublisher.class
);

//publish some data
subscriber.get().marketUpdate("AAA", 23.24);
subscriber.get().marketUpdate("AAA", 23.24);
subscriber.get().marketUpdate("AAA", 19.99);
subscriber.get().marketUpdate("AAA", 19.99);
subscriber.get().marketUpdate("AAA", 19.99);
subscriber.get().marketUpdate("AAA", 19.99);
subscriber.get().marketUpdate("AAA", 15.67);
}

public interface MarketDataPublisher {
void subscribe(String symbol, MarketDataSubscriber callback);
}

public interface MarketDataSubscriber {
boolean marketUpdate(String symbol, double mid);
}

//The node in the graph that is a MarketDataSubscriber.
//DOES NOT EXPORT SERVICE MarketDataSubscriber.
//Receives updates from MarketDataPublisher directly into this instance bypassing the event processor dispatch
//Extend the CallBackNode and call triggerGraphCycle to trigger the event processor with this node as the root dirty node
public static class MarketDataSubscriberNode
extends CallBackNode
implements
MarketDataSubscriber {

private double mid;

//Annotated callback method called at runtime when a matching service is registered
//with the parent event processor
@ServiceRegistered
public void marketDataPublisher(MarketDataPublisher marketDataPublisher) {
//Subscribes the parent event processor with this instance
marketDataPublisher.subscribe("AAA", this);
}

//MarketDataPublisher service directly invokes callback method, this node can call triggerGraphCycle
//to start a graph cycle and trigger any dependent nodes such as the MidLoggerNode
@Override
public boolean marketUpdate(String symbol, double mid) {
double oldMid = this.mid;
this.mid = mid;
if (oldMid == this.mid) {
System.out.println("MarketDataSubscriberNode - ignore marketUpdate: " + symbol + " " + mid);
} else {
System.out.println("MarketDataSubscriberNode - triggerGraphCycle mid changed " + symbol + " " + mid);
triggerGraphCycle();
}
return true;//ignored value as this method is not called via the parent eventprocessor
}
}

public record MidLoggerNode(MarketDataSubscriberNode marketDataSubscriber) {
@OnTrigger
public boolean midUpdated() {
System.out.println("MidLoggerNode - NEW mid: " + marketDataSubscriber.mid + "\n");
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@
import java.util.concurrent.atomic.AtomicReference;

/**
* Registers a callback listener method with an externally imported service.
* Registers a callback listener method with an externally imported service. The callback is implemented by the parent
* event processor and will route calls to any node that implements and exports the interface MarketDataSubscriber using
* the {@code @ExportService} annotation. This is effectively a one to many broadcast dispatch within the event processor.
*
* <ul>
* <li>Register a MarketDataPublisher service with the event processor that registers MarketDataSubscriber</li>
* <li>Create a MarketDataSubscriberNode that implements and exports the listener call back interface, MarketDataSubscriber</li>
* <li>Annotate a node method with {@code @ServiceRegistered} to access the external MarketDataPublisher service</li>
* <li>Register a MarketDataPublisher service with the event processor</li>
* <li>Register the subscription callback with the MarketDataPublisher using the EventProcessor.exportedService</li>
* </ul>
*
* The event processor implements the exported listener interface, MarketDataSubscriber, and receives the market
* data update callbacks. The processor dispatches and market updates to the MarketDataSubscriberNode or any other node
* data update callbacks. The processor dispatches any market updates to the MarketDataSubscriberNode or any other node
* that exports the MarketDataSubscriber interface.
*
*
Expand All @@ -33,10 +35,10 @@
* marketUpdate: AAA 15.67
* </pre>
*/
public class ServiceCallback {
public class ImportedServiceCallbackToProcessor {

public static void main(String[] args) {
EventProcessor processor = Fluxtion.interpret(new MarketDataSubscriberNode());
EventProcessor<?> processor = Fluxtion.interpret(new MarketDataSubscriberNode());
processor.init();

//create a simple market data publisher service and register with the processor
Expand All @@ -60,6 +62,7 @@ public interface MarketDataSubscriber {
boolean marketUpdate(String symbol, double mid);
}

//The node in the graph that is MarketDataSubscriber, receiving updates from MarketDataPublisher
public static class MarketDataSubscriberNode
implements
@ExportService MarketDataSubscriber, //callback interface exported by processor
Expand All @@ -81,6 +84,7 @@ public void marketDataPublisher(MarketDataPublisher marketDataPublisher) {
marketDataPublisher.subscribe("AAA", eventProcessorContext.getExportedService());
}

//This is the callback method invoked by the external MarketDataPublisher service
@Override
public boolean marketUpdate(String symbol, double mid) {
System.out.println("marketUpdate: " + symbol + " " + mid);
Expand Down

0 comments on commit 9d5192a

Please sign in to comment.