Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
320 changes: 126 additions & 194 deletions examples/src/main/kotlin/com/ably/example/Utils.kt
Original file line number Diff line number Diff line change
@@ -1,211 +1,115 @@
/**
* The LiveObjects bridge layer for the example app: Compose observers plus coroutine
* wrappers over the path-based API (`io.ably.lib.liveobjects`).
*
* The app works exclusively with [io.ably.lib.liveobjects.path.PathObject]s — references
* to a *location* in the channel's objects graph, not to a particular object. A PathObject
* resolves its path lazily on every call, so a stored reference stays valid even when the
* object at that location is replaced (e.g. "Reset all" swaps in brand-new counters), and
* a path subscription automatically observes whatever object currently lives there. The
* identity-bound alternative, [io.ably.lib.liveobjects.instance.Instance] (obtained via
* `pathObject.instance()`), tracks one specific object wherever it moves — not needed here,
* since every screen wants location semantics.
*
* ably-java's typed flavour of the API: the base PathObject exposes only type-agnostic
* methods; type-specific reads/writes live on sub-type views reached via `as*` casts
* (`asLiveCounter()`, `asLiveMap()`, `asString()`, ...). The casts never throw — a
* wrong-typed view degrades gracefully (reads return null/empty, writes fail with an
* AblyException). The root obtained from `channel.object.get()` is already a
* [LiveMapPathObject], so no cast is needed there.
*/
package com.ably.example

import androidx.compose.runtime.*
import io.ably.lib.objects.ObjectsCallback
import io.ably.lib.objects.RealtimeObjects
import io.ably.lib.objects.type.counter.LiveCounter
import io.ably.lib.objects.type.counter.LiveCounterUpdate
import io.ably.lib.objects.type.map.LiveMap
import io.ably.lib.objects.type.map.LiveMapUpdate
import io.ably.lib.objects.type.map.LiveMapUpdate.Change.UPDATED
import io.ably.lib.objects.type.map.LiveMapValue
import androidx.compose.runtime.Composable
import androidx.compose.runtime.DisposableEffect
import androidx.compose.runtime.LaunchedEffect
import androidx.compose.runtime.getValue
import androidx.compose.runtime.mutableStateOf
import androidx.compose.runtime.remember
import androidx.compose.runtime.setValue
import io.ably.lib.liveobjects.ValueType
import io.ably.lib.liveobjects.path.types.LiveCounterPathObject
import io.ably.lib.liveobjects.path.types.LiveMapPathObject
import io.ably.lib.liveobjects.state.ObjectStateEvent
import io.ably.lib.liveobjects.value.LiveCounter
import io.ably.lib.liveobjects.value.LiveMap
import io.ably.lib.liveobjects.value.LiveMapValue
import io.ably.lib.realtime.AblyRealtime
import io.ably.lib.realtime.Channel
import io.ably.lib.realtime.ChannelState
import io.ably.lib.realtime.ChannelStateListener
import io.ably.lib.types.AblyException
import io.ably.lib.types.ChannelMode
import io.ably.lib.types.ChannelOptions
import io.ably.lib.types.ErrorInfo
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume

private suspend fun RealtimeObjects.getRootCoroutines(): LiveMap = suspendCancellableCoroutine { continuation ->
getRootAsync(object : ObjectsCallback<LiveMap> {
override fun onSuccess(result: LiveMap?) {
continuation.resume(result!!)
}

override fun onError(exception: AblyException?) {
continuation.cancel(exception)
}
})
}

private suspend fun RealtimeObjects.createCounterCoroutine(): LiveCounter =
suspendCancellableCoroutine { continuation ->
createCounterAsync(object : ObjectsCallback<LiveCounter> {
override fun onSuccess(result: LiveCounter?) {
continuation.resume(result!!)
}

override fun onError(exception: AblyException?) {
continuation.cancel(exception)
}
})
}

private suspend fun RealtimeObjects.createMapCoroutine(): LiveMap = suspendCancellableCoroutine { continuation ->
createMapAsync(object : ObjectsCallback<LiveMap> {
override fun onSuccess(result: LiveMap?) {
continuation.resume(result!!)
}

override fun onError(exception: AblyException?) {
continuation.cancel(exception)
}
})
}

suspend fun LiveCounter.incrementCoroutine(amount: Int): Unit = supressCoroutineExceptions {
suspendCancellableCoroutine { continuation ->
incrementAsync(amount, object : ObjectsCallback<Void> {
override fun onSuccess(result: Void?) {
continuation.resume(Unit)
}

override fun onError(exception: AblyException?) {
continuation.cancel(exception)
}
})
}
}

suspend fun LiveCounter.decrementCoroutine(amount: Int): Unit = supressCoroutineExceptions {
suspendCancellableCoroutine { continuation ->
decrementAsync(amount, object : ObjectsCallback<Void> {
override fun onSuccess(result: Void?) {
continuation.resume(Unit)
}

override fun onError(exception: AblyException?) {
continuation.cancel(exception)
}
})
}
}

suspend fun Channel.updateOptions(options: ChannelOptions): Unit = supressCoroutineExceptions {
suspendCancellableCoroutine { continuation ->
setOptions(options, object : io.ably.lib.realtime.CompletionListener {
override fun onSuccess() {
continuation.resume(Unit)
}

override fun onError(reason: ErrorInfo?) {
continuation.cancel(AblyException.fromErrorInfo(reason))
}
})
}
}

suspend fun getOrCreateCounter(channel: Channel, root: LiveMap?, path: String): LiveCounter {
val mapValue = root?.get(path)
if (mapValue == null) {
val counter = channel.objects.createCounterCoroutine()
root?.setCoroutine(path, LiveMapValue.of(counter))
return counter
} else {
return mapValue.asLiveCounter
}
}

suspend fun getOrCreateMap(channel: Channel, root: LiveMap?, path: String): LiveMap {
val mapValue = root?.get(path)
if (mapValue == null) {
val map = channel.objects.createMapCoroutine()
root?.setCoroutine(path, LiveMapValue.of(map))
return map
} else {
return mapValue.asLiveMap
}
import kotlinx.coroutines.future.await

/**
* Returns the counter path object at [key] under [root], creating and linking a fresh
* zero-value counter when the path is missing or holds a non-counter value. Path objects
* re-resolve on every call, so the returned reference stays valid even if another client
* replaces the counter object at this key.
*
* The check-then-set is not atomic: another client can create the same key in between.
* That is fine here - the colliding operations converge deterministically via
* last-write-wins, and both sides end up bound to whichever counter won.
*/
suspend fun getOrCreateCounter(root: LiveMapPathObject, key: String): LiveCounterPathObject {
val path = root.get(key)
if (path.type != ValueType.LIVE_COUNTER) {
// Creates the counter and links it under root in a single published operation
root.set(key, LiveMapValue.of(LiveCounter.create())).await()
}
return path.asLiveCounter()
}

suspend fun LiveMap.setCoroutine(key: String, value: LiveMapValue) = supressCoroutineExceptions {
suspendCancellableCoroutine<Unit> { continuation ->
setAsync(key, value, object : ObjectsCallback<Void> {
override fun onSuccess(result: Void?) {
continuation.resume(Unit)
}

override fun onError(exception: AblyException?) {
continuation.cancel(exception)
}
})
}
}

suspend fun LiveMap.removeCoroutine(key: String) = supressCoroutineExceptions {
suspendCancellableCoroutine<Unit> { continuation ->
removeAsync(key, object : ObjectsCallback<Void> {
override fun onSuccess(result: Void?) {
continuation.resume(Unit)
}

override fun onError(exception: AblyException?) {
continuation.cancel(exception)
}
})
}
/**
* Returns the map path object at [key] under [root], creating and linking a fresh empty
* map when the path is missing or holds a non-map value. Same last-write-wins caveat as
* [getOrCreateCounter].
*/
suspend fun getOrCreateMap(root: LiveMapPathObject, key: String): LiveMapPathObject {
val path = root.get(key)
if (path.type != ValueType.LIVE_MAP) {
root.set(key, LiveMapValue.of(LiveMap.create())).await()
}
return path.asLiveMap()
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

@Composable
fun observeCounter(channel: Channel, root: LiveMap?, path: String): CounterState {
var counter by remember { mutableStateOf<LiveCounter?>(null) }
fun observeCounter(root: LiveMapPathObject?, key: String): CounterState {
var counter by remember { mutableStateOf<LiveCounterPathObject?>(null) }
var counterValue by remember { mutableStateOf<Int?>(null) }

LaunchedEffect(root) {
supressCoroutineExceptions {
counter = getOrCreateCounter(channel, root, path)
LaunchedEffect(root, key) {
root?.let {
supressCoroutineExceptions {
counter = getOrCreateCounter(it, key)
}
}
}

DisposableEffect(counter) {
counterValue = counter?.value()?.toInt()

val listener: (LiveCounterUpdate) -> Unit = {
counter?.value()?.let {
counterValue = it.toInt()
}
// The path subscription fires both for increments on the counter and for the key
// being replaced with a new counter (e.g. "Reset all" on another device); the path
// re-resolves on read, so no explicit rebinding is needed.
val subscription = counter?.subscribe {
counterValue = counter?.value()?.toInt()
}

counter?.subscribe(listener)

onDispose {
counter?.unsubscribe(listener)
}
}

DisposableEffect(root) {
val listener: (LiveMapUpdate) -> Unit = { rootUpdate ->
val counterHasBeenRemoved = rootUpdate.update
.filter { (_, change) -> change == UPDATED }
.any { (keyName) -> keyName == path }

if (counterHasBeenRemoved) root?.get(path)?.asLiveCounter?.let { counter = it }
}

root?.subscribe(listener)

onDispose {
root?.unsubscribe(listener)
subscription?.unsubscribe()
}
}

return CounterState(counterValue, counter) {
coroutineScope {
launch {
counter = channel.objects.createCounterCoroutine().also {
root?.setCoroutine(path, LiveMapValue.of(it))
}
}
}
// Reset by replacing the object at this key with a brand-new zero-value counter;
// fire-and-forget - the path subscription refreshes the displayed value on ack
root?.set(key, LiveMapValue.of(LiveCounter.create()))
}
}

data class CounterState(val value: Int?, val counter: LiveCounter?, val reset: suspend () -> Unit)
data class CounterState(val value: Int?, val counter: LiveCounterPathObject?, val reset: () -> Unit)

@Composable
fun observeChannelState(channel: Channel): ChannelState {
Expand All @@ -227,46 +131,74 @@ fun observeChannelState(channel: Channel): ChannelState {
}

@Composable
fun observeMap(channel: Channel, root: LiveMap?, path: String): Pair<Map<String, String>, LiveMap?> {
var map by remember { mutableStateOf<LiveMap?>(null) }
fun observeMap(root: LiveMapPathObject?, key: String): Pair<Map<String, String>, LiveMapPathObject?> {
var map by remember { mutableStateOf<LiveMapPathObject?>(null) }
var mapValue by remember { mutableStateOf<Map<String, String>>(mapOf()) }

LaunchedEffect(root) {
supressCoroutineExceptions {
map = getOrCreateMap(channel, root, path)
fun readEntries(liveMap: LiveMapPathObject?): Map<String, String> =
liveMap?.entries()
?.mapNotNull { (entryKey, valuePath) -> valuePath.asString().value()?.let { entryKey to it } }
?.toMap()
?: mapOf()

LaunchedEffect(root, key) {
root?.let {
supressCoroutineExceptions {
map = getOrCreateMap(it, key)
}
}
}

DisposableEffect(map) {
map?.entries()?.associate { (key, value) -> key to value.asString }?.let {
mapValue = it
mapValue = readEntries(map)

// Fires for every entry change on the map at this path (default subscription depth
// covers nested updates), after which entries are re-read from the resolved map.
val subscription = map?.subscribe {
mapValue = readEntries(map)
}

val listener: (LiveMapUpdate) -> Unit = {
map?.entries()?.associate { (key, value) -> key to value.asString }?.let {
mapValue = it
}
onDispose {
subscription?.unsubscribe()
}
}

return mapValue to map
}

map?.subscribe(listener)
/**
* Observes the channel's objects synchronization state via `channel.object.on(event)`.
* Returns the most recent [ObjectStateEvent], or null before any event has been received
* (events fire on transitions only, so an already-completed sync emits nothing).
*/
@Composable
fun observeObjectsSyncState(channel: Channel): ObjectStateEvent? {
var syncState by remember { mutableStateOf<ObjectStateEvent?>(null) }

DisposableEffect(channel) {
// There is no wildcard subscription - register one listener per event
val subscriptions = listOf(ObjectStateEvent.SYNCING, ObjectStateEvent.SYNCED).map { event ->
channel.`object`.on(event) { stateEvent -> syncState = stateEvent }
}

onDispose {
map?.unsubscribe(listener)
subscriptions.forEach { it.unsubscribe() }
}
}

return mapValue to map
return syncState
}

@Composable
fun observeRootObject(channel: Channel): LiveMap? {
fun observeRootObject(channel: Channel): LiveMapPathObject? {
val channelState = observeChannelState(channel)
var root: LiveMap? by remember { mutableStateOf(null) }
var root: LiveMapPathObject? by remember { mutableStateOf(null) }

LaunchedEffect(channelState) {
if (channelState == ChannelState.attached) {
supressCoroutineExceptions {
root = channel.objects.getRootCoroutines()
// Completes once the objects synchronization state has reached SYNCED
root = channel.`object`.get().await()
}
}
}
Comment thread
sacOO7 marked this conversation as resolved.
Expand Down
Loading
Loading