Skip to content

Commit

Permalink
Merge pull request #215 from ydb-platform/release_v2.1.10
Browse files Browse the repository at this point in the history
Release v2.1.10
  • Loading branch information
pnv1 authored Jan 11, 2024
2 parents b58f7ee + 0d37eb7 commit 8b02238
Show file tree
Hide file tree
Showing 27 changed files with 202 additions and 53 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
## 2.1.10 ##

* Topics: Added message metadata support
* Topics: Added support for reading without a consumer
* Topics: Fixed a bug where onReaderClosed was not called on async reader shutdown
* Topics: Small logging improvements
* Added batch limit support for readTable

## 2.1.9 ##

* Topics: Fixed a bug where first commit was not getting commitResponse if a user had sent a custom StartPartitionSessionResponse
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Firstly you can import YDB Java BOM to specify correct versions of SDK modules.
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-bom</artifactId>
<version>2.1.9</version>
<version>2.1.10</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
4 changes: 2 additions & 2 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>

<groupId>tech.ydb</groupId>
<version>2.1.9</version>
<version>2.1.10</version>
<artifactId>ydb-sdk-bom</artifactId>
<name>Java SDK Bill of Materials</name>
<description>Java SDK Bill of Materials (BOM)</description>
Expand All @@ -15,7 +15,7 @@

<properties>
<ydb-auth-api.version>1.0.0</ydb-auth-api.version>
<ydb-proto-api.version>1.4.1</ydb-proto-api.version>
<ydb-proto-api.version>1.5.2</ydb-proto-api.version>
<yc-auth.version>2.1.0</yc-auth.version>
</properties>

Expand Down
2 changes: 1 addition & 1 deletion coordination/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.9</version>
<version>2.1.10</version>
</parent>

<artifactId>ydb-sdk-coordination</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.9</version>
<version>2.1.10</version>
</parent>

