Skip to content

Commit

Permalink
Add missing fixes from old PC.
Browse files Browse the repository at this point in the history
  • Loading branch information
KGronek-Pubnub committed Oct 2, 2024
1 parent 0297394 commit 8c357db
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 80 deletions.
177 changes: 123 additions & 54 deletions Source/PubnubLibrary/Private/PubnubSubsystem.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// Copyright 2024 PubNub Inc. All Rights Reserved.

#include "PubnubSubsystem.h"
#include <string>
#include "Json.h"

#include "Config/PubnubSettings.h"
#include "FunctionLibraries/PubnubUtilities.h"
#include "Threads/PubnubFunctionThread.h"
#include "Threads/PubnubLoopingThread.h"

Expand All @@ -24,7 +25,9 @@ void UPubnubSubsystem::Initialize(FSubsystemCollectionBase& Collection)
void UPubnubSubsystem::Deinitialize()
{
DeinitPubnub();
Super::Deinitialize();

//Give some time for C-Core to clean up correctly
FPlatformProcess::Sleep(0.5);

if(QuickActionThread)
{
Expand All @@ -34,6 +37,9 @@ void UPubnubSubsystem::Deinitialize()
{
LongpollThread->Stop();
}

Super::Deinitialize();

}

void UPubnubSubsystem::InitPubnub()
Expand Down Expand Up @@ -293,14 +299,14 @@ void UPubnubSubsystem::RevokeToken(FString Token)
});
}

void UPubnubSubsystem::ParseToken(FString Token)
void UPubnubSubsystem::ParseToken(FString Token, FOnPubnubResponse OnParseTokenResponse)
{
if(!CheckQuickActionThreadValidity())
{return;}

QuickActionThread->AddFunctionToQueue( [this, Token]
QuickActionThread->AddFunctionToQueue( [this, Token, OnParseTokenResponse]
{
ParseToken_priv(Token);
ParseToken_priv(Token, OnParseTokenResponse);
});
}

Expand Down Expand Up @@ -337,14 +343,14 @@ void UPubnubSubsystem::FetchHistory(FString ChannelName, FOnPubnubResponse OnFet
});
}

void UPubnubSubsystem::MessageCounts(FString ChannelName, FString TimeStamp, FOnPubnubResponse OnMessageCountsResponse)
void UPubnubSubsystem::MessageCounts(FString ChannelName, FString Timetoken, FOnPubnubIntResponse OnMessageCountsResponse)
{
if(!CheckQuickActionThreadValidity())
{return;}

QuickActionThread->AddFunctionToQueue( [this, ChannelName, TimeStamp, OnMessageCountsResponse]
QuickActionThread->AddFunctionToQueue( [this, ChannelName, Timetoken, OnMessageCountsResponse]
{
MessageCounts_priv(ChannelName, TimeStamp, OnMessageCountsResponse);
MessageCounts_priv(ChannelName, Timetoken, OnMessageCountsResponse);
});
}

Expand Down Expand Up @@ -515,14 +521,14 @@ void UPubnubSubsystem::RemoveChannelMembers(FString ChannelMetadataID, FString I
});
}

void UPubnubSubsystem::AddMessageAction(FString ChannelName, FString MessageTimeToken, EPubnubActionType ActionType, FString Value)
void UPubnubSubsystem::AddMessageAction(FString ChannelName, FString MessageTimetoken, EPubnubActionType ActionType, FString Value, FOnPubnubResponse AddActionResponse)
{
if(!CheckQuickActionThreadValidity())
{return;}

QuickActionThread->AddFunctionToQueue( [this, ChannelName, MessageTimeToken, ActionType, Value]
QuickActionThread->AddFunctionToQueue( [this, ChannelName, MessageTimetoken, ActionType, Value, AddActionResponse]
{
AddMessageAction_priv(ChannelName, MessageTimeToken, ActionType, Value);
AddMessageAction_priv(ChannelName, MessageTimetoken, ActionType, Value, AddActionResponse);
});
}

Expand All @@ -536,7 +542,7 @@ void UPubnubSubsystem::HistoryWithMessageActions(FString ChannelName, FString St
HistoryWithMessageActions_priv(ChannelName, Start, End, SizeLimit, OnHistoryWithMessageActionsResponse);
});
}

