Skip to content

Commit

Permalink
Restart compile/analyze process using button (#1046)
Browse files Browse the repository at this point in the history
* add the ability to cancel project compilation

* get rid of parameter

* add the ability to restart compile/analyze by pressing the button

* delete logs
  • Loading branch information
pgrivachev authored Jun 30, 2023
1 parent f632fb6 commit 8be9c8a
Show file tree
Hide file tree
Showing 11 changed files with 194 additions and 129 deletions.
38 changes: 26 additions & 12 deletions server/src/DestinationContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import { DagNode } from './dag/DagNode';
import { DbtProfileSuccess } from './DbtProfileCreator';
import { DbtRepository } from './DbtRepository';
import { DestinationDefinition } from './DestinationDefinition';
import { AnalyzeResult, AnalyzeTrackerFunc, ModelsAnalyzeResult, ProjectAnalyzer } from './ProjectAnalyzer';
import { AnalyzeResult, ModelsAnalyzeResult, ProjectAnalyzer } from './ProjectAnalyzer';
import { AnalyzeTrackerFunc, ProjectAnalyzeTask } from './ProjectAnalyzeTask';
import { SnowflakeZetaSqlWrapper } from './SnowflakeZetaSqlWrapper';
import { SqlHeaderAnalyzer } from './SqlHeaderAnalyzer';
import { SupportedDestinations, ZetaSqlApi } from './ZetaSqlApi';
Expand All @@ -17,7 +18,9 @@ export class DestinationContext {
private static readonly NOT_INITIALIZED_ERROR = 'projectAnalyzer is not initialized';

destinationDefinition?: DestinationDefinition;
projectAnalyzer?: ProjectAnalyzer;
private projectAnalyzer?: ProjectAnalyzer;
private projectAnalyzeTask?: ProjectAnalyzeTask;
private projectName?: string;

contextInitialized = false;
onContextInitializedEmitter = new Emitter<void>();
Expand All @@ -43,6 +46,7 @@ export class DestinationContext {
ubuntuInWslWorks: boolean,
projectName: string,
): Promise<Result<void, string>> {
this.projectName = projectName;
if (profileResult.dbtProfile && this.canUseDestination(profileResult, ubuntuInWslWorks)) {
try {
const clientResult = await profileResult.dbtProfile.createClient(profileResult.targetConfig);
Expand All @@ -62,7 +66,7 @@ export class DestinationContext {
? new BigQueryZetaSqlWrapper(destinationClient, zetaSqlApi, zetaSqlParser, sqlHeaderAnalyzer)
: new SnowflakeZetaSqlWrapper(destinationClient, zetaSqlApi, zetaSqlParser, sqlHeaderAnalyzer);

this.projectAnalyzer = new ProjectAnalyzer(dbtRepository, projectName, destinationClient, zetaSqlWrapper);
this.projectAnalyzer = new ProjectAnalyzer(dbtRepository, destinationClient, zetaSqlWrapper);
await this.projectAnalyzer.initialize();
} catch (e) {
const message = e instanceof Error ? e.message : JSON.stringify(e);
Expand All @@ -88,28 +92,38 @@ export class DestinationContext {
);
}

async analyzeModel(node: DagNode): Promise<ModelsAnalyzeResult[]> {
async analyzeModel(node: DagNode, signal: AbortSignal): Promise<ModelsAnalyzeResult[]> {
this.ensureProjectAnalyzer(this.projectAnalyzer);
return this.projectAnalyzer.analyzeModel(node);
return this.projectAnalyzer.analyzeModel(node, signal);
}

async analyzeModelTree(node: DagNode, sql?: string): Promise<ModelsAnalyzeResult[]> {
async analyzeModelTree(node: DagNode, sql: string | undefined, signal: AbortSignal): Promise<ModelsAnalyzeResult[]> {
this.ensureProjectAnalyzer(this.projectAnalyzer);
return this.projectAnalyzer.analyzeModelTree(node, sql);
return this.projectAnalyzer.analyzeModelTree(node, sql, signal);
}

async analyzeSql(sql: string): Promise<AnalyzeResult> {
async analyzeSql(sql: string, signal: AbortSignal): Promise<AnalyzeResult> {
this.ensureProjectAnalyzer(this.projectAnalyzer);
return this.projectAnalyzer.analyzeSql(sql);
return this.projectAnalyzer.analyzeSql(sql, signal);
}

// TODO: delete
cancelAnalyze(): void {
this.projectAnalyzeTask?.stop();
}

async analyzeProject(analyzeTracker: AnalyzeTrackerFunc): Promise<ModelsAnalyzeResult[]> {
this.ensureProjectAnalyzer(this.projectAnalyzer);
return this.projectAnalyzer.analyzeProject(analyzeTracker);
if (!this.projectName) {
throw new Error('projectName is not initialized');
}
this.projectAnalyzeTask?.stop();
this.projectAnalyzeTask = new ProjectAnalyzeTask(this.projectAnalyzer, this.projectName, analyzeTracker);
return this.projectAnalyzeTask.start();
}

resetTables(): void {
this.projectAnalyzer?.resetTables();
resetCache(): void {
this.projectAnalyzer?.resetCache();
}

getColumnsInRelation(db: string | undefined, schema: string | undefined, tableName: string): KnownColumn[] | undefined {
Expand Down
11 changes: 10 additions & 1 deletion server/src/MacroCompilationServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ interface QueryParams {

export class MacroCompilationServer {
port?: number;
private abortController = new AbortController();

constructor(private destinationContext: DestinationContext, private dbtRepository: DbtRepository) {}

Expand All @@ -30,7 +31,10 @@ export class MacroCompilationServer {
n => n.getValue().database === db && n.getValue().schema === schema && n.getValue().name === queryParams.table,
);
if (node) {
await this.destinationContext.analyzeModel(node);
await this.destinationContext.analyzeModel(node, this.abortController.signal);
if (this.abortController.signal.aborted) {
return;
}
const columns = this.destinationContext.getColumnsInRelation(db, schema, queryParams.table);
if (columns) {
res.send(columns.map(c => [c.name, c.type]));
Expand All @@ -49,4 +53,9 @@ export class MacroCompilationServer {
app.listen(this.port, 'localhost');
console.log(`Macro compilation server started on port ${this.port}`);
}

cancelActiveTasks(): void {
this.abortController.abort();
this.abortController = new AbortController();
}
}
4 changes: 2 additions & 2 deletions server/src/ModelCompiler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export class ModelCompiler {

if (this.dbtCompileJobQueue.length > 3) {
const jobToStop = this.dbtCompileJobQueue.shift();
jobToStop?.forceStop().catch(e => console.log(`Failed to stop job: ${e instanceof Error ? e.message : String(e)}`));
jobToStop?.forceStop();
}
this.startNewJob(modelPath, allowFallback);

Expand Down Expand Up @@ -63,7 +63,7 @@ export class ModelCompiler {
if (result) {
const jobsToStop = this.dbtCompileJobQueue.splice(0, i + 1);
for (let j = 0; j < i; j++) {
jobsToStop[j].forceStop().catch(e => console.log(`Failed to stop job: ${e instanceof Error ? e.message : String(e)}`));
jobsToStop[j].forceStop();
}

if (result.isErr()) {
Expand Down
73 changes: 73 additions & 0 deletions server/src/ProjectAnalyzeTask.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { AnalyzeResult, ModelsAnalyzeResult, ProjectAnalyzer } from './ProjectAnalyzer';

export type AnalyzeTrackerFunc = (completedCount: number, modelsCount: number) => void;

export class ProjectAnalyzeTask {
private stopRequested = false;
private abortController = new AbortController();

constructor(private projectAnalyzer: ProjectAnalyzer, private projectName: string, private analyzeTracker: AnalyzeTrackerFunc) {}

/** Analyzes models from project starting from the roots and stopping if there is error in node */
async start(): Promise<ModelsAnalyzeResult[]> {
console.log('Project analysis started...');

const visited = new Set<string>();
let queue = this.projectAnalyzer.dbtRepository.dag.getRootNodes(this.projectName);
const modelCount = this.projectAnalyzer.dbtRepository.dag.getNodesCount(this.projectName);

const results: ModelsAnalyzeResult[] = [];
const visitedModels = new Map<string, Promise<AnalyzeResult>>();
this.analyzeTracker(visited.size, modelCount);

while (queue.length > 0) {
const currentLevel = queue;
queue = [];

const promises = currentLevel.map(async node => {
const id = node.getValue().uniqueId;
if (visited.has(id)) {
return;
}
visited.add(id);

const analyzeResults = await this.projectAnalyzer.analyzeModelTreeInternal(node, undefined, visitedModels, this.abortController.signal);
results.push(...analyzeResults);

for (const child of node.children) {
if (!visited.has(child.getValue().uniqueId)) {
queue.push(child);
}
}
});

await Promise.all(promises);
if (this.stopRequested) {
throw new Error('Canceled');
}
this.analyzeTracker(visited.size, modelCount);
}

console.log('Project analysis completed');
return this.filterErrorResults(results);
}

/** Filters all errors. Returns only root errors */
private filterErrorResults(results: ModelsAnalyzeResult[]): ModelsAnalyzeResult[] {
const errorResults = results.filter(r => r.analyzeResult.ast.isErr());
const idToExclude = new Set(
errorResults
.filter(r => {
const current = this.projectAnalyzer.dbtRepository.dag.nodes.find(n => n.getValue().uniqueId === r.modelUniqueId);
return current?.findParent(p => errorResults.some(er => er.modelUniqueId === p.getValue().uniqueId));
})
.map(r => r.modelUniqueId),
);
return results.filter(r => !idToExclude.has(r.modelUniqueId));
}

stop(): void {
this.stopRequested = true;
this.abortController.abort();
}
}
Loading

0 comments on commit 8be9c8a

Please sign in to comment.