Skip to content

Commit

Permalink
feat: expose peek next commit function to python
Browse files Browse the repository at this point in the history
  • Loading branch information
PengLiVectra committed Dec 11, 2023
1 parent f87afce commit 005d52a
Show file tree
Hide file tree
Showing 82 changed files with 131 additions and 2 deletions.
4 changes: 4 additions & 0 deletions crates/deltalake-core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ pub enum DeltaTableError {
#[error("Invalid table version: {0}")]
InvalidVersion(i64),

/// Error returned when the DeltaTable has no delta log version.
#[error("Delta log not found for table version: {0}")]
DeltaLogNotFound(i64),

/// Error returned when the DeltaTable has no data files.
#[error("Corrupted table, cannot read data file {}: {}", .path, .source)]
MissingDataFile {
Expand Down
18 changes: 18 additions & 0 deletions crates/deltalake-core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::fmt;
use std::fmt::Formatter;
use std::{cmp::max, cmp::Ordering, collections::HashSet};

use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::StreamExt;
use lazy_static::lazy_static;
Expand Down Expand Up @@ -455,6 +456,23 @@ impl DeltaTable {
self.update().await
}

/// Get the commit obj from the version
pub async fn get_obj_from_version(
&self,
current_version: i64,
) -> Result<Bytes, DeltaTableError> {
let commit_log_bytes = match self.log_store.read_commit_entry(current_version).await {
Ok(bytes) => Ok(bytes),
Err(DeltaTableError::ObjectStore {
source: ObjectStoreError::NotFound { .. },
}) => {
return Err(DeltaTableError::DeltaLogNotFound(current_version));
}
Err(err) => Err(err),
}?;
Ok(commit_log_bytes)
}

/// Get the list of actions for the next commit
pub async fn peek_next_commit(
&self,
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1587968585495}}
{"add":{"path":"part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet","partitionValues":{},"size":262,"modificationTime":1587968586000,"dataChange":true}}
{"add":{"path":"part-00001-c506e79a-0bf8-4e2b-a42b-9731b2e490ae-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968586000,"dataChange":true}}
{"add":{"path":"part-00003-508ae4aa-801c-4c2c-a923-f6f89930a5c1-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968586000,"dataChange":true}}
{"add":{"path":"part-00004-80938522-09c0-420c-861f-5a649e3d9674-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968586000,"dataChange":true}}
{"add":{"path":"part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968586000,"dataChange":true}}
{"add":{"path":"part-00007-94f725e2-3963-4b00-9e83-e31021a93cf9-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968586000,"dataChange":true}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{"commitInfo":{"timestamp":1587968596254,"operation":"MERGE","operationParameters":{"predicate":"(oldData.`id` = newData.`id`)"},"readVersion":0,"isBlindAppend":false}}
{"remove":{"path":"part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet","deletionTimestamp":1587968596250,"dataChange":true}}
{"remove":{"path":"part-00001-c506e79a-0bf8-4e2b-a42b-9731b2e490ae-c000.snappy.parquet","deletionTimestamp":1587968596253,"dataChange":true}}
{"remove":{"path":"part-00007-94f725e2-3963-4b00-9e83-e31021a93cf9-c000.snappy.parquet","deletionTimestamp":1587968596253,"dataChange":true}}
{"remove":{"path":"part-00003-508ae4aa-801c-4c2c-a923-f6f89930a5c1-c000.snappy.parquet","deletionTimestamp":1587968596253,"dataChange":true}}
{"remove":{"path":"part-00004-80938522-09c0-420c-861f-5a649e3d9674-c000.snappy.parquet","deletionTimestamp":1587968596253,"dataChange":true}}
{"add":{"path":"part-00000-a922ea3b-ffc2-4ca1-9074-a278c24c4449-c000.snappy.parquet","partitionValues":{},"size":262,"modificationTime":1587968595000,"dataChange":true}}
{"add":{"path":"part-00004-95c9bc2c-ac85-4581-b3cc-84502b0c314f-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
{"add":{"path":"part-00005-94a0861b-6455-4bd9-a080-73e02491c643-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
{"add":{"path":"part-00011-42f838f9-a911-40af-98f5-2fccfa1b123f-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
{"add":{"path":"part-00045-332fe409-7705-45b1-8d34-a0018cf73b70-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
{"add":{"path":"part-00049-d3095817-de74-49c1-a888-81565a40161d-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
{"add":{"path":"part-00058-b462c4cb-0c48-4148-8475-e21d2a2935f8-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
{"add":{"path":"part-00068-90650739-6a8e-492b-9403-53e33b3778ac-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
{"add":{"path":"part-00069-c78b4dd8-f955-4643-816f-cbd30a3f8c1b-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
{"add":{"path":"part-00077-2fcb1c7c-5390-48ee-93f6-0acf11199a0d-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
{"add":{"path":"part-00107-3f6c2aa0-fc28-4f4c-be15-135e15b398f4-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
{"add":{"path":"part-00112-07fd790a-11dc-4fde-9acd-623e740be992-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
{"add":{"path":"part-00116-bc66759e-6381-4f34-8cd4-6688aad8585d-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
{"add":{"path":"part-00121-d8bc3e53-d2f2-48ce-9909-78da7294ffbd-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
{"add":{"path":"part-00128-b31c3b81-24da-4a90-a8b4-578c9e9a218d-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
{"add":{"path":"part-00140-e9b1971d-d708-43fb-b07f-975d2226b800-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
{"add":{"path":"part-00143-03ceb88e-5283-4193-aa43-993cdf937fd3-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
{"add":{"path":"part-00150-ec6643fc-4963-4871-9613-f5ad1940b689-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
{"add":{"path":"part-00154-4630673a-5227-48fb-a03d-e356fcd1564a-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
{"add":{"path":"part-00164-bf40481c-4afd-4c02-befa-90f056c2d77a-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
{"add":{"path":"part-00190-8ac0ae67-fb1d-461d-a3d3-8dc112766ff5-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968596000,"dataChange":true}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"commitInfo":{"timestamp":1587968614187,"operation":"UPDATE","operationParameters":{"predicate":"((id#697L % cast(2 as bigint)) = cast(0 as bigint))"},"readVersion":2,"isBlindAppend":false}}
{"remove":{"path":"part-00003-53f42606-6cda-4f13-8d07-599a21197296-c000.snappy.parquet","deletionTimestamp":1587968614096,"dataChange":true}}
{"remove":{"path":"part-00006-46f2ff20-eb5d-4dda-8498-7bfb2940713b-c000.snappy.parquet","deletionTimestamp":1587968614096,"dataChange":true}}
{"add":{"path":"part-00000-f17fcbf5-e0dc-40ba-adae-ce66d1fcaef6-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968614000,"dataChange":true}}
{"add":{"path":"part-00001-bb70d2ba-c196-4df2-9c85-f34969ad3aa9-c000.snappy.parquet","partitionValues":{},"size":429,"modificationTime":1587968614000,"dataChange":true}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"commitInfo":{"timestamp":1587968626537,"operation":"DELETE","operationParameters":{"predicate":"[\"((`id` % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))\"]"},"readVersion":3,"isBlindAppend":false}}
{"remove":{"path":"part-00001-bb70d2ba-c196-4df2-9c85-f34969ad3aa9-c000.snappy.parquet","deletionTimestamp":1587968626536,"dataChange":true}}
{"remove":{"path":"part-00000-f17fcbf5-e0dc-40ba-adae-ce66d1fcaef6-c000.snappy.parquet","deletionTimestamp":1587968626536,"dataChange":true}}
{"add":{"path":"part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet","partitionValues":{},"size":262,"modificationTime":1587968626000,"dataChange":true}}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
1 change: 1 addition & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class RawDeltaTable:
) -> str: ...
def table_uri(self) -> str: ...
def version(self) -> int: ...
def get_obj(self, version: int) -> bytes: ...
def get_latest_version(self) -> int: ...
def metadata(self) -> RawDeltaTableMetaData: ...
def protocol_versions(self) -> List[int]: ...
Expand Down
33 changes: 32 additions & 1 deletion python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from deltalake._internal import create_deltalake as _create_deltalake
from deltalake._util import encode_partition_value
from deltalake.data_catalog import DataCatalog
from deltalake.exceptions import DeltaProtocolError
from deltalake.exceptions import DeltaError, DeltaProtocolError
from deltalake.fs import DeltaStorageHandler
from deltalake.schema import Schema as DeltaSchema

Expand Down Expand Up @@ -256,6 +256,7 @@ def __init__(
"""
self._storage_options = storage_options
self._latest_version = -1
self._table = RawDeltaTable(
str(table_uri),
version=version,
Expand Down Expand Up @@ -897,6 +898,36 @@ def update_incremental(self) -> None:
"""
self._table.update_incremental()

def get_latest_version(self) -> int:
"""
Get latest version of commit.
"""
return self._table.get_latest_version()

def peek_next_commit(
self, version: int
) -> Tuple[Optional[List[Dict[Any, Any]]], int]:
"""
Peek next commit of the input version.
"""
actions = []
next_version = version + 1
if next_version > self._latest_version:
self._latest_version = self.get_latest_version()
while next_version <= self._latest_version:
try:
commit_log_bytes = self._table.get_obj(next_version)
for commit_action in commit_log_bytes.split(b"\n"):
if commit_action:
actions.append(json.loads(commit_action))
return actions, next_version
except DeltaError as e:
if str(e) == f"Delta log not found for table version: {next_version}":
next_version += 1
else:
raise
return None, version

def create_checkpoint(self) -> None:
self._table.create_checkpoint()

Expand Down
9 changes: 8 additions & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use deltalake::DeltaOps;
use deltalake::DeltaTableBuilder;
use pyo3::exceptions::{PyIOError, PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{PyFrozenSet, PyType};
use pyo3::types::{PyBytes, PyFrozenSet, PyType};
use serde_json::{Map, Value};

use crate::error::DeltaProtocolError;
Expand Down Expand Up @@ -154,6 +154,13 @@ impl RawDeltaTable {
Ok(self._table.version())
}

pub fn get_obj<'py>(&self, py: Python<'py>, version: i64) -> PyResult<&'py PyBytes> {
let commit_log_bytes = rt()?
.block_on(self._table.get_obj_from_version(version))
.map_err(PythonError::from)?;
return Ok(PyBytes::new(py, &commit_log_bytes));
}

pub fn metadata(&self) -> PyResult<RawDeltaTableMetaData> {
let metadata = self._table.metadata().map_err(PythonError::from)?;
Ok(RawDeltaTableMetaData {
Expand Down
23 changes: 23 additions & 0 deletions python/tests/test_table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,29 @@ def test_writer_fails_on_protocol():
dt.to_pandas()


@pytest.mark.parametrize("version, expected", [(2, (5, 3))])
def test_peek_next_commit(version, expected):
table_path = "../crates/deltalake-core/tests/data/simple_table"
dt = DeltaTable(table_path)
actions, next_version = dt.peek_next_commit(version=version)
assert (len(actions), next_version) == expected


def test_delta_log_not_found():
table_path = "../crates/deltalake-core/tests/data/simple_table"
dt = DeltaTable(table_path)
latest_version = dt.get_latest_version()
_, version = dt.peek_next_commit(version=latest_version)
assert version == latest_version


def test_delta_log_missed():
table_path = "../crates/deltalake-core/tests/data/simple_table_missing_commit"
dt = DeltaTable(table_path)
_, version = dt.peek_next_commit(version=1)
assert version == 3 # Missed commit version 2, should return version 3


class ExcPassThroughThread(Thread):
"""Wrapper around `threading.Thread` that propagates exceptions."""

Expand Down

0 comments on commit 005d52a

Please sign in to comment.