Skip to content

Commit

Permalink
update dst override
Browse files Browse the repository at this point in the history
  • Loading branch information
jeniii committed Jan 29, 2025
1 parent f9af477 commit 4e939d2
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 50 deletions.
36 changes: 17 additions & 19 deletions airbyte-local-cli-nodejs/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ export function cleanUp(context: AirbyteCliContext): void {
}

export function overrideCatalog(
catalog: object | undefined,
catalog: object,
defaultCatalog: AirbyteCatalog,
fullRefresh = false,
): AirbyteConfiguredCatalog {
Expand Down Expand Up @@ -244,32 +244,30 @@ export async function writeCatalog(tmpDir: string, config: FarosConfig): Promise
const srcCatalogFilePath = `${tmpDir}${sep}${FILENAME_PREFIX}_src_catalog.json`;
const dstCatalogFilePath = `${tmpDir}${sep}${FILENAME_PREFIX}_dst_catalog.json`;

// run discover catalog
// run discover catalog to get default catalog
const defaultCatalog = await runDiscoverCatalog(tmpDir, config.src?.image);

// src catalog: override the default catalog with user provided catalog
const srcCatalog = overrideCatalog(config.src?.catalog, defaultCatalog, config.fullRefresh);
let dstCatalog;
// src catalog: override the default with user provided catalog
const srcCatalog = overrideCatalog(config.src?.catalog ?? {}, defaultCatalog, config.fullRefresh);

// dst catalog:
// if dst catalog is not provided, use the src catalog and append the prefix to the stream name
if (!config.dst?.catalog || Object.keys(config.dst.catalog).length === 0) {
// dst catalog: use src catalog or override default with user provided dst catalog
// append dst stream prefix to the stream name
let dstCatalog;
if (Object.keys((config.dst?.catalog as AirbyteCatalog)?.streams ?? []).length === 0) {
dstCatalog = structuredClone(srcCatalog);
dstCatalog.streams.forEach((stream) => {
stream.stream.name = `${config.dstStreamPrefix ?? ''}${stream.stream.name}`;
});
}
// if dst catalog is provided, override the default catalog with user provided catalog
else {
dstCatalog = overrideCatalog(config.dst?.catalog, defaultCatalog, config.fullRefresh);
} else {
dstCatalog = overrideCatalog(config.dst?.catalog ?? {}, defaultCatalog, config.fullRefresh);
}
dstCatalog.streams.forEach((stream) => {
stream.stream.name = `${config.dstStreamPrefix ?? ''}${stream.stream.name}`;
});

logger.debug(`Writing Airbyte catalog to files...`);
writeFileSync(srcCatalogFilePath, JSON.stringify(srcCatalog ?? {}, null, 2));
writeFileSync(dstCatalogFilePath, JSON.stringify(dstCatalog ?? {}, null, 2));
writeFileSync(srcCatalogFilePath, JSON.stringify(srcCatalog, null, 2));
writeFileSync(dstCatalogFilePath, JSON.stringify(dstCatalog, null, 2));
logger.debug(`Airbyte catalog files written to: ${srcCatalogFilePath}, ${dstCatalogFilePath}`);
logger.debug(srcCatalog ?? {}, `Source catalog: `);
logger.debug(dstCatalog ?? {}, `Destination catalog: `);
logger.debug(srcCatalog, `Source catalog: `);
logger.debug(dstCatalog, `Destination catalog: `);
}

// Read file content
Expand Down
74 changes: 56 additions & 18 deletions airbyte-local-cli-nodejs/test/__snapshots__/utils.it.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -46,40 +46,78 @@ exports[`write files to temporary dir loadStateFile should pass with existing st
"
`;

exports[`write files to temporary dir writeCatalog should write files 1`] = `
"{
exports[`write files to temporary dir writeCatalog should succeed with default only 1`] = `
{
"streams": [
{
"destination_sync_mode": "append",
"stream": {
"name": "builds",
"json_schema": {},
"name": "builds",
"supported_sync_modes": [
"full_refresh",
"incremental"
]
"incremental",
],
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
}
]
}"
},
],
}
`;

exports[`write files to temporary dir writeCatalog should write files 2`] = `
"{
exports[`write files to temporary dir writeCatalog should succeed with default only 2`] = `
{
"streams": [
{
"destination_sync_mode": "append",
"stream": {
"name": "testPrefix__builds",
"json_schema": {},
"name": "testPrefix__builds",
"supported_sync_modes": [
"full_refresh",
"incremental"
]
"incremental",
],
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
}
]
}"
},
],
}
`;

