-
Notifications
You must be signed in to change notification settings - Fork 325
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Thumbs-up function 👍 #626
Thumbs-up function 👍 #626
Changes from 21 commits
44b8b10
b49b8b0
4a2de25
286d6bd
bf57b49
da8e92f
075e3b2
a4c3287
6f6b1c0
3cf94b8
0b0483b
0cd8484
707fb7b
f0db2dd
856e3ac
6ebcbfa
e478ca9
ab883b9
3bac614
8e26ef0
59aad5a
67e699a
6f07494
7d102fd
ea1e3d4
8c64427
54940d7
035602c
5f1c524
c88056a
f9d5066
7e54b27
bc6dad4
6d1c674
5889f99
e1cba55
dbe94a2
e556cac
489fb21
69c94e9
521ba2d
eb5daa5
48c71ba
3d4c670
808f2b2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,27 +1,42 @@ | ||
package io.github.droidkaigi.confsched2020.data.firestore.internal | ||
|
||
import com.google.android.gms.tasks.Task | ||
import com.google.android.gms.tasks.Tasks | ||
import com.google.firebase.auth.FirebaseAuth | ||
import com.google.firebase.firestore.CollectionReference | ||
import com.google.firebase.firestore.DocumentReference | ||
import com.google.firebase.firestore.DocumentSnapshot | ||
import com.google.firebase.firestore.FieldValue | ||
import com.google.firebase.firestore.FirebaseFirestore | ||
import com.google.firebase.firestore.Query | ||
import com.google.firebase.firestore.QuerySnapshot | ||
import com.google.firebase.firestore.Source | ||
import io.github.droidkaigi.confsched2020.data.firestore.Firestore | ||
import io.github.droidkaigi.confsched2020.model.SessionId | ||
import javax.inject.Inject | ||
import kotlinx.coroutines.channels.BroadcastChannel | ||
import kotlinx.coroutines.channels.awaitClose | ||
import kotlinx.coroutines.flow.Flow | ||
import kotlinx.coroutines.flow.asFlow | ||
import kotlinx.coroutines.flow.callbackFlow | ||
import kotlinx.coroutines.flow.combine | ||
import kotlinx.coroutines.flow.debounce | ||
import kotlinx.coroutines.flow.filter | ||
import kotlinx.coroutines.flow.flatMapLatest | ||
import kotlinx.coroutines.flow.flow | ||
import kotlinx.coroutines.flow.map | ||
import kotlinx.coroutines.flow.mapLatest | ||
import kotlinx.coroutines.flow.onEach | ||
import kotlinx.coroutines.flow.onStart | ||
import kotlinx.coroutines.flow.scan | ||
import kotlinx.coroutines.flow.withIndex | ||
import kotlinx.coroutines.tasks.await | ||
import timber.log.Timber | ||
import timber.log.debug | ||
import javax.inject.Inject | ||
import kotlin.math.floor | ||
|
||
internal class FirestoreImpl @Inject constructor() : Firestore { | ||
private val thumbsUpEventChannel = BroadcastChannel<SessionId>(10000) | ||
|
||
override fun getFavoriteSessionIds(): Flow<List<String>> { | ||
val setupFavorites = flow { | ||
|
@@ -35,7 +50,7 @@ internal class FirestoreImpl @Inject constructor() : Firestore { | |
emit(favoritesRef) | ||
} | ||
val favoritesSnapshotFlow = setupFavorites.flatMapLatest { | ||
it.whereEqualTo("favorite", true).toFlow() | ||
it.whereEqualTo(FAVORITE_VALUE_KEY, true).toFlow() | ||
} | ||
return favoritesSnapshotFlow.mapLatest { favorites -> | ||
Timber.debug { "favoritesSnapshotFlow onNext" } | ||
|
@@ -57,12 +72,94 @@ internal class FirestoreImpl @Inject constructor() : Firestore { | |
} else { | ||
Timber.debug { "toggleFavorite: $sessionId document not exits" } | ||
document.reference | ||
.set(mapOf("favorite" to newFavorite)) | ||
.set(mapOf(FAVORITE_VALUE_KEY to newFavorite)) | ||
.await() | ||
} | ||
Timber.debug { "toggleFavorite: end" } | ||
} | ||
|
||
@Suppress("EXPERIMENTAL_API_USAGE") | ||
override fun getThumbsUpCount(sessionId: SessionId): Flow<Int> { | ||
val setupThumbsUp = flow { | ||
signInIfNeeded() | ||
val counterRef = getThumbsUpCounterRef(sessionId) | ||
createShardsIfNeeded(counterRef) | ||
emit(counterRef) | ||
} | ||
|
||
var appliedCount = -1 | ||
val unappliedCountChannel = BroadcastChannel<Int>(10000) | ||
val incrementFlow = thumbsUpEventChannel.asFlow() | ||
.filter { it == sessionId } | ||
.withIndex() | ||
.onEach { | ||
unappliedCountChannel.send( | ||
minOf( | ||
it.index - appliedCount, | ||
MAX_APPLY_COUNT | ||
) | ||
) | ||
} | ||
// For firebase pricing | ||
.debounce(INCREMENT_DEBOUNCE_MILLIS) | ||
.map { | ||
val result = minOf(it.index - appliedCount, MAX_APPLY_COUNT) | ||
appliedCount = it.index | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you, I understood. |
||
unappliedCountChannel.send(0) | ||
it.value to result | ||
} | ||
.map { (sessionId, count) -> | ||
Timber.debug { "thumb up increment:$count" } | ||
incrementThumbsUpCountImpl(sessionId, count) | ||
count | ||
} | ||
.onStart { | ||
emit(0) | ||
unappliedCountChannel.send(0) | ||
} | ||
val thumbsUpSnapshot = setupThumbsUp | ||
.flatMapLatest { | ||
it.toFlow() | ||
} | ||
.combine(incrementFlow) { thumbsCount, _ -> thumbsCount } | ||
|
||
return thumbsUpSnapshot.map { shards -> | ||
var count = 0 | ||
shards.forEach { snap -> | ||
count += snap.get(SHARDS_COUNT_KEY, Int::class.java) ?: 0 | ||
} | ||
count | ||
} | ||
.combine(unappliedCountChannel.asFlow()) { firestoreCount, unappliedCount -> | ||
firestoreCount + unappliedCount | ||
}.scan(0) { prev, value -> | ||
// Prevent counts from dropping due to calculations | ||
if (prev < value) { | ||
value | ||
} else { | ||
prev | ||
} | ||
} | ||
} | ||
|
||
override suspend fun incrementThumbsUpCount(sessionId: SessionId) { | ||
thumbsUpEventChannel.send(sessionId) | ||
} | ||
|
||
private suspend fun incrementThumbsUpCountImpl( | ||
sessionId: SessionId, | ||
count: Int | ||
) { | ||
signInIfNeeded() | ||
val counterRef = getThumbsUpCounterRef(sessionId) | ||
createShardsIfNeeded(counterRef) | ||
val shardId = floor(Math.random() * NUM_SHARDS).toInt() | ||
counterRef | ||
.document(shardId.toString()) | ||
.update(SHARDS_COUNT_KEY, FieldValue.increment(count.toLong())) | ||
.await() | ||
} | ||
|
||
private fun getFavoritesRef(): CollectionReference { | ||
val firebaseAuth = FirebaseAuth.getInstance() | ||
val firebaseUserId = firebaseAuth.currentUser?.uid ?: throw RuntimeException( | ||
|
@@ -83,6 +180,44 @@ internal class FirestoreImpl @Inject constructor() : Firestore { | |
firebaseAuth.signInAnonymously().await() | ||
Timber.debug { "signInIfNeeded end" } | ||
} | ||
|
||
private fun getThumbsUpCounterRef(sessionId: SessionId): CollectionReference { | ||
return FirebaseFirestore | ||
.getInstance() | ||
.collection("confsched/2020/sessions/${sessionId.id}/thumbsup_counters") | ||
} | ||
|
||
private suspend fun createShardsIfNeeded(counterRef: CollectionReference) { | ||
val lastShardId = NUM_SHARDS - 1 | ||
val lastShard = counterRef | ||
.document(lastShardId.toString()) | ||
.get(Source.SERVER) | ||
.await() | ||
|
||
if (lastShard.exists()) { | ||
Timber.debug { "createShardsIfNeeded shards already exist" } | ||
return | ||
} | ||
|
||
val tasks = arrayListOf<Task<Void>>() | ||
(0 until NUM_SHARDS).forEach { | ||
val makeShard = counterRef | ||
.document(it.toString()) | ||
.set(mapOf(SHARDS_COUNT_KEY to 0)) | ||
tasks.add(makeShard) | ||
} | ||
|
||
Tasks.whenAll(tasks).await() | ||
Timber.debug { "createShardsIfNeeded creating shards completed" } | ||
} | ||
|
||
companion object { | ||
const val NUM_SHARDS = 5 | ||
const val MAX_APPLY_COUNT = 50 | ||
const val INCREMENT_DEBOUNCE_MILLIS = 500L | ||
const val SHARDS_COUNT_KEY = "shards" | ||
const val FAVORITE_VALUE_KEY = "favorite" | ||
} | ||
} | ||
|
||
private suspend fun DocumentReference.fastGet(): DocumentSnapshot { | ||
|
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
@@ -0,0 +1,59 @@ | ||||
package io.github.droidkaigi.confsched2020.data.firestore.internal | ||||
|
||||
import kotlinx.coroutines.GlobalScope | ||||
import kotlinx.coroutines.channels.BroadcastChannel | ||||
import kotlinx.coroutines.delay | ||||
import kotlinx.coroutines.flow.asFlow | ||||
import kotlinx.coroutines.flow.collect | ||||
import kotlinx.coroutines.flow.debounce | ||||
import kotlinx.coroutines.flow.map | ||||
import kotlinx.coroutines.flow.withIndex | ||||
import kotlinx.coroutines.launch | ||||
import kotlinx.coroutines.runBlocking | ||||
import org.junit.Ignore | ||||
import org.junit.Test | ||||
|
||||
class FirestoreImplTest { | ||||
@Ignore | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry please remove this 🙇 |
||||
@Test | ||||
fun thumbsUpIncrement() { | ||||
val channel = BroadcastChannel<Unit>(10000) | ||||
GlobalScope.launch { | ||||
delay(100) | ||||
println("send") | ||||
channel.send(Unit) | ||||
println("send") | ||||
channel.send(Unit) | ||||
println("send") | ||||
channel.send(Unit) | ||||
delay(600) | ||||
println("send") | ||||
channel.send(Unit) | ||||
delay(200) | ||||
println("send") | ||||
channel.send(Unit) | ||||
delay(200) | ||||
println("send") | ||||
channel.send(Unit) | ||||
delay(200) | ||||
println("send") | ||||
channel.send(Unit) | ||||
delay(600) | ||||
channel.cancel() | ||||
} | ||||
runBlocking { | ||||
var lastIndex = -1 | ||||
channel.asFlow() | ||||
.withIndex() | ||||
.debounce(300) | ||||
.map { | ||||
val result = minOf(it.index - lastIndex, 50) | ||||
lastIndex = it.index | ||||
result | ||||
} | ||||
.collect { | ||||
println(it) | ||||
} | ||||
} | ||||
} | ||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