Skip to content

Commit

Permalink
[BugFix][Connector-V2][Maxcompute]fix:Maxcompute sink can't map field(a…
Browse files Browse the repository at this point in the history
  • Loading branch information
panpan2019 authored and chaorongzhi committed Aug 21, 2024
1 parent e337fc2 commit 50e4666
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@ public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {

@Override
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) {
return new MaxcomputeWriter(this.pluginConfig);
return new MaxcomputeWriter(this.pluginConfig, this.typeInfo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
Expand All @@ -46,9 +47,11 @@ public class MaxcomputeWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
private final TableTunnel.UploadSession session;
private final TableSchema tableSchema;
private static final Long BLOCK_0 = 0L;
private SeaTunnelRowType rowType;

public MaxcomputeWriter(Config pluginConfig) {
public MaxcomputeWriter(Config pluginConfig, SeaTunnelRowType rowType) {
try {
this.rowType = rowType;
Table table = MaxcomputeUtil.getTable(pluginConfig);
this.tableSchema = table.getSchema();
TableTunnel tunnel = MaxcomputeUtil.getTableTunnel(pluginConfig);
Expand Down Expand Up @@ -76,7 +79,9 @@ public MaxcomputeWriter(Config pluginConfig) {

@Override
public void write(SeaTunnelRow seaTunnelRow) throws IOException {
Record record = MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, this.tableSchema);
Record record =
MaxcomputeTypeMapper.getMaxcomputeRowData(
seaTunnelRow, this.tableSchema, this.rowType);
recordWriter.write(record);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,23 @@ public static SeaTunnelRow getSeaTunnelRowData(Record rs, SeaTunnelRowType typeI
return new SeaTunnelRow(fields.toArray());
}

public static Record getMaxcomputeRowData(SeaTunnelRow seaTunnelRow, TableSchema tableSchema) {
public static Record getMaxcomputeRowData(
SeaTunnelRow seaTunnelRow, TableSchema tableSchema, SeaTunnelRowType rowType) {
ArrayRecord arrayRecord = new ArrayRecord(tableSchema);
List<Column> columns = tableSchema.getColumns();
for (int i = 0; i < seaTunnelRow.getFields().length; i++) {
String fieldName = rowType.getFieldName(i);
if (!tableSchema.containsColumn(fieldName)) {
throw new MaxcomputeConnectorException(
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
String.format(
"field not found in written table: %s,rowType: %s",
fieldName, seaTunnelRow.getField(i)));
}
Column column = tableSchema.getColumn(fieldName);

arrayRecord.set(
i,
resolveObject2Maxcompute(
seaTunnelRow.getField(i), columns.get(i).getTypeInfo()));
tableSchema.getColumnIndex(fieldName),
resolveObject2Maxcompute(seaTunnelRow.getField(i), column.getTypeInfo()));
}
return arrayRecord;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ private static void testType(
}

SeaTunnelRow seaTunnelRow = MaxcomputeTypeMapper.getSeaTunnelRowData(record, typeInfo);
Record tRecord = MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, tableSchema);
Record tRecord =
MaxcomputeTypeMapper.getMaxcomputeRowData(seaTunnelRow, tableSchema, typeInfo);

for (int i = 0; i < tRecord.getColumns().length; i++) {
Assertions.assertEquals(record.get(i), tRecord.get(i));
Expand Down

0 comments on commit 50e4666

Please sign in to comment.