Skip to content

Commit f2fec69

Browse files
committed
chore: split history controllers to let v1 and v2 chats (e.g balance and chats) load in parallel
Signed-off-by: Brandon McAnsh <git@bmcreations.dev>
1 parent 3b84792 commit f2fec69

11 files changed

Lines changed: 320 additions & 152 deletions

File tree

Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
package com.getcode.network
2+
3+
import androidx.paging.Pager
4+
import androidx.paging.PagingConfig
5+
import androidx.paging.PagingData
6+
import androidx.paging.PagingSource
7+
import androidx.paging.cachedIn
8+
import com.getcode.ed25519.Ed25519.KeyPair
9+
import com.getcode.manager.SessionManager
10+
import com.getcode.model.chat.Chat
11+
import com.getcode.model.chat.ChatMessage
12+
import com.getcode.model.Cursor
13+
import com.getcode.model.ID
14+
import com.getcode.model.MessageStatus
15+
import com.getcode.model.chat.ChatMember
16+
import com.getcode.model.chat.Identity
17+
import com.getcode.model.chat.Platform
18+
import com.getcode.model.chat.Title
19+
import com.getcode.model.chat.isNotification
20+
import com.getcode.network.client.Client
21+
import com.getcode.network.client.advancePointer
22+
import com.getcode.network.client.fetchMessagesFor
23+
import com.getcode.network.client.fetchV1Chats
24+
import com.getcode.network.client.setMuted
25+
import com.getcode.network.client.setSubscriptionState
26+
import com.getcode.network.repository.encodeBase64
27+
import com.getcode.network.source.ChatMessagePagingSource
28+
import com.getcode.util.resources.ResourceHelper
29+
import com.getcode.util.resources.ResourceType
30+
import com.getcode.utils.TraceType
31+
import com.getcode.utils.trace
32+
import kotlinx.coroutines.CoroutineScope
33+
import kotlinx.coroutines.Dispatchers
34+
import kotlinx.coroutines.GlobalScope
35+
import kotlinx.coroutines.flow.Flow
36+
import kotlinx.coroutines.flow.MutableStateFlow
37+
import kotlinx.coroutines.flow.SharingStarted
38+
import kotlinx.coroutines.flow.StateFlow
39+
import kotlinx.coroutines.flow.filterNotNull
40+
import kotlinx.coroutines.flow.map
41+
import kotlinx.coroutines.flow.stateIn
42+
import kotlinx.coroutines.flow.update
43+
import timber.log.Timber
44+
import java.util.Locale
45+
import javax.inject.Inject
46+
import javax.inject.Singleton
47+
48+
@Singleton
49+
class BalanceHistoryController @Inject constructor(
50+
private val client: Client,
51+
private val tipController: TipController,
52+
) : CoroutineScope by CoroutineScope(Dispatchers.IO) {
53+
54+
private val chatEntries = MutableStateFlow<List<Chat>?>(null)
55+
56+
val notifications: StateFlow<List<Chat>?>
57+
get() = chatEntries
58+
.map { it?.filter { entry -> entry.isNotification } }
59+
.stateIn(this, SharingStarted.Eagerly, emptyList())
60+
61+
var loadingMessages: Boolean = false
62+
63+
private val pagerMap = mutableMapOf<ID, PagingSource<Cursor, ChatMessage>>()
64+
private val chatFlows = mutableMapOf<ID, Flow<PagingData<ChatMessage>>>()
65+
66+
private val pagingConfig = PagingConfig(pageSize = 20)
67+
68+
fun reset() {
69+
pagerMap.clear()
70+
chatFlows.clear()
71+
}
72+
73+
private fun chatMessagePager(chatId: ID) = Pager(pagingConfig) {
74+
pagerMap[chatId] ?: ChatMessagePagingSource(
75+
client = client,
76+
owner = owner()!!,
77+
chat = chatEntries.value?.find { it.id == chatId },
78+
onMessagesFetched = { messages ->
79+
val chat = chatEntries.value?.find { it.id == chatId } ?: return@ChatMessagePagingSource
80+
updateChatWithMessages(chat, messages)
81+
}
82+
).also {
83+
pagerMap[chatId] = it
84+
}
85+
}
86+
87+
fun updateChatWithMessages(chat: Chat, messages: List<ChatMessage>) {
88+
val updatedMessages = (chat.messages + messages).distinctBy { it.id }
89+
val updatedChat = chat.copy(messages = updatedMessages)
90+
val chats = chatEntries.value?.map {
91+
if (it.id == updatedChat.id) {
92+
updatedChat
93+
} else {
94+
it
95+
}
96+
}?.sortedByDescending { it.lastMessageMillis }
97+
chatEntries.update { chats }
98+
}
99+
100+
fun chatFlow(chatId: ID) =
101+
chatFlows[chatId] ?: chatMessagePager(chatId).flow.cachedIn(GlobalScope).also {
102+
chatFlows[chatId] = it
103+
}
104+
105+
val unreadCount = notifications
106+
.filterNotNull()
107+
// Ignore muted chats and unsubscribed chats
108+
.map { it.filter { c -> !c.isMuted && c.isSubscribed } }
109+
.map { it.sumOf { c -> c.unreadCount } }
110+
111+
private fun owner(): KeyPair? = SessionManager.getKeyPair()
112+
113+
suspend fun fetchChats(update: Boolean = false) {
114+
if (loadingMessages) return
115+
116+
val updatedWithMessages = mutableListOf<Chat>()
117+
val containers = fetchChatsWithoutMessages()
118+
trace(message = "Fetched ${containers.count()} chats", type = TraceType.Silent)
119+
120+
if (!update) {
121+
pagerMap.clear()
122+
chatFlows.clear()
123+
chatEntries.value = containers
124+
125+
loadingMessages = true
126+
}
127+
128+
containers.onEach { chat ->
129+
val members = fetchMemberImages(chat)
130+
val updatedChat = chat.copy(members = members)
131+
val result = fetchLatestMessageForChat(updatedChat)
132+
result.onSuccess { message ->
133+
if (message != null) {
134+
updatedWithMessages.add(updatedChat.copy(messages = listOf(message)))
135+
}
136+
}.onFailure {
137+
updatedWithMessages.add(updatedChat)
138+
}
139+
}
140+
141+
loadingMessages = false
142+
chatEntries.value = updatedWithMessages.sortedByDescending { it.lastMessageMillis }
143+
}
144+
145+
suspend fun advanceReadPointer(chatId: ID) {
146+
val owner = owner() ?: return
147+
148+
chatEntries.update {
149+
it?.toMutableList()?.apply chats@{
150+
indexOfFirst { chat -> chat.id == chatId }
151+
.takeIf { index -> index >= 0 }
152+
?.let { index ->
153+
val chat = this[index]
154+
val newestMessage = chat.newestMessage
155+
if (newestMessage != null) {
156+
client.advancePointer(
157+
owner = owner,
158+
chat = chat,
159+
to = newestMessage.id,
160+
status = MessageStatus.Read
161+
).onSuccess {
162+
this[index] = chat.resetUnreadCount()
163+
}
164+
}
165+
}
166+
}?.toList()
167+
}
168+
}
169+
170+
suspend fun setMuted(chat: Chat, muted: Boolean): Result<Boolean> {
171+
val owner = owner() ?: return Result.failure(Throwable("No owner detected"))
172+
173+
chatEntries.update {
174+
it?.toMutableList()?.apply chats@{
175+
indexOfFirst { item -> item.id == chat.id }
176+
.takeIf { index -> index >= 0 }
177+
?.let { index ->
178+
val c = this[index]
179+
Timber.d("changing mute state for chat locally")
180+
this[index] = c.setMuteState(muted)
181+
}
182+
}?.toList()
183+
}
184+
185+
return client.setMuted(owner, chat, muted)
186+
}
187+
188+
suspend fun setSubscribed(chat: Chat, subscribed: Boolean): Result<Boolean> {
189+
val owner = owner() ?: return Result.failure(Throwable("No owner detected"))
190+
191+
chatEntries.update {
192+
it?.toMutableList()?.apply chats@{
193+
indexOfFirst { item -> item.id == chat.id }
194+
.takeIf { index -> index >= 0 }
195+
?.let { index ->
196+
val c = this[index]
197+
Timber.d("changing subscribed state for chat locally")
198+
this[index] = c.setSubscriptionState(subscribed)
199+
}
200+
}?.toList()
201+
}
202+
203+
return client.setSubscriptionState(owner, chat, subscribed)
204+
}
205+
206+
private suspend fun fetchLatestMessageForChat(chat: Chat): Result<ChatMessage?> {
207+
val encodedId = chat.id.toByteArray().encodeBase64()
208+
Timber.d("fetching last message for $encodedId")
209+
val owner = owner() ?: return Result.success(null)
210+
return client.fetchMessagesFor(owner, chat, limit = 1)
211+
.onFailure {
212+
Timber.e(t = it, "Failed to fetch messages for $encodedId.")
213+
}.map { it.getOrNull(0) }
214+
}
215+
216+
private suspend fun fetchChatsWithoutMessages(): List<Chat> {
217+
val owner = owner() ?: return emptyList()
218+
val result = client.fetchV1Chats(owner)
219+
return result.getOrNull().orEmpty()
220+
}
221+
222+
private suspend fun fetchMemberImages(chat: Chat): List<ChatMember> {
223+
return chat.members
224+
.map { member ->
225+
if (member.isSelf) return@map member
226+
if (member.identity == null) return@map member
227+
if (member.identity.imageUrl != null) return@map member
228+
val metadata = runCatching {
229+
tipController.fetch(member.identity.username)
230+
}.getOrNull() ?: return@map member
231+
232+
member.copy(
233+
identity = Identity(
234+
platform = Platform.named(metadata.platform),
235+
username = metadata.username,
236+
imageUrl = metadata.imageUrl
237+
)
238+
)
239+
}
240+
}
241+
}
242+
243+
fun Title?.localized(resources: ResourceHelper): String {
244+
return when (val t = this) {
245+
is Title.Domain -> {
246+
t.value.capitalize(Locale.getDefault())
247+
}
248+
249+
is Title.Localized -> {
250+
val resId = resources.getIdentifier(
251+
t.value,
252+
ResourceType.String,
253+
).let { if (it == 0) null else it }
254+
255+
resId?.let { resources.getString(it) } ?: t.value
256+
}
257+
258+
else -> "Anonymous"
259+
}
260+
}

0 commit comments

Comments
 (0)