diff --git a/clickstream-health-metrics-api/src/main/kotlin/clickstream/health/intermediate/CSHealthEventProcessor.kt b/clickstream-health-metrics-api/src/main/kotlin/clickstream/health/intermediate/CSHealthEventProcessor.kt index 6fe5a9f6..0783be7d 100644 --- a/clickstream-health-metrics-api/src/main/kotlin/clickstream/health/intermediate/CSHealthEventProcessor.kt +++ b/clickstream-health-metrics-api/src/main/kotlin/clickstream/health/intermediate/CSHealthEventProcessor.kt @@ -12,6 +12,25 @@ import kotlinx.coroutines.flow.Flow * 2) Returning stream of health events for processing. * 3) Sending health event data to upstream listener. * + * **Sequence Diagram** + * ``` + * App Clickstream + * +---+---+---+---+---+---+ +---+---+---+---+---+---+ + * | Sending Events | --------> | Received the Events | + * +---+---+---+---+---+---+ +---+---+---+---+---+---+ + * | + * | + * | +---+---+---+---+---+---+---+---+----+ + * if app on active state ---------> | - run the ticker with 10s delay | + * | | - collect events from db | + * | | - transform and send to backend | + * | +---+---+---+---+---+---+---+---+----+ + * | + * | +---+---+---+---+---+---+---+---+---+---+----+ + * else if app on inactive state --> | - run flushEvents and flushHealthMetrics | + * | - transform and send to backend | + * +---+---+---+---+---+---+---+---+---+----+---+ + *``` * */ public interface CSHealthEventProcessor { diff --git a/clickstream-health-metrics/src/main/kotlin/clickstream/health/internal/processor/CSHealthEventProcessorImpl.kt b/clickstream-health-metrics/src/main/kotlin/clickstream/health/internal/processor/CSHealthEventProcessorImpl.kt index 4817d9af..44ae5187 100644 --- a/clickstream-health-metrics/src/main/kotlin/clickstream/health/internal/processor/CSHealthEventProcessorImpl.kt +++ b/clickstream-health-metrics/src/main/kotlin/clickstream/health/internal/processor/CSHealthEventProcessorImpl.kt @@ -158,7 +158,7 @@ internal open class CSHealthEventProcessorImpl( } val data = events[0].copy( - eventGuid = eventId, eventBatchGuid = batchId, count = events.size + eventGuid = eventId, eventBatchGuid = batchId, count = list.size ).eventData() pushEventToUpstream(events[0].eventName, data) diff --git a/clickstream/src/main/kotlin/clickstream/internal/eventscheduler/CSBackgroundScheduler.kt b/clickstream/src/main/kotlin/clickstream/internal/eventscheduler/CSBackgroundScheduler.kt index 604cf1fe..825b01a3 100644 --- a/clickstream/src/main/kotlin/clickstream/internal/eventscheduler/CSBackgroundScheduler.kt +++ b/clickstream/src/main/kotlin/clickstream/internal/eventscheduler/CSBackgroundScheduler.kt @@ -20,6 +20,7 @@ import clickstream.lifecycle.CSAppLifeCycle import clickstream.listener.CSEventListener import clickstream.logger.CSLogger import clickstream.report.CSReportDataTracker +import com.gojek.clickstream.internal.Health import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -102,11 +103,14 @@ internal open class CSBackgroundScheduler( logger.debug { "$tag#sendEvents" } csSocketConnectionManager.connect() if (!waitForNetwork()) { + doWithHealth { + logger.debug { "$tag#No connection, trying to send health logs to upstream if enabled" } + } return false } flushAllEvents() waitForAck() - flushHealthEvents() + doWithHealth { flushHealthEvents(it) } return eventRepository.getEventCount() == 0 } @@ -154,20 +158,24 @@ internal open class CSBackgroundScheduler( } } - private suspend fun flushHealthEvents() { + private suspend inline fun doWithHealth(crossinline exec: suspend (List) -> Unit) { logger.debug { "$tag#flushHealthEvents" } healthProcessor?.getHealthEventFlow(CSEventTypesConstant.AGGREGATE)?.collect { healthList -> - val healthMappedEvent = healthList.map { health -> - CSEvent( - guid = health.healthMeta.eventGuid, - timestamp = health.eventTimestamp, - message = health - ) - }.map { CSEventData.create(it) } - logger.debug { "$tag#flushHealthEvents - healthEvents size ${healthMappedEvent.size}" } - if (healthMappedEvent.isNotEmpty()) { - forwardEvents(healthMappedEvent, forFlushing = true) - } + exec(healthList) + } + } + + private suspend fun flushHealthEvents(healthList: List) { + val healthMappedEvent = healthList.map { health -> + CSEvent( + guid = health.healthMeta.eventGuid, + timestamp = health.eventTimestamp, + message = health + ) + }.map { CSEventData.create(it) } + logger.debug { "$tag#flushHealthEvents - healthEvents size ${healthMappedEvent.size}" } + if (healthMappedEvent.isNotEmpty()) { + forwardEvents(healthMappedEvent, forFlushing = true) } }