Skip to content

Commit

Permalink
Clickstream Health refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
kshitij6325 committed Jul 27, 2023
1 parent 4dc002e commit 4238201
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,25 @@ import kotlinx.coroutines.flow.Flow
* 2) Returning stream of health events for processing.
* 3) Sending health event data to upstream listener.
*
* **Sequence Diagram**
* ```
* App Clickstream
* +---+---+---+---+---+---+ +---+---+---+---+---+---+
* | Sending Events | --------> | Received the Events |
* +---+---+---+---+---+---+ +---+---+---+---+---+---+
* |
* |
* | +---+---+---+---+---+---+---+---+----+
* if app on active state ---------> | - run the ticker with 10s delay |
* | | - collect events from db |
* | | - transform and send to backend |
* | +---+---+---+---+---+---+---+---+----+
* |
* | +---+---+---+---+---+---+---+---+---+---+----+
* else if app on inactive state --> | - run flushEvents and flushHealthMetrics |
* | - transform and send to backend |
* +---+---+---+---+---+---+---+---+---+----+---+
*```
* */
public interface CSHealthEventProcessor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ internal open class CSHealthEventProcessorImpl(
}

val data = events[0].copy(
eventGuid = eventId, eventBatchGuid = batchId, count = events.size
eventGuid = eventId, eventBatchGuid = batchId, count = list.size
).eventData()

pushEventToUpstream(events[0].eventName, data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import clickstream.lifecycle.CSAppLifeCycle
import clickstream.listener.CSEventListener
import clickstream.logger.CSLogger
import clickstream.report.CSReportDataTracker
import com.gojek.clickstream.internal.Health
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
Expand Down Expand Up @@ -102,11 +103,14 @@ internal open class CSBackgroundScheduler(
logger.debug { "$tag#sendEvents" }
csSocketConnectionManager.connect()
if (!waitForNetwork()) {
doWithHealth {
logger.debug { "$tag#No connection, trying to send health logs to upstream if enabled" }
}
return false
}
flushAllEvents()
waitForAck()
flushHealthEvents()
doWithHealth { flushHealthEvents(it) }
return eventRepository.getEventCount() == 0
}

Expand Down Expand Up @@ -154,20 +158,24 @@ internal open class CSBackgroundScheduler(
}
}

private suspend fun flushHealthEvents() {
private suspend inline fun doWithHealth(crossinline exec: suspend (List<Health>) -> Unit) {
logger.debug { "$tag#flushHealthEvents" }
healthProcessor?.getHealthEventFlow(CSEventTypesConstant.AGGREGATE)?.collect { healthList ->
val healthMappedEvent = healthList.map { health ->
CSEvent(
guid = health.healthMeta.eventGuid,
timestamp = health.eventTimestamp,
message = health
)
}.map { CSEventData.create(it) }
logger.debug { "$tag#flushHealthEvents - healthEvents size ${healthMappedEvent.size}" }
if (healthMappedEvent.isNotEmpty()) {
forwardEvents(healthMappedEvent, forFlushing = true)
}
exec(healthList)
}
}

private suspend fun flushHealthEvents(healthList: List<Health>) {
val healthMappedEvent = healthList.map { health ->
CSEvent(
guid = health.healthMeta.eventGuid,
timestamp = health.eventTimestamp,
message = health
)
}.map { CSEventData.create(it) }
logger.debug { "$tag#flushHealthEvents - healthEvents size ${healthMappedEvent.size}" }
if (healthMappedEvent.isNotEmpty()) {
forwardEvents(healthMappedEvent, forFlushing = true)
}
}

Expand Down

0 comments on commit 4238201

Please sign in to comment.