Skip to content

Commit

Permalink
Fix #183 by sorting multi-action RPCs by row key as well as region so…
Browse files Browse the repository at this point in the history
… that

if a later version of HBase is used that sorts the requests on receipt then
the RPCs will still match up with the responses.

Signed-off-by: Chris Larsen <[email protected]>
  • Loading branch information
manolama committed Mar 18, 2018
1 parent 147b4e9 commit 4cba6bf
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 10 deletions.
27 changes: 19 additions & 8 deletions src/MultiAction.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2010-2016 The Async HBase Authors. All rights reserved.
* Copyright (C) 2010-2018 The Async HBase Authors. All rights reserved.
* This file is part of Async HBase.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -45,6 +45,11 @@
* <p>
* This RPC is guaranteed to be sent atomically (but HBase doesn't guarantee
* that it will apply it atomically).
* <p>
* Before serializing, the RPCs are sorted by region and (as of 1.8.1)
* by row key (to fix issues with
* https://issues.apache.org/jira/browse/HBASE-17924) so that the
* responses can be matched to the original RPC.
*/
final class MultiAction extends HBaseRpc implements HBaseRpc.IsEdit {

Expand All @@ -70,7 +75,7 @@ final class MultiAction extends HBaseRpc implements HBaseRpc.IsEdit {
private static final byte[] MULTI = { 'm', 'u', 'l', 't', 'i' };

/** RPC method name for HBase 0.95 and above. */
private static final byte[] MMULTI = { 'M', 'u', 'l', 't', 'i' };
static final byte[] MMULTI = { 'M', 'u', 'l', 't', 'i' };

/** Template for NSREs. */
private static final NotServingRegionException NSRE =
Expand Down Expand Up @@ -214,7 +219,7 @@ ChannelBuffer serialize(final byte server_version) {
}

// we create a new RegionAction for each region.
Collections.sort(batch, SORT_BY_REGION);
Collections.sort(batch, SORT_BY_REGION_AND_KEY);
final MultiRequest.Builder req = MultiRequest.newBuilder();
RegionAction.Builder actions = null;
byte[] prev_region = HBaseClient.EMPTY_ARRAY;
Expand Down Expand Up @@ -508,24 +513,30 @@ public int compare(final BatchableRpc a, final BatchableRpc b) {
}

/**
* Sorts {@link BatchableRpc}s appropriately for HBase 0.95+ multi-action.
* Sorts {@link BatchableRpc}s appropriately for HBase 0.95+ multi-action
* as well as 1.3x where the response is also sorted by key.
*/
static final RegionComparator SORT_BY_REGION = new RegionComparator();
static final RegionAndKeyComparator SORT_BY_REGION_AND_KEY =
new RegionAndKeyComparator();

/**
* Sorts {@link BatchableRpc}s by region.
* Used with HBase 0.95+ only.
*/
private static final class RegionComparator
private static final class RegionAndKeyComparator
implements Comparator<BatchableRpc> {

private RegionComparator() { // Can't instantiate outside of this class.
private RegionAndKeyComparator() { // Can't instantiate outside of this class.
}

@Override
/** Compares two RPCs. */
public int compare(final BatchableRpc a, final BatchableRpc b) {
return Bytes.memcmp(a.getRegion().name(), b.getRegion().name());
int region_cmp = Bytes.memcmp(a.getRegion().name(), b.getRegion().name());
if (region_cmp != 0) {
return region_cmp;
}
return Bytes.memcmp(a.key, b.key);
}

}
Expand Down
49 changes: 47 additions & 2 deletions test/TestMultiAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
*/
package org.hbase.async;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
Expand All @@ -40,7 +41,9 @@
import java.util.Collections;
import java.util.List;

import org.hbase.async.generated.ClientPB.MultiRequest;
import org.hbase.async.generated.ClientPB.MultiResponse;
import org.hbase.async.generated.ClientPB.RegionAction;
import org.hbase.async.generated.ClientPB.RegionActionResult;
import org.hbase.async.generated.ClientPB.ResultOrException;
import org.hbase.async.generated.ClientPB.RegionActionResult.Builder;
Expand Down Expand Up @@ -212,6 +215,48 @@ public void addErrors() throws Exception {
}

// NOTE: The following are tests for HBase 0.96 and up

@Test
public void serdesOrdering() throws Exception {
PutRequest put1 = new PutRequest(TABLE, concat(KEY, new byte[] { 2 }),
FAMILY, QUALIFIER, VALUE);
put1.region = region;
PutRequest put2 = new PutRequest(TABLE, KEY, FAMILY, "myqual".getBytes(), VALUE);
put2.region = region;
PutRequest put3 = new PutRequest(TABLE, concat(KEY, new byte[] { 2 }),
FAMILY, "myqual".getBytes(), VALUE);
put3.region = region2;
PutRequest put4 = new PutRequest(TABLE, KEY, FAMILY, "myqual".getBytes(), VALUE);
put4.region = region2;
MultiAction multi = new MultiAction();
multi.add(put1);
multi.add(put2);
multi.add(put3);
multi.add(put4);

ChannelBuffer buffer = multi.serialize(RegionClient.SERVER_VERSION_095_OR_ABOVE);
buffer.readerIndex(4 + 19 + MultiAction.MMULTI.length);
HBaseRpc.readProtoBufVarint(buffer);
byte[] bytes = new byte[buffer.writerIndex() - buffer.readerIndex()];
buffer.readBytes(bytes);
MultiRequest parsed = MultiRequest.parseFrom(bytes);
assertEquals(2, parsed.getRegionActionCount());

RegionAction actions = parsed.getRegionAction(0);
assertEquals(2, actions.getActionCount());
assertArrayEquals(region.name(), actions.getRegion().getValue().toByteArray());
assertArrayEquals(KEY, actions.getAction(0).getMutation().getRow().toByteArray());
assertArrayEquals(concat(KEY, new byte[] { 2 }),
actions.getAction(1).getMutation().getRow().toByteArray());

actions = parsed.getRegionAction(1);
assertEquals(2, actions.getActionCount());
assertArrayEquals(region2.name(), actions.getRegion().getValue().toByteArray());
assertArrayEquals(KEY, actions.getAction(0).getMutation().getRow().toByteArray());
assertArrayEquals(concat(KEY, new byte[] { 2 }),
actions.getAction(1).getMutation().getRow().toByteArray());
}

@Test
public void deserializePuts() throws Exception {
final List<ResultOrException> results = new ArrayList<ResultOrException>(2);
Expand Down Expand Up @@ -870,7 +915,7 @@ public void deserializeMultiRegionOneFailed() throws Exception {
multi.add(put2);
multi.add(put3);
multi.add(put4);
Collections.sort(multi.batch(), MultiAction.SORT_BY_REGION);
Collections.sort(multi.batch(), MultiAction.SORT_BY_REGION_AND_KEY);

final MultiAction.Response decoded =
(MultiAction.Response)multi.deserialize(
Expand Down Expand Up @@ -916,7 +961,7 @@ public void deserializeMultiRegionTwoFailed() throws Exception {
multi.add(put2);
multi.add(put3);
multi.add(put4);
Collections.sort(multi.batch(), MultiAction.SORT_BY_REGION);
Collections.sort(multi.batch(), MultiAction.SORT_BY_REGION_AND_KEY);

final MultiAction.Response decoded =
(MultiAction.Response)multi.deserialize(
Expand Down

0 comments on commit 4cba6bf

Please sign in to comment.