Skip to content

Commit

Permalink
progressive loading
Browse files Browse the repository at this point in the history
Signed-off-by: Xun Li <[email protected]>
  • Loading branch information
lixun910 committed Nov 30, 2023
1 parent 897b93b commit e81aec7
Show file tree
Hide file tree
Showing 14 changed files with 251 additions and 50 deletions.
1 change: 1 addition & 0 deletions src/actions/src/action-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ export const ActionTypes = {
SET_FILTER_PLOT: `${ACTION_PREFIX}SET_FILTER_PLOT`,
LOAD_FILES: `${ACTION_PREFIX}LOAD_FILES`,
LOAD_NEXT_FILE: `${ACTION_PREFIX}LOAD_NEXT_FILE`,
LOAD_BATCH_DATA_SUCCESS: `${ACTION_PREFIX}LOAD_BATCH_DATA_SUCCESS`,
LOAD_FILE_STEP_SUCCESS: `${ACTION_PREFIX}LOAD_FILE_STEP_SUCCESS`,
LOAD_FILES_ERR: `${ACTION_PREFIX}LOAD_FILES_ERR`,
LOAD_FILES_SUCCESS: `${ACTION_PREFIX}LOAD_FILES_SUCCESS`,
Expand Down
14 changes: 14 additions & 0 deletions src/actions/src/vis-state-actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,20 @@ export function loadFileStepSuccess({
};
}

/**
 * Action creator fired when one batch of a progressively-loaded file has been
 * processed into the file cache. The payload shape mirrors
 * LoadFileStepSuccessAction so the same reducers can consume it.
 *
 * @param payload.fileName - name of the file this batch belongs to
 * @param payload.fileCache - processed file cache entries for the batch
 * @returns a LOAD_BATCH_DATA_SUCCESS action carrying the batch result
 */
export function loadBatchDataSuccess({
  fileName,
  fileCache
}: {
  fileName: string;
  fileCache: FileCacheItem[];
}): Merge<LoadFileStepSuccessAction, {type: typeof ActionTypes.LOAD_BATCH_DATA_SUCCESS}> {
  const type = ActionTypes.LOAD_BATCH_DATA_SUCCESS;
  return {type, fileName, fileCache};
}

export type LoadFilesErrUpdaterAction = {
fileName: string;
error: any;
Expand Down
2 changes: 1 addition & 1 deletion src/layers/src/base-layer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1104,7 +1104,7 @@ class Layer {
const dataUpdateTriggers = this.getDataUpdateTriggers(layerDataset);
const triggerChanged = this.getChangedTriggers(dataUpdateTriggers);

if (triggerChanged && triggerChanged.getMeta) {
if (triggerChanged && (triggerChanged.getMeta || triggerChanged.getData)) {
this.updateLayerMeta(dataContainer, getPosition);
}

Expand Down
45 changes: 29 additions & 16 deletions src/layers/src/geojson-layer/geojson-layer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ export default class GeoJsonLayer extends Layer {
this.filteredIndex[filteredIndex[i]] = 1;
}

this.filteredIndexTrigger = filteredIndex;
// this.filteredIndexTrigger = filteredIndex;

// for arrow, always return full dataToFeature instead of a filtered one, so there is no need to update attributes in GPU
// for geojson, this should work as well and more efficient. But we need to update some test cases e.g. #GeojsonLayer -> formatLayerData
Expand Down Expand Up @@ -436,21 +436,34 @@ export default class GeoJsonLayer extends Layer {
const getGeoColumn = geoColumnAccessor(this.config.columns);
const getGeoField = geoFieldAccessor(this.config.columns);

if (this.dataToFeature.length === 0) {
const updateLayerMetaFunc =
dataContainer instanceof ArrowDataContainer
? getGeojsonLayerMetaFromArrow
: getGeojsonLayerMeta;
const {dataToFeature, bounds, fixedRadius, featureTypes, centroids} = updateLayerMetaFunc({
dataContainer,
getFeature,
getGeoColumn,
getGeoField
});

if (centroids) this.centroids = centroids;
this.dataToFeature = dataToFeature;
this.updateMeta({bounds, fixedRadius, featureTypes});
if (dataContainer instanceof ArrowDataContainer) {
const arrowContainer = dataContainer as ArrowDataContainer;
if (this.dataToFeature.length < arrowContainer.numChunks()) {
const {dataToFeature, bounds, fixedRadius, featureTypes, centroids} = getGeojsonLayerMetaFromArrow({
dataContainer,
getGeoColumn,
getGeoField,
chunkIndex: this.dataToFeature.length
});
if (centroids) this.centroids = centroids;
if (this.dataToFeature.length === 0) {
          // do not update bounds on every batch, to avoid interrupting the user's
          // interaction with the map while it loads incrementally
this.updateMeta({bounds, fixedRadius, featureTypes});
}
// @ts-expect-error TODO fix this
this.dataToFeature = [...this.dataToFeature, ...dataToFeature];
}
} else {
if (this.dataToFeature.length === 0) {
const {dataToFeature, bounds, fixedRadius, featureTypes, centroids} = getGeojsonLayerMeta({
dataContainer,
getFeature
});

if (centroids) this.centroids = centroids;
this.dataToFeature = dataToFeature;
this.updateMeta({ bounds, fixedRadius, featureTypes });
}
}
}

Expand Down
74 changes: 71 additions & 3 deletions src/layers/src/layer-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import {BinaryFeatures} from '@loaders.gl/schema';
import {
getBinaryGeometriesFromArrow,
parseGeometryFromArrow,
BinaryGeometriesFromArrowOptions
BinaryGeometriesFromArrowOptions,
parseGeoArrowOnWorker
} from '@loaders.gl/arrow';

import {DeckGlGeoTypes} from './geojson-layer/geojson-utils';
Expand Down Expand Up @@ -57,17 +58,20 @@ export type GeojsonLayerMetaProps = {
export function getGeojsonLayerMetaFromArrow({
dataContainer,
getGeoColumn,
getGeoField
getGeoField,
chunkIndex
}: {
dataContainer: DataContainerInterface;
getGeoColumn: (dataContainer: DataContainerInterface) => unknown;
getGeoField: (dataContainer: DataContainerInterface) => Field | null;
getGeoField: (dataContainer: DataContainerInterface) => Field | null;
chunkIndex?: number;
}): GeojsonLayerMetaProps {
const geoColumn = getGeoColumn(dataContainer) as arrow.Vector;
const arrowField = getGeoField(dataContainer);

const encoding = arrowField?.metadata?.get('ARROW:extension:name');
const options: BinaryGeometriesFromArrowOptions = {
...(chunkIndex !== undefined && chunkIndex >= 0 ? {chunkIndex} : {}),
triangulate: true,
meanCenter: true
};
Expand All @@ -90,6 +94,70 @@ export function getGeojsonLayerMetaFromArrow({
};
}


/**
 * Async variant of getGeojsonLayerMetaFromArrow: parses a single geoarrow
 * chunk on a web worker so the main thread stays responsive while a table
 * is loaded incrementally batch by batch.
 *
 * @param dataContainer - container holding the arrow table
 * @param getGeoColumn - accessor returning the geometry column vector
 * @param getGeoField - accessor returning the geometry field (source of the
 *   'ARROW:extension:name' encoding metadata)
 * @param chunkIndex - index (within geoColumn.data) of the chunk to parse
 * @returns layer meta (binary geometries, bounds, feature types, centroids)
 *   computed from the requested chunk only
 * @throws Error if the requested chunk does not exist in the geometry column
 */
export async function getGeojsonLayerMetaFromArrowAsync({
  dataContainer,
  getGeoColumn,
  getGeoField,
  chunkIndex
}: {
  dataContainer: DataContainerInterface;
  getGeoColumn: (dataContainer: DataContainerInterface) => unknown;
  getGeoField: (dataContainer: DataContainerInterface) => Field | null;
  chunkIndex: number;
}): Promise<GeojsonLayerMetaProps> {
  const arrowField = getGeoField(dataContainer);
  const geoColumn = getGeoColumn(dataContainer) as arrow.Vector;
  const geometryChunk = geoColumn?.data[chunkIndex];

  // fail fast with a clear message instead of crashing on property access below
  if (!geometryChunk) {
    throw new Error(`getGeojsonLayerMetaFromArrowAsync: no geometry chunk at index ${chunkIndex}`);
  }

  // flatten the arrow Data object into a plain payload that can be posted
  // to the worker
  const chunkData = {
    type: {
      ...geometryChunk.type,
      typeId: geometryChunk.typeId,
      listSize: geometryChunk.type?.listSize
    },
    offset: geometryChunk.offset,
    length: geometryChunk.length,
    nullCount: geometryChunk.nullCount,
    buffers: geometryChunk.buffers,
    children: geometryChunk.children,
    dictionary: geometryChunk.dictionary
  };
  const encoding = arrowField?.metadata?.get('ARROW:extension:name');

  const parsedGeoArrowData = await parseGeoArrowOnWorker(
    {
      operation: 'parse-geoarrow',
      chunkData,
      // chunkData already holds a single chunk, so the worker-side index is 0
      chunkIndex: 0,
      geometryEncoding: encoding,
      meanCenter: true,
      triangle: false
    },
    {
      // NOTE(review): '_workerType: test' looks like debug/test wiring —
      // confirm this is the intended worker type for production loads
      _workerType: 'test'
    }
  );

  // kepler awaits the worker result and renders the binary geometries
  const {binaryGeometries, bounds, featureTypes, meanCenters} =
    parsedGeoArrowData.binaryDataFromGeoArrow!;

  // binary geometries carry no feature.properties.radius, so fixedRadius is false
  const fixedRadius = false;

  return {
    dataToFeature: binaryGeometries,
    featureTypes,
    bounds,
    fixedRadius,
    centroids: meanCenters
  };
}

export function isLayerHoveredFromArrow(objectInfo, layerId: string): boolean {
// there could be multiple deck.gl layers created from multiple chunks in arrow table
// the objectInfo.layer id should be `${this.id}-${i}`
Expand Down
8 changes: 6 additions & 2 deletions src/processors/src/file-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import {
processKeplerglJSON,
processRowObject
} from './data-processor';
import {generateHashId, isPlainObject} from '@kepler.gl/utils';
import {generateHashId, isPlainObject, generateHashIdFromString} from '@kepler.gl/utils';
import {DATASET_FORMATS} from '@kepler.gl/constants';
import {Loader} from '@loaders.gl/loader-utils';
import {FileCacheItem, ValidKeplerGlMap} from './types';
Expand Down Expand Up @@ -215,10 +215,13 @@ export function processFileData({
fileCache: FileCacheItem[];
}): Promise<FileCacheItem[]> {
return new Promise((resolve, reject) => {
let {data} = content;
let {fileName, data} = content;
let format: string | undefined;
let processor: Function | undefined;

    // generate a unique id (length 4) derived from the fileName string
const id = generateHashIdFromString(fileName);

if (isArrowData(data)) {
format = DATASET_FORMATS.arrow;
processor = processArrowTable;
Expand All @@ -241,6 +244,7 @@ export function processFileData({
{
data: result,
info: {
id,
label: content.fileName,
format
}
Expand Down
31 changes: 20 additions & 11 deletions src/reducers/src/combined-updaters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ export const addDataToMapUpdater = (
...payload.options
};

  // check if this is a dataset being progressively loaded in batches
  const isProgressiveLoading = Array.isArray(datasets) && datasets[0].info.id && datasets[0].info.id in state.visState.datasets;
const isProgressiveLoading = Array.isArray(datasets) && datasets[0].info.id && datasets[0].info.id in state.visState.datasets;

// @ts-expect-error
let parsedConfig: ParsedConfig = config;

Expand Down Expand Up @@ -193,23 +196,29 @@ export const addDataToMapUpdater = (
apply_<VisState, any>(setMapInfoUpdater, {info})
)
),

with_(({visState}) =>
pick_('mapState')(
apply_(
stateMapConfigUpdater,
payload_({
config: parsedConfig,
options,
bounds: findMapBoundsIfCentered(filterNewlyAddedLayers(visState.layers))
})
if_(
isProgressiveLoading === false,
with_(({ visState }) =>
pick_('mapState')(
apply_(
stateMapConfigUpdater,
payload_({
config: parsedConfig,
options,
bounds: findMapBoundsIfCentered(filterNewlyAddedLayers(visState.layers))
})
)
)
)
),
pick_('mapStyle')(apply_(styleMapConfigUpdater, payload_({config: parsedConfig, options}))),
if_(
isProgressiveLoading === false,
pick_('mapStyle')(apply_(styleMapConfigUpdater, payload_({ config: parsedConfig, options }))),
),
pick_('uiState')(apply_(uiStateLoadFilesSuccessUpdater, payload_(null))),
pick_('uiState')(apply_(toggleModalUpdater, payload_(null))),
pick_('uiState')(merge_(options.hasOwnProperty('readOnly') ? {readOnly: options.readOnly} : {}))

])(state);
};

Expand Down
57 changes: 41 additions & 16 deletions src/reducers/src/vis-state-updaters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
layerTypeChange,
layerVisConfigChange,
layerVisualChannelConfigChange,
loadBatchDataSuccess,
loadFilesErr,
loadFilesSuccess,
loadFileStepSuccess,
Expand Down Expand Up @@ -2277,6 +2278,21 @@ export function parseProgress(prevProgress = {}, progress) {
};
}

/**
 * Updater for LOAD_BATCH_DATA_SUCCESS: once a batch of file data has been
 * processed, schedule the fileLoading onFinish callback with the resulting
 * file cache. The callback is deferred via a 200ms DELAY_TASK — presumably
 * to yield to rendering between batches (TODO confirm intent).
 */
export function loadBatchDataSuccessUpdater(
  state: VisState,
  action: VisStateActions.LoadFileStepSuccessAction
): VisState {
  const loading = state.fileLoading;
  // no load in flight — ignore stray batch-success actions
  if (!loading) return state;
  const finishTask = DELAY_TASK(200).map(() => loading.onFinish(action.fileCache));
  return withTask(state, finishTask);
}

/**
* gets called with payload = AsyncGenerator<???>
* @memberof visStateUpdaters
Expand All @@ -2292,23 +2308,32 @@ export const nextFileBatchUpdater = (
fileName,
progress: parseProgress(state.fileLoadingProgress[fileName], progress)
});

return withTask(
stateWithProgress,
UNWRAP_TASK(gen.next()).bimap(
({value, done}) => {
return done
? onFinish(accumulated)
: nextFileBatch({
gen,
fileName,
progress: value.progress,
accumulated: value,
onFinish
});
},
err => loadFilesErr(fileName, err)
)
);
stateWithProgress, [
...(fileName.endsWith('arrow') && accumulated && accumulated.data?.length > 0
? [
PROCESS_FILE_DATA({content: accumulated, fileCache: []}).bimap(
result => loadBatchDataSuccess({fileName, fileCache: result}),
err => loadFilesErr(fileName, err)
)
]
: []),
UNWRAP_TASK(gen.next()).bimap(
({value, done}) => {
return done
? onFinish(accumulated)
: nextFileBatch({
gen,
fileName,
progress: value.progress,
accumulated: value,
onFinish
});
},
err => loadFilesErr(fileName, err)
)
]);
};

/**
Expand Down
2 changes: 2 additions & 0 deletions src/reducers/src/vis-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ const actionHandler = {

[ActionTypes.LOAD_NEXT_FILE]: visStateUpdaters.loadNextFileUpdater,

[ActionTypes.LOAD_BATCH_DATA_SUCCESS]: visStateUpdaters.loadBatchDataSuccessUpdater,

[ActionTypes.LOAD_FILE_STEP_SUCCESS]: visStateUpdaters.loadFileStepSuccessUpdater,

[ActionTypes.MAP_CLICK]: visStateUpdaters.mapClickUpdater,
Expand Down
11 changes: 11 additions & 0 deletions src/table/src/dataset-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,17 @@ export function createNewDataEntry(
return {};
}

// check if dataset already exists, and update it when loading data by batches incrementally
if (info && info.id && datasets[info.id]) {
// get keplerTable from datasets
const keplerTable = datasets[info.id];
// update the data in keplerTable
keplerTable.update(validatedData);
return {
[keplerTable.id]: keplerTable
};
}

info = info || {};
const color = info.color || getNewDatasetColor(datasets);

Expand Down
Loading

0 comments on commit e81aec7

Please sign in to comment.