Skip to content

Commit

Permalink
Merge pull request #24 from open-craft/navin/trim-stream
Browse files Browse the repository at this point in the history
feat: limit stream length by using redis stream trim option
  • Loading branch information
bmtcril authored May 25, 2023
2 parents a3e7eb6 + d6048e0 commit c636eca
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 31 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,19 @@ Unreleased

*

[0.3.1] - 2023-05-24
************************************************

Added
=====

* Option to limit length of stream.

Changed
=======

* Updated README.

[0.3.0] - 2023-05-23
************************************************

Expand Down
21 changes: 12 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
extract_translations fake_translations help pii_check pull_translations push_translations \
quality requirements selfcheck test test-all upgrade validate install_transifex_client \
produce_test_event consume_test_event multiple_consumer_test_event kill_all_consume_test_events \
redis-up redis-down
redis_up redis_down redis_shell

.DEFAULT_GOAL := help

Expand All @@ -11,7 +11,7 @@ BROWSER := python -m webbrowser file://$(CURDIR)/

help: ## display this help message
@echo "Please use \`make <target>' where <target> is one of"
@awk -F ':.*?## ' '/^[a-zA-Z]/ && NF==2 {printf "\033[36m %-25s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST) | sort
@awk -F ':.*?## ' '/^[a-zA-Z]/ && NF==2 {printf "\033[36m %-30s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST) | sort

clean: ## remove generated byte code, coverage reports, and build artifacts
find . -name '__pycache__' -exec rm -rf {} +
Expand Down Expand Up @@ -114,21 +114,24 @@ install_transifex_client: ## Install the Transifex client
git checkout -- LICENSE README.md ## overwritten by Transifex installer

## Local test helpers
produce_test_event:
EVENT_BUS_PRODUCER='edx_event_bus_redis.create_producer' EVENT_BUS_REDIS_CONNECTION_URL='redis://:password@localhost:6379/' EVENT_BUS_TOPIC_PREFIX='dev' python manage.py produce_event --signal openedx_events.content_authoring.signals.XBLOCK_DELETED --topic xblock-status --key-field None --data '{"xblock_info": {"usage_key": "block-v1:edx+DemoX+Demo_course+type@video+block@UaEBjyMjcLW65gaTXggB93WmvoxGAJa0JeHRrDThk", "block_type": "video"}}'
produce_test_event: ## Produce a test event
EVENT_BUS_REDIS_STREAM_MAX_LEN=5 EVENT_BUS_PRODUCER='edx_event_bus_redis.create_producer' EVENT_BUS_REDIS_CONNECTION_URL='redis://:password@localhost:6379/' EVENT_BUS_TOPIC_PREFIX='dev' python manage.py produce_event --signal openedx_events.content_authoring.signals.XBLOCK_DELETED --topic xblock-status --key-field None --data '{"xblock_info": {"usage_key": "block-v1:edx+DemoX+Demo_course+type@video+block@UaEBjyMjcLW65gaTXggB93WmvoxGAJa0JeHRrDThk", "block_type": "video"}}'

consume_test_event:
consume_test_event: ## Start consumer to consume test event
EVENT_BUS_CONSUMER='edx_event_bus_redis.RedisEventConsumer' EVENT_BUS_PRODUCER='edx_event_bus_redis.create_producer' EVENT_BUS_REDIS_CONNECTION_URL='redis://:password@localhost:6379/' EVENT_BUS_TOPIC_PREFIX='dev' python manage.py consume_events --topic xblock-status --group_id test_group --extra '{"consumer_name": "test_group.c1"}'

