From 9737f76723a65cb420bd076346a6572e7ab6c47a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 2 Jan 2025 21:26:42 +0000 Subject: [PATCH] PEERDB_BINARY_FORMAT In #2181 we consolidated on transmitting bytea columns as base64 strings ClickHouse supports binary data in strings. They need not be valid utf8 This adds PEERDB_BINARY_FORMAT with 3 formats: raw, hex, & base64 Default to base64 to avoid breaking existing setups Also add caching logic for non-immediate dynamic settings. Helps here where we'd hit catalog for every bytea field processed --- flow/connectors/clickhouse/normalize.go | 56 ++++++++++++++++------- flow/connectors/utils/avro/avro_writer.go | 2 +- flow/model/conversion_avro.go | 9 ++-- flow/model/qvalue/avro_converter.go | 28 ++++++++++-- flow/model/record_items.go | 6 +-- flow/peerdbenv/dynamicconf.go | 51 ++++++++++++++++++++- 6 files changed, 118 insertions(+), 34 deletions(-) diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 60a79a6d9f..26e0cf9162 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -366,38 +366,62 @@ func (c *ClickHouseConnector) NormalizeRecords( case "Date32", "Nullable(Date32)": projection.WriteString(fmt.Sprintf( "toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'),6)) AS `%s`,", - colName, - dstColName, + colName, dstColName, )) if enablePrimaryUpdate { projectionUpdate.WriteString(fmt.Sprintf( "toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s'),6)) AS `%s`,", - colName, - dstColName, + colName, dstColName, )) } case "DateTime64(6)", "Nullable(DateTime64(6))": projection.WriteString(fmt.Sprintf( "parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, '%s'),6) AS `%s`,", - colName, - dstColName, + colName, dstColName, )) if enablePrimaryUpdate { projectionUpdate.WriteString(fmt.Sprintf( "parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, '%s'),6) AS `%s`,", - colName, - dstColName, + colName, dstColName, )) } default: - projection.WriteString(fmt.Sprintf("JSONExtract(_peerdb_data, '%s', '%s') AS `%s`,", colName, clickHouseType, dstColName)) - if enablePrimaryUpdate { - projectionUpdate.WriteString(fmt.Sprintf( - "JSONExtract(_peerdb_match_data, '%s', '%s') AS `%s`,", - colName, - clickHouseType, - dstColName, - )) + projLen := projection.Len() + if colType == qvalue.QValueKindBytes { + format, err := peerdbenv.PeerDBBinaryFormat(ctx, req.Env) + if err != nil { + return model.NormalizeResponse{}, err + } + switch format { + case peerdbenv.BinaryFormatRaw: + projection.WriteString(fmt.Sprintf("base64Decode(JSONExtractString(_peerdb_data, '%s')) AS `%s`,", colName, dstColName)) + if enablePrimaryUpdate { + projectionUpdate.WriteString(fmt.Sprintf( + "base64Decode(JSONExtractString(_peerdb_match_data, '%s')) AS `%s`,", + colName, dstColName, + )) + } + case peerdbenv.BinaryFormatHex: + projection.WriteString(fmt.Sprintf("hex(base64Decode(JSONExtractString(_peerdb_data, '%s'))) AS `%s`,", + colName, dstColName)) + if enablePrimaryUpdate { + projectionUpdate.WriteString(fmt.Sprintf( + "hex(base64Decode(JSONExtractString(_peerdb_match_data, '%s'))) AS `%s`,", + colName, dstColName, + )) + } + } + } + + // proceed with default logic if logic above didn't add any sql + if projection.Len() == projLen { + projection.WriteString(fmt.Sprintf("JSONExtract(_peerdb_data, '%s', '%s') AS `%s`,", colName, clickHouseType, dstColName)) + if enablePrimaryUpdate { + projectionUpdate.WriteString(fmt.Sprintf( + "JSONExtract(_peerdb_match_data, '%s', '%s') AS `%s`,", + colName, clickHouseType, dstColName, + )) + } } } } diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index 75bc9f4358..21648bc45d 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -154,7 +154,7 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ctx context.Context, env map[s if err := ctx.Err(); err != nil { return numRows.Load(), err } else { - avroMap, err := avroConverter.Convert(qrecord) + avroMap, err := avroConverter.Convert(ctx, env, qrecord) if err != nil { logger.Error("Failed to convert QRecord to Avro compatible map", slog.Any("error", err)) return numRows.Load(), fmt.Errorf("failed to convert QRecord to Avro compatible map: %w", err) diff --git a/flow/model/conversion_avro.go b/flow/model/conversion_avro.go index ec7cfc6e37..b4cab1613a 100644 --- a/flow/model/conversion_avro.go +++ b/flow/model/conversion_avro.go @@ -46,15 +46,12 @@ func NewQRecordAvroConverter( }, nil } -func (qac *QRecordAvroConverter) Convert(qrecord []qvalue.QValue) (map[string]any, error) { +func (qac *QRecordAvroConverter) Convert(ctx context.Context, env map[string]string, qrecord []qvalue.QValue) (map[string]any, error) { m := make(map[string]any, len(qrecord)) for idx, val := range qrecord { avroVal, err := qvalue.QValueToAvro( - val, - &qac.Schema.Fields[idx], - qac.TargetDWH, - qac.logger, - qac.UnboundedNumericAsString, + ctx, env, val, + &qac.Schema.Fields[idx], qac.TargetDWH, qac.logger, qac.UnboundedNumericAsString, ) if err != nil { return nil, fmt.Errorf("failed to convert QValue to Avro-compatible value: %w", err) diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index df5aaee040..de446a1069 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -3,6 +3,7 @@ package qvalue import ( "context" "encoding/base64" + "encoding/hex" "errors" "fmt" "log/slog" @@ -115,7 +116,11 @@ func GetAvroSchemaFromQValueKind( case QValueKindBoolean: return "boolean", nil case QValueKindBytes: - if targetDWH == protos.DBType_CLICKHOUSE { + format, err := peerdbenv.PeerDBBinaryFormat(ctx, env) + if err != nil { + return nil, err + } + if targetDWH == protos.DBType_CLICKHOUSE && format != peerdbenv.BinaryFormatRaw { return "string", nil } return "bytes", nil @@ -245,6 +250,7 @@ type QValueAvroConverter struct { } func QValueToAvro( + ctx context.Context, env map[string]string, value QValue, field *QField, targetDWH protos.DBType, logger log.Logger, unboundedNumericAsString bool, ) (any, error) { @@ -377,7 +383,11 @@ func QValueToAvro( case QValueNumeric: return c.processNumeric(v.Val), nil case QValueBytes: - return c.processBytes(v.Val), nil + format, err := peerdbenv.PeerDBBinaryFormat(ctx, env) + if err != nil { + return nil, err + } + return c.processBytes(v.Val, format), nil case QValueJSON: return c.processJSON(v.Val), nil case QValueHStore: @@ -509,9 +519,17 @@ func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) any { return rat } -func (c *QValueAvroConverter) processBytes(byteData []byte) interface{} { - if c.TargetDWH == protos.DBType_CLICKHOUSE { - encoded := base64.StdEncoding.EncodeToString(byteData) +func (c *QValueAvroConverter) processBytes(byteData []byte, format peerdbenv.BinaryFormat) interface{} { + if c.TargetDWH == protos.DBType_CLICKHOUSE && format != peerdbenv.BinaryFormatRaw { + var encoded string + switch format { + case peerdbenv.BinaryFormatBase64: + encoded = base64.StdEncoding.EncodeToString(byteData) + case peerdbenv.BinaryFormatHex: + encoded = hex.EncodeToString(byteData) + default: + panic(fmt.Sprintf("unhandled binary format: %d", format)) + } if c.Nullable { return goavro.Union("string", encoded) } diff --git a/flow/model/record_items.go b/flow/model/record_items.go index 13b6dfef5a..33c2ac4941 100644 --- a/flow/model/record_items.go +++ b/flow/model/record_items.go @@ -102,9 +102,8 @@ func (r RecordItems) toMap(opts ToJSONOptions) (map[string]interface{}, error) { if len(v.Val) > 15*1024*1024 { jsonStruct[col] = "{}" } else if _, ok := opts.UnnestColumns[col]; ok { - var unnestStruct map[string]interface{} - err := json.Unmarshal([]byte(v.Val), &unnestStruct) - if err != nil { + var unnestStruct map[string]any + if err := json.Unmarshal([]byte(v.Val), &unnestStruct); err != nil { return nil, err } @@ -185,7 +184,6 @@ func (r RecordItems) toMap(opts ToJSONOptions) (map[string]interface{}, error) { } } jsonStruct[col] = nullableFloatArr - default: jsonStruct[col] = v.Value() } diff --git a/flow/peerdbenv/dynamicconf.go b/flow/peerdbenv/dynamicconf.go index 6501a6bce1..9d2036d2aa 100644 --- a/flow/peerdbenv/dynamicconf.go +++ b/flow/peerdbenv/dynamicconf.go @@ -107,6 +107,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{ ApplyMode: protos.DynconfApplyMode_APPLY_MODE_NEW_MIRROR, TargetForSetting: protos.DynconfTarget_ALL, }, + { + Name: "PEERDB_BINARY_FORMAT", + Description: "Binary field encoding; either raw, hex, or base64", + DefaultValue: "base64", + ValueType: protos.DynconfValueType_STRING, + ApplyMode: protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME, + TargetForSetting: protos.DynconfTarget_CLICKHOUSE, + }, { Name: "PEERDB_SNOWFLAKE_MERGE_PARALLELISM", Description: "Parallel MERGE statements to run for CDC mirrors with Snowflake targets. -1 for no limit", @@ -238,6 +246,14 @@ var DynamicIndex = func() map[string]int { return defaults }() +type BinaryFormat int + +const ( + BinaryFormatRaw = iota + BinaryFormatBase64 + BinaryFormatHex +) + func dynLookup(ctx context.Context, env map[string]string, key string) (string, error) { if val, ok := env[key]; ok { return val, nil @@ -249,6 +265,11 @@ func dynLookup(ctx context.Context, env map[string]string, key string) (string, return "", fmt.Errorf("failed to get catalog connection pool: %w", err) } + var setting *protos.DynamicSetting + if idx, ok := DynamicIndex[key]; ok { + setting = DynamicSettings[idx] + } + var value pgtype.Text query := "SELECT config_value FROM dynamic_settings WHERE config_name=$1" if err := conn.QueryRow(ctx, query, key).Scan(&value); err != nil && err != pgx.ErrNoRows { @@ -257,12 +278,21 @@ func dynLookup(ctx context.Context, env map[string]string, key string) (string, } if !value.Valid { if val, ok := os.LookupEnv(key); ok { + if env != nil && setting != nil && setting.ApplyMode != protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE { + env[key] = val + } return val, nil } - if idx, ok := DynamicIndex[key]; ok { - return DynamicSettings[idx].DefaultValue, nil + if setting != nil { + if env != nil && setting.ApplyMode != protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE { + env[key] = setting.DefaultValue + } + return setting.DefaultValue, nil } } + if env != nil && setting != nil && setting.ApplyMode != protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE { + env[key] = value.String + } return value.String, nil } @@ -398,6 +428,23 @@ func PeerDBNullable(ctx context.Context, env map[string]string) (bool, error) { return dynamicConfBool(ctx, env, "PEERDB_NULLABLE") } +func PeerDBBinaryFormat(ctx context.Context, env map[string]string) (BinaryFormat, error) { + format, err := dynLookup(ctx, env, "PEERDB_BINARY_FORMAT") + if err != nil { + return 0, err + } + switch format { + case "raw": + return BinaryFormatRaw, nil + case "hex": + return BinaryFormatHex, nil + case "base64": + return BinaryFormatBase64, nil + default: + return 0, fmt.Errorf("unknown binary format %s", format) + } +} + func PeerDBEnableClickHousePrimaryUpdate(ctx context.Context, env map[string]string) (bool, error) { return dynamicConfBool(ctx, env, "PEERDB_CLICKHOUSE_ENABLE_PRIMARY_UPDATE") }