Skip to content

Commit

Permalink
Update google messagign library to allow device registration
Browse files Browse the repository at this point in the history
  • Loading branch information
robertoles committed Sep 23, 2024
1 parent 4ce68ad commit e3a3dce
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.google.firebase.installations.FirebaseInstallations
import com.google.firebase.messaging.FirebaseMessaging
import com.pusher.pushnotifications.api.DeviceMetadata
import com.pusher.pushnotifications.api.PushNotificationsAPI
import com.pusher.pushnotifications.api.PushNotificationsAPIInvalidToken
import com.pusher.pushnotifications.auth.TokenProvider
import com.pusher.pushnotifications.fcm.MessagingService
import com.pusher.pushnotifications.internal.*
Expand Down Expand Up @@ -37,6 +38,7 @@ data class PusherCallbackError(val message: String, val cause: Throwable?)
internal sealed class ServerSyncEvent
internal data class InterestsChangedEvent(val interests: Set<String>): ServerSyncEvent()
internal data class UserIdSet(val userId: String, val pusherCallbackError: PusherCallbackError?): ServerSyncEvent()
internal data class InvalidTokenEvent(val invalid: Boolean): ServerSyncEvent()

internal class ServerSyncEventHandler private constructor(looper: Looper): Handler(looper) {
var onSubscriptionsChangedListener: SubscriptionsChangedListener? = null
Expand Down Expand Up @@ -111,6 +113,17 @@ class PushNotificationsInstance @JvmOverloads constructor(
secureFileDir = context.filesDir,
handleServerSyncEvent = { msg ->
serverSyncEventHandler.sendMessage(Message.obtain().apply { obj = msg })
if (msg is InvalidTokenEvent) {
// requeue start job
startHasBeenCalledThisSession = true
deviceStateStore.startJobHasBeenEnqueued = false
start()
for (i in deviceStateStore.interests) {
// this forces device interests to be re-registered
// the set of queued interests will be cleared by the registration process
forceSubscribe(i)
}
}
},
getTokenProvider = {
PushNotifications.tokenProvider[instanceId]
Expand Down Expand Up @@ -185,17 +198,25 @@ class PushNotificationsInstance @JvmOverloads constructor(
}

MessagingService.onRefreshToken = handleFcmToken
FirebaseInstallations.getInstance().getToken(true).addOnCompleteListener { task ->

FirebaseMessaging.getInstance().token.addOnCompleteListener { task ->
if (!task.isSuccessful) {
log.w("Failed to get the token from FCM", task.exception)
} else {
task.result?.let { handleFcmToken(it.token) }

// Get new FCM registration token
val token = task.result
handleFcmToken(token)
}
}

return this
}

private fun forceSubscribe(interest: String) {
serverSyncHandler.sendMessage(ServerSyncHandler.subscribe(interest))
}

/**
* Subscribes the device to an interest. For example:
* <pre>{@code pushNotifications.subscribe("hello");}</pre>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class PushNotificationsAPIUnprocessableEntity(val reason: String): PushNotificat
"The request was deemed to be unprocessable: $reason"
)
class PushNotificationsAPIDeviceNotFound: PushNotificationsAPIException("Device not found in the server")
class PushNotificationsAPIInvalidToken: PushNotificationsAPIException("Invalid token")
class PushNotificationsAPIBadRequest(val reason: String): PushNotificationsAPIException("A request to the server has been deemed invalid: $reason")
class PushNotificationsAPIBadJWT(val reason: String): PushNotificationsAPIException(
"The request was rejected because the JWT was invalid/unauthorized: $reason"
Expand Down Expand Up @@ -62,6 +63,10 @@ sealed class RetryStrategy<T> {
} catch (e: PushNotificationsAPIBadJWT) {
// not recoverable - will need a new JWT
throw e
} catch (e: PushNotificationsAPIInvalidToken) {
// not recoverable - the device token has been rejected by the platform
// probably requires a library upgrade
throw e
} catch (e: Exception) {
}

Expand Down Expand Up @@ -154,8 +159,13 @@ class PushNotificationsAPI(private val instanceId: String, overrideHostURL: Stri
val responseErrorBody = response.errorBody()
if (responseErrorBody != null) {
val error = safeExtractJsonError(responseErrorBody.string())

log.w("Failed to register device: $error")
throw PushNotificationsAPIException(error)
if (error.error == "InvalidToken") {
throw PushNotificationsAPIInvalidToken()
} else {
throw PushNotificationsAPIException(error)
}
}

throw PushNotificationsAPIException("Unknown API error")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,76 +198,93 @@ class ServerSyncProcessHandler internal constructor(

private fun processStartJob(startJob: StartJob) {
// Register device with Errol
val registrationResponse =
api.registerFCM(
token = startJob.fcmToken,
knownPreviousClientIds = startJob.knownPreviousClientIds,
retryStrategy = RetryStrategy.WithInfiniteExpBackOff())

val outstandingJobs = mutableListOf<ServerSyncJob>()
synchronized(deviceStateStore) {
// Replay sub/unsub/setsub operations in job queue over initial interest set
val interests = registrationResponse.initialInterests.toMutableSet()
for (j in jobQueue.asIterable()) {
if (j is StartJob) {
break
}
when (j) {
is SubscribeJob -> {
interests += j.interest
}
is UnsubscribeJob -> {
interests -= j.interest
}
is SetSubscriptionsJob -> {
interests.clear()
interests.addAll(j.interests)
}
is StopJob -> {
outstandingJobs.clear()
// Any subscriptions changes done at this point are just discarded,
// and we need to assume the initial interest set as the starting point again
interests.clear()
interests.addAll(registrationResponse.initialInterests)
}
is SetUserIdJob -> {
outstandingJobs.add(j)
}
is ApplicationStartJob -> {
// ignoring it as we are already going to sync the state anyway
}
is RefreshTokenJob -> {
outstandingJobs.add(j)
}
else -> {
throw IllegalStateException("Job $j unexpected during SDK start")
}
try {
val registrationResponse =
api.registerFCM(
token = startJob.fcmToken,
knownPreviousClientIds = startJob.knownPreviousClientIds,
retryStrategy = RetryStrategy.WithInfiniteExpBackOff()
)


val outstandingJobs = mutableListOf<ServerSyncJob>()
synchronized(deviceStateStore) {
// Replay sub/unsub/setsub operations in job queue over initial interest set
val interests = registrationResponse.initialInterests.toMutableSet()
for (j in jobQueue.asIterable()) {
if (j is StartJob) {
break
}
when (j) {
is SubscribeJob -> {
interests += j.interest
}

is UnsubscribeJob -> {
interests -= j.interest
}

is SetSubscriptionsJob -> {
interests.clear()
interests.addAll(j.interests)
}

is StopJob -> {
outstandingJobs.clear()
// Any subscriptions changes done at this point are just discarded,
// and we need to assume the initial interest set as the starting point again
interests.clear()
interests.addAll(registrationResponse.initialInterests)
}

is SetUserIdJob -> {
outstandingJobs.add(j)
}

is ApplicationStartJob -> {
// ignoring it as we are already going to sync the state anyway
}

is RefreshTokenJob -> {
outstandingJobs.add(j)
}

else -> {
throw IllegalStateException("Job $j unexpected during SDK start")
}
}
}

log.d("device store interests: ${deviceStateStore.interests}")
log.d("registration interests: ${interests}")
val localInterestWillChange = deviceStateStore.interests != interests

// Replace interests with the result
if (localInterestWillChange) {
deviceStateStore.interests = interests
handleServerSyncEvent(InterestsChangedEvent(interests))
}
}
}

val localInterestWillChange = deviceStateStore.interests != interests

// Replace interests with the result
if (localInterestWillChange) {
deviceStateStore.interests = interests
handleServerSyncEvent(InterestsChangedEvent(interests))
}
}

deviceStateStore.deviceId = registrationResponse.deviceId
deviceStateStore.FCMToken = startJob.fcmToken

val remoteInterestsWillChange = deviceStateStore.interests != registrationResponse.initialInterests
if (remoteInterestsWillChange) {
api.setSubscriptions( // TODO: We don't really handle if we get a 400 or 404
deviceId = registrationResponse.deviceId,
interests = deviceStateStore.interests,
retryStrategy = RetryStrategy.WithInfiniteExpBackOff())
}
deviceStateStore.deviceId = registrationResponse.deviceId
deviceStateStore.FCMToken = startJob.fcmToken

val remoteInterestsWillChange =
deviceStateStore.interests != registrationResponse.initialInterests
if (remoteInterestsWillChange) {
api.setSubscriptions( // TODO: We don't really handle if we get a 400 or 404
deviceId = registrationResponse.deviceId,
interests = deviceStateStore.interests,
retryStrategy = RetryStrategy.WithInfiniteExpBackOff()
)
}

log.d("Number of outstanding jobs: ${outstandingJobs.size}")
outstandingJobs.forEach { j ->
processJob(j)
log.d("Number of outstanding jobs: ${outstandingJobs.size}")
outstandingJobs.forEach { j ->
processJob(j)
}
} catch (e: PushNotificationsAPIInvalidToken) {
handleServerSyncEvent(InvalidTokenEvent(true))
}
}

Expand Down
4 changes: 1 addition & 3 deletions sample_kotlin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ dependencies {

implementation 'com.pusher:pusher-websocket-android:0.6.0'

implementation 'com.google.firebase:firebase-core:16.0.9'
implementation 'com.google.firebase:firebase-messaging:18.0.0'

implementation 'com.google.firebase:firebase-messaging:22.0.0'
}

apply plugin: 'com.google.gms.google-services'

0 comments on commit e3a3dce

Please sign in to comment.