Skip to content

Commit

Permalink
[fix](hms)fix hive catalog follower not get event
Browse files Browse the repository at this point in the history
  • Loading branch information
hubgeter committed Jan 6, 2025
1 parent 04d7927 commit a50deaa
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,6 @@ public void checkProperties() throws DdlException {
throw new DdlException(
"The parameter " + FILE_META_CACHE_TTL_SECOND + " is wrong, value is " + fileMetaCacheTtlSecond);
}
Map<String, String> properties = catalogProperty.getProperties();
if (properties.containsKey(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC)) {
enableHmsEventsIncrementalSync =
properties.get(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC).equals("true");
} else {
enableHmsEventsIncrementalSync = Config.enable_hms_events_incremental_sync;
}

if (properties.containsKey(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC)) {
hmsEventsBatchSizePerRpc = Integer.valueOf(properties.get(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC));
} else {
hmsEventsBatchSizePerRpc = Config.hms_events_batch_size_per_rpc;
}

// check the dfs.ha properties
// 'dfs.nameservices'='your-nameservice',
// 'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
Expand Down Expand Up @@ -289,6 +275,20 @@ public void setDefaultPropsIfMissing(boolean isReplay) {
// always allow fallback to simple auth, so to support both kerberos and simple auth
catalogProperty.addProperty(DFSFileSystem.PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH, "true");
}

Map<String, String> properties = catalogProperty.getProperties();
if (properties.containsKey(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC)) {
enableHmsEventsIncrementalSync =
properties.get(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC).equals("true");
} else {
enableHmsEventsIncrementalSync = Config.enable_hms_events_incremental_sync;
}

if (properties.containsKey(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC)) {
hmsEventsBatchSizePerRpc = Integer.valueOf(properties.get(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC));
} else {
hmsEventsBatchSizePerRpc = Config.hms_events_batch_size_per_rpc;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ private NotificationEventResponse getNextEventResponseForSlave(HMSExternalCatalo

// For slave FE nodes, only fetch events which id is lower than masterLastSyncedEventId
int maxEventSize = Math.min((int) (masterLastSyncedEventId - lastSyncedEventId),
Config.hms_events_batch_size_per_rpc);
hmsExternalCatalog.getHmsEventsBatchSizePerRpc());
try {
return hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId, maxEventSize, null);
} catch (MetastoreNotificationFetchException e) {
Expand Down

0 comments on commit a50deaa

Please sign in to comment.