Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix no batch returned when there is a batch corresponding to the condition with limit #262

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Overview (5.4.4)
# Overview (5.4.5)

Cradle API is used to work with Cradle - the datalake where th2 stores its data.

Expand Down Expand Up @@ -209,6 +209,10 @@ Test events have mandatory parameters that are verified when storing an event. T

## Release notes

### 5.4.5
* Fixed the problem - grouped message queries with a limit on the number of batches to return could return no batches even when there is a batch that matches the condition.
* Updated th2 gradle plugin: `0.1.5`

### 5.4.4
* Fixed the problem - page cache doesn't work correct when storage has removed page(s)

Expand Down
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
plugins {
id "com.exactpro.th2.gradle.base" version "0.1.3"
id "com.exactpro.th2.gradle.publish" version "0.1.3"
id "com.exactpro.th2.gradle.base" version "0.1.5"
id "com.exactpro.th2.gradle.publish" version "0.1.5"
}

allprojects {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,31 +29,25 @@

public class FilteredGroupedMessageBatchIterator extends MappedIterator<StoredGroupedMessageBatch, StoredGroupedMessageBatch>
{
private final FilterForGreater<Instant> filterFrom;
private final FilterForLess<Instant> filterTo;


public FilteredGroupedMessageBatchIterator(Iterator<StoredGroupedMessageBatch> sourceIterator, GroupedMessageFilter filter,
int limit, AtomicInteger returned)
{
super(sourceIterator, limit, returned);
filterFrom = filter.getFrom();
filterTo = filter.getTo();
super(createTargetIterator(sourceIterator, filter), limit, returned);
}

@Override
Iterator<StoredGroupedMessageBatch> createTargetIterator(Iterator<StoredGroupedMessageBatch> sourceIterator)

private static Iterator<StoredGroupedMessageBatch> createTargetIterator(Iterator<StoredGroupedMessageBatch> sourceIterator, GroupedMessageFilter filter)
{
Predicate<StoredGroupedMessageBatch> filterPredicate = createFilterPredicate();
Predicate<StoredGroupedMessageBatch> filterPredicate = createFilterPredicate(filter);
return Streams.stream(sourceIterator)
.filter(filterPredicate)
.iterator();
}

private Predicate<StoredGroupedMessageBatch> createFilterPredicate()
private static Predicate<StoredGroupedMessageBatch> createFilterPredicate(GroupedMessageFilter filter)
{
return storedMessageBatch ->
(filterFrom == null || filterFrom.check(storedMessageBatch.getLastTimestamp()))
&& (filterTo == null || filterTo.check(storedMessageBatch.getFirstTimestamp()));
(filter.getFrom() == null || filter.getFrom().check(storedMessageBatch.getLastTimestamp()))
&& (filter.getTo() == null || filter.getTo().check(storedMessageBatch.getFirstTimestamp()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,18 @@

public class FilteredMessageIterator extends MappedIterator<StoredMessageBatch, StoredMessage>
{
private final FilterForAny<Long> sequence;
private final FilterForGreater<Instant> timestampFrom;
private final FilterForLess<Instant> timestampTo;
private final Order order;

public FilteredMessageIterator(Iterator<StoredMessageBatch> batchIterator, MessageFilter filter, int limit,
AtomicInteger returned)
{
super(batchIterator, limit, returned);
sequence = filter == null ? null : filter.getSequence();
timestampFrom = filter == null ? null : filter.getTimestampFrom();
timestampTo = filter == null ? null : filter.getTimestampTo();
order = filter == null ? Order.DIRECT : filter.getOrder();
super(createTargetIterator(batchIterator, filter), limit, returned);
}

@Override
Iterator<StoredMessage> createTargetIterator(Iterator<StoredMessageBatch> sourceIterator)
private static Iterator<StoredMessage> createTargetIterator(Iterator<StoredMessageBatch> sourceIterator, MessageFilter filter)
{
Predicate<StoredMessage> filterPredicate = createFilterPredicate();
Predicate<StoredMessage> filterPredicate = createFilterPredicate(filter);
return Streams.stream(sourceIterator)
.flatMap(b -> {
if (order.equals(Order.REVERSE)) {
if (filter.getOrder().equals(Order.REVERSE)) {
var elements = new ArrayList<>(b.getMessages());
Collections.reverse(elements);

Expand All @@ -68,8 +58,11 @@ Iterator<StoredMessage> createTargetIterator(Iterator<StoredMessageBatch> source
.iterator();
}

private Predicate<StoredMessage> createFilterPredicate()
private static Predicate<StoredMessage> createFilterPredicate(MessageFilter filter)
{
FilterForAny<Long> sequence = filter == null ? null : filter.getSequence();
FilterForGreater<Instant> timestampFrom = filter == null ? null : filter.getTimestampFrom();
FilterForLess<Instant> timestampTo = filter == null ? null : filter.getTimestampTo();
return storedMessage ->
(sequence == null || sequence.check(storedMessage.getId().getSequence()))
&& (timestampFrom == null || timestampFrom.check(storedMessage.getTimestamp()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,38 @@ public CompletableFuture<Iterator<StoredGroupedMessageBatch>> nextIterator() {
return CompletableFuture.completedFuture(null);
}

CassandraGroupedMessageFilter cassandraFilter = createFilter(nextPage, max(limit - returned.get(), 0));
CassandraGroupedMessageFilter cassandraFilter;
if(this.order == Order.DIRECT) {
// There is no way to make queries with limit work 100% of the cases through cassandra query filtering with only first_message_time and first_message_date being clustering columns.
// It is required to have last_message_time, last_message_date to make DB filtering possible
// We have to rely on programmatic filtering for now

// Example 1:
// startTime from user: 18:00 LIMIT: 1 ORDER: DIRECT
//
// Batches on the page:
// batch1: 17:59 - 17:59
// batch2: 18:01 - 18:01
//
// first query from getNearestBatchTime: give me the last batch out of all pages where start time <= 18:00 -> batch1
// second query with start 17:59 limit 1 and direction DIRECT: cassandra returns batch1, this batch is later programmatically filtered out as last_message_time < 18:00. The user hasn't received batch2 as they should have.
//

// Example 2:
// startTime from user: 18:00 LIMIT: 1 ORDER: DIRECT
//
// Batches on the page:
// batch1: 17:59 - 17:59
// batch2: 17:59 - 18:01
//
// first query from getNearestBatchTime: give me the last batch out of all pages where start time <= 18:00 -> batch2
// second query with start 17:59 limit 1 and direction DIRECT: cassandra returns batch1, this batch is later programmatically filtered out as last_message_time < 18:00. The user hasn't received batch2 as they should have.

cassandraFilter = createFilter(nextPage, 0);
} else {
cassandraFilter = createFilter(nextPage, max(limit - returned.get(), 0));
}


logger.debug("Getting next iterator for '{}' by filter {}", getRequestInfo(), cassandraFilter);
return op.getByFilter(cassandraFilter, selectQueryExecutor, getRequestInfo(), readAttrs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,12 @@ public abstract class MappedIterator<S, T> implements Iterator<T>
private final AtomicInteger returned;
private final Iterator<T> targetIterator;

public MappedIterator(Iterator<S> sourceIterator, int limit, AtomicInteger returned)
public MappedIterator(Iterator<T> targetIterator, int limit, AtomicInteger returned)
{
this.targetIterator = createTargetIterator(sourceIterator);
this.targetIterator = targetIterator;
this.limit = limit;
this.returned = returned;
}

abstract Iterator<T> createTargetIterator(Iterator<S> sourceIterator);

@Override
public boolean hasNext()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public abstract class BaseCradleCassandraTest {

private final long storeActionRejectionThreshold = new CoreStorageSettings().calculateStoreActionRejectionThreshold();

protected final Instant dataStart = Instant.now().minus(1, ChronoUnit.HOURS);
protected final Instant dataStart = Instant.now().minus(70, ChronoUnit.MINUTES);
protected final BookId bookId = generateBookId();

protected CqlSession session;
Expand All @@ -80,10 +80,11 @@ public abstract class BaseCradleCassandraTest {
private final PageToAdd page_a3 = new PageToAdd(DEFAULT_PAGE_PREFIX + 3, dataStart.plus(30, ChronoUnit.MINUTES), "");
private final PageToAdd page_a4 = new PageToAdd(DEFAULT_PAGE_PREFIX + 4, dataStart.plus(40, ChronoUnit.MINUTES), "");
private final PageToAdd page_a5 = new PageToAdd(DEFAULT_PAGE_PREFIX + 5, dataStart.plus(50, ChronoUnit.MINUTES), "");
private final PageToAdd page_a6 = new PageToAdd(DEFAULT_PAGE_PREFIX + 6, dataStart.plus(60, ChronoUnit.MINUTES), "");

private final List<PageToAdd> pagesToAdd = List.of(
page_r2, page_r1,
page_a0, page_a1, page_a2, page_a3, page_a4, page_a5
page_a0, page_a1, page_a2, page_a3, page_a4, page_a5, page_a6
);
private final List<PageId> pageIdToRemove = Stream.of(page_r2, page_r1)
.map(page -> new PageId(bookId, page.getStart(), page.getName()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,24 @@ protected void generateData() {
GroupedMessageBatchToStore b6 = new GroupedMessageBatchToStore(GROUP3_NAME, 1024, storeActionRejectionThreshold);
b6.addMessage(generateMessage(SESSION_ALIAS6, Direction.FIRST, 53, 15L));
b6.addMessage(generateMessage(SESSION_ALIAS6, Direction.SECOND, 55, 16L));
List<GroupedMessageBatchToStore> data = List.of(b1, b2, b3, b4, b5, b6);

// page 6 contains 2 messages from batch 7, 3 from batch 8 and 1 from batch 9
// contains 1 group test_group3
// contains 1 session alias test_session_alias6
// Those batches are added to test different cases with limit 1 filter.
GroupedMessageBatchToStore b7 = new GroupedMessageBatchToStore(GROUP3_NAME, 1024, storeActionRejectionThreshold);
b7.addMessage(generateMessage(SESSION_ALIAS6, Direction.FIRST, 65, 17L));
b7.addMessage(generateMessage(SESSION_ALIAS6, Direction.SECOND, 65, 18L));

GroupedMessageBatchToStore b8 = new GroupedMessageBatchToStore(GROUP3_NAME, 1024, storeActionRejectionThreshold);
b8.addMessage(generateMessage(SESSION_ALIAS6, Direction.FIRST, 66, 18L));
b8.addMessage(generateMessage(SESSION_ALIAS6, Direction.FIRST, 68, 19L));
b8.addMessage(generateMessage(SESSION_ALIAS6, Direction.SECOND, 69, 20L));

GroupedMessageBatchToStore b9 = new GroupedMessageBatchToStore(GROUP3_NAME, 1024, storeActionRejectionThreshold);
b9.addMessage(generateMessage(SESSION_ALIAS6, Direction.FIRST, 70, 21L));

List<GroupedMessageBatchToStore> data = List.of(b1, b2, b3, b4, b5, b6, b7, b8, b9);
for (GroupedMessageBatchToStore el : data) {
storage.storeGroupedMessageBatch(el);
}
Expand Down
Loading
Loading