From a50deaac0aaccfa2893e6023c72501910278d93e Mon Sep 17 00:00:00 2001 From: daidai Date: Mon, 6 Jan 2025 21:53:56 +0800 Subject: [PATCH] [fix](hms)fix hive catalog follower not get event --- .../datasource/hive/HMSExternalCatalog.java | 28 +++++++++---------- .../hive/event/MetastoreEventsProcessor.java | 2 +- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java index 6c320aa4396919..07ad2a3e82a314 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java @@ -117,20 +117,6 @@ public void checkProperties() throws DdlException { throw new DdlException( "The parameter " + FILE_META_CACHE_TTL_SECOND + " is wrong, value is " + fileMetaCacheTtlSecond); } - Map properties = catalogProperty.getProperties(); - if (properties.containsKey(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC)) { - enableHmsEventsIncrementalSync = - properties.get(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC).equals("true"); - } else { - enableHmsEventsIncrementalSync = Config.enable_hms_events_incremental_sync; - } - - if (properties.containsKey(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC)) { - hmsEventsBatchSizePerRpc = Integer.valueOf(properties.get(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC)); - } else { - hmsEventsBatchSizePerRpc = Config.hms_events_batch_size_per_rpc; - } - // check the dfs.ha properties // 'dfs.nameservices'='your-nameservice', // 'dfs.ha.namenodes.your-nameservice'='nn1,nn2', @@ -289,6 +275,20 @@ public void setDefaultPropsIfMissing(boolean isReplay) { // always allow fallback to simple auth, so to support both kerberos and simple auth catalogProperty.addProperty(DFSFileSystem.PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH, "true"); } + + Map properties = catalogProperty.getProperties(); + if (properties.containsKey(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC)) { + enableHmsEventsIncrementalSync = + properties.get(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC).equals("true"); + } else { + enableHmsEventsIncrementalSync = Config.enable_hms_events_incremental_sync; + } + + if (properties.containsKey(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC)) { + hmsEventsBatchSizePerRpc = Integer.valueOf(properties.get(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC)); + } else { + hmsEventsBatchSizePerRpc = Config.hms_events_batch_size_per_rpc; + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java index 73054773402bad..23bf324360bed1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java @@ -269,7 +269,7 @@ private NotificationEventResponse getNextEventResponseForSlave(HMSExternalCatalo // For slave FE nodes, only fetch events which id is lower than masterLastSyncedEventId int maxEventSize = Math.min((int) (masterLastSyncedEventId - lastSyncedEventId), - Config.hms_events_batch_size_per_rpc); + hmsExternalCatalog.getHmsEventsBatchSizePerRpc()); try { return hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId, maxEventSize, null); } catch (MetastoreNotificationFetchException e) {