Skip to content

Commit

Permalink
address
Browse files Browse the repository at this point in the history
  • Loading branch information
xuzifu666 committed Sep 20, 2024
1 parent 4f2e24d commit a4c7a10
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 19 deletions.
2 changes: 1 addition & 1 deletion docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ This section introduces all available Spark procedures for Paimon.
<li>options_map: A map of additional key-value options to apply.</li>
<li>parallelism: the parallelism for the migrate process; defaults to the number of cores of the machine.</li>
</td>
<td>CALL sys.migrate_database(source_type => 'hive', table => 'db', options => 'file.format=parquet', options_map => map('k1','v1'), parallelism => 6)</td>
<td>CALL sys.migrate_database(source_type => 'hive', db => 'db01', options => 'file.format=parquet', options_map => map('k1','v1'), parallelism => 6)</td>
</tr>
<tr>
<td>migrate_table</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@
import org.apache.paimon.utils.ParameterUtils;

import org.apache.flink.table.procedure.ProcedureContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/** Migrate procedure to migrate all Hive tables in a database to Paimon tables. */
public class MigrateDatabaseProcedure extends ProcedureBase {

private static final Logger LOG = LoggerFactory.getLogger(MigrateDatabaseProcedure.class);

@Override
public String identifier() {
return "migrate_database";
Expand All @@ -54,12 +58,25 @@ public String[] call(
Runtime.getRuntime().availableProcessors(),
ParameterUtils.parseCommaSeparatedKeyValues(properties));

int errorCount = 0;
int successCount = 0;

for (Migrator migrator : migrators) {
migrator.executeMigrate();
migrator.renameTable(false);
try {
migrator.executeMigrate();
migrator.renameTable(false);
successCount++;
} catch (Exception e) {
errorCount++;
LOG.error("Call migrate_database error:" + e.getMessage());
}
}
String retStr =
String.format(
"migrate database is finished, success cnt: %s , failed cnt: %s",
String.valueOf(successCount), String.valueOf(errorCount));

return new String[] {"Success"};
return new String[] {retStr};
}

public String[] call(
Expand All @@ -78,11 +95,24 @@ public String[] call(
p,
ParameterUtils.parseCommaSeparatedKeyValues(properties));

int errorCount = 0;
int successCount = 0;

for (Migrator migrator : migrators) {
migrator.executeMigrate();
migrator.renameTable(false);
try {
migrator.executeMigrate();
migrator.renameTable(false);
successCount++;
} catch (Exception e) {
errorCount++;
LOG.error("Call migrate_database error:" + e.getMessage());
}
}
String retStr =
String.format(
"migrate database is finished, success cnt: %s , failed cnt: %s",
String.valueOf(successCount), String.valueOf(errorCount));

return new String[] {"Success"};
return new String[] {retStr};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/** Migrate procedure to migrate all Hive tables in a database to Paimon tables. */
public class MigrateDatabaseProcedure extends ProcedureBase {

private static final Logger LOG = LoggerFactory.getLogger(MigrateDatabaseProcedure.class);

@Override
public String identifier() {
return "migrate_database";
Expand Down Expand Up @@ -64,11 +68,24 @@ public String[] call(
p,
ParameterUtils.parseCommaSeparatedKeyValues(properties));

int errorCount = 0;
int successCount = 0;

for (Migrator migrator : migrators) {
migrator.executeMigrate();
migrator.renameTable(false);
try {
migrator.executeMigrate();
migrator.renameTable(false);
successCount++;
} catch (Exception e) {
errorCount++;
LOG.error("Call migrate_database error:" + e.getMessage());
}
}
String retStr =
String.format(
"migrate database is finished, success cnt: %s , failed cnt: %s",
String.valueOf(successCount), String.valueOf(errorCount));

return new String[] {"Success"};
return new String[] {retStr};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
Expand All @@ -48,6 +50,8 @@
*/
public class MigrateDatabaseProcedure extends BaseProcedure {

private static final Logger LOG = LoggerFactory.getLogger(MigrateDatabaseProcedure.class);

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("source_type", StringType),
Expand All @@ -61,7 +65,7 @@ public class MigrateDatabaseProcedure extends BaseProcedure {
private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
new StructField("result", StringType, true, Metadata.empty())
});

protected MigrateDatabaseProcedure(TableCatalog tableCatalog) {
Expand Down Expand Up @@ -93,20 +97,29 @@ public InternalRow[] call(InternalRow args) {
Map<String, String> options = ParameterUtils.parseCommaSeparatedKeyValues(properties);
options.putAll(optionMap);

try {
List<Migrator> migrators =
TableMigrationUtils.getImporters(
format, paimonCatalog, database, parallelism, options);
List<Migrator> migrators =
TableMigrationUtils.getImporters(
format, paimonCatalog, database, parallelism, options);

int errorCount = 0;
int successCount = 0;

for (Migrator migrator : migrators) {
for (Migrator migrator : migrators) {
try {
migrator.executeMigrate();
migrator.renameTable(false);
successCount++;
} catch (Exception e) {
errorCount++;
LOG.error("Call migrate_database error:" + e.getMessage());
}
} catch (Exception e) {
throw new RuntimeException("Call migrate_database error: " + e.getMessage(), e);
}
String retStr =
String.format(
"migrate database is finished, success cnt: %s , failed cnt: %s",
String.valueOf(successCount), String.valueOf(errorCount));

return new InternalRow[] {newInternalRow(true)};
return new InternalRow[] {newInternalRow(retStr)};
}

public static Map<String, String> mapDataToHashMap(MapData mapData) {
Expand Down

0 comments on commit a4c7a10

Please sign in to comment.