Skip to content

Commit

Permalink
Merge pull request #176 from keboola/zajca-ct-1843-bq-transaction
Browse files Browse the repository at this point in the history
CT-1843 run full import truncate in transaction
  • Loading branch information
zajca authored Dec 17, 2024
2 parents bf17455 + 42c8c1d commit 22a52bd
Showing 1 changed file with 41 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,27 +44,41 @@ private function doLoadFullWithoutDedup(
ImportState $state,
Session $session,
): void {
// truncate destination table
$this->bqClient->runQuery($this->bqClient->query(
$this->sqlBuilder->getTruncateTable(
$destinationTableDefinition->getSchemaName(),
$destinationTableDefinition->getTableName(),
),
$session->getAsQueryOptions(),
));
$state->startTimer(self::TIMER_COPY_TO_TARGET);

// move data with INSERT INTO
$sql = $this->sqlBuilder->getInsertAllIntoTargetTableCommand(
$stagingTableDefinition,
$destinationTableDefinition,
$options,
DateTimeHelper::getNowFormatted(),
);
$this->bqClient->runQuery($this->bqClient->query(
$sql,
$session->getAsQueryOptions(),
));
try {
$this->bqClient->runQuery($this->bqClient->query(
$this->sqlBuilder->getBeginTransaction(),
$session->getAsQueryOptions(),
));
// truncate destination table
$this->bqClient->runQuery($this->bqClient->query(
$this->sqlBuilder->getTruncateTable(
$destinationTableDefinition->getSchemaName(),
$destinationTableDefinition->getTableName(),
),
$session->getAsQueryOptions(),
));
$state->startTimer(self::TIMER_COPY_TO_TARGET);// move data with INSERT INTO
$sql = $this->sqlBuilder->getInsertAllIntoTargetTableCommand(
$stagingTableDefinition,
$destinationTableDefinition,
$options,
DateTimeHelper::getNowFormatted(),
);
$this->bqClient->runQuery($this->bqClient->query(
$sql,
$session->getAsQueryOptions(),
));
$this->bqClient->runQuery($this->bqClient->query(
$this->sqlBuilder->getCommitTransaction(),
$session->getAsQueryOptions(),
));
} catch (Throwable $e) {
$this->bqClient->runQuery($this->bqClient->query(
$this->sqlBuilder->getRollbackTransaction(),
$session->getAsQueryOptions(),
));
throw $e;
}
$state->stopTimer(self::TIMER_COPY_TO_TARGET);
}

Expand Down Expand Up @@ -112,7 +126,6 @@ public function importToTable(
return $state->getResult();
}


private function doFullLoadWithDedup(
BigqueryTableDefinition $stagingTableDefinition,
BigqueryTableDefinition $destinationTableDefinition,
Expand Down Expand Up @@ -143,6 +156,12 @@ private function doFullLoadWithDedup(
$deduplicationTableName,
))->getTableDefinition();

$this->bqClient->runQuery($this->bqClient->query(
$this->sqlBuilder->getBeginTransaction(),
$session->getAsQueryOptions(),
));
$transactionStarted = true;

// 3 truncate destination table
$this->bqClient->runQuery($this->bqClient->query(
$this->sqlBuilder->getTruncateTable(
Expand All @@ -152,12 +171,6 @@ private function doFullLoadWithDedup(
$session->getAsQueryOptions(),
));

$this->bqClient->runQuery($this->bqClient->query(
$this->sqlBuilder->getBeginTransaction(),
$session->getAsQueryOptions(),
));
$transactionStarted = true;

// 4 move data with INSERT INTO
$this->bqClient->runQuery($this->bqClient->query(
$this->sqlBuilder->getInsertAllIntoTargetTableCommand(
Expand Down

0 comments on commit 22a52bd

Please sign in to comment.