diff --git a/examples/src/main/kotlin/com/ably/example/Utils.kt b/examples/src/main/kotlin/com/ably/example/Utils.kt index 70838c3c3..2a8c20c2b 100644 --- a/examples/src/main/kotlin/com/ably/example/Utils.kt +++ b/examples/src/main/kotlin/com/ably/example/Utils.kt @@ -1,211 +1,115 @@ +/** + * The LiveObjects bridge layer for the example app: Compose observers plus coroutine + * wrappers over the path-based API (`io.ably.lib.liveobjects`). + * + * The app works exclusively with [io.ably.lib.liveobjects.path.PathObject]s — references + * to a *location* in the channel's objects graph, not to a particular object. A PathObject + * resolves its path lazily on every call, so a stored reference stays valid even when the + * object at that location is replaced (e.g. "Reset all" swaps in brand-new counters), and + * a path subscription automatically observes whatever object currently lives there. The + * identity-bound alternative, [io.ably.lib.liveobjects.instance.Instance] (obtained via + * `pathObject.instance()`), tracks one specific object wherever it moves — not needed here, + * since every screen wants location semantics. + * + * ably-java's typed flavour of the API: the base PathObject exposes only type-agnostic + * methods; type-specific reads/writes live on sub-type views reached via `as*` casts + * (`asLiveCounter()`, `asLiveMap()`, `asString()`, ...). The casts never throw — a + * wrong-typed view degrades gracefully (reads return null/empty, writes fail with an + * AblyException). The root obtained from `channel.object.get()` is already a + * [LiveMapPathObject], so no cast is needed there. + */ package com.ably.example -import androidx.compose.runtime.* -import io.ably.lib.objects.ObjectsCallback -import io.ably.lib.objects.RealtimeObjects -import io.ably.lib.objects.type.counter.LiveCounter -import io.ably.lib.objects.type.counter.LiveCounterUpdate -import io.ably.lib.objects.type.map.LiveMap -import io.ably.lib.objects.type.map.LiveMapUpdate -import io.ably.lib.objects.type.map.LiveMapUpdate.Change.UPDATED -import io.ably.lib.objects.type.map.LiveMapValue +import androidx.compose.runtime.Composable +import androidx.compose.runtime.DisposableEffect +import androidx.compose.runtime.LaunchedEffect +import androidx.compose.runtime.getValue +import androidx.compose.runtime.mutableStateOf +import androidx.compose.runtime.remember +import androidx.compose.runtime.setValue +import io.ably.lib.liveobjects.ValueType +import io.ably.lib.liveobjects.path.types.LiveCounterPathObject +import io.ably.lib.liveobjects.path.types.LiveMapPathObject +import io.ably.lib.liveobjects.state.ObjectStateEvent +import io.ably.lib.liveobjects.value.LiveCounter +import io.ably.lib.liveobjects.value.LiveMap +import io.ably.lib.liveobjects.value.LiveMapValue import io.ably.lib.realtime.AblyRealtime import io.ably.lib.realtime.Channel import io.ably.lib.realtime.ChannelState import io.ably.lib.realtime.ChannelStateListener -import io.ably.lib.types.AblyException import io.ably.lib.types.ChannelMode import io.ably.lib.types.ChannelOptions -import io.ably.lib.types.ErrorInfo -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch -import kotlinx.coroutines.suspendCancellableCoroutine -import kotlin.coroutines.resume - -private suspend fun RealtimeObjects.getRootCoroutines(): LiveMap = suspendCancellableCoroutine { continuation -> - getRootAsync(object : ObjectsCallback { - override fun onSuccess(result: LiveMap?) { - continuation.resume(result!!) - } - - override fun onError(exception: AblyException?) { - continuation.cancel(exception) - } - }) -} - -private suspend fun RealtimeObjects.createCounterCoroutine(): LiveCounter = - suspendCancellableCoroutine { continuation -> - createCounterAsync(object : ObjectsCallback { - override fun onSuccess(result: LiveCounter?) { - continuation.resume(result!!) - } - - override fun onError(exception: AblyException?) { - continuation.cancel(exception) - } - }) - } - -private suspend fun RealtimeObjects.createMapCoroutine(): LiveMap = suspendCancellableCoroutine { continuation -> - createMapAsync(object : ObjectsCallback { - override fun onSuccess(result: LiveMap?) { - continuation.resume(result!!) - } - - override fun onError(exception: AblyException?) { - continuation.cancel(exception) - } - }) -} - -suspend fun LiveCounter.incrementCoroutine(amount: Int): Unit = supressCoroutineExceptions { - suspendCancellableCoroutine { continuation -> - incrementAsync(amount, object : ObjectsCallback { - override fun onSuccess(result: Void?) { - continuation.resume(Unit) - } - - override fun onError(exception: AblyException?) { - continuation.cancel(exception) - } - }) - } -} - -suspend fun LiveCounter.decrementCoroutine(amount: Int): Unit = supressCoroutineExceptions { - suspendCancellableCoroutine { continuation -> - decrementAsync(amount, object : ObjectsCallback { - override fun onSuccess(result: Void?) { - continuation.resume(Unit) - } - - override fun onError(exception: AblyException?) { - continuation.cancel(exception) - } - }) - } -} - -suspend fun Channel.updateOptions(options: ChannelOptions): Unit = supressCoroutineExceptions { - suspendCancellableCoroutine { continuation -> - setOptions(options, object : io.ably.lib.realtime.CompletionListener { - override fun onSuccess() { - continuation.resume(Unit) - } - - override fun onError(reason: ErrorInfo?) { - continuation.cancel(AblyException.fromErrorInfo(reason)) - } - }) - } -} - -suspend fun getOrCreateCounter(channel: Channel, root: LiveMap?, path: String): LiveCounter { - val mapValue = root?.get(path) - if (mapValue == null) { - val counter = channel.objects.createCounterCoroutine() - root?.setCoroutine(path, LiveMapValue.of(counter)) - return counter - } else { - return mapValue.asLiveCounter - } -} - -suspend fun getOrCreateMap(channel: Channel, root: LiveMap?, path: String): LiveMap { - val mapValue = root?.get(path) - if (mapValue == null) { - val map = channel.objects.createMapCoroutine() - root?.setCoroutine(path, LiveMapValue.of(map)) - return map - } else { - return mapValue.asLiveMap - } +import kotlinx.coroutines.future.await + +/** + * Returns the counter path object at [key] under [root], creating and linking a fresh + * zero-value counter when the path is missing or holds a non-counter value. Path objects + * re-resolve on every call, so the returned reference stays valid even if another client + * replaces the counter object at this key. + * + * The check-then-set is not atomic: another client can create the same key in between. + * That is fine here - the colliding operations converge deterministically via + * last-write-wins, and both sides end up bound to whichever counter won. + */ +suspend fun getOrCreateCounter(root: LiveMapPathObject, key: String): LiveCounterPathObject { + val path = root.get(key) + if (path.type != ValueType.LIVE_COUNTER) { + // Creates the counter and links it under root in a single published operation + root.set(key, LiveMapValue.of(LiveCounter.create())).await() + } + return path.asLiveCounter() } -suspend fun LiveMap.setCoroutine(key: String, value: LiveMapValue) = supressCoroutineExceptions { - suspendCancellableCoroutine { continuation -> - setAsync(key, value, object : ObjectsCallback { - override fun onSuccess(result: Void?) { - continuation.resume(Unit) - } - - override fun onError(exception: AblyException?) { - continuation.cancel(exception) - } - }) - } -} - -suspend fun LiveMap.removeCoroutine(key: String) = supressCoroutineExceptions { - suspendCancellableCoroutine { continuation -> - removeAsync(key, object : ObjectsCallback { - override fun onSuccess(result: Void?) { - continuation.resume(Unit) - } - - override fun onError(exception: AblyException?) { - continuation.cancel(exception) - } - }) - } +/** + * Returns the map path object at [key] under [root], creating and linking a fresh empty + * map when the path is missing or holds a non-map value. Same last-write-wins caveat as + * [getOrCreateCounter]. + */ +suspend fun getOrCreateMap(root: LiveMapPathObject, key: String): LiveMapPathObject { + val path = root.get(key) + if (path.type != ValueType.LIVE_MAP) { + root.set(key, LiveMapValue.of(LiveMap.create())).await() + } + return path.asLiveMap() } @Composable -fun observeCounter(channel: Channel, root: LiveMap?, path: String): CounterState { - var counter by remember { mutableStateOf(null) } +fun observeCounter(root: LiveMapPathObject?, key: String): CounterState { + var counter by remember { mutableStateOf(null) } var counterValue by remember { mutableStateOf(null) } - LaunchedEffect(root) { - supressCoroutineExceptions { - counter = getOrCreateCounter(channel, root, path) + LaunchedEffect(root, key) { + root?.let { + supressCoroutineExceptions { + counter = getOrCreateCounter(it, key) + } } } DisposableEffect(counter) { counterValue = counter?.value()?.toInt() - val listener: (LiveCounterUpdate) -> Unit = { - counter?.value()?.let { - counterValue = it.toInt() - } + // The path subscription fires both for increments on the counter and for the key + // being replaced with a new counter (e.g. "Reset all" on another device); the path + // re-resolves on read, so no explicit rebinding is needed. + val subscription = counter?.subscribe { + counterValue = counter?.value()?.toInt() } - counter?.subscribe(listener) - onDispose { - counter?.unsubscribe(listener) - } - } - - DisposableEffect(root) { - val listener: (LiveMapUpdate) -> Unit = { rootUpdate -> - val counterHasBeenRemoved = rootUpdate.update - .filter { (_, change) -> change == UPDATED } - .any { (keyName) -> keyName == path } - - if (counterHasBeenRemoved) root?.get(path)?.asLiveCounter?.let { counter = it } - } - - root?.subscribe(listener) - - onDispose { - root?.unsubscribe(listener) + subscription?.unsubscribe() } } return CounterState(counterValue, counter) { - coroutineScope { - launch { - counter = channel.objects.createCounterCoroutine().also { - root?.setCoroutine(path, LiveMapValue.of(it)) - } - } - } + // Reset by replacing the object at this key with a brand-new zero-value counter; + // fire-and-forget - the path subscription refreshes the displayed value on ack + root?.set(key, LiveMapValue.of(LiveCounter.create())) } } -data class CounterState(val value: Int?, val counter: LiveCounter?, val reset: suspend () -> Unit) +data class CounterState(val value: Int?, val counter: LiveCounterPathObject?, val reset: () -> Unit) @Composable fun observeChannelState(channel: Channel): ChannelState { @@ -227,46 +131,74 @@ fun observeChannelState(channel: Channel): ChannelState { } @Composable -fun observeMap(channel: Channel, root: LiveMap?, path: String): Pair, LiveMap?> { - var map by remember { mutableStateOf(null) } +fun observeMap(root: LiveMapPathObject?, key: String): Pair, LiveMapPathObject?> { + var map by remember { mutableStateOf(null) } var mapValue by remember { mutableStateOf>(mapOf()) } - LaunchedEffect(root) { - supressCoroutineExceptions { - map = getOrCreateMap(channel, root, path) + fun readEntries(liveMap: LiveMapPathObject?): Map = + liveMap?.entries() + ?.mapNotNull { (entryKey, valuePath) -> valuePath.asString().value()?.let { entryKey to it } } + ?.toMap() + ?: mapOf() + + LaunchedEffect(root, key) { + root?.let { + supressCoroutineExceptions { + map = getOrCreateMap(it, key) + } } } DisposableEffect(map) { - map?.entries()?.associate { (key, value) -> key to value.asString }?.let { - mapValue = it + mapValue = readEntries(map) + + // Fires for every entry change on the map at this path (default subscription depth + // covers nested updates), after which entries are re-read from the resolved map. + val subscription = map?.subscribe { + mapValue = readEntries(map) } - val listener: (LiveMapUpdate) -> Unit = { - map?.entries()?.associate { (key, value) -> key to value.asString }?.let { - mapValue = it - } + onDispose { + subscription?.unsubscribe() } + } + + return mapValue to map +} - map?.subscribe(listener) +/** + * Observes the channel's objects synchronization state via `channel.object.on(event)`. + * Returns the most recent [ObjectStateEvent], or null before any event has been received + * (events fire on transitions only, so an already-completed sync emits nothing). + */ +@Composable +fun observeObjectsSyncState(channel: Channel): ObjectStateEvent? { + var syncState by remember { mutableStateOf(null) } + + DisposableEffect(channel) { + // There is no wildcard subscription - register one listener per event + val subscriptions = listOf(ObjectStateEvent.SYNCING, ObjectStateEvent.SYNCED).map { event -> + channel.`object`.on(event) { stateEvent -> syncState = stateEvent } + } onDispose { - map?.unsubscribe(listener) + subscriptions.forEach { it.unsubscribe() } } } - return mapValue to map + return syncState } @Composable -fun observeRootObject(channel: Channel): LiveMap? { +fun observeRootObject(channel: Channel): LiveMapPathObject? { val channelState = observeChannelState(channel) - var root: LiveMap? by remember { mutableStateOf(null) } + var root: LiveMapPathObject? by remember { mutableStateOf(null) } LaunchedEffect(channelState) { if (channelState == ChannelState.attached) { supressCoroutineExceptions { - root = channel.objects.getRootCoroutines() + // Completes once the objects synchronization state has reached SYNCED + root = channel.`object`.get().await() } } } diff --git a/examples/src/main/kotlin/com/ably/example/screen/ColorVotingScreen.kt b/examples/src/main/kotlin/com/ably/example/screen/ColorVotingScreen.kt index b4c7f5f6f..6ea1d8c76 100644 --- a/examples/src/main/kotlin/com/ably/example/screen/ColorVotingScreen.kt +++ b/examples/src/main/kotlin/com/ably/example/screen/ColorVotingScreen.kt @@ -14,23 +14,20 @@ import androidx.compose.ui.text.style.TextAlign import androidx.compose.ui.unit.dp import androidx.compose.ui.unit.sp import com.ably.example.getRealtimeChannel -import com.ably.example.incrementCoroutine import com.ably.example.observeCounter import com.ably.example.observeRootObject import io.ably.lib.realtime.AblyRealtime -import kotlinx.coroutines.launch @OptIn(ExperimentalMaterial3Api::class) @Composable fun ColorVotingScreen(realtimeClient: AblyRealtime) { - val scope = rememberCoroutineScope() val channel = getRealtimeChannel(realtimeClient, "objects-live-counter") val root = observeRootObject(channel) - val (redCount, redCounter, resetRed) = observeCounter(channel, root,"red") - val (greenCount, greenCounter, resetGreen) = observeCounter(channel, root,"green") - val (blueCount, blueCounter, resetBlue) = observeCounter(channel, root,"blue") + val (redCount, redCounter, resetRed) = observeCounter(root, "red") + val (greenCount, greenCounter, resetGreen) = observeCounter(root, "green") + val (blueCount, blueCounter, resetBlue) = observeCounter(root, "blue") Column( modifier = Modifier @@ -47,15 +44,16 @@ fun ColorVotingScreen(realtimeClient: AblyRealtime) { modifier = Modifier.padding(vertical = 16.dp) ) + ObjectsSyncStatusRow(channel, root) + ColorVoteCard( color = Color.Red, colorName = "Red", count = redCount ?: 0, - enabled = greenCounter != null, + enabled = redCounter != null, onVote = { - scope.launch { - redCounter?.incrementCoroutine(1) - } + // Fire-and-forget: the path subscription updates the displayed count on ack + redCounter?.increment(1) } ) @@ -65,9 +63,7 @@ fun ColorVotingScreen(realtimeClient: AblyRealtime) { count = greenCount ?: 0, enabled = greenCounter != null, onVote = { - scope.launch { - greenCounter?.incrementCoroutine(1) - } + greenCounter?.increment(1) } ) @@ -77,20 +73,16 @@ fun ColorVotingScreen(realtimeClient: AblyRealtime) { count = blueCount ?: 0, enabled = blueCounter != null, onVote = { - scope.launch { - blueCounter?.incrementCoroutine(1) - } + blueCounter?.increment(1) } ) Button( enabled = redCounter != null && greenCounter != null && blueCounter != null, onClick = { - scope.launch { - resetRed() - resetBlue() - resetGreen() - } + resetRed() + resetBlue() + resetGreen() }, ) { Text( diff --git a/examples/src/main/kotlin/com/ably/example/screen/ObjectsSyncStatus.kt b/examples/src/main/kotlin/com/ably/example/screen/ObjectsSyncStatus.kt new file mode 100644 index 000000000..19060902a --- /dev/null +++ b/examples/src/main/kotlin/com/ably/example/screen/ObjectsSyncStatus.kt @@ -0,0 +1,67 @@ +package com.ably.example.screen + +import androidx.compose.foundation.layout.Arrangement +import androidx.compose.foundation.layout.Row +import androidx.compose.foundation.layout.size +import androidx.compose.material.icons.Icons +import androidx.compose.material.icons.filled.CheckCircle +import androidx.compose.material3.CircularProgressIndicator +import androidx.compose.material3.Icon +import androidx.compose.material3.MaterialTheme +import androidx.compose.material3.Text +import androidx.compose.runtime.Composable +import androidx.compose.ui.Alignment +import androidx.compose.ui.Modifier +import androidx.compose.ui.platform.testTag +import androidx.compose.ui.unit.dp +import androidx.compose.ui.unit.sp +import com.ably.example.observeObjectsSyncState +import io.ably.lib.liveobjects.path.types.LiveMapPathObject +import io.ably.lib.liveobjects.state.ObjectStateEvent +import io.ably.lib.realtime.Channel + +/** + * Shows the channel's objects synchronization progress: a spinner with + * "Objects syncing..." while the initial sync (or a re-sync) is in flight, and a + * check mark with "Objects synced" once local state matches the channel. + * + * Sync-state events fire on transitions only, so a sync that completed before this + * composable subscribed emits nothing; a non-null [root] proves `channel.object.get()` + * has completed, which implies SYNCED. A later event (e.g. a re-sync after reattach) + * takes precedence over that inference. + */ +@Composable +fun ObjectsSyncStatusRow(channel: Channel, root: LiveMapPathObject?) { + val syncState = observeObjectsSyncState(channel) + val synced = syncState?.let { it == ObjectStateEvent.SYNCED } ?: (root != null) + + Row( + verticalAlignment = Alignment.CenterVertically, + horizontalArrangement = Arrangement.spacedBy(6.dp), + modifier = Modifier.testTag("objects_sync_status") + ) { + if (synced) { + Icon( + Icons.Default.CheckCircle, + contentDescription = "Objects synced", + tint = MaterialTheme.colorScheme.primary, + modifier = Modifier.size(16.dp) + ) + Text( + text = "Objects synced", + fontSize = 12.sp, + color = MaterialTheme.colorScheme.primary + ) + } else { + CircularProgressIndicator( + modifier = Modifier.size(14.dp), + strokeWidth = 2.dp + ) + Text( + text = "Objects syncing...", + fontSize = 12.sp, + color = MaterialTheme.colorScheme.onSurfaceVariant + ) + } + } +} diff --git a/examples/src/main/kotlin/com/ably/example/screen/TaskManagementScreen.kt b/examples/src/main/kotlin/com/ably/example/screen/TaskManagementScreen.kt index 9079aaedc..a44ffdfd2 100644 --- a/examples/src/main/kotlin/com/ably/example/screen/TaskManagementScreen.kt +++ b/examples/src/main/kotlin/com/ably/example/screen/TaskManagementScreen.kt @@ -18,11 +18,8 @@ import androidx.compose.ui.unit.sp import com.ably.example.getRealtimeChannel import com.ably.example.observeMap import com.ably.example.observeRootObject -import com.ably.example.removeCoroutine -import com.ably.example.setCoroutine -import io.ably.lib.objects.type.map.LiveMapValue +import io.ably.lib.liveobjects.value.LiveMapValue import io.ably.lib.realtime.AblyRealtime -import kotlinx.coroutines.launch import kotlin.uuid.ExperimentalUuidApi import kotlin.uuid.Uuid @@ -33,12 +30,10 @@ fun TaskManagementScreen(realtimeClient: AblyRealtime) { var editingTaskId by remember { mutableStateOf(null) } var editingText by remember { mutableStateOf("") } - val scope = rememberCoroutineScope() - val channel = getRealtimeChannel(realtimeClient, "objects-live-map") val root = observeRootObject(channel) - val (taskIdToTask, liveTasks) = observeMap(channel, root, "tasks") + val (taskIdToTask, liveTasks) = observeMap(root, "tasks") val taskEntries = remember(taskIdToTask) { taskIdToTask.entries.sortedBy { it.key } @@ -58,6 +53,8 @@ fun TaskManagementScreen(realtimeClient: AblyRealtime) { modifier = Modifier.fillMaxWidth() ) + ObjectsSyncStatusRow(channel, root) + Card( modifier = Modifier.fillMaxWidth(), elevation = CardDefaults.cardElevation(defaultElevation = 4.dp), @@ -80,13 +77,15 @@ fun TaskManagementScreen(realtimeClient: AblyRealtime) { horizontalArrangement = Arrangement.spacedBy(8.dp) ) { Button( + // Disabled until the tasks map is bound, so input is never cleared + // without the task actually being enqueued + enabled = liveTasks != null, onClick = { if (taskText.isNotBlank()) { - scope.launch { - val taskId = "${System.currentTimeMillis()}_${Uuid.random().toHexString()}" - liveTasks?.setCoroutine(taskId, LiveMapValue.of(taskText.trim())) - taskText = "" - } + val taskId = "${System.currentTimeMillis()}_${Uuid.random().toHexString()}" + // Fire-and-forget: the map subscription refreshes the task list on ack + liveTasks?.set(taskId, LiveMapValue.of(taskText.trim())) + taskText = "" } }, modifier = Modifier.weight(1f) @@ -98,10 +97,8 @@ fun TaskManagementScreen(realtimeClient: AblyRealtime) { OutlinedButton( onClick = { - scope.launch { - taskIdToTask.forEach { task -> - liveTasks?.removeCoroutine(task.key) - } + taskIdToTask.forEach { task -> + liveTasks?.remove(task.key) } }, modifier = Modifier.weight(1f) @@ -157,20 +154,16 @@ fun TaskManagementScreen(realtimeClient: AblyRealtime) { editingText = task.value }, onSave = { - scope.launch { - liveTasks?.setCoroutine(task.key, LiveMapValue.of(editingText.trim())) - editingTaskId = null - editingText = "" - } + liveTasks?.set(task.key, LiveMapValue.of(editingText.trim())) + editingTaskId = null + editingText = "" }, onCancel = { editingTaskId = null editingText = "" }, onDelete = { - scope.launch { - liveTasks?.removeCoroutine(task.key) - } + liveTasks?.remove(task.key) } ) }