Skip to content

Commit

Permalink
discojs-*: rework loaders
Browse files Browse the repository at this point in the history
  • Loading branch information
tharvik committed Aug 22, 2024
1 parent 69026d5 commit 566c44d
Show file tree
Hide file tree
Showing 19 changed files with 1,856 additions and 338 deletions.
5 changes: 4 additions & 1 deletion discojs-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
"dependencies": {
"@epfml/discojs": "*",
"@koush/wrtc": "0.5",
"@tensorflow/tfjs-node": "4"
"@tensorflow/tfjs-node": "4",
"csv-parse": "5",
"sharp": "0.33"
},
"devDependencies": {
"@types/node": "22",
"nodemon": "3",
"tmp-promise": "3",
"ts-node": "10"
}
}
3 changes: 2 additions & 1 deletion discojs-node/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './data/index.js'
export { saveModelToDisk, loadModelFromDisk } from './models/model_loader.js'
export * from './loaders/index.js'
export { saveModelToDisk, loadModelFromDisk } from './model_loader.js'
62 changes: 62 additions & 0 deletions discojs-node/src/loaders.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import * as fs from "node:fs/promises";
import { withFile } from "tmp-promise";
import { describe, it } from "mocha";
import { expect } from "chai";

import {
loadCSV,
loadImage,
loadImagesInDir,
loadText,
} from "./loaders/index.js";

/**
 * Drains an async iterable into an array, keeping iteration order.
 *
 * Stand-in for `Array.fromAsync`, which is not yet widely used (2024).
 *
 * @param iter source to consume until exhaustion
 * @returns every element yielded by `iter`
 */
async function arrayFromAsync<T>(iter: AsyncIterable<T>): Promise<T[]> {
  const collected: T[] = [];

  for await (const item of iter) {
    collected.push(item);
  }

  return collected;
}

describe("csv parser", () => {
  it("parses basic file", async () => {
    // a header line followed by two data rows
    const lines = ["a,b,c", "1,2,3", "4,5,6"];
    const expected = [
      { a: "1", b: "2", c: "3" },
      { a: "4", b: "5", c: "6" },
    ];

    await withFile(async (tmp) => {
      await fs.writeFile(tmp.path, lines.join("\n"));

      const rows = await arrayFromAsync(loadCSV(tmp.path));

      // every value is expected as a string, keyed by the header
      expect(rows).to.have.deep.ordered.members(expected);
    });
  });
});

describe("image parser", () => {
  it("parses mnist example", async () => {
    // path is relative to the directory the tests are run from
    const image = await loadImage("../datasets/9-mnist-example.png");

    // dimensions the example file is expected to decode to
    const dimensions = [
      ["width", 172],
      ["height", 178],
    ] as const;

    for (const [property, size] of dimensions)
      expect(image).to.have.property(property).that.equals(size);
  });
});

describe("image directory parser", () => {
  it("parses all cifar10 files", async () => {
    // path is relative to the directory the tests are run from
    const images = await loadImagesInDir("../datasets/CIFAR10");
    const count = await images.size();

    expect(count).to.equal(24);
  });
});

describe("text parser", () => {
  it("parses basic file", async () => {
    await withFile(async (tmp) => {
      // three lines, without a trailing newline
      const content = ["a", "b", "c"].join("\n");
      await fs.writeFile(tmp.path, content);

      const lineCount = await loadText(tmp.path).size();

      expect(lineCount).to.equal(3);
    });
  });
});
31 changes: 31 additions & 0 deletions discojs-node/src/loaders/csv.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import * as fs from "node:fs/promises";
import { parse as csvParser } from "csv-parse";

import { Dataset } from "@epfml/discojs";

/**
 * Type guard checking that `raw` is a non-null object whose enumerable
 * own values are all strings.
 *
 * Keys need no check: own enumerable string keys are the only ones listed.
 *
 * @param raw value to inspect
 * @returns true when every value of `raw` is a string (vacuously true for `{}`)
 */
function isRecordOfString(
  raw: unknown,
): raw is Partial<Record<string, string>> {
  return (
    typeof raw === "object" &&
    raw !== null &&
    Object.values(raw).every((value) => typeof value === "string")
  );
}

