Author: Zara Lim (@jzaralim) | Release Target: 0.21 | Status: In Discussion | Discussion: confluentinc#7764
tl;dr: Add support for the BYTES data type. This will allow users to work with BLOBs of data that don't fit into any other data type.
Currently, ksqlDB can only handle a set of primitive types and combinations of them. A BYTES data type would allow users to work with data that does not fit into any of the primitive types, such as images, as well as BLOB/binary data from other databases.
- Add BYTES type to KSQL
- Support BYTES comparisons
- Support BYTES usage in STRUCT, MAP and ARRAY
- Serialization and de-serialization of BYTES to Avro, JSON, Protobuf and Delimited formats
- Adding/updating UDFs to support the BYTES type
- Fixed-size BYTES (BYTES(3) representing 3 bytes, for example) - This is supported by Kafka Connect by adding the connect.fixed.size key in a bytes schema, but this will not be included in this KLIP.
The BYTES data type will store an array of raw bytes of an unspecified length. The maximum size of the array is limited by the maximum size of a Kafka message, as well as possibly by the serialization format being used. The syntax is as follows:
CREATE STREAM stream_name (b BYTES, COL2 STRING) AS ...
CREATE TABLE table_name (col1 STRUCT<field BYTES>) AS ...
By default, BYTES will be displayed in the CLI as HEX strings, where each byte is represented by two characters.
For example, the byte array [91, 67] will be displayed as:
ksql> SELECT b from STREAM;
0x5B43
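The display rule above can be sketched in Java as follows. This is only an illustration of the two-characters-per-byte formatting, not the CLI's actual code; the class and method names are hypothetical.

```java
final class HexDisplay {
    private static final char[] HEX = "0123456789ABCDEF".toCharArray();

    // Renders a byte array as "0x" followed by two uppercase hex characters per byte.
    static String toHexDisplay(byte[] bytes) {
        StringBuilder sb = new StringBuilder("0x");
        for (byte b : bytes) {
            int v = b & 0xFF; // treat the byte as an unsigned value in 0..255
            sb.append(HEX[v >>> 4]).append(HEX[v & 0x0F]);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toHexDisplay(new byte[] {91, 67})); // prints 0x5B43
    }
}
```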
API response objects will store BYTES data as Base64 strings. The Java client's Row class will include a new function, getBytes, that returns the value of a column as a ByteBuffer object. It will expect the raw value to be a Base64 string, and if it's not then the function will throw an error.
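A minimal sketch of the decoding behaviour described for getBytes, using only the JDK. The Base64Column class is a hypothetical stand-in for the client's Row class, and the choice of IllegalArgumentException as the error type is an assumption; the KLIP only says that an error is thrown.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

final class Base64Column {
    // Hypothetical helper, not the Java client's actual code: decodes a raw
    // column value that is expected to be a Base64 string.
    static ByteBuffer getBytes(Object rawValue) {
        if (!(rawValue instanceof String)) {
            throw new IllegalArgumentException("Expected a Base64 string but got: " + rawValue);
        }
        // Base64.Decoder.decode throws IllegalArgumentException on invalid input.
        return ByteBuffer.wrap(Base64.getDecoder().decode((String) rawValue));
    }

    public static void main(String[] args) {
        ByteBuffer buf = getBytes("W0M="); // Base64 encoding of the bytes [91, 67]
        System.out.println(buf.remaining()); // prints 2
    }
}
```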
Implicit conversions to BYTES will not be supported.
The following UDFs will be added:
- to_bytes(string, encoding) - this will convert a STRING value in the specified encoding to BYTES. The accepted encodings are hex, utf8, ascii, and base64.
- from_bytes(bytes, encoding) - this will convert a BYTES value to STRING in the specified encoding. The accepted encodings are hex, utf8, ascii, and base64.
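The two conversion functions might behave roughly like the sketch below. It is not the UDF implementation: the class name is hypothetical, and matching encoding names case-insensitively, emitting uppercase hex, and rejecting unknown encodings with an exception are all assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Locale;

final class BytesEncoding {
    // Sketch of to_bytes: interprets the string in the given encoding.
    static byte[] toBytes(String value, String encoding) {
        switch (encoding.toLowerCase(Locale.ROOT)) {
            case "hex":    return decodeHex(value);
            case "utf8":   return value.getBytes(StandardCharsets.UTF_8);
            case "ascii":  return value.getBytes(StandardCharsets.US_ASCII);
            case "base64": return Base64.getDecoder().decode(value);
            default: throw new IllegalArgumentException("Unsupported encoding: " + encoding);
        }
    }

    // Sketch of from_bytes: renders the bytes as a string in the given encoding.
    static String fromBytes(byte[] value, String encoding) {
        switch (encoding.toLowerCase(Locale.ROOT)) {
            case "hex":    return encodeHex(value);
            case "utf8":   return new String(value, StandardCharsets.UTF_8);
            case "ascii":  return new String(value, StandardCharsets.US_ASCII);
            case "base64": return Base64.getEncoder().encodeToString(value);
            default: throw new IllegalArgumentException("Unsupported encoding: " + encoding);
        }
    }

    static byte[] decodeHex(String hex) {
        if (hex.length() % 2 != 0) {
            throw new IllegalArgumentException("Hex string must have an even length");
        }
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            int hi = Character.digit(hex.charAt(2 * i), 16);
            int lo = Character.digit(hex.charAt(2 * i + 1), 16);
            if (hi < 0 || lo < 0) {
                throw new IllegalArgumentException("Invalid hex character in: " + hex);
            }
            out[i] = (byte) ((hi << 4) | lo);
        }
        return out;
    }

    static String encodeHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02X", b & 0xFF)); // two uppercase hex digits per byte
        }
        return sb.toString();
    }
}
```

For instance, under these assumptions BytesEncoding.fromBytes(BytesEncoding.toBytes("5B43", "hex"), "base64") yields "W0M=".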
We will also update some of the existing STRING functions to accept BYTES as a parameter. In general, if a function works on ASCII characters for a STRING parameter, then it will work on bytes for a BYTES parameter.
- len(bytes) - This will return the length of the stored byte array.
- concat(bytes...) - Concatenate an arbitrary number of byte fields
- lpad/rpad(bytes, target_length, padding_bytes) - pads input BYTES beginning from the left/right with the specified padding BYTES until the target length is reached.
- replace(bytes, old_bytes, new_bytes) - returns the given BYTES value with all occurrences of old_bytes replaced with new_bytes
- split(bytes, delimiter) - splits a BYTES value into an array of BYTES based on a delimiter
- splittomap(bytes, entryDelimiter, kvDelimiter) - splits a BYTES value into key-value pairs based on a delimiter and creates a MAP from them
- substring(bytes, to, from) - returns the section of the BYTES value from the byte at position to up to position from
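To illustrate how the STRING semantics carry over to bytes, here is a rough Java sketch of concat and lpad on byte arrays. The class name is hypothetical, and the edge-case behaviour (padding bytes repeated cyclically, input truncated when it is already longer than the target length, exceptions on invalid arguments) is an assumption modelled on typical string padding rather than something this KLIP specifies.

```java
import java.util.Arrays;

final class BytesFunctions {
    // Sketch of concat(bytes...): joins the inputs in argument order.
    static byte[] concat(byte[]... parts) {
        int total = 0;
        for (byte[] p : parts) {
            total += p.length;
        }
        byte[] out = new byte[total];
        int pos = 0;
        for (byte[] p : parts) {
            System.arraycopy(p, 0, out, pos, p.length);
            pos += p.length;
        }
        return out;
    }

    // Sketch of lpad(bytes, target_length, padding_bytes).
    // Assumptions: padding repeats cyclically; longer inputs are truncated.
    static byte[] lpad(byte[] input, int targetLength, byte[] padding) {
        if (targetLength < 0 || padding.length == 0) {
            throw new IllegalArgumentException("Invalid target length or empty padding");
        }
        if (input.length >= targetLength) {
            return Arrays.copyOf(input, targetLength);
        }
        byte[] out = new byte[targetLength];
        int padLength = targetLength - input.length;
        for (int i = 0; i < padLength; i++) {
            out[i] = padding[i % padding.length];
        }
        System.arraycopy(input, 0, out, padLength, input.length);
        return out;
    }
}
```

With these assumptions, lpad applied to [1, 2] with target length 5 and padding [9, 8] gives [9, 8, 9, 1, 2].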
BYTES will be handled by java.nio.ByteBuffer within ksqlDB.
The underlying Kafka Connect type is the primitive bytes type.
bytes is a primitive Avro type. When converting to/from Connect data, the Avro converter that ksqlDB uses converts byte arrays to ByteBuffer.
bytes is a primitive Protobuf type. The maximum number of bytes in a byte array is 2^32. When converting to/from Connect data, the Protobuf converter that ksqlDB uses converts byte arrays to ByteBuffer.
Byte arrays will be stored in JSON and CSV files as Base64 MIME encoded binary values. This is because ksqlDB and Schema Registry both use Jackson to serialize and deserialize JSON, and Jackson serializes binaries to Base64 strings.
The ksqlDB JSON and delimited deserializers will be updated to convert Base64 strings to ByteBuffer.
Casting between BYTES and other data types will not be supported. Users can use to_bytes and from_bytes if they would like to convert to/from STRING.
Comparisons will only be allowed between two BYTES values. They will be compared lexicographically by unsigned 8-bit values. For example, the following comparisons evaluate to TRUE:
[10, 11] > [10]
[12] > [10, 11]
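The comparison rule can be sketched in Java as below. This is an illustration, not ksqlDB's implementation, and the class name is hypothetical. Each byte is masked to its unsigned value explicitly, because Java bytes are signed and ByteBuffer.compareTo compares elements as signed values.

```java
final class BytesComparison {
    // Lexicographic comparison by unsigned 8-bit values.
    // Returns a negative number, zero, or a positive number.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp; // first differing byte decides the order
            }
        }
        // All shared positions are equal, so the shorter array sorts first.
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        System.out.println(compareUnsigned(new byte[] {10, 11}, new byte[] {10}) > 0); // prints true
        System.out.println(compareUnsigned(new byte[] {12}, new byte[] {10, 11}) > 0); // prints true
    }
}
```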
There will need to be tests for the following:
- Integration with Kafka Connect and Schema Registry
- All serialization formats
- Different types of byte data
- QTTs with all of the new and updated UDFs
The implementation can be broken up as follows:
- Adding the BYTES type to ksqlDB - 2 days
- Serialization/deserialization - 4 days
- Add BYTES to the Java client - 2 days
- Documentation - 2 days
- Add to Connect integration test - 1 day
- Comparisons - 2 days
- Adding UDFs + documentation - 1 week
- Buffer time and manual testing - 3 days
- Add and update UDFs to docs/developer-guide/ksqldb-reference/scalar-functions.md
- Serialization/deserialization information in docs/reference/serialization.md
- Section on casting in docs/developer-guide/ksqldb-reference/type-coercion.md
- Detailed description of BYTES in docs/reference/sql/data-types.md
- New section in docs/developer-guide/ksqldb-reference/operations.md for comparisons
If a user issues a command that includes the BYTES type, then previous versions of KSQL will not recognize the BYTES type, and the server will enter a DEGRADED state.
None