Skip to content

Commit 36ec17a

Browse files
committed
Fix accordingly to code review
1 parent f3d06d9 commit 36ec17a

File tree

6 files changed

+145
-195
lines changed

6 files changed

+145
-195
lines changed

spring-webmvc/src/main/java/org/springframework/web/servlet/config/annotation/AsyncSupportConfigurer.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.web.servlet.config.annotation;
1818

19+
import java.time.Duration;
1920
import java.util.ArrayList;
2021
import java.util.Arrays;
2122
import java.util.List;
@@ -32,6 +33,7 @@
3233
* Helps with configuring options for asynchronous request processing.
3334
*
3435
* @author Rossen Stoyanchev
36+
* @author Réda Housni Alaoui
3537
* @since 3.2
3638
*/
3739
public class AsyncSupportConfigurer {
@@ -44,6 +46,8 @@ public class AsyncSupportConfigurer {
4446

4547
private final List<DeferredResultProcessingInterceptor> deferredResultInterceptors = new ArrayList<>();
4648

49+
private @Nullable Duration sseHeartbeatPeriod;
50+
4751

4852
/**
4953
* The provided task executor is used for the following:
@@ -99,6 +103,14 @@ public AsyncSupportConfigurer registerDeferredResultInterceptors(
99103
return this;
100104
}
101105

106+
/**
107+
* Configure the SSE heartbeat period.
108+
* @param sseHeartbeatPeriod The SSE heartbeat period
109+
*/
110+
public AsyncSupportConfigurer setSseHeartbeatPeriod(Duration sseHeartbeatPeriod) {
111+
this.sseHeartbeatPeriod = sseHeartbeatPeriod;
112+
return this;
113+
}
102114

103115
protected @Nullable AsyncTaskExecutor getTaskExecutor() {
104116
return this.taskExecutor;
@@ -116,4 +128,8 @@ protected List<DeferredResultProcessingInterceptor> getDeferredResultInterceptor
116128
return this.deferredResultInterceptors;
117129
}
118130

131+
protected @Nullable Duration getSseHeartbeatPeriod() {
132+
return this.sseHeartbeatPeriod;
133+
}
134+
119135
}

spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/DefaultSseEmitterHeartbeatExecutor.java

Lines changed: 0 additions & 125 deletions
This file was deleted.

spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/RequestMappingHandlerAdapter.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
package org.springframework.web.servlet.mvc.method.annotation;
1818

1919
import java.lang.reflect.Method;
20+
import java.time.Duration;
2021
import java.util.ArrayList;
2122
import java.util.Collections;
2223
import java.util.LinkedHashMap;
2324
import java.util.List;
2425
import java.util.Map;
26+
import java.util.Optional;
2527
import java.util.Set;
2628
import java.util.concurrent.Callable;
2729
import java.util.concurrent.ConcurrentHashMap;
@@ -54,6 +56,8 @@
5456
import org.springframework.http.converter.HttpMessageConverter;
5557
import org.springframework.http.converter.StringHttpMessageConverter;
5658
import org.springframework.http.converter.support.AllEncompassingFormHttpMessageConverter;
59+
import org.springframework.scheduling.TaskScheduler;
60+
import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler;
5761
import org.springframework.ui.ModelMap;
5862
import org.springframework.util.Assert;
5963
import org.springframework.util.ClassUtils;
@@ -202,8 +206,9 @@ public class RequestMappingHandlerAdapter extends AbstractHandlerMethodAdapter
202206

203207
private final Map<ControllerAdviceBean, Set<Method>> modelAttributeAdviceCache = new LinkedHashMap<>();
204208

205-
@Nullable
206-
private SseEmitterHeartbeatExecutor sseEmitterHeartbeatExecutor;
209+
private TaskScheduler taskScheduler = new SimpleAsyncTaskScheduler();
210+
211+
private @Nullable Duration sseHeartbeatPeriod;
207212

208213
/**
209214
* Provide resolvers for custom argument types. Custom resolvers are ordered
@@ -530,10 +535,17 @@ public void setParameterNameDiscoverer(ParameterNameDiscoverer parameterNameDisc
530535
}
531536

532537
/**
533-
* Set the {@link SseEmitterHeartbeatExecutor} that will be used to periodically prob the SSE connection health
538+
* Set the {@link TaskScheduler}
539+
*/
540+
public void setTaskScheduler(TaskScheduler taskScheduler) {
541+
this.taskScheduler = taskScheduler;
542+
}
543+
544+
/**
545+
* Sets the heartbeat period that will be used to periodically prob the SSE connection health
534546
*/
535-
public void setSseEmitterHeartbeatExecutor(@Nullable SseEmitterHeartbeatExecutor sseEmitterHeartbeatExecutor) {
536-
this.sseEmitterHeartbeatExecutor = sseEmitterHeartbeatExecutor;
547+
public void setSseHeartbeatPeriod(@Nullable Duration sseHeartbeatPeriod) {
548+
this.sseHeartbeatPeriod = sseHeartbeatPeriod;
537549
}
538550

539551
/**
@@ -743,9 +755,12 @@ private List<HandlerMethodReturnValueHandler> getDefaultReturnValueHandlers() {
743755
handlers.add(new ModelAndViewMethodReturnValueHandler());
744756
handlers.add(new ModelMethodProcessor());
745757
handlers.add(new ViewMethodReturnValueHandler());
758+
759+
SseEmitterHeartbeatExecutor sseEmitterHeartbeatExecutor = Optional.ofNullable(sseHeartbeatPeriod)
760+
.map(period -> new SseEmitterHeartbeatExecutor(taskScheduler, period)).orElse(null);
746761
handlers.add(new ResponseBodyEmitterReturnValueHandler(getMessageConverters(),
747762
this.reactiveAdapterRegistry, this.taskExecutor, this.contentNegotiationManager,
748-
initViewResolvers(), initLocaleResolver(), this.sseEmitterHeartbeatExecutor));
763+
initViewResolvers(), initLocaleResolver(), sseEmitterHeartbeatExecutor));
749764
handlers.add(new StreamingResponseBodyReturnValueHandler());
750765
handlers.add(new HttpEntityMethodProcessor(getMessageConverters(),
751766
this.contentNegotiationManager, this.requestResponseBodyAdvice, this.errorResponseInterceptors));

spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,18 @@
1818

1919
import java.io.IOException;
2020
import java.nio.charset.StandardCharsets;
21+
import java.time.Duration;
2122
import java.util.Collections;
2223
import java.util.LinkedHashSet;
24+
import java.util.Optional;
2325
import java.util.Set;
2426
import java.util.concurrent.locks.Lock;
2527
import java.util.concurrent.locks.ReentrantLock;
2628

2729
import org.jspecify.annotations.Nullable;
2830

31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
2933
import org.springframework.http.HttpHeaders;
3034
import org.springframework.http.MediaType;
3135
import org.springframework.http.server.ServerHttpResponse;
@@ -41,17 +45,22 @@
4145
* @author Juergen Hoeller
4246
* @author Sam Brannen
4347
* @author Brian Clozel
48+
* @author Réda Housni Alaoui
4449
* @since 4.2
4550
*/
4651
public class SseEmitter extends ResponseBodyEmitter {
4752

53+
private static final Logger LOGGER = LoggerFactory.getLogger(SseEmitter.class);
54+
4855
private static final MediaType TEXT_PLAIN = new MediaType("text", "plain", StandardCharsets.UTF_8);
4956

5057
/**
5158
* Guards access to write operations on the response.
5259
*/
5360
private final Lock writeLock = new ReentrantLock();
5461

62+
private volatile @Nullable Long lastEmissionNanoTime;
63+
5564
/**
5665
* Create a new SseEmitter instance.
5766
*/
@@ -134,12 +143,31 @@ public void send(SseEventBuilder builder) throws IOException {
134143
this.writeLock.lock();
135144
try {
136145
super.send(dataToSend);
146+
this.lastEmissionNanoTime = System.nanoTime();
137147
}
138148
finally {
139149
this.writeLock.unlock();
140150
}
141151
}
142152

153+
void notifyOfHeartbeatTick(Duration heartbeatPeriod) {
154+
boolean skip = Optional.ofNullable(lastEmissionNanoTime)
155+
.map(lastEmissionNanoTime -> System.nanoTime() - lastEmissionNanoTime)
156+
.map(nanoTimeElapsedSinceLastEmission -> nanoTimeElapsedSinceLastEmission < heartbeatPeriod.toNanos())
157+
.orElse(false);
158+
if (skip) {
159+
return;
160+
}
161+
LOGGER.trace("Sending heartbeat to {}", this);
162+
SseEmitter.SseEventBuilder eventBuilder = SseEmitter.event().name("ping").data("ping", MediaType.TEXT_PLAIN);
163+
try {
164+
send(eventBuilder);
165+
} catch (IOException | RuntimeException e) {
166+
// According to SseEmitter's Javadoc, the container itself will call SseEmitter#completeWithError
167+
LOGGER.debug(e.getMessage());
168+
}
169+
}
170+
143171
@Override
144172
public String toString() {
145173
return "SseEmitter@" + ObjectUtils.getIdentityHexString(this);

spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/SseEmitterHeartbeatExecutor.java

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,74 @@
1616

1717
package org.springframework.web.servlet.mvc.method.annotation;
1818

19+
20+
import java.time.Duration;
21+
import java.util.Set;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.concurrent.ScheduledFuture;
24+
25+
import org.jspecify.annotations.Nullable;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
import org.springframework.scheduling.TaskScheduler;
29+
1930
/**
2031
* @author Réda Housni Alaoui
2132
*/
22-
public interface SseEmitterHeartbeatExecutor {
33+
class SseEmitterHeartbeatExecutor {
34+
35+
private static final Logger LOGGER = LoggerFactory.getLogger(SseEmitterHeartbeatExecutor.class);
36+
37+
private final TaskScheduler taskScheduler;
38+
private final Set<SseEmitter> emitters = ConcurrentHashMap.newKeySet();
39+
40+
private final Object lifecycleMonitor = new Object();
41+
42+
private final Duration period;
43+
44+
@Nullable
45+
private volatile ScheduledFuture<?> taskFuture;
46+
47+
public SseEmitterHeartbeatExecutor(TaskScheduler taskScheduler, Duration period) {
48+
this.taskScheduler = taskScheduler;
49+
this.period = period;
50+
}
51+
52+
public void register(SseEmitter emitter) {
53+
startIfNeeded();
54+
55+
Runnable closeCallback = () -> emitters.remove(emitter);
56+
emitter.onCompletion(closeCallback);
57+
emitter.onError(t -> closeCallback.run());
58+
emitter.onTimeout(closeCallback);
59+
60+
emitters.add(emitter);
61+
}
62+
63+
boolean isRegistered(SseEmitter emitter) {
64+
return emitters.contains(emitter);
65+
}
66+
67+
private void startIfNeeded() {
68+
if (taskFuture != null) {
69+
return;
70+
}
71+
synchronized (lifecycleMonitor) {
72+
if (taskFuture != null) {
73+
return;
74+
}
75+
taskFuture = taskScheduler.scheduleAtFixedRate(this::notifyEmitters, period);
76+
}
77+
}
78+
79+
private void notifyEmitters() {
80+
LOGGER.atDebug().log(() -> "Notifying %s emitter(s)".formatted(emitters.size()));
2381

24-
void register(SseEmitter emitter);
82+
for (SseEmitter emitter : emitters) {
83+
if (Thread.currentThread().isInterrupted()) {
84+
return;
85+
}
86+
emitter.notifyOfHeartbeatTick(period);
87+
}
88+
}
2589
}

0 commit comments

Comments
 (0)