Add non-Delta as Sink #48

Open
ravi-databricks opened this issue Apr 17, 2024 · 3 comments

@ravi-databricks
Contributor

Support non-Delta sinks using the metadata approach:

  • In the metadata, if the sink is non-Delta, use the Structured Streaming approach with foreachBatch
  • Use DAB (Databricks Asset Bundles) to deploy the resulting non-DLT pipelines to a Databricks workspace
@ravi-databricks ravi-databricks added the enhancement New feature or request label Apr 17, 2024
@ravi-databricks ravi-databricks added this to the v0.0.9 milestone Aug 9, 2024
@ravi-databricks
Contributor Author

ravi-databricks commented Aug 16, 2024

This feature can be implemented using DLT's sink API as described below:

API guide

Create a sink
To create a sink, use the new create_sink() API. This API accepts three arguments:
A string for the sink name
A string specifying the format (either kafka or delta)
A map of sink options, formatted as {string: string}
All of the sink options available in DBR are supported, e.g. all Kafka authentication options.

create_sink(<sink_name>, <format>, <options_string_string_map>)

Code examples
Create a Kafka sink

create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "subscribe": "my_topic"
  }
)

Create a Delta sink by giving the file system path to the table


create_sink(
  "my_delta_sink",
  "delta",
  { "path": "//path/to/my/delta/table" }
)

Create a Delta sink by giving the table name in UC

create_sink(
  "my_delta_sink",
  "delta",
  { "tableName": "my_catalog.my_schema.my_table" }
)

Use append_flow to write to a sink
Once the sink object is created, you can set up an append_flow that writes to the sink.

create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

@append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return read_stream("xxx")
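The three-argument contract above can be sketched as a plain-Python argument check. This is an illustrative helper, not part of the DLT API; validate_sink_args and its error messages are assumptions:

```python
# Hedged sketch: checks matching the create_sink() contract described above
# (non-empty name string, format "kafka" or "delta", {string: string} options).
# validate_sink_args is a hypothetical helper, not part of the DLT API.
def validate_sink_args(name, fmt, options):
    if not isinstance(name, str) or not name:
        raise TypeError("sink name must be a non-empty string")
    if fmt not in ("kafka", "delta"):
        raise ValueError("format must be 'kafka' or 'delta'")
    if not (isinstance(options, dict)
            and all(isinstance(k, str) and isinstance(v, str)
                    for k, v in options.items())):
        raise TypeError("options must be a {string: string} map")
    return True
```

Running such a check before calling create_sink() would surface configuration mistakes (e.g. an unsupported format) at pipeline-definition time rather than at runtime.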

@ravi-databricks
Contributor Author

Added bronze_sinks and silver_sinks options in the onboarding file, as below:

[
   {
      "name":"sink_name1",
      "format":"delta",
      "options":{
         "tableName":"uc.tablename"
      }
   },
   {
      "name":"sink_name2",
      "format":"kafka",
      "options":{
         "kafka.bootstrap.servers":"{kafka_sink_broker}",
         "topic":"{kafka_sink_topic}"
      }
   }
]
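The onboarding entries above could be loaded into typed objects with the {placeholder} tokens resolved from pipeline parameters. A minimal sketch, assuming a hypothetical SinkSpec dataclass and parse_sinks helper (not the actual DLT-META implementation):

```python
# Hedged sketch: parse bronze_sinks / silver_sinks onboarding entries into
# typed objects and substitute {token} placeholders such as {kafka_sink_broker}.
# SinkSpec and parse_sinks are illustrative names, not the real implementation.
from dataclasses import dataclass


@dataclass
class SinkSpec:
    name: str
    format: str              # "delta" or "kafka"
    options: dict[str, str]


def parse_sinks(entries: list[dict], params: dict[str, str]) -> list[SinkSpec]:
    """Validate each onboarding entry and resolve {token} placeholders."""
    sinks = []
    for entry in entries:
        fmt = entry["format"]
        if fmt not in ("delta", "kafka"):
            raise ValueError(f"unsupported sink format: {fmt}")
        options = {
            key: value.format(**params)  # "{kafka_sink_broker}" -> actual broker
            for key, value in entry["options"].items()
        }
        sinks.append(SinkSpec(entry["name"], fmt, options))
    return sinks
```

Values without placeholders (e.g. a UC table name) pass through unchanged, so the same code path handles both Delta and Kafka entries.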

Added write_to_sinks in dataflow_pipeline.py under AppendFlowWriter:

    @staticmethod
    def write_to_sinks(sinks: list[DLTSink], write_to_sink):
        """Create each configured sink and register an append flow that writes to it."""
        for sink in sinks:
            dlt.create_sink(sink.name, sink.format, sink.options)
            dlt.append_flow(name=f"{sink.name}_flow", target=sink.name)(write_to_sink)

The above code can be invoked while performing the write:

        if self.dataflowSpec.sinks:
            dlt_sinks = DataflowSpecUtils.get_sinks(self.dataflowSpec.sinks, self.spark)
            AppendFlowWriter.write_to_sinks(dlt_sinks, self.write_to_delta)
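Note that append_flow is a decorator factory, which is why write_to_sinks can call it and immediately apply the result to write_to_sink instead of using @ syntax. A plain-Python stand-in of that registration pattern (FLOWS and this append_flow are toy illustrations, not the real dlt module):

```python
# Illustrative stand-in for the dlt decorator-factory pattern used above.
# FLOWS and append_flow here are toy stand-ins, not the real dlt API.
FLOWS: dict[str, dict] = {}


def append_flow(name: str, target: str):
    """Return a decorator that registers fn as the flow body for target."""
    def decorator(fn):
        FLOWS[name] = {"target": target, "fn": fn}
        return fn
    return decorator


def write_to_delta():
    return "streaming-dataframe"  # placeholder for the real flow body


# Applying the factory result directly, as write_to_sinks does:
for sink_name in ("sink_name1", "sink_name2"):
    append_flow(name=f"{sink_name}_flow", target=sink_name)(write_to_delta)
```

This is why one function (write_to_delta) can back many flows: each call to the factory produces a fresh registration keyed by the sink name.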

@ravi-databricks ravi-databricks self-assigned this Sep 4, 2024
@ravi-databricks
Contributor Author

Once DLT Direct Publishing mode is in PuPr, this will be merged into the release branch.
