
Node: Add flush function #822

Closed
mikicho opened this issue Mar 16, 2023 · 22 comments
Labels
enhancement New feature or request

Comments

@mikicho

mikicho commented Mar 16, 2023

We need this for testing, to empty the queue before making assertions.
maxEventsInBatch still flushes asynchronously, which causes flaky tests.


@segmentio update: the team is starting the process of looking at a reimplementation of the vanilla flush function, though the requirements are a bit more complex than in the old Node SDK because of the existence of async plugins. See: #822 (comment)

@silesky
Contributor

silesky commented Mar 16, 2023

Hey, @mikicho, this is the purpose of 'await analytics.closeAndFlush()'.

Currently, there is no way to do a synchronous flush.
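For reference, here is a minimal sketch of the intended usage at shutdown (the write key and event are placeholders):

import { Analytics } from '@segment/analytics-node'

const analytics = new Analytics({ writeKey: '<WRITE_KEY>' }) // placeholder write key

analytics.track({ userId: 'user-123', event: 'User Created' }) // illustrative event

// stop accepting new events, then wait for the queue to fully drain
await analytics.closeAndFlush()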

@mikicho
Author

mikicho commented Mar 16, 2023

@silesky yeah, I don't want to close the connection, just flush all events in the queue.

There is no way to do a synchronous flush.

I don't need it to be sync; I can await it:

analytics.flush()

@silesky
Contributor

silesky commented Mar 16, 2023

@mikicho there is no persistent connection to close. The close means stopping analytics from accepting new events so the queue is able to completely drain.

A scenario where you flush without close but still allow events into the pipeline isn't particularly useful. You can always reinstantiate analytics.

I am not sure what your tests look like, but between our event emitters and closeAndFlush, I guarantee you have a way to test the thing you want to test.

@silesky silesky closed this as completed Mar 16, 2023
@mikicho
Author

mikicho commented Mar 16, 2023

@silesky why did you close this issue?
My case is valid, and closeAndFlush isn't good enough because it impacts the next test.

@silesky
Contributor

silesky commented Mar 16, 2023

@mikicho I closed this issue because I am not necessarily convinced at this point that it's a valid use case -- seems quite coupled to a very uncommon testing scenario that could be set up differently.

I would recommend that you set up your tests so you can re-instantiate analytics between tests.

@mikicho
Author

mikicho commented Mar 16, 2023

I can't do that because I'm doing integration tests, not unit tests, so instantiating the server beforeEach would be time-consuming.

I can open a pull request to implement this if this is the problem.

@silesky
Contributor

silesky commented Mar 16, 2023

@mikicho feel free to open a PR if you think it's a low lift, but I'm reluctant to introduce APIs just for testing, as I still think this is quite an edge case that could be solved by writing your tests differently.

Instantiating a server between a few tests seems perfectly fine, but there could be other ways to do it as well where you only reset that one module: you could rewrite how you instantiate analytics so it is wrapped in an object with a reset function. This seems like overkill.

@mikicho
Author

mikicho commented Mar 16, 2023

First, thank you!

I do think in general that instantiating a server between tests is typically a good idea

Test cases or test suites? 😅
I instantiate a server beforeAll.

I am also not sure why you need a complex test suite just for our analytics library

I have something like this:

describe('Users', () => {
  beforeAll(() => {
    startServer() // this does almost production-like bootstrap
  })

  it('should fire "User Created" event', async () => {
    const scope = nock('https://api.segment.io').post('/v1/batch').reply(200); // https://github.com/nock/nock/

    await client.post('/users', {...});

    // I need to wait for all events to be flushed before executing the expect line
    // await analytics.flush()

    expect(scope.isDone()).toBeTruthy()
  })
})

So I need a way to wait for the event that POST /users fires.

As a workaround, I copied the closeAndFlush function and removed the this._closed = true line, but it's hacky, and I have to access _pendingEvent, which should be private.

I just didn't think it would be difficult to add a flush function, but it seems like you oppose it, and I guess you have a good reason for that, because the legacy packages do have a flush function and you decided to remove it.

Do you have other ideas on how to support my case?

@silesky silesky reopened this Mar 16, 2023
@silesky
Contributor

silesky commented Mar 16, 2023

Do you have other ideas on how to support my case?

Try registering an event listener like this:

  it('should fire "User Created" event', async () => {
    // you want analytics to be instantiated with maxEventsInBatch: 1 -- you can do this with mocks OR with a test env variable.
    const scope = nock('https://api.segment.io').post('/v1/batch').reply(201); // https://github.com/nock/nock/
    const pDrain = new Promise(resolve => analytics.once('drained', resolve))
    await client.post('/users', {...});
    await pDrain
    expect(scope.isDone()).toBeTruthy()
  })

@silesky silesky closed this as completed Mar 19, 2023
@silesky silesky reopened this Mar 19, 2023
@silesky silesky closed this as not planned Mar 20, 2023
@makeable

makeable commented Oct 23, 2023

There is a valid use case for flush without close, and that's when you want to flush without close!
For example, you may want to flush at the end of each request in a lambda to ensure that events get dispatched before the lambda suspends. Why would you want to reinitialize multiple times on a warm lambda?

@silesky
Contributor

silesky commented Oct 23, 2023

There is a valid use case for flush without close, and that's when you want to flush without close!
For example, you may want to flush at the end of each request in a lambda to ensure that events get dispatched before the lambda suspends. Why would you want to reinitialize multiple times on a warm lambda?

There are other ways to listen for when a flush occurs without an explicit flush function (e.g. the drained event emitter), as outlined in this thread. #822 (comment)

@makeable

It's the fact that we have to reinitialize it at all that is the problem. We treat Segment as a service, as we do our DB, etc. For all our services, we initialize them once. This is a common practice, and suggested by AMZ.

It seems Segment is the only "service" that has issues keeping the event loop active, which is why we need to flush it at all. It's really weird that you think it's normal to manage our app's lifecycle around Segment, and not vice versa.

@silesky
Contributor

silesky commented Oct 23, 2023

Hey @makeable I edited my post above to remove the part about lambda execution, as that's not really relevant to the discussion. Sorry, I think you responded to the old comment.

@makeable

Hi @silesky, yes, I was replying to your original comment.

I had a look at the link, and it raised an issue I have navigating the documentation website. Where does it mention that drained event? I am used to API docs actually documenting the API, but it takes a lot of guesswork to compile an interface from the things sprinkled around that documentation. Sure, I can read the code, but it's really frustrating when you just want to spend 5 minutes implementing analytics.track and it turns into 3 hours.

@makeable

I should respond to your earlier suggestion of reading the lambda example. Unfortunately, the lambda example is a VERY naive one, and probably not very representative of how non-trivial Node apps are run on Lambda.

You are awaiting the track callback in the main handler. That is the ONLY way that example can work. The reality is that you may be using Segment much deeper in the stack, possibly in an unmanaged async call, as many libs are not written with suspension in mind and don't expect you to wait on them - i.e. fire and forget.

The issue is further exacerbated when using an async handler, as the Lambda runtime handles callbacks and async differently when it comes to suspension.

It's not uncommon to wrap the handler for this reason, and in that wrapper you would handle the flushing, which is exactly what I hoped to do. I will look at using this drained event, but I will also need to know if there are any events in the queue; otherwise I could be waiting on a drained event that never comes. Is there some way to do that?

@silesky
Contributor

silesky commented Oct 23, 2023

Sorry, that sounds really frustrating. We will be focusing on improving our documentation -- I see we don't have much emitter documentation besides a link; this is a new repo/interface, and sometimes we don't really notice the gaps until people tell us.

We have AWS Lambda documentation in our repo (our suggestion is to reinstantiate it every time), but if that's not your preference, I can understand how you could go down a confusing rabbit hole.

I will also be discussing bringing back an explicit flush method as more of a feature request -- we removed it because the main reason people were using it was for graceful shutdown, and it wasn't working as expected for that use case since events were still allowed to enter the queue.

However, this thread makes it sound like there is another subset of users who use it for testing / draining, and the drained event is a little confusing (especially if it's not a documented recipe!).

@makeable

We have AWS Lambda documentation in our repo (our suggestion is to reinstantiate it every time), but if that's not your preference, I can't understand how you could go down a confusing rabbit hole.

Yeah, as I explained above, the example is very naive, and as for reinstantiating at every request, that would mean we are wrapping our code in segment handling, which seems to be working around segment, rather than simply instrumenting our code. I did consider instantiating for each track call, but realised I would just end up with more queues to flush :)

@silesky
Contributor

silesky commented Oct 23, 2023

@makeable typo: can't understand -> can understand 😄

@makeable

Heh, I assumed as much :)

Any word on how I can tell if there are queued requests? As mentioned, just waiting on a drained event won't work if there is nothing in the queue.

@silesky
Contributor

silesky commented Oct 23, 2023

@makeable yep, but they are 'private' methods / fields atm -- what is your comfort level there?

Had a team discussion -- we probably don't want to recommend using any private methods... we are discussing a way to expose this to you. Want to avoid a band-aid.

@chrisradek
Contributor

@makeable Thanks for engaging with us here!

Just to make sure we're all on the same page:

  • You want to be able to instantiate your segment analytics class once outside a Lambda handler
  • You need a way to know once all events from Segment are flushed, without instrumenting each API call

Is that right? Are you ok with calling flush on an analytics client within a handler wrapper?

It seems segment is the only "service" that has issues keeping the event loop active, which is why we need to flush it at all.

For context, why do we even need flush at all? 2 reasons:

  1. analytics does batching by default, so without the flush, your lambda function could be frozen before the batch of events is actually sent, and it may or may not be thawed based on your lambda load.
  2. Events go through a pipeline of async plugins before getting batched. This allows you to do things like enrich events in a centralized location. This is really related to the first point - these events can be lost if the lambda function is frozen and never thawed before the HTTP request to segment is made.

Now, it would be great if we could make the HTTP request immediately if both the batch size is 1, and all plugins are synchronous, but that's a digression!
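(As an aside, for the testing scenario above, batching can be tuned when the client is instantiated -- a rough sketch using the maxEventsInBatch and flushInterval settings already mentioned in this thread; the write key is a placeholder:)

import { Analytics } from '@segment/analytics-node'

// send each event in its own batch so an interceptor like nock sees it right away;
// flushInterval (ms) caps how long events can sit in the queue before a send
const analytics = new Analytics({
  writeKey: '<WRITE_KEY>', // placeholder
  maxEventsInBatch: 1,
  flushInterval: 1000,
})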

As @silesky mentioned, our guidance so far has been to instantiate a new analytics client within the Lambda handler (each lambda invocation), and call closeAndFlush() in your handler to ensure events make it to Segment. However, that definitely adds additional burden to you as you now need to make sure your analytics instance is passed through to any callsite that uses it, versus importing a singleton/relying on closures.
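For illustration, here is a rough sketch of that per-invocation pattern (the handler shape, write key, and event payload are placeholders):

import { Analytics } from '@segment/analytics-node'

// one client per invocation, drained before the handler returns
export const handler = async (event) => {
  const analytics = new Analytics({ writeKey: '<WRITE_KEY>' }) // placeholder

  analytics.track({ userId: event.userId, event: 'User Created' }) // illustrative call

  // stop accepting new events and wait for the queue to drain before Lambda freezes
  await analytics.closeAndFlush()

  return { statusCode: 200 }
}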

I think it makes sense to support a flush that doesn't close the client. One scenario we need to address is what to do with new events when there is a pending flush: wait to resolve flush until those have also been sent, or hold off on sending them until the current flush has completed. In your scenario, would you expect any track calls to be made after calling flush?

Sorry for the wall of text, thanks again for sharing your use-case with us!

@silesky silesky reopened this Nov 2, 2023
@silesky silesky added the enhancement New feature or request label Nov 2, 2023
@silesky silesky changed the title Add flush function Node: Add flush function Nov 2, 2023
@silesky
Contributor

silesky commented Jan 10, 2024

@makeable @mikicho

analytics.flush was released in @segment/analytics-node v1.3.0 via this PR.

You can now do:

await analytics.flush()
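
For the integration-test scenario from earlier in this thread, something like the following should now work (a sketch reusing the hypothetical startServer/client/analytics setup from that example; the request payload is made up):

it('should fire "User Created" event', async () => {
  const scope = nock('https://api.segment.io').post('/v1/batch').reply(200)

  await client.post('/users', { name: 'Test User' }) // hypothetical app client and payload

  // wait for pending and in-flight events to be delivered, without closing the client
  await analytics.flush()

  expect(scope.isDone()).toBeTruthy()
})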

@silesky silesky closed this as completed Jan 10, 2024