Skip to content

Commit

Permalink
Add parameters to AsyncBufferedConsumer
Browse files Browse the repository at this point in the history
The `AsyncBufferedConsumer` constructor now supports more arguments.
Those are the same as for the `BufferConsumer` coming from the official
Mixpanel package, which is used under the hood.

I removed the `*args` and `**kwargs` parameters so that only supported
arguments are accepted. This should avoid surprises for users.  This
breaking change is mentioned in the changelog.
  • Loading branch information
bbc2 committed Feb 18, 2021
1 parent b2f2007 commit 8df3ba9
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 9 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

* New parameters for the `AsyncBufferedConsumer` constructor: `import_url`,
`request_timeout`, `groups_url`, `api_host`, `retry_limit`, `retry_backoff_factor`,
`verify_cert`.

### Changed

* Extra arguments (`*args`, `**kwargs`) are no longer accepted in the
`AsyncBufferedConsumer` constructor.

## [0.2.0] - 2019-07-11

### Added
Expand Down
11 changes: 7 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,17 @@ For most users, the default configuration should work perfectly. For more specif


* `flush_after (datetime.timedelta)` - *defaults to 10 seconds* - the time period after which the AsyncBufferedConsumer will flush the events upon receiving a new event (no matter what the event queue size is)

* `flush_first (bool)` - *defaults to True* - whether the consumer should always flush the first event.

* `max_size (int)` - *defaults to 20* - how big a given event queue can get before it is flushed by the consumer

* `events_url (str)` - *defaults to standard Mixpanel API URL* - the Mixpanel API URL that track events will be sent to

* `people_url (str)` - *defaults to standard Mixpanel API URL* - the Mixpanel API URL that people events will be sent to
* `import_url (str)` - *defaults to standard Mixpanel API URL* - the Mixpanel API URL that import events will be sent to
* `request_timeout (int)` - *defaults to `None` (no timeout)* - Connection timeout in seconds.
* `groups_url (str)` - *defaults to standard Mixpanel API URL* - the Mixpanel API URL that groups events will be sent to
* `api_host (str)` - *defaults to api.mixpanel.com* - Mixpanel API domain for all requests unless overridden by above URLs
* `retry_limit (int)` - *defaults to 4* - Number of times to retry each request in case of an error, 0 to fail after first attempt.
* `retry_backoff_factor` - *defaults to 0.25* - Factor which controls sleep duration between retries: `sleep_seconds = backoff_factor * (2 ^ (retry_count - 1))`
* `verify_cert` - *defaults to `True`*- Whether to verify the server certificate. `True` is recommended.

### Usage

Expand Down
40 changes: 35 additions & 5 deletions mixpanel_async/async_buffered_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,21 @@ class AsyncBufferedConsumer(SynchronousBufferedConsumer):
ALL = "ALL"
ENDPOINT = "ENDPOINT"

def __init__(self, flush_after=timedelta(0, 10), flush_first=True, max_size=20,
events_url=None, people_url=None, *args, **kwargs):
def __init__(
self,
flush_after=timedelta(0, 10),
flush_first=True,
max_size=20,
events_url=None,
people_url=None,
import_url=None,
request_timeout=None,
groups_url=None,
api_host="api.mixpanel.com",
retry_limit=4,
retry_backoff_factor=0.25,
verify_cert=True,
):
'''
Create a new instance of a AsyncBufferedConsumer class.
Expand All @@ -65,13 +78,30 @@ def __init__(self, flush_after=timedelta(0, 10), flush_first=True, max_size=20,
the consumer receives
:param max_size (int): the number of events in queue that will trigger
the queue to be flushed asynchronously
:param events_url: the Mixpanel API URL that track events will be sent to
:param people_url: the Mixpanel API URL that people events will be sent to
:param events_url: Mixpanel API URL that track events will be sent to
:param people_url: Mixpanel API URL that people events will be sent to
:param import_url: Mixpanel API URL that import events will be sent to
:param request_timeout: Connection timeout in seconds
:param groups_url: Mixpanel API URL that groups events will be sent to
:param api_host: Mixpanel API domain for all requests unless overriden by above URLs
:param retry_limit: Number of times to retry each request in case of an error, 0
to fail after first attempt.
:param retry_backoff_factor: Factor which controls sleep duration between retries:
sleep_seconds = backoff_factor * (2 ^ (retry_count - 1))
:param verify_cert: Whether to verify the server certificate. "True" is
recommended.
'''
super(AsyncBufferedConsumer, self).__init__(
max_size=max_size,
events_url=events_url,
people_url=people_url
people_url=people_url,
import_url=import_url,
request_timeout=request_timeout,
groups_url=groups_url,
api_host=api_host,
retry_limit=retry_limit,
retry_backoff_factor=retry_backoff_factor,
verify_cert=verify_cert,
)

# remove the minimum max size that the SynchronousBufferedConsumer
Expand Down

0 comments on commit 8df3ba9

Please sign in to comment.