Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding disk buffering, part 3 #194

Merged
merged 16 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ val libs = extensions.getByType<VersionCatalogsExtension>().named("libs")
dependencies {
implementation(libs.findLibrary("findbugs-jsr305").get())
testImplementation(libs.findLibrary("assertj-core").get())
testImplementation(libs.findBundle("mockito").get())
testImplementation(libs.findBundle("mocking").get())
testImplementation(libs.findBundle("junit").get())
testImplementation(libs.findLibrary("opentelemetry-sdk-testing").get())
coreLibraryDesugaring(libs.findLibrary("desugarJdkLibs").get())
Expand Down
3 changes: 2 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ androidx-test-core = "androidx.test:core:1.5.0"
androidx-test-runner = "androidx.test:runner:1.5.2"
mockito-core = { module = "org.mockito:mockito-core", version.ref = "mockito" }
mockito-junit-jupiter = { module = "org.mockito:mockito-junit-jupiter", version.ref = "mockito" }
mockk = "io.mockk:mockk:1.13.7"
junit-jupiter-api = { module = "org.junit.jupiter:junit-jupiter-api", version.ref = "junit" }
junit-jupiter-engine = { module = "org.junit.jupiter:junit-jupiter-engine", version.ref = "junit" }
junit-vintage-engine = { module = "org.junit.vintage:junit-vintage-engine", version.ref = "junit" }
Expand All @@ -59,7 +60,7 @@ byteBuddy-plugin = { module = "net.bytebuddy:byte-buddy-gradle-plugin", version.
kotlin-plugin = { module = "org.jetbrains.kotlin:kotlin-gradle-plugin", version.ref = "kotlin" }

[bundles]
mockito = ["mockito-core", "mockito-junit-jupiter"]
mocking = ["mockito-core", "mockito-junit-jupiter", "mockk"]
junit = ["junit-jupiter-api", "junit-jupiter-engine", "junit-vintage-engine"]

[plugins]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

import android.app.Application;
import android.util.Log;
import io.opentelemetry.android.config.DiskBufferingConfiguration;
import io.opentelemetry.android.config.OtelRumConfig;
import io.opentelemetry.android.features.diskbuffering.DiskBufferingConfiguration;
import io.opentelemetry.android.instrumentation.InstrumentedApplication;
import io.opentelemetry.android.instrumentation.activity.VisibleScreenTracker;
import io.opentelemetry.android.instrumentation.network.CurrentNetworkProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.android.config;

import io.opentelemetry.android.ScreenAttributesSpanProcessor;
import io.opentelemetry.android.features.diskbuffering.DiskBufferingConfiguration;
import io.opentelemetry.android.instrumentation.network.CurrentNetworkProvider;
import io.opentelemetry.api.common.Attributes;
import java.util.function.Supplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.android.config;
package io.opentelemetry.android.features.diskbuffering;
breedx-splk marked this conversation as resolved.
Show resolved Hide resolved

/** Configuration for disk buffering. */
public final class DiskBufferingConfiguration {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.android.features.diskbuffering.scheduler

import io.opentelemetry.android.internal.services.ServiceManager
import io.opentelemetry.android.internal.services.periodicwork.PeriodicWorkService
import java.util.concurrent.atomic.AtomicBoolean

class DefaultExportScheduleHandler : ExportScheduleHandler {
private val enabled = AtomicBoolean(false)

override fun enable() {
if (!enabled.getAndSet(true)) {
ServiceManager.get().getService(PeriodicWorkService::class.java)
.enqueue(DefaultExportScheduler())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make the DefaultExportScheduler an instance field passed thru constructor? Both good DI practice and saves object creation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, that sounds way better! I'll add the changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, it's updated now.

}
}

override fun disable() {
// No operation.
LikeTheSalad marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.android.features.diskbuffering.scheduler

import io.opentelemetry.android.internal.services.periodicwork.PeriodicRunnable
import java.util.concurrent.TimeUnit

class DefaultExportScheduler : PeriodicRunnable() {
companion object {
private val DELAY_BEFORE_NEXT_EXPORT_IN_MILLIS = TimeUnit.SECONDS.toMillis(10)
}

override fun onRun() {
// TODO for next PR.
}

override fun shouldStopRunning(): Boolean {
return false
}

override fun minimumDelayUntilNextRunInMillis(): Long {
return DELAY_BEFORE_NEXT_EXPORT_IN_MILLIS
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.android.features.diskbuffering.scheduler

/**
* Sets up a scheduling mechanism to read and export previously stored signals in disk.
*/
interface ExportScheduleHandler {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not for this PR:
If using the Kotlin explicitApi mode, all those interfaces have to be made public, then you can use the org.jetbrains.kotlinx.binary-compatibility-validator plugin to generate a public API file its quite nice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds really nice 👍 I think we should create an issue to address it later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marandaneto do you mind creating an issue for this? I'm not familiar with it and would love to see an example in action. Thanks!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, #217
It's quite simple, the plugin does all the magic, all you need to do is install the plugin and make the commands part of the CI pipeline, so the CI generates the API dump in every CI run if any, the check is already done automatically part of the builtin gradle check.

/**
* Start/Set up the exporting schedule. Called when the disk buffering feature gets enabled.
*/
fun enable()

/**
* Don't start (or stop if something was previously started) the exporting schedule, no look up
* for data stored in the disk to export will be carried over if this function is called.
* This will be called if the disk buffering feature gets disabled.
*/
fun disable()
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import android.util.Log;
import io.opentelemetry.android.RumConstants;
import io.opentelemetry.android.config.DiskBufferingConfiguration;
import io.opentelemetry.android.features.diskbuffering.DiskBufferingConfiguration;
import io.opentelemetry.android.internal.services.CacheStorageService;
import io.opentelemetry.android.internal.services.PreferencesService;
import io.opentelemetry.android.internal.services.ServiceManager;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.android.internal.services.periodicwork

import io.opentelemetry.android.internal.services.ServiceManager
import io.opentelemetry.android.internal.tools.time.SystemTime

/**
* Utility for creating a Runnable that needs to run multiple times.
*
* <p>This class is internal and not for public use. Its APIs are unstable and can change at any
* time.
*/
abstract class PeriodicRunnable : Runnable {
private var lastTimeItRan: Long? = null

final override fun run() {
if (isReadyToRun()) {
onRun()
lastTimeItRan = getCurrentTimeMillis()
}
if (!shouldStopRunning()) {
enqueueForNextLoop()
}
}

private fun isReadyToRun(): Boolean {
return lastTimeItRan?.let {
getCurrentTimeMillis() >= (it + minimumDelayUntilNextRunInMillis())
} ?: true
}

private fun enqueueForNextLoop() {
ServiceManager.get().getService(PeriodicWorkService::class.java).enqueue(this)
}

private fun getCurrentTimeMillis() = SystemTime.get().getCurrentTimeMillis()

/**
* Called only if a) The runnable has never run before, OR b) The minimum amount of time delay has passed after the last run.
*/
abstract fun onRun()

/**
* Should return FALSE when further runs are needed, TRUE if no need for this task to ever run again.
*/
abstract fun shouldStopRunning(): Boolean

/**
* The minimum amount of time to wait between runs, it might take longer than what's defined here
* to run this task again depending on when the next batch of background work will get submitted.
*/
abstract fun minimumDelayUntilNextRunInMillis(): Long
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.android.internal.services.periodicwork

import android.os.Handler
import android.os.Looper
import io.opentelemetry.android.internal.services.Service
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

/**
* Utility to run periodic background work.
*
* <p>This class is internal and not for public use. Its APIs are unstable and can change at any
* time.
*/
class PeriodicWorkService : Service {
private val delegator = WorkerDelegator()
private val started = AtomicBoolean(false)

override fun start() {
if (!started.getAndSet(true)) {
delegator.run()
}
}

fun enqueue(runnable: Runnable) {
delegator.enqueue(runnable)
}

private class WorkerDelegator : Runnable {
companion object {
private const val SECONDS_TO_KILL_IDLE_THREADS = 30L
private const val SECONDS_FOR_NEXT_LOOP = 10L
private const val MAX_AMOUNT_OF_WORKER_THREADS = 1
private const val NUMBER_OF_PERMANENT_WORKER_THREADS = 0
}

private val queue = LinkedBlockingQueue<Runnable>()
private val handler = Handler(Looper.getMainLooper())
private val executor =
ThreadPoolExecutor(
NUMBER_OF_PERMANENT_WORKER_THREADS,
MAX_AMOUNT_OF_WORKER_THREADS,
SECONDS_TO_KILL_IDLE_THREADS,
TimeUnit.SECONDS,
LinkedBlockingQueue(),
)

fun enqueue(runnable: Runnable) =
synchronized(this) {
queue.add(runnable)
}

override fun run() {
delegateToWorkerThread()
scheduleNextLookUp()
}

private fun delegateToWorkerThread() =
synchronized(this) {
LikeTheSalad marked this conversation as resolved.
Show resolved Hide resolved
while (queue.isNotEmpty()) {
executor.execute(queue.poll())
}
}

private fun scheduleNextLookUp() {
handler.postDelayed(this, TimeUnit.SECONDS.toMillis(SECONDS_FOR_NEXT_LOOP))
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.android.internal.tools.time

internal class DefaultSystemTime : SystemTime {
override fun getCurrentTimeMillis(): Long {
return System.currentTimeMillis()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.android.internal.tools.time

/**
* Utility to be able to mock the current system time for testing purposes.
*
* <p>This class is internal and not for public use. Its APIs are unstable and can change at any
* time.
*/
internal interface SystemTime {
breedx-splk marked this conversation as resolved.
Show resolved Hide resolved
companion object {
private var instance: SystemTime = DefaultSystemTime()
breedx-splk marked this conversation as resolved.
Show resolved Hide resolved

fun get(): SystemTime {
return instance
}

fun setForTest(instance: SystemTime) {
this.instance = instance
}
}

fun getCurrentTimeMillis(): Long
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import android.app.Activity;
import android.app.Application;
import androidx.annotation.NonNull;
import io.opentelemetry.android.config.DiskBufferingConfiguration;
import io.opentelemetry.android.config.OtelRumConfig;
import io.opentelemetry.android.features.diskbuffering.DiskBufferingConfiguration;
import io.opentelemetry.android.instrumentation.ApplicationStateListener;
import io.opentelemetry.android.internal.services.CacheStorageService;
import io.opentelemetry.android.internal.services.PreferencesService;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.android.features.diskbuffering.scheduler

import io.mockk.Runs
import io.mockk.clearAllMocks
import io.mockk.every
import io.mockk.just
import io.mockk.mockk
import io.mockk.slot
import io.mockk.verify
import io.opentelemetry.android.internal.services.ServiceManager
import io.opentelemetry.android.internal.services.periodicwork.PeriodicWorkService
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test

class DefaultExportScheduleHandlerTest {
private lateinit var handler: DefaultExportScheduleHandler

@BeforeEach
fun setUp() {
handler = DefaultExportScheduleHandler()
}

@Test
fun `Start scheduler once when enabled`() {
val periodicWorkService = createMock()
val captor = slot<Runnable>()

// Calling enable the first time (should work)
handler.enable()
verify {
periodicWorkService.enqueue(capture(captor))
}
assertThat(captor.captured).isInstanceOf(DefaultExportScheduler::class.java)
clearAllMocks()

// Calling enable a second time (should not work)
handler.enable()
verify(exactly = 0) {
periodicWorkService.enqueue(any())
}
}

private fun createMock(): PeriodicWorkService {
val periodicWorkService = mockk<PeriodicWorkService>()
val manager = mockk<ServiceManager>()
every {
manager.getService(PeriodicWorkService::class.java)
}.returns(periodicWorkService)
every { periodicWorkService.enqueue(any()) } just Runs
ServiceManager.setForTest(manager)

return periodicWorkService
}
}
Loading