Skip to content

Commit 6bbf041

Browse files
committed
feat: implement bidirection chat stream
Signed-off-by: Brandon McAnsh <git@bmcreations.dev>
1 parent faa91d6 commit 6bbf041

14 files changed

Lines changed: 450 additions & 73 deletions

File tree

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.getcode.mapper
2+
3+
4+
import com.codeinc.gen.chat.v2.ChatService
5+
import com.getcode.model.ChatMessage
6+
import com.getcode.model.MessageContent
7+
import javax.inject.Inject
8+
import com.codeinc.gen.chat.v2.ChatService.ChatMessage as ApiChatMessage
9+
import com.getcode.model.ChatMessage as DomainChatMessage
10+
11+
12+
class ChatMessageV2Mapper @Inject constructor(
13+
): Mapper<ApiChatMessage, DomainChatMessage> {
14+
override fun map(from: ChatService.ChatMessage): ChatMessage {
15+
return ChatMessage(
16+
id = from.messageId.value.toByteArray().toList(),
17+
cursor = from.cursor.value.toList(),
18+
dateMillis = from.ts.seconds * 1_000L,
19+
contents = from.contentList.mapNotNull { MessageContent.fromV2(it) },
20+
)
21+
}
22+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.getcode.model
2+
3+
import com.getcode.network.repository.hexEncodedString
4+
import java.nio.ByteBuffer
5+
import java.util.UUID
6+
7+
typealias ID = List<Byte>
8+
9+
val ID.uuid: UUID?
10+
get() {
11+
if (count() != 16) return null
12+
13+
val byteBuffer = ByteBuffer.wrap(this.toByteArray())
14+
val high = byteBuffer.getLong()
15+
val low = byteBuffer.getLong()
16+
return UUID(high, low)
17+
}
18+
19+
val ID.description: String
20+
get() = uuid?.toString() ?: hexEncodedString()
21+
22+
23+

api/src/main/java/com/getcode/network/ConversationController.kt

Lines changed: 101 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,23 @@ import androidx.paging.RemoteMediator
1010
import com.getcode.db.AppDatabase
1111
import com.getcode.db.Database
1212
import com.getcode.manager.SessionManager
13+
import com.getcode.model.ChatMessage
1314
import com.getcode.model.Conversation
1415
import com.getcode.model.ConversationMessage
1516
import com.getcode.model.ConversationMessageContent
1617
import com.getcode.model.ID
1718
import com.getcode.model.MessageStatus
19+
import com.getcode.network.client.ChatMessageStreamReference
20+
import com.getcode.network.client.Client
21+
import com.getcode.network.client.openChatStream
1822
import com.getcode.network.exchange.Exchange
1923
import com.getcode.network.repository.base58
24+
import com.getcode.network.repository.decodeBase64
2025
import com.getcode.network.source.ConversationMockProvider
2126
import com.getcode.vendor.Base58
2227
import kotlinx.coroutines.CoroutineScope
2328
import kotlinx.coroutines.Dispatchers
29+
import kotlinx.coroutines.GlobalScope
2430
import kotlinx.coroutines.delay
2531
import kotlinx.coroutines.flow.Flow
2632
import kotlinx.coroutines.launch
@@ -29,96 +35,142 @@ import timber.log.Timber
2935
import java.io.IOException
3036
import java.util.UUID
3137
import javax.inject.Inject
38+
import kotlin.jvm.Throws
3239

3340
interface ConversationController {
3441
fun observeConversationForMessage(messageId: ID): Flow<Conversation?>
35-
suspend fun getConversationForMessage(messageId: ID): Conversation?
36-
suspend fun getConversation(conversationId: ID): Conversation?
37-
suspend fun createConversation(messageId: ID)
42+
fun openChatStream(scope: CoroutineScope, messageId: ID)
43+
fun closeChatStream()
3844
suspend fun hasThanked(messageId: ID): Boolean
3945
suspend fun thankTipper(messageId: ID)
4046
suspend fun revealIdentity(messageId: ID)
4147
fun sendMessage(conversationId: ID, message: String)
42-
fun conversationPagingData(conversationId: ID): Flow<PagingData<ConversationMessage>>
48+
fun conversationPagingData(chatId: ID): Flow<PagingData<ConversationMessage>>
4349
}
4450

45-
class ConversationMockController @Inject constructor(
51+
class ConversationStreamController @Inject constructor(
4652
private val historyController: HistoryController,
4753
private val exchange: Exchange,
48-
) : ConversationController, CoroutineScope by CoroutineScope(Dispatchers.IO) {
49-
54+
private val client: Client
55+
): ConversationController {
5056
private val pagingConfig = PagingConfig(pageSize = 20)
5157

5258
private val db: AppDatabase by lazy { Database.requireInstance() }
5359

60+
private var stream: ChatMessageStreamReference? = null
61+
5462
private fun conversationPagingSource(conversationId: ID) =
5563
db.conversationMessageDao().observeConversationMessages(conversationId.base58)
5664

5765
override fun observeConversationForMessage(messageId: ID): Flow<Conversation?> {
5866
return db.conversationDao().observeConversationForMessage(messageId)
5967
}
60-
override suspend fun getConversationForMessage(messageId: ID): Conversation? {
61-
return db.conversationDao().findConversationForMessage(messageId)
68+
69+
@Throws(IllegalStateException::class)
70+
override fun openChatStream(scope: CoroutineScope, messageId: ID) {
71+
val chatId = "468f158662880905e966f7c27f36b39e368837887aa5cf889cb55d91537d1a76".decodeBase64().toList()
72+
val owner = SessionManager.getOrganizer()?.ownerKeyPair ?: throw IllegalStateException()
73+
stream = client.openChatStream(scope, chatId, owner) { result ->
74+
if (result.isSuccess) {
75+
println("chat messages: ${result.getOrNull()}")
76+
}
77+
}
6278
}
6379

64-
override suspend fun getConversation(conversationId: ID): Conversation? {
65-
return db.conversationDao().findConversation(conversationId)
80+
override fun closeChatStream() {
81+
stream?.destroy()
82+
}
83+
84+
override suspend fun hasThanked(messageId: ID): Boolean {
85+
val conversation = db.conversationDao().findConversationForMessage(messageId) ?: return false
86+
return db.conversationDao().hasThanked(conversation.messageId)
87+
}
88+
89+
override suspend fun thankTipper(messageId: ID) {
90+
}
91+
92+
override suspend fun revealIdentity(messageId: ID) {
93+
}
94+
95+
override fun sendMessage(conversationId: ID, message: String) {
96+
6697
}
6798

6899
@OptIn(ExperimentalPagingApi::class)
69-
override fun conversationPagingData(conversationId: ID) =
100+
override fun conversationPagingData(chatId: ID) =
70101
Pager(
71102
config = pagingConfig,
72103
initialKey = null,
73104
remoteMediator = ConversationMessagePageKeyedRemoteMediator(db)
74-
) { conversationPagingSource(conversationId) }.flow
75-
76-
override suspend fun createConversation(messageId: ID) {
77-
Timber.d("creating conversation: ${messageId.base58}")
78-
val message =
79-
historyController.chats.value?.find {
80-
Timber.d("messages=${it.messages.joinToString { it.id.base58 }}")
81-
it.messages.firstOrNull { it.id == messageId } != null
82-
}?.messages?.find { it.id == messageId }
83-
84-
if (message == null) {
85-
Timber.e("No message for ${messageId.base58} found")
86-
return
87-
}
105+
) { conversationPagingSource(chatId) }.flow
88106

89-
val conversation = ConversationMockProvider.createConversation(exchange, message)
90-
if (conversation == null) {
91-
Timber.e("Failed to create conversation!")
92-
return
93-
}
107+
}
108+
class ConversationMockController @Inject constructor(
109+
private val historyController: HistoryController,
110+
private val exchange: Exchange,
111+
) : ConversationController {
112+
113+
private val pagingConfig = PagingConfig(pageSize = 20)
94114

95-
db.conversationDao().upsertConversations(conversation)
115+
private val db: AppDatabase by lazy { Database.requireInstance() }
96116

97-
val tipMessage = ConversationMockProvider.createMessage(
98-
conversation.messageId,
99-
ConversationMessageContent.TipMessage
100-
)
117+
private fun conversationPagingSource(conversationId: ID) =
118+
db.conversationMessageDao().observeConversationMessages(conversationId.base58)
101119

102-
Timber.d("upserting tip message")
103-
db.conversationMessageDao().upsertMessages(tipMessage)
120+
override fun observeConversationForMessage(messageId: ID): Flow<Conversation?> {
121+
return db.conversationDao().observeConversationForMessage(messageId)
104122
}
105123

124+
override fun openChatStream(scope: CoroutineScope, messageId: ID) {
125+
scope.launch {
126+
Timber.d("creating conversation: ${messageId.base58}")
127+
val message =
128+
historyController.chats.value?.find {
129+
Timber.d("messages=${it.messages.joinToString { it.id.base58 }}")
130+
it.messages.firstOrNull { it.id == messageId } != null
131+
}?.messages?.find { it.id == messageId }
132+
133+
if (message == null) {
134+
Timber.e("No message for ${messageId.base58} found")
135+
return@launch
136+
}
137+
138+
val conversation = ConversationMockProvider.createConversation(exchange, message)
139+
if (conversation == null) {
140+
Timber.e("Failed to create conversation!")
141+
return@launch
142+
}
143+
144+
db.conversationDao().upsertConversations(conversation)
145+
146+
val tipMessage = ConversationMockProvider.createMessage(
147+
conversation.messageId,
148+
ConversationMessageContent.TipMessage
149+
)
150+
151+
Timber.d("upserting tip message")
152+
db.conversationMessageDao().upsertMessages(tipMessage)
153+
}
154+
}
155+
156+
override fun closeChatStream() {
157+
158+
}
159+
160+
@OptIn(ExperimentalPagingApi::class)
161+
override fun conversationPagingData(chatId: ID) =
162+
Pager(
163+
config = pagingConfig,
164+
initialKey = null,
165+
remoteMediator = ConversationMessagePageKeyedRemoteMediator(db)
166+
) { conversationPagingSource(chatId) }.flow
167+
106168
override suspend fun hasThanked(messageId: ID): Boolean {
107169
val conversation = db.conversationDao().findConversationForMessage(messageId) ?: return false
108170
return db.conversationDao().hasThanked(conversation.messageId)
109171
}
110172

111173
override suspend fun thankTipper(messageId: ID) {
112-
val conversation = db.conversationDao().findConversationForMessage(messageId)
113-
if (conversation == null) {
114-
Timber.d("conversation doesn't exist.. creating")
115-
val message =
116-
historyController.chats.value?.find { it.messages.firstOrNull { it.id == messageId } != null }
117-
?.messages?.find { it.id == messageId } ?: return
118-
119-
createConversation(message.id)
120-
}
121-
122174
val message = ConversationMockProvider.thankTipper(messageId) ?: return
123175
db.conversationMessageDao().upsertMessages(message)
124176
}
@@ -129,7 +181,7 @@ class ConversationMockController @Inject constructor(
129181
}
130182

131183
override fun sendMessage(conversationId: ID, message: String) {
132-
launch {
184+
GlobalScope.launch {
133185
val messageId = generateId()
134186

135187
val tipAddress = SessionManager.getOrganizer()?.primaryVault
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.getcode.network.api
2+
3+
import com.codeinc.gen.chat.v2.ChatGrpc
4+
import com.codeinc.gen.chat.v2.ChatService
5+
import com.getcode.network.core.GrpcApi
6+
import io.grpc.ManagedChannel
7+
import io.grpc.stub.StreamObserver
8+
import javax.inject.Inject
9+
import javax.inject.Named
10+
11+
class ChatApiV2 @Inject constructor(
12+
@Named("devManagedChannel")
13+
managedChannel: ManagedChannel
14+
) : GrpcApi(managedChannel) {
15+
private val api = ChatGrpc.newStub(managedChannel)
16+
17+
18+
fun streamChatEvents(
19+
observer: StreamObserver<ChatService.StreamChatEventsResponse>
20+
): StreamObserver<ChatService.StreamChatEventsRequest>? {
21+
return api.streamChatEvents(observer)
22+
}
23+
}

api/src/main/java/com/getcode/network/client/Client.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import com.getcode.network.repository.MessagingRepository
1111
import com.getcode.network.repository.PrefRepository
1212
import com.getcode.network.repository.TransactionRepository
1313
import com.getcode.network.service.ChatService
14+
import com.getcode.network.service.ChatServiceV2
1415
import com.getcode.network.service.DeviceService
1516
import com.getcode.utils.ErrorUtils
1617
import com.getcode.utils.network.NetworkConnectivityListener
@@ -45,6 +46,7 @@ class Client @Inject constructor(
4546
internal val transactionReceiver: TransactionReceiver,
4647
internal val networkObserver: NetworkConnectivityListener,
4748
internal val chatService: ChatService,
49+
internal val chatServiceV2: ChatServiceV2,
4850
internal val deviceService: DeviceService,
4951
internal val mnemonicManager: MnemonicManager,
5052
) {

api/src/main/java/com/getcode/network/client/Client_Chat.kt

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.getcode.network.client
22

3+
import com.codeinc.gen.chat.v2.ChatService
34
import com.getcode.ed25519.Ed25519.KeyPair
45
import com.getcode.manager.SessionManager
56
import com.getcode.model.Chat
@@ -8,10 +9,14 @@ import com.getcode.model.Cursor
89
import com.getcode.model.Domain
910
import com.getcode.model.ID
1011
import com.getcode.model.Title
12+
import com.getcode.network.core.BidirectionalStreamReference
1113
import com.getcode.network.repository.base58
12-
import com.getcode.network.repository.encodeBase64
14+
import kotlinx.coroutines.CoroutineScope
1315
import timber.log.Timber
1416

17+
typealias ChatMessageStreamReference = BidirectionalStreamReference<ChatService.StreamChatEventsRequest, ChatService.StreamChatEventsResponse>
18+
19+
1520
suspend fun Client.fetchChats(owner: KeyPair): Result<List<Chat>> {
1621
return chatService.fetchChats(owner)
1722
.onSuccess {
@@ -25,11 +30,20 @@ suspend fun Client.setMuted(owner: KeyPair, chat: ID, muted: Boolean): Result<Bo
2530
return chatService.setMuteState(owner, chat, muted)
2631
}
2732

28-
suspend fun Client.setSubscriptionState(owner: KeyPair, chatId: ID, subscribed: Boolean): Result<Boolean> {
33+
suspend fun Client.setSubscriptionState(
34+
owner: KeyPair,
35+
chatId: ID,
36+
subscribed: Boolean
37+
): Result<Boolean> {
2938
return chatService.setSubscriptionState(owner, chatId, subscribed)
3039
}
3140

32-
suspend fun Client.fetchMessagesFor(owner: KeyPair, chat: Chat, cursor: Cursor? = null, limit: Int? = null) : Result<List<ChatMessage>> {
41+
suspend fun Client.fetchMessagesFor(
42+
owner: KeyPair,
43+
chat: Chat,
44+
cursor: Cursor? = null,
45+
limit: Int? = null
46+
): Result<List<ChatMessage>> {
3347
return chatService.fetchMessagesFor(owner, chat.id, cursor, limit)
3448
.mapCatching {
3549
val domain = if (chat.title is Title.Domain) {
@@ -64,4 +78,18 @@ suspend fun Client.advancePointer(
6478
to: ID,
6579
): Result<Unit> {
6680
return chatService.advancePointer(owner, chatId, to)
81+
}
82+
83+
fun Client.openChatStream(
84+
scope: CoroutineScope,
85+
chatId: ID,
86+
owner: KeyPair,
87+
completion: (Result<List<ChatMessage>>) -> Unit
88+
): ChatMessageStreamReference {
89+
return chatServiceV2.openChatStream(
90+
scope = scope,
91+
chatId = chatId,
92+
owner = owner,
93+
completion = completion
94+
)
6795
}

0 commit comments

Comments
 (0)