From ccc9b1a29d4e15e721382073f90b831777215ae7 Mon Sep 17 00:00:00 2001 From: shfshihuafeng Date: Sun, 3 Mar 2024 08:30:50 +0800 Subject: [PATCH] DRILL-8482:Assign region throw exception when some region is deployed on affinity node and some on non-affinity node (#2885) --- .../exec/store/hbase/HBaseGroupScan.java | 2 +- .../hbase/TestHBaseRegionScanAssignments.java | 28 +++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java index 1ad879f7eb6..cec24bf0adf 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java @@ -267,7 +267,7 @@ public void applyAssignments(List incomingEndpoints) { PriorityQueue> minHeap = new PriorityQueue<>(numSlots, LIST_SIZE_COMPARATOR); PriorityQueue> maxHeap = new PriorityQueue<>(numSlots, LIST_SIZE_COMPARATOR_REV); for(List listOfScan : endpointFragmentMapping.values()) { - if (listOfScan.size() < minPerEndpointSlot) { + if (listOfScan.size() < maxPerEndpointSlot) { minHeap.offer(listOfScan); } else if (listOfScan.size() > minPerEndpointSlot) { maxHeap.offer(listOfScan); diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java index 216eb9104c5..877bf581c0f 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseRegionScanAssignments.java @@ -188,6 +188,34 @@ public void testHBaseGroupScanAssignmentSomeAfinedWithOrphans() throws Exception testParallelizationWidth(scan, endpoints.size()); } + @Test + public void testHBaseGroupScanAssignmentSomeAfinedAndSomeWithOrphans() throws Exception { + NavigableMap regionsToScan = Maps.newTreeMap(); + regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[0], splits[1]), SERVER_A); + regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[1], splits[2]), SERVER_A); + regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[2], splits[3]), SERVER_B); + regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[3], splits[4]), SERVER_B); + regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[6], splits[7]), SERVER_D); + regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[7], splits[8]), SERVER_D); + regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[8], splits[9]), SERVER_D); + regionsToScan.put(new HRegionInfo(TABLE_NAME, splits[9], splits[10]), SERVER_D); + final List endpoints = Lists.newArrayList(); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_A).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_B).setControlPort(1234).build()); + endpoints.add(DrillbitEndpoint.newBuilder().setAddress(HOST_C).setControlPort(1234).build()); + + HBaseGroupScan scan = new HBaseGroupScan(); + scan.setRegionsToScan(regionsToScan); + scan.setHBaseScanSpec(new HBaseScanSpec(TABLE_NAME_STR, splits[0], splits[0], null)); + scan.applyAssignments(endpoints); + + int i = 0; + assertEquals(3, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'A' + assertEquals(2, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'B' + assertEquals(3, scan.getSpecificScan(i++).getRegionScanSpecList().size()); // 'C' + testParallelizationWidth(scan, i); + } + @Test public void testHBaseGroupScanAssignmentOneEach() throws Exception { NavigableMap regionsToScan = Maps.newTreeMap();