Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some topic examples fixes #43

Merged
merged 13 commits into from
Sep 30, 2024
15 changes: 13 additions & 2 deletions ydb-cookbook/src/main/java/tech/ydb/examples/SimpleExample.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
package tech.ydb.examples;

import java.time.Duration;

import tech.ydb.auth.iam.CloudAuthHelper;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.scheme.SchemeClient;
import tech.ydb.scheme.description.DescribePathResult;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.description.Consumer;
import tech.ydb.topic.description.SupportedCodecs;
import tech.ydb.topic.settings.CreateTopicSettings;


/**
* @author Sergey Polovko
* @author Nikolay Perfilov
*/
public abstract class SimpleExample {
protected static final String TOPIC_NAME = "test-topic";
protected static final String CONSUMER_NAME = "test-consumer";
protected static final String TOPIC_NAME = System.getenv("YDB_TOPIC_NAME");
protected static final String CONSUMER_NAME = System.getenv("YDB_CONSUMER_NAME");

protected void doMain(String[] args) {
if (args.length > 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ public void onPartitionSessionClosed(PartitionSessionClosedEvent event) {
@Override
public void onReaderClosed(ReaderClosedEvent event) {
logger.info("Reader is closed.");
if (!messageReceivedFuture.isDone()) {
messageReceivedFuture.complete(null);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.examples.SimpleExample;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.description.MetadataItem;
import tech.ydb.topic.read.DecompressionException;
import tech.ydb.topic.read.Message;
import tech.ydb.topic.read.SyncReader;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package tech.ydb.examples.topic;

import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -109,6 +110,9 @@ protected void run(GrpcTransport transport, String pathPrefix) {
}
logger.info("Received a signal to stop writing");

// Wait for all writes to receive a WriteAck before shutting down writer
writer.flush();

try {
writer.shutdown(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (TimeoutException exception) {
Expand Down Expand Up @@ -240,7 +244,7 @@ private class Handler extends AbstractReadEventHandler {
public void onMessages(DataReceivedEvent event) {
for (tech.ydb.topic.read.Message message : event.getMessages()) {
messagesReceived.incrementAndGet();
if (logger.isTraceEnabled()) {
if (logger.isDebugEnabled()) {
StringBuilder str = new StringBuilder("Message received");
str.append("\n");
str.append(" offset: ").append(message.getOffset()).append("\n")
Expand All @@ -251,13 +255,22 @@ public void onMessages(DataReceivedEvent event) {
.append(" writtenAt: ").append(message.getWrittenAt()).append("\n")
.append(" partitionSession: ").append(message.getPartitionSession().getId()).append("\n")
.append(" partitionId: ").append(message.getPartitionSession().getPartitionId())
.append("\n")
.append(" metadataItems: ")
.append("\n");
message.getMetadataItems().forEach(item -> str
.append(" key: \"")
.append(item.getKey())
.append("\", value: \"")
.append(new String(item.getValue(), StandardCharsets.UTF_8))
.append("\"")
.append("\n"));
if (!message.getWriteSessionMeta().isEmpty()) {
str.append(" writeSessionMeta:\n");
message.getWriteSessionMeta().forEach((key, value) ->
str.append(" ").append(key).append(": ").append(value).append("\n"));
}
logger.trace(str.toString());
logger.debug(str.toString());
} else {
logger.debug("Message received. SeqNo={}, offset={}", message.getSeqNo(), message.getOffset());
}
Expand Down
38 changes: 31 additions & 7 deletions ydb-cookbook/src/main/java/tech/ydb/examples/topic/WriteAsync.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package tech.ydb.examples.topic;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -23,6 +24,8 @@
*/
public class WriteAsync extends SimpleExample {
private static final Logger logger = LoggerFactory.getLogger(WriteAsync.class);
private static final int MESSAGES_COUNT = 5;
private static final int WAIT_TIMEOUT_SECONDS = 60;

@Override
protected void run(GrpcTransport transport, String pathPrefix) {
Expand Down Expand Up @@ -52,14 +55,17 @@ protected void run(GrpcTransport transport, String pathPrefix) {
return null;
});

for (int i = 1; i <= 5; i++) {
// A latch to wait for all writes to receive a WriteAck before shutting down writer
CountDownLatch writesInProgress = new CountDownLatch(MESSAGES_COUNT);

for (int i = 1; i <= MESSAGES_COUNT; i++) {
final int index = i;
try {
String messageString = "message" + i;
// Blocks until the message is put into sending buffer
writer.send(Message.of(messageString.getBytes())).whenComplete((result, ex) -> {
if (ex != null) {
logger.error("Exception while sending message {}: ", index, ex);
logger.error("Exception while sending a message {}: ", index, ex);
} else {
logger.info("Message {} ack received", index);

Expand All @@ -76,20 +82,38 @@ protected void run(GrpcTransport transport, String pathPrefix) {
break;
}
}
writesInProgress.countDown();
});
} catch (QueueOverflowException exception) {
logger.error("Queue overflow exception while sending message{}: ", index, exception);
// Send queue is full. Need retry with backoff or skip
logger.error("Queue overflow exception while sending a message{}: ", index, exception);
// Send queue is full. Need to retry with backoff or skip
writesInProgress.countDown();
}

logger.info("Message {} is sent", index);
}

long timeoutSeconds = 10;
try {
writer.shutdown().get(timeoutSeconds, TimeUnit.SECONDS);
while (!writesInProgress.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
logger.error("Writes are not finished in {} seconds", WAIT_TIMEOUT_SECONDS);
}
} catch (InterruptedException exception) {
logger.error("Waiting for writes to finish was interrupted: ", exception);
}

try {
if (!writesInProgress.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
logger.error("Writes are not finished in {} seconds", WAIT_TIMEOUT_SECONDS);
}
} catch (InterruptedException exception) {
logger.error("Waiting for writes to finish was interrupted: ", exception);
}

try {
writer.shutdown().get(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (TimeoutException exception) {
logger.error("Timeout exception during writer termination ({} seconds): ", timeoutSeconds, exception);
logger.error("Timeout exception during writer termination ({} seconds): ", WAIT_TIMEOUT_SECONDS,
exception);
} catch (ExecutionException exception) {
logger.error("Execution exception during writer termination: ", exception);
} catch (InterruptedException exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ protected void run(GrpcTransport transport, String pathPrefix) {
}
}

// Wait for all writes to receive a WriteAck before shutting down writer
writer.flush();
logger.info("Flush finished");

long shutdownTimeoutSeconds = 10;
try {
writer.shutdown(shutdownTimeoutSeconds, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import tech.ydb.table.impl.PooledTableClient;
import tech.ydb.table.rpc.grpc.GrpcTableRpc;
import tech.ydb.table.transaction.TableTransaction;
import tech.ydb.table.transaction.Transaction;
import tech.ydb.table.transaction.TxControl;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.read.AsyncReader;
import tech.ydb.topic.read.DecompressionException;
Expand All @@ -42,7 +40,7 @@
public class TransactionReadAsync extends SimpleExample {
private static final Logger logger = LoggerFactory.getLogger(TransactionReadAsync.class);
private static final long MAX_MEMORY_USAGE_BYTES = 500 * 1024 * 1024; // 500 Mb
private static final int MESSAGES_COUNT = 5;
private static final int MESSAGES_COUNT = 1;

private final CompletableFuture<Void> messageReceivedFuture = new CompletableFuture<>();
private TableClient tableClient;
Expand Down Expand Up @@ -135,11 +133,13 @@ public void onMessages(DataReceivedEvent event) {
// creating session and transaction
Result<Session> sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join();
if (!sessionResult.isSuccess()) {
logger.error("Couldn't get session from pool: {}", sessionResult);
logger.error("Couldn't get a session from the pool: {}", sessionResult);
return; // retry or shutdown
}
Session session = sessionResult.getValue();
TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW);
TableTransaction transaction = session.beginTransaction(TxMode.SERIALIZABLE_RW)
.join()
.getValue();

// do something else in transaction
transaction.executeDataQuery("SELECT 1").join();
Expand Down Expand Up @@ -199,6 +199,7 @@ public void onPartitionSessionClosed(PartitionSessionClosedEvent event) {
@Override
public void onReaderClosed(ReaderClosedEvent event) {
logger.info("Reader is closed.");
messageReceivedFuture.complete(null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import tech.ydb.table.Session;
import tech.ydb.table.TableClient;
import tech.ydb.table.transaction.TableTransaction;
import tech.ydb.table.transaction.Transaction;
import tech.ydb.table.transaction.TxControl;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.read.DecompressionException;
import tech.ydb.topic.read.Message;
Expand Down Expand Up @@ -49,37 +47,34 @@ protected void run(GrpcTransport transport, String pathPrefix) {
reader.init();

try {
// Reading 5 messages
for (int i = 0; i < 5; i++) {
// creating session and transaction
Result<Session> sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join();
if (!sessionResult.isSuccess()) {
logger.error("Couldn't get session from pool: {}", sessionResult);
return; // retry or shutdown
}
Session session = sessionResult.getValue();
TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW);

// do something else in transaction
transaction.executeDataQuery("SELECT 1").join();
// analyzeQueryResultIfNeeded();
// creating session and transaction
Result<Session> sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join();
if (!sessionResult.isSuccess()) {
logger.error("Couldn't a get session from the pool: {}", sessionResult);
return; // retry or shutdown
}
Session session = sessionResult.getValue();
TableTransaction transaction = session.createNewTransaction(TxMode.SERIALIZABLE_RW);

//Session session
Message message = reader.receive(ReceiveSettings.newBuilder()
.setTransaction(transaction)
.build());
byte[] messageData;
try {
messageData = message.getData();
} catch (DecompressionException e) {
logger.warn("Decompression exception while receiving a message: ", e);
messageData = e.getRawData();
}
logger.info("Message received: {}", new String(messageData, StandardCharsets.UTF_8));
// do something else in transaction
transaction.executeDataQuery("SELECT 1").join();
// analyzeQueryResultIfNeeded();

transaction.commit().join();
// analyze commit status
//Session session
Message message = reader.receive(ReceiveSettings.newBuilder()
.setTransaction(transaction)
.build());
byte[] messageData;
try {
messageData = message.getData();
} catch (DecompressionException e) {
logger.warn("Decompression exception while receiving a message: ", e);
messageData = e.getRawData();
}
logger.info("Message received: {}", new String(messageData, StandardCharsets.UTF_8));

transaction.commit().join();
// analyze commit status
} catch (InterruptedException exception) {
logger.error("Interrupted exception while waiting for message: ", exception);
}
Expand Down
Loading
Loading