Can't get value in different streams #2

Open
jmilktea opened this issue Jan 20, 2020 · 1 comment

Comments

@jmilktea

I can't read the MDC value from a different stream.

For example, I set the parameter in a WebFilter:

@Component
public class MdcWebFilter implements WebFilter {
    @Override
    public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain webFilterChain) {
        return webFilterChain.filter(serverWebExchange).subscriberContext(ctx -> ctx.put("traceId", 123));
    }
}

private static final Scheduler SETTLEMENT_SCHEDULER = Schedulers.newParallel("settlement-scheduler", 30);

@GetMapping("/test")
public Flux<Integer> test() {
    // prints 123
    System.out.println(Thread.currentThread().getId() + ":" + MDC.get("traceId"));

    // subscribed manually, outside the returned chain
    Mono.just(1).map(s -> {
        thread1();
        return 1;
    }).subscribeOn(SETTLEMENT_SCHEDULER).subscribe();

    // returned to the framework, which subscribes to it
    return Flux.just(1).map(s -> {
        thread2();
        return 1;
    }).subscribeOn(SETTLEMENT_SCHEDULER);
}

private void thread1() {
    // prints null
    System.out.println(Thread.currentThread().getId() + ":" + MDC.get("traceId"));
}

private void thread2() {
    // prints 123
    System.out.println(Thread.currentThread().getId() + ":" + MDC.get("traceId"));
}
@ghenadiibatalski

I think I know what the problem is. By calling subscribe() yourself, you become the subscriber of that inner Mono, so it no longer sees the original subscriber context that holds the traceId. You need to take over the original subscriber context, e.g.:

// prints 123
System.out.println(Thread.currentThread().getId() + ":" + MDC.get("traceId"));

return Mono.subscriberContext()
        .doOnNext(context -> Mono.just(1)
                .subscriberContext(context) // take over the original context here
                .map(s -> {
                    thread1(); // and it works here
                    return 1;
                })
                .doOnNext(integer -> {
                    // and here! (CORRELATION_ID_KEY is not defined in this snippet;
                    // it is assumed to be the MDC key, "traceId" in the question)
                    System.out.println(Thread.currentThread().getId() + ":" + MDC.get(CORRELATION_ID_KEY));
                })
                .subscribeOn(SETTLEMENT_SCHEDULER)
                .subscribe())
        .thenMany(
                Flux.just(1).map(s -> {
                    thread2();
                    return 1;
                }).subscribeOn(SETTLEMENT_SCHEDULER));
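
The "take over" idea can also be shown in plain Java, without Reactor or SLF4J. This is only an analogy, not how Reactor works internally. In Reactor the Context follows the subscription rather than the thread, which is why thread2() still sees the value on the scheduler thread. The common point is that work started independently does not inherit a value from its caller unless the value is captured and handed over explicitly. Every name below (HandOverSketch, TRACE_ID, readDetached, readWithHandOver) is hypothetical. The ThreadLocal stands in for the MDC, which is typically backed by thread-local storage, and the executor stands in for the scheduler.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HandOverSketch {
    // Stand-in for the MDC: a value bound to the current thread only.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Starts independent work without handing anything over.
    static String readDetached(ExecutorService pool) throws Exception {
        Callable<String> task = () -> TRACE_ID.get();
        return pool.submit(task).get();
    }

    // Captures the caller's value first, then restores it inside the task.
    static String readWithHandOver(ExecutorService pool) throws Exception {
        String captured = TRACE_ID.get(); // read on the calling thread
        Callable<String> task = () -> {
            TRACE_ID.set(captured);       // the hand-over
            try {
                return TRACE_ID.get();
            } finally {
                TRACE_ID.remove();        // do not leak into the pooled thread
            }
        };
        return pool.submit(task).get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            TRACE_ID.set("123");
            System.out.println("detached: " + readDetached(pool));       // detached: null
            System.out.println("handed over: " + readWithHandOver(pool)); // handed over: 123
        } finally {
            pool.shutdown();
        }
    }
}
```

In the Reactor snippet above, passing the captured context to subscriberContext(context) on the inner Mono plays the role of the explicit set call here.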