multiple_consumer_test_event:
multiple_consumer_test_event: ## Start 2 consumers to consume test event
EVENT_BUS_CONSUMER='edx_event_bus_redis.RedisEventConsumer' EVENT_BUS_PRODUCER='edx_event_bus_redis.create_producer' EVENT_BUS_REDIS_CONNECTION_URL='redis://:password@localhost:6379/' EVENT_BUS_TOPIC_PREFIX='dev' python manage.py consume_events --topic xblock-status --group_id test_group --extra '{"consumer_name": "test_group.c1"}' &
EVENT_BUS_CONSUMER='edx_event_bus_redis.RedisEventConsumer' EVENT_BUS_PRODUCER='edx_event_bus_redis.create_producer' EVENT_BUS_REDIS_CONNECTION_URL='redis://:password@localhost:6379/' EVENT_BUS_TOPIC_PREFIX='dev' python manage.py consume_events --topic xblock-status --group_id test_group --extra '{"consumer_name": "test_group.c2"}' &

kill_all_consume_test_events:
kill_all_consume_test_events: ## Kill all test event consumers
pgrep -lf python\ manage.py\ consume_events\ --topic\ xblock-status\ --group_id\ test_group | cut -d" " -f1 | xargs kill -15

redis-up:
redis_up: ## Start redis in docker container for testing locally.
docker compose up

redis-down:
redis_down: ## Stop and remove redis container
docker compose down

redis_shell: ## Start shell inside redis container
docker compose exec redis /bin/bash -c "redis-cli -a password"
109 changes: 94 additions & 15 deletions README.rst
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
edx_event_bus_redis
#############################

.. note::

This README was auto-generated. Maintainer: please review its contents and
update all relevant sections. Instructions to you are marked with
"PLACEHOLDER" or "TODO". Update or remove those sections, and remove this
note when you are done.

|pypi-badge| |ci-badge| |codecov-badge| |doc-badge| |pyversions-badge|
|license-badge| |status-badge|

Expand All @@ -16,6 +9,58 @@ Purpose

Redis Streams implementation for the Open edX event bus.

Overview
********
This package implements an event bus for Open EdX using Redis streams.

The event bus acts as a broker between services publishing events and other services that consume these events.

This package contains both the publishing code, which processes events into
messages to send to the stream, and the consumer code, which polls the stream
using a `while True` loop in order to turn messages back into django signal to
be emitted. This django signal contains event data which can be consumed by the
host application which does the actual event handling.
The actual Redis host is configurable.

The repository works together with the openedx/openedx-events repository to make the fully functional event bus.

Documentation
*************

To use this implementation of the Event Bus with openedx-events, you'll need to ensure that below the following Django settings are set::

# redis connection url
# https://redis.readthedocs.io/en/stable/examples/ssl_connection_examples.html#Connecting-to-a-Redis-instance-via-a-URL-string
EVENT_BUS_REDIS_CONNECTION_URL: redis://:password@localhost:6379/
EVENT_BUS_TOPIC_PREFIX: dev

# Required, on the producing side only:
# https://github.com/openedx/openedx-events/blob/06635f3642cee4020d6787df68bba694bd1233fe/openedx_events/event_bus/__init__.py#L105-L112
# This will load a producer class which can send events to redis streams.
EVENT_BUS_PRODUCER: edx_event_bus_redis.create_producer

# Required, on the consumer side only:
# https://github.com/openedx/openedx-events/blob/06635f3642cee4020d6787df68bba694bd1233fe/openedx_events/event_bus/__init__.py#L150-L157
# This will load a consumer class which can consume events from redis streams.
EVENT_BUS_CONSUMER: edx_event_bus_redis.RedisEventConsumer

Optional settings that are worth considering::

# If the consumer encounters this many consecutive errors, exit with an error. This is intended to be used in a context where a management system (such as Kubernetes) will relaunch the consumer automatically.
EVENT_BUS_REDIS_CONSUMER_CONSECUTIVE_ERRORS_LIMIT (defaults to None)

# How long the consumer should wait for new entries in a stream.
# As we are running the consumer in a while True loop, changing this setting doesn't make much difference
# expect for changing number of monitoring messages while waiting for new events.
# https://redis.io/commands/xread/#blocking-for-data
EVENT_BUS_REDIS_CONSUMER_POLL_TIMEOUT (defaults to 60 seconds)

# Limits stream size to approximately this number
EVENT_BUS_REDIS_STREAM_MAX_LEN (defaults to 10_000)

For manual local testing, see ``Testing locally`` section below.


Getting Started
***************

