From 5eabdce80aec26ce6b34eba5e598646fc8f09f4f Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 8 Nov 2024 09:53:10 +0530 Subject: [PATCH] refactor: Deleted legacy snapshot logic (#5617) --- .../table/impl/remote/ConstructSnapshot.java | 343 ------------------ .../table/impl/remote/DeltaUpdates.java | 88 ----- .../table/impl/remote/InitialSnapshot.java | 55 --- .../impl/remote/InitialSnapshotTable.java | 212 ----------- .../engine/table/impl/QueryTableTest.java | 10 +- .../engine/table/impl/SnapshotTestUtils.java | 68 ++++ .../impl/remote/TestConstructSnapshot.java | 90 ++--- .../arrow/ArrowWrapperToolsTest.java | 61 ++-- 8 files changed, 144 insertions(+), 783 deletions(-) delete mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/remote/DeltaUpdates.java delete mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/remote/InitialSnapshot.java delete mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/remote/InitialSnapshotTable.java create mode 100644 engine/table/src/test/java/io/deephaven/engine/table/impl/SnapshotTestUtils.java diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java index 794a4ed9387..e690cd052dc 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/ConstructSnapshot.java @@ -25,12 +25,9 @@ import io.deephaven.engine.updategraph.NotificationQueue.Dependency; import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; import io.deephaven.io.log.LogEntry; -import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.exceptions.CancellationException; import io.deephaven.engine.table.Table; -import io.deephaven.engine.table.TableDefinition; import io.deephaven.util.SafeCloseableArray; -import io.deephaven.util.datastructures.LongSizedDataStructure; import io.deephaven.engine.liveness.LivenessManager; import io.deephaven.engine.liveness.LivenessScope; import io.deephaven.engine.liveness.LivenessScopeStack; @@ -45,9 +42,6 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.lang.reflect.Array; -import java.time.Instant; -import java.time.ZonedDateTime; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -522,108 +516,6 @@ public static LogOutput appendConcurrentAttemptClockInfo(@NotNull final LogOutpu return state().appendConcurrentAttemptClockInfo(logOutput); } - /** - * Create a {@link InitialSnapshot snapshot} of the entire table specified. Note that this method is - * notification-oblivious, i.e. it makes no attempt to ensure that notifications are not missed. - * - * @param logIdentityObject An object used to prepend to log rows. - * @param table the table to snapshot. - * @return a snapshot of the entire base table. - */ - public static InitialSnapshot constructInitialSnapshot( - @NotNull final Object logIdentityObject, - @NotNull final BaseTable table) { - return constructInitialSnapshot(logIdentityObject, table, null, null); - } - - /** - * Create a {@link InitialSnapshot snapshot} of the specified table using a set of requested columns and keys. Note - * that this method uses a RowSet that is in key space, and that it is notification-oblivious, i.e. it makes no - * attempt to ensure that notifications are not missed. - * - * @param logIdentityObject An object used to prepend to log rows. - * @param table the table to snapshot. - * @param columnsToSnapshot A {@link BitSet} of columns to include, null for all - * @param keysToSnapshot RowSet of keys within the table to include, null for all - * @return a snapshot of the entire base table. - */ - public static InitialSnapshot constructInitialSnapshot( - @NotNull final Object logIdentityObject, - @NotNull final BaseTable table, - @Nullable final BitSet columnsToSnapshot, - @Nullable final RowSet keysToSnapshot) { - return constructInitialSnapshot(logIdentityObject, table, columnsToSnapshot, keysToSnapshot, - makeSnapshotControl(false, table.isRefreshing(), table)); - } - - static InitialSnapshot constructInitialSnapshot( - @NotNull final Object logIdentityObject, - @NotNull final BaseTable table, - @Nullable final BitSet columnsToSnapshot, - @Nullable final RowSet keysToSnapshot, - @NotNull final SnapshotControl control) { - final UpdateGraph updateGraph = table.getUpdateGraph(); - try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) { - final InitialSnapshot snapshot = new InitialSnapshot(); - - final SnapshotFunction doSnapshot = (usePrev, beforeClockValue) -> snapshotAllTable( - usePrev, snapshot, table, logIdentityObject, columnsToSnapshot, keysToSnapshot); - - snapshot.step = callDataSnapshotFunction(System.identityHashCode(logIdentityObject), control, doSnapshot); - - return snapshot; - } - } - - /** - * Create a {@link InitialSnapshot snapshot} of the specified table using a set of requested columns and positions. - * Note that this method uses a RowSet that is in position space, and that it is notification-oblivious, i.e. it - * makes no attempt to ensure that notifications are not missed. - * - * @param logIdentityObject An object used to prepend to log rows. - * @param table the table to snapshot. - * @param columnsToSnapshot A {@link BitSet} of columns to include, null for all - * @param positionsToSnapshot RowSet of positions within the table to include, null for all - * @return a snapshot of the entire base table. - */ - public static InitialSnapshot constructInitialSnapshotInPositionSpace( - @NotNull final Object logIdentityObject, - @NotNull final BaseTable table, - @Nullable final BitSet columnsToSnapshot, - @Nullable final RowSet positionsToSnapshot) { - try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph( - table.getUpdateGraph()).open()) { - return constructInitialSnapshotInPositionSpace(logIdentityObject, table, columnsToSnapshot, - positionsToSnapshot, makeSnapshotControl(false, table.isRefreshing(), table)); - } - } - - static InitialSnapshot constructInitialSnapshotInPositionSpace( - @NotNull final Object logIdentityObject, - @NotNull final BaseTable table, - @Nullable final BitSet columnsToSnapshot, - @Nullable final RowSet positionsToSnapshot, - @NotNull final SnapshotControl control) { - final InitialSnapshot snapshot = new InitialSnapshot(); - - final SnapshotFunction doSnapshot = (usePrev, beforeClockValue) -> { - final RowSet keysToSnapshot; - if (positionsToSnapshot == null) { - keysToSnapshot = null; - } else { - keysToSnapshot = (usePrev ? table.getRowSet().prev() : table.getRowSet()) - .subSetForPositions(positionsToSnapshot); - } - return snapshotAllTable(usePrev, snapshot, table, logIdentityObject, columnsToSnapshot, keysToSnapshot); - }; - - try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph( - table.getUpdateGraph()).open()) { - snapshot.step = callDataSnapshotFunction(System.identityHashCode(logIdentityObject), control, doSnapshot); - } - return snapshot; - } - /** * Create a {@link BarrageMessage snapshot} of the specified table including all columns and rows. Note that this * method is notification-oblivious, i.e. it makes no attempt to ensure that notifications are not missed. @@ -731,37 +623,6 @@ public static BarrageMessage constructBackplaneSnapshotInPositionSpace( } } - /** - * Constructs {@link InitialSnapshot}s for the entirety of the tables. Note that this method is - * notification-oblivious, i.e. it makes no attempt to ensure that notifications are not missed. - * - * @param logIdentityObject identifier prefixing the log message - * @param tables tables to snapshot - * @return list of the resulting {@link InitialSnapshot}s - */ - public static List constructInitialSnapshots( - @NotNull final Object logIdentityObject, - @NotNull final BaseTable... tables) { - if (tables.length == 0) { - return Collections.emptyList(); - } - final UpdateGraph updateGraph = NotificationQueue.Dependency.getUpdateGraph(null, tables); - try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) { - final List snapshots = new ArrayList<>(); - - final SnapshotControl snapshotControl = tables.length == 1 - ? makeSnapshotControl(false, tables[0].isRefreshing(), tables[0]) - : makeSnapshotControl(false, Arrays.stream(tables).anyMatch(BaseTable::isRefreshing), tables); - - final SnapshotFunction doSnapshot = - (usePrev, beforeClockValue) -> snapshotAllTables(usePrev, snapshots, tables, logIdentityObject); - - callDataSnapshotFunction(System.identityHashCode(logIdentityObject), snapshotControl, doSnapshot); - - return snapshots; - } - } - @FunctionalInterface public interface SnapshotFunction { /** @@ -1428,77 +1289,6 @@ public static long callDataSnapshotFunction( return step; } - /** - *

- * Populate an {@link InitialSnapshot} with the specified keys and columns to snapshot. - *

- * Note that care must be taken while using this method to ensure the underlying table is locked or does not change, - * otherwise the resulting snapshot may be inconsistent. In general users should instead use - * {@link #constructInitialSnapshot} for simple use cases or {@link #callDataSnapshotFunction} for more advanced - * uses. - * - * @param usePrev Whether to use previous values - * @param snapshot The snapshot to populate - * @param logIdentityObject An object for use with log() messages - * @param columnsToSnapshot A {@link BitSet} of columns to include, null for all - * @param keysToSnapshot A RowSet of keys within the table to include, null for all - * - * @return Whether the snapshot succeeded - */ - private static boolean snapshotAllTable( - final boolean usePrev, - @NotNull final InitialSnapshot snapshot, - @NotNull final BaseTable table, - @NotNull final Object logIdentityObject, - @Nullable final BitSet columnsToSnapshot, - @Nullable final RowSet keysToSnapshot) { - snapshot.rowSet = (usePrev ? table.getRowSet().prev() : table.getRowSet()).copy(); - - if (keysToSnapshot != null) { - snapshot.rowsIncluded = snapshot.rowSet.intersect(keysToSnapshot); - } else { - snapshot.rowsIncluded = snapshot.rowSet; - } - - LongSizedDataStructure.intSize("construct snapshot", snapshot.rowsIncluded.size()); - final String[] columnSources = table.getDefinition().getColumnNamesArray(); - - snapshot.dataColumns = new Object[columnSources.length]; - try (final SharedContext sharedContext = - (columnSources.length > 1) ? SharedContext.makeSharedContext() : null) { - for (int ii = 0; ii < columnSources.length; ii++) { - if (columnsToSnapshot != null && !columnsToSnapshot.get(ii)) { - continue; - } - - if (concurrentAttemptInconsistent()) { - if (log.isDebugEnabled()) { - final LogEntry logEntry = log.debug().append(System.identityHashCode(logIdentityObject)) - .append(" Bad snapshot before column ").append(ii); - appendConcurrentAttemptClockInfo(logEntry); - logEntry.endl(); - } - return false; - } - - final ColumnSource columnSource = table.getColumnSource(columnSources[ii]); - snapshot.dataColumns[ii] = getSnapshotData(columnSource, sharedContext, snapshot.rowsIncluded, usePrev); - } - } - - if (log.isDebugEnabled()) { - log.debug().append(System.identityHashCode(logIdentityObject)) - .append(": Snapshot candidate step=") - .append((usePrev ? -1 : 0) + LogicalClock.getStep(getConcurrentAttemptClockValue())) - .append(", rows=").append(snapshot.rowsIncluded).append("/").append(keysToSnapshot) - .append(", cols=").append(FormatBitSet.arrayToLog(snapshot.dataColumns)).append("/") - .append((columnsToSnapshot != null) ? FormatBitSet.formatBitSet(columnsToSnapshot) - : FormatBitSet.arrayToLog(snapshot.dataColumns)) - .append(", usePrev=").append(usePrev).endl(); - } - return true; - } - /** *

* Populate a BarrageMessage with the specified positions to snapshot and columns. @@ -1699,56 +1489,6 @@ private static boolean snapshotEmptyColumns( return true; } - private static boolean snapshotAllTables( - final boolean usePrev, - @NotNull final List snapshots, - @NotNull final BaseTable[] tables, - @NotNull final Object logIdentityObject) { - snapshots.clear(); - - for (final BaseTable table : tables) { - final InitialSnapshot snapshot = new InitialSnapshot(); - snapshots.add(snapshot); - if (!snapshotAllTable(usePrev, snapshot, table, logIdentityObject, null, null)) { - snapshots.clear(); - return false; - } - } - - return true; - } - - private static Object getSnapshotData( - @NotNull final ColumnSource columnSource, - @Nullable final SharedContext sharedContext, - @NotNull final RowSet rowSet, - final boolean usePrev) { - final ColumnSource sourceToUse = ReinterpretUtils.maybeConvertToPrimitive(columnSource); - final Class type = sourceToUse.getType(); - final int size = rowSet.intSize(); - try (final ColumnSource.FillContext context = sourceToUse.makeFillContext(size, sharedContext)) { - final ChunkType chunkType = sourceToUse.getChunkType(); - final Object resultArray = chunkType.makeArray(size); - final WritableChunk result = chunkType.writableChunkWrap(resultArray, 0, size); - if (usePrev) { - sourceToUse.fillPrevChunk(context, result, rowSet); - } else { - sourceToUse.fillChunk(context, result, rowSet); - } - if (chunkType == ChunkType.Object) { - // noinspection unchecked - final T[] values = (T[]) Array.newInstance(type, size); - for (int ii = 0; ii < values.length; ++ii) { - // noinspection unchecked - values[ii] = (T) result.asObjectChunk().get(ii); - } - return values; - - } - return resultArray; - } - } - private static void snapshotColumnsSerial( @NotNull final TIntList columnIndices, @NotNull final List> columnSources, @@ -1789,87 +1529,4 @@ private static void snapshotColumnsSerial( } } } - - /** - * Estimate the size of a complete table snapshot in bytes. - * - * @param table the table to estimate - * @return the estimated snapshot size in bytes. - */ - public static long estimateSnapshotSize(@NotNull final Table table) { - final BitSet columns = new BitSet(table.numColumns()); - columns.set(0, table.numColumns()); - return estimateSnapshotSize(table.getDefinition(), columns, table.size()); - } - - /** - * Make a rough guess at the size of a snapshot, using the column types and common column names. The use case is - * when a user requests something from the GUI; we'd like to know if it is ridiculous before actually doing it. - * - * @param tableDefinition the table definition - * @param columns a bitset indicating which columns are included - * @param rowCount how many rows of this data we'll be snapshotting - * @return the estimated size of the snapshot - */ - public static long estimateSnapshotSize( - @NotNull final TableDefinition tableDefinition, - @NotNull final BitSet columns, - final long rowCount) { - long sizePerRow = 0; - long totalSize = 0; - - final int numColumns = tableDefinition.numColumns(); - final List> columnDefinitions = tableDefinition.getColumns(); - for (int ii = 0; ii < numColumns; ++ii) { - if (!columns.get(ii)) { - continue; - } - - totalSize += 44; // for an array - - final ColumnDefinition definition = columnDefinitions.get(ii); - if (definition.getDataType() == byte.class || definition.getDataType() == char.class - || definition.getDataType() == Boolean.class) { - sizePerRow += 1; - } else if (definition.getDataType() == short.class) { - sizePerRow += 2; - } else if (definition.getDataType() == int.class || definition.getDataType() == float.class) { - sizePerRow += 4; - } else if (definition.getDataType() == long.class || definition.getDataType() == double.class - || definition.getDataType() == Instant.class || definition.getDataType() == ZonedDateTime.class) { - sizePerRow += 8; - } else { - switch (definition.getName()) { - case "Date": - sizePerRow += 5; - totalSize += 10; - break; - case "USym": - sizePerRow += 5; - totalSize += Math.min(rowCount, 10000) * 10; - break; - case "Sym": - sizePerRow += 5; - totalSize += Math.min(rowCount, 1000000) * 30; - break; - case "Parity": - sizePerRow += 5; - totalSize += 30; - break; - case "SecurityType": - sizePerRow += 5; - totalSize += 100; - break; - case "Exchange": - sizePerRow += 5; - totalSize += 130; - break; - default: - sizePerRow += (42 + 8); // how big a dummy object was on a test + a pointer - } - } - } - - return totalSize + (sizePerRow * rowCount); - } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/DeltaUpdates.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/DeltaUpdates.java deleted file mode 100644 index 0ff27a9af3d..00000000000 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/DeltaUpdates.java +++ /dev/null @@ -1,88 +0,0 @@ -// -// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending -// -package io.deephaven.engine.table.impl.remote; - -import io.deephaven.engine.table.TableUpdate; -import io.deephaven.engine.table.impl.TableUpdateImpl; -import io.deephaven.engine.table.ModifiedColumnSet; -import io.deephaven.engine.rowset.RowSet; -import io.deephaven.engine.rowset.RowSetShiftData; - -import java.io.Serializable; - -public class DeltaUpdates implements Serializable, Cloneable { - - public static class ColumnAdditions implements Serializable { - public int columnIndex; - public Object serializedRows; - } - public static class ColumnModifications implements Serializable { - public int columnIndex; - public RowSet modified; - public RowSet rowsIncluded; - public Object serializedRows; - } - - public long deltaSequence; - public long firstStep; - public long lastStep; - public RowSet added; - public RowSet removed; - public RowSetShiftData shifted; - public RowSet includedAdditions; - public ColumnAdditions[] serializedAdditions; - public ColumnModifications[] serializedModifications; - - public DeltaUpdates clone() { - try { - return (DeltaUpdates) super.clone(); - } catch (CloneNotSupportedException e) { - throw new InternalError(e); - } - } - - public TableUpdate asUpdate(RowSet modified, ModifiedColumnSet mcs) { - return new TableUpdateImpl(added, removed, modified, shifted, mcs); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("added=").append(added); - builder.append(", removed=").append(removed); - builder.append(", shifted=").append(shifted); - builder.append(", includedAdditions=").append(includedAdditions); - if (serializedAdditions != null) { - builder.append(", serializedAdditions={"); - for (int i = 0; i < serializedAdditions.length; ++i) { - if (i != 0) { - builder.append(","); - } else { - builder.append(serializedAdditions[i].columnIndex); - } - } - builder.append("}"); - } else { - builder.append(", serializedAdditions=null"); - } - if (serializedModifications != null) { - builder.append(", serializedModifications={i="); - for (int i = 0; i < serializedModifications.length; ++i) { - if (i != 0) { - builder.append(";"); - } else { - builder.append(serializedModifications[i].columnIndex); - builder.append(",modified="); - builder.append(serializedModifications[i].modified); - builder.append(",rowsIncluded="); - builder.append(serializedModifications[i].rowsIncluded); - } - } - builder.append("}"); - } else { - builder.append(", serializedModifications=null"); - } - return builder.toString(); - } -} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/InitialSnapshot.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/InitialSnapshot.java deleted file mode 100644 index 44832389d85..00000000000 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/InitialSnapshot.java +++ /dev/null @@ -1,55 +0,0 @@ -// -// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending -// -package io.deephaven.engine.table.impl.remote; - -import io.deephaven.base.formatters.FormatBitSet; -import io.deephaven.engine.table.Table; -import io.deephaven.engine.rowset.RowSet; - -import java.io.Serializable; - -/** - * A Raw table snapshot. Users may use {@link InitialSnapshotTable#setupInitialSnapshotTable(Table, InitialSnapshot)} to - * convert this into a {@link io.deephaven.engine.table.impl.QueryTable} - */ -public class InitialSnapshot implements Serializable, Cloneable { - static final long serialVersionUID = 4380513367437361741L; - - public Object type; - public RowSet rowSet; - public Object[] dataColumns; - public long deltaSequence; - public long step; - public RowSet rowsIncluded; - public RowSet viewport; - - public InitialSnapshot clone() { - try { - return (InitialSnapshot) super.clone(); - } catch (CloneNotSupportedException e) { - throw new IllegalStateException(); - } - } - - public InitialSnapshot setType(Object type) { - this.type = type; - return this; - } - - public InitialSnapshot setViewport(RowSet viewport) { - this.viewport = viewport; - return this; - } - - @Override - public String toString() { - return "InitialSnapshot{" + - "type=" + type + - ", rows=" + rowsIncluded + (rowSet == null ? "" : "/" + rowSet) + - ", columns=" + FormatBitSet.formatBitSetAsString(FormatBitSet.arrayToBitSet(dataColumns)) + - ", deltaSequence=" + deltaSequence + - ", step=" + step + - '}'; - } -} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/InitialSnapshotTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/InitialSnapshotTable.java deleted file mode 100644 index 2cd1f3e7170..00000000000 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/remote/InitialSnapshotTable.java +++ /dev/null @@ -1,212 +0,0 @@ -// -// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending -// -package io.deephaven.engine.table.impl.remote; - -import io.deephaven.base.verify.Assert; -import io.deephaven.engine.rowset.WritableRowSet; -import io.deephaven.engine.rowset.RowSet; -import io.deephaven.engine.rowset.RowSetFactory; -import io.deephaven.engine.table.*; -import io.deephaven.engine.table.impl.QueryTable; -import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource; -import io.deephaven.engine.table.impl.sources.WritableRedirectedColumnSource; -import io.deephaven.engine.table.WritableColumnSource; -import io.deephaven.engine.table.impl.util.*; - -import java.time.Instant; -import java.time.ZonedDateTime; -import java.util.BitSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - -public class InitialSnapshotTable extends QueryTable { - protected final Setter[] setters; - protected int capacity; - protected WritableRowSet freeset = RowSetFactory.empty(); - protected final WritableRowSet populatedRows; - protected final WritableRowSet[] populatedCells; - protected WritableColumnSource[] writableSources; - protected WritableRowRedirection rowRedirection; - - private final BitSet subscribedColumns; - - protected InitialSnapshotTable(Map> result, - WritableColumnSource[] writableSources, - WritableRowRedirection rowRedirection, BitSet subscribedColumns) { - super(RowSetFactory.empty().toTracking(), result); - this.subscribedColumns = subscribedColumns; - this.writableSources = writableSources; - this.setters = new Setter[writableSources.length]; - this.populatedCells = new WritableRowSet[writableSources.length]; - for (int ii = 0; ii < writableSources.length; ++ii) { - setters[ii] = getSetter(writableSources[ii]); - this.populatedCells[ii] = RowSetFactory.fromKeys(); - } - this.rowRedirection = rowRedirection; - this.populatedRows = RowSetFactory.fromKeys(); - } - - public BitSet getSubscribedColumns() { - return subscribedColumns; - } - - public boolean isSubscribedColumn(int column) { - return subscribedColumns == null || subscribedColumns.get(column); - } - - @SuppressWarnings("rawtypes") - protected Setter getSetter(final WritableColumnSource source) { - if (source.getType() == byte.class) { - return (Setter) (array, arrayIndex, destIndex) -> source.set(destIndex, array[arrayIndex]); - } else if (source.getType() == char.class) { - return (Setter) (array, arrayIndex, destIndex) -> source.set(destIndex, array[arrayIndex]); - } else if (source.getType() == double.class) { - return (Setter) (array, arrayIndex, destIndex) -> source.set(destIndex, array[arrayIndex]); - } else if (source.getType() == float.class) { - return (Setter) (array, arrayIndex, destIndex) -> source.set(destIndex, array[arrayIndex]); - } else if (source.getType() == int.class) { - return (Setter) (array, arrayIndex, destIndex) -> source.set(destIndex, array[arrayIndex]); - } else if (source.getType() == long.class - || source.getType() == Instant.class - || source.getType() == ZonedDateTime.class) { - return (Setter) (array, arrayIndex, destIndex) -> source.set(destIndex, array[arrayIndex]); - } else if (source.getType() == short.class) { - return (Setter) (array, arrayIndex, destIndex) -> source.set(destIndex, array[arrayIndex]); - } else if (source.getType() == Boolean.class) { - return (Setter) (array, arrayIndex, destIndex) -> source.set(destIndex, array[arrayIndex]); - } else { - return (Setter) (array, arrayIndex, destIndex) -> { - // noinspection unchecked - source.set(destIndex, array[arrayIndex]); - }; - } - } - - protected void processInitialSnapshot(InitialSnapshot snapshot) { - final RowSet viewPort = snapshot.viewport; - final RowSet addedRowSet = snapshot.rowsIncluded; - try (final WritableRowSet newlyPopulated = - viewPort == null ? addedRowSet.copy() : snapshot.rowSet.subSetForPositions(viewPort)) { - if (viewPort != null) { - newlyPopulated.retain(addedRowSet); - } - - final RowSet destinationRowSet = getFreeRows(newlyPopulated.size()); - - final RowSet.Iterator addedIt = addedRowSet.iterator(); - final RowSet.Iterator destIt = destinationRowSet.iterator(); - - long nextInViewport = -1; - final RowSet.Iterator populationIt; - if (viewPort == null) { - populationIt = null; - } else { - populationIt = newlyPopulated.iterator(); - if (populationIt.hasNext()) { - nextInViewport = populationIt.nextLong(); - } - } - - int arrayIndex = 0; - while (addedIt.hasNext()) { - final long addedKey = addedIt.nextLong(); - final boolean found = viewPort == null || addedKey == nextInViewport; - - if (found) { - final long destIndex = destIt.nextLong(); - for (int ii = 0; ii < setters.length; ii++) { - if (subscribedColumns.get(ii) && snapshot.dataColumns[ii] != null) { - // noinspection unchecked,rawtypes - ((Setter) setters[ii]).set(snapshot.dataColumns[ii], arrayIndex, destIndex); - } - } - final long prevIndex = rowRedirection.put(addedKey, destIndex); - Assert.assertion(prevIndex == -1, "prevIndex == -1", prevIndex, "prevIndex"); - if (populationIt != null) { - nextInViewport = populationIt.hasNext() ? populationIt.nextLong() : -1; - } - } - arrayIndex++; - } - - for (int ii = 0; ii < setters.length; ii++) { - if (subscribedColumns.get(ii) && snapshot.dataColumns[ii] != null) { - final WritableRowSet ix = populatedCells[ii]; - ix.insert(newlyPopulated); - } - } - populatedRows.insert(newlyPopulated); - } - getRowSet().writableCast().insert(snapshot.rowSet); - } - - protected RowSet getFreeRows(long size) { - boolean needsResizing = false; - if (capacity == 0) { - capacity = Integer.highestOneBit((int) Math.max(size * 2, 8)); - freeset = RowSetFactory.flat(capacity); - needsResizing = true; - } else if (freeset.size() < size) { - int allocatedSize = (int) (capacity - freeset.size()); - int prevCapacity = capacity; - do { - capacity *= 2; - } while ((capacity - allocatedSize) < size); - freeset.insertRange(prevCapacity, capacity - 1); - needsResizing = true; - } - if (needsResizing) { - for (ColumnSource source : getColumnSources()) { - ((WritableColumnSource) source).ensureCapacity(capacity); - } - } - RowSet result = freeset.subSetByPositionRange(0, (int) size); - Assert.assertion(result.size() == size, "result.size() == size"); - freeset = freeset.subSetByPositionRange((int) size, (int) freeset.size()); - return result; - } - - protected interface Setter { - void set(T array, int arrayIndex, long destIndex); - } - - public static InitialSnapshotTable setupInitialSnapshotTable(Table originalTable, InitialSnapshot snapshot) { - return setupInitialSnapshotTable(originalTable.getDefinition(), snapshot); - } - - public static InitialSnapshotTable setupInitialSnapshotTable(Table originalTable, InitialSnapshot snapshot, - BitSet subscribedColumns) { - return setupInitialSnapshotTable(originalTable.getDefinition(), snapshot, subscribedColumns); - } - - public static InitialSnapshotTable setupInitialSnapshotTable(TableDefinition definition, InitialSnapshot snapshot) { - BitSet allColumns = new BitSet(definition.numColumns()); - allColumns.set(0, definition.numColumns()); - return setupInitialSnapshotTable(definition, snapshot, allColumns); - } - - public static InitialSnapshotTable setupInitialSnapshotTable( - TableDefinition definition, InitialSnapshot snapshot, BitSet subscribedColumns) { - final List> columns = definition.getColumns(); - WritableColumnSource[] writableSources = new WritableColumnSource[columns.size()]; - WritableRowRedirection rowRedirection = WritableRowRedirection.FACTORY.createRowRedirection(8); - LinkedHashMap> finalColumns = new LinkedHashMap<>(); - for (int ci = 0; ci < writableSources.length; ci++) { - final ColumnDefinition column = columns.get(ci); - writableSources[ci] = ArrayBackedColumnSource.getMemoryColumnSource( - 0, column.getDataType(), column.getComponentType()); - finalColumns.put(column.getName(), - WritableRedirectedColumnSource.maybeRedirect(rowRedirection, writableSources[ci], 0)); - } - // This table does not run, so we don't need to tell our row redirection or column source to start - // tracking - // prev values. - - InitialSnapshotTable initialSnapshotTable = - new InitialSnapshotTable(finalColumns, writableSources, rowRedirection, subscribedColumns); - initialSnapshotTable.processInitialSnapshot(snapshot); - return initialSnapshotTable; - } -} diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java index 5c532df1a65..5aa0832ebd1 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java @@ -23,7 +23,6 @@ import io.deephaven.engine.table.*; import io.deephaven.engine.table.impl.indexer.DataIndexer; import io.deephaven.engine.table.impl.remote.ConstructSnapshot; -import io.deephaven.engine.table.impl.remote.InitialSnapshotTable; import io.deephaven.engine.table.impl.select.*; import io.deephaven.engine.table.impl.select.MatchFilter.CaseSensitivity; import io.deephaven.engine.table.impl.select.MatchFilter.MatchType; @@ -70,6 +69,7 @@ import java.util.stream.LongStream; import static io.deephaven.api.agg.Aggregation.*; +import static io.deephaven.engine.table.impl.SnapshotTestUtils.verifySnapshotBarrageMessage; import static io.deephaven.engine.testutil.TstUtils.*; import static io.deephaven.engine.util.TableTools.*; import static org.junit.Assert.assertArrayEquals; @@ -3180,10 +3180,10 @@ public void testUngroupJoined_IDS6311() { assertNull(ungrouped.getColumnSource("CCol").getPrev(firstKey)); assertEquals('b', ungrouped.getColumnSource("CCol").getPrev(secondKey)); - // This tests the NPE condition in the ungrouped column sources - final Table snappy = InitialSnapshotTable.setupInitialSnapshotTable(ungrouped, - ConstructSnapshot.constructInitialSnapshot(this, (QueryTable) ungrouped)); - assertTableEquals(expected, snappy); + try (final BarrageMessage snap = + ConstructSnapshot.constructBackplaneSnapshot(this, (BaseTable) ungrouped)) { + verifySnapshotBarrageMessage(snap, expected); + } } private void testMemoize(QueryTable source, UnaryOperator op) { diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/SnapshotTestUtils.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/SnapshotTestUtils.java new file mode 100644 index 00000000000..68a393433b1 --- /dev/null +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/SnapshotTestUtils.java @@ -0,0 +1,68 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.engine.table.impl; + +import io.deephaven.chunk.*; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.chunk.util.hashing.ChunkEquals; +import io.deephaven.engine.rowset.RowSequence; +import io.deephaven.engine.table.ChunkSource; +import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource; +import io.deephaven.engine.table.impl.sources.ReinterpretUtils; +import io.deephaven.engine.table.impl.util.BarrageMessage; +import io.deephaven.engine.table.impl.remote.ConstructSnapshot; +import org.jetbrains.annotations.NotNull; +import org.junit.Assert; +import java.util.Iterator; +import java.util.List; + +public class SnapshotTestUtils { + + /** + * Used to compare snapshot data inside a {@link BarrageMessage} generated using + * {@link ConstructSnapshot#constructBackplaneSnapshot(Object, BaseTable)} with the expected data in a + * {@link Table}. + */ + public static void verifySnapshotBarrageMessage(@NotNull final BarrageMessage snap, @NotNull final Table expected) { + Assert.assertEquals(snap.rowsAdded.size(), expected.size()); + Assert.assertEquals(snap.addColumnData.length, expected.getColumnSources().size()); + + final int numColumns = expected.getColumnSources().size(); + final int numRows = expected.intSize(); + final List columnNames = expected.getDefinition().getColumnNames(); + final int maxSliceSize = Math.min(ArrayBackedColumnSource.BLOCK_SIZE, numRows); + + for (int colId = 0; colId < numColumns; colId++) { + final ChunkSource expectedSource = ReinterpretUtils.maybeConvertToPrimitive( + expected.getColumnSource(columnNames.get(colId))); + final ChunkType chunkType = expectedSource.getChunkType(); + + // @formatter:off + try (final ChunkSource.GetContext expectedGetContext = expectedSource.makeGetContext(maxSliceSize); + final RowSequence.Iterator expectedRows = expected.getRowSet().getRowSequenceIterator(); + final ResettableReadOnlyChunk snapshotSlice = chunkType.makeResettableReadOnlyChunk()) { + // @formatter:on + + final Iterator> snapshotChunks = snap.addColumnData[colId].data.iterator(); + final ChunkEquals chunkEquals = ChunkEquals.makeEqual(chunkType); + + while (snapshotChunks.hasNext()) { + Assert.assertTrue(expectedRows.hasMore()); + final Chunk snapshotChunk = snapshotChunks.next(); + final int snapshotChunkSize = snapshotChunk.size(); + for (int snapshotChunkUsed = 0; snapshotChunkUsed < snapshotChunkSize;) { + final int sliceSize = Math.min(snapshotChunkSize - snapshotChunkUsed, maxSliceSize); + final Chunk expectedSlice = expectedSource.getChunk(expectedGetContext, + expectedRows.getNextRowSequenceWithLength(sliceSize)); + snapshotSlice.resetFromChunk(snapshotChunk, snapshotChunkUsed, sliceSize); + Assert.assertTrue(chunkEquals.equalReduce(expectedSlice, snapshotSlice)); + snapshotChunkUsed += sliceSize; + } + } + Assert.assertFalse(expectedRows.hasMore()); + } + } + } +} diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/remote/TestConstructSnapshot.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/remote/TestConstructSnapshot.java index dfe651a81ce..f037f631580 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/remote/TestConstructSnapshot.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/remote/TestConstructSnapshot.java @@ -8,6 +8,7 @@ import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.table.impl.select.FunctionalColumn; +import io.deephaven.engine.table.impl.util.BarrageMessage; import io.deephaven.engine.testutil.ControlledUpdateGraph; import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase; import io.deephaven.engine.updategraph.LogicalClock; @@ -23,8 +24,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static io.deephaven.engine.table.impl.SnapshotTestUtils.verifySnapshotBarrageMessage; import static io.deephaven.engine.testutil.TstUtils.addToTable; -import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; import static io.deephaven.engine.testutil.TstUtils.i; import static io.deephaven.engine.testutil.TstUtils.testRefreshingTable; import static io.deephaven.engine.util.TableTools.intCol; @@ -83,12 +84,12 @@ public UpdateGraph getUpdateGraph() { assertEquals(1, changed.get()); } - public void testUsePrevSnapshot() throws ExecutionException, InterruptedException { + public void testConstructBackplaneSnapshot() throws ExecutionException, InterruptedException { final ExecutorService executor = Executors.newSingleThreadExecutor( new NamingThreadFactory(TestConstructSnapshot.class, "TestConstructSnapshot Executor")); final QueryTable table = testRefreshingTable(i(1000).toTracking(), intCol("I", 10)); - FunctionalColumn plusOneColumn = + final FunctionalColumn plusOneColumn = new FunctionalColumn<>("I", Integer.class, "S2", String.class, (Integer i) -> Integer.toString(i + 1)); final QueryTable functionalTable = (QueryTable) table.updateView(List.of(plusOneColumn)); @@ -97,58 +98,47 @@ public void testUsePrevSnapshot() throws ExecutionException, InterruptedExceptio final BitSet twoBits = new BitSet(); twoBits.set(0, 2); - final InitialSnapshot initialSnapshot = ConstructSnapshot.constructInitialSnapshotInPositionSpace( - "table", table, oneBit, RowSetFactory.fromRange(0, 10)); - final InitialSnapshot funcSnapshot = ConstructSnapshot.constructInitialSnapshotInPositionSpace( - "functionalTable", functionalTable, twoBits, RowSetFactory.fromRange(0, 10)); - - final InitialSnapshotTable initialSnapshotTable = - InitialSnapshotTable.setupInitialSnapshotTable(table.getDefinition(), initialSnapshot); - final InitialSnapshotTable funcSnapshotTable = - InitialSnapshotTable.setupInitialSnapshotTable(functionalTable.getDefinition(), funcSnapshot); - - assertTableEquals(TableTools.newTable(intCol("I", 10)), initialSnapshotTable); - assertTableEquals(TableTools.newTable(intCol("I", 10), stringCol("S2", "11")), funcSnapshotTable); + try (final BarrageMessage initialSnapshot1 = ConstructSnapshot.constructBackplaneSnapshotInPositionSpace( + "table", table, oneBit, RowSetFactory.fromRange(0, 10), null); + final BarrageMessage funcSnapshot1 = ConstructSnapshot.constructBackplaneSnapshotInPositionSpace( + "functionalTable", functionalTable, twoBits, RowSetFactory.fromRange(0, 10), null)) { + verifySnapshotBarrageMessage(initialSnapshot1, TableTools.newTable(intCol("I", 10))); + verifySnapshotBarrageMessage(funcSnapshot1, TableTools.newTable(intCol("I", 10), stringCol("S2", "11"))); + } final ControlledUpdateGraph ug = ExecutionContext.getContext().getUpdateGraph().cast(); ug.startCycleForUnitTests(false); addToTable(table, i(1000), intCol("I", 20)); - final InitialSnapshot initialSnapshot2 = - executor.submit(() -> ConstructSnapshot.constructInitialSnapshotInPositionSpace( - "table", table, oneBit, RowSetFactory.fromRange(0, 10))).get(); - final InitialSnapshot funcSnapshot2 = - executor.submit(() -> ConstructSnapshot.constructInitialSnapshotInPositionSpace( - "functionalTable", functionalTable, twoBits, RowSetFactory.fromRange(0, 10))).get(); - table.notifyListeners(i(), i(), i(1000)); - ug.markSourcesRefreshedForUnitTests(); - - // noinspection StatementWithEmptyBody - while (ug.flushOneNotificationForUnitTests()); - - final InitialSnapshot initialSnapshot3 = - executor.submit(() -> ConstructSnapshot.constructInitialSnapshotInPositionSpace( - "table", table, oneBit, RowSetFactory.fromRange(0, 10))).get(); - final InitialSnapshot funcSnapshot3 = - executor.submit(() -> ConstructSnapshot.constructInitialSnapshotInPositionSpace( - "functionalTable", functionalTable, twoBits, RowSetFactory.fromRange(0, 10))).get(); - ug.completeCycleForUnitTests(); - - final InitialSnapshotTable initialSnapshotTable2 = - InitialSnapshotTable.setupInitialSnapshotTable(table.getDefinition(), initialSnapshot2); - final InitialSnapshotTable initialSnapshotTable3 = - InitialSnapshotTable.setupInitialSnapshotTable(table.getDefinition(), initialSnapshot3); - - assertTableEquals(TableTools.newTable(intCol("I", 10)), initialSnapshotTable2); - assertTableEquals(TableTools.newTable(intCol("I", 20)), initialSnapshotTable3); - - final InitialSnapshotTable funcSnapshotTable2 = - InitialSnapshotTable.setupInitialSnapshotTable(functionalTable.getDefinition(), funcSnapshot2); - final InitialSnapshotTable funcSnapshotTable3 = - InitialSnapshotTable.setupInitialSnapshotTable(functionalTable.getDefinition(), funcSnapshot3); - - assertTableEquals(TableTools.newTable(intCol("I", 10), stringCol("S2", "11")), funcSnapshotTable2); - assertTableEquals(TableTools.newTable(intCol("I", 20), stringCol("S2", "21")), funcSnapshotTable3); + + try (final BarrageMessage initialSnapshot2 = ConstructSnapshot.constructBackplaneSnapshotInPositionSpace( + "table", table, oneBit, RowSetFactory.fromRange(0, 10), null); + final BarrageMessage funcSnapshot2 = ConstructSnapshot.constructBackplaneSnapshotInPositionSpace( + "functionalTable", functionalTable, twoBits, RowSetFactory.fromRange(0, 10), null)) { + table.notifyListeners(i(), i(), i(1000)); + ug.markSourcesRefreshedForUnitTests(); + + // noinspection StatementWithEmptyBody + while (ug.flushOneNotificationForUnitTests()); + + try (final BarrageMessage initialSnapshot3 = + executor.submit(() -> ConstructSnapshot.constructBackplaneSnapshotInPositionSpace( + "table", table, oneBit, RowSetFactory.fromRange(0, 10), null)).get(); + final BarrageMessage funcSnapshot3 = + executor.submit(() -> ConstructSnapshot.constructBackplaneSnapshotInPositionSpace( + "functionalTable", functionalTable, twoBits, RowSetFactory.fromRange(0, 10), null)) + .get()) { + ug.completeCycleForUnitTests(); + + verifySnapshotBarrageMessage(initialSnapshot2, TableTools.newTable(intCol("I", 10))); + verifySnapshotBarrageMessage(initialSnapshot3, TableTools.newTable(intCol("I", 20))); + + verifySnapshotBarrageMessage(funcSnapshot2, + TableTools.newTable(intCol("I", 10), stringCol("S2", "11"))); + verifySnapshotBarrageMessage(funcSnapshot3, + TableTools.newTable(intCol("I", 20), stringCol("S2", "21"))); + } + } executor.shutdownNow(); } diff --git a/extensions/arrow/src/test/java/io/deephaven/extensions/arrow/ArrowWrapperToolsTest.java b/extensions/arrow/src/test/java/io/deephaven/extensions/arrow/ArrowWrapperToolsTest.java index 89329e44626..905c8288791 100644 --- a/extensions/arrow/src/test/java/io/deephaven/extensions/arrow/ArrowWrapperToolsTest.java +++ b/extensions/arrow/src/test/java/io/deephaven/extensions/arrow/ArrowWrapperToolsTest.java @@ -19,7 +19,7 @@ import io.deephaven.engine.table.Table; import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.table.impl.remote.ConstructSnapshot; -import io.deephaven.engine.table.impl.remote.InitialSnapshotTable; +import io.deephaven.engine.table.impl.util.BarrageMessage; import io.deephaven.engine.testutil.junit4.EngineCleanup; import io.deephaven.engine.util.TableTools; import io.deephaven.util.QueryConstants; @@ -51,6 +51,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import static io.deephaven.engine.table.impl.SnapshotTestUtils.verifySnapshotBarrageMessage; import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; import static java.util.Arrays.asList; @@ -287,37 +288,37 @@ public void testConcurrentSnapshots() { final QueryTable readback = ArrowWrapperTools.readFeather(file.getPath()); final Thread[] threads = new Thread[10]; - final Table[] results = new Table[10]; - - // Lets simulate 10 threads trying to snapshot the table at the same time. - // Each thread will start up and wait for the barrier, then they will all attempt - // a snapshot at the same time and poke the countdown latch to release the test thread - // Then we'll validate all the results and life will be great - final CyclicBarrier barrier = new CyclicBarrier(10); - final CountDownLatch latch = new CountDownLatch(10); - final ExecutionContext executionContext = ExecutionContext.getContext(); - for (int ii = 0; ii < 10; ii++) { - final int threadNo = ii; - threads[ii] = new Thread(() -> { - try (final SafeCloseable ignored = executionContext.open()) { - barrier.await(); - results[threadNo] = - InitialSnapshotTable.setupInitialSnapshotTable(expected, - ConstructSnapshot.constructInitialSnapshot(new Object(), readback)); - } catch (InterruptedException | BrokenBarrierException e) { - throw new RuntimeException(e); - } finally { - latch.countDown(); - } - }); - threads[ii].start(); - } + final BarrageMessage[] results = new BarrageMessage[10]; + try (final SafeCloseable ignored1 = () -> SafeCloseable.closeAll(results)) { + // Lets simulate 10 threads trying to snapshot the table at the same time. + // Each thread will start up and wait for the barrier, then they will all attempt + // a snapshot at the same time and poke the countdown latch to release the test thread + // Then we'll validate all the results and life will be great + final CyclicBarrier barrier = new CyclicBarrier(10); + final CountDownLatch latch = new CountDownLatch(10); + final ExecutionContext executionContext = ExecutionContext.getContext(); + for (int ii = 0; ii < 10; ii++) { + final int threadNo = ii; + threads[ii] = new Thread(() -> { + try (final SafeCloseable ignored2 = executionContext.open()) { + barrier.await(); + // noinspection resource + results[threadNo] = ConstructSnapshot.constructBackplaneSnapshot(new Object(), readback); + } catch (InterruptedException | BrokenBarrierException e) { + throw new RuntimeException(e); + } finally { + latch.countDown(); + } + }); + threads[ii].start(); + } - latch.await(); - for (int ii = 0; ii < 10; ii++) { - assertTableEquals(expected, results[ii]); + latch.await(); + for (int ii = 0; ii < 10; ii++) { + verifySnapshotBarrageMessage(results[ii], expected); + } + readback.close(); } - readback.close(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally {