Skip to content

Commit

Permalink
load state
Browse files Browse the repository at this point in the history
  • Loading branch information
jeniii committed Jan 22, 2025
1 parent 5143195 commit 5e88dfa
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
2 changes: 1 addition & 1 deletion airbyte-local-cli-nodejs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async function main(): Promise<void> {
// Run airbyte source connector
if (!cfg.srcInputFile) {
await logImageVersion(ImageType.SRC, cfg.src?.image);
cfg.dstStreamPrefix = generateDstStreamPrefix(cfg);
generateDstStreamPrefix(cfg);
loadStateFile(context.tmpDir, cfg?.stateFile, cfg?.connectionName);
await runSrcSync(context.tmpDir, cfg);
} else {
Expand Down
13 changes: 7 additions & 6 deletions airbyte-local-cli-nodejs/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ export function loadStateFile(tempDir: string, filePath?: string, connectionName
);
}
writeFileSync(`${tempDir}/${DEFAULT_STATE_FILE}`, '{}');
logger.debug(`State file '${DEFAULT_STATE_FILE}' not found. An empty state file is created.`);
logger.debug(`State file '${path}' not found. An empty state file will be created.`);
}
}

Expand Down Expand Up @@ -255,7 +255,7 @@ export function processSrcDataByLine(line: string, outputStream: Writable, cfg:

// non RECORD and STATE type messages: print as stdout
// RECORD and STATE type messages: when the output is set to stdout
if ((data.type !== 'RECORD' && data.type !== 'STATE') || cfg.srcOutputFile === OutputStream.STDOUT) {
if ((data?.type !== 'RECORD' && data?.type !== 'STATE') || cfg.srcOutputFile === OutputStream.STDOUT) {
if (cfg.rawMessages) {
process.stdout.write(`${line}\n`);
} else {
Expand Down Expand Up @@ -314,8 +314,7 @@ export function processSrcInputFile(tmpDir: string, cfg: FarosConfig): Promise<v
});
}

export function generateDstStreamPrefix(cfg: FarosConfig): string {
let dst_stream_prefix = '';
export function generateDstStreamPrefix(cfg: FarosConfig): void {
const srcImage = cfg.src?.image;
const dstImage = cfg.dst?.image;
if (dstImage?.startsWith('farosai/airbyte-faros-destination')) {
Expand All @@ -326,14 +325,16 @@ export function generateDstStreamPrefix(cfg: FarosConfig): string {
(cfg.src?.config as any)?.feed_cfg?.feed_name
) {
cfg.connectionName = `${(cfg.src?.config as any)?.feed_cfg?.feed_name}-feed`;
logger.debug(`Using connection name: ${cfg.connectionName}`);
}
// if image is an airbyte image
if (srcImage?.startsWith('farosai/airbyte')) {
const [imageName] = srcImage.split(':');
const imageParts = imageName?.split('-').slice(1, -1);
cfg.connectionName = `my${imageParts?.join('') ?? ''}src`;
dst_stream_prefix = `${cfg.connectionName}_${imageParts?.join('_') ?? ''}__`;
cfg.dstStreamPrefix = `${cfg.connectionName}_${imageParts?.join('_') ?? ''}__`;
logger.debug(`Using connection name: ${cfg.connectionName}`);
logger.debug(`Using destination stream prefix: ${cfg.dstStreamPrefix}`);
}
}
return dst_stream_prefix;
}

0 comments on commit 5e88dfa

Please sign in to comment.