Skip to content

Commit

Permalink
Faster import (#171)
Browse files Browse the repository at this point in the history
* Avoid array creation and string interpolation at each formatLine run
* Add a new build-watch script
* Memoize the calculate seconds from midnight and date functions
* Create indexes after importing all the GTFS files
* Avoir creating a new db connection for each importLines batch
* Use sqlite's transaction method rather than batching prepare().run()
* Fix getStops with bounding box test : order is not important
  • Loading branch information
laem authored Oct 3, 2024
1 parent a5e2ef2 commit 59fbd75
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 36 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@
"scripts": {
"prepare": "husky",
"test": "node --experimental-vm-modules node_modules/jest/bin/jest.js",
"build": "tsup"
"build": "tsup",
"build-watch": "tsup --watch"
},
"exports": {
".": {
Expand Down
116 changes: 82 additions & 34 deletions src/lib/import.ts
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,14 @@ const createTables = (db: Database.Database) => {
db.prepare(
`CREATE TABLE ${model.filenameBase} (${columns.join(', ')});`,
).run();
}
};

const createIndexes = (db: Database.Database) => {
for (const model of Object.values(models) as Model[]) {
if (!model.schema) {
return;
}
for (const column of model.schema.filter((column) => column.index)) {
db.prepare(
`CREATE INDEX idx_${model.filenameBase}_${column.name} ON ${model.filenameBase} (${column.name});`,
Expand Down Expand Up @@ -604,45 +611,57 @@ const formatLine = (
}

// Convert to midnight timestamp and add timestamp columns as integer seconds from midnight
const timeColumnNames = [
'start_time',
'end_time',
'arrival_time',
'departure_time',
'prior_notice_last_time',
'prior_notice_start_time',
'start_pickup_drop_off_window',
];

for (const timeColumnName of timeColumnNames) {
if (formattedLine[timeColumnName]) {
const timestampColumnName = timeColumnName.endsWith('time')
? `${timeColumnName}stamp`
: `${timeColumnName}_timestamp`;
formattedLine[timestampColumnName] = calculateSecondsFromMidnight(
formattedLine[timeColumnName],
);

for (const [timeColumnName, timestampColumnName] of timeColumnNamesCouples) {
const value = formattedLine[timeColumnName];
if (value) {
const [seconds, date] = cachedCalculateDates(value);
formattedLine[timestampColumnName] = seconds;

// Ensure leading zeros for time columns
formattedLine[timeColumnName] = padLeadingZeros(
formattedLine[timeColumnName],
);
formattedLine[timeColumnName] = date;
}
}

return formattedLine;
};

interface Dictionary<T> {
[key: string]: T;
}
type Tuple = [seconds: number | null, date: string | null];
const cache: Dictionary<Tuple> = {};
const cachedCalculateDates = (value: string) => {
const cached = cache[value];
if (cached != null) return cached;
const seconds = calculateSecondsFromMidnight(value);
const date = padLeadingZeros(value);
const computed: Tuple = [seconds, date];
cache[value] = computed;
return computed;
};

const timeColumnNames = [
'start_time',
'end_time',
'arrival_time',
'departure_time',
'prior_notice_last_time',
'prior_notice_start_time',
'start_pickup_drop_off_window',
],
timeColumnNamesCouples = timeColumnNames.map((name) => [
name,
name.endsWith('time') ? `${name}stamp` : `${name}_timestamp`,
]);

const importLines = (
db: Database.Database,
task: ITask,
lines: { [x: string]: any; geojson?: string }[],
model: Model,
totalLineCount: number,
) => {
const db = openDb({
sqlitePath: task.sqlitePath,
});

if (lines.length === 0) {
return;
}
Expand Down Expand Up @@ -702,7 +721,7 @@ const importLines = (
);
};

const importFiles = (task: ITask) =>
const importFiles = (db: Database.Database, task: ITask) =>
mapSeries(
Object.values(models),
(model: Model) =>
Expand Down Expand Up @@ -757,17 +776,44 @@ const importFiles = (task: ITask) =>
...task.csvOptions,
});

const columns = model.schema.filter((column) => column.name !== 'id');

const placeholder = columns.map(({ name }) => '@' + name).join(', ');
const prepareStatement = `INSERT ${task.ignoreDuplicates ? 'OR IGNORE' : ''} INTO ${
model.filenameBase
} (${columns
.map((column) => column.name)
.join(', ')}) VALUES (${placeholder})`;

const insert = db.prepare(prepareStatement);

const insertMany = db.transaction((lines) => {
for (const line of lines) {
if (task.prefix === undefined) {
insert.run(line);
} else {
const prefixedLine = Object.fromEntries(
Object.entries(line).map(([columnName, value], index) => [
columnName,
columns[index].prefix === true
? `${task.prefix}${value}`
: value,
]),
);
insert.run(prefixedLine);
}
}
});

let lines: { [x: string]: any; geojson?: string }[] = [];

parser.on('readable', () => {
let record;

while ((record = parser.read())) {
try {
totalLineCount += 1;
lines.push(formatLine(record, model, totalLineCount));
// If we have a bunch of lines ready to insert, then do it
if (lines.length >= maxInsertVariables / model.schema.length) {
importLines(task, lines, model, totalLineCount);
}
} catch (error) {
reject(error);
}
Expand All @@ -776,8 +822,7 @@ const importFiles = (task: ITask) =>

parser.on('end', () => {
try {
// Insert all remaining lines
importLines(task, lines, model, totalLineCount);
insertMany(lines);
} catch (error) {
reject(error);
}
Expand All @@ -798,7 +843,7 @@ const importFiles = (task: ITask) =>
);
}
const line = formatLine({ geojson: data }, model, totalLineCount);
importLines(task, [line], model, totalLineCount);
importLines(db, task, [line], model, totalLineCount);
resolve();
})
.catch(reject);
Expand Down Expand Up @@ -861,7 +906,7 @@ export async function importGtfs(initialConfig: Config) {
}

await readFiles(task);
await importFiles(task);
await importFiles(db, task);
await updateRealtimeData(task);

await rm(tempPath, { recursive: true });
Expand All @@ -874,6 +919,9 @@ export async function importGtfs(initialConfig: Config) {
}
});

log(`Will now create DB indexes`);
createIndexes(db);

log(
`Completed GTFS import for ${pluralize('agency', agencyCount, true)}\n`,
);
Expand Down
7 changes: 6 additions & 1 deletion src/test/get-stops.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import config from './test-config.ts';
import { openDb, closeDb, importGtfs, getStops } from '../index.ts';
import { sortBy } from 'lodash-es';
import exp from 'constants';

beforeAll(async () => {
Expand Down Expand Up @@ -316,6 +317,10 @@ describe('getStops():', () => {
];

expect(results).toHaveLength(3);
expect(results).toEqual(expectedResult);

// Results aren't sorted by distance, so the DB insert statement can influence the result order
expect(sortBy(results, 'stop_id')).toEqual(
sortBy(expectedResult, 'stop_id'),
);
});
});

0 comments on commit 59fbd75

Please sign in to comment.