Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor/EE #210

Merged
merged 13 commits into from
Mar 27, 2024
Merged
5 changes: 5 additions & 0 deletions src/Api/PubnubApi/Builder/UrlRequestBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ Uri IUrlRequestBuilder.BuildMultiChannelSubscribeRequest(string requestMethod, s
requestQueryStringParams.Add("filter-expr", UriUtil.EncodeUriComponent(pubnubConfig[pubnubInstanceId].FilterExpression, currentType, false, false, false));
}

if (!requestQueryStringParams.ContainsKey("ee") && pubnubConfig.ContainsKey(pubnubInstanceId) && pubnubConfig[pubnubInstanceId].EnableEventEngine)
{
requestQueryStringParams.Add("ee", "");
}

if (!requestQueryStringParams.ContainsKey("tt"))
{
requestQueryStringParams.Add("tt", timetoken.ToString(CultureInfo.InvariantCulture));
Expand Down
2 changes: 1 addition & 1 deletion src/Api/PubnubApi/EndPoint/Presence/PresenceOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public PresenceOperation(Pubnub instance, string instanceId, IPubnubLog pubnubLo

public void Start(string[] channels, string[] channelGroups)
{
this.presenceEventEngine.eventQueue.Enqueue(new JoinedEvent() {
this.presenceEventEngine.EventQueue.Enqueue(new JoinedEvent() {
Input = new EventEngine.Presence.Common.PresenceInput() { Channels = channels, ChannelGroups = channelGroups }
});
}
Expand Down
219 changes: 219 additions & 0 deletions src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Net;
using System.Globalization;
using PubnubApi.EventEngine.Subscribe;
using PubnubApi;
using PubnubApi.EventEngine.Core;
using PubnubApi.EventEngine.Subscribe.Events;
using PubnubApi.EventEngine.Subscribe.States;
using PubnubApi.EventEngine.Subscribe.Common;

namespace PubnubApi.EndPoint
{
public class SubscribeEndpoint<T>: ISubscribeOperation<T>
{
private readonly PNConfiguration config;
private readonly IJsonPluggableLibrary jsonLibrary;
private readonly IPubnubUnitTest unit;
private readonly IPubnubLog pubnubLog;
private readonly EndPoint.TelemetryManager pubnubTelemetryMgr;
private readonly EndPoint.TokenManager pubnubTokenMgr;

private List<string> subscribeChannelNames = new List<string>();
private List<string> subscribeChannelGroupNames = new List<string>();
Comment on lines +21 to +22
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if is it a good practice or not but I've seen in our code many occurrences of IEnumerable instead of using lists directly. Should we continue this trend?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here the purpose of using List<string> is, We are going to modify the channels/groups (based on presence channels/groups subscribed or not)
Having List type here makes add/remove or indexer available for further operations.

Not sure about other places, But If we declare IEnumerable here then we will need to convert it to list/array whenever we need to access any element /update it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So It's based on usage we should assign IEnumerable / List/ Array.

private long subscribeTimetoken = -1;
private bool presenceSubscribeEnabled;
private SubscribeManager2 manager;
private Dictionary<string, object> queryParam;
private Pubnub PubnubInstance;
private SubscribeEventEngine subscribeEventEngine;
private SubscribeEventEngineFactory subscribeEventEngineFactory { get; set; }
private string instanceId { get; set; }
public List<SubscribeCallback> SubscribeListenerList
{
get;
set;
} = new List<SubscribeCallback>();

public SubscribeEndpoint(PNConfiguration pubnubConfig, IJsonPluggableLibrary jsonPluggableLibrary, IPubnubUnitTest pubnubUnit, IPubnubLog log, EndPoint.TelemetryManager telemetryManager, EndPoint.TokenManager tokenManager,SubscribeEventEngineFactory subscribeEventEngineFactory, string instanceId, Pubnub instance)
{
PubnubInstance = instance;
config = pubnubConfig;
jsonLibrary = jsonPluggableLibrary;
unit = pubnubUnit;
pubnubLog = log;
pubnubTelemetryMgr = telemetryManager;
pubnubTokenMgr = tokenManager;
this.subscribeEventEngineFactory = subscribeEventEngineFactory;
this.instanceId = instanceId;
if (unit != null) { unit.EventTypeList = new List<KeyValuePair<string, string>>(); }
}

public ISubscribeOperation<T> Channels(string[] channels)
{
if (channels != null && channels.Length > 0 && !string.IsNullOrEmpty(channels[0]))
{
this.subscribeChannelNames.AddRange(channels);
}
return this;
}

public ISubscribeOperation<T> ChannelGroups(string[] channelGroups)
{
if (channelGroups != null && channelGroups.Length > 0 && !string.IsNullOrEmpty(channelGroups[0]))
{
this.subscribeChannelGroupNames.AddRange(channelGroups);
}
return this;
}

public ISubscribeOperation<T> WithTimetoken(long timetoken)
{
this.subscribeTimetoken = timetoken;
return this;
}

public ISubscribeOperation<T> WithPresence()
{
this.presenceSubscribeEnabled = true;
return this;
}

public ISubscribeOperation<T> QueryParam(Dictionary<string, object> customQueryParam)
{
this.queryParam = customQueryParam;
return this;
}

public void Execute()
{
if (this.subscribeChannelNames == null)
{
this.subscribeChannelNames = new List<string>();
}

if (this.subscribeChannelGroupNames == null)
{
this.subscribeChannelGroupNames = new List<string>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick, can be shortened with "??="

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated!!!

}

if (this.presenceSubscribeEnabled)
{
List<string> presenceChannelNames = (this.subscribeChannelNames != null && this.subscribeChannelNames.Count > 0 && !string.IsNullOrEmpty(this.subscribeChannelNames[0]))
? this.subscribeChannelNames.Select(c => string.Format(CultureInfo.InvariantCulture, "{0}-pnpres", c)).ToList() : new List<string>();
List<string> presenceChannelGroupNames = (this.subscribeChannelGroupNames != null && this.subscribeChannelGroupNames.Count > 0 && !string.IsNullOrEmpty(this.subscribeChannelGroupNames[0]))
? this.subscribeChannelGroupNames.Select(c => string.Format(CultureInfo.InvariantCulture, "{0}-pnpres", c)).ToList() : new List<string>();

if (this.subscribeChannelNames != null && presenceChannelNames.Count > 0)
{
this.subscribeChannelNames.AddRange(presenceChannelNames);
}

if (this.subscribeChannelGroupNames != null && presenceChannelGroupNames.Count > 0)
{
this.subscribeChannelGroupNames.AddRange(presenceChannelGroupNames);
}
}

string[] channelNames = this.subscribeChannelNames != null ? this.subscribeChannelNames.ToArray() : null;
string[] channelGroupNames = this.subscribeChannelGroupNames != null ? this.subscribeChannelGroupNames.ToArray() : null;
SubscriptionCursor cursor = null;
if (subscribeTimetoken >= 1)
{
cursor = new SubscriptionCursor { Timetoken = subscribeTimetoken, Region = 0 };
}
Subscribe(channelNames, channelGroupNames, cursor, this.queryParam);
}

private void Subscribe(string[] channels, string[] channelGroups, SubscriptionCursor cursor, Dictionary<string, object> externalQueryParam)
{
if ((channels == null || channels.Length == 0) && (channelGroups == null || channelGroups.Length == 0))
{
throw new ArgumentException("Either Channel Or Channel Group or Both should be provided.");
}

if (this.subscribeEventEngineFactory.HasEventEngine(instanceId))
{
subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(instanceId);
}
else
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if it is a github issue but this formatting looks awful :kekw:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cross checked this.. It looks as expected in IDE

Copy link
Contributor Author

@mohitpubnub mohitpubnub Mar 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still Indented & refactored a bit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

{
var subscribeManager = new SubscribeManager2(config, jsonLibrary, unit, pubnubLog, pubnubTelemetryMgr, pubnubTokenMgr, PubnubInstance);
subscribeEventEngine = subscribeEventEngineFactory.InitializeEventEngine(instanceId, PubnubInstance, config, subscribeManager, StatusEmitter, MessageEmitter);
subscribeEventEngine.OnStateTransition += SubscribeEventEngine_OnStateTransition;
subscribeEventEngine.OnEventQueued += SubscribeEventEngine_OnEventQueued;
subscribeEventEngine.OnEffectDispatch += SubscribeEventEngine_OnEffectDispatch;
}
subscribeEventEngine.Subscribe<T>(channels, channelGroups, cursor);
}

private void SubscribeEventEngine_OnEffectDispatch(IEffectInvocation obj)
{
try
{
unit?.EventTypeList.Add(new KeyValuePair<string, string>("invocation", obj?.Name));
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnEffectDispatch : CurrentState = {subscribeEventEngine.CurrentState.GetType().Name} => Invocation = {obj.GetType().Name}", config.LogVerbosity);
}
catch (Exception ex)
{
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnEffectDispatch : CurrentState = {subscribeEventEngine.CurrentState.GetType().Name} => EXCEPTION = {ex}", config.LogVerbosity);
}
}

private void SubscribeEventEngine_OnEventQueued(IEvent @event)
{
try
{
unit?.EventTypeList.Add(new KeyValuePair<string, string>("event", @event?.Name));
int attempts = 0;
if (subscribeEventEngine.CurrentState is HandshakeReconnectingState handshakeReconnectingState)
{
attempts = handshakeReconnectingState.AttemptedRetries;
}
else if (subscribeEventEngine.CurrentState is ReceiveReconnectingState receiveReconnectingState)
{
attempts = receiveReconnectingState.AttemptedRetries;
}
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnEventQueued : CurrentState: {subscribeEventEngine.CurrentState.GetType().Name}; Event = {@event.GetType().Name}; Attempt = {attempts} of {config.ConnectionMaxRetries}", config.LogVerbosity);
}
catch(Exception ex)
{
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnEventQueued : CurrentState = {subscribeEventEngine.CurrentState.GetType().Name} => EXCEPTION = {ex}", config.LogVerbosity);
}
}

private void SubscribeEventEngine_OnStateTransition(EventEngine.Core.TransitionResult obj)
{
try
{
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnStateTransition : CurrentState = {subscribeEventEngine.CurrentState.GetType().Name} => Transition State = {obj?.State.GetType().Name}", config.LogVerbosity);
}
catch(Exception ex)
{
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnStateTransition : CurrentState = {subscribeEventEngine.CurrentState.GetType().Name} => EXCEPTION = {ex}", config.LogVerbosity);
}
}

private void MessageEmitter<T>(Pubnub pubnubInstance, PNMessageResult<T> messageResult)

Check warning on line 202 in src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs

View workflow job for this annotation

GitHub Actions / Acceptance tests

Type parameter 'T' has the same name as the type parameter from outer type 'SubscribeEndpoint<T>'

Check warning on line 202 in src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs

View workflow job for this annotation

GitHub Actions / Integration and Unit tests

Type parameter 'T' has the same name as the type parameter from outer type 'SubscribeEndpoint<T>'
{
foreach (var listener in SubscribeListenerList)
{
listener?.Message(pubnubInstance, messageResult);
}
}

private void StatusEmitter(Pubnub pubnubInstance, PNStatus status)
{
foreach (var listener in SubscribeListenerList)
{
listener?.Status(pubnubInstance, status);
}
}

}
}
Loading
Loading