Skip to content

Commit

Permalink
feat: add offset pagination to SCSI (linkedin#110)
Browse files Browse the repository at this point in the history
* feat: add offset pagination to SCSI

* move secondary index check and not set last urn

* fix annotations and load count in background thread

* move load count to fix tests

* move loadCount and format code

* return empty list
  • Loading branch information
kaliang1 authored Jul 13, 2021
1 parent 603e758 commit 77df5d8
Show file tree
Hide file tree
Showing 4 changed files with 303 additions and 90 deletions.
98 changes: 77 additions & 21 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,8 @@ protected abstract <ASPECT extends RecordTemplate> long saveLatest(@Nonnull URN
* @param newValue {@link RecordTemplate} of the new value of aspect
* @param version version of the aspect
*/
public abstract <ASPECT extends RecordTemplate> void updateLocalIndex(@Nonnull URN urn,
@Nullable ASPECT newValue, long version);
public abstract <ASPECT extends RecordTemplate> void updateLocalIndex(@Nonnull URN urn, @Nullable ASPECT newValue,
long version);

/**
* Returns list of urns from local secondary index that satisfy the given filter conditions.
Expand All @@ -407,34 +407,37 @@ public List<URN> listUrns(@Nonnull IndexFilter indexFilter, @Nullable URN lastUr
return listUrns(indexFilter, null, lastUrn, pageSize);
}

/**
* Similar to {@link #listUrns(IndexFilter, IndexSortCriterion, Urn, int)} but returns a list result with pagination
* information.
*
* @param start the starting offset of the page
* @return a {@link ListResult} containing a list of urns and other pagination information
*/
@Nonnull
public abstract <ASPECT extends RecordTemplate> ListResult<URN> listUrns(@Nonnull IndexFilter indexFilter,
@Nullable IndexSortCriterion indexSortCriterion, int start, int pageSize);

/**
* Similar to {@link #listUrns(IndexFilter, Urn, int)}. This is to get all urns with type URN.
*/
@Nonnull
public List<URN> listUrns(@Nonnull Class<URN> urnClazz, @Nullable URN lastUrn, int pageSize) {
final IndexFilter indexFilter = new IndexFilter()
.setCriteria(new IndexCriterionArray(new IndexCriterion().setAspect(urnClazz.getCanonicalName())));
final IndexFilter indexFilter = new IndexFilter().setCriteria(
new IndexCriterionArray(new IndexCriterion().setAspect(urnClazz.getCanonicalName())));
return listUrns(indexFilter, lastUrn, pageSize);
}

/**
* Retrieves list of {@link UrnAspectEntry} containing latest version of aspects along with the urn for the list of urns
* returned from local secondary index that satisfy given filter conditions. The returned list is ordered by the
* sort criterion but ordered lexicographically by the string representation of the URN by default.
* Retrieves list of urn aspect entries corresponding to the aspect classes and urns.
*
* @param aspectClasses aspect classes whose latest versions need to be retrieved
* @param indexFilter {@link IndexFilter} containing filter conditions to be applied
* @param indexSortCriterion {@link IndexSortCriterion} sort conditions to be applied
* @param lastUrn last urn of the previous fetched page. For the first page, this should be set as NULL
* @param pageSize maximum number of distinct urns whose aspects need to be retrieved
* @return ordered list of latest versions of aspects along with urns returned from local secondary index
* satisfying given filter conditions
* @param urns corresponding urns to be retrieved
* @return list of latest versions of aspects along with urns
*/
@Nonnull
public List<UrnAspectEntry<URN>> getAspects(@Nonnull Set<Class<? extends RecordTemplate>> aspectClasses,
@Nonnull IndexFilter indexFilter, @Nullable IndexSortCriterion indexSortCriterion, @Nullable URN lastUrn, int pageSize) {

final List<URN> urns = listUrns(indexFilter, indexSortCriterion, lastUrn, pageSize);
private List<UrnAspectEntry<URN>> getUrnAspectEntries(@Nonnull Set<Class<? extends RecordTemplate>> aspectClasses,
@Nonnull List<URN> urns) {
final Map<URN, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>>> urnAspectMap =
get(aspectClasses, new HashSet<>(urns));

Expand All @@ -458,16 +461,66 @@ public List<UrnAspectEntry<URN>> getAspects(@Nonnull Set<Class<? extends RecordT
.collect(Collectors.toList());
}

/**
* Retrieves list of {@link UrnAspectEntry} containing latest version of aspects along with the urn for the list of urns
* returned from local secondary index that satisfy given filter conditions. The returned list is ordered by the
* sort criterion but ordered lexicographically by the string representation of the URN by default.
*
* @param aspectClasses aspect classes whose latest versions need to be retrieved
* @param indexFilter {@link IndexFilter} containing filter conditions to be applied
* @param indexSortCriterion {@link IndexSortCriterion} sort conditions to be applied
* @param lastUrn last urn of the previous fetched page. For the first page, this should be set as NULL
* @param pageSize maximum number of distinct urns whose aspects need to be retrieved
* @return ordered list of latest versions of aspects along with urns returned from local secondary index
* satisfying given filter conditions
*/
@Nonnull
public List<UrnAspectEntry<URN>> getAspects(@Nonnull Set<Class<? extends RecordTemplate>> aspectClasses,
@Nonnull IndexFilter indexFilter, @Nullable IndexSortCriterion indexSortCriterion, @Nullable URN lastUrn,
int pageSize) {

final List<URN> urns = listUrns(indexFilter, indexSortCriterion, lastUrn, pageSize);

return getUrnAspectEntries(aspectClasses, urns);
}

/**
* Similar to {@link #getAspects(Set, IndexFilter, IndexSortCriterion, Urn, int)}
* but sorts lexicographically by the URN.
*/
@Nonnull
public List<UrnAspectEntry<URN>> getAspects(@Nonnull Set<Class<? extends RecordTemplate>> aspectClasses,
@Nonnull IndexFilter indexFilter, @Nullable URN lastUrn, int pageSize) {
@Nonnull IndexFilter indexFilter, @Nullable URN lastUrn, int pageSize) {
return getAspects(aspectClasses, indexFilter, null, lastUrn, pageSize);
}

/**
* Similar to {@link #getAspects(Set, IndexFilter, IndexSortCriterion, Urn, int)}
* but returns a list of aspects with pagination information.
*
* @param start starting offset of the page
* @return a {@link ListResult} containing an ordered list of latest versions of aspects along with urns returned from
* local secondary index satisfying given filter conditions and pagination information
*/
@Nonnull
public ListResult<UrnAspectEntry<URN>> getAspects(@Nonnull Set<Class<? extends RecordTemplate>> aspectClasses,
@Nonnull IndexFilter indexFilter, @Nullable IndexSortCriterion indexSortCriterion, int start, int pageSize) {

final ListResult<URN> listResult = listUrns(indexFilter, indexSortCriterion, start, pageSize);
final List<URN> urns = listResult.getValues();

final List<UrnAspectEntry<URN>> urnAspectEntries = getUrnAspectEntries(aspectClasses, urns);

return ListResult.<UrnAspectEntry<URN>>builder().values(urnAspectEntries)
.metadata(listResult.getMetadata())
.nextStart(listResult.getNextStart())
.havingMore(listResult.isHavingMore())
.totalCount(listResult.getTotalCount())
.totalPageCount(listResult.getTotalPageCount())
.pageSize(listResult.getPageSize())
.build();
}

/**
* Runs the given lambda expression in a transaction with a limited number of retries.
*
Expand Down Expand Up @@ -544,7 +597,8 @@ protected abstract <ASPECT extends RecordTemplate> void applyTimeBasedRetention(
* @deprecated Use {@link #backfill(Set, Set)} instead
*/
@Nonnull
public <ASPECT extends RecordTemplate> Optional<ASPECT> backfill(@Nonnull Class<ASPECT> aspectClass, @Nonnull URN urn) {
public <ASPECT extends RecordTemplate> Optional<ASPECT> backfill(@Nonnull Class<ASPECT> aspectClass,
@Nonnull URN urn) {
return backfill(BackfillMode.BACKFILL_ALL, aspectClass, urn);
}

Expand Down Expand Up @@ -584,7 +638,8 @@ public Map<URN, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTe
private Map<URN, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>>> backfill(
@Nonnull BackfillMode mode, @Nonnull Set<Class<? extends RecordTemplate>> aspectClasses, @Nonnull Set<URN> urns) {
checkValidAspects(aspectClasses);
final Map<URN, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>>> urnToAspects = get(aspectClasses, urns);
final Map<URN, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>>> urnToAspects =
get(aspectClasses, urns);
urnToAspects.forEach((urn, aspects) -> {
aspects.forEach((aspectClass, aspect) -> aspect.ifPresent(value -> backfill(mode, value, urn)));
});
Expand Down Expand Up @@ -619,7 +674,8 @@ public Map<URN, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTe
* @param urn {@link Urn} for the entity
* @param <ASPECT> must be a supported aspect type in {@code ASPECT_UNION}.
*/
private <ASPECT extends RecordTemplate> void backfill(@Nonnull BackfillMode mode, @Nonnull ASPECT aspect, @Nonnull URN urn) {
private <ASPECT extends RecordTemplate> void backfill(@Nonnull BackfillMode mode, @Nonnull ASPECT aspect,
@Nonnull URN urn) {
if (_enableLocalSecondaryIndex && (mode == BackfillMode.SCSI_ONLY || mode == BackfillMode.BACKFILL_ALL)) {
updateLocalIndex(urn, aspect, FIRST_VERSION);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ protected <ASPECT extends RecordTemplate> long saveLatest(FooUrn urn, Class<ASPE
}

@Override
public <ASPECT extends RecordTemplate> void updateLocalIndex(@Nonnull FooUrn urn,
@Nullable ASPECT newValue, long version) {
public <ASPECT extends RecordTemplate> void updateLocalIndex(@Nonnull FooUrn urn, @Nullable ASPECT newValue,
long version) {

}

Expand Down Expand Up @@ -106,6 +106,12 @@ public List<FooUrn> listUrns(@Nonnull IndexFilter indexFilter, @Nullable IndexSo
return null;
}

@Override
public ListResult<FooUrn> listUrns(@Nonnull IndexFilter indexFilter,
@Nullable IndexSortCriterion indexSortCriterion, int start, int pageSize) {
return ListResult.<FooUrn>builder().build();
}

@Override
public <ASPECT extends RecordTemplate> ListResult<ASPECT> list(Class<ASPECT> aspectClass, FooUrn urn, int start,
int pageSize) {
Expand Down
Loading

0 comments on commit 77df5d8

Please sign in to comment.