/* DISABLED
void UPubnubSubsystem::HistoryWithMessageActionsContinue(FOnPubnubResponse OnHistoryWithMAContinueResponse)
{
if(!CheckQuickActionThreadValidity())
Expand All @@ -547,6 +553,7 @@ void UPubnubSubsystem::HistoryWithMessageActionsContinue(FOnPubnubResponse OnHis
HistoryWithMessageActionsContinue_priv(OnHistoryWithMAContinueResponse);
});
}
*/

void UPubnubSubsystem::GetMessageActions(FString ChannelName, FString Start, FString End, int SizeLimit, FOnPubnubResponse OnGetMessageActionsResponse)
{
Expand All @@ -559,6 +566,18 @@ void UPubnubSubsystem::GetMessageActions(FString ChannelName, FString Start, FSt
});
}

void UPubnubSubsystem::RemoveMessageAction(FString ChannelName, FString MessageTimetoken, FString ActionTimetoken)
{
if(!CheckQuickActionThreadValidity())
{return;}

QuickActionThread->AddFunctionToQueue( [this, ChannelName, MessageTimetoken, ActionTimetoken]
{
RemoveMessageAction_priv(ChannelName, MessageTimetoken, ActionTimetoken);
});
}

/* DISABLED
void UPubnubSubsystem::GetMessageActionsContinue(FOnPubnubResponse OnGetMessageActionsContinueResponse)
{
if(!CheckQuickActionThreadValidity())
Expand All @@ -569,6 +588,7 @@ void UPubnubSubsystem::GetMessageActionsContinue(FOnPubnubResponse OnGetMessageA
GetMessageActionsContinue_priv(OnGetMessageActionsContinueResponse);
});
}
*/

FString UPubnubSubsystem::GrantTokenStructureToJsonString(FPubnubGrantTokenStructure TokenStructure, bool &success)
{
Expand Down Expand Up @@ -667,12 +687,8 @@ FString UPubnubSubsystem::GrantTokenStructureToJsonString(FPubnubGrantTokenStruc
PermissionsJsonObject->SetStringField("authorized_uuid", TokenStructure.AuthorizedUUID);
PermissionsJsonObject->SetObjectField("permissions", TokenStructureJsonObject);

//Convert created Json to string
FString JsonString;
TSharedRef< TJsonWriter<> > JsonWriter = TJsonWriterFactory<>::Create(&JsonString);
FJsonSerializer::Serialize(PermissionsJsonObject.ToSharedRef(), JsonWriter);

return JsonString;
//Convert created Json object to string
return UPubnubUtilities::JsonObjectToString(PermissionsJsonObject);
}

void UPubnubSubsystem::SystemPublish(FString ChannelOpt)
Expand Down Expand Up @@ -715,6 +731,10 @@ void UPubnubSubsystem::StartPubnubSubscribeLoop()
{return;}
}

//Check once again, as subsystem could be deinitialized during await
if(!IsInitialized)
{return;}

//At this stage we received messages, so read them and get channel from where they were sent
const char* MessageChar = pubnub_get(ctx_sub);
const char* ChannelChar = pubnub_get_channel(ctx_sub);
Expand Down Expand Up @@ -748,17 +768,11 @@ FString UPubnubSubsystem::StringArrayToCommaSeparated(TArray<FString> StringArra
if(CommaSeparatedString.IsEmpty())
{
CommaSeparatedString.Append(StringElement);
CommaSeparatedString.Append(",");
CommaSeparatedString.Append(StringElement);
CommaSeparatedString.Append("-pnpres");
}
else
{
CommaSeparatedString.Append(",");
CommaSeparatedString.Append(StringElement);
CommaSeparatedString.Append(",");
CommaSeparatedString.Append(StringElement);
CommaSeparatedString.Append("-pnpres");
}
}
return CommaSeparatedString;
Expand Down Expand Up @@ -860,7 +874,6 @@ void UPubnubSubsystem::PubnubPublishError()
//Broadcast bound delegate with JsonResponse
OnPubnubError.Broadcast(FinalErrorMessage, EPubnubErrorType::PET_Error);;
});

}

