Skip to content

Commit

Permalink
NPE in PgDecoder if a notice is raised while no query is being execut…
Browse files Browse the repository at this point in the history
…ed (#1460)

See #1442

This may happen if the connection is used by a PgSubscriber.

Signed-off-by: Thomas Segismont <[email protected]>
  • Loading branch information
tsegismont committed Sep 2, 2024
1 parent 8bde735 commit c4881b4
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
*/
package io.vertx.pgclient.impl.codec;

import io.netty.buffer.ByteBuf;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.pgclient.PgException;
import io.vertx.sqlclient.impl.command.CommandResponse;
import io.vertx.sqlclient.impl.command.CommandBase;
import io.netty.buffer.ByteBuf;
import io.vertx.sqlclient.impl.command.CommandResponse;

import java.util.Arrays;

Expand Down Expand Up @@ -68,10 +68,6 @@ void handleNoData() {
logger.warn(getClass().getSimpleName() + " should handle message NoData");
}

void handleNoticeResponse(NoticeResponse noticeResponse) {
decoder.fireNoticeResponse(noticeResponse);
}

void handleErrorResponse(ErrorResponse errorResponse) {
logger.warn(getClass().getSimpleName() + " should handle message " + errorResponse);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

package io.vertx.pgclient.impl.codec;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.vertx.sqlclient.impl.Notification;
import io.vertx.pgclient.impl.util.Util;
import io.netty.buffer.ByteBuf;
import io.netty.util.ByteProcessor;
import io.vertx.pgclient.impl.util.Util;
import io.vertx.sqlclient.impl.Notification;
import io.vertx.sqlclient.impl.command.CommandBase;
import io.vertx.sqlclient.impl.command.CommandResponse;

Expand Down Expand Up @@ -273,7 +273,7 @@ private void decodeError(ChannelHandlerContext ctx, ByteBuf in) {
private void decodeNotice(ByteBuf in) {
NoticeResponse response = new NoticeResponse();
decodeErrorOrNotice(response, in);
codec.peek().handleNoticeResponse(response);
fireNoticeResponse(response);
}

private void decodeErrorOrNotice(Response response, ByteBuf in) {
Expand Down
34 changes: 34 additions & 0 deletions vertx-pg-client/src/test/java/io/vertx/pgclient/PubSubTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.vertx.pgclient;

import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.pgclient.impl.pubsub.PgSubscriberImpl;
Expand All @@ -28,6 +29,7 @@
import org.junit.Test;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -307,4 +309,36 @@ public void testClose(TestContext ctx, String channelName) {
endLatch.awaitSuccess(10000);
closeLatch.awaitSuccess(10000);
}

@Test
public void testNoticedRaised(TestContext ctx) {
Async async = ctx.async();
ProxyServer proxy = ProxyServer.create(vertx, options.getPort(), options.getHost());
CompletableFuture<Void> connected = new CompletableFuture<>();
proxy.proxyHandler(conn -> {
connected.thenAccept(v -> {
Buffer noticeMsg = Buffer.buffer();
noticeMsg.appendByte((byte) 'N'); // Notice
noticeMsg.appendInt(0);
noticeMsg.appendByte((byte) 0);
noticeMsg.setInt(1, noticeMsg.length() - 1);
conn.clientSocket().write(noticeMsg);
});
conn.connect();
});
proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(v1 -> {
PgConnectOptions connectOptions = new PgConnectOptions(options).setPort(8080).setHost("localhost");
PgConnection.connect(vertx, connectOptions).onComplete(ctx.asyncAssertSuccess(conn -> {
conn
.noticeHandler(notice -> {
async.complete();
})
.query("LISTEN \"toto\"")
.execute()
.onComplete(ctx.asyncAssertSuccess(result1 -> {
connected.complete(null);
}));
}));
}));
}
}

0 comments on commit c4881b4

Please sign in to comment.