feat: use tar-fs to produce a tar stream
Rather than building the archive with the GNU tar CLI, because this way we can
control the contents more closely
Dominic Scheirlinck committed Jan 14, 2022
1 parent 21f5222 commit c4c9085
Showing 9 changed files with 188 additions and 95 deletions.
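
In outline, the change replaces a spawned "tar -c --null --files-from -" child process with tar-fs's in-process pack(). A minimal sketch of that idea follows, assuming only tar-fs's documented pack()/ignore() API; the dist filter and output filename are illustrative, not this commit's actual logic:

import { createWriteStream } from 'fs';
import { pack } from 'tar-fs';

// pack() walks the given directory in-process; returning true from ignore()
// excludes that entry from the archive. Keeping only paths under dist/ here
// is an example filter, standing in for the matcher this commit adds.
const tarStream = pack('.', {
  ignore: (name) => !name.startsWith('dist'),
});

tarStream.pipe(createWriteStream('example.tar'));

Because the predicate runs in the same process, the entry list can be computed from globs and file lists directly, with no shell quoting or argv-length concerns.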
2 changes: 2 additions & 0 deletions package.json
@@ -75,6 +75,7 @@
     "mkdirp": "1.0.4",
     "rimraf": "3.0.2",
     "split2": "4.1.0",
+    "tar-fs": "2.1.1",
     "tempy": "1.0.1",
     "tiny-async-pool": "1.2.0",
     "toposort": "2.0.2"
@@ -99,6 +100,7 @@
     "@types/mkdirp": "1.0.2",
     "@types/rimraf": "3.0.2",
     "@types/split2": "3.2.1",
+    "@types/tar-fs": "2.0.1",
     "@types/tempy": "0.3.0",
     "@types/tiny-async-pool": "1.0.0",
     "@types/toposort": "2.0.3",
3 changes: 0 additions & 3 deletions src/artifacts/compression/compression.ts
@@ -1,9 +1,6 @@
 import stream from 'stream';
-import execa from 'execa';
 import { Artifact } from '../model';
 
-export type TarInputArgs = { argv: string[]; input: string } | { file: string };
-
 export interface Compression {
   /**
    * inflate decompresses an input stream (usually an in-progress artifact download), writing decompressed files to disk
6 changes: 3 additions & 3 deletions src/artifacts/compression/tar.ts
@@ -2,10 +2,10 @@ import fs from 'fs';
 import { pipeline as pipelineCb } from 'stream';
 import util from 'util';
 import debug from 'debug';
-import execa, { ExecaReturnValue } from 'execa';
-import { EmptyArgsError, exec, hasBin } from '../../util/exec';
+import { ExecaReturnValue } from 'execa';
+import { exec, hasBin } from '../../util/exec';
 import { tar as tarBin } from '../../util/tar';
-import { Compression, TarInputArgs } from './compression';
+import { Compression } from './compression';
 
 const pipeline = util.promisify(pipelineCb);
116 changes: 76 additions & 40 deletions src/artifacts/matcher.ts
@@ -1,72 +1,108 @@
-import fs from 'fs';
-import stream, { pipeline as pipelineSync } from 'stream';
-import { promisify } from 'util';
+import { promises as fs, PathLike } from 'fs';
+import path from 'path';
 import debug from 'debug';
 import _ from 'lodash';
-import split from 'split2';
 import { globSet } from '../util/glob';
-import { depthSort } from '../util/tar';
+import { count } from '../util/helper';
 
-const pipeline = promisify(pipelineSync);
+const exists = async (file: string) => {
+  try {
+    await fs.stat(file);
+    return true;
+  } catch (err) {
+    return false;
+  }
+};
 
 const log = debug('monofo:artifact:matcher');
 
-/**
- * Utility class that can act as a writable stream of file name chunks, or receive globs
- */
-class MatchedFiles extends stream.Writable {
-  constructor(private _matched: string[] = []) {
-    super({
-      objectMode: true,
-      write: (chunk: string, _encoding, next) => {
-        this._matched.push(chunk);
-        next();
-      },
-    });
-  }
-
-  async addGlobs(globs: string[]): Promise<void> {
-    this._matched = [...this._matched, ...(await globSet(_.castArray(globs), { matchBase: false }))];
-  }
-
-  get matched() {
-    return this._matched;
-  }
-}
+export interface PathsToPack {
+  [path: string]: { recurse: boolean };
+}
+
+function isRoot(p: string): boolean {
+  return p === '' || p === '/' || p === '.';
+}
+
+export function addIntermediateDirectories(toPack: PathsToPack): PathsToPack {
+  const repacked: PathsToPack = {};
+
+  const addWithParents = (p: string, recurse: boolean): void => {
+    if (isRoot(p)) {
+      return;
+    }
+
+    const parent = path.dirname(p);
+
+    if (!isRoot(parent) && !(parent in repacked)) {
+      log(`Adding intermediate directory to included paths to upload: ${parent}`);
+      addWithParents(parent, false);
+    }
+
+    repacked[p] = { recurse };
+  };
+
+  for (const [p, { recurse }] of Object.entries(toPack)) {
+    addWithParents(p, recurse);
+  }
+
+  return repacked;
+}
 
-export async function filesToUpload({
+/**
+ * The files will be passed to tar in the order shown, and then tar will
+ * recurse into each entry if it's a directory (because --recursive is the
+ * default) - it should use the --sort argument (if your tar is new enough)
+ * to sort the eventual input file list, but they'll still be ordered according
+ * to the order of this files argument
+ *
+ * This is problematic if the paths don't contain every intermediate directory
+ *
+ * To fix this, and make the eventual tar compatible with catar, we do the
+ * recursion into files ourselves.
+ */
+export async function pathsToUpload({
   filesFrom,
   globs,
   useNull = false,
 }: {
   filesFrom?: string;
   globs?: string[];
   useNull?: boolean;
-}): Promise<string[]> {
+}): Promise<Record<string, { recurse: boolean }>> {
   if (!filesFrom && !globs) {
-    return [];
+    return {};
   }
 
-  const matched = new MatchedFiles();
-  const matching: Promise<void>[] = [];
-
-  if (globs) {
-    matching.push(matched.addGlobs(globs));
-  }
+  const paths: Record<string, { recurse: boolean }> = Object.fromEntries(
+    (
+      await globSet(
+        _.castArray(globs).filter((v) => v),
+        { matchBase: false }
+      )
    ).map((p) => [`./${p}`, { recurse: false }])
+  );
 
   if (filesFrom) {
-    if (filesFrom !== '-' && !fs.existsSync(filesFrom)) {
+    if (filesFrom !== '-' && !(await exists(filesFrom))) {
       throw new Error(`Could not find file to read file list from: ${filesFrom}`);
     }
 
     log(`Reading from ${filesFrom !== '-' ? filesFrom : 'stdin'}`);
-    const source: stream.Readable =
-      filesFrom === '-' ? process.stdin : fs.createReadStream(filesFrom, { encoding: 'utf8', autoClose: true });
-
-    matching.push(pipeline(source, split(useNull ? '\x00' : '\n'), matched));
-    // TODO: pipeline is sync here anyway, so this can be simplified
+    const source: string =
+      filesFrom === '-'
+        ? await fs.readFile(0 as unknown as PathLike, 'utf8') // 0 is process.stdin
+        : await fs.readFile(filesFrom, { encoding: 'utf8' });
+
+    for (const p of source.split(useNull ? '\x00' : '\n').filter((v) => v)) {
+      paths[p] = { recurse: true };
+    }
   }
 
-  await Promise.all(matching);
-  return depthSort(matched.matched);
+  if (!Object.keys(paths).every((p) => p.startsWith('./'))) {
+    throw new Error('Expected to be given only relative paths to recurse, relative to CWD');
+  }
+
+  log(`Globs and file input matched ${count(Object.keys(paths), 'path')}`);
+  return addIntermediateDirectories(paths);
 }
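
To make the intermediate-directory expansion concrete, here is a small usage sketch (the import path is assumed for illustration):

import { addIntermediateDirectories } from './src/artifacts/matcher';

// A lone './a/b/c.txt' entry would otherwise produce an archive containing no
// entry for 'a' or 'a/b'; the expansion inserts each parent, non-recursively,
// before the path that needed it.
const expanded = addIntermediateDirectories({ './a/b/c.txt': { recurse: false } });
// expanded => {
//   './a':         { recurse: false },
//   './a/b':       { recurse: false },
//   './a/b/c.txt': { recurse: false },
// }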
43 changes: 12 additions & 31 deletions src/commands/upload.ts
@@ -2,12 +2,10 @@ import { flags as f } from '@oclif/command';
 import debug from 'debug';
 import { upload } from '../artifacts/api';
 import { deflator } from '../artifacts/compression';
-import { filesToUpload } from '../artifacts/matcher';
+import { PathsToPack, pathsToUpload } from '../artifacts/matcher';
 import { Artifact } from '../artifacts/model';
 import { BaseCommand, BaseFlags } from '../command';
-import { exec } from '../util/exec';
-import { count } from '../util/helper';
-import { tar } from '../util/tar';
+import { produceTarStream } from '../util/tar';
 
 const log = debug('monofo:cmd:upload');
 
@@ -94,39 +92,22 @@ locally cached
       .slice(1);
 
     const artifact = new Artifact(args.output);
-    const files = await filesToUpload({ globs: args.globs, filesFrom: flags['files-from'], useNull: flags.null });
+    const paths: PathsToPack = await pathsToUpload({
+      globs: args.globs,
+      filesFrom: flags['files-from'],
+      useNull: flags.null,
+    });
 
-    if (files.length === 0) {
+    if (Object.keys(paths).length === 0) {
       log('No files to upload: nothing to do');
       return;
    }
 
-    /*
-     * The files will be passed to tar in the order shown, and then tar will
-     * recurse into each entry if it's a directory (because --recursive is the
-     * default) - it should use the --sort argument (if your tar is new enough)
-     * to sort the eventual input file list, but they'll still be ordered according
-     * to the order of this files argument
-     */
-    log(`Uploading ${count(files, 'path')} as ${args.output}`, files);
-
-    const tarBin = await tar();
-    const tarArgs = ['-c', ...tarBin.createArgs, '--hard-dereference', '--null', '--files-from', '-'];
-
-    log(`About to run: ${tarBin.bin} ${tarArgs.join(' ')} <<< '${files.join(',')}'`);
-
-    const subprocess = exec(tarBin.bin, tarArgs, {
-      stdout: 'pipe',
-      buffer: false,
-      input: `${files.join('\x00')}\x00`,
-    });
-
-    if (!subprocess.stdout || !subprocess.stdin) {
-      throw new Error('Expected to be piped stdout/stdin from tar process');
-    }
+    const tarStream = produceTarStream(paths);
 
-    await deflator(artifact, subprocess.stdout);
+    log(`Deflating archive`);
+    await deflator(artifact, tarStream);
     log(`Archive deflated at ${args.output}`);
 
     log('Uploading to Buildkite');
20 changes: 20 additions & 0 deletions src/util/tar.ts
@@ -3,6 +3,8 @@ import { compare } from 'compare-versions';
 import debug from 'debug';
 import execa from 'execa';
 import _ from 'lodash';
+import { pack } from 'tar-fs';
+import { PathsToPack } from '../artifacts/matcher';
 import { exec, hasBin } from './exec';
 
 const log = debug('monofo:util:tar');
@@ -66,3 +68,21 @@ export function depthSort(paths: string[]): string[] {
     .sort()
     .sort((p1, p2) => p1.split(path.sep).length - p2.split(path.sep).length);
 }
+
+export function produceTarStream(paths: PathsToPack) {
+  const prefixMatch: string[] = Object.entries(paths)
+    .filter(([, { recurse }]) => recurse)
+    .map(([k]) => (k.startsWith('./') ? k.slice(2) : k));
+
+  const exactMatch: Record<string, boolean> = Object.fromEntries(
+    Object.entries(paths)
+      .filter(([, { recurse }]) => !recurse)
+      .map(([k]) => [k.startsWith('./') ? k.slice(2) : k, true])
+  );
+
+  return pack('.', {
+    ignore: (name: string): boolean => {
+      return !(name in exactMatch) && prefixMatch.find((prefix) => name.startsWith(prefix)) === undefined;
+    },
+  });
+}
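
For reference, a standalone sketch of how produceTarStream might be consumed; the gzip target and the example paths are assumptions, since in this commit the stream is handed to deflator() instead:

import { createWriteStream } from 'fs';
import { createGzip } from 'zlib';
import { produceTarStream } from './src/util/tar';

// './dist' is recursive: after the './' prefix is stripped, any entry whose
// name starts with 'dist' is kept. './package.json' must match exactly.
const stream = produceTarStream({
  './dist': { recurse: true },
  './package.json': { recurse: false },
});

stream.pipe(createGzip()).pipe(createWriteStream('artifact.tar.gz'));

Note the recursive case is a plain string-prefix match, so './dist' would also admit a sibling path such as 'dist-legacy'.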
54 changes: 41 additions & 13 deletions test/artifacts/matcher.test.ts
@@ -1,7 +1,7 @@
 import * as fs from 'fs';
 import { promisify } from 'util';
 import { directory } from 'tempy';
-import { filesToUpload } from '../../src/artifacts/matcher';
+import { addIntermediateDirectories, PathsToPack, pathsToUpload } from '../../src/artifacts/matcher';
 import { mergeBase, diff, revList } from '../../src/git';
 import { fakeProcess, COMMIT } from '../fixtures';
 
@@ -30,16 +30,16 @@ describe('artifact matcher', () => {
 
     await writeFile(`${dir}/foo.txt`, `bar\n`);
     await writeFile(`${dir}/bar.txt`, `baz\n`);
-    await writeFile(`${dir}/file-list.null.txt`, 'foo.txt\x00bar.txt\x00');
+    await writeFile(`${dir}/file-list.null.txt`, './foo.txt\x00./bar.txt\x00');
 
-    const result = filesToUpload({
+    const result = await pathsToUpload({
       filesFrom: `${dir}/file-list.null.txt`,
       useNull: true,
     });
 
-    await expect(result).resolves.toHaveLength(2);
-    expect(await result).toContain('foo.txt');
-    expect(await result).toContain('bar.txt');
+    expect(result['./foo.txt']).toStrictEqual({ recurse: true });
+    expect(result['./bar.txt']).toStrictEqual({ recurse: true });
+    expect(Object.keys(result)).toHaveLength(2);
   });
 });
@@ -49,16 +49,16 @@ describe('artifact matcher', () => {
 
     await writeFile(`${dir}/foo.txt`, 'bar\n');
     await writeFile(`${dir}/bar.txt`, 'baz\n');
-    await writeFile(`${dir}/file-list.newline.txt`, 'foo.txt\nbar.txt\n');
+    await writeFile(`${dir}/file-list.newline.txt`, './foo.txt\n./bar.txt\n');
 
-    const result = filesToUpload({
+    const result = await pathsToUpload({
       filesFrom: `${dir}/file-list.newline.txt`,
       useNull: false,
     });
 
-    await expect(result).resolves.toHaveLength(2);
-    expect(await result).toContain('foo.txt');
-    expect(await result).toContain('bar.txt');
+    expect(Object.keys(result)).toHaveLength(2);
+    expect(result['./foo.txt']).toStrictEqual({ recurse: true });
+    expect(result['./bar.txt']).toStrictEqual({ recurse: true });
   });
 });
@@ -69,11 +69,39 @@ describe('artifact matcher', () => {
 
     await writeFile(`${dir}/foo.txt`, 'bar\n');
     await writeFile(`${dir}/bar.txt`, 'baz\n');
 
-    const result = filesToUpload({
+    const result = await pathsToUpload({
       globs: ['*.txt'],
     });
 
-    return expect(result).resolves.toHaveLength(2);
+    expect(Object.keys(result)).toHaveLength(2);
+    expect(result['./foo.txt']).toStrictEqual({ recurse: false });
+    expect(result['./bar.txt']).toStrictEqual({ recurse: false });
   });
 });
 });
+
+describe('add intermediate directories', () => {
+  it('adds intermediate dirs', () => {
+    const paths: PathsToPack = {
+      'a/b/c/foo.txt': { recurse: false },
+      'd/e/f/blah.txt': { recurse: false },
+      'h/i/j': { recurse: true },
+    };
+
+    const result = addIntermediateDirectories(paths);
+
+    expect(Object.keys(result)).toStrictEqual([
+      'a',
+      'a/b',
+      'a/b/c',
+      'a/b/c/foo.txt',
+      'd',
+      'd/e',
+      'd/e/f',
+      'd/e/f/blah.txt',
+      'h',
+      'h/i',
+      'h/i/j',
+    ]);
+  });
+});