Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,21 +1,13 @@
package io.ably.lib.liveobjects

import io.ably.lib.liveobjects.adapter.AblyClientAdapter
import io.ably.lib.liveobjects.message.WireCounterCreateWithObjectId
import io.ably.lib.liveobjects.message.WireMapCreateWithObjectId
import io.ably.lib.liveobjects.message.WireObjectMessage
import io.ably.lib.liveobjects.message.WireObjectOperation
import io.ably.lib.liveobjects.message.WireObjectOperationAction
import io.ably.lib.liveobjects.path.PathObjectSubscriptionRegister
import io.ably.lib.liveobjects.path.types.DefaultLiveMapPathObject
import io.ably.lib.liveobjects.path.types.LiveMapPathObject
import io.ably.lib.liveobjects.serialization.gson
import io.ably.lib.liveobjects.state.ObjectStateChange
import io.ably.lib.liveobjects.state.ObjectStateEvent
import io.ably.lib.liveobjects.value.LiveCounter
import io.ably.lib.liveobjects.value.LiveMap
import io.ably.lib.liveobjects.value.LiveMapValue
import io.ably.lib.liveobjects.value.ObjectType
import io.ably.lib.liveobjects.value.livecounter.InternalLiveCounter
import io.ably.lib.liveobjects.value.livemap.InternalLiveMap
import io.ably.lib.realtime.ChannelState
import io.ably.lib.types.AblyException
import io.ably.lib.types.ProtocolMessage
Expand All @@ -25,6 +17,7 @@ import io.ably.lib.util.Log
import io.ably.lib.util.SystemClock
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.future.future
import kotlinx.coroutines.flow.MutableSharedFlow
import java.util.concurrent.CancellationException
import java.util.concurrent.CompletableFuture
Expand All @@ -33,8 +26,8 @@ import java.util.concurrent.CompletableFuture
* Default implementation of [RealtimeObject], the entry point to the strongly-typed,
* path-based LiveObjects API for a single channel.
*
* This is currently a skeleton: the path-based read and subscribe operations are not yet
* implemented. The method bodies will be filled in as the path-based API is built out.
* [get] returns the root of the path-addressed view once the channel is attached and the
* initial sync has completed; path subscriptions are managed by [pathObjectSubscriptionRegister].
*
* Spec: RTO23
*/
Expand Down Expand Up @@ -62,6 +55,12 @@ internal class DefaultRealtimeObject(
*/
private val objectsManager = ObjectsManager(this)

/**
* Registry for PathObject subscriptions and path-event dispatch.
* @spec RTO24a
*/
internal val pathObjectSubscriptionRegister = PathObjectSubscriptionRegister(this)

/**
* Coroutine scope for running sequential operations on a single thread, used to avoid concurrency issues.
*/
Expand All @@ -80,10 +79,17 @@ internal class DefaultRealtimeObject(
}

override fun get(): CompletableFuture<LiveMapPathObject> {
throwIfInvalidAccessApiConfiguration()
TODO("Not yet implemented, this should call getRootAsync")
throwIfInvalidAccessApiConfiguration() // RTO23a
return asyncApi { getRootAsync() }
}

/**
* Runs [block] on the sequential scope and exposes it as a CompletableFuture. Failures
* complete the future exceptionally with the original AblyException.
*/
internal fun <T> asyncApi(block: suspend () -> T): CompletableFuture<T> =
sequentialScope.future { block() }

override fun on(event: ObjectStateEvent, listener: ObjectStateChange.Listener): Subscription {
throwIfInvalidAccessApiConfiguration()
return objectsManager.on(event, listener)
Expand All @@ -99,90 +105,27 @@ internal class DefaultRealtimeObject(
objectsManager.offAll()
}

private suspend fun getRootAsync(): LiveMap = withContext(sequentialScope.coroutineContext) {
adapter.ensureAttached(channelName)
objectsManager.ensureSynced(state)
objectsPool.get(ROOT_OBJECT_ID) as LiveMap
private suspend fun getRootAsync(): LiveMapPathObject {
adapter.ensureAttached(channelName) // RTO23e
objectsManager.ensureSynced(state) // RTO23c
// RTO23d - a PathObject with an empty path, rooted at the channel's root InternalLiveMap;
// the root reference (RTPO2b) is realised as a pool lookup at resolution time, which is
// equivalent because the pool never replaces the root instance (RTO4b2, RTO5c2a).
// RTTS6d - the static type is LiveMapPathObject.
return DefaultLiveMapPathObject(this, "")
}

private suspend fun createMapAsync(entries: MutableMap<String, LiveMapValue>): LiveMap {
throwIfInvalidWriteApiConfiguration() // RTO26

if (entries.keys.any { it.isEmpty() }) { // RTLMV4b
throw invalidInputError("Map keys should not be empty")
}

// RTLMV4e - Create initial value operation
val initialMapValue = InternalLiveMap.initialValue(entries)

// RTLMV4f - Create initial value JSON string
val initialValueJSONString = gson.toJson(initialMapValue)

// RTO14 - Create object ID from initial value
val (objectId, nonce) = getObjectIdStringWithNonce(ObjectType.Map, initialValueJSONString)

// Create ObjectMessage with the operation
val msg = WireObjectMessage(
operation = WireObjectOperation(
action = WireObjectOperationAction.MapCreate,
objectId = objectId,
mapCreateWithObjectId = WireMapCreateWithObjectId(
nonce = nonce,
initialValue = initialValueJSONString,
derivedFrom = initialMapValue,
),
)
)

// RTLMV3 - publish and apply locally on ACK
publishAndApply(arrayOf(msg))

// RTLMV3 - Return existing object if found after apply
return objectsPool.get(objectId) as? LiveMap
?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId") // RTLMV3
}

private suspend fun createCounterAsync(initialValue: Number): LiveCounter {
throwIfInvalidWriteApiConfiguration() // RTO26

// Validate input parameter
if (initialValue.toDouble().isNaN() || initialValue.toDouble().isInfinite()) {
throw invalidInputError("Counter value should be a valid number")
}

// RTLCV4b
val initialCounterValue = InternalLiveCounter.initialValue(initialValue)
// RTLCV4c - Create initial value operation
val initialValueJSONString = gson.toJson(initialCounterValue)

// RTO14 - Create object ID from initial value
val (objectId, nonce) = getObjectIdStringWithNonce(ObjectType.Counter, initialValueJSONString)

// Create ObjectMessage with the operation
val msg = WireObjectMessage(
operation = WireObjectOperation(
action = WireObjectOperationAction.CounterCreate,
objectId = objectId,
counterCreateWithObjectId = WireCounterCreateWithObjectId(
nonce = nonce,
initialValue = initialValueJSONString,
derivedFrom = initialCounterValue,
),
)
)

// RTLCV3 - publish and apply locally on ACK
publishAndApply(arrayOf(msg))

// RTLCV3 - Return existing object if found after apply
return objectsPool.get(objectId) as? LiveCounter
?: throw serverError("createCounter: COUNTER_CREATE was not applied as expected; objectId=$objectId") // RTLCV3
}
/**
* Runs a mutating [block] on the sequential scope, exposed as a CompletableFuture<Void>.
* Used by the path/instance write APIs.
*/
internal fun asyncVoidApi(block: suspend () -> Unit): CompletableFuture<Void> =
asyncApi(block).thenApply { null }

/**
* Spec: RTO14
*/
private suspend fun getObjectIdStringWithNonce(objectType: ObjectType, initialValue: String): Pair<String, String> {
internal suspend fun getObjectIdStringWithNonce(objectType: ObjectType, initialValue: String): Pair<String, String> {
val nonce = generateNonce()
val msTimestamp = ServerTime.getCurrentTime(adapter) // RTO16 - Get server time for nonce generation
return Pair(ObjectId.fromInitialValue(objectType, initialValue, nonce, msTimestamp).toString(), nonce)
Expand Down Expand Up @@ -302,13 +245,11 @@ internal class DefaultRealtimeObject(

objectsManager.clearBufferedObjectOperations() // RTO4d - clear unconditionally on ATTACHED

// RTO4a
val fromInitializedState = this@DefaultRealtimeObject.state == ObjectsState.Initialized
if (hasObjects || fromInitializedState) {
// should always start a new sync sequence if we're in the initialized state, no matter the HAS_OBJECTS flag value.
// this guarantees we emit both "syncing" -> "synced" events in that order.
objectsManager.startNewSync(null)
}
// RTO4c - always transition to SYNCING, regardless of the HAS_OBJECTS flag (CV-1:
// matches the spec and current ably-js; the previous hasObjects/initialized
// conditional was removed upstream in ably-js commit e280bff1, so a re-attach with
// HAS_OBJECTS=0 from the Synced state now emits SYNCING -> SYNCED as required)
objectsManager.startNewSync(null)

// RTO4b
if (!hasObjects) {
Expand Down Expand Up @@ -356,6 +297,7 @@ internal class DefaultRealtimeObject(
incomingObjectsHandler.cancel(disposeReason) // objectsEventBus automatically garbage collected when collector is cancelled
objectsPool.dispose()
objectsManager.dispose()
pathObjectSubscriptionRegister.dispose()
// Don't cancel sequentialScope (needed in getRoot method), just cancel ongoing coroutines
sequentialScope.coroutineContext.cancelChildren(disposeReason)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.ably.lib.liveobjects.value.BaseRealtimeObject
import io.ably.lib.liveobjects.value.ObjectUpdate
import io.ably.lib.liveobjects.value.livecounter.InternalLiveCounter
import io.ably.lib.liveobjects.value.livemap.InternalLiveMap
import io.ably.lib.liveobjects.value.livemap.isEntryOrRefTombstoned
import io.ably.lib.types.AblyException
import io.ably.lib.util.Log
import kotlinx.coroutines.CompletableDeferred
Expand Down Expand Up @@ -180,12 +181,38 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
// RTO5c2 - need to remove realtimeObject instances from the ObjectsPool for which objectIds were not received during the sync sequence
realtimeObjects.objectsPool.deleteExtraObjectIds(receivedObjectIds)

// RTO5c10 - rebuild every parentReferences map after the pool has settled, so that
// getFullPaths is correct by the time the RTO5c7 notifications below are dispatched
rebuildAllParentReferences()

// RTO5c7 - call subscription callbacks for all updated existing objects
existingObjectUpdates.forEach { (obj, update) ->
obj.notifyUpdated(update)
}
}

/**
* Rebuilds all parent references from the settled pool state. Necessary after a sync because
* objects may reference other objects that were not yet in the pool when their references
* were first applied. Mirrors ably-js realtimeobject.ts#_rebuildAllParentReferences.
*
* @spec RTO5c10
*/
private fun rebuildAllParentReferences() {
val objects = realtimeObjects.objectsPool.all()
objects.forEach { it.clearParentReferences() } // RTO5c10a
objects.filterIsInstance<InternalLiveMap>().forEach { map ->
// RTO5c10b - RTLM11-equivalent iteration over the raw entries: skip entries that are
// tombstoned or reference a tombstoned object (RTLM14), but avoid the full value
// resolution of entries() since only entry.data.objectId is needed here
for ((key, entry) in map.data) {
val refId = entry.data?.objectId ?: continue
if (entry.isEntryOrRefTombstoned(realtimeObjects.objectsPool)) continue
realtimeObjects.objectsPool.get(refId)?.addParentReference(map, key)
}
}
}

/**
* Applies object messages to objects.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ internal class ObjectsPool(
pool[objectId] = realtimeObject
}

/**
* All objects currently in the pool. Used by the RTO5c10 parent-reference rebuild.
*/
internal fun all(): Collection<BaseRealtimeObject> = pool.values

/**
* Removes all objects but root from the pool and clears the data for root.
* Does not create a new root object, so the reference to the root object remains the same.
Expand Down
3 changes: 0 additions & 3 deletions liveobjects/src/main/kotlin/io/ably/lib/liveobjects/Utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ internal fun objectError(errorMessage: String, cause: Throwable? = null): AblyEx
return ablyException(errorMessage, ObjectErrorCode.InvalidObject, ObjectHttpStatusCode.InternalServerError, cause)
}

internal fun invalidInputError(errorMessage: String, cause: Throwable? = null): AblyException {
return ablyException(errorMessage, ObjectErrorCode.InvalidInputParams, ObjectHttpStatusCode.InternalServerError, cause)
}

/**
* Calculates the byte size of a string.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
package io.ably.lib.liveobjects.instance

import io.ably.lib.liveobjects.DefaultRealtimeObject
import io.ably.lib.liveobjects.ValueType
import io.ably.lib.liveobjects.instance.types.BinaryInstance
import io.ably.lib.liveobjects.instance.types.BooleanInstance
import io.ably.lib.liveobjects.instance.types.DefaultBinaryInstance
import io.ably.lib.liveobjects.instance.types.DefaultBooleanInstance
import io.ably.lib.liveobjects.instance.types.DefaultJsonArrayInstance
import io.ably.lib.liveobjects.instance.types.DefaultJsonObjectInstance
import io.ably.lib.liveobjects.instance.types.DefaultLiveCounterInstance
import io.ably.lib.liveobjects.instance.types.DefaultLiveMapInstance
import io.ably.lib.liveobjects.instance.types.DefaultNumberInstance
import io.ably.lib.liveobjects.instance.types.DefaultStringInstance
import io.ably.lib.liveobjects.instance.types.JsonArrayInstance
import io.ably.lib.liveobjects.instance.types.JsonObjectInstance
import io.ably.lib.liveobjects.instance.types.LiveCounterInstance
import io.ably.lib.liveobjects.instance.types.LiveMapInstance
import io.ably.lib.liveobjects.instance.types.NumberInstance
import io.ably.lib.liveobjects.instance.types.StringInstance
import io.ably.lib.liveobjects.value.ResolvedValue
import io.ably.lib.liveobjects.value.valueType
import java.util.Base64

/**
* Default implementation of [Instance], the identity-addressed node in the LiveObjects graph.
Expand Down Expand Up @@ -44,3 +56,25 @@ internal abstract class DefaultInstance(

override fun asJsonArray(): JsonArrayInstance = throw IllegalStateException("Not a JsonArray instance")
}

/**
* Wraps a resolved value in its typed Instance. Returns null only for a Leaf that matches no
* known category (ValueType.UNKNOWN - no typed wrapper exists for it). Primitive instances bind
* the extracted (decoded) value, not the wire leaf: an Instance is identity/value-addressed and
* O(1), so it must not re-read mutable map state.
*
* Spec: RTPO8c, RTINS5c, RTTS7e (an Instance is always a concrete typed sub-class)
*/
internal fun ResolvedValue.toInstance(channelObject: DefaultRealtimeObject): Instance? = when (this) {
is ResolvedValue.MapRef -> DefaultLiveMapInstance(channelObject, map)
is ResolvedValue.CounterRef -> DefaultLiveCounterInstance(channelObject, counter)
is ResolvedValue.Leaf -> when (valueType()) {
ValueType.STRING -> DefaultStringInstance(channelObject, data.string!!)
ValueType.NUMBER -> DefaultNumberInstance(channelObject, data.number!!)
ValueType.BOOLEAN -> DefaultBooleanInstance(channelObject, data.boolean!!)
ValueType.BINARY -> DefaultBinaryInstance(channelObject, Base64.getDecoder().decode(data.bytes))
ValueType.JSON_OBJECT -> DefaultJsonObjectInstance(channelObject, data.json!!.asJsonObject)
ValueType.JSON_ARRAY -> DefaultJsonArrayInstance(channelObject, data.json!!.asJsonArray)
else -> null // UNKNOWN leaf - no typed wrapper exists
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
package io.ably.lib.liveobjects.instance

import io.ably.lib.liveobjects.message.ObjectMessage
import io.ably.lib.liveobjects.value.livecounter.LiveCounterChangeEvent
import io.ably.lib.liveobjects.value.livemap.LiveMapChangeEvent

/**
* Default implementation of [InstanceSubscriptionEvent], the event delivered to an
* [InstanceListener] when the wrapped LiveObject is updated. A plain holder for the updated
* [Instance] and the source [ObjectMessage] (if any).
*
* Implements both internal change-event markers so the map/counter change emitters can carry
* it without casts (the markers exist only to type the two emitters).
*
* Spec: RTINS16e
*/
internal class DefaultInstanceSubscriptionEvent(
private val instance: Instance,
private val message: ObjectMessage?,
) : InstanceSubscriptionEvent {
) : InstanceSubscriptionEvent, LiveMapChangeEvent, LiveCounterChangeEvent {

override fun getObject(): Instance = instance

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,30 @@ import com.google.gson.JsonPrimitive
import io.ably.lib.liveobjects.DefaultRealtimeObject
import io.ably.lib.liveobjects.ValueType
import io.ably.lib.liveobjects.instance.DefaultInstance
import java.util.Base64

/**
* Default implementation of [BinaryInstance], a read-only primitive view that only adds a
* type-narrowed, non-null [value]; left unimplemented for now.
* Default implementation of [BinaryInstance], a read-only primitive view bound to its
* extracted (base64-decoded) value (RTINS2a).
*
* Spec: RTTS10c
*/
internal class DefaultBinaryInstance(
channelObject: DefaultRealtimeObject,
internal val value: ByteArray,
) : DefaultInstance(channelObject), BinaryInstance {

override fun getType(): ValueType = ValueType.BINARY

override fun compactJson(): JsonPrimitive {
channelObject.throwIfInvalidAccessApiConfiguration()
TODO("Not yet implemented")
channelObject.throwIfInvalidAccessApiConfiguration() // RTINS11a
return JsonPrimitive(Base64.getEncoder().encodeToString(value)) // RTTS7a3; base64 per RTPO14b1
}

override fun asBinary(): BinaryInstance = this

override fun value(): ByteArray {
channelObject.throwIfInvalidAccessApiConfiguration()
TODO("Not yet implemented")
channelObject.throwIfInvalidAccessApiConfiguration() // RTINS4a
return value.clone() // RTINS4c; defensive copy out
}
}
Loading
Loading