Expand Down Expand Up @@ -92,12 +137,46 @@ Testing locally
Deploying
=========

TODO: How can a new user go about deploying this component? Is it just a few
commands? Is there a larger how-to that should be linked here?
After setting up required configuration, events are produced using the
``openedx_events.get_producer().send()`` method which needs to be called from
the producing side. For more information, visit this `link`_.

.. _link: https://openedx.atlassian.net/wiki/spaces/AC/pages/3508699151/How+to+start+using+the+Event+Bus#Producing-a-signal

To consume events, openedx_events provides a management command called
``consume_events`` which can be called like so:

.. code-block:: bash
# consume events from topic xblock-status
python manage.py consume_events --topic xblock-status --group_id test_group --extra '{"consumer_name": "test_group.c1"}'
# replay events from specific redis msg id
python manage.py consume_events --topic xblock-deleted --group_id test_group --extra '{"consumer_name": "test_group.c1", "last_read_msg_id": "1679676448892-0"}'
# process all messages that were not read by this consumer group.
python manage.py consume_events -t user-login -g user-activity-service --extra '{"check_backlog": true, "consumer_name": "c1"}'
PLACEHOLDER: For details on how to deploy this component, see the `deployment how-to`_
# claim messages pending for more than 30 minutes (1,800,000 milliseconds) from other consumers in the group.
python manage.py consume_events -t user-login -g user-activity-service --extra '{"claim_msgs_older_than": 1800000, "consumer_name": "c1"}'
.. _deployment how-to: https://docs.openedx.org/projects/event-bus-redis/how-tos/how-to-deploy-this-component.html
Note that the ``consumer_name`` in ``--extra`` argument is required for redis
event bus as this name uniquely identifies the consumer in a group and helps
with tracking processed and pending messages.

If required, you can also replay events i.e. process messages from a specific
point in history.

.. code-block:: bash
# replay events from specific redis msg id
python manage.py consume_events --signal org.openedx.content_authoring.xblock.deleted.v1 --topic xblock-deleted --group_id test_group --extra '{"consumer_name": "c1", "last_read_msg_id": "1684306039300-0"}'
The redis message id can be found from the producer logs in the host application, example:

.. code-block::
Message delivered to Redis event bus: topic=dev-xblock-deleted, message_id=ab289110-f47e-11ed-bd90-1c83413013cb, signal=<OpenEdxPublicSignal: org.openedx.content_authoring.xblock.deleted.v1>, redis_msg_id=b'1684306039300-0'
Getting Help
************
Expand Down Expand Up @@ -175,8 +254,8 @@ Reporting Security Issues

Please do not report security issues in public. Please email [email protected].

.. |pypi-badge| image:: https://img.shields.io/pypi/v/event-bus-redis.svg
:target: https://pypi.python.org/pypi/event-bus-redis/
.. |pypi-badge| image:: https://img.shields.io/pypi/v/edx-event-bus-redis.svg
:target: https://pypi.python.org/pypi/edx-event-bus-redis/
:alt: PyPI

.. |ci-badge| image:: https://github.com/openedx/event-bus-redis/workflows/Python%20CI/badge.svg?branch=main
Expand All @@ -187,11 +266,11 @@ Please do not report security issues in public. Please email [email protected].
:target: https://codecov.io/github/openedx/event-bus-redis?branch=main
:alt: Codecov

.. |doc-badge| image:: https://readthedocs.org/projects/event-bus-redis/badge/?version=latest
.. |doc-badge| image:: https://readthedocs.org/projects/edx-event-bus-redis/badge/?version=latest
:target: https://event-bus-redis.readthedocs.io/en/latest/
:alt: Documentation

.. |pyversions-badge| image:: https://img.shields.io/pypi/pyversions/event-bus-redis.svg
.. |pyversions-badge| image:: https://img.shields.io/pypi/pyversions/edx-event-bus-redis.svg
:target: https://pypi.python.org/pypi/event-bus-redis/
:alt: Supported Python versions

Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
from edx_event_bus_redis.internal.consumer import RedisEventConsumer
from edx_event_bus_redis.internal.producer import create_producer

