[spark] Support table options via SQL conf for Spark Engine and unify user experience with Flink engine
xiangyuf committed Nov 4, 2024
1 parent 74a7783 commit cbb9342
Showing 10 changed files with 279 additions and 47 deletions.
13 changes: 12 additions & 1 deletion docs/content/flink/quick-start.md
@@ -269,16 +269,27 @@ SELECT * FROM ....;
## Setting dynamic options

When interacting with a Paimon table, table options can be tuned without changing the options stored in the catalog. Paimon extracts job-level dynamic options and applies them in the current session.
The key format for a dynamic table option is `paimon.${catalogName}.${dbName}.${tableName}.${config_key}`. Each of catalogName/dbName/tableName can be `*`, which matches any value for that part.
The key format for a dynamic global option is simply `${config_key}`. Global options take effect for all tables; table options override global options if there are conflicts.

For example:

```sql
-- set scan.timestamp-millis=1697018249001 for all tables
SET 'scan.timestamp-millis' = '1697018249001';
SELECT * FROM T;

-- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T
SET 'paimon.mycatalog.default.T.scan.timestamp-millis' = '1697018249000';
SELECT * FROM T;

-- set scan.timestamp-millis=1697018249000 for the table default.T in any catalog
SET 'paimon.*.default.T.scan.timestamp-millis' = '1697018249000';
SELECT * FROM T;

-- set scan.timestamp-millis=1697018249000 for the table mycatalog.default.T1
-- set scan.timestamp-millis=1697018249001 for other tables
SET 'paimon.mycatalog.default.T1.scan.timestamp-millis' = '1697018249000';
SET 'scan.timestamp-millis' = '1697018249001';
SELECT * FROM T1 JOIN T2 ON xxxx;
```
22 changes: 20 additions & 2 deletions docs/content/spark/auxiliary.md
@@ -29,17 +29,35 @@ under the License.
## Set / Reset
The SET command sets a property, returns the value of an existing property, or returns all SQLConf properties with their values and meanings.
The RESET command resets runtime configurations specific to the current session that were set via the SET command back to their default values.
To set dynamic options globally, add the `spark.paimon.` prefix. You can also set dynamic table options in this format:
`spark.paimon.${catalogName}.${dbName}.${tableName}.${config_key}`. Each of catalogName/dbName/tableName can be `*`, which matches
any value for that part. Dynamic table options override global options if there are conflicts.

```sql
-- set spark conf
SET spark.sql.sources.partitionOverwriteMode=dynamic;

-- set paimon conf
SET spark.paimon.file.block-size=512M;

-- reset conf
RESET spark.paimon.file.block-size;

-- set catalog
USE paimon;

-- set scan.snapshot-id=1 for the table default.T in any catalog
SET spark.paimon.*.default.T.scan.snapshot-id=1;
SELECT * FROM default.T;

-- set scan.snapshot-id=1 for the table T in any database and catalog
SET spark.paimon.*.*.T.scan.snapshot-id=1;
SELECT * FROM default.T;

-- set scan.snapshot-id=2 for the table default.T1 in any catalog and scan.snapshot-id=1 for other tables
SET spark.paimon.scan.snapshot-id=1;
SET spark.paimon.*.default.T1.scan.snapshot-id=2;
SELECT * FROM default.T1 JOIN default.T2 ON xxxx;
```

## Describe table
OptionsUtils.java
@@ -27,6 +27,8 @@
import java.util.Locale;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.paimon.options.StructuredOptionsSplitter.escapeWithSingleQuote;
@@ -302,6 +304,34 @@ public static Map<String, String> convertToPropertiesPrefixKey(
return properties;
}

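/**
 * Splits job-level options from {@code confData} into global options (keys starting with
 * {@code globalOptionKeyPrefix}) and table options (keys matching {@code tableOptionKeyPattern},
 * whose group {@code keyGroup} captures the trailing config key), then merges the two so that
 * table options win on conflict.
 */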
public static Map<String, String> convertToDynamicTableProperties(
Map<String, String> confData,
String globalOptionKeyPrefix,
Pattern tableOptionKeyPattern,
int keyGroup) {
Map<String, String> globalOptions = new HashMap<>();
Map<String, String> tableOptions = new HashMap<>();

confData.keySet().stream()
.filter(k -> k.startsWith(globalOptionKeyPrefix))
.forEach(
k -> {
Matcher matcher = tableOptionKeyPattern.matcher(k);
if (matcher.find()) {
tableOptions.put(
matcher.group(keyGroup), convertToString(confData.get(k)));
} else {
globalOptions.put(
k.substring(globalOptionKeyPrefix.length()),
convertToString(confData.get(k)));
}
});

// table options should override global options for the same key
globalOptions.putAll(tableOptions);
return globalOptions;
}

static boolean containsPrefixMap(Map<String, String> confData, String key) {
return confData.keySet().stream().anyMatch(candidate -> filterPrefixMapKey(key, candidate));
}
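To make the new helper's behavior concrete, here is a minimal, self-contained sketch; the class name `DynamicOptionsDemo`, the sample keys, and the hand-written pattern are illustrative assumptions that mirror the Spark usage introduced later in this commit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.paimon.options.OptionsUtils;

public class DynamicOptionsDemo {
    public static void main(String[] args) {
        // Hypothetical session conf: one global option and one table-scoped option.
        Map<String, String> conf = new HashMap<>();
        conf.put("spark.paimon.scan.snapshot-id", "1");                  // global
        conf.put("spark.paimon.paimon.default.T.scan.snapshot-id", "2"); // scoped to paimon.default.T

        // Pattern as it would be built for the table paimon.default.T;
        // group 5 captures the trailing config key.
        Pattern pattern = Pattern.compile(
                "(spark\\.paimon\\.)(paimon|\\*)\\.(default|\\*)\\.(T|\\*)\\.(.+)");

        Map<String, String> merged =
                OptionsUtils.convertToDynamicTableProperties(conf, "spark.paimon.", pattern, 5);

        // Prints {scan.snapshot-id=2}: the table-scoped value overrides the global one.
        System.out.println(merged);
    }
}
```

The same helper backs both engines: the Flink factory passes an empty global prefix (its global options carry no prefix at all), while the Spark side passes `spark.paimon.`.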
AbstractFlinkTableFactory.java
@@ -33,6 +33,7 @@
import org.apache.paimon.lineage.TableLineageEntity;
import org.apache.paimon.lineage.TableLineageEntityImpl;
import org.apache.paimon.options.Options;
import org.apache.paimon.options.OptionsUtils;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
@@ -69,7 +70,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;

import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
Expand Down Expand Up @@ -231,7 +231,7 @@ static Table buildPaimonTable(DynamicTableFactory.Context context) {
CatalogTable origin = context.getCatalogTable().getOrigin();
Table table;

Map<String, String> dynamicOptions = getDynamicConfigOptions(context);
dynamicOptions.forEach(
(key, newValue) -> {
String oldValue = origin.getOptions().get(key);
@@ -241,6 +241,7 @@ static Table buildPaimonTable(DynamicTableFactory.Context context) {
});
Map<String, String> newOptions = new HashMap<>();
newOptions.putAll(origin.getOptions());
// dynamic options should override origin options
newOptions.putAll(dynamicOptions);

// notice that the Paimon table schema must be the same with the Flink's
@@ -304,16 +305,19 @@ static boolean schemaEquals(RowType rowType1, RowType rowType2) {
/**
* The dynamic option's format is:
*
* <p>Global options: key = value.
*
* <p>Table options: {@link
* FlinkConnectorOptions#TABLE_DYNAMIC_OPTION_PREFIX}${catalog}.${database}.${tableName}.key =
* value.
*
* <p>These job-level options are extracted and injected into the target table's options. Table
* options override global options if there are conflicts.
*
* @param context The table factory context.
* @return The dynamic options of this target table.
*/
static Map<String, String> getDynamicConfigOptions(DynamicTableFactory.Context context) {

ReadableConfig config = context.getConfiguration();

@@ -329,23 +333,14 @@ static Map<String, String> getDynamicTableConfigOptions(DynamicTableFactory.Cont

String template =
String.format(
"(%s)\\.(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)",
"(%s)(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)",
FlinkConnectorOptions.TABLE_DYNAMIC_OPTION_PREFIX,
context.getObjectIdentifier().getCatalogName(),
context.getObjectIdentifier().getDatabaseName(),
context.getObjectIdentifier().getObjectName());
Pattern pattern = Pattern.compile(template);

Map<String, String> optionsFromTableConfig =
OptionsUtils.convertToDynamicTableProperties(conf, "", pattern, 5);

if (!optionsFromTableConfig.isEmpty()) {
LOG.info(
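For reference, a small sketch of the keys this template accepts for a table `mycatalog.default.T`; the class name `FlinkDynamicKeyDemo` is hypothetical, and the pattern is written out literally rather than built with `String.format`.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FlinkDynamicKeyDemo {
    public static void main(String[] args) {
        // The template built for table mycatalog.default.T with prefix "paimon.".
        Pattern pattern = Pattern.compile(
                "(paimon\\.)(mycatalog|\\*)\\.(default|\\*)\\.(T|\\*)\\.(.+)");

        // Wildcard-scoped keys match too, because each identifier part
        // may literally be "*" in the conf key.
        Matcher m = pattern.matcher("paimon.*.default.T.scan.timestamp-millis");
        if (m.find()) {
            System.out.println(m.group(5)); // prints scan.timestamp-millis
        }
    }
}
```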
FlinkConnectorOptions.java
@@ -44,7 +44,7 @@ public class FlinkConnectorOptions {

public static final String NONE = "none";

public static final String TABLE_DYNAMIC_OPTION_PREFIX = "paimon";
public static final String TABLE_DYNAMIC_OPTION_PREFIX = "paimon.";

public static final int MIN_CLUSTERING_SAMPLE_FACTOR = 20;

AbstractFlinkTableFactoryTest.java
@@ -63,8 +63,7 @@ public void testSchemaEquals() {
@Test
public void testGetDynamicOptions() {
Configuration configuration = new Configuration();
configuration.setString("paimon.catalog1.db.T.k1", "v1");
configuration.setString("paimon.*.db.*.k2", "v2");
configuration.setString("k1", "v2");
ObjectIdentifier identifier = ObjectIdentifier.of("catalog1", "db", "T");
DynamicTableFactory.Context context =
new FactoryUtil.DefaultDynamicTableContext(
@@ -74,9 +73,25 @@
configuration,
AbstractFlinkTableFactoryTest.class.getClassLoader(),
false);
Map<String, String> options = AbstractFlinkTableFactory.getDynamicConfigOptions(context);
assertThat(options).isEqualTo(ImmutableMap.of("k1", "v2"));

configuration = new Configuration();
configuration.setString("k1", "v2");
configuration.setString("k3", "v3");
configuration.setString("paimon.catalog1.db.T.k1", "v1");
configuration.setString("paimon.*.db.*.k2", "v2");
identifier = ObjectIdentifier.of("catalog1", "db", "T");
context =
new FactoryUtil.DefaultDynamicTableContext(
identifier,
null,
new HashMap<>(),
configuration,
AbstractFlinkTableFactoryTest.class.getClassLoader(),
false);
options = AbstractFlinkTableFactory.getDynamicConfigOptions(context);
assertThat(options).isEqualTo(ImmutableMap.of("k1", "v1", "k2", "v2", "k3", "v3"));
}

private void innerTest(RowType r1, RowType r2, boolean expectEquals) {
@@ -538,7 +538,9 @@ protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
+ formatTable.format().name());
}
} else {
return new SparkTable(
copyWithSQLConf(
paimonTable, catalogName, toIdentifier(ident), extraOptions));
}
} catch (Catalog.TableNotExistException e) {
throw new NoSuchTableException(ident);
SparkSource.scala
@@ -18,11 +18,13 @@

package org.apache.paimon.spark

import org.apache.paimon.CoreOptions
import org.apache.paimon.catalog.{CatalogContext, CatalogUtils, Identifier}
import org.apache.paimon.options.Options
import org.apache.paimon.spark.SparkSource.NAME
import org.apache.paimon.spark.commands.WriteIntoPaimonTable
import org.apache.paimon.spark.sources.PaimonSink
import org.apache.paimon.spark.util.OptionUtils.{extractCatalogName, mergeSQLConfWithIdentifier}
import org.apache.paimon.table.{DataTable, FileStoreTable, FileStoreTableFactory}
import org.apache.paimon.table.system.AuditLogTable

@@ -80,9 +82,15 @@ class SparkSource
}

private def loadTable(options: JMap[String, String]): DataTable = {
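// Derive the table identity from the path so that path-based reads also pick up
// catalog.db.table-scoped dynamic options from the SQL conf.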
val path = CoreOptions.path(options)
val catalogContext = CatalogContext.create(
Options.fromMap(
mergeSQLConfWithIdentifier(
options,
extractCatalogName().getOrElse(NAME),
Identifier.create(CatalogUtils.database(path), CatalogUtils.table(path)))),
SparkSession.active.sessionState.newHadoopConf()
)
val table = FileStoreTableFactory.create(catalogContext)
if (Options.fromMap(options).get(SparkConnectorOptions.READ_CHANGELOG)) {
new AuditLogTable(table)
OptionUtils.scala
@@ -18,34 +18,61 @@

package org.apache.paimon.spark.util

import org.apache.paimon.catalog.Identifier
import org.apache.paimon.table.Table

import org.apache.spark.sql.catalyst.SQLConfHelper

import java.util.{Map => JMap}
import java.util.regex.Pattern

import scala.collection.JavaConverters._

object OptionUtils extends SQLConfHelper {

private val PAIMON_OPTION_PREFIX = "spark.paimon."
private val SPARK_CATALOG_PREFIX = "spark.sql.catalog."

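/** Returns the name of a catalog registered via "spark.sql.catalog.<name>" in the session conf, if any. */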
def extractCatalogName(): Option[String] = {
val sparkCatalogTemplate = String.format("%s([^.]*)$", SPARK_CATALOG_PREFIX)
val sparkCatalogPattern = Pattern.compile(sparkCatalogTemplate)
conf.getAllConfs.filterKeys(_.startsWith(SPARK_CATALOG_PREFIX)).foreach {
case (key, _) =>
val matcher = sparkCatalogPattern.matcher(key)
if (matcher.find())
return Option(matcher.group(1))
}
Option.empty
}

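/**
 * Merges "spark.paimon."-prefixed dynamic options from the Spark SQL conf (optionally scoped
 * as catalog.db.table) with `extraOptions`; explicitly passed `extraOptions` take precedence.
 */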
def mergeSQLConfWithIdentifier(
extraOptions: JMap[String, String],
catalogName: String,
ident: Identifier): JMap[String, String] = {
val tableOptionsTemplate = String.format(
"(%s)(%s|\\*)\\.(%s|\\*)\\.(%s|\\*)\\.(.+)",
PAIMON_OPTION_PREFIX,
catalogName,
ident.getDatabaseName,
ident.getObjectName)
val tableOptionsPattern = Pattern.compile(tableOptionsTemplate)
val mergedOptions = org.apache.paimon.options.OptionsUtils
.convertToDynamicTableProperties(
conf.getAllConfs.asJava,
PAIMON_OPTION_PREFIX,
tableOptionsPattern,
5)
mergedOptions.putAll(extraOptions)
mergedOptions
}

def copyWithSQLConf[T <: Table](
table: T,
catalogName: String,
ident: Identifier,
extraOptions: JMap[String, String]): T = {
val mergedOptions: JMap[String, String] =
mergeSQLConfWithIdentifier(extraOptions, catalogName, ident)
if (mergedOptions.isEmpty) {
table
} else {