Skip to content

🐞: allure-okhttp3 with okhttp3-sse messes SSE-events #1036

Open
@vvyushmanov

Description

@vvyushmanov

What happened?

Hi!

I am trying to integrate allure report with my okhttp3-sse SSE tests.
Particularly, I use EventSourceListener to asyncronously collect events to then process them with different asserts.

I tried adding AllureOkHttp3 interceptor, but this completely breaks the code, looks like it consumes the events and they do not reach the onEvent in the listener.

Also, without this integration, any methods annotated with @Step annotation, that are executed within the listener, are not present in the report.

I am not sure if this is a bug, but maybe you could point me in a direction on how I can make sure my @Step methods are included in the report (like some mumbo-jumbo with AspectJ).

Here's my SSEClient class:

package .tms.utils

import tms.api.ConnectionApi
import tms.api.ConnectionApi.becomeOnline
import tms.api.SSEApi
import tms.api.SSEApi.SUBSCRIBE
import tms.constants.General.BRAINS
import tms.steps.sse.SSEEventName
import tms.utils.LatchManager.latch
import tms.utils.debug.DebugLogger
import io.qameta.allure.okhttp3.AllureOkHttp3
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okhttp3.sse.EventSource
import okhttp3.sse.EventSourceListener
import okhttp3.sse.EventSources
import org.apache.http.HttpHeaders.AUTHORIZATION
import java.net.SocketException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

class SSEClient {

    private val params = OkHttpClient.Builder()
        .connectTimeout(60, TimeUnit.SECONDS)
        .readTimeout(60, TimeUnit.SECONDS)
        .writeTimeout(60, TimeUnit.SECONDS)
        .addInterceptor(AllureOkHttp3())
        .build()

    private lateinit var eventSource: EventSource

    fun listen(
        token: String,
        onConnect: () -> Unit = {},
        onEvent: (type: String?, data: String?) -> Unit,
        stopAfter: SSEEventName? = null,
        timeout: Long = 10,
        eventsCount: Int = 10,
    ) {
        latch = CountDownLatch(eventsCount)
        eventSource =
            EventSources.createFactory(params)
                .newEventSource(buildRequest(token), createListener(token, onConnect, onEvent, stopAfter))
        try {
            if (!latch.await(timeout, TimeUnit.SECONDS)) {
                if (DebugLogger.sseLoggingIsActive) println("Async timeout reached, disconnected")
            }
        } finally {
            eventSource.cancel()
        }
    }

    private fun buildRequest(token: String): Request {
        return Request.Builder().url(BRAINS + SUBSCRIBE).addHeader(AUTHORIZATION, token).build()
    }

    private fun pong(token: String) {
        SSEApi.pong(token)
        ConnectionApi.ping(token)
        if (DebugLogger.sseLoggingIsActive) println("pong")
    }


    private fun createListener(
        token: String,
        trigger: () -> Unit = {},
        onEventWrap: (type: String?, data: String?) -> Unit,
        stopAfter: SSEEventName? = null,
        onFailure: () -> Unit = {},
        onClosed: () -> Unit = {},
    ) = object : EventSourceListener() {

        override fun onOpen(eventSource: EventSource, response: Response) {
            becomeOnline(token)
            trigger()
        }

        override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) {
            if (DebugLogger.sseLoggingIsActive) println("Received event: $type : $data")
            if (type == "ping") pong(token)
            if (type != "ping") {
                onEventWrap(type, data)
                if (stopAfter == null) latch.countDown()
            }
            if (type == stopAfter?.event) {
                eventSource.cancel()
                repeat(latch.count.toInt()) { latch.countDown() }
            }
        }

        override fun onFailure(eventSource: EventSource, t: Throwable?, response: Response?) {
            if (DebugLogger.sseLoggingIsActive) println(t)
            onFailure()
            if (t != null && t !is SocketException) throw t
        }

        override fun onClosed(eventSource: EventSource) {
            if (DebugLogger.sseLoggingIsActive) println("Closed connection")
            onClosed()
        }
    }
}
```

![Screenshot from 2024-04-19 19-08-01](https://github.com/allure-framework/allure-java/assets/111231219/00d66723-ed6a-4daa-8cfd-ca91e2beb634)



### What Allure Integration are you using?

allure-okhttp3

### What version of Allure Integration you are using?

2.27.0

### What version of Allure Report you are using?

2.27.0

### Code of Conduct

- [X] I agree to follow this project's Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions