Skip to content

Commit

Permalink
[AMORO-3104] Added table summary metrics (#3109)
Browse files Browse the repository at this point in the history
* add table summary metrics

* remove pendingInput

* update metrics doc

* add unit test

* update for table summary metrics

* add unit test

* update unit test for table summary metrics

* update unit test for table summary metrics

* update test for table summary metrics

* Update amoro-ams/amoro-ams-server/src/main/resources/postgres/ams-postgres-init.sql

Co-authored-by: ZhouJinsong <[email protected]>

* Update amoro-ams/amoro-ams-server/src/main/resources/postgres/upgrade.sql

Co-authored-by: ZhouJinsong <[email protected]>

---------

Co-authored-by: ZhouJinsong <[email protected]>
  • Loading branch information
MarigWeizhi and zhoujinsong authored Aug 30, 2024
1 parent 7b516d4 commit 854f8d4
Show file tree
Hide file tree
Showing 17 changed files with 651 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ public long getFragmentFileSize() {
return evaluator().getFragmentFileSize();
}

@Override
public long getFragmentFileRecords() {
return evaluator().getFragmentFileRecords();
}

@Override
public int getSegmentFileCount() {
return evaluator().getSegmentFileCount();
Expand All @@ -201,6 +206,11 @@ public long getSegmentFileSize() {
return evaluator().getSegmentFileSize();
}

@Override
public long getSegmentFileRecords() {
return evaluator().getSegmentFileRecords();
}

@Override
public int getEqualityDeleteFileCount() {
return evaluator().getEqualityDeleteFileCount();
Expand All @@ -211,6 +221,11 @@ public long getEqualityDeleteFileSize() {
return evaluator().getEqualityDeleteFileSize();
}

@Override
public long getEqualityDeleteFileRecords() {
return evaluator().getEqualityDeleteFileRecords();
}

@Override
public int getPosDeleteFileCount() {
return evaluator().getPosDeleteFileCount();
Expand All @@ -221,6 +236,11 @@ public long getPosDeleteFileSize() {
return evaluator().getPosDeleteFileSize();
}

@Override
public long getPosDeleteFileRecords() {
return evaluator().getPosDeleteFileRecords();
}

@Override
public Weight getWeight() {
return evaluator().getWeight();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,23 +52,29 @@ public class CommonPartitionEvaluator implements PartitionEvaluator {
// fragment files
protected int fragmentFileCount = 0;
protected long fragmentFileSize = 0;
protected long fragmentFileRecords = 0;

// segment files
protected int rewriteSegmentFileCount = 0;
protected long rewriteSegmentFileSize = 0L;
protected long rewriteSegmentFileRecords = 0L;
protected int undersizedSegmentFileCount = 0;
protected long undersizedSegmentFileSize = 0;
protected long undersizedSegmentFileRecords = 0;
protected int rewritePosSegmentFileCount = 0;
protected int combinePosSegmentFileCount = 0;
protected long rewritePosSegmentFileSize = 0L;
protected long rewritePosSegmentFileRecords = 0L;
protected long min1SegmentFileSize = Integer.MAX_VALUE;
protected long min2SegmentFileSize = Integer.MAX_VALUE;

// delete files
protected int equalityDeleteFileCount = 0;
protected long equalityDeleteFileSize = 0L;
protected long equalityDeleteFileRecords = 0L;
protected int posDeleteFileCount = 0;
protected long posDeleteFileSize = 0L;
protected long posDeleteFileRecords = 0L;

private long cost = -1;
private Boolean necessary = null;
Expand Down Expand Up @@ -132,6 +138,7 @@ private boolean isDuplicateDelete(ContentFile<?> delete) {
private boolean addFragmentFile(DataFile dataFile, List<ContentFile<?>> deletes) {
fragmentFileSize += dataFile.fileSizeInBytes();
fragmentFileCount++;
fragmentFileRecords += dataFile.recordCount();

for (ContentFile<?> delete : deletes) {
addDelete(delete);
Expand All @@ -149,6 +156,7 @@ private boolean addUndersizedSegmentFile(DataFile dataFile, List<ContentFile<?>>
if (fileShouldRewrite(dataFile, deletes)) {
rewriteSegmentFileSize += dataFile.fileSizeInBytes();
rewriteSegmentFileCount++;
rewriteSegmentFileRecords += dataFile.recordCount();
return true;
}

Expand All @@ -162,13 +170,15 @@ private boolean addUndersizedSegmentFile(DataFile dataFile, List<ContentFile<?>>

undersizedSegmentFileSize += dataFile.fileSizeInBytes();
undersizedSegmentFileCount++;
undersizedSegmentFileRecords += dataFile.recordCount();
return true;
}

private boolean addTargetSizeReachedFile(DataFile dataFile, List<ContentFile<?>> deletes) {
if (fileShouldRewrite(dataFile, deletes)) {
rewriteSegmentFileSize += dataFile.fileSizeInBytes();
rewriteSegmentFileCount++;
rewriteSegmentFileRecords += dataFile.recordCount();
for (ContentFile<?> delete : deletes) {
addDelete(delete);
}
Expand All @@ -178,6 +188,7 @@ private boolean addTargetSizeReachedFile(DataFile dataFile, List<ContentFile<?>>
if (segmentShouldRewritePos(dataFile, deletes)) {
rewritePosSegmentFileSize += dataFile.fileSizeInBytes();
rewritePosSegmentFileCount++;
rewritePosSegmentFileRecords += dataFile.recordCount();
for (ContentFile<?> delete : deletes) {
addDelete(delete);
}
Expand Down Expand Up @@ -241,9 +252,11 @@ private void addDelete(ContentFile<?> delete) {
if (delete.content() == FileContent.POSITION_DELETES) {
posDeleteFileCount++;
posDeleteFileSize += delete.fileSizeInBytes();
posDeleteFileRecords += delete.recordCount();
} else {
equalityDeleteFileCount++;
equalityDeleteFileSize += delete.fileSizeInBytes();
equalityDeleteFileRecords += delete.recordCount();
}
}

Expand Down Expand Up @@ -366,6 +379,11 @@ public long getFragmentFileSize() {
return fragmentFileSize;
}

@Override
public long getFragmentFileRecords() {
return fragmentFileRecords;
}

@Override
public int getSegmentFileCount() {
return rewriteSegmentFileCount + undersizedSegmentFileCount + rewritePosSegmentFileCount;
Expand All @@ -376,6 +394,11 @@ public long getSegmentFileSize() {
return rewriteSegmentFileSize + undersizedSegmentFileSize + rewritePosSegmentFileSize;
}

@Override
public long getSegmentFileRecords() {
return rewriteSegmentFileRecords + undersizedSegmentFileRecords + rewritePosSegmentFileRecords;
}

@Override
public int getEqualityDeleteFileCount() {
return equalityDeleteFileCount;
Expand All @@ -386,6 +409,11 @@ public long getEqualityDeleteFileSize() {
return equalityDeleteFileSize;
}

@Override
public long getEqualityDeleteFileRecords() {
return equalityDeleteFileRecords;
}

@Override
public int getPosDeleteFileCount() {
return posDeleteFileCount;
Expand All @@ -396,6 +424,11 @@ public long getPosDeleteFileSize() {
return posDeleteFileSize;
}

@Override
public long getPosDeleteFileRecords() {
return posDeleteFileRecords;
}

public static class Weight implements PartitionEvaluator.Weight {

private final long cost;
Expand Down Expand Up @@ -423,18 +456,24 @@ public String toString() {
.add("lastFullOptimizeTime", tableRuntime.getLastFullOptimizingTime())
.add("fragmentFileCount", fragmentFileCount)
.add("fragmentFileSize", fragmentFileSize)
.add("fragmentFileRecords", fragmentFileRecords)
.add("rewriteSegmentFileCount", rewriteSegmentFileCount)
.add("rewriteSegmentFileSize", rewriteSegmentFileSize)
.add("rewriteSegmentFileRecords", rewriteSegmentFileRecords)
.add("undersizedSegmentFileCount", undersizedSegmentFileCount)
.add("undersizedSegmentFileSize", undersizedSegmentFileSize)
.add("undersizedSegmentFileRecords", undersizedSegmentFileRecords)
.add("rewritePosSegmentFileCount", rewritePosSegmentFileCount)
.add("rewritePosSegmentFileSize", rewritePosSegmentFileSize)
.add("rewritePosSegmentFileRecords", rewritePosSegmentFileRecords)
.add("min1SegmentFileSize", min1SegmentFileSize)
.add("min2SegmentFileSize", min2SegmentFileSize)
.add("equalityDeleteFileCount", equalityDeleteFileCount)
.add("equalityDeleteFileSize", equalityDeleteFileSize)
.add("equalityDeleteFileRecords", equalityDeleteFileRecords)
.add("posDeleteFileCount", posDeleteFileCount)
.add("posDeleteFileSize", posDeleteFileSize)
.add("posDeleteFileRecords", posDeleteFileRecords)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class OptimizingEvaluator {

Expand All @@ -60,6 +61,7 @@ public class OptimizingEvaluator {
protected final TableSnapshot currentSnapshot;
protected boolean isInitialized = false;

protected Map<String, PartitionEvaluator> needOptimizingPlanMap = Maps.newHashMap();
protected Map<String, PartitionEvaluator> partitionPlanMap = Maps.newHashMap();

public OptimizingEvaluator(TableRuntime tableRuntime, MixedTable table) {
Expand Down Expand Up @@ -95,7 +97,7 @@ protected void initEvaluator() {
LOG.info(
"{} finished evaluating, found {} partitions that need optimizing in {} ms",
mixedTable.id(),
partitionPlanMap.size(),
needOptimizingPlanMap.size(),
System.currentTimeMillis() - startTime);
}

Expand Down Expand Up @@ -129,7 +131,10 @@ private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) {
mixedTable.id(),
count,
System.currentTimeMillis() - startTime);
partitionPlanMap.values().removeIf(plan -> !plan.isNecessary());
needOptimizingPlanMap.putAll(
partitionPlanMap.entrySet().stream()
.filter(entry -> entry.getValue().isNecessary())
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())));
}

private Map<String, String> partitionProperties(Pair<Integer, StructLike> partition) {
Expand Down Expand Up @@ -165,7 +170,7 @@ public boolean isNecessary() {
if (!isInitialized) {
initEvaluator();
}
return !partitionPlanMap.isEmpty();
return !needOptimizingPlanMap.isEmpty();
}

public PendingInput getPendingInput() {
Expand All @@ -175,16 +180,26 @@ public PendingInput getPendingInput() {
return new PendingInput(partitionPlanMap.values());
}

public PendingInput getOptimizingPendingInput() {
if (!isInitialized) {
initEvaluator();
}
return new PendingInput(needOptimizingPlanMap.values());
}

public static class PendingInput {

@JsonIgnore private final Map<Integer, Set<StructLike>> partitions = Maps.newHashMap();

private int dataFileCount = 0;
private long dataFileSize = 0;
private long dataFileRecords = 0;
private int equalityDeleteFileCount = 0;
private int positionalDeleteFileCount = 0;
private long positionalDeleteBytes = 0L;
private long equalityDeleteBytes = 0L;
private long equalityDeleteFileRecords = 0L;
private long positionalDeleteFileRecords = 0L;

public PendingInput() {}

Expand All @@ -195,9 +210,12 @@ public PendingInput(Collection<PartitionEvaluator> evaluators) {
.add(evaluator.getPartition().second());
dataFileCount += evaluator.getFragmentFileCount() + evaluator.getSegmentFileCount();
dataFileSize += evaluator.getFragmentFileSize() + evaluator.getSegmentFileSize();
dataFileRecords += evaluator.getFragmentFileRecords() + evaluator.getSegmentFileRecords();
positionalDeleteBytes += evaluator.getPosDeleteFileSize();
positionalDeleteFileRecords += evaluator.getPosDeleteFileRecords();
positionalDeleteFileCount += evaluator.getPosDeleteFileCount();
equalityDeleteBytes += evaluator.getEqualityDeleteFileSize();
equalityDeleteFileRecords += evaluator.getEqualityDeleteFileRecords();
equalityDeleteFileCount += evaluator.getEqualityDeleteFileCount();
}
}
Expand All @@ -214,6 +232,10 @@ public long getDataFileSize() {
return dataFileSize;
}

public long getDataFileRecords() {
return dataFileRecords;
}

public int getEqualityDeleteFileCount() {
return equalityDeleteFileCount;
}
Expand All @@ -230,16 +252,27 @@ public long getEqualityDeleteBytes() {
return equalityDeleteBytes;
}

public long getEqualityDeleteFileRecords() {
return equalityDeleteFileRecords;
}

public long getPositionalDeleteFileRecords() {
return positionalDeleteFileRecords;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("partitions", partitions)
.add("dataFileCount", dataFileCount)
.add("dataFileSize", dataFileSize)
.add("dataFileRecords", dataFileRecords)
.add("equalityDeleteFileCount", equalityDeleteFileCount)
.add("positionalDeleteFileCount", positionalDeleteFileCount)
.add("positionalDeleteBytes", positionalDeleteBytes)
.add("equalityDeleteBytes", equalityDeleteBytes)
.add("equalityDeleteFileRecords", equalityDeleteFileRecords)
.add("positionalDeleteFileRecords", positionalDeleteFileRecords)
.toString();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,33 @@ interface Weight extends Comparable<Weight> {}
/** Get the total size of fragment files involved in optimizing. */
long getFragmentFileSize();

/** Get the total records of fragment files involved in optimizing. */
long getFragmentFileRecords();

/** Get the count of segment files involved in optimizing. */
int getSegmentFileCount();

/** Get the total size of segment files involved in optimizing. */
long getSegmentFileSize();

/** Get the total records of segment files involved in optimizing. */
long getSegmentFileRecords();

/** Get the count of equality delete files involved in optimizing. */
int getEqualityDeleteFileCount();

/** Get the total size of equality delete files involved in optimizing. */
long getEqualityDeleteFileSize();

/** Get the total records of equality delete files involved in optimizing. */
long getEqualityDeleteFileRecords();

/** Get the count of positional delete files involved in optimizing. */
int getPosDeleteFileCount();

/** Get the total size of positional delete files involved in optimizing. */
long getPosDeleteFileSize();

/** Get the total records of positional delete files involved in optimizing. */
long getPosDeleteFileRecords();
}
Loading

0 comments on commit 854f8d4

Please sign in to comment.