Skip to content

Commit

Permalink
[core] Changelog expire should check index file existence (#4186)
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 authored Sep 13, 2024
1 parent 983a552 commit 5564403
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ public static Consumer fromJson(String json) {
public static Optional<Consumer> fromPath(FileIO fileIO, Path path) {
int retryNumber = 0;
MismatchedInputException exception = null;
while (retryNumber++ < 5) {
while (retryNumber++ < 10) {
try {
return fileIO.readOverwrittenFileUtf8(path).map(Consumer::fromJson);
} catch (MismatchedInputException e) {
// retry
exception = e;
try {
Thread.sleep(100);
Thread.sleep(1_000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public Set<String> manifestSkippingSet(List<Snapshot> skippingSnapshots) {

// index manifests
String indexManifest = skippingSnapshot.indexManifest();
if (indexManifest != null) {
if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) {
skippingSet.add(indexManifest);
indexFileHandler.readManifest(indexManifest).stream()
.map(IndexManifestEntry::indexFile)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.ExpireConfig;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.TraceableFileIO;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.UUID;

import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;

/** Test for changelog expire. */
public class ChangelogExpireTest extends IndexFileExpireTableTest {

@BeforeEach
public void beforeEachBase() throws Exception {
CatalogContext context =
CatalogContext.create(
new Path(TraceableFileIO.SCHEME + "://" + tempPath.toString()));
context.options().set(CACHE_ENABLED.key(), "false");
Catalog catalog = CatalogFactory.createCatalog(context);
Identifier identifier = new Identifier("default", "T");
catalog.createDatabase(identifier.getDatabaseName(), true);
Schema schema =
Schema.newBuilder()
.column("pt", DataTypes.INT())
.column("pk", DataTypes.INT())
.column("col1", DataTypes.INT())
.partitionKeys("pt")
.primaryKey("pk", "pt")
.option("changelog-producer", "input")
.option("changelog.num-retained.max", "40")
.option("snapshot.num-retained.max", "39")
.options(tableOptions().toMap())
.build();
catalog.createTable(identifier, schema, true);
table = (FileStoreTable) catalog.getTable(identifier);
commitUser = UUID.randomUUID().toString();
}

@Test
public void testChangelogExpire() throws Exception {
ExpireConfig expireConfig =
ExpireConfig.builder().changelogRetainMax(40).snapshotRetainMax(39).build();
prepareExpireTable();
ExpireChangelogImpl expire =
(ExpireChangelogImpl) table.newExpireChangelog().config(expireConfig);

ExpireSnapshotsImpl expireSnapshots =
(ExpireSnapshotsImpl) table.newExpireSnapshots().config(expireConfig);
expireSnapshots.expireUntil(1, 7);
Assertions.assertThatCode(() -> expire.expireUntil(1, 6)).doesNotThrowAnyException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public void testIndexFileRollbackTag() throws Exception {
assertThat(indexManifestSize()).isEqualTo(1);
}

private void prepareExpireTable() throws Exception {
protected void prepareExpireTable() throws Exception {
StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
StreamTableWrite write = writeBuilder.newWrite();
StreamTableCommit commit = writeBuilder.newCommit();
Expand Down

0 comments on commit 5564403

Please sign in to comment.