Skip to content

Commit

Permalink
refactor/EE (#210)
Browse files Browse the repository at this point in the history
* refactor EventEngine source code: * Updated subscribe states.
* updated subscribe Effects
* Fixed Retry in effect handlers to get configuration from PNConfiguration instance.
* Refactored Unsubscribe.
* Renamed SubscirbeOperation2 to SubscriptionEndpoint
* Miscellaneous minor fixes

* fix csproj file for UWP

* Revert "fix csproj file for UWP"

This reverts commit 7897081.

* fix compilation error csproj UWP

* removed duplicate entry for include

* fix: subscribe event engine workflow

* UnsubscribeAllEndpoint added for event Engine

* Refactored code for handling unsubscribe in Presence

* added UnsubscribeAllEndpoint class

* refactored code for receive Effect handler and some code readablility issue addressed

* update PCL and UWP

* code indentation pubnub.cs

* fix: acceptance test build
  • Loading branch information
mohitpubnub authored Mar 27, 2024
1 parent 1dd0966 commit 8789efb
Show file tree
Hide file tree
Showing 61 changed files with 1,075 additions and 3,643 deletions.
5 changes: 5 additions & 0 deletions src/Api/PubnubApi/Builder/UrlRequestBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ Uri IUrlRequestBuilder.BuildMultiChannelSubscribeRequest(string requestMethod, s
requestQueryStringParams.Add("filter-expr", UriUtil.EncodeUriComponent(pubnubConfig[pubnubInstanceId].FilterExpression, currentType, false, false, false));
}

if (!requestQueryStringParams.ContainsKey("ee") && pubnubConfig.ContainsKey(pubnubInstanceId) && pubnubConfig[pubnubInstanceId].EnableEventEngine)
{
requestQueryStringParams.Add("ee", "");
}

if (!requestQueryStringParams.ContainsKey("tt"))
{
requestQueryStringParams.Add("tt", timetoken.ToString(CultureInfo.InvariantCulture));
Expand Down
8 changes: 4 additions & 4 deletions src/Api/PubnubApi/EndPoint/Presence/PresenceOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ public class PresenceOperation<T>
public PresenceOperation(Pubnub instance, string instanceId, IPubnubLog pubnubLog, TelemetryManager telemetryManager, TokenManager tokenManager, PresenceEventEngineFactory presenceEventEngineFactory)
{
this.presenceEventEngineFactory = presenceEventEngineFactory;
if (this.presenceEventEngineFactory.hasEventEngine(instanceId)) {
presenceEventEngine = this.presenceEventEngineFactory.getEventEngine(instanceId);
if (this.presenceEventEngineFactory.HasEventEngine(instanceId)) {
presenceEventEngine = this.presenceEventEngineFactory.GetEventEngine(instanceId);
} else {
presenceEventEngine = this.presenceEventEngineFactory.initializeEventEngine<T>(instanceId, instance, pubnubLog, telemetryManager, tokenManager);
presenceEventEngine = this.presenceEventEngineFactory.InitializeEventEngine<T>(instanceId, instance, pubnubLog, telemetryManager, tokenManager);
}
}

public void Start(string[] channels, string[] channelGroups)
{
this.presenceEventEngine.eventQueue.Enqueue(new JoinedEvent() {
this.presenceEventEngine.EventQueue.Enqueue(new JoinedEvent() {
Input = new EventEngine.Presence.Common.PresenceInput() { Channels = channels, ChannelGroups = channelGroups }
});
}
Expand Down
203 changes: 203 additions & 0 deletions src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Globalization;
using PubnubApi.EventEngine.Subscribe;
using PubnubApi.EventEngine.Core;
using PubnubApi.EventEngine.Subscribe.States;
using PubnubApi.EventEngine.Subscribe.Common;

namespace PubnubApi.EndPoint
{
public class SubscribeEndpoint<T>: ISubscribeOperation<T>
{
private readonly PNConfiguration config;
private readonly IJsonPluggableLibrary jsonLibrary;
private readonly IPubnubUnitTest unit;
private readonly IPubnubLog pubnubLog;
private readonly EndPoint.TelemetryManager pubnubTelemetryMgr;
private readonly EndPoint.TokenManager pubnubTokenMgr;

private List<string> subscribeChannelNames = new List<string>();
private List<string> subscribeChannelGroupNames = new List<string>();
private long subscribeTimetoken = -1;
private bool presenceSubscribeEnabled;
private SubscribeManager2 manager;
private Dictionary<string, object> queryParam;
private Pubnub PubnubInstance;
private SubscribeEventEngine subscribeEventEngine;
private SubscribeEventEngineFactory subscribeEventEngineFactory;
private PresenceOperation<T> presenceOperation;
private string instanceId { get; set; }
public List<SubscribeCallback> SubscribeListenerList
{
get;
set;
} = new List<SubscribeCallback>();

public SubscribeEndpoint(PNConfiguration pubnubConfig, IJsonPluggableLibrary jsonPluggableLibrary, IPubnubUnitTest pubnubUnit, IPubnubLog log, EndPoint.TelemetryManager telemetryManager, EndPoint.TokenManager tokenManager,SubscribeEventEngineFactory subscribeEventEngineFactory, PresenceOperation<T> presenceOperation , string instanceId, Pubnub instance)
{
PubnubInstance = instance;
config = pubnubConfig;
jsonLibrary = jsonPluggableLibrary;
unit = pubnubUnit;
pubnubLog = log;
pubnubTelemetryMgr = telemetryManager;
pubnubTokenMgr = tokenManager;
this.subscribeEventEngineFactory = subscribeEventEngineFactory;
this.presenceOperation = presenceOperation;
this.instanceId = instanceId;
if (unit != null) { unit.EventTypeList = new List<KeyValuePair<string, string>>(); }
}

public ISubscribeOperation<T> Channels(string[] channels)
{
if (channels != null && channels.Length > 0 && !string.IsNullOrEmpty(channels[0]))
{
this.subscribeChannelNames.AddRange(channels);
}
return this;
}

public ISubscribeOperation<T> ChannelGroups(string[] channelGroups)
{
if (channelGroups != null && channelGroups.Length > 0 && !string.IsNullOrEmpty(channelGroups[0]))
{
this.subscribeChannelGroupNames.AddRange(channelGroups);
}
return this;
}

public ISubscribeOperation<T> WithTimetoken(long timetoken)
{
this.subscribeTimetoken = timetoken;
return this;
}

public ISubscribeOperation<T> WithPresence()
{
this.presenceSubscribeEnabled = true;
return this;
}

public ISubscribeOperation<T> QueryParam(Dictionary<string, object> customQueryParam)
{
this.queryParam = customQueryParam;
return this;
}

public void Execute()
{
subscribeChannelNames ??= new List<string>();
subscribeChannelGroupNames ??= new List<string>();

if (presenceSubscribeEnabled) {
List<string> presenceChannelNames = (this.subscribeChannelNames != null && this.subscribeChannelNames.Count > 0 && !string.IsNullOrEmpty(this.subscribeChannelNames[0]))
? this.subscribeChannelNames.Select(c => string.Format(CultureInfo.InvariantCulture, "{0}-pnpres", c)).ToList() : new List<string>();
List<string> presenceChannelGroupNames = (this.subscribeChannelGroupNames != null && this.subscribeChannelGroupNames.Count > 0 && !string.IsNullOrEmpty(this.subscribeChannelGroupNames[0]))
? this.subscribeChannelGroupNames.Select(c => string.Format(CultureInfo.InvariantCulture, "{0}-pnpres", c)).ToList() : new List<string>();

if (this.subscribeChannelNames != null && presenceChannelNames.Count > 0) {
this.subscribeChannelNames.AddRange(presenceChannelNames);
}

if (this.subscribeChannelGroupNames != null && presenceChannelGroupNames.Count > 0) {
this.subscribeChannelGroupNames.AddRange(presenceChannelGroupNames);
}
}

string[] channelNames = subscribeChannelNames != null ? this.subscribeChannelNames.ToArray() : null;
string[] channelGroupNames = subscribeChannelGroupNames != null ? this.subscribeChannelGroupNames.ToArray() : null;
SubscriptionCursor cursor = null;
if (subscribeTimetoken >= 1) {
cursor = new SubscriptionCursor { Timetoken = subscribeTimetoken, Region = 0 };
}
Subscribe(channelNames, channelGroupNames, cursor, this.queryParam);
}

private void Subscribe(string[] channels, string[] channelGroups, SubscriptionCursor cursor, Dictionary<string, object> externalQueryParam)
{
if ((channels?.Length ?? 0) == 0 && (channelGroups?.Length ?? 0) == 0) {
throw new ArgumentException("Either Channel Or Channel Group or Both should be provided.");
}

if (subscribeEventEngineFactory.HasEventEngine(instanceId)) {
subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(instanceId);
} else {
var subscribeManager = new SubscribeManager2(config, jsonLibrary, unit, pubnubLog, pubnubTelemetryMgr, pubnubTokenMgr, PubnubInstance);
subscribeEventEngine = subscribeEventEngineFactory.InitializeEventEngine(instanceId, PubnubInstance, config, subscribeManager, StatusEmitter, MessageEmitter);
subscribeEventEngine.OnStateTransition += SubscribeEventEngine_OnStateTransition;
subscribeEventEngine.OnEventQueued += SubscribeEventEngine_OnEventQueued;
subscribeEventEngine.OnEffectDispatch += SubscribeEventEngine_OnEffectDispatch;
}
subscribeEventEngine.Subscribe<T>(channels, channelGroups, cursor);
if (this.presenceOperation != null) {
presenceOperation.Start(channels?.Where(c => !c.EndsWith("-pnpres")).ToArray(), channelGroups?.Where(cg => !cg.EndsWith("-pnpres")).ToArray());
}
}

private void SubscribeEventEngine_OnEffectDispatch(IEffectInvocation obj)
{
try
{
unit?.EventTypeList.Add(new KeyValuePair<string, string>("invocation", obj?.Name));
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnEffectDispatch : CurrentState = {subscribeEventEngine.CurrentState.GetType().Name} => Invocation = {obj.GetType().Name}", config.LogVerbosity);
}
catch (Exception ex)
{
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnEffectDispatch : CurrentState = {subscribeEventEngine.CurrentState.GetType().Name} => EXCEPTION = {ex}", config.LogVerbosity);
}
}

private void SubscribeEventEngine_OnEventQueued(IEvent @event)
{
try
{
unit?.EventTypeList.Add(new KeyValuePair<string, string>("event", @event?.Name));
int attempts = 0;
if (subscribeEventEngine.CurrentState is HandshakeReconnectingState handshakeReconnectingState)
{
attempts = handshakeReconnectingState.AttemptedRetries;
}
else if (subscribeEventEngine.CurrentState is ReceiveReconnectingState receiveReconnectingState)
{
attempts = receiveReconnectingState.AttemptedRetries;
}
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnEventQueued : CurrentState: {subscribeEventEngine.CurrentState.GetType().Name}; Event = {@event.GetType().Name}; Attempt = {attempts} of {config.ConnectionMaxRetries}", config.LogVerbosity);
}
catch(Exception ex)
{
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnEventQueued : CurrentState = {subscribeEventEngine.CurrentState.GetType().Name} => EXCEPTION = {ex}", config.LogVerbosity);
}
}

private void SubscribeEventEngine_OnStateTransition(EventEngine.Core.TransitionResult obj)
{
try
{
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnStateTransition : CurrentState = {subscribeEventEngine.CurrentState.GetType().Name} => Transition State = {obj?.State.GetType().Name}", config.LogVerbosity);
}
catch(Exception ex)
{
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnStateTransition : CurrentState = {subscribeEventEngine.CurrentState.GetType().Name} => EXCEPTION = {ex}", config.LogVerbosity);
}
}

private void MessageEmitter<T>(Pubnub pubnubInstance, PNMessageResult<T> messageResult)

Check warning on line 186 in src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs

View workflow job for this annotation

GitHub Actions / Integration and Unit tests

Type parameter 'T' has the same name as the type parameter from outer type 'SubscribeEndpoint<T>'

Check warning on line 186 in src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs

View workflow job for this annotation

GitHub Actions / Acceptance tests

Type parameter 'T' has the same name as the type parameter from outer type 'SubscribeEndpoint<T>'
{
foreach (var listener in SubscribeListenerList)
{
listener?.Message(pubnubInstance, messageResult);
}
}

private void StatusEmitter(Pubnub pubnubInstance, PNStatus status)
{
foreach (var listener in SubscribeListenerList)
{
listener?.Status(pubnubInstance, status);
}
}

}
}
Loading

0 comments on commit 8789efb

Please sign in to comment.