/**
 * Lazily parses the CSV file at `path`, using its first line as column names.
 *
 * The file is opened each time the dataset is iterated and is released
 * whenever iteration stops, be it at the end of the file, on error or because
 * the consumer stopped early.
 *
 * @param path location of the CSV file
 * @returns dataset yielding one object per row, mapping column name to raw value
 * @throws Error when a parsed row is not an object of strings
 */
export function load(path: string): Dataset<Partial<Record<string, string>>> {
  return new Dataset(async function* () {
    const source = (await fs.open(path)).createReadStream();
    const rows = source.pipe(csvParser({ columns: true }));

    // `.pipe` doesn't forward errors downstream, do it by hand
    // so that a read failure surfaces in the loop below
    source.on("error", (err) => rows.destroy(err));

    try {
      for await (const row of rows) {
        if (!isRecordOfString(row))
          throw new Error("expected object of string to string");
        yield row;
      }
    } finally {
      // `.pipe` doesn't tear down its source either: without this, stopping
      // before the end of the file would leave the file descriptor open
      rows.destroy();
      source.destroy();
    }
  });
}
24 changes: 24 additions & 0 deletions discojs-node/src/loaders/image.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import sharp from "sharp";
import * as path from "node:path";
import * as fs from "node:fs/promises";

import { Dataset, Image } from "@epfml/discojs";

/**
 * Decodes the image file at `path` via sharp, without its alpha channel,
 * into a raw pixel buffer.
 *
 * @param path location of the image file
 * @returns the raw buffer along with the width and height reported by sharp
 */
export async function load(path: string): Promise<Image> {
  const decoded = await sharp(path)
    .removeAlpha()
    .raw()
    .toBuffer({ resolveWithObject: true });

  const { width, height } = decoded.info;

  return { data: decoded.data, width, height };
}

/**
 * Builds a dataset of every entry found directly inside `dir`,
 * each one being decoded with `load`.
 *
 * @param dir directory to list
 * @returns dataset mapping each listed entry to its decoded image
 */
export async function loadAllInDir(dir: string): Promise<Dataset<Image>> {
  const paths: string[] = [];
  for (const filename of await fs.readdir(dir))
    paths.push(path.join(dir, filename));

  return new Dataset(paths).map(load);
}
6 changes: 6 additions & 0 deletions discojs-node/src/loaders/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export { load as loadCSV } from "./csv.js";
export {
load as loadImage,
loadAllInDir as loadImagesInDir,
} from "./image.js";
export { load as loadText } from "./text.js";
14 changes: 14 additions & 0 deletions discojs-node/src/loaders/text.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import * as fs from "node:fs/promises";
import * as readline from "node:readline/promises";

import { Dataset, Text } from "@epfml/discojs";

/**
 * Lazily reads the UTF-8 file at `path`, line by line.
 *
 * The file is opened each time the dataset is iterated and is released
 * whenever iteration stops, including when the consumer stops early.
 *
 * @param path location of the text file
 * @returns dataset yielding each line of the file
 */
export function load(path: string): Dataset<Text> {
  return new Dataset(async function* () {
    const input = (await fs.open(path)).createReadStream({ encoding: "utf8" });

    try {
      // `readline` is a bit overkill but seems standard
      // https://nodejs.org/api/readline.html#example-read-file-stream-line-by-line
      yield* readline.createInterface({ input, crlfDelay: Infinity });
    } finally {
      // closing the interface doesn't destroy its input: without this,
      // stopping before the end of the file would leave the descriptor open
      input.destroy();
    }
  });
}
File renamed without changes.
10 changes: 7 additions & 3 deletions discojs-web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"watch": "nodemon --ext ts --ignore dist --watch ../discojs/dist --watch . --exec npm run",
"build": "tsc",
"lint": "npx eslint .",
"test": ": nothing"
"test": "vitest --run"
},
"repository": {
"type": "git",
Expand All @@ -20,9 +20,13 @@
"homepage": "https://github.com/epfml/disco#readme",
"dependencies": {
"@epfml/discojs": "*",
"@tensorflow/tfjs": "4"
"@tensorflow/tfjs": "4",
"papaparse": "5"
},
"devDependencies": {
"nodemon": "3"
"@types/papaparse": "5",
"jsdom": "24",
"nodemon": "3",
"vitest": "1"
}
}
3 changes: 2 additions & 1 deletion discojs-web/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './data/index.js'
export * from './memory/index.js'
export * from "./loaders/index.js";
export * from "./memory/index.js";
43 changes: 43 additions & 0 deletions discojs-web/src/loaders.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { describe, it, expect } from "vitest";

