
Schema Registry

Andrew Robertson edited this page Mar 29, 2021 · 6 revisions

The following assumes some familiarity with Pravega (https://pravega.io)

We use the convention Pravega scope.stream = Presto schema.table: a Pravega scope maps to a Presto schema, and a Pravega scope.stream maps to a Presto schema.table

Terms

  • SR - generic schema registry
  • PSR - Pravega Schema Registry. This is a separate component from Pravega streaming storage
  • LSR - schema registry-like functionality managed with JSON files on local disk

Schema Registry (SR)

We currently support the Pravega Schema Registry (PSR), the Confluent Schema Registry (CSR), and local disk on the coordinator (LSR)

They can be configured via the property file etc/catalog/pravega.properties

The Local SR is always enabled (as long as $PRESTO_HOME/etc/pravega exists); PSR and CSR are optional
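For example, a minimal etc/catalog/pravega.properties might look like this. The pravega.schema-registry property is described below; the connector.name value and the controller property name/URI shown here are assumptions based on a typical Presto catalog setup, so check your deployment's documentation:

```
connector.name=pravega
pravega.controller=tcp://localhost:9090
pravega.schema-registry=http://localhost:9092
```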

Pravega Schema Registry (PSR)

https://github.com/pravega/schema-registry

Set the property pravega.schema-registry, e.g. pravega.schema-registry=http://localhost:9092

A couple of things are required in order to use PSR:

  1. You must have a group whose name has the format {schema}.{table}, for example sensors.car
  2. You must then add your schema to the group (Note: currently we support only a single schema per stream)

Group

A group has a serialization format, some compatibility options, and finally optional properties.

  • More on the properties later (i.e. what does inline mean?)
registryClient.addGroup(
        "sensors.car",
        new GroupProperties(
                SerializationFormat.Avro,
                Compatibility.allowAny(),
                false,
                ImmutableMap.<String, String>builder().put(inline ? "inline" : "", "").build()));

Schema

A schema must be added to the group. There are a couple of choices here, and which option you choose will depend on how you are using Pravega and PSR. See https://github.com/pravega/schema-registry/wiki/Sample-Usage:-Pravega-Application

When writing events to Pravega you must provide a serializer. PSR also provides serializers that take care of serialization/deserialization and register schemas for you.

DIY

With this option you must add a schema explicitly

registryClient.addSchema("sensors.car", AvroSchema.of(CarSensors.class).getSchemaInfo());

PSR managed

With this option you create a PSR serializer, which will add the schema to the group for you

SerializerConfig serializerConfig = SerializerConfig.builder()
        .groupId("sensors.car")
        .registryConfig(schemaRegistryClientConfig)
        .registerSchema(true)
        .build();

Serializer<CarSensors> serializer =
        SerializerFactory.avroSerializer(serializerConfig, AvroSchema.of(CarSensors.class));

Back to group properties: if using this option (PSR managed) you must include the key "inline" in your group properties. This is a temporary measure - please see: https://github.com/pravega/presto-connector/issues/20

Confluent Schema Registry (CSR)

Configure endpoint with property pravega.confluent-schema-registry, e.g. pravega.confluent-schema-registry=http://localhost:8081

Note: Avro only.
Note: the latest version of a schema will be used.

Local Schema Registry (LSR)

You can manage schemas using json files which contain stream, key/value table, and schema info

This directory defaults to $PRESTO_HOME/etc/pravega/, but can be configured with property pravega.table-description-dir

Note: a schema in the Local SR will override similarly named schemas in other registries

File name format: {schema}.{table}.json
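As an illustration of this naming convention, here is a small hypothetical helper (not part of the connector) that splits such a file name into its schema and table parts:

```java
import java.util.Optional;

class TableFileName
{
    // Parses a Local SR file name of the form {schema}.{table}.json into
    // its [schema, table] parts. Illustrative sketch only.
    static Optional<String[]> parse(String fileName)
    {
        if (!fileName.endsWith(".json")) {
            return Optional.empty();
        }
        String base = fileName.substring(0, fileName.length() - ".json".length());
        String[] parts = base.split("\\.");
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(parts);
    }
}
```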

There are three required fields: schemaName, tableName, and objectName. tableName and objectName will usually be equal. The "event" field is where you can specify an actual schema.

There are 2 options here:

  • Fields defined in file
  • Link to schema (file, url)

Fields defined in file

You may include the schema directly, for example:

{
    "schemaName": "hr",
    "tableName": "employee",
    "objectName": "employee",
    "event": [{
        "dataFormat": "json",
        "fields": [
            {
                "name": "id",
                "mapping": "id",
                "type": "BIGINT"
            },        
            {
                "name": "first",
                "mapping": "first",
                "type": "VARCHAR(25)"
            },
            {
                "name": "last",
                "mapping": "last",
                "type": "VARCHAR(25)"
            }
        ]
    }]
}

Link to schema (file, url)

You can provide a pointer to a file containing the schema (same $PRESTO_HOME/etc/pravega directory) or url:

{
    "schemaName": "hr",
    "tableName": "employee",
    "objectName": "employee",
    "event": [
        {
            "dataFormat": "avro",
            "dataSchema": "employee.avsc"
        }
    ]
}

Where employee.avsc is a standard Avro schema defined in JSON:

{
    "namespace": "io.pravega.avro",
    "type": "record",
    "name": "Employee",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "first", "type": "string"},
        {"name": "last", "type": "string"}
    ]
}

Local SR options

Using Local SR provides a couple of extra options.

Multi Source Streams

It may be the case that different streams are related to each other and share the same schema. These streams can be presented and queried as if they are a single table.

There are 2 options for defining the component streams (the examples below use the file multiexample.server.json)

Regular expressions

objectName can contain a regular expression. We will look for stream names within the pravega scope (schemaName) that match the regex.

{
  "schemaName": "multiexample",
  "tableName": "server",
  "objectName" : "server[0-9]"
}
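The regex lookup can be sketched as follows. This is a hypothetical helper, not connector code, and it assumes the pattern must match the full stream name (so server10 would not match server[0-9]):

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class MultiSourceMatch
{
    // Selects the stream names in a scope that fully match the objectName
    // regex. Illustrative sketch of the regex-based multi-source lookup.
    static List<String> matchingStreams(List<String> streamsInScope, String objectNameRegex)
    {
        Pattern pattern = Pattern.compile(objectNameRegex);
        return streamsInScope.stream()
                .filter(name -> pattern.matcher(name).matches())
                .collect(Collectors.toList());
    }
}
```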

Explicitly listed

Specify exactly which streams you would like by including a list in "objectArgs"

{
  "schemaName": "multiexample",
  "tableName": "server",
  "objectName": "server",
  "objectArgs" : ["server0","server2"]
}

Key Value Tables

The next option available in the Local SR is to set up querying of Pravega KeyValue Tables: https://pravega.io/docs/v0.9.0/javadoc/clients/index.html?io/pravega/client/tables/KeyValueTable.html

It differs from a stream schema in a couple of ways: (1) objectType must be KV_TABLE, (2) objectArgs is a list of key families to include, and (3) there are 2 schemas - 1 for the key, 1 for the value.

{
  "schemaName": "kv",
  "tableName": "employee",
  "objectName": "employee",
  "objectType": "KV_TABLE",
  "objectArgs": ["kf1", "kf2"],
  "event": [
    {
      "dataFormat": "avro",
      "dataSchema": "employee-id.avsc"
    },
    {
      "dataFormat": "avro",
      "dataSchema": "employee-value.avsc"
    }
  ]
}

Key and value schemas will be "flattened" into a single Presto table definition. E.g., describe pravega.kv.employee might show something like this, where the field names are prefixed:

key/id
value/first
value/last
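The prefixing shown above can be sketched as follows (a hypothetical helper, not connector code, assuming the key and value field names are available as plain lists):

```java
import java.util.ArrayList;
import java.util.List;

class KvFieldFlatten
{
    // Prefixes key-schema fields with "key/" and value-schema fields with
    // "value/" to form one flat column list. Illustrative sketch only.
    static List<String> flatten(List<String> keyFields, List<String> valueFields)
    {
        List<String> columns = new ArrayList<>();
        for (String field : keyFields) {
            columns.add("key/" + field);
        }
        for (String field : valueFields) {
            columns.add("value/" + field);
        }
        return columns;
    }
}
```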

Code Impl.

The term 'schema' is overloaded. A Presto schema is a database schema and contains a collection of tables. There is also 'schema' as applied to the data itself, i.e. the definition of the data you are storing.

In the connector we need to know 2 things:

  1. what are the available presto schemas, and tables within those schemas
  2. what are the fields for those tables (what does the data look like)

This info may come from different places (for example, the list of schemas and tables may come from local disk while the data schema is stored elsewhere). There are 2 APIs for this: SchemaSupplier and SchemaRegistry.

SchemaSupplier

  • listSchemas should return available presto schemas
  • listTables should return tables available within the given schema
public interface SchemaSupplier
{
    List<String> listSchemas();

    List<PravegaTableHandle> listTables(String schema);
}
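To illustrate the contract, here is a simplified, self-contained sketch of a SchemaSupplier-like lookup over Local SR file names. Plain strings stand in for PravegaTableHandle, and the class and file names are hypothetical:

```java
import java.util.List;
import java.util.stream.Collectors;

class LocalSchemaSupplier
{
    // File names in the table-description directory, e.g. "hr.employee.json".
    private final List<String> fileNames;

    LocalSchemaSupplier(List<String> fileNames)
    {
        this.fileNames = fileNames;
    }

    // Distinct {schema} parts of the {schema}.{table}.json file names.
    List<String> listSchemas()
    {
        return fileNames.stream()
                .filter(name -> name.endsWith(".json"))
                .map(name -> name.split("\\.")[0])
                .distinct()
                .sorted()
                .collect(Collectors.toList());
    }

    // {table} parts of the file names belonging to the given schema.
    List<String> listTables(String schema)
    {
        return fileNames.stream()
                .filter(name -> name.startsWith(schema + ".") && name.endsWith(".json"))
                .map(name -> name.split("\\.")[1])
                .sorted()
                .collect(Collectors.toList());
    }
}
```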

SchemaRegistry

  • getSchema returns the schema (field definitions) needed for the given table name.
  • getTable should return the full table definition. (PravegaStreamDescription is basically simple table metadata + schema)
public interface SchemaRegistry
{
    List<PravegaStreamFieldGroup> getSchema(SchemaTableName schemaTableName);

    PravegaStreamDescription getTable(SchemaTableName schemaTableName);
}