exports[`write files to temporary dir writeCatalog should succeed with override 1`] = `
{
"streams": [
{
"destination_sync_mode": "overwrite",
"stream": {
"json_schema": {},
"name": "builds",
"supported_sync_modes": [
"full_refresh",
"incremental",
],
},
"sync_mode": "full_refresh",
},
],
}
`;

exports[`write files to temporary dir writeCatalog should succeed with override 2`] = `
{
"streams": [
{
"destination_sync_mode": "overwrite",
"stream": {
"json_schema": {},
"name": "testOverridePrefix__builds",
"supported_sync_modes": [
"full_refresh",
"incremental",
],
},
"sync_mode": "full_refresh",
},
],
}
`;
63 changes: 50 additions & 13 deletions airbyte-local-cli-nodejs/test/utils.it.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,7 @@ describe('write files to temporary dir', () => {
beforeAll(() => {
writeFileSync(srcConfigPath, '{}');
});
afterEach(() => {
rmSync(srcConfigPath, {force: true});
rmSync(srcCatalogPath, {force: true});
rmSync(dstCatalogPath, {force: true});
});

it('should write files', async () => {
beforeEach(() => {
(runDiscoverCatalog as jest.Mock).mockResolvedValue({
streams: [
{
Expand All @@ -195,17 +189,61 @@ describe('write files to temporary dir', () => {
},
],
});
const testConfigWithCatalog = {
});
afterEach(() => {
rmSync(srcConfigPath, {force: true});
rmSync(srcCatalogPath, {force: true});
rmSync(dstCatalogPath, {force: true});
});

it('should succeed with default only', async () => {
const emptyCatalogTestConfig = {
...structuredClone(testConfig),
src: {...testConfig.src, catalog: {}},
dstStreamPrefix: 'testPrefix__',
} as FarosConfig;
await writeCatalog(tmpDirPath, testConfigWithCatalog);
await writeCatalog(tmpDirPath, emptyCatalogTestConfig);

expect(existsSync(srcCatalogPath)).toBe(true);
expect(existsSync(dstCatalogPath)).toBe(true);
const srcCatalog = JSON.parse(readFileSync(srcCatalogPath, 'utf8'));
const dstCatalog = JSON.parse(readFileSync(dstCatalogPath, 'utf8'));
expect(srcCatalog.streams[0].sync_mode).toBe('incremental');
expect(srcCatalog.streams[0].destination_sync_mode).toBe('append');
expect(dstCatalog.streams[0].sync_mode).toBe('incremental');
expect(dstCatalog.streams[0].destination_sync_mode).toBe('append');
expect(dstCatalog.streams[0].stream.name).toBe('testPrefix__builds');
expect(srcCatalog).toMatchSnapshot();
expect(dstCatalog).toMatchSnapshot();
});

it('should succeed with override', async () => {
const overrideCatalog = {
streams: [
{
stream: {name: 'builds'},
sync_mode: 'full_refresh',
},
],
};
const catalogTestConfig = {
...structuredClone(testConfig),
src: {...testConfig.src, catalog: overrideCatalog},
dstStreamPrefix: 'testOverridePrefix__',
} as FarosConfig;
await writeCatalog(tmpDirPath, catalogTestConfig);

expect(existsSync(srcCatalogPath)).toBe(true);
expect(existsSync(dstCatalogPath)).toBe(true);
expect(readFileSync(srcCatalogPath, 'utf8')).toMatchSnapshot();
expect(readFileSync(dstCatalogPath, 'utf8')).toMatchSnapshot();
const srcCatalog = JSON.parse(readFileSync(srcCatalogPath, 'utf8'));
const dstCatalog = JSON.parse(readFileSync(dstCatalogPath, 'utf8'));
expect(srcCatalog.streams[0].sync_mode).toBe('full_refresh');
expect(srcCatalog.streams[0].destination_sync_mode).toBe('overwrite');
expect(dstCatalog.streams[0].sync_mode).toBe('full_refresh');
expect(dstCatalog.streams[0].destination_sync_mode).toBe('overwrite');
expect(dstCatalog.streams[0].stream.name).toBe('testOverridePrefix__builds');
expect(srcCatalog).toMatchSnapshot();
expect(dstCatalog).toMatchSnapshot();
});
});
});
Expand Down Expand Up @@ -240,8 +278,7 @@ describe('processSrcInputFile', () => {
srcOutputFile: '/dev/null',
};
await expect(processSrcInputFile(tmpDir, cfg)).rejects.toThrow(
`Failed to process the source output data: Line of data: ` +
`'invalid json'; Error: Unexpected token 'i', "invalid json" is not valid JSON`,
`Failed to process the source output data: Line of data: 'invalid json'`,
);
});

Expand Down

0 comments on commit 4e939d2

Please sign in to comment.