import { loadCSV, loadText } from "./loaders/index.js";

/**
 * Drains an async iterable into an array, keeping iteration order.
 *
 * @param iter source to consume until exhaustion
 * @returns every element yielded by `iter`
 */
async function arrayFromAsync<T>(iter: AsyncIterable<T>): Promise<T[]> {
  const collected: T[] = [];

  for await (const item of iter) {
    collected.push(item);
  }

  return collected;
}

describe("csv parser", () => {
  it("loads", async () => {
    // a header line followed by two data rows
    const content = ["a,b,c", "1,2,3", "4,5,6"].join("\n");
    const expected = [
      { a: "1", b: "2", c: "3" },
      { a: "4", b: "5", c: "6" },
    ];

    const rows = await arrayFromAsync(loadCSV(new File([content], "csv")));

    // every value is expected as a string, keyed by the header
    expect(rows).to.have.deep.ordered.members(expected);
  });
});

describe("text parser", () => {
  it("loads", async () => {
    // jsdom doesn't implement .text on File/Blob
    // trick from https://github.com/jsdom/jsdom/issues/2555
    //
    // data URL content needs to be url-encoded, hence "%0A" between lines
    const dataURL = ["data:,first", "second", "third"].join("%0A");
    const response = await fetch(dataURL);
    const blob = await response.blob();

    const lines = await arrayFromAsync(loadText(blob));

    expect(lines).to.have.ordered.members(["first", "second", "third"]);
  });
});
43 changes: 43 additions & 0 deletions discojs-web/src/loaders/csv.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import Papa from "papaparse";

import { Dataset } from "@epfml/discojs";

/**
 * Type guard checking that `raw` is a non-null object whose enumerable
 * own values are all strings.
 *
 * @param raw value to inspect
 * @returns true when every value of `raw` is a string (vacuously true for `{}`)
 */
function isRecordOfString(raw: unknown): raw is Record<string, string> {
  return (
    typeof raw === "object" &&
    raw !== null &&
    Object.values(raw).every((value) => typeof value === "string")
  );
}

/**
 * Parses the given CSV file, using its first line as column names.
 *
 * The whole file is parsed when the dataset is first iterated, then rows are
 * yielded one by one.
 *
 * @param file CSV content to parse
 * @returns dataset yielding one object per row, mapping column name to raw value
 * @throws the array of papaparse errors when parsing reported any;
 * an Error when the file couldn't be read or a row is not an object of strings
 */
export function load(file: File): Dataset<Partial<Record<string, string>>> {
  return new Dataset(async function* () {
    // papaparse uses callback for streams and can't easily be converted to async generator
    // maybe another library does it better but I didn't find one at the time
    yield* await new Promise<Record<string, string>[]>((resolve, reject) => {
      Papa.parse(file, {
        header: true,
        dynamicTyping: false,
        skipEmptyLines: true, // TODO needed to avoid parsing last empty line
        // without it, a failure to read the file would leave the promise pending
        error: (err) => reject(err),
        complete(results) {
          if (results.errors.length > 0) {
            reject(results.errors);
            return;
          }

          // this callback runs outside of the promise executor: anything
          // thrown here has to be turned into a rejection by hand
          try {
            const rows = results.data.map((row) => {
              if (!isRecordOfString(row))
                throw new Error("expected object of string to string");

              return row;
            });

            resolve(rows);
          } catch (e) {
            reject(e);
          }
        },
      });
    });
  });
}
18 changes: 18 additions & 0 deletions discojs-web/src/loaders/image.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import type { Image as DiscoImage } from "@epfml/discojs";