__version__ = '0.3.0'
__version__ = '0.3.1'

default_app_config = 'edx_event_bus_redis.apps.EdxEventBusRedisConfig' # pylint: disable=invalid-name
10 changes: 6 additions & 4 deletions edx_event_bus_redis/internal/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
REDIS_CONSUMERS_ENABLED = SettingToggle('EVENT_BUS_REDIS_CONSUMERS_ENABLED', default=True)

# .. setting_name: EVENT_BUS_REDIS_CONSUMER_POLL_TIMEOUT
# .. setting_default: 1
# .. setting_description: How long the consumer should wait, in seconds, for the Redis broker
# to respond to a poll() call.
CONSUMER_POLL_TIMEOUT = getattr(settings, 'EVENT_BUS_REDIS_CONSUMER_POLL_TIMEOUT', 1)
# .. setting_default: 60
# .. setting_description: How long the consumer should wait for new entries in a stream.
# .. As we are running the consumer in a while True loop, changing this setting doesn't make much difference
# .. expect for changing number of monitoring messages while waiting for new events.
# .. https://redis.io/commands/xread/#blocking-for-data
CONSUMER_POLL_TIMEOUT = getattr(settings, 'EVENT_BUS_REDIS_CONSUMER_POLL_TIMEOUT', 60)

# .. setting_name: EVENT_BUS_REDIS_CONSUMER_POLL_FAILURE_SLEEP
# .. setting_default: 1.0
Expand Down
10 changes: 9 additions & 1 deletion edx_event_bus_redis/internal/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import Optional

import attr
from django.conf import settings
from edx_django_utils.monitoring import record_exception
from openedx_events.data import EventsMetadata
from openedx_events.event_bus import EventBusProducer
Expand All @@ -22,6 +23,12 @@

logger = logging.getLogger(__name__)

# .. setting_name: EVENT_BUS_REDIS_STREAM_MAX_LEN
# .. setting_default: 10000
# .. setting_description: Limits stream size to approximately this number, more info can be found in
# .. docs/decisions/0003-limiting-stream-length.rst
STREAM_MAX_LEN = int(getattr(settings, 'EVENT_BUS_REDIS_STREAM_MAX_LEN', 10_000))


def record_producing_error(error, context):
"""
Expand Down Expand Up @@ -121,7 +128,8 @@ def send(
stream_data = message.to_binary_dict()

stream = self.client.Stream(full_topic)
msg_id = stream.add(stream_data)
# Read docs/decisions/0003-limiting-stream-length.rst for explanation about maxlen and approximate options.
msg_id = stream.add(stream_data, maxlen=STREAM_MAX_LEN, approximate=True)
context.on_event_deliver(msg_id)
except Exception as e: # pylint: disable=broad-except
record_producing_error(e, context)
Expand Down
6 changes: 5 additions & 1 deletion edx_event_bus_redis/internal/tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,11 @@ def test_send_to_event_bus(self, mock_serializer):
b'sourcelib': b'1.2.3',
}

stream_mock.add.assert_called_once_with({b'event_data': b'value-bytes-here', **expected_headers})
stream_mock.add.assert_called_once_with(
{b'event_data': b'value-bytes-here', **expected_headers},
maxlen=10000,
approximate=True
)

@patch(
'edx_event_bus_redis.internal.producer.serialize_event_data_to_bytes', autospec=True,
Expand Down
1 change: 1 addition & 0 deletions test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,4 @@ def root(*args):
EVENT_BUS_REDIS_CONNECTION_URL = getenv('EVENT_BUS_REDIS_CONNECTION_URL')
EVENT_BUS_TOPIC_PREFIX = getenv('EVENT_BUS_TOPIC_PREFIX')
EVENT_BUS_CONSUMER = getenv('EVENT_BUS_CONSUMER')
EVENT_BUS_REDIS_STREAM_MAX_LEN = getenv('EVENT_BUS_REDIS_STREAM_MAX_LEN', 10_000)

0 comments on commit c636eca

Please sign in to comment.