Skip to content

Commit fa10e21

Browse files
authored
KAFKA-14247: add handler impl to the prototype (apache#12775)
Minor revision for KAFKA-14247. Added how the handler is called and constructed to the prototype code path. Reviewers: John Roesler <[email protected]>, Kirk True <[email protected]>
1 parent c710ecd commit fa10e21

File tree

4 files changed

+519
-37
lines changed

4 files changed

+519
-37
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java

-21
Original file line numberDiff line numberDiff line change
@@ -64,27 +64,6 @@ public class DefaultBackgroundThread extends KafkaThread {
6464
private final AtomicReference<Optional<RuntimeException>> exception =
6565
new AtomicReference<>(Optional.empty());
6666

67-
public DefaultBackgroundThread(final ConsumerConfig config,
68-
final LogContext logContext,
69-
final BlockingQueue<ApplicationEvent> applicationEventQueue,
70-
final BlockingQueue<BackgroundEvent> backgroundEventQueue,
71-
final SubscriptionState subscriptions,
72-
final ConsumerMetadata metadata,
73-
final ConsumerNetworkClient networkClient,
74-
final Metrics metrics) {
75-
this(
76-
Time.SYSTEM,
77-
config,
78-
logContext,
79-
applicationEventQueue,
80-
backgroundEventQueue,
81-
subscriptions,
82-
metadata,
83-
networkClient,
84-
metrics
85-
);
86-
}
87-
8867
public DefaultBackgroundThread(final Time time,
8968
final ConsumerConfig config,
9069
final LogContext logContext,

clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java

+26-8
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.List;
3636
import java.util.Optional;
3737
import java.util.concurrent.BlockingQueue;
38+
import java.util.concurrent.LinkedBlockingQueue;
3839

3940
/**
4041
* An {@code EventHandler} that uses a single background thread to consume {@code ApplicationEvent} and produce
@@ -46,6 +47,24 @@ public class DefaultEventHandler implements EventHandler {
4647
private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
4748
private final DefaultBackgroundThread backgroundThread;
4849

50+
public DefaultEventHandler(final ConsumerConfig config,
51+
final LogContext logContext,
52+
final SubscriptionState subscriptionState,
53+
final ApiVersions apiVersions,
54+
final Metrics metrics,
55+
final ClusterResourceListeners clusterResourceListeners,
56+
final Sensor fetcherThrottleTimeSensor) {
57+
this(Time.SYSTEM,
58+
config,
59+
logContext,
60+
new LinkedBlockingQueue<>(),
61+
new LinkedBlockingQueue<>(),
62+
subscriptionState,
63+
apiVersions,
64+
metrics,
65+
clusterResourceListeners,
66+
fetcherThrottleTimeSensor);
67+
}
4968

5069
public DefaultEventHandler(final Time time,
5170
final ConsumerConfig config,
@@ -95,14 +114,13 @@ public DefaultEventHandler(final Time time,
95114
logContext
96115
);
97116
final ConsumerNetworkClient networkClient = new ConsumerNetworkClient(
98-
logContext,
99-
netClient,
100-
metadata,
101-
time,
102-
config.getInt(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG),
103-
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
104-
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)
105-
);
117+
logContext,
118+
netClient,
119+
metadata,
120+
time,
121+
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG),
122+
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
123+
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG));
106124
this.backgroundThread = new DefaultBackgroundThread(
107125
time,
108126
config,

0 commit comments

Comments
 (0)