Skip to content

Commit

Permalink
chore(17592): Remove unsafe cast in TaskScheduler (#17593)
Browse files Browse the repository at this point in the history
Signed-off-by: mxtartaglia <[email protected]>
  • Loading branch information
mxtartaglia-sl authored Jan 29, 2025
1 parent 2e592c1 commit 980c65a
Show file tree
Hide file tree
Showing 29 changed files with 720 additions and 757 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,12 @@ public ComponentWiring(
schedulerName = schedulerLabelAnnotation.value();
}

this.scheduler = model.schedulerBuilder(schedulerName)
this.scheduler = model.<OUTPUT_TYPE>schedulerBuilder(schedulerName)
.configure(schedulerConfiguration)
// FUTURE WORK: all components not currently in platform core should move there
.withHyperlink(platformCoreHyperlink(clazz))
.withDataCounter(dataCounter)
.build()
.cast();
.build();

if (!clazz.isInterface()) {
throw new IllegalArgumentException("Component class " + clazz.getName() + " is not an interface.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,23 +182,6 @@ public boolean isInsertionBlocking() {
return insertionIsBlocking;
}

/**
* Cast this scheduler into whatever a variable is expecting. Sometimes the compiler gets confused with generics,
* and path of least resistance is to just cast to the proper data type.
*
* <p>
* Warning: this will appease the compiler, but it is possible to cast a scheduler into a data type that will cause
* runtime exceptions. Use with appropriate caution.
*
* @param <X> the type to cast to
* @return this, cast into whatever type is requested
*/
@NonNull
@SuppressWarnings("unchecked")
public final <X> TaskScheduler<X> cast() {
return (TaskScheduler<X>) this;
}

/**
* Get the number of unprocessed tasks. A task is considered to be unprocessed until the data has been passed to the
* handler method (i.e. the one given to {@link BindableInputWire#bind(Function)} or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,9 @@ public WireFilter(

Objects.requireNonNull(predicate);

final TaskScheduler<T> taskScheduler = model.schedulerBuilder(filterName)
final TaskScheduler<T> taskScheduler = model.<T>schedulerBuilder(filterName)
.withType(TaskSchedulerType.DIRECT_THREADSAFE)
.build()
.cast();
.build();

inputWire = taskScheduler.buildInputWire(filterInputName);
inputWire.bind(t -> {
Expand All @@ -79,10 +78,9 @@ public WireFilter(
public WireFilter(
@NonNull final WiringModel model, @NonNull final String filterName, @NonNull final String filterInputName) {

final TaskScheduler<T> taskScheduler = model.schedulerBuilder(filterName)
final TaskScheduler<T> taskScheduler = model.<T>schedulerBuilder(filterName)
.withType(TaskSchedulerType.DIRECT_THREADSAFE)
.build()
.cast();
.build();

inputWire = taskScheduler.buildInputWire(filterInputName);
outputWire = taskScheduler.getOutputWire();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@ public WireListSplitter(
@NonNull final WiringModel model,
@NonNull final String splitterName,
@NonNull final String splitterInputName) {
final TaskScheduler<T> taskScheduler = model.schedulerBuilder(splitterName)
final TaskScheduler<T> taskScheduler = model.<T>schedulerBuilder(splitterName)
.withType(TaskSchedulerType.DIRECT_THREADSAFE)
.build()
.cast();
.build();

inputWire = taskScheduler.buildInputWire(splitterInputName);
outputWire = (StandardOutputWire<T>) taskScheduler.getOutputWire();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,9 @@ public WireRouter(
@NonNull final String routerName,
@NonNull final String routerInputName,
@NonNull final Class<ROUTER_TYPE> clazz) {
final TaskScheduler<Void> scheduler = model.schedulerBuilder(routerName)
final TaskScheduler<Void> scheduler = model.<Void>schedulerBuilder(routerName)
.withType(DIRECT_THREADSAFE)
.build()
.cast();
.build();

outputWires = new ArrayList<>(clazz.getEnumConstants().length);
for (int index = 0; index < clazz.getEnumConstants().length; index++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,9 @@ public WireTransformer(
@NonNull final Function<A, B> transformer) {
Objects.requireNonNull(transformer);

final TaskScheduler<B> taskScheduler = model.schedulerBuilder(transformerName)
final TaskScheduler<B> taskScheduler = model.<B>schedulerBuilder(transformerName)
.withType(TaskSchedulerType.DIRECT_THREADSAFE)
.build()
.cast();
.build();

inputWire = taskScheduler.buildInputWire(transformerInputName);
inputWire.bind(transformer);
Expand All @@ -77,10 +76,9 @@ public WireTransformer(
@NonNull final String transformerName,
@NonNull final String transformerInputName) {

final TaskScheduler<B> taskScheduler = model.schedulerBuilder(transformerName)
final TaskScheduler<B> taskScheduler = model.<B>schedulerBuilder(transformerName)
.withType(TaskSchedulerType.DIRECT_THREADSAFE)
.build()
.cast();
.build();

inputWire = taskScheduler.buildInputWire(transformerInputName);
outputWire = taskScheduler.getOutputWire();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,9 @@ public void solderTo(
@NonNull final String inputWireLabel,
@NonNull final Consumer<OUT> handler) {

final TaskScheduler<Void> directScheduler = model.schedulerBuilder(handlerName)
final TaskScheduler<Void> directScheduler = model.<Void>schedulerBuilder(handlerName)
.withType(TaskSchedulerType.DIRECT)
.build()
.cast();
.build();

final BindableInputWire<OUT, Void> directSchedulerInputWire = directScheduler.buildInputWire(inputWireLabel);
directSchedulerInputWire.bindConsumer(handler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,28 +62,27 @@ static void basicBenchmark() throws InterruptedException {
// Ensures that we have no more than 10,000 events in the pipeline at any given time
final ObjectCounter backpressure = new BackpressureObjectCounter("backpressure", 10_000, Duration.ZERO);

final TaskScheduler<WiringBenchmarkEvent> verificationTaskScheduler = model.schedulerBuilder("verification")
final TaskScheduler<WiringBenchmarkEvent> verificationTaskScheduler = model.<WiringBenchmarkEvent>
schedulerBuilder("verification")
.withPool(executor)
.withType(TaskSchedulerType.CONCURRENT)
.withOnRamp(backpressure)
.withExternalBackPressure(true)
.build()
.cast();
.build();

final TaskScheduler<WiringBenchmarkEvent> orphanBufferTaskScheduler = model.schedulerBuilder("orphanBuffer")
final TaskScheduler<WiringBenchmarkEvent> orphanBufferTaskScheduler = model.<WiringBenchmarkEvent>
schedulerBuilder("orphanBuffer")
.withPool(executor)
.withType(TaskSchedulerType.SEQUENTIAL)
.withExternalBackPressure(true)
.build()
.cast();
.build();

final TaskScheduler<Void> eventPoolTaskScheduler = model.schedulerBuilder("eventPool")
final TaskScheduler<Void> eventPoolTaskScheduler = model.<Void>schedulerBuilder("eventPool")
.withPool(executor)
.withType(TaskSchedulerType.SEQUENTIAL)
.withOffRamp(backpressure)
.withExternalBackPressure(true)
.build()
.cast();
.build();

final BindableInputWire<WiringBenchmarkEvent, WiringBenchmarkEvent> eventsToOrphanBuffer =
orphanBufferTaskScheduler.buildInputWire("unordered events");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,9 @@ private InputWire<Long> buildAutomaticComponent(@NonNull final SimpleComponent c
TestPlatformContextBuilder.create().build())
.build();

final TaskScheduler<Void> scheduler = model.schedulerBuilder("test")
final TaskScheduler<Void> scheduler = model.<Void>schedulerBuilder("test")
.withType(TaskSchedulerType.DIRECT)
.build()
.cast();
.build();

final ComponentWiring<SimpleComponent, Void> componentWiring =
new ComponentWiring<>(model, SimpleComponent.class, scheduler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void heartbeatByFrequencyTest() {
;

final TaskScheduler<Void> scheduler =
model.schedulerBuilder("test").build().cast();
model.<Void>schedulerBuilder("test").build();

final BindableInputWire<Instant, Void> heartbeatBindable = scheduler.buildInputWire("heartbeat");
model.buildHeartbeatWire(100).solderTo(heartbeatBindable);
Expand Down Expand Up @@ -73,7 +73,7 @@ void heartbeatByPeriodTest() {
;

final TaskScheduler<Void> scheduler =
model.schedulerBuilder("test").build().cast();
model.<Void>schedulerBuilder("test").build();

final BindableInputWire<Instant, Void> heartbeatBindable = scheduler.buildInputWire("heartbeat");
model.buildHeartbeatWire(Duration.ofMillis(10)).solderTo(heartbeatBindable);
Expand Down Expand Up @@ -103,10 +103,9 @@ void heartbeatsAtDifferentRates() {
.build();
;

final TaskScheduler<Void> scheduler = model.schedulerBuilder("test")
final TaskScheduler<Void> scheduler = model.<Void>schedulerBuilder("test")
.withUnhandledTaskCapacity(UNLIMITED_CAPACITY)
.build()
.cast();
.build();

final BindableInputWire<Instant, Void> heartbeatBindableA = scheduler.buildInputWire("heartbeatA");
final BindableInputWire<Instant, Void> heartbeatBindableB = scheduler.buildInputWire("heartbeatB");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,23 +158,21 @@ private static WiringMesh generateWiringMesh(
final ReentrantLock lock = new ReentrantLock();

final TaskScheduler<Long> schedulerA = wiringModel
.schedulerBuilder("A")
.<Long>schedulerBuilder("A")
.withType(SEQUENTIAL)
.withUnhandledTaskCapacity(UNLIMITED_CAPACITY)
.withFlushingEnabled(true)
.build()
.cast();
.build();
final BindableInputWire<Long, Long> inA = schedulerA.buildInputWire("inA");
inA.bind(buildHandler(random, 0.01, lock));
final OutputWire<Long> outA = schedulerA.getOutputWire();

final TaskScheduler<Long> schedulerB = wiringModel
.schedulerBuilder("B")
.<Long>schedulerBuilder("B")
.withType(CONCURRENT)
.withUnhandledTaskCapacity(UNLIMITED_CAPACITY)
.withFlushingEnabled(true)
.build()
.cast();
.build();
final BindableInputWire<Long, Long> inB = schedulerB.buildInputWire("inB");
inB.bind(buildHandler(random, 0.01, lock));
final BindableInputWire<Instant, Long> schedulerBHeartbeat = schedulerB.buildInputWire("heartbeatB");
Expand All @@ -186,100 +184,91 @@ private static WiringMesh generateWiringMesh(
final OutputWire<Long> outB = schedulerB.getOutputWire();

final TaskScheduler<Long> schedulerC = wiringModel
.schedulerBuilder("C")
.<Long>schedulerBuilder("C")
.withType(SEQUENTIAL_THREAD)
.withUnhandledTaskCapacity(UNLIMITED_CAPACITY)
.withFlushingEnabled(true)
.build()
.cast();
.build();
final BindableInputWire<Long, Long> inC = schedulerC.buildInputWire("inC");
inC.bind(buildHandler(random, 0.01, lock));
final OutputWire<Long> outC = schedulerC.getOutputWire();

final TaskScheduler<Long> schedulerD = wiringModel
.schedulerBuilder("D")
.<Long>schedulerBuilder("D")
.withType(SEQUENTIAL)
.withUnhandledTaskCapacity(UNLIMITED_CAPACITY)
.withFlushingEnabled(true)
.build()
.cast();
.build();
final BindableInputWire<Long, Long> inD = schedulerD.buildInputWire("inD");
inD.bind(buildHandler(random, 0.6, lock)); // This must be >0.5 else risk infinite loop
final OutputWire<Long> outD = schedulerD.getOutputWire();

final TaskScheduler<Long> schedulerE = wiringModel
.schedulerBuilder("E")
.<Long>schedulerBuilder("E")
.withType(SEQUENTIAL)
.withUnhandledTaskCapacity(UNLIMITED_CAPACITY)
.withFlushingEnabled(true)
.build()
.cast();
.build();
final BindableInputWire<Long, Long> inE = schedulerE.buildInputWire("inE");
inE.bind(buildHandler(random, 0.01, lock));
final OutputWire<Long> outE = schedulerE.getOutputWire();

final TaskScheduler<Long> schedulerF = wiringModel
.schedulerBuilder("F")
.<Long>schedulerBuilder("F")
.withType(SEQUENTIAL)
.withUnhandledTaskCapacity(UNLIMITED_CAPACITY)
.withFlushingEnabled(true)
.build()
.cast();
.build();
final BindableInputWire<Long, Long> inF = schedulerF.buildInputWire("inF");
inF.bind(buildHandler(random, 0.01, lock));
final OutputWire<Long> outF = schedulerF.getOutputWire();

final TaskScheduler<Long> schedulerG = wiringModel
.schedulerBuilder("G")
.<Long>schedulerBuilder("G")
.withType(SEQUENTIAL)
.withUnhandledTaskCapacity(UNLIMITED_CAPACITY)
.withFlushingEnabled(true)
.build()
.cast();
.build();
final BindableInputWire<Long, Long> inG = schedulerG.buildInputWire("inG");
inG.bind(buildHandler(random, 0.01, lock));
final OutputWire<Long> outG = schedulerG.getOutputWire();

final TaskScheduler<Long> schedulerH = wiringModel
.schedulerBuilder("H")
.<Long>schedulerBuilder("H")
.withType(DIRECT)
.withUnhandledTaskCapacity(UNLIMITED_CAPACITY)
.withFlushingEnabled(true)
.build()
.cast();
.build();
final BindableInputWire<Long, Long> inH = schedulerH.buildInputWire("inH");
inH.bind(buildHandler(random, 0.01, lock));
final OutputWire<Long> outH = schedulerH.getOutputWire();

final TaskScheduler<Long> schedulerI = wiringModel
.schedulerBuilder("I")
.<Long>schedulerBuilder("I")
.withType(DIRECT_THREADSAFE)
.withUnhandledTaskCapacity(UNLIMITED_CAPACITY)
.withFlushingEnabled(true)
.build()
.cast();
.build();
final BindableInputWire<Long, Long> inI = schedulerI.buildInputWire("inI");
inI.bind(buildHandler(random, 0.01, lock));
final OutputWire<Long> outI = schedulerI.getOutputWire();

final TaskScheduler<Long> schedulerJ = wiringModel
.schedulerBuilder("J")
.<Long>schedulerBuilder("J")
.withType(SEQUENTIAL)
.withUnhandledTaskCapacity(UNLIMITED_CAPACITY)
.withFlushingEnabled(true)
.build()
.cast();
.build();
final BindableInputWire<Long, Long> inJ = schedulerJ.buildInputWire("inJ");
inJ.bind(buildHandler(random, 0.01, lock));
final OutputWire<Long> outJ = schedulerJ.getOutputWire();

final TaskScheduler<Long> schedulerK = wiringModel
.schedulerBuilder("K")
.<Long>schedulerBuilder("K")
.withType(NO_OP)
.withUnhandledTaskCapacity(UNLIMITED_CAPACITY)
.withFlushingEnabled(true)
.build()
.cast();
.build();
final BindableInputWire<Long, Long> inK = schedulerK.buildInputWire("inK");
inK.bind(buildHandler(random, 0.01, lock));

Expand Down Expand Up @@ -462,26 +451,22 @@ void circularDataFlowTest() {
final AtomicInteger countC = new AtomicInteger();
final AtomicInteger countD = new AtomicInteger();

final TaskScheduler<Integer> taskSchedulerToA = model.schedulerBuilder("wireToA")
final TaskScheduler<Integer> taskSchedulerToA = model.<Integer>schedulerBuilder("wireToA")
.withType(SEQUENTIAL)
.withUnhandledTaskCapacity(UNLIMITED_CAPACITY)
.build()
.cast();
final TaskScheduler<Integer> taskSchedulerToB = model.schedulerBuilder("wireToB")
.build();
final TaskScheduler<Integer> taskSchedulerToB = model.<Integer>schedulerBuilder("wireToB")
.withType(SEQUENTIAL_THREAD)
.withUnhandledTaskCapacity(UNLIMITED_CAPACITY)
.build()
.cast();
final TaskScheduler<Integer> taskSchedulerToC = model.schedulerBuilder("wireToC")
.build();
final TaskScheduler<Integer> taskSchedulerToC = model.<Integer>schedulerBuilder("wireToC")
.withType(CONCURRENT)
.withUnhandledTaskCapacity(UNLIMITED_CAPACITY)
.build()
.cast();
final TaskScheduler<Integer> taskSchedulerToD = model.schedulerBuilder("wireToD")
.build();
final TaskScheduler<Integer> taskSchedulerToD = model.<Integer>schedulerBuilder("wireToD")
.withType(DIRECT)
.withUnhandledTaskCapacity(UNLIMITED_CAPACITY)
.build()
.cast();
.build();

final BindableInputWire<Integer, Integer> channelToA = taskSchedulerToA.buildInputWire("channelToA");
final BindableInputWire<Integer, Integer> channelToB = taskSchedulerToB.buildInputWire("channelToB");
Expand Down
Loading

0 comments on commit 980c65a

Please sign in to comment.