Skip to content

Commit

Permalink
Merge pull request #136 from keboola/zajca-ct-1118-null
Browse files Browse the repository at this point in the history
CT-1118 null value for import
  • Loading branch information
zajca authored Mar 14, 2024
2 parents 0b27d3b + b9597f5 commit 43f34ea
Show file tree
Hide file tree
Showing 26 changed files with 552 additions and 29 deletions.
4 changes: 2 additions & 2 deletions packages/php-db-import-export/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
"google/cloud-storage": "^1.27",
"keboola/csv-options": "^1",
"keboola/php-csv-db-import": "^6",
"keboola/php-datatypes": "^6.3|^7",
"keboola/php-datatypes": "^7.6",
"keboola/php-file-storage-utils": "^0.2.2",
"keboola/php-temp": "^2.0",
"keboola/table-backend-utils": ">=1.15|^2",
"keboola/table-backend-utils": "^2",
"microsoft/azure-storage-blob": "^1.4",
"symfony/process": "^4.4|^5.0|^6.0"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class BigqueryImportOptions extends ImportOptions
/**
* @param string[] $convertEmptyValuesToNull
* @param self::USING_TYPES_* $usingTypes
* @param string[] $importAsNull
*/
public function __construct(
array $convertEmptyValuesToNull = [],
Expand All @@ -22,13 +23,15 @@ public function __construct(
int $numberOfIgnoredLines = self::SKIP_NO_LINE,
string $usingTypes = self::USING_TYPES_STRING,
?Session $session = null,
array $importAsNull = self::DEFAULT_IMPORT_AS_NULL,
) {
parent::__construct(
$convertEmptyValuesToNull,
$isIncremental,
$useTimestamp,
$numberOfIgnoredLines,
$usingTypes,
convertEmptyValuesToNull: $convertEmptyValuesToNull,
isIncremental: $isIncremental,
useTimestamp: $useTimestamp,
numberOfIgnoredLines: $numberOfIgnoredLines,
usingTypes: $usingTypes,
importAsNull: $importAsNull,
);
$this->session = $session;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Keboola\Db\ImportExport\ImportOptions;
use Keboola\Db\ImportExport\ImportOptionsInterface;
use Keboola\Db\ImportExport\Storage;
use Keboola\TableBackendUtils\Escaping\Bigquery\BigqueryQuote;
use Keboola\TableBackendUtils\Table\Bigquery\BigqueryTableDefinition;
use Keboola\TableBackendUtils\Table\Bigquery\BigqueryTableReflection;
use Keboola\TableBackendUtils\Table\TableDefinitionInterface;
Expand Down Expand Up @@ -70,6 +71,14 @@ public function runCopyCommand(
->quote($source->getCsvOptions()->getEnclosure())
->allowQuotedNewlines(true);

if ($importOptions->importAsNull() !== []) {
// BigQuery allows only one null marker
// we implicitly use the first one and ignore others if any
$loadConfig->nullMarker(
$importOptions->importAsNull()[0],
);
}

$job = $this->bqClient->runJob($loadConfig);

if (!$job->isComplete()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ public function __construct(
string $castValueTypes = self::TABLE_TYPES_PRESERVE,
) {
parent::__construct(
$convertEmptyValuesToNull,
$isIncremental,
$useTimestamp,
$numberOfIgnoredLines,
convertEmptyValuesToNull: $convertEmptyValuesToNull,
isIncremental: $isIncremental,
useTimestamp: $useTimestamp,
numberOfIgnoredLines: $numberOfIgnoredLines,
importAsNull: [], // Exasol does not support importAsNull now
);
$this->castValueTypes = $castValueTypes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Keboola\Db\ImportExport\Backend\Snowflake;

use Keboola\Db\ImportExport\ImportOptions;
use Keboola\TableBackendUtils\Escaping\Snowflake\SnowflakeQuote;

class SnowflakeImportOptions extends ImportOptions
{
Expand All @@ -19,6 +20,7 @@ class SnowflakeImportOptions extends ImportOptions
* @param self::SAME_TABLES_* $requireSameTables
* @param self::NULL_MANIPULATION_* $nullManipulation
* @param string[] $ignoreColumns
* @param string[] $importAsNull
*/
public function __construct(
array $convertEmptyValuesToNull = [],
Expand All @@ -28,6 +30,7 @@ public function __construct(
bool $requireSameTables = self::SAME_TABLES_NOT_REQUIRED,
bool $nullManipulation = self::NULL_MANIPULATION_ENABLED,
array $ignoreColumns = [],
array $importAsNull = self::DEFAULT_IMPORT_AS_NULL,
) {
parent::__construct(
$convertEmptyValuesToNull,
Expand All @@ -36,6 +39,7 @@ public function __construct(
$numberOfIgnoredLines,
$requireSameTables === self::SAME_TABLES_REQUIRED ? self::USING_TYPES_USER : self::USING_TYPES_STRING,
$ignoreColumns,
$importAsNull,
);
$this->requireSameTables = $requireSameTables;
$this->nullManipulation = $nullManipulation;
Expand All @@ -50,4 +54,19 @@ public function isNullManipulationEnabled(): bool
{
return $this->nullManipulation === self::NULL_MANIPULATION_ENABLED;
}

public function getNullIfSql(): string
{
$nullIf = ', NULL_IF=()';
if ($this->importAsNull() !== []) {
$nullIf = sprintf(', NULL_IF=(%s)', implode(
',',
array_map(
fn(string $s) => SnowflakeQuote::quote($s),
$this->importAsNull(),
),
));
}
return $nullIf;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ static function ($entry) use ($source) {
'COPY INTO %s.%s
FROM %s
CREDENTIALS=(AZURE_SAS_TOKEN=\'%s\')
FILE_FORMAT = (TYPE=CSV %s, NULL_IF=(\'\'))
FILE_FORMAT = (TYPE=CSV %s%s)
FILES = (%s)',
SnowflakeQuote::quoteSingleIdentifier($destination->getSchemaName()),
SnowflakeQuote::quoteSingleIdentifier($destination->getTableName()),
Expand All @@ -105,6 +105,7 @@ static function ($entry) use ($source) {
$source->getCsvOptions(),
),
),
$importOptions->getNullIfSql(),
implode(', ', $quotedFiles),
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private function getCopyCommand(
return sprintf(
'COPY INTO %s.%s FROM %s
STORAGE_INTEGRATION = %s,
FILE_FORMAT = (TYPE=CSV %s, NULL_IF=(\'\'))
FILE_FORMAT = (TYPE=CSV %s%s)
FILES = (%s)',
SnowflakeQuote::quoteSingleIdentifier($destination->getSchemaName()),
SnowflakeQuote::quoteSingleIdentifier($destination->getTableName()),
Expand All @@ -96,6 +96,7 @@ private function getCopyCommand(
$csvOptions->getEnclosure() ? SnowflakeQuote::quote($csvOptions->getEnclosure()) : 'NONE',
$csvOptions->getEscapedBy() ? SnowflakeQuote::quote($csvOptions->getEscapedBy()) : 'NONE',
),
$importOptions->getNullIfSql(),
implode(
', ',
array_map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private function getCopyCommand(
'COPY INTO %s.%s FROM %s
CREDENTIALS = (AWS_KEY_ID = %s AWS_SECRET_KEY = %s)
REGION = %s
FILE_FORMAT = (TYPE=CSV %s, NULL_IF=(\'\'))
FILE_FORMAT = (TYPE=CSV %s%s)
FILES = (%s)',
SnowflakeQuote::quoteSingleIdentifier($destination->getSchemaName()),
SnowflakeQuote::quoteSingleIdentifier($destination->getTableName()),
Expand All @@ -98,6 +98,7 @@ private function getCopyCommand(
$csvOptions->getEnclosure() ? SnowflakeQuote::quote($csvOptions->getEnclosure()) : 'NONE',
$csvOptions->getEscapedBy() ? SnowflakeQuote::quote($csvOptions->getEscapedBy()) : 'NONE',
),
$importOptions->getNullIfSql(),
implode(
', ',
array_map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,14 @@ public function __construct(
string $tableToTableAdapter = self::TABLE_TO_TABLE_ADAPTER_INSERT_INTO,
) {
parent::__construct(
$convertEmptyValuesToNull,
$isIncremental,
$useTimestamp,
$numberOfIgnoredLines,
$requireSameTables === self::SAME_TABLES_REQUIRED ? self::USING_TYPES_USER : self::USING_TYPES_STRING,
convertEmptyValuesToNull: $convertEmptyValuesToNull,
isIncremental: $isIncremental,
useTimestamp: $useTimestamp,
numberOfIgnoredLines: $numberOfIgnoredLines,
usingTypes: $requireSameTables === self::SAME_TABLES_REQUIRED
? self::USING_TYPES_USER
: self::USING_TYPES_STRING,
importAsNull: [], // Synapse does not support importAsNull now
);
$this->importCredentialsType = $importCredentialsType;
$this->castValueTypes = $castValueTypes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ public function getUpdateWithPkCommand(
static fn(SynapseColumn $columnDefinition): bool => !StringCaseSensitivity::isInArrayCaseInsensitive(
$columnDefinition->getColumnName(),
$destinationDefinition->getPrimaryKeysNames(),
)
),
);

// update only changed rows - mysql TIMESTAMP ON UPDATE behaviour simulation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ public function __construct(
string $usingTypes = self::USING_TYPES_STRING,
) {
parent::__construct(
$convertEmptyValuesToNull,
$isIncremental,
$useTimestamp,
$numberOfIgnoredLines,
$usingTypes,
convertEmptyValuesToNull: $convertEmptyValuesToNull,
isIncremental: $isIncremental,
useTimestamp: $useTimestamp,
numberOfIgnoredLines: $numberOfIgnoredLines,
usingTypes: $usingTypes,
importAsNull: [], // Teradata does not support importAsNull now
);
$this->teradataHost = $teradataHost;
$this->teradataUser = $teradataUser;
Expand Down
14 changes: 14 additions & 0 deletions packages/php-db-import-export/src/ImportOptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ class ImportOptions implements ImportOptionsInterface
/** @var string[] */
private array $ignoreColumns;

/** @var string[] */
private array $importAsNull;

private bool $useTimestamp;

/** @var string[] */
Expand All @@ -30,6 +33,7 @@ class ImportOptions implements ImportOptionsInterface
* @param string[] $convertEmptyValuesToNull
* @param self::USING_TYPES_* $usingTypes
* @param string[] $ignoreColumns
* @param string[] $importAsNull
*/
public function __construct(
array $convertEmptyValuesToNull = [],
Expand All @@ -38,13 +42,15 @@ public function __construct(
int $numberOfIgnoredLines = self::SKIP_NO_LINE,
string $usingTypes = self::USING_TYPES_STRING,
array $ignoreColumns = [],
array $importAsNull = self::DEFAULT_IMPORT_AS_NULL,
) {
$this->useTimestamp = $useTimestamp;
$this->convertEmptyValuesToNull = $convertEmptyValuesToNull;
$this->isIncremental = $isIncremental;
$this->numberOfIgnoredLines = $numberOfIgnoredLines;
$this->usingTypes = $usingTypes;
$this->ignoreColumns = $ignoreColumns;
$this->importAsNull = $importAsNull;
}

/**
Expand Down Expand Up @@ -79,4 +85,12 @@ public function ignoreColumns(): array
{
return $this->ignoreColumns;
}

/**
* @return string[]
*/
public function importAsNull(): array
{
return $this->importAsNull;
}
}
10 changes: 10 additions & 0 deletions packages/php-db-import-export/src/ImportOptionsInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ interface ImportOptionsInterface
public const SKIP_NO_LINE = 0;
public const SKIP_FIRST_LINE = 1;

public const DEFAULT_IMPORT_AS_NULL = ['']; // default import empty string as null

/** @return string[] */
public function getConvertEmptyValuesToNull(): array;

Expand All @@ -27,4 +29,12 @@ public function usingUserDefinedTypes(): bool;
* @return string[]
*/
public function ignoreColumns(): array;

/**
* List of values which are during the import converted to null
* this option is mainly passed directly to the backend in some option
* like NULL_IF in SNFLK
* @return string[]
*/
public function importAsNull(): array;
}
6 changes: 3 additions & 3 deletions packages/php-db-import-export/tests/Common/StorageTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public function listFiles(string $dir, bool $excludeManifest = true): array
if ($excludeManifest) {
$blobs = array_filter(
$blobs,
static fn(array $blob) => !strpos($blob['Key'], 'manifest')
static fn(array $blob) => !strpos($blob['Key'], 'manifest'),
);
}
return $blobs;
Expand All @@ -291,7 +291,7 @@ public function listFiles(string $dir, bool $excludeManifest = true): array
if ($excludeManifest) {
$blobs = array_filter(
$blobs,
static fn(Blob $blob) => !strpos($blob->getName(), 'manifest')
static fn(Blob $blob) => !strpos($blob->getName(), 'manifest'),
);
}
return $blobs;
Expand All @@ -304,7 +304,7 @@ public function listFiles(string $dir, bool $excludeManifest = true): array
if ($excludeManifest) {
$objects = array_filter(
$objects,
static fn(StorageObject $blob) => !strpos($blob->name(), 'manifest')
static fn(StorageObject $blob) => !strpos($blob->name(), 'manifest'),
);
}
return $objects;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Keboola\CsvOptions\CsvOptions;
use Keboola\Db\ImportExport\Backend\Bigquery\BigqueryException;
use Keboola\Db\ImportExport\Backend\Bigquery\BigqueryImportOptions;
use Keboola\Db\ImportExport\Backend\Bigquery\BigqueryInputDataException;
use Keboola\Db\ImportExport\Backend\Bigquery\ToStage\ToStageImporter;
use Keboola\Db\ImportExport\Storage\Bigquery\Table;
Expand Down Expand Up @@ -59,6 +60,50 @@ public function testSimpleStageImport(): void
self::assertEquals(1, $state->getResult()->getImportedRowsCount());
}

public function testStageImportNullBehavior(): void
{
$query = $this->bqClient->query(
sprintf(
'CREATE TABLE %s.%s (
`id` INTEGER,
`first_name` STRING(100),
`last_name` STRING(100)
);',
BigqueryQuote::quoteSingleIdentifier(self::TEST_DATABASE),
BigqueryQuote::quoteSingleIdentifier(self::TABLE_GENERIC),
),
);
$this->bqClient->runQuery($query);

$importer = new ToStageImporter($this->bqClient);
$ref = new BigqueryTableReflection(
$this->bqClient,
self::TEST_DATABASE,
self::TABLE_GENERIC,
);

$state = $importer->importToStagingTable(
$this->createGcsSourceInstanceFromCsv('csv/simple/a_b_c-1row.csv', new CsvOptions()),
$ref->getTableDefinition(),
new BigqueryImportOptions(
convertEmptyValuesToNull: [],
isIncremental: false,
useTimestamp: false,
numberOfIgnoredLines: 1,
importAsNull: ['3', '2'], // two values are passed second is ignored
),
);

self::assertEquals(1, $state->getResult()->getImportedRowsCount());
$this->assertSame([
[
'id' => 1,
'first_name' => '2',
'last_name' => null,
],
], $this->fetchTable(self::TEST_DATABASE, self::TABLE_GENERIC));
}

public function testAsciiZeroImport(): void
{
$query = $this->bqClient->query(
Expand Down
Loading

0 comments on commit 43f34ea

Please sign in to comment.