diff --git a/deploy/samples/subscriptions.yaml b/deploy/samples/subscriptions.yaml index 6e897c3..224f1f0 100644 --- a/deploy/samples/subscriptions.yaml +++ b/deploy/samples/subscriptions.yaml @@ -4,7 +4,7 @@ kind: Subscription metadata: name: names spec: - sql: SELECT NAME, NAME AS KEY FROM DATAGEN.PERSON + sql: SELECT NAME, NULL AS KEY FROM DATAGEN.PERSON database: RAWKAFKA diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/DataType.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/DataType.java index 2da1406..3e5e4b5 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/DataType.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/DataType.java @@ -15,7 +15,8 @@ public enum DataType { VARCHAR(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), true)), - VARCHAR_NOT_NULL(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), false)); + VARCHAR_NOT_NULL(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), false)), + NULL(x -> x.createSqlType(SqlTypeName.NULL)); public static final RelDataTypeFactory DEFAULT_TYPE_FACTORY = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); private final RelProtoDataType protoType; diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java index 72474be..070ea53 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java @@ -1,5 +1,6 @@ package com.linkedin.hoptimator.catalog; +import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; @@ -27,11 +28,13 @@ import 
org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.sql.pretty.SqlPrettyWriter; import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.util.SqlShuttle; import java.util.Map; import java.util.List; import java.util.Arrays; +import java.util.ArrayList; import java.util.stream.Collectors; /** @@ -171,6 +174,7 @@ public SqlNode visit(SqlCall call) { * * N.B. the following magic: * - field 'PRIMARY_KEY' is treated as a PRIMARY KEY + * - NULL fields are promoted to BYTES */ class ConnectorImplementor implements ScriptImplementor { private final String database; @@ -228,7 +232,12 @@ public void implement(SqlWriter w) { } } - /** Implements an INSERT INTO statement */ + /** Implements an INSERT INTO statement. + * + * N.B. the following magic: + * - NULL columns (e.g. `NULL AS KEY`) are elided from the pipeline + * + * */ class InsertImplementor implements ScriptImplementor { private final String database; private final String name; @@ -245,11 +254,24 @@ public void implement(SqlWriter w) { w.keyword("INSERT INTO"); (new CompoundIdentifierImplementor(database, name)).implement(w); SqlWriter.Frame frame1 = w.startList("(", ")"); - (new ColumnListImplementor(relNode.getRowType())).implement(w); + RelNode project = dropNullFields(relNode); + (new ColumnListImplementor(project.getRowType())).implement(w); w.endList(frame1); - (new QueryImplementor(relNode)).implement(w); + (new QueryImplementor(project)).implement(w); w.literal(";"); } + + private static RelNode dropNullFields(RelNode relNode) { + List<Integer> cols = new ArrayList<>(); + int i = 0; + for (RelDataTypeField field : relNode.getRowType().getFieldList()) { + if (!field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) { + cols.add(i); + } + i++; + } + return RelOptUtil.createProject(relNode, cols); + } } /** Implements a CREATE DATABASE IF NOT EXISTS statement */ @@ -288,7 +310,11 @@ public void implement(SqlWriter w) { } } 
- /** Implements row type specs, e.g. `NAME VARCHAR(20), AGE INTEGER` */ + /** Implements row type specs, e.g. `NAME VARCHAR(20), AGE INTEGER`. + * + * N.B. the following magic: + * - NULL fields are promoted to BYTES + */ class RowTypeSpecImplementor implements ScriptImplementor { private final RelDataType dataType; @@ -309,7 +335,11 @@ public void implement(SqlWriter w) { for (int i = 0; i < fieldNames.size(); i++) { w.sep(","); fieldNames.get(i).unparse(w, 0, 0); - fieldTypes.get(i).unparse(w, 0, 0); + if (fieldTypes.get(i).getTypeName().getSimple().equals("NULL")) { + w.literal("BYTES"); // promote NULL fields to BYTES + } else { + fieldTypes.get(i).unparse(w, 0, 0); + } } } diff --git a/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/ScriptImplementorTest.java b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/ScriptImplementorTest.java index 763bc1e..a8a0a67 100644 --- a/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/ScriptImplementorTest.java +++ b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/ScriptImplementorTest.java @@ -34,4 +34,27 @@ public void implementsFlinkCreateTableDDL() { assertTrue(out, out.contains("'topic'='topic1'")); assertFalse(out, out.contains("Row")); } + + @Test + public void magicPrimaryKey() { + SqlWriter w = new SqlPrettyWriter(); + RelDataType rowType = DataType.struct().with("F1", DataType.VARCHAR) + .with("PRIMARY_KEY", DataType.VARCHAR).rel(); + HopTable table = new HopTable("DATABASE", "TABLE1", rowType, ConfigProvider.empty().config("x")); + table.implement(w); + String out = w.toString(); + assertTrue(out, out.contains("PRIMARY KEY (PRIMARY_KEY)")); + } + + @Test + public void magicNullFields() { + SqlWriter w = new SqlPrettyWriter(); + RelDataType rowType = DataType.struct().with("F1", DataType.VARCHAR) + .with("KEY", DataType.NULL).rel(); + HopTable table = new HopTable("DATABASE", "TABLE1", rowType, ConfigProvider.empty().config("x")); + 
table.implement(w); + String out = w.toString(); + assertTrue(out, out.contains("\"KEY\" BYTES")); // NULL fields are promoted to BYTES. + assertFalse(out, out.contains("\"KEY\" NULL")); // Without magic, this is what you'd get. + } } diff --git a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java index bc8fa2b..49c9cfb 100644 --- a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java +++ b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java @@ -32,10 +32,11 @@ public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> o ConfigProvider connectorConfigProvider = ConfigProvider.from(clientConfig) .withPrefix("properties.") .with("connector", "kafka") - .with("key.format", "csv") + .with("key.format", "raw") .with("key.fields", "KEY") .with("value.format", "csv") .with("value.fields-include", "EXCEPT_KEY") + .with("scan.startup.mode", "earliest-offset") .with("topic", x -> x); TableLister tableLister = () -> { AdminClient client = AdminClient.create(clientConfig); diff --git a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java index 6cfcb84..60fa511 100644 --- a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java +++ b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java @@ -80,8 +80,9 @@ public ScriptImplementor query() { /** Script ending in INSERT INTO ... 
*/ public ScriptImplementor insertInto(HopTable sink) { RelOptUtil.eq(sink.name(), sink.rowType(), "subscription", rowType(), Litmus.THROW); + RelNode castRel = RelOptUtil.createCastRel(relNode, sink.rowType(), true); return script.database(sink.database()).with(sink) - .insert(sink.database(), sink.name(), relNode); + .insert(sink.database(), sink.name(), castRel); } /** Add any resources, SQL, DDL etc required to access the table. */