Skip to content

Commit

Permalink
Merge pull request #21 from centrifugal/add_delta_support
Browse files Browse the repository at this point in the history
add Fossil delta compression support
  • Loading branch information
FZambia authored Oct 8, 2024
2 parents 600d072 + 8fb4d82 commit dde4b2d
Show file tree
Hide file tree
Showing 29 changed files with 789 additions and 107 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
uses: crazy-max/ghaction-setup-docker@v3

- name: Start Centrifugo
run: docker run -d -p 8000:8000 -e CENTRIFUGO_TOKEN_HMAC_SECRET_KEY="secret" -e CENTRIFUGO_PRESENCE=1 -e CENTRIFUGO_JOIN_LEAVE=true -e CENTRIFUGO_FORCE_PUSH_JOIN_LEAVE=true -e CENTRIFUGO_HISTORY_TTL=300s -e CENTRIFUGO_HISTORY_SIZE=100 -e CENTRIFUGO_FORCE_RECOVERY=true -e CENTRIFUGO_USER_SUBSCRIBE_TO_PERSONAL=true -e CENTRIFUGO_ALLOW_PUBLISH_FOR_SUBSCRIBER=true -e CENTRIFUGO_ALLOW_PRESENCE_FOR_SUBSCRIBER=true -e CENTRIFUGO_ALLOW_HISTORY_FOR_SUBSCRIBER=true centrifugo/centrifugo:v5 centrifugo
run: docker run -d -p 8000:8000 -e CENTRIFUGO_TOKEN_HMAC_SECRET_KEY="secret" -e CENTRIFUGO_PRESENCE=true -e CENTRIFUGO_JOIN_LEAVE=true -e CENTRIFUGO_FORCE_PUSH_JOIN_LEAVE=true -e CENTRIFUGO_HISTORY_TTL=300s -e CENTRIFUGO_HISTORY_SIZE=100 -e CENTRIFUGO_FORCE_RECOVERY=true -e CENTRIFUGO_USER_SUBSCRIBE_TO_PERSONAL=true -e CENTRIFUGO_ALLOW_PUBLISH_FOR_SUBSCRIBER=true -e CENTRIFUGO_ALLOW_PRESENCE_FOR_SUBSCRIBER=true -e CENTRIFUGO_ALLOW_HISTORY_FOR_SUBSCRIBER=true -e CENTRIFUGO_DELTA_PUBLISH=true -e CENTRIFUGO_ALLOWED_DELTA_TYPES="fossil" centrifugo/centrifugo:v5 centrifugo

- name: Install dependencies
run: |
Expand Down
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ repos:
- id: "check-merge-conflict"
- id: "debug-statements"
- id: "end-of-file-fixer"
exclude: "tests/data/.*"
- id: "mixed-line-ending"
- id: "detect-private-key"
- id: "check-yaml"
Expand Down
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ When using Protobuf protocol:

Event callbacks are called by SDK using `await` internally, the websocket connection read loop is blocked for the time SDK waits for the callback to be executed. This means that if you need to perform long operations in callbacks consider moving the work to a separate coroutine/task to return fast and continue reading data from the websocket.

The fact WebSocket read is blocked for the time we execute callbacks means that you can not call awaitable SDK APIs from callback – because SDK does not have a chance to read the reply. You will get `OperationTimeoutError` exception. The rule is the same - do the work asynchronously, for example use `asyncio.ensure_future`.
The fact WebSocket read is blocked for the time we execute callbacks means that you can not call awaitable SDK APIs from callback – because SDK does not have a chance to read the reply. You will get `OperationTimeoutError` exception. The rule is the same - do the work asynchronously, for example use `asyncio.ensure_future`.

## Run example

Expand Down Expand Up @@ -85,9 +85,11 @@ To run tests, first start Centrifugo server:

```bash
docker pull centrifugo/centrifugo:v5
docker run -d -p 8000:8000 -e CENTRIFUGO_TOKEN_HMAC_SECRET_KEY="secret" -e CENTRIFUGO_PRESENCE=1 \
docker run -d -p 8000:8000 -e CENTRIFUGO_LOG_LEVEL=trace \
-e CENTRIFUGO_TOKEN_HMAC_SECRET_KEY="secret" -e CENTRIFUGO_PRESENCE=true \
-e CENTRIFUGO_JOIN_LEAVE=true -e CENTRIFUGO_FORCE_PUSH_JOIN_LEAVE=true \
-e CENTRIFUGO_HISTORY_TTL=300s -e CENTRIFUGO_HISTORY_SIZE=100 \
-e CENTRIFUGO_DELTA_PUBLISH=true -e CENTRIFUGO_ALLOWED_DELTA_TYPES="fossil" \
-e CENTRIFUGO_FORCE_RECOVERY=true -e CENTRIFUGO_USER_SUBSCRIBE_TO_PERSONAL=true \
-e CENTRIFUGO_ALLOW_PUBLISH_FOR_SUBSCRIBER=true -e CENTRIFUGO_ALLOW_PRESENCE_FOR_SUBSCRIBER=true \
-e CENTRIFUGO_ALLOW_HISTORY_FOR_SUBSCRIBER=true centrifugo/centrifugo:v5 centrifugo
Expand Down
3 changes: 2 additions & 1 deletion centrifuge/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Main module of a Centrifuge Python client library."""

from .client import Client, ClientState, Subscription, SubscriptionState
from .client import Client, ClientState, Subscription, SubscriptionState, DeltaType
from .contexts import (
ConnectedContext,
ConnectingContext,
Expand Down Expand Up @@ -50,6 +50,7 @@
"ClientState",
"ConnectedContext",
"ConnectingContext",
"DeltaType",
"DisconnectedContext",
"DuplicateSubscriptionError",
"ErrorContext",
Expand Down
36 changes: 31 additions & 5 deletions centrifuge/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ class _ServerSubscription:
recoverable: bool


class DeltaType(Enum):
FOSSIL = "fossil"

def __str__(self) -> str:
return self.value


class Client:
"""Client is a websocket client to Centrifuge/Centrifugo server."""

Expand Down Expand Up @@ -201,6 +208,7 @@ def new_subscription(
positioned: bool = False,
recoverable: bool = False,
join_leave: bool = False,
delta: Optional[DeltaType] = None,
) -> "Subscription":
"""Creates new subscription to channel. If subscription already exists then
DuplicateSubscriptionError exception will be raised.
Expand All @@ -221,6 +229,7 @@ def new_subscription(
positioned=positioned,
recoverable=recoverable,
join_leave=join_leave,
delta=delta,
)
self._subs[channel] = sub
return sub
Expand Down Expand Up @@ -782,6 +791,9 @@ def _construct_subscribe_command(self, sub: "Subscription", cmd_id: int) -> Dict
subscribe["epoch"] = sub._epoch
subscribe["offset"] = sub._offset

if sub._delta:
subscribe["delta"] = sub._delta.value

command = {
"id": cmd_id,
"subscribe": subscribe,
Expand Down Expand Up @@ -1311,6 +1323,7 @@ def _publication_from_proto(self, pub: Any) -> Publication:
data=self._decode_data(pub.get("data")),
info=client_info,
tags=pub.get("tags", {}),
delta=pub.get("delta", False),
)


Expand Down Expand Up @@ -1352,6 +1365,7 @@ def _initialize(
positioned: bool = False,
recoverable: bool = False,
join_leave: bool = False,
delta: Optional[DeltaType] = None,
) -> None:
"""Initializes Subscription instance.
Note: use Client.new_subscription method to create new subscriptions in your app.
Expand All @@ -1376,6 +1390,12 @@ def _initialize(
self._recover: bool = False
self._offset: int = 0
self._epoch: str = ""
self._prev_data: Optional[Any] = None

if delta and delta not in {DeltaType.FOSSIL}:
raise CentrifugeError("unsupported delta format")
self._delta = delta
self._delta_negotiated: bool = False

@classmethod
def _create_instance(cls, *args: Any, **kwargs: Any) -> "Subscription":
Expand Down Expand Up @@ -1552,6 +1572,8 @@ async def _move_subscribed(self, subscribe: Dict[str, Any]) -> None:
lambda: asyncio.ensure_future(self._refresh(), loop=self._client._loop),
)

self._delta_negotiated = subscribe.get("delta", False)

await on_subscribed_handler(
SubscribedContext(
channel=self.channel,
Expand All @@ -1566,12 +1588,8 @@ async def _move_subscribed(self, subscribe: Dict[str, Any]) -> None:

publications = subscribe.get("publications", [])
if publications:
on_publication_handler = self.events.on_publication
for pub in publications:
publication = self._client._publication_from_proto(pub)
await on_publication_handler(PublicationContext(pub=publication))
if publication.offset > 0:
self._offset = publication.offset
await self._process_publication(pub)

self._clear_subscribing_state()

Expand Down Expand Up @@ -1603,6 +1621,14 @@ async def _resubscribe(self) -> None:

async def _process_publication(self, pub: Any) -> None:
publication = self._client._publication_from_proto(pub)

if self._delta and self._delta_negotiated:
new_data, prev_data = self._client._codec.apply_delta_if_needed(
self._prev_data, publication
)
publication.data = new_data
self._prev_data = prev_data

await self.events.on_publication(PublicationContext(pub=publication))
if publication.offset > 0:
self._offset = publication.offset
Expand Down
26 changes: 25 additions & 1 deletion centrifuge/codecs.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import json
from typing import Union, Iterable, AsyncIterable
from typing import Union, Iterable, AsyncIterable, TYPE_CHECKING

from google.protobuf.json_format import MessageToDict, ParseDict
from websockets.typing import Data

import centrifuge.protocol.client_pb2 as protocol
from centrifuge.fossil import apply_delta

if TYPE_CHECKING:
from centrifuge import Publication


class _JsonCodec:
Expand All @@ -18,6 +22,16 @@ def encode_commands(commands):
def decode_replies(data):
return [json.loads(reply) for reply in data.strip().split("\n")]

@staticmethod
def apply_delta_if_needed(prev_data: bytes, pub: "Publication"):
if pub.delta:
prev_data = apply_delta(prev_data, pub.data.encode("utf-8"))
new_data = json.loads(prev_data)
else:
prev_data = pub.data.encode("utf-8")
new_data = json.loads(pub.data)
return new_data, prev_data


def _varint_encode(number):
"""Encode an integer as a varint."""
Expand Down Expand Up @@ -73,3 +87,13 @@ def decode_replies(data: bytes):
reply.ParseFromString(message_bytes)
replies.append(MessageToDict(reply, preserving_proto_field_name=True))
return replies

@staticmethod
def apply_delta_if_needed(prev_data: bytes, pub: "Publication"):
if pub.delta:
prev_data = apply_delta(prev_data, pub.data)
new_data = prev_data
else:
prev_data = pub.data
new_data = pub.data
return new_data, prev_data
Loading

0 comments on commit dde4b2d

Please sign in to comment.