[Feature][Connector-V2] Add Handling Logic for HBase Asynchronous Data Write Failures #8279

Status: Open. Wants to merge 3 commits into base: dev
@@ -26,6 +26,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
@@ -38,6 +39,7 @@
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
@@ -55,6 +57,7 @@
@Slf4j
public class HbaseClient {

private static final long RETRY_DELAY_MS = 1000;
private final Connection connection;
private final Admin admin;
private final BufferedMutator hbaseMutator;
@@ -77,7 +80,31 @@ private HbaseClient(Connection connection, HbaseParameters hbaseParameters) {
hbaseParameters.getNamespace(),
hbaseParameters.getTable()))
.pool(HTable.getDefaultExecutor(hbaseConfiguration))
.writeBufferSize(hbaseParameters.getWriteBufferSize())
.listener(
        (e, mutator) -> {
            for (int i = 0; i < e.getNumExceptions(); i++) {
                Row row = e.getRow(i);
                log.error("Failed to send put {}.", row);
                // Retry only when the failure of this row was caused by a moving
                // region; other causes are logged and not retried here.
                if (mutator != null
                        && e.getCause(i) instanceof NotServingRegionException) {
                    log.info("Retrying put {} after {} ms...", row, RETRY_DELAY_MS);
                    try {
                        Thread.sleep(RETRY_DELAY_MS);
                        mutator.mutate((Put) row);
                    } catch (InterruptedException ie) {
                        // Restore the interrupt flag instead of swallowing it.
                        Thread.currentThread().interrupt();
                        log.error("Interrupted while retrying put {}.", row, ie);
                    } catch (IOException ex) {
                        log.error("Unexpected exception during put {}.", row, ex);
                    }
                }
            }
        });

Review thread on the listener above:

Member: What happens when all retries have failed?

Contributor Author:
> What happens when all retries have failed?

It is indeed a bug, and I haven't thought of a good way to prevent this situation from occurring. Moreover, there is currently no relevant log information when HBase write failures happen.

Member: We should handle this situation before merging this PR.

Member (@Carl-Zhou-CN, Dec 17, 2024): The HBase client itself will also perform a certain number of retries.
hbaseMutator = connection.getBufferedMutator(bufferedMutatorParams);
} catch (IOException e) {
throw new HbaseConnectorException(
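The review thread leaves open what should happen once the in-listener retry itself fails. One common answer, sketched here outside of HBase (the `CheckedWrite` interface, the retry cap, and the class name are illustrative, not part of this PR or of the HBase client API), is to bound the retries and rethrow the last failure so the sink fails visibly instead of silently dropping the row:

```java
import java.io.IOException;

/** Bounded-retry sketch: retry a write a few times, then rethrow the last failure. */
public class BoundedRetry {

    /** A write action that may fail with an IOException, like a single HBase put. */
    @FunctionalInterface
    public interface CheckedWrite {
        void apply() throws IOException;
    }

    /**
     * Attempts {@code write} up to {@code maxAttempts} times, sleeping
     * {@code delayMs} between attempts. Returns the number of attempts
     * actually made on success; rethrows the last IOException once the
     * cap is reached, so the failure surfaces instead of being dropped.
     */
    public static int run(CheckedWrite write, int maxAttempts, long delayMs)
            throws IOException, InterruptedException {
        if (maxAttempts <= 0) {
            throw new IllegalArgumentException("maxAttempts must be positive");
        }
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                write.apply();
                return attempt; // success
            } catch (IOException e) {
                last = e; // remember the failure and retry if attempts remain
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMs);
                }
            }
        }
        throw last; // cap reached: fail loudly rather than lose the row
    }
}
```

In the connector this cap could feed a `HbaseConnectorException` so the sink task fails rather than continuing with lost data, which is the behavior the reviewers ask to see handled before merge.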
@@ -37,7 +37,6 @@
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -120,7 +119,6 @@ private Put convertRowToPut(SeaTunnelRow row) {
.collect(Collectors.toList());
for (Integer writeColumnIndex : writeColumnIndexes) {
String fieldName = seaTunnelRowType.getFieldName(writeColumnIndex);
Map<String, String> configurationFamilyNames = hbaseParameters.getFamilyNames();
String familyName =
hbaseParameters.getFamilyNames().getOrDefault(fieldName, defaultFamilyName);
byte[] bytes = convertColumnToBytes(row, writeColumnIndex);
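The hunk above removes an unused local (`configurationFamilyNames`); the lookup that survives resolves each field's column family via `getOrDefault`, falling back to the configured default family for any unmapped field. A toy illustration of that fallback (the class, field, and family names here are invented, not connector API):

```java
import java.util.Map;

public class FamilyLookup {
    /**
     * Mirrors the shape of the connector's lookup: an explicit
     * field-to-family mapping wins, otherwise the default family is used.
     */
    public static String familyFor(
            Map<String, String> familyNames, String fieldName, String defaultFamily) {
        return familyNames.getOrDefault(fieldName, defaultFamily);
    }
}
```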