/**
 * Decodes an image blob and extracts its pixels by drawing it on an
 * offscreen canvas.
 *
 * @param file encoded image
 * @returns natural width and height of the image, along with a copy of the
 * pixel data read back from the canvas
 * @throws Error when no 2D context can be obtained; rejects with the
 * error of `decode()` when the image can't be decoded
 */
export async function load(file: Blob): Promise<DiscoImage> {
  const image = new Image();
  const url = URL.createObjectURL(file);
  try {
    image.src = url;
    await image.decode();
  } finally {
    // the URL is only needed while decoding, release it even on failure
    URL.revokeObjectURL(url);
  }

  const [width, height] = [image.naturalWidth, image.naturalHeight];

  const context = new OffscreenCanvas(width, height).getContext("2d");
  if (context === null) throw new Error("unable to setup image convertor");
  context.drawImage(image, 0, 0);
  const data = new Uint8Array(context.getImageData(0, 0, width, height).data);

  return { width, height, data };
}
3 changes: 3 additions & 0 deletions discojs-web/src/loaders/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export { load as loadCSV } from "./csv.js"
export { load as loadImage } from "./image.js"
export { load as loadText } from "./text.js"
41 changes: 41 additions & 0 deletions discojs-web/src/loaders/text.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { Dataset, Text } from "@epfml/discojs";

/**
 * Transform stream re-chunking arbitrary string chunks into lines.
 *
 * Lines are terminated by "\r\n", "\r" or "\n"; terminators are not part of
 * the emitted lines. Whatever follows the last terminator is emitted when the
 * stream is flushed, so an input ending with a terminator yields a final
 * empty line.
 */
class LineStream extends TransformStream<string, string> {
  constructor() {
    // text received since the last terminator, not yet emitted
    let currentLine = "";
    // whether the previous chunk ended with "\r", which might be the first
    // half of a "\r\n" pair split across two chunks
    let endedWithCR = false;

    super({
      transform: (chunk, controller) => {
        if (chunk.length === 0) return;

        // the "\r" already terminated a line, so the "\n" completing the
        // pair must not be counted as a second terminator
        if (endedWithCR && chunk.startsWith("\n")) chunk = chunk.slice(1);
        endedWithCR = chunk.endsWith("\r");

        const [head, ...lines] = chunk.split(/\r\n|\r|\n/);
        const firstLine = currentLine + head;

        // no terminator in this chunk: keep accumulating
        if (lines.length === 0) {
          currentLine = firstLine;
          return;
        }

        controller.enqueue(firstLine);
        for (const line of lines.slice(0, -1)) controller.enqueue(line);

        // the last piece isn't terminated yet, the next chunk may extend it
        currentLine = lines[lines.length - 1];
      },
      flush: (controller) => controller.enqueue(currentLine),
    });
  }
}

/**
 * Lazily reads the given blob as text, line by line.
 *
 * The blob is streamed each time the dataset is iterated; the stream is
 * cancelled whenever iteration stops, including when the consumer stops early.
 *
 * @param file text content to split into lines
 * @returns dataset yielding each line of the blob
 */
export function load(file: Blob): Dataset<Text> {
  return new Dataset(async function* () {
    const reader = file
      .stream()
      .pipeThrough(new TextDecoderStream())
      .pipeThrough(new LineStream())
      .getReader();

    try {
      while (true) {
        const { value: chunk, done } = await reader.read();
        if (chunk !== undefined) yield chunk;
        if (done) break;
      }
    } finally {
      // tells the whole pipe that nobody is reading anymore;
      // it has no effect once the stream has been fully read
      await reader.cancel();
    }
  });
}
8 changes: 8 additions & 0 deletions discojs-web/vitest.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { defineConfig } from "vitest/config";

// where vite keeps its cache for this package
const cacheDir = "../node_modules/.vite/discojs-web";

// tests run in the "jsdom" vitest environment
export default defineConfig({
  cacheDir,
  test: { environment: "jsdom" },
});
Loading

0 comments on commit 566c44d

Please sign in to comment.