<artifactId>ydb-sdk-core</artifactId>
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/tech/ydb/core/StatusCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public enum StatusCode {
UNDETERMINED(SERVER_STATUSES_FIRST + 170),
UNSUPPORTED(SERVER_STATUSES_FIRST + 180),
SESSION_BUSY(SERVER_STATUSES_FIRST + 190),
EXTERNAL_ERROR(SERVER_STATUSES_FIRST + 200),

// Client statuses
/** Cannot connect or unrecoverable network error. (map from gRPC UNAVAILABLE) */
Expand Down Expand Up @@ -129,6 +130,7 @@ public static StatusCode fromProto(StatusIds.StatusCode code) {
case UNDETERMINED: return UNDETERMINED;
case UNSUPPORTED: return UNSUPPORTED;
case SESSION_BUSY: return SESSION_BUSY;
case EXTERNAL_ERROR: return EXTERNAL_ERROR;
default:
return UNUSED_STATUS;
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/resources/version.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=2.1.9
version=2.1.10
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.9</version>
<version>2.1.10</version>

<name>Java SDK for YDB</name>
<description>Java SDK for YDB</description>
Expand Down
2 changes: 1 addition & 1 deletion scheme/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.9</version>
<version>2.1.10</version>
</parent>

<artifactId>ydb-sdk-scheme</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion table/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.9</version>
<version>2.1.10</version>
</parent>

<artifactId>ydb-sdk-table</artifactId>
Expand Down
4 changes: 3 additions & 1 deletion table/src/main/java/tech/ydb/table/impl/BaseSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,9 @@ public GrpcReadStream<ReadTablePart> executeReadTable(String tablePath, ReadTabl
.setSessionId(id)
.setPath(tablePath)
.setOrdered(settings.isOrdered())
.setRowLimit(settings.getRowLimit());
.setRowLimit(settings.getRowLimit())
.setBatchLimitBytes(settings.batchLimitBytes())
.setBatchLimitRows(settings.batchLimitRows());

Value<?> fromKey = settings.getFromKey();
if (fromKey != null) {
Expand Down
24 changes: 24 additions & 0 deletions table/src/main/java/tech/ydb/table/settings/ReadTableSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class ReadTableSettings extends BaseRequestSettings {
private final boolean toInclusive;
private final int rowLimit;
private final ImmutableList<String> columns;
private final int batchLimitBytes;
private final int batchLimitRows;

private ReadTableSettings(Builder builder) {
super(builder);
Expand All @@ -39,6 +41,8 @@ private ReadTableSettings(Builder builder) {
this.toInclusive = builder.toInclusive;
this.rowLimit = builder.rowLimit;
this.columns = ImmutableList.copyOf(builder.columns);
this.batchLimitBytes = builder.batchLimitBytes;
this.batchLimitRows = builder.batchLimitRows;
}

public static Builder newBuilder() {
Expand Down Expand Up @@ -75,6 +79,14 @@ public ImmutableList<String> getColumns() {
return columns;
}

public int batchLimitBytes() {
return batchLimitBytes;
}

public int batchLimitRows() {
return batchLimitRows;
}

/**
* BUILDER
*/
Expand All @@ -86,6 +98,8 @@ public static final class Builder extends BaseBuilder<Builder> {
private boolean toInclusive = false;
private int rowLimit = 0;
private List<String> columns = Collections.emptyList();
private int batchLimitBytes = 0;
private int batchLimitRows = 0;

public Builder orderedRead(boolean ordered) {
this.ordered = ordered;
Expand Down Expand Up @@ -160,6 +174,16 @@ public Builder column(String column) {
return this;
}

public Builder batchLimitBytes(int batchLimitBytes) {
this.batchLimitBytes = batchLimitBytes;
return this;
}

public Builder batchLimitRows(int batchLimitRows) {
this.batchLimitRows = batchLimitRows;
return this;
}

@Override
public ReadTableSettings build() {
return new ReadTableSettings(this);
Expand Down
2 changes: 1 addition & 1 deletion tests/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.9</version>
<version>2.1.10</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion tests/junit4-support/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.9</version>
<version>2.1.10</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion tests/junit5-support/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.9</version>
<version>2.1.10</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion topic/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-parent</artifactId>
<version>2.1.9</version>
<version>2.1.10</version>
</parent>

<artifactId>ydb-sdk-topic</artifactId>
Expand Down
24 changes: 24 additions & 0 deletions topic/src/main/java/tech/ydb/topic/description/MetadataItem.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package tech.ydb.topic.description;

import javax.annotation.Nonnull;

/**
* @author Nikolay Perfilov
*/
public class MetadataItem {
private final String key;
private final byte[] value;

public MetadataItem(@Nonnull String key, byte[] value) {
this.key = key;
this.value = value;
}

public String getKey() {
return key;
}

public byte[] getValue() {
return value;
}
}
8 changes: 8 additions & 0 deletions topic/src/main/java/tech/ydb/topic/read/Message.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package tech.ydb.topic.read;

import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import io.grpc.ExperimentalApi;

import tech.ydb.topic.description.MetadataItem;

/**
* @author Nikolay Perfilov
*/
Expand Down Expand Up @@ -52,6 +55,11 @@ public interface Message {
*/
Instant getWrittenAt();

/**
* @return message metadata items
*/
List<MetadataItem> getMetadataItems();

/**
* @return Partition session of this message
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,21 +119,20 @@ protected void handleClosePartitionSession(tech.ydb.topic.read.PartitionSession
});
}

@Override
protected void handleCloseReader() {
protected void handleReaderClosed() {
handlerExecutor.execute(() -> {
eventHandler.onReaderClosed(new ReaderClosedEvent());
});
}

@Override
protected CompletableFuture<Void> shutdownImpl() {
return super.shutdownImpl().whenComplete((res, th) -> {
if (defaultHandlerExecutorService != null) {
logger.debug("Shutting down default handler executor");
defaultHandlerExecutorService.shutdown();
}
});
protected void onShutdown(String reason) {
super.onShutdown(reason);
handleReaderClosed();
if (defaultHandlerExecutorService != null) {
logger.debug("Shutting down default handler executor");
defaultHandlerExecutorService.shutdown();
}
}

@Override
Expand Down
15 changes: 15 additions & 0 deletions topic/src/main/java/tech/ydb/topic/read/impl/MessageImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import tech.ydb.topic.description.MetadataItem;
import tech.ydb.topic.read.DecompressionException;
import tech.ydb.topic.read.Message;
import tech.ydb.topic.read.OffsetsRange;
Expand All @@ -22,6 +24,7 @@ public class MessageImpl implements Message {
private final String messageGroupId;
private final BatchMeta batchMeta;
private final PartitionSessionImpl partitionSession;
private List<MetadataItem> metadataItems;
private final OffsetsRange offsetsToCommit;
private final CommitterImpl committer;
private boolean isDecompressed = false;
Expand All @@ -36,6 +39,7 @@ private MessageImpl(Builder builder) {
this.messageGroupId = builder.messageGroupId;
this.batchMeta = builder.batchMeta;
this.partitionSession = builder.partitionSession;
this.metadataItems = builder.metadataItems;
this.offsetsToCommit = new OffsetsRangeImpl(commitOffsetFrom, offset + 1);
this.committer = new CommitterImpl(partitionSession, 1, offsetsToCommit);
}
Expand Down Expand Up @@ -105,6 +109,11 @@ public PartitionSessionImpl getPartitionSessionImpl() {
return partitionSession;
}

@Override
public List<MetadataItem> getMetadataItems() {
return metadataItems;
}

public void setDecompressed(boolean decompressed) {
isDecompressed = decompressed;
}
Expand All @@ -130,6 +139,7 @@ public static class Builder {
private String messageGroupId;
private BatchMeta batchMeta;
private PartitionSessionImpl partitionSession;
private List<MetadataItem> metadataItems;

public Builder setData(byte[] data) {
this.data = data;
Expand Down Expand Up @@ -171,6 +181,11 @@ public Builder setPartitionSession(PartitionSessionImpl partitionSession) {
return this;
}

public Builder setMetadataItems(List<MetadataItem> metadataItems) {
this.metadataItems = metadataItems;
return this;
}

public MessageImpl build() {
return new MessageImpl(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import tech.ydb.core.utils.ProtobufUtils;
import tech.ydb.proto.topic.YdbTopic;
import tech.ydb.topic.description.Codec;
import tech.ydb.topic.description.MetadataItem;
import tech.ydb.topic.read.Message;
import tech.ydb.topic.read.OffsetsRange;
import tech.ydb.topic.read.PartitionSession;
Expand Down Expand Up @@ -140,6 +142,11 @@ public CompletableFuture<Void> addBatches(List<YdbTopic.StreamReadMessage.ReadRe
.setCommitOffsetFrom(commitOffsetFrom)
.setCreatedAt(ProtobufUtils.protoToInstant(messageData.getCreatedAt()))
.setMessageGroupId(messageData.getMessageGroupId())
.setMetadataItems(messageData.getMetadataItemsList()
.stream()
.map(metadataItem -> new MetadataItem(metadataItem.getKey(),
metadataItem.getValue().toByteArray()))
.collect(Collectors.toList()))
.build()
);
});
Expand Down Expand Up @@ -245,10 +252,12 @@ public void handleCommitResponse(long committedOffset) {
return;
}
Map<Long, CompletableFuture<Void>> futuresToComplete = commitFutures.headMap(committedOffset, true);
logger.info("[{}] Commit response received for partition session {} (partition {}). Committed offset: {}" +
". Previous committed offset: {} (diff is {} message(s)). Completing {} commit futures",
path, id, partitionId, committedOffset, lastCommittedOffset, committedOffset - lastCommittedOffset,
futuresToComplete.size());
if (logger.isDebugEnabled()) {
logger.debug("[{}] Commit response received for partition session {} (partition {}). Committed offset: {}" +
". Previous committed offset: {} (diff is {} message(s)). Completing {} commit futures",
path, id, partitionId, committedOffset, lastCommittedOffset, committedOffset - lastCommittedOffset,
futuresToComplete.size());
}
lastCommittedOffset = committedOffset;
futuresToComplete.values().forEach(future -> future.complete(null));
futuresToComplete.clear();
Expand Down
Loading

0 comments on commit 8b02238

Please sign in to comment.