-
Notifications
You must be signed in to change notification settings - Fork 126
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor/EE #210
Merged
Merged
refactor/EE #210
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
4ec339f
refactor EventEngine source code: * Updated subscribe states.
mohitpubnub 7897081
fix csproj file for UWP
mohitpubnub 8c78c11
Revert "fix csproj file for UWP"
mohitpubnub dd10ff0
fix compilation error csproj UWP
mohitpubnub 03ab8dc
removed duplicate entry for include
mohitpubnub a1ac119
fix: subscribe event engine workflow
mohitpubnub 5cc5574
UnsubscribeAllEndpoint added for event Engine
mohitpubnub 687a98c
Refactored code for handling unsubscribe in Presence
mohitpubnub 00012e8
added UnsubscribeAllEndpoint class
mohitpubnub aef3db6
refactored code for receive Effect handler and some code readablility…
mohitpubnub d720695
update PCL and UWP
mohitpubnub 24ecc9b
code indentation pubnub.cs
mohitpubnub 9f7f268
fix: acceptance test build
mohitpubnub File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,203 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Globalization; | ||
using PubnubApi.EventEngine.Subscribe; | ||
using PubnubApi.EventEngine.Core; | ||
using PubnubApi.EventEngine.Subscribe.States; | ||
using PubnubApi.EventEngine.Subscribe.Common; | ||
|
||
namespace PubnubApi.EndPoint | ||
{ | ||
public class SubscribeEndpoint<T>: ISubscribeOperation<T> | ||
{ | ||
private readonly PNConfiguration config; | ||
private readonly IJsonPluggableLibrary jsonLibrary; | ||
private readonly IPubnubUnitTest unit; | ||
private readonly IPubnubLog pubnubLog; | ||
private readonly EndPoint.TelemetryManager pubnubTelemetryMgr; | ||
private readonly EndPoint.TokenManager pubnubTokenMgr; | ||
|
||
private List<string> subscribeChannelNames = new List<string>(); | ||
private List<string> subscribeChannelGroupNames = new List<string>(); | ||
private long subscribeTimetoken = -1; | ||
private bool presenceSubscribeEnabled; | ||
private SubscribeManager2 manager; | ||
private Dictionary<string, object> queryParam; | ||
private Pubnub PubnubInstance; | ||
private SubscribeEventEngine subscribeEventEngine; | ||
private SubscribeEventEngineFactory subscribeEventEngineFactory; | ||
private PresenceOperation<T> presenceOperation; | ||
private string instanceId { get; set; } | ||
public List<SubscribeCallback> SubscribeListenerList | ||
{ | ||
get; | ||
set; | ||
} = new List<SubscribeCallback>(); | ||
|
||
public SubscribeEndpoint(PNConfiguration pubnubConfig, IJsonPluggableLibrary jsonPluggableLibrary, IPubnubUnitTest pubnubUnit, IPubnubLog log, EndPoint.TelemetryManager telemetryManager, EndPoint.TokenManager tokenManager,SubscribeEventEngineFactory subscribeEventEngineFactory, PresenceOperation<T> presenceOperation , string instanceId, Pubnub instance) | ||
{ | ||
PubnubInstance = instance; | ||
config = pubnubConfig; | ||
jsonLibrary = jsonPluggableLibrary; | ||
unit = pubnubUnit; | ||
pubnubLog = log; | ||
pubnubTelemetryMgr = telemetryManager; | ||
pubnubTokenMgr = tokenManager; | ||
this.subscribeEventEngineFactory = subscribeEventEngineFactory; | ||
this.presenceOperation = presenceOperation; | ||
this.instanceId = instanceId; | ||
if (unit != null) { unit.EventTypeList = new List<KeyValuePair<string, string>>(); } | ||
} | ||
|
||
public ISubscribeOperation<T> Channels(string[] channels) | ||
{ | ||
if (channels != null && channels.Length > 0 && !string.IsNullOrEmpty(channels[0])) | ||
{ | ||
this.subscribeChannelNames.AddRange(channels); | ||
} | ||
return this; | ||
} | ||
|
||
public ISubscribeOperation<T> ChannelGroups(string[] channelGroups) | ||
{ | ||
if (channelGroups != null && channelGroups.Length > 0 && !string.IsNullOrEmpty(channelGroups[0])) | ||
{ | ||
this.subscribeChannelGroupNames.AddRange(channelGroups); | ||
} | ||
return this; | ||
} | ||
|
||
public ISubscribeOperation<T> WithTimetoken(long timetoken) | ||
{ | ||
this.subscribeTimetoken = timetoken; | ||
return this; | ||
} | ||
|
||
public ISubscribeOperation<T> WithPresence() | ||
{ | ||
this.presenceSubscribeEnabled = true; | ||
return this; | ||
} | ||
|
||
public ISubscribeOperation<T> QueryParam(Dictionary<string, object> customQueryParam) | ||
{ | ||
this.queryParam = customQueryParam; | ||
return this; | ||
} | ||
|
||
public void Execute() | ||
{ | ||
subscribeChannelNames ??= new List<string>(); | ||
subscribeChannelGroupNames ??= new List<string>(); | ||
|
||
if (presenceSubscribeEnabled) { | ||
List<string> presenceChannelNames = (this.subscribeChannelNames != null && this.subscribeChannelNames.Count > 0 && !string.IsNullOrEmpty(this.subscribeChannelNames[0])) | ||
? this.subscribeChannelNames.Select(c => string.Format(CultureInfo.InvariantCulture, "{0}-pnpres", c)).ToList() : new List<string>(); | ||
List<string> presenceChannelGroupNames = (this.subscribeChannelGroupNames != null && this.subscribeChannelGroupNames.Count > 0 && !string.IsNullOrEmpty(this.subscribeChannelGroupNames[0])) | ||
? this.subscribeChannelGroupNames.Select(c => string.Format(CultureInfo.InvariantCulture, "{0}-pnpres", c)).ToList() : new List<string>(); | ||
|
||
if (this.subscribeChannelNames != null && presenceChannelNames.Count > 0) { | ||
this.subscribeChannelNames.AddRange(presenceChannelNames); | ||
} | ||
|
||
if (this.subscribeChannelGroupNames != null && presenceChannelGroupNames.Count > 0) { | ||
this.subscribeChannelGroupNames.AddRange(presenceChannelGroupNames); | ||
} | ||
} | ||
|
||
string[] channelNames = subscribeChannelNames != null ? this.subscribeChannelNames.ToArray() : null; | ||
string[] channelGroupNames = subscribeChannelGroupNames != null ? this.subscribeChannelGroupNames.ToArray() : null; | ||
SubscriptionCursor cursor = null; | ||
if (subscribeTimetoken >= 1) { | ||
cursor = new SubscriptionCursor { Timetoken = subscribeTimetoken, Region = 0 }; | ||
} | ||
Subscribe(channelNames, channelGroupNames, cursor, this.queryParam); | ||
} | ||
|
||
private void Subscribe(string[] channels, string[] channelGroups, SubscriptionCursor cursor, Dictionary<string, object> externalQueryParam) | ||
{ | ||
if ((channels?.Length ?? 0) == 0 && (channelGroups?.Length ?? 0) == 0) { | ||
throw new ArgumentException("Either Channel Or Channel Group or Both should be provided."); | ||
} | ||
|
||
if (subscribeEventEngineFactory.HasEventEngine(instanceId)) { | ||
subscribeEventEngine = subscribeEventEngineFactory.GetEventEngine(instanceId); | ||
} else { | ||
var subscribeManager = new SubscribeManager2(config, jsonLibrary, unit, pubnubLog, pubnubTelemetryMgr, pubnubTokenMgr, PubnubInstance); | ||
subscribeEventEngine = subscribeEventEngineFactory.InitializeEventEngine(instanceId, PubnubInstance, config, subscribeManager, StatusEmitter, MessageEmitter); | ||
subscribeEventEngine.OnStateTransition += SubscribeEventEngine_OnStateTransition; | ||
subscribeEventEngine.OnEventQueued += SubscribeEventEngine_OnEventQueued; | ||
subscribeEventEngine.OnEffectDispatch += SubscribeEventEngine_OnEffectDispatch; | ||
} | ||
subscribeEventEngine.Subscribe<T>(channels, channelGroups, cursor); | ||
if (this.presenceOperation != null) { | ||
presenceOperation.Start(channels?.Where(c => !c.EndsWith("-pnpres")).ToArray(), channelGroups?.Where(cg => !cg.EndsWith("-pnpres")).ToArray()); | ||
} | ||
} | ||
|
||
private void SubscribeEventEngine_OnEffectDispatch(IEffectInvocation obj) | ||
{ | ||
try | ||
{ | ||
unit?.EventTypeList.Add(new KeyValuePair<string, string>("invocation", obj?.Name)); | ||
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnEffectDispatch : CurrentState = {subscribeEventEngine.CurrentState.GetType().Name} => Invocation = {obj.GetType().Name}", config.LogVerbosity); | ||
} | ||
catch (Exception ex) | ||
{ | ||
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnEffectDispatch : CurrentState = {subscribeEventEngine.CurrentState.GetType().Name} => EXCEPTION = {ex}", config.LogVerbosity); | ||
} | ||
} | ||
|
||
private void SubscribeEventEngine_OnEventQueued(IEvent @event) | ||
{ | ||
try | ||
{ | ||
unit?.EventTypeList.Add(new KeyValuePair<string, string>("event", @event?.Name)); | ||
int attempts = 0; | ||
if (subscribeEventEngine.CurrentState is HandshakeReconnectingState handshakeReconnectingState) | ||
{ | ||
attempts = handshakeReconnectingState.AttemptedRetries; | ||
} | ||
else if (subscribeEventEngine.CurrentState is ReceiveReconnectingState receiveReconnectingState) | ||
{ | ||
attempts = receiveReconnectingState.AttemptedRetries; | ||
} | ||
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnEventQueued : CurrentState: {subscribeEventEngine.CurrentState.GetType().Name}; Event = {@event.GetType().Name}; Attempt = {attempts} of {config.ConnectionMaxRetries}", config.LogVerbosity); | ||
} | ||
catch(Exception ex) | ||
{ | ||
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnEventQueued : CurrentState = {subscribeEventEngine.CurrentState.GetType().Name} => EXCEPTION = {ex}", config.LogVerbosity); | ||
} | ||
} | ||
|
||
private void SubscribeEventEngine_OnStateTransition(EventEngine.Core.TransitionResult obj) | ||
{ | ||
try | ||
{ | ||
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnStateTransition : CurrentState = {subscribeEventEngine.CurrentState.GetType().Name} => Transition State = {obj?.State.GetType().Name}", config.LogVerbosity); | ||
} | ||
catch(Exception ex) | ||
{ | ||
LoggingMethod.WriteToLog(pubnubLog, $"DateTime {DateTime.Now.ToString(CultureInfo.InvariantCulture)}, EE OnStateTransition : CurrentState = {subscribeEventEngine.CurrentState.GetType().Name} => EXCEPTION = {ex}", config.LogVerbosity); | ||
} | ||
} | ||
|
||
private void MessageEmitter<T>(Pubnub pubnubInstance, PNMessageResult<T> messageResult) | ||
Check warning on line 186 in src/Api/PubnubApi/EndPoint/PubSub/SubscribeEndpoint.cs GitHub Actions / Acceptance tests
|
||
{ | ||
foreach (var listener in SubscribeListenerList) | ||
{ | ||
listener?.Message(pubnubInstance, messageResult); | ||
} | ||
} | ||
|
||
private void StatusEmitter(Pubnub pubnubInstance, PNStatus status) | ||
{ | ||
foreach (var listener in SubscribeListenerList) | ||
{ | ||
listener?.Status(pubnubInstance, status); | ||
} | ||
} | ||
|
||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know if is it a good practice or not but I've seen in our code many occurrences of
IEnumerable
instead of using lists directly. Should we continue this trend?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here the purpose of using
List<string>
is, We are going to modify the channels/groups (based on presence channels/groups subscribed or not)Having
List
type here makes add/remove or indexer available for further operations.Not sure about other places, But If we declare
IEnumerable
here then we will need to convert it to list/array whenever we need to access any element /update itThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So It's based on usage we should assign IEnumerable / List/ Array.