Skip to content

Commit

Permalink
Fix #1409 Socket Mode: Slow message consumption when listeners do not…
Browse files Browse the repository at this point in the history
… immediately return ack() (#1411)
  • Loading branch information
seratch authored Jan 8, 2025
1 parent 9e815b0 commit f35db18
Show file tree
Hide file tree
Showing 4 changed files with 296 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.slack.api.bolt.App;
import com.slack.api.bolt.request.Request;
import com.slack.api.bolt.response.Response;
import com.slack.api.bolt.jakarta_socket_mode.request.SocketModeRequest;
import com.slack.api.bolt.jakarta_socket_mode.request.SocketModeRequestParser;
import com.slack.api.bolt.request.Request;
import com.slack.api.bolt.response.Response;
import com.slack.api.jakarta_socket_mode.JakartaSocketModeClientFactory;
import com.slack.api.socket_mode.SocketModeClient;
import com.slack.api.socket_mode.response.AckResponse;
import com.slack.api.util.json.GsonFactory;
import com.slack.api.util.thread.DaemonThreadExecutorServiceFactory;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;

Expand All @@ -27,6 +29,7 @@ public class SocketModeApp {
private final App app;
private final Supplier<SocketModeClient> clientFactory;
private SocketModeClient client;
private final ExecutorService executorService;

private static final Function<ErrorContext, Response> DEFAULT_ERROR_HANDLER = (context) -> {
Exception e = context.getException();
Expand Down Expand Up @@ -69,35 +72,22 @@ private static void sendSocketModeResponse(
private static Supplier<SocketModeClient> buildSocketModeClientFactory(
App app,
String appToken,
Function<ErrorContext, Response> errorHandler
Function<ErrorContext, Response> errorHandler,
ExecutorService executorService
) {
return () -> {
try {
final SocketModeClient client = JakartaSocketModeClientFactory.create(app.slack(), appToken);
final SocketModeRequestParser requestParser = new SocketModeRequestParser(app.config());
final Gson gson = GsonFactory.createSnakeCase(app.slack().getConfig());
client.addWebSocketMessageListener(message -> {
long startMillis = System.currentTimeMillis();
SocketModeRequest req = requestParser.parse(message);
if (req != null) {
try {
Response boltResponse = app.run(req.getBoltRequest());
if (boltResponse.getStatusCode() != 200) {
log.warn("Unsuccessful Bolt app execution (status: {}, body: {})",
boltResponse.getStatusCode(), boltResponse.getBody());
return;
}
sendSocketModeResponse(client, gson, req, boltResponse);
} catch (Exception e) {
ErrorContext context = ErrorContext.builder().request(req.getBoltRequest()).exception(e).build();
Response errorResponse = errorHandler.apply(context);
if (errorResponse != null) {
sendSocketModeResponse(client, gson, req, errorResponse);
}
} finally {
long spentMillis = System.currentTimeMillis() - startMillis;
log.debug("Response time: {} milliseconds", spentMillis);
}
if (executorService != null) {
// asynchronous
executorService.execute(() -> runBoltApp(
message, app, client, requestParser, errorHandler, gson));
} else {
// synchronous
runBoltApp(message, app, client, requestParser, errorHandler, gson);
}
});
return client;
Expand All @@ -108,34 +98,101 @@ private static Supplier<SocketModeClient> buildSocketModeClientFactory(
};
}

private static void runBoltApp(
String message,
App app,
SocketModeClient client,
SocketModeRequestParser requestParser,
Function<ErrorContext, Response> errorHandler,
Gson gson
) {
long startMillis = System.currentTimeMillis();
SocketModeRequest req = requestParser.parse(message);
if (req != null) {
try {
Response boltResponse = app.run(req.getBoltRequest());
if (boltResponse.getStatusCode() != 200) {
log.warn("Unsuccessful Bolt app execution (status: {}, body: {})",
boltResponse.getStatusCode(), boltResponse.getBody());
return;
}
sendSocketModeResponse(client, gson, req, boltResponse);
} catch (Exception e) {
ErrorContext context = ErrorContext.builder().request(req.getBoltRequest()).exception(e).build();
Response errorResponse = errorHandler.apply(context);
if (errorResponse != null) {
sendSocketModeResponse(client, gson, req, errorResponse);
}
} finally {
long spentMillis = System.currentTimeMillis() - startMillis;
log.debug("Response time: {} milliseconds", spentMillis);
}
}
}

private static ExecutorService buildExecutorService(int concurrency) {
return DaemonThreadExecutorServiceFactory.createDaemonThreadPoolExecutor(
"slack-bolt-socket-mode", concurrency);
}

// -------------------------------------------

public SocketModeApp(App app) throws IOException {
this(System.getenv("SLACK_APP_TOKEN"), app);
}

public SocketModeApp(App app, int concurrency) throws IOException {
this(System.getenv("SLACK_APP_TOKEN"), app, concurrency);
}

public SocketModeApp(String appToken, App app) throws IOException {
this(appToken, DEFAULT_ERROR_HANDLER, app);
}

public SocketModeApp(String appToken, App app, int concurrency) throws IOException {
this(appToken, DEFAULT_ERROR_HANDLER, app, buildExecutorService(concurrency));
}

public SocketModeApp(
String appToken,
Function<ErrorContext, Response> errorHandler,
App app
) throws IOException {
this(buildSocketModeClientFactory(app, appToken, errorHandler), app);
this(buildSocketModeClientFactory(app, appToken, errorHandler, null), app);
}

public SocketModeApp(
String appToken,
App app,
Function<ErrorContext, Response> errorHandler
) throws IOException {
this(buildSocketModeClientFactory(app, appToken, errorHandler), app);
this(buildSocketModeClientFactory(app, appToken, errorHandler, null), app);
}

public SocketModeApp(Supplier<SocketModeClient> clientFactory, App app) {
this(clientFactory, app, null);
}


// intentionally private to avoid exposing the ExecutorService initialization
private SocketModeApp(
String appToken,
Function<ErrorContext, Response> errorHandler,
App app,
ExecutorService executorService
) throws IOException {
this(buildSocketModeClientFactory(app, appToken, errorHandler, executorService), app, executorService);
}

// intentionally private to avoid exposing the ExecutorService initialization
private SocketModeApp(
Supplier<SocketModeClient> clientFactory,
App app,
ExecutorService executorService
) {
this.clientFactory = clientFactory;
this.app = app;
this.executorService = executorService;
}

/**
Expand All @@ -152,10 +209,9 @@ public SocketModeApp(SocketModeClient socketModeClient, App app) {
this.client = socketModeClient;
this.clientFactory = () -> socketModeClient;
this.app = app;
this.executorService = null;
}

// -------------------------------------------

public void start() throws Exception {
run(true);
}
Expand Down Expand Up @@ -192,6 +248,16 @@ public void stop() throws Exception {
public void close() throws Exception {
this.stop();
this.client = null;
if (executorService != null) {
for (Runnable runnable : executorService.shutdownNow()) {
try {
runnable.run();
} catch (Exception e) {
log.warn("Failed to run the remaining Runnable in SocketModeApp (error: {}, message: {})",
e.getClass().getCanonicalName(), e.getMessage());
}
}
}
}

// -------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package samples;

import com.slack.api.bolt.App;
import com.slack.api.bolt.AppConfig;
import com.slack.api.bolt.jakarta_socket_mode.SocketModeApp;
import com.slack.api.model.event.MessageChangedEvent;
import com.slack.api.model.event.MessageDeletedEvent;
import com.slack.api.model.event.MessageEvent;
import config.Constants;

public class ConcurrencyTestApp {

public static void main(String[] args) throws Exception {
App app = new App(AppConfig.builder()
.singleTeamBotToken(System.getenv(Constants.SLACK_SDK_TEST_SOCKET_MODE_BOT_TOKEN))
.build());

app.event(MessageEvent.class, (req, ctx) -> {
// Without concurrency option, this time-consuming task slows the whole message processing mechanism down
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ctx.asyncClient().reactionsAdd(r -> r
.channel(req.getEvent().getChannel())
.name("eyes")
.timestamp(req.getEvent().getTs())
);
return ctx.ack();
});
app.event(MessageChangedEvent.class, (req, ctx) -> ctx.ack());
app.event(MessageDeletedEvent.class, (req, ctx) -> ctx.ack());

String appToken = System.getenv(Constants.SLACK_SDK_TEST_SOCKET_MODE_APP_TOKEN);
// SocketModeApp socketModeApp = new SocketModeApp(appToken, app);
SocketModeApp socketModeApp = new SocketModeApp(appToken, app, 10);
socketModeApp.start();
}
}
Loading

0 comments on commit f35db18

Please sign in to comment.