Skip to content

Commit

Permalink
[core] The record level expired time field type is automatically reco…
Browse files Browse the repository at this point in the history
…gnized. (#4458)
  • Loading branch information
zhuangchong authored Nov 6, 2024
1 parent 29c1347 commit b4601f3
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 162 deletions.
12 changes: 6 additions & 6 deletions docs/layouts/shortcodes/generated/catalog_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,6 @@
<td>Duration</td>
<td>Controls the duration for which databases and tables in the catalog are cached.</td>
</tr>
<tr>
<td><h5>cache.partition.max-num</h5></td>
<td style="word-wrap: break-word;">0</td>
<td>Long</td>
<td>Controls the max number for which partitions in the catalog are cached.</td>
</tr>
<tr>
<td><h5>cache.manifest.max-memory</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand All @@ -68,6 +62,12 @@
<td>MemorySize</td>
<td>Controls the threshold of small manifest file.</td>
</tr>
<tr>
<td><h5>cache.partition.max-num</h5></td>
<td style="word-wrap: break-word;">0</td>
<td>Long</td>
<td>Controls the max number for which partitions in the catalog are cached.</td>
</tr>
<tr>
<td><h5>client-pool-size</h5></td>
<td style="word-wrap: break-word;">2</td>
Expand Down
8 changes: 1 addition & 7 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -627,13 +627,7 @@
<td><h5>record-level.time-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Time field for record level expire.</td>
</tr>
<tr>
<td><h5>record-level.time-field-type</h5></td>
<td style="word-wrap: break-word;">seconds-int</td>
<td><p>Enum</p></td>
<td>Time field type for record level expire, it can be seconds-int,seconds-long, millis-long or timestamp.<br /><br />Possible values:<ul><li>"seconds-int": Timestamps in seconds with INT field type.</li><li>"seconds-long": Timestamps in seconds with BIGINT field type.</li><li>"millis-long": Timestamps in milliseconds with BIGINT field type.</li><li>"timestamp": Timestamp field type.</li></ul></td>
<td>Time field for record level expire. It supports the following types: `timestamps in seconds with INT`,`timestamps in seconds with BIGINT`, `timestamps in milliseconds with BIGINT` or `timestamp`.</td>
</tr>
<tr>
<td><h5>rowkind.field</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,12 @@
<td>MemorySize</td>
<td>Weight of writer buffer in managed memory, Flink will compute the memory size for writer according to the weight, the actual memory used depends on the running environment.</td>
</tr>
<tr>
<td><h5>sink.operator-uid.suffix</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Set the uid suffix for the writer, dynamic bucket assigner and committer operators. The uid format is ${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will automatically generate the operator uid, which may be incompatible when the topology changes.</td>
</tr>
<tr>
<td><h5>sink.parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand All @@ -248,18 +254,6 @@
<td>Boolean</td>
<td>If true, flink sink will use managed memory for merge tree; otherwise, it will create an independent memory allocator.</td>
</tr>
<tr>
<td><h5>sink.operator-uid.suffix</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Set the uid suffix for the writer, dynamic bucket assigner and committer operators. The uid format is ${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will automatically generate the operator uid, which may be incompatible when the topology changes.</td>
</tr>
<tr>
<td><h5>source.operator-uid.suffix</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Set the uid suffix for the source operators. After setting, the uid format is ${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will automatically generate the operator uid, which may be incompatible when the topology changes.</td>
</tr>
<tr>
<td><h5>source.checkpoint-align.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand All @@ -272,6 +266,12 @@
<td>Duration</td>
<td>If the new snapshot has not been generated when the checkpoint starts to trigger, the enumerator will block the checkpoint and wait for the new snapshot. Set the maximum waiting time to avoid infinite waiting, if timeout, the checkpoint will fail. Note that it should be set smaller than the checkpoint timeout.</td>
</tr>
<tr>
<td><h5>source.operator-uid.suffix</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Set the uid suffix for the source operators. After setting, the uid format is ${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will automatically generate the operator uid, which may be incompatible when the topology changes.</td>
</tr>
<tr>
<td><h5>streaming-read.shuffle-bucket-with-partition</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down
42 changes: 1 addition & 41 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1351,14 +1351,8 @@ public class CoreOptions implements Serializable {
key("record-level.time-field")
.stringType()
.noDefaultValue()
.withDescription("Time field for record level expire.");

public static final ConfigOption<TimeFieldType> RECORD_LEVEL_TIME_FIELD_TYPE =
key("record-level.time-field-type")
.enumType(TimeFieldType.class)
.defaultValue(TimeFieldType.SECONDS_INT)
.withDescription(
"Time field type for record level expire, it can be seconds-int,seconds-long, millis-long or timestamp.");
"Time field for record level expire. It supports the following types: `timestamps in seconds with INT`,`timestamps in seconds with BIGINT`, `timestamps in milliseconds with BIGINT` or `timestamp`.");

public static final ConfigOption<String> FIELDS_DEFAULT_AGG_FUNC =
key(FIELDS_PREFIX + "." + DEFAULT_AGG_FUNCTION)
Expand Down Expand Up @@ -2267,11 +2261,6 @@ public String recordLevelTimeField() {
return options.get(RECORD_LEVEL_TIME_FIELD);
}

@Nullable
public TimeFieldType recordLevelTimeFieldType() {
return options.get(RECORD_LEVEL_TIME_FIELD_TYPE);
}

public boolean prepareCommitWaitCompaction() {
if (!needLookup()) {
return false;
Expand Down Expand Up @@ -2920,35 +2909,6 @@ public InlineElement getDescription() {
}
}

/** Time field type for record level expire. */
public enum TimeFieldType implements DescribedEnum {
SECONDS_INT("seconds-int", "Timestamps in seconds with INT field type."),

SECONDS_LONG("seconds-long", "Timestamps in seconds with BIGINT field type."),

MILLIS_LONG("millis-long", "Timestamps in milliseconds with BIGINT field type."),

TIMESTAMP("timestamp", "Timestamp field type.");

private final String value;
private final String description;

TimeFieldType(String value, String description) {
this.value = value;
this.description = description;
}

@Override
public String toString() {
return value;
}

@Override
public InlineElement getDescription() {
return text(description);
}
}

/** The time unit of materialized table freshness. */
public enum MaterializedTableIntervalFreshnessTimeUnit {
SECOND,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ public boolean testPredicate(@Nullable Predicate filePredicate) {
return true;
}

Set<String> requredFieldNames = getRequiredNames(filePredicate);
Set<String> requiredFieldNames = getRequiredNames(filePredicate);

Map<String, Collection<FileIndexReader>> indexReaders = new HashMap<>();
requredFieldNames.forEach(name -> indexReaders.put(name, reader.readColumnIndex(name)));
requiredFieldNames.forEach(name -> indexReaders.put(name, reader.readColumnIndex(name)));
if (!new FileIndexPredicateTest(indexReaders).test(filePredicate).remain()) {
LOG.debug(
"One file has been filtered: "
Expand Down
133 changes: 42 additions & 91 deletions paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.RowType;
Expand All @@ -33,16 +33,15 @@
import javax.annotation.Nullable;

import java.time.Duration;
import java.util.function.Function;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/** A factory to create {@link RecordReader} expires records by time. */
public class RecordLevelExpire {

private final int timeFieldIndex;
private final int expireTime;
private final CoreOptions.TimeFieldType timeFieldType;
private final DataField rawDataField;
private final Function<InternalRow, Integer> fieldGetter;

@Nullable
public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
Expand All @@ -65,46 +64,14 @@ public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
"Can not find time field %s for record level expire.", timeFieldName));
}

CoreOptions.TimeFieldType timeFieldType = options.recordLevelTimeFieldType();
DataField field = rowType.getField(timeFieldName);
if (!isValidateFieldType(timeFieldType, field)) {
throw new IllegalArgumentException(
String.format(
"The record level time field type should be one of SECONDS_INT, SECONDS_LONG, MILLIS_LONG or TIMESTAMP, "
+ "but time field type is %s, field type is %s. You can specify the type through the config '%s'.",
timeFieldType,
field.type(),
CoreOptions.RECORD_LEVEL_TIME_FIELD_TYPE.key()));
}

return new RecordLevelExpire(
fieldIndex, (int) expireTime.getSeconds(), timeFieldType, field);
}

private static boolean isValidateFieldType(
CoreOptions.TimeFieldType timeFieldType, DataField field) {
DataType dataType = field.type();
return ((timeFieldType == CoreOptions.TimeFieldType.SECONDS_INT
&& dataType instanceof IntType)
|| (timeFieldType == CoreOptions.TimeFieldType.SECONDS_LONG
&& dataType instanceof BigIntType)
|| (timeFieldType == CoreOptions.TimeFieldType.MILLIS_LONG
&& dataType instanceof BigIntType)
|| (timeFieldType == CoreOptions.TimeFieldType.TIMESTAMP
&& dataType instanceof TimestampType)
|| (timeFieldType == CoreOptions.TimeFieldType.TIMESTAMP
&& dataType instanceof LocalZonedTimestampType));
DataType dataType = rowType.getField(timeFieldName).type();
Function<InternalRow, Integer> fieldGetter = createFieldGetter(dataType, fieldIndex);
return new RecordLevelExpire((int) expireTime.getSeconds(), fieldGetter);
}

private RecordLevelExpire(
int timeFieldIndex,
int expireTime,
CoreOptions.TimeFieldType timeFieldType,
DataField rawDataField) {
this.timeFieldIndex = timeFieldIndex;
private RecordLevelExpire(int expireTime, Function<InternalRow, Integer> fieldGetter) {
this.expireTime = expireTime;
this.timeFieldType = timeFieldType;
this.rawDataField = rawDataField;
this.fieldGetter = fieldGetter;
}

public FileReaderFactory<KeyValue> wrap(FileReaderFactory<KeyValue> readerFactory) {
Expand All @@ -113,54 +80,38 @@ public FileReaderFactory<KeyValue> wrap(FileReaderFactory<KeyValue> readerFactor

private RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
int currentTime = (int) (System.currentTimeMillis() / 1000);
return reader.filter(
kv -> {
checkArgument(
!kv.value().isNullAt(timeFieldIndex),
"Time field for record-level expire should not be null.");
final int recordTime;
switch (timeFieldType) {
case SECONDS_INT:
recordTime = kv.value().getInt(timeFieldIndex);
break;
case SECONDS_LONG:
recordTime = (int) kv.value().getLong(timeFieldIndex);
break;
case MILLIS_LONG:
recordTime = (int) (kv.value().getLong(timeFieldIndex) / 1000);
break;
case TIMESTAMP:
Timestamp timestamp;
if (rawDataField.type() instanceof TimestampType) {
TimestampType timestampType = (TimestampType) rawDataField.type();
timestamp =
kv.value()
.getTimestamp(
timeFieldIndex,
timestampType.getPrecision());
} else if (rawDataField.type() instanceof LocalZonedTimestampType) {
LocalZonedTimestampType timestampType =
(LocalZonedTimestampType) rawDataField.type();
timestamp =
kv.value()
.getTimestamp(
timeFieldIndex,
timestampType.getPrecision());
} else {
throw new UnsupportedOperationException(
"Unsupported timestamp type: " + rawDataField.type());
}
recordTime = (int) (timestamp.getMillisecond() / 1000);
break;
default:
String msg =
String.format(
"type %s not support in %s",
timeFieldType,
CoreOptions.TimeFieldType.class.getName());
throw new IllegalArgumentException(msg);
}
return currentTime <= recordTime + expireTime;
});
return reader.filter(kv -> currentTime <= fieldGetter.apply(kv.value()) + expireTime);
}

private static Function<InternalRow, Integer> createFieldGetter(
DataType dataType, int fieldIndex) {
final Function<InternalRow, Integer> fieldGetter;
if (dataType instanceof IntType) {
fieldGetter = row -> row.getInt(fieldIndex);
} else if (dataType instanceof BigIntType) {
fieldGetter =
row -> {
long value = row.getLong(fieldIndex);
// If it is milliseconds, convert it to seconds.
return (int) (value >= 1_000_000_000_000L ? value / 1000 : value);
};
} else if (dataType instanceof TimestampType
|| dataType instanceof LocalZonedTimestampType) {
int precision = DataTypeChecks.getPrecision(dataType);
fieldGetter =
row -> (int) (row.getTimestamp(fieldIndex, precision).getMillisecond() / 1000);
} else {
throw new IllegalArgumentException(
String.format(
"The record level time field type should be one of INT, BIGINT, or TIMESTAMP, but field type is %s.",
dataType));
}

return row -> {
checkArgument(
!row.isNullAt(fieldIndex),
"Time field for record-level expire should not be null.");
return fieldGetter.apply(row);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ protected Options tableOptions() {
options.set(CoreOptions.BUCKET, 1);
options.set(CoreOptions.RECORD_LEVEL_EXPIRE_TIME, Duration.ofSeconds(1));
options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD, "col1");
options.set(
CoreOptions.RECORD_LEVEL_TIME_FIELD_TYPE, CoreOptions.TimeFieldType.MILLIS_LONG);
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ protected Options tableOptions() {
options.set(CoreOptions.BUCKET, 1);
options.set(CoreOptions.RECORD_LEVEL_EXPIRE_TIME, Duration.ofSeconds(1));
options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD, "col1");
options.set(CoreOptions.RECORD_LEVEL_TIME_FIELD_TYPE, CoreOptions.TimeFieldType.TIMESTAMP);
return options;
}

Expand Down

0 comments on commit b4601f3

Please sign in to comment.