[Feature Request][Kernel] Add support of inserts/appends into Delta table using Kernel APIs #2382

Closed
1 of 3 tasks
vkorukanti opened this issue Dec 13, 2023 · 0 comments
Labels: enhancement, kernel

vkorukanti commented Dec 13, 2023

Feature request

Overview

Part of #2299.

Motivation

Insert/append is one of the most common operations used by connectors with write capabilities. For example, streaming engines such as Flink periodically fetch new data from sources and ingest it into a Delta table. Adding insert support to Kernel would let current Standalone users migrate to Kernel.

Further details

Detailed design is provided here.

At a high level, the following is a sample connector using the proposed write APIs:

Table table = Table.forPath(..);


TransactionBuilder txnBuilder =
  table.createTransactionBuilder(
    tableClient,
    "my connector" /* engineInfo */,
    "ctas" /* operation */);


Transaction txn = txnBuilder
  // specify the schema of the table
  .withSchema(
     tableClient,
     new StructType()
       .add("c1", INTEGER)
       .add("p1", TIMESTAMP))
  // specify the partition columns in the table
  .withPartitionColumns(
    tableClient,
    Collections.singletonList("p1"))

  .build(tableClient);

Row txnState = txn.getState(tableClient);


/// ------------- Task 1 - START -----------------------

// Connector generates logical data for partition 1
CloseableIterator<FilteredColumnarBatch> dataPart1 = ...

// Partition values
Map<String, Literal> partitionValuesPart1 =
  singletonMap("p1", Literal.ofTimestamp(12312312));


CloseableIterator<FilteredColumnarBatch> phyDataPart1 =
  Transaction.transformLogicalData(
    tableClient,
    txnState,
    dataPart1,
    partitionValuesPart1);

DataWriteContext writeContext1 =
  Transaction.getWriteContext(
    tableClient,
    txnState,
    partitionValuesPart1
  );

CloseableIterator<DataFileStatus> dataFileStatusesPart1 =
  tableClient.getParquetHandler()
    .writeParquetFiles(
      writeContext1.getTargetDirectory(),
      phyDataPart1,
      writeContext1.getStatisticsColumns()
    );


CloseableIterator<Row> commitActions =
  Transaction.generateAppendActions(
    tableClient,
    txnState,
    dataFileStatusesPart1,
    writeContext1
  );

// .. serialize and send commitActions to driver


/// ------------- Task 1 - END ------------------------

/// .... repeated across however many write tasks the
/// .... write operator has


// ... Driver receives the serialized `Row`s (commit actions) from the tasks ...


// Create an iterable out of the rows
CloseableIterable<Row> iterableActions = ...

 
TransactionCommitResult result = 
  txn.commit(tableClient, iterableActions);

// Optionally checkpoint when the commit result reports the table is ready for one
if (result.isReadyForCheckpoint()) {
  table.checkpoint(tableClient, result.getVersion());
}
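
The task/driver split in the sample above can be mocked in plain Java to make the data flow easier to follow. This is only a minimal sketch: `FileStatus`, `AppendAction`, `CommitResult`, `runTask`, and `commit` are hypothetical stand-ins invented for illustration, not Kernel types or methods, and the sketch does not model writing Parquet files or the Delta log.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration only; none of these are Kernel types.
class AppendFlowSketch {

    /** Stand-in for the status of one data file written by a task. */
    static final class FileStatus {
        final String path;
        final long numRecords;
        FileStatus(String path, long numRecords) {
            this.path = path;
            this.numRecords = numRecords;
        }
    }

    /** Stand-in for one serialized append action sent from a task to the driver. */
    static final class AppendAction {
        final String path;
        final Map<String, String> partitionValues;
        final long numRecords;
        AppendAction(String path, Map<String, String> partitionValues, long numRecords) {
            this.path = path;
            this.partitionValues = partitionValues;
            this.numRecords = numRecords;
        }
    }

    /** Stand-in for the result the driver gets back from a commit. */
    static final class CommitResult {
        final long version;
        final int numFilesAdded;
        CommitResult(long version, int numFilesAdded) {
            this.version = version;
            this.numFilesAdded = numFilesAdded;
        }
    }

    /** Task side: turn the files one task wrote into append actions. */
    static List<AppendAction> runTask(Map<String, String> partitionValues,
                                      List<FileStatus> written) {
        List<AppendAction> actions = new ArrayList<>();
        for (FileStatus f : written) {
            actions.add(new AppendAction(f.path, partitionValues, f.numRecords));
        }
        return actions;
    }

    /** Driver side: gather the actions from every task and commit them as one new version. */
    static CommitResult commit(long currentVersion, List<List<AppendAction>> perTaskActions) {
        int files = 0;
        for (List<AppendAction> actions : perTaskActions) {
            files += actions.size();
        }
        return new CommitResult(currentVersion + 1, files);
    }

    public static void main(String[] args) {
        // Two write tasks, each handling one partition value of p1.
        List<AppendAction> task1 = runTask(
            Map.of("p1", "2023-12-13"),
            List.of(new FileStatus("p1=2023-12-13/part-0.parquet", 100)));
        List<AppendAction> task2 = runTask(
            Map.of("p1", "2023-12-14"),
            List.of(new FileStatus("p1=2023-12-14/part-0.parquet", 40),
                    new FileStatus("p1=2023-12-14/part-1.parquet", 60)));

        // The driver commits everything in a single transaction.
        CommitResult result = commit(4, List.of(task1, task2));
        System.out.println("committed version " + result.version
            + " with " + result.numFilesAdded + " files");
        // prints "committed version 5 with 3 files"
    }
}
```

The point of the split is that tasks only produce serializable actions, so a single commit on the driver covers the output of every task.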

Detailed project plan is available here.

Willingness to contribute

The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute the implementation of this feature?

  • Yes. I can contribute this feature independently.
  • Yes. I would be willing to contribute this feature with guidance from the Delta Lake community.
  • No. I cannot contribute this feature at this time.