void UPubnubSubsystem::LoadPluginSettings()
Expand Down Expand Up @@ -959,7 +972,7 @@ void UPubnubSubsystem::InitPubnub_priv()
pubnub_init(ctx_pub, PublishKey, SubscribeKey);
pubnub_init(ctx_sub, PublishKey, SubscribeKey);

if(PubnubSettings->SetSecretKetAutomatically)
if(PubnubSettings->SetSecretKeyAutomatically)
{
SetSecretKey();
}
Expand All @@ -975,6 +988,8 @@ void UPubnubSubsystem::DeinitPubnub_priv()
//Unsubscribe from all channels so this user will not be visible for others anymore
UnsubscribeFromAll();

IsInitialized = false;

if(ctx_pub)
{
pubnub_free(ctx_pub);
Expand All @@ -986,7 +1001,6 @@ void UPubnubSubsystem::DeinitPubnub_priv()
ctx_sub = nullptr;
}

IsInitialized = false;
}

void UPubnubSubsystem::SetUserID_priv(FString UserID)
Expand Down Expand Up @@ -1340,7 +1354,7 @@ void UPubnubSubsystem::GrantToken_priv(FString PermissionObject, FOnPubnubRespon

if(CheckIsFieldEmpty(PermissionObject, "PermissionObject", "GrantToken"))
{return;}

pubnub_grant_token(ctx_pub, TCHAR_TO_ANSI(*PermissionObject));

pubnub_res PubnubResponse = pubnub_await(ctx_pub);
Expand Down Expand Up @@ -1385,7 +1399,7 @@ void UPubnubSubsystem::RevokeToken_priv(FString Token)
}
}

void UPubnubSubsystem::ParseToken_priv(FString Token)
void UPubnubSubsystem::ParseToken_priv(FString Token, FOnPubnubResponse OnParseTokenResponse)
{
if(!CheckIsPubnubInitialized() || !CheckIsUserIDSet())
{return;}
Expand All @@ -1400,6 +1414,22 @@ void UPubnubSubsystem::ParseToken_priv(FString Token)
{
PubnubResponseError(PubnubResponse, "Failed to Parse Token.");
}

pubnub_chamebl_t grant_token_resp = pubnub_get_grant_token(ctx_pub);
if(!grant_token_resp.ptr)
{
PubnubError("Failed to get Parse Token - pointer to token is invalid.");
return;
}

FString JsonResponse(grant_token_resp.ptr);

//Delegate needs to be executed back on Game Thread
AsyncTask(ENamedThreads::GameThread, [this, OnParseTokenResponse, JsonResponse]()
{
//Broadcast bound delegate with JsonResponse
OnParseTokenResponse.ExecuteIfBound(JsonResponse);
});
}

void UPubnubSubsystem::SetAuthToken_priv(FString Token)
Expand Down Expand Up @@ -1481,36 +1511,24 @@ void UPubnubSubsystem::FetchHistory_priv(FString ChannelName, FOnPubnubResponse
});
}

void UPubnubSubsystem::MessageCounts_priv(FString ChannelName, FString TimeStamp, FOnPubnubResponse OnMessageCountsResponse)
void UPubnubSubsystem::MessageCounts_priv(FString ChannelName, FString Timetoken, FOnPubnubIntResponse OnMessageCountsResponse)
{
if(!CheckIsPubnubInitialized() || !CheckIsUserIDSet())
{return;}

if(CheckIsFieldEmpty(ChannelName, "ChannelName", "MessageCounts"))
{return;}

//FString UnixTimeStamp = FString::FromInt(TimeStamp.ToUnixTimestamp());
pubnub_message_counts(ctx_pub, TCHAR_TO_ANSI(*ChannelName), TCHAR_TO_ANSI(*Timetoken));

//UE_LOG(LogTemp, Warning, TEXT("unix timestamp: %s"), *UnixTimeStamp);

pubnub_message_counts(ctx_pub, TCHAR_TO_ANSI(*ChannelName), TCHAR_TO_ANSI(*TimeStamp));
int MessageCountsNumber = 0;
pubnub_get_message_counts(ctx_pub, TCHAR_TO_ANSI(*ChannelName), &MessageCountsNumber);

pubnub_res PubnubResponse = pubnub_await(ctx_pub);
if(PubnubResponse != PNR_OK)
{
PubnubResponseError(PubnubResponse, "Failed to get message counts.");
}

int MessageCountsReturn;
pubnub_get_message_counts(ctx_pub, TCHAR_TO_ANSI(*ChannelName), &MessageCountsReturn);

FString JsonResponse(FString::FromInt(MessageCountsReturn));

//Delegate needs to be executed back on Game Thread
AsyncTask(ENamedThreads::GameThread, [this, OnMessageCountsResponse, JsonResponse]()
AsyncTask(ENamedThreads::GameThread, [this, OnMessageCountsResponse, MessageCountsNumber]()
{
//Broadcast bound delegate with JsonResponse
OnMessageCountsResponse.ExecuteIfBound(JsonResponse);
OnMessageCountsResponse.ExecuteIfBound(MessageCountsNumber);
});
}

Expand Down Expand Up @@ -1714,7 +1732,8 @@ void UPubnubSubsystem::RemoveMemberships_priv(FString UUIDMetadataID, FString In
}
}

void UPubnubSubsystem::GetChannelMembers_priv(FString ChannelMetadataID, FString Include, int Limit, FString Start, FString End, EPubnubTribool Count, FOnPubnubResponse OnGetMembersResponse)
void UPubnubSubsystem::GetChannelMembers_priv(FString ChannelMetadataID, FString Include, int Limit, FString Start,
FString End, EPubnubTribool Count, FOnPubnubResponse OnGetMembersResponse)
{
if(!CheckIsPubnubInitialized() || !CheckIsUserIDSet())
{return;}
Expand All @@ -1723,9 +1742,8 @@ void UPubnubSubsystem::GetChannelMembers_priv(FString ChannelMetadataID, FString
{return;}

pubnub_tribool InCount = (pubnub_tribool)(uint8)Count;

pubnub_get_members(ctx_pub,TCHAR_TO_ANSI(*ChannelMetadataID), TCHAR_TO_ANSI(*Include), Limit, TCHAR_TO_ANSI(*Start), TCHAR_TO_ANSI(*End), InCount);

FString JsonResponse = GetLastResponse(ctx_pub);

//Delegate needs to be executed back on Game Thread
Expand Down Expand Up @@ -1787,29 +1805,80 @@ void UPubnubSubsystem::RemoveChannelMembers_priv(FString ChannelMetadataID, FStr
}
}

void UPubnubSubsystem::AddMessageAction_priv(FString ChannelName, FString MessageTimeToken, EPubnubActionType ActionType, FString Value)
void UPubnubSubsystem::AddMessageAction_priv(FString ChannelName, FString MessageTimetoken, EPubnubActionType ActionType, FString Value, FOnPubnubResponse AddActionResponse)
{
if(!CheckIsPubnubInitialized() || !CheckIsUserIDSet())
{return;}

if(CheckIsFieldEmpty(ChannelName, "ChannelName", "AddMessageAction") || CheckIsFieldEmpty(MessageTimeToken, "MessageTimeToken", "AddMessageAction"))
if(CheckIsFieldEmpty(ChannelName, "ChannelName", "AddMessageAction") || CheckIsFieldEmpty(MessageTimetoken, "MessageTimetoken", "AddMessageAction"))
{return;}

pubnub_action_type PubnubActionType = (pubnub_action_type)(uint8)ActionType;
pubnub_add_message_action(ctx_pub, TCHAR_TO_ANSI(*ChannelName), TCHAR_TO_ANSI(*MessageTimeToken), PubnubActionType, TCHAR_TO_ANSI(*Value));
pubnub_add_message_action(ctx_pub, TCHAR_TO_ANSI(*ChannelName), TCHAR_TO_ANSI(*MessageTimetoken), PubnubActionType, TCHAR_TO_ANSI(*Value));

pubnub_res PubnubResponse = pubnub_await(ctx_pub);
if(PubnubResponse != PNR_OK)
{
PubnubResponseError(PubnubResponse, "Failed to Add Message Action.");
}
pubnub_chamebl_t AddMessageActionResponse = pubnub_get_message_action_timetoken(ctx_pub);

if(!AddMessageActionResponse.ptr)
{
return;
}
FString JsonResponse(AddMessageActionResponse.ptr);
UE_LOG(PubnubLog, Warning, TEXT("AddMessageAction response: %s"), *JsonResponse);

//Delegate needs to be executed back on Game Thread
AsyncTask(ENamedThreads::GameThread, [this, AddActionResponse, JsonResponse]()
{
//Broadcast bound delegate with JsonResponse
AddActionResponse.ExecuteIfBound(JsonResponse);
});
}

void UPubnubSubsystem::RemoveMessageAction_priv(FString ChannelName, FString MessageTimetoken, FString ActionTimetoken)
{
if(!CheckIsPubnubInitialized() || !CheckIsUserIDSet())
{return;}

if(CheckIsFieldEmpty(ChannelName, "ChannelName", "RemoveMessageAction") || CheckIsFieldEmpty(MessageTimetoken, "MessageTimetoken", "RemoveMessageAction")
|| CheckIsFieldEmpty(ActionTimetoken, "ActionTimetoken", "RemoveMessageAction"))
{return;}

auto MessageTimetokenConverter = StringCast<ANSICHAR>(*MessageTimetoken);
auto ActionTimetokenConverter = StringCast<ANSICHAR>(*ActionTimetoken);

// Allocate memory for message_timetoken_char and copy the content
char* message_timetoken_char = new char[MessageTimetoken.Len() + 1];
std::strcpy(message_timetoken_char, MessageTimetokenConverter.Get());

pubnub_chamebl_t message_timetoken_chamebl;
message_timetoken_chamebl.ptr = message_timetoken_char;
message_timetoken_chamebl.size = MessageTimetoken.Len();

// Allocate memory for action_timetoken_char and copy the content
char* action_timetoken_char = new char[ActionTimetoken.Len() + 1];
std::strcpy(action_timetoken_char, ActionTimetokenConverter.Get());

pubnub_chamebl_t action_timetoken_chamebl;
action_timetoken_chamebl.ptr = action_timetoken_char;
action_timetoken_chamebl.size = ActionTimetoken.Len();

pubnub_remove_message_action(ctx_pub, TCHAR_TO_ANSI(*ChannelName), message_timetoken_chamebl, action_timetoken_chamebl);

pubnub_res PubnubResponse = pubnub_await(ctx_pub);

FString JsonResponse = GetLastResponse(ctx_pub);

if(PubnubResponse != PNR_OK)
{
PubnubResponseError(PubnubResponse, "Failed to Remove Message Action.");
}

// Clean up allocated memory
delete[] message_timetoken_char;
delete[] action_timetoken_char;
}

void UPubnubSubsystem::HistoryWithMessageActions_priv(FString ChannelName, FString Start, FString End, int SizeLimit, FOnPubnubResponse OnHistoryWithMessageActionsResponse)
Expand Down
Loading

0 comments on commit 8c357db

Please sign in to comment.