Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK] Stringify the delta optimize 'auto' operationParameters value in CommitInfo #3889

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,23 @@ package io.delta.kernel.defaults

import io.delta.kernel.Operation.{CREATE_TABLE, WRITE}
import io.delta.kernel._
import io.delta.kernel.defaults.engine.DefaultEngine
import io.delta.kernel.engine.Engine
import io.delta.kernel.exceptions.{InvalidTableException, ProtocolChangedException}
import io.delta.kernel.expressions.Literal
import io.delta.kernel.internal.TableConfig._
import io.delta.kernel.internal.actions.SingleAction.createCommitInfoSingleAction
import io.delta.kernel.internal.actions.{CommitInfo, SingleAction}
import io.delta.kernel.internal.fs.Path
import io.delta.kernel.internal.util.{FileNames, VectorUtils}
import io.delta.kernel.internal.{DeltaHistoryManager, SnapshotImpl, TableImpl}
import io.delta.kernel.internal.util.ManualClock
import io.delta.kernel.internal.util.Utils.singletonCloseableIterator
import io.delta.kernel.types.IntegerType.INTEGER
import io.delta.kernel.types._
import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable}
import io.delta.kernel.internal.util.{FileNames, ManualClock, VectorUtils}
import io.delta.kernel.internal.{SnapshotImpl, TableImpl}
import io.delta.kernel.utils.CloseableIterable.emptyIterable
import io.delta.kernel.utils.FileStatus
import org.apache.hadoop.conf.Configuration

import java.util.{Locale, Optional}
import scala.collection.JavaConverters._
import java.util.Optional
import scala.collection.immutable.{ListMap, Seq}
import scala.collection.mutable
import io.delta.kernel.internal.TableConfig._
import io.delta.kernel.utils.FileStatus
import io.delta.kernel.internal.actions.SingleAction.createCommitInfoSingleAction

class InCommitTimestampSuite extends DeltaTableWriteSuiteBase {

Expand Down Expand Up @@ -579,4 +576,15 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase {
"table.")))
}
}

test("CommitInfo getCommitInfoOpt should work after Spark optimize") {
  withTempDir { dir =>
    // Write a two-file table, then compact it with Spark's OPTIMIZE. The OPTIMIZE
    // commit (version 1) carries a CommitInfo whose operationParameters include the
    // 'auto' compaction flag; that value must be a JSON string for Kernel to parse.
    spark.range(10).repartition(2).write.format("delta").save(dir.toString)
    spark.sql(s"OPTIMIZE delta.`$dir`").collect()

    val engine = DefaultEngine.create(new Configuration())

    // Regression check: parsing the OPTIMIZE commit must not throw, and the
    // CommitInfo must actually be readable. The original test discarded this
    // result, so it would have passed even when no CommitInfo was returned.
    val commitInfoOpt =
      CommitInfo.getCommitInfoOpt(engine, new Path(dir.getCanonicalPath, "_delta_log"), 1L)
    assert(
      commitInfoOpt.isPresent,
      "CommitInfo should be present for the OPTIMIZE commit at version 1")
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ object DeltaOperations {
// When clustering columns are specified, set the zOrderBy key to empty.
ZORDER_PARAMETER_KEY -> JsonUtils.toJson(if (clusterBy.isEmpty) zOrderBy else Seq.empty),
CLUSTERING_PARAMETER_KEY -> JsonUtils.toJson(clusterBy.getOrElse(Seq.empty)),
AUTO_COMPACTION_PARAMETER_KEY -> auto
AUTO_COMPACTION_PARAMETER_KEY -> auto.toString
)
// `isFull` is not relevant for non-clustering tables, so skip it.
.++(clusterBy.filter(_.nonEmpty).map(_ => CLUSTERING_IS_FULL_KEY -> isFull))
Expand Down
Loading