From 0cc41c620b96a4798adbe2237e48b67269e3e509 Mon Sep 17 00:00:00 2001
From: Nishant Goel <nisgoel@amazon.com>
Date: Mon, 30 Sep 2024 00:14:37 +0530
Subject: [PATCH] Adding additional debug and trace logs in cluster state fetch
 path and replication path between leader and follower cluster

Signed-off-by: Nishant Goel <nisgoel@amazon.com>
---
 .../replication/action/changes/TransportGetChangesAction.kt | 5 ++++-
 .../action/index/TransportReplicateIndexAction.kt           | 4 ++++
 .../TransportReplicateIndexClusterManagerNodeAction.kt      | 6 +++++-
 .../replication/task/autofollow/AutoFollowTask.kt           | 1 +
 .../replication/task/shard/ShardReplicationTask.kt          | 2 ++
 .../kotlin/org/opensearch/replication/util/Extensions.kt    | 1 +
 6 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt b/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt
index 4e3649067..e8b0a17eb 100644
--- a/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt
+++ b/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt
@@ -67,8 +67,10 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
 
     @Suppress("BlockingMethodInNonBlockingContext")
     override fun asyncShardOperation(request: GetChangesRequest, shardId: ShardId, listener: ActionListener<GetChangesResponse>) {
+        log.debug("calling asyncShardOperation method")
         GlobalScope.launch(threadPool.coroutineContext(REPLICATION_EXECUTOR_NAME_LEADER)) {
             // TODO: Figure out if we need to acquire a primary permit here
+            log.debug("$REPLICATION_EXECUTOR_NAME_LEADER coroutine has initiated")
             listener.completeWith {
                 var relativeStartNanos  = System.nanoTime()
                 remoteStatsService.stats[shardId] = remoteStatsService.stats.getOrDefault(shardId, RemoteShardMetric())
@@ -82,8 +84,9 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
                     // There are no new operations to sync. Do a long poll and wait for GlobalCheckpoint to advance. If
                     // the checkpoint doesn't advance by the timeout this throws an ESTimeoutException which the caller
                     // should catch and start a new poll.
+                    log.trace("Waiting for global checkpoint to advance from ${request.fromSeqNo} Sequence Number")
                     val gcp = indexShard.waitForGlobalCheckpoint(request.fromSeqNo, WAIT_FOR_NEW_OPS_TIMEOUT)
-
+                    log.trace("Waiting for global checkpoint to advance is finished for ${request.fromSeqNo} Sequence Number")
                     // At this point indexShard.lastKnownGlobalCheckpoint  has advanced but it may not yet have been synced
                     // to the translog, which means we can't return those changes. Return to the caller to retry.
                     // TODO: Figure out a better way to wait for the global checkpoint to be synced to the translog
diff --git a/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexAction.kt b/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexAction.kt
index 8b2de1ea2..583b514ea 100644
--- a/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexAction.kt
+++ b/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexAction.kt
@@ -85,10 +85,12 @@ class TransportReplicateIndexAction @Inject constructor(transportService: Transp
                 // Any checks on the settings is followed by setup checks to ensure all relevant changes are
                 // present across the plugins
                 // validate index metadata on the leader cluster
+                log.debug("Fetching leader cluster state for ${request.leaderIndex} index.")
                 val leaderClusterState = getLeaderClusterState(request.leaderAlias, request.leaderIndex)
                 ValidationUtil.validateLeaderIndexState(request.leaderAlias, request.leaderIndex, leaderClusterState)
 
                 val leaderSettings = getLeaderIndexSettings(request.leaderAlias, request.leaderIndex)
+                log.debug("Leader settings were fetched for ${request.leaderIndex} index.")
 
                 if (leaderSettings.keySet().contains(ReplicationPlugin.REPLICATED_INDEX_SETTING.key) and
                         !leaderSettings.get(ReplicationPlugin.REPLICATED_INDEX_SETTING.key).isNullOrBlank()) {
@@ -113,7 +115,9 @@ class TransportReplicateIndexAction @Inject constructor(transportService: Transp
                 // Setup checks are successful and trigger replication for the index
                 // permissions evaluation to trigger replication is based on the current security context set
                 val internalReq = ReplicateIndexClusterManagerNodeRequest(user, request)
+                log.debug("Starting replication index action on current master node")
                 client.suspendExecute(ReplicateIndexClusterManagerNodeAction.INSTANCE, internalReq)
+                log.debug("Response of start replication action is returned")
                 ReplicateIndexResponse(true)
             }
         }
diff --git a/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexClusterManagerNodeAction.kt b/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexClusterManagerNodeAction.kt
index 042509f24..ee1fbfad7 100644
--- a/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexClusterManagerNodeAction.kt
+++ b/src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexClusterManagerNodeAction.kt
@@ -97,7 +97,9 @@ class TransportReplicateIndexClusterManagerNodeAction @Inject constructor(transp
                     throw OpenSearchStatusException("[FORBIDDEN] Replication START block is set", RestStatus.FORBIDDEN)
                 }
 
+                log.debug("Making request to get metadata of ${replicateIndexReq.leaderIndex} index on remote cluster")
                 val remoteMetadata = getRemoteIndexMetadata(replicateIndexReq.leaderAlias, replicateIndexReq.leaderIndex)
+                log.debug("Response returned of the request made to get metadata of ${replicateIndexReq.leaderIndex} index on remote cluster")
 
                 if (state.routingTable.hasIndex(replicateIndexReq.followerIndex)) {
                     throw IllegalArgumentException("Cant use same index again for replication. " +
@@ -115,6 +117,7 @@ class TransportReplicateIndexClusterManagerNodeAction @Inject constructor(transp
                         ReplicationOverallState.RUNNING, user, replicateIndexReq.useRoles?.getOrDefault(ReplicateIndexRequest.FOLLOWER_CLUSTER_ROLE, null),
                         replicateIndexReq.useRoles?.getOrDefault(ReplicateIndexRequest.LEADER_CLUSTER_ROLE, null), replicateIndexReq.settings)
 
+                log.debug("Starting index replication task in persistent task service with name: replication:index:${replicateIndexReq.followerIndex}")
                 val task = persistentTasksService.startTask("replication:index:${replicateIndexReq.followerIndex}",
                         IndexReplicationExecutor.TASK_NAME, params)
 
@@ -123,13 +126,14 @@ class TransportReplicateIndexClusterManagerNodeAction @Inject constructor(transp
                     listener.onResponse(ReplicateIndexResponse(false))
                 }
 
+                log.debug("Waiting for persistent task to move to following state")
                 // Now wait for the replication to start and the follower index to get created before returning
                 persistentTasksService.waitForTaskCondition(task.id, replicateIndexReq.timeout()) { t ->
                     val replicationState = (t.state as IndexReplicationState?)?.state
                     replicationState == ReplicationState.FOLLOWING ||
                             (!replicateIndexReq.waitForRestore && replicationState == ReplicationState.RESTORING)
                 }
-
+                log.debug("Persistent task is moved to following replication state")
                 listener.onResponse(AcknowledgedResponse(true))
             } catch (e: Exception) {
                 log.error("Failed to trigger replication for ${replicateIndexReq.followerIndex} - ${e.stackTraceToString()}")
diff --git a/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt b/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt
index ca376e59f..8454e4299 100644
--- a/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt
+++ b/src/main/kotlin/org/opensearch/replication/task/autofollow/AutoFollowTask.kt
@@ -211,6 +211,7 @@ class AutoFollowTask(id: Long, type: String, action: String, description: String
                 throw ReplicationException("Failed to auto follow leader index $leaderIndex")
             }
             successStart = true
+            log.debug("Auto follow has started replication from ${leaderAlias}:$leaderIndex -> $leaderIndex")
         } catch (e: OpenSearchSecurityException) {
             // For permission related failures, Adding as part of failed indices as autofollow role doesn't have required permissions.
             log.trace("Cannot start replication on $leaderIndex due to missing permissions $e")
diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt
index c41ee0845..5a7cd43c2 100644
--- a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt
+++ b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt
@@ -179,6 +179,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
             logDebug("Cluster metadata listener invoked on shard task...")
             if (event.metadataChanged()) {
                 val replicationStateParams = getReplicationStateParamsForIndex(clusterService, followerShardId.indexName)
+                logDebug("Replication State Params are fetched from cluster state")
                 if (replicationStateParams == null) {
                     if (PersistentTasksNodeService.Status(State.STARTED) == status)
                         cancelTask("Shard replication task received an interrupt.")
@@ -301,6 +302,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
     private suspend fun getChanges(fromSeqNo: Long, toSeqNo: Long): GetChangesResponse {
         val remoteClient = client.getRemoteClusterClient(leaderAlias)
         val request = GetChangesRequest(leaderShardId, fromSeqNo, toSeqNo)
+
         var changesResp =  remoteClient.suspendExecuteWithRetries(replicationMetadata = replicationMetadata,
                 action = GetChangesAction.INSTANCE, req = request, log = log)
         followerClusterStats.stats[followerShardId]!!.leaderCheckpoint = changesResp.lastSyncedGlobalCheckpoint
diff --git a/src/main/kotlin/org/opensearch/replication/util/Extensions.kt b/src/main/kotlin/org/opensearch/replication/util/Extensions.kt
index 81aaa4464..050fc35e9 100644
--- a/src/main/kotlin/org/opensearch/replication/util/Extensions.kt
+++ b/src/main/kotlin/org/opensearch/replication/util/Extensions.kt
@@ -115,6 +115,7 @@ suspend fun <Req: ActionRequest, Resp: ActionResponse> Client.suspendExecuteWith
     var retryException: Exception
     repeat(numberOfRetries - 1) { index ->
         try {
+            log.debug("Sending get changes request after ${currentBackoff / 1000} seconds.")
             return suspendExecute(replicationMetadata, action, req,
                     injectSecurityContext = injectSecurityContext, defaultContext = defaultContext)
         } catch (e: OpenSearchException) {