From b50f9048e89e7b7622acc3d55cf4abc7985dc904 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 3 Jul 2026 15:21:40 +0530 Subject: [PATCH] feat(liveobjects): implement path-based public API - reads, writes, subscriptions and event bubbling Completes the path-addressed LiveObjects API on top of the sync engine: - Bridge ResolvedValue (MapRef/CounterRef/Leaf) to the internal graph objects so path resolution returns live references - Replace the untyped ObjectUpdate with a sealed model (MapUpdate/CounterUpdate/NoOp) carrying the source objectMessage and tombstone flag (RTLO4b4, RTLM18, RTLC11) - Track parent references on every graph object and derive getFullPaths() for event bubbling (RTLO3f, RTLO4f-RTLO4h) - Implement RTPO3 path resolution and the read APIs: RealtimeObject#get (RTO23), PathObject value/exists/instance/compactJson (RTPO8, RTPO13/RTPO14) - Add creation value-type evaluation and the path write APIs - Add value-bound typed Instances and PathObject#instance (RTINS5, RTTS6/RTTS7) - Add path/instance subscriptions with depth windows and event bubbling via PathObjectSubscriptionRegister (RTO24, RTPO19) - RTO4c fix: always start a new sync sequence on ATTACHED regardless of the HAS_OBJECTS flag, matching current ably-js - Make stored path strings round-trip-safe by escaping backslashes as well as dots in PathSegments#join - Remove dead createMap/createCounter code paths and re-enable the UTS liveobjects test suites --- .../lib/liveobjects/DefaultRealtimeObject.kt | 140 ++++-------- .../io/ably/lib/liveobjects/ObjectsManager.kt | 27 +++ .../io/ably/lib/liveobjects/ObjectsPool.kt | 5 + .../kotlin/io/ably/lib/liveobjects/Utils.kt | 3 - .../liveobjects/instance/DefaultInstance.kt | 34 +++ .../DefaultInstanceSubscriptionEvent.kt | 7 +- .../instance/types/DefaultBinaryInstance.kt | 14 +- .../instance/types/DefaultBooleanInstance.kt | 13 +- .../types/DefaultJsonArrayInstance.kt | 13 +- .../types/DefaultJsonObjectInstance.kt | 13 +- .../types/DefaultLiveCounterInstance.kt | 43 ++-- .../instance/types/DefaultLiveMapInstance.kt | 62 +++--- .../instance/types/DefaultNumberInstance.kt | 13 +- .../instance/types/DefaultStringInstance.kt | 13 +- .../lib/liveobjects/path/DefaultPathObject.kt | 62 ++++-- .../path/PathObjectSubscriptionRegister.kt | 81 +++++++ .../ably/lib/liveobjects/path/PathSegments.kt | 76 +++++++ .../path/types/DefaultBinaryPathObject.kt | 10 +- .../path/types/DefaultBooleanPathObject.kt | 9 +- .../path/types/DefaultJsonArrayPathObject.kt | 9 +- .../path/types/DefaultJsonObjectPathObject.kt | 9 +- .../types/DefaultLiveCounterPathObject.kt | 46 ++-- .../path/types/DefaultLiveMapPathObject.kt | 73 ++++--- .../path/types/DefaultNumberPathObject.kt | 9 +- .../path/types/DefaultStringPathObject.kt | 9 +- .../value/BaseRealtimeLiveObject.kt | 189 +++++++++++++--- .../lib/liveobjects/value/ResolvedValue.kt | 12 +- .../value/livecounter/DefaultLiveCounter.kt | 45 +++- .../value/livecounter/InternalLiveCounter.kt | 52 ++--- .../LiveCounterChangeCoordinator.kt | 13 +- .../value/livecounter/LiveCounterManager.kt | 69 +++--- .../value/livemap/DefaultLiveMap.kt | 94 +++++++- .../value/livemap/InternalLiveMap.kt | 204 ++++++++++-------- .../value/livemap/LiveMapChangeCoordinator.kt | 13 +- .../liveobjects/value/livemap/LiveMapEntry.kt | 53 ++--- .../value/livemap/LiveMapManager.kt | 171 ++++++++------- .../lib/liveobjects/unit/PathSegmentsTest.kt | 96 +++++++++ uts/build.gradle.kts | 10 - 38 files changed, 1203 insertions(+), 611 deletions(-) create mode 100644 liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/PathObjectSubscriptionRegister.kt create mode 100644 liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/PathSegments.kt create mode 100644 liveobjects/src/test/kotlin/io/ably/lib/liveobjects/unit/PathSegmentsTest.kt diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/DefaultRealtimeObject.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/DefaultRealtimeObject.kt index 03e32332a..6bedf3cbc 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/DefaultRealtimeObject.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/DefaultRealtimeObject.kt @@ -1,21 +1,13 @@ package io.ably.lib.liveobjects import io.ably.lib.liveobjects.adapter.AblyClientAdapter -import io.ably.lib.liveobjects.message.WireCounterCreateWithObjectId -import io.ably.lib.liveobjects.message.WireMapCreateWithObjectId import io.ably.lib.liveobjects.message.WireObjectMessage -import io.ably.lib.liveobjects.message.WireObjectOperation -import io.ably.lib.liveobjects.message.WireObjectOperationAction +import io.ably.lib.liveobjects.path.PathObjectSubscriptionRegister +import io.ably.lib.liveobjects.path.types.DefaultLiveMapPathObject import io.ably.lib.liveobjects.path.types.LiveMapPathObject -import io.ably.lib.liveobjects.serialization.gson import io.ably.lib.liveobjects.state.ObjectStateChange import io.ably.lib.liveobjects.state.ObjectStateEvent -import io.ably.lib.liveobjects.value.LiveCounter -import io.ably.lib.liveobjects.value.LiveMap -import io.ably.lib.liveobjects.value.LiveMapValue import io.ably.lib.liveobjects.value.ObjectType -import io.ably.lib.liveobjects.value.livecounter.InternalLiveCounter -import io.ably.lib.liveobjects.value.livemap.InternalLiveMap import io.ably.lib.realtime.ChannelState import io.ably.lib.types.AblyException import io.ably.lib.types.ProtocolMessage @@ -25,6 +17,7 @@ import io.ably.lib.util.Log import io.ably.lib.util.SystemClock import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED +import kotlinx.coroutines.future.future import kotlinx.coroutines.flow.MutableSharedFlow import java.util.concurrent.CancellationException import java.util.concurrent.CompletableFuture @@ -33,8 +26,8 @@ import java.util.concurrent.CompletableFuture * Default implementation of [RealtimeObject], the entry point to the strongly-typed, * path-based LiveObjects API for a single channel. * - * This is currently a skeleton: the path-based read and subscribe operations are not yet - * implemented. The method bodies will be filled in as the path-based API is built out. + * [get] returns the root of the path-addressed view once the channel is attached and the + * initial sync has completed; path subscriptions are managed by [pathObjectSubscriptionRegister]. * * Spec: RTO23 */ @@ -62,6 +55,12 @@ internal class DefaultRealtimeObject( */ private val objectsManager = ObjectsManager(this) + /** + * Registry for PathObject subscriptions and path-event dispatch. + * @spec RTO24a + */ + internal val pathObjectSubscriptionRegister = PathObjectSubscriptionRegister(this) + /** * Coroutine scope for running sequential operations on a single thread, used to avoid concurrency issues. */ @@ -80,10 +79,17 @@ internal class DefaultRealtimeObject( } override fun get(): CompletableFuture { - throwIfInvalidAccessApiConfiguration() - TODO("Not yet implemented, this should call getRootAsync") + throwIfInvalidAccessApiConfiguration() // RTO23a + return asyncApi { getRootAsync() } } + /** + * Runs [block] on the sequential scope and exposes it as a CompletableFuture. Failures + * complete the future exceptionally with the original AblyException. + */ + internal fun asyncApi(block: suspend () -> T): CompletableFuture = + sequentialScope.future { block() } + override fun on(event: ObjectStateEvent, listener: ObjectStateChange.Listener): Subscription { throwIfInvalidAccessApiConfiguration() return objectsManager.on(event, listener) @@ -99,90 +105,27 @@ internal class DefaultRealtimeObject( objectsManager.offAll() } - private suspend fun getRootAsync(): LiveMap = withContext(sequentialScope.coroutineContext) { - adapter.ensureAttached(channelName) - objectsManager.ensureSynced(state) - objectsPool.get(ROOT_OBJECT_ID) as LiveMap + private suspend fun getRootAsync(): LiveMapPathObject { + adapter.ensureAttached(channelName) // RTO23e + objectsManager.ensureSynced(state) // RTO23c + // RTO23d - a PathObject with an empty path, rooted at the channel's root InternalLiveMap; + // the root reference (RTPO2b) is realised as a pool lookup at resolution time, which is + // equivalent because the pool never replaces the root instance (RTO4b2, RTO5c2a). + // RTTS6d - the static type is LiveMapPathObject. + return DefaultLiveMapPathObject(this, "") } - private suspend fun createMapAsync(entries: MutableMap): LiveMap { - throwIfInvalidWriteApiConfiguration() // RTO26 - - if (entries.keys.any { it.isEmpty() }) { // RTLMV4b - throw invalidInputError("Map keys should not be empty") - } - - // RTLMV4e - Create initial value operation - val initialMapValue = InternalLiveMap.initialValue(entries) - - // RTLMV4f - Create initial value JSON string - val initialValueJSONString = gson.toJson(initialMapValue) - - // RTO14 - Create object ID from initial value - val (objectId, nonce) = getObjectIdStringWithNonce(ObjectType.Map, initialValueJSONString) - - // Create ObjectMessage with the operation - val msg = WireObjectMessage( - operation = WireObjectOperation( - action = WireObjectOperationAction.MapCreate, - objectId = objectId, - mapCreateWithObjectId = WireMapCreateWithObjectId( - nonce = nonce, - initialValue = initialValueJSONString, - derivedFrom = initialMapValue, - ), - ) - ) - - // RTLMV3 - publish and apply locally on ACK - publishAndApply(arrayOf(msg)) - - // RTLMV3 - Return existing object if found after apply - return objectsPool.get(objectId) as? LiveMap - ?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId") // RTLMV3 - } - - private suspend fun createCounterAsync(initialValue: Number): LiveCounter { - throwIfInvalidWriteApiConfiguration() // RTO26 - - // Validate input parameter - if (initialValue.toDouble().isNaN() || initialValue.toDouble().isInfinite()) { - throw invalidInputError("Counter value should be a valid number") - } - - // RTLCV4b - val initialCounterValue = InternalLiveCounter.initialValue(initialValue) - // RTLCV4c - Create initial value operation - val initialValueJSONString = gson.toJson(initialCounterValue) - - // RTO14 - Create object ID from initial value - val (objectId, nonce) = getObjectIdStringWithNonce(ObjectType.Counter, initialValueJSONString) - - // Create ObjectMessage with the operation - val msg = WireObjectMessage( - operation = WireObjectOperation( - action = WireObjectOperationAction.CounterCreate, - objectId = objectId, - counterCreateWithObjectId = WireCounterCreateWithObjectId( - nonce = nonce, - initialValue = initialValueJSONString, - derivedFrom = initialCounterValue, - ), - ) - ) - - // RTLCV3 - publish and apply locally on ACK - publishAndApply(arrayOf(msg)) - - // RTLCV3 - Return existing object if found after apply - return objectsPool.get(objectId) as? LiveCounter - ?: throw serverError("createCounter: COUNTER_CREATE was not applied as expected; objectId=$objectId") // RTLCV3 - } + /** + * Runs a mutating [block] on the sequential scope, exposed as a CompletableFuture. + * Used by the path/instance write APIs. + */ + internal fun asyncVoidApi(block: suspend () -> Unit): CompletableFuture = + asyncApi(block).thenApply { null } /** * Spec: RTO14 */ - private suspend fun getObjectIdStringWithNonce(objectType: ObjectType, initialValue: String): Pair { + internal suspend fun getObjectIdStringWithNonce(objectType: ObjectType, initialValue: String): Pair { val nonce = generateNonce() val msTimestamp = ServerTime.getCurrentTime(adapter) // RTO16 - Get server time for nonce generation return Pair(ObjectId.fromInitialValue(objectType, initialValue, nonce, msTimestamp).toString(), nonce) @@ -302,13 +245,11 @@ internal class DefaultRealtimeObject( objectsManager.clearBufferedObjectOperations() // RTO4d - clear unconditionally on ATTACHED - // RTO4a - val fromInitializedState = this@DefaultRealtimeObject.state == ObjectsState.Initialized - if (hasObjects || fromInitializedState) { - // should always start a new sync sequence if we're in the initialized state, no matter the HAS_OBJECTS flag value. - // this guarantees we emit both "syncing" -> "synced" events in that order. - objectsManager.startNewSync(null) - } + // RTO4c - always transition to SYNCING, regardless of the HAS_OBJECTS flag (CV-1: + // matches the spec and current ably-js; the previous hasObjects/initialized + // conditional was removed upstream in ably-js commit e280bff1, so a re-attach with + // HAS_OBJECTS=0 from the Synced state now emits SYNCING -> SYNCED as required) + objectsManager.startNewSync(null) // RTO4b if (!hasObjects) { @@ -356,6 +297,7 @@ internal class DefaultRealtimeObject( incomingObjectsHandler.cancel(disposeReason) // objectsEventBus automatically garbage collected when collector is cancelled objectsPool.dispose() objectsManager.dispose() + pathObjectSubscriptionRegister.dispose() // Don't cancel sequentialScope (needed in getRoot method), just cancel ongoing coroutines sequentialScope.coroutineContext.cancelChildren(disposeReason) } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsManager.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsManager.kt index a2aaaaeb7..54c2ed888 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsManager.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsManager.kt @@ -9,6 +9,7 @@ import io.ably.lib.liveobjects.value.BaseRealtimeObject import io.ably.lib.liveobjects.value.ObjectUpdate import io.ably.lib.liveobjects.value.livecounter.InternalLiveCounter import io.ably.lib.liveobjects.value.livemap.InternalLiveMap +import io.ably.lib.liveobjects.value.livemap.isEntryOrRefTombstoned import io.ably.lib.types.AblyException import io.ably.lib.util.Log import kotlinx.coroutines.CompletableDeferred @@ -180,12 +181,38 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject // RTO5c2 - need to remove realtimeObject instances from the ObjectsPool for which objectIds were not received during the sync sequence realtimeObjects.objectsPool.deleteExtraObjectIds(receivedObjectIds) + // RTO5c10 - rebuild every parentReferences map after the pool has settled, so that + // getFullPaths is correct by the time the RTO5c7 notifications below are dispatched + rebuildAllParentReferences() + // RTO5c7 - call subscription callbacks for all updated existing objects existingObjectUpdates.forEach { (obj, update) -> obj.notifyUpdated(update) } } + /** + * Rebuilds all parent references from the settled pool state. Necessary after a sync because + * objects may reference other objects that were not yet in the pool when their references + * were first applied. Mirrors ably-js realtimeobject.ts#_rebuildAllParentReferences. + * + * @spec RTO5c10 + */ + private fun rebuildAllParentReferences() { + val objects = realtimeObjects.objectsPool.all() + objects.forEach { it.clearParentReferences() } // RTO5c10a + objects.filterIsInstance().forEach { map -> + // RTO5c10b - RTLM11-equivalent iteration over the raw entries: skip entries that are + // tombstoned or reference a tombstoned object (RTLM14), but avoid the full value + // resolution of entries() since only entry.data.objectId is needed here + for ((key, entry) in map.data) { + val refId = entry.data?.objectId ?: continue + if (entry.isEntryOrRefTombstoned(realtimeObjects.objectsPool)) continue + realtimeObjects.objectsPool.get(refId)?.addParentReference(map, key) + } + } + } + /** * Applies object messages to objects. * diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsPool.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsPool.kt index afb460d13..df15d0d6f 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsPool.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/ObjectsPool.kt @@ -82,6 +82,11 @@ internal class ObjectsPool( pool[objectId] = realtimeObject } + /** + * All objects currently in the pool. Used by the RTO5c10 parent-reference rebuild. + */ + internal fun all(): Collection = pool.values + /** * Removes all objects but root from the pool and clears the data for root. * Does not create a new root object, so the reference to the root object remains the same. diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/Utils.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/Utils.kt index 8cc628a32..accbafca4 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/Utils.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/Utils.kt @@ -39,9 +39,6 @@ internal fun objectError(errorMessage: String, cause: Throwable? = null): AblyEx return ablyException(errorMessage, ObjectErrorCode.InvalidObject, ObjectHttpStatusCode.InternalServerError, cause) } -internal fun invalidInputError(errorMessage: String, cause: Throwable? = null): AblyException { - return ablyException(errorMessage, ObjectErrorCode.InvalidInputParams, ObjectHttpStatusCode.InternalServerError, cause) -} /** * Calculates the byte size of a string. diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/DefaultInstance.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/DefaultInstance.kt index 0b710bbca..7af8ac210 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/DefaultInstance.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/DefaultInstance.kt @@ -1,14 +1,26 @@ package io.ably.lib.liveobjects.instance import io.ably.lib.liveobjects.DefaultRealtimeObject +import io.ably.lib.liveobjects.ValueType import io.ably.lib.liveobjects.instance.types.BinaryInstance import io.ably.lib.liveobjects.instance.types.BooleanInstance +import io.ably.lib.liveobjects.instance.types.DefaultBinaryInstance +import io.ably.lib.liveobjects.instance.types.DefaultBooleanInstance +import io.ably.lib.liveobjects.instance.types.DefaultJsonArrayInstance +import io.ably.lib.liveobjects.instance.types.DefaultJsonObjectInstance +import io.ably.lib.liveobjects.instance.types.DefaultLiveCounterInstance +import io.ably.lib.liveobjects.instance.types.DefaultLiveMapInstance +import io.ably.lib.liveobjects.instance.types.DefaultNumberInstance +import io.ably.lib.liveobjects.instance.types.DefaultStringInstance import io.ably.lib.liveobjects.instance.types.JsonArrayInstance import io.ably.lib.liveobjects.instance.types.JsonObjectInstance import io.ably.lib.liveobjects.instance.types.LiveCounterInstance import io.ably.lib.liveobjects.instance.types.LiveMapInstance import io.ably.lib.liveobjects.instance.types.NumberInstance import io.ably.lib.liveobjects.instance.types.StringInstance +import io.ably.lib.liveobjects.value.ResolvedValue +import io.ably.lib.liveobjects.value.valueType +import java.util.Base64 /** * Default implementation of [Instance], the identity-addressed node in the LiveObjects graph. @@ -44,3 +56,25 @@ internal abstract class DefaultInstance( override fun asJsonArray(): JsonArrayInstance = throw IllegalStateException("Not a JsonArray instance") } + +/** + * Wraps a resolved value in its typed Instance. Returns null only for a Leaf that matches no + * known category (ValueType.UNKNOWN - no typed wrapper exists for it). Primitive instances bind + * the extracted (decoded) value, not the wire leaf: an Instance is identity/value-addressed and + * O(1), so it must not re-read mutable map state. + * + * Spec: RTPO8c, RTINS5c, RTTS7e (an Instance is always a concrete typed sub-class) + */ +internal fun ResolvedValue.toInstance(channelObject: DefaultRealtimeObject): Instance? = when (this) { + is ResolvedValue.MapRef -> DefaultLiveMapInstance(channelObject, map) + is ResolvedValue.CounterRef -> DefaultLiveCounterInstance(channelObject, counter) + is ResolvedValue.Leaf -> when (valueType()) { + ValueType.STRING -> DefaultStringInstance(channelObject, data.string!!) + ValueType.NUMBER -> DefaultNumberInstance(channelObject, data.number!!) + ValueType.BOOLEAN -> DefaultBooleanInstance(channelObject, data.boolean!!) + ValueType.BINARY -> DefaultBinaryInstance(channelObject, Base64.getDecoder().decode(data.bytes)) + ValueType.JSON_OBJECT -> DefaultJsonObjectInstance(channelObject, data.json!!.asJsonObject) + ValueType.JSON_ARRAY -> DefaultJsonArrayInstance(channelObject, data.json!!.asJsonArray) + else -> null // UNKNOWN leaf - no typed wrapper exists + } +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/DefaultInstanceSubscriptionEvent.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/DefaultInstanceSubscriptionEvent.kt index 428a3b88f..d1117b6f1 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/DefaultInstanceSubscriptionEvent.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/DefaultInstanceSubscriptionEvent.kt @@ -1,18 +1,23 @@ package io.ably.lib.liveobjects.instance import io.ably.lib.liveobjects.message.ObjectMessage +import io.ably.lib.liveobjects.value.livecounter.LiveCounterChangeEvent +import io.ably.lib.liveobjects.value.livemap.LiveMapChangeEvent /** * Default implementation of [InstanceSubscriptionEvent], the event delivered to an * [InstanceListener] when the wrapped LiveObject is updated. A plain holder for the updated * [Instance] and the source [ObjectMessage] (if any). * + * Implements both internal change-event markers so the map/counter change emitters can carry + * it without casts (the markers exist only to type the two emitters). + * * Spec: RTINS16e */ internal class DefaultInstanceSubscriptionEvent( private val instance: Instance, private val message: ObjectMessage?, -) : InstanceSubscriptionEvent { +) : InstanceSubscriptionEvent, LiveMapChangeEvent, LiveCounterChangeEvent { override fun getObject(): Instance = instance diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultBinaryInstance.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultBinaryInstance.kt index 0abf41285..8265245e6 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultBinaryInstance.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultBinaryInstance.kt @@ -4,28 +4,30 @@ import com.google.gson.JsonPrimitive import io.ably.lib.liveobjects.DefaultRealtimeObject import io.ably.lib.liveobjects.ValueType import io.ably.lib.liveobjects.instance.DefaultInstance +import java.util.Base64 /** - * Default implementation of [BinaryInstance], a read-only primitive view that only adds a - * type-narrowed, non-null [value]; left unimplemented for now. + * Default implementation of [BinaryInstance], a read-only primitive view bound to its + * extracted (base64-decoded) value (RTINS2a). * * Spec: RTTS10c */ internal class DefaultBinaryInstance( channelObject: DefaultRealtimeObject, + internal val value: ByteArray, ) : DefaultInstance(channelObject), BinaryInstance { override fun getType(): ValueType = ValueType.BINARY override fun compactJson(): JsonPrimitive { - channelObject.throwIfInvalidAccessApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS11a + return JsonPrimitive(Base64.getEncoder().encodeToString(value)) // RTTS7a3; base64 per RTPO14b1 } override fun asBinary(): BinaryInstance = this override fun value(): ByteArray { - channelObject.throwIfInvalidAccessApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS4a + return value.clone() // RTINS4c; defensive copy out } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultBooleanInstance.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultBooleanInstance.kt index ab5eeae4d..7c59a81d0 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultBooleanInstance.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultBooleanInstance.kt @@ -6,26 +6,27 @@ import io.ably.lib.liveobjects.ValueType import io.ably.lib.liveobjects.instance.DefaultInstance /** - * Default implementation of [BooleanInstance], a read-only primitive view that only adds a - * type-narrowed, non-null [value]; left unimplemented for now. + * Default implementation of [BooleanInstance], a read-only primitive view bound to its + * extracted value (RTINS2a). * * Spec: RTTS10c */ internal class DefaultBooleanInstance( channelObject: DefaultRealtimeObject, + internal val value: Boolean, ) : DefaultInstance(channelObject), BooleanInstance { override fun getType(): ValueType = ValueType.BOOLEAN override fun compactJson(): JsonPrimitive { - channelObject.throwIfInvalidAccessApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS11a + return JsonPrimitive(value) // RTTS7a3 } override fun asBoolean(): BooleanInstance = this override fun value(): Boolean { - channelObject.throwIfInvalidAccessApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS4a + return value // RTINS4c; RTTS10c non-null } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultJsonArrayInstance.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultJsonArrayInstance.kt index ecd755a32..e94628b07 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultJsonArrayInstance.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultJsonArrayInstance.kt @@ -6,26 +6,27 @@ import io.ably.lib.liveobjects.ValueType import io.ably.lib.liveobjects.instance.DefaultInstance /** - * Default implementation of [JsonArrayInstance], a read-only primitive view that only adds - * a type-narrowed, non-null [value]; left unimplemented for now. + * Default implementation of [JsonArrayInstance], a read-only primitive view bound to its + * extracted value (RTINS2a). * * Spec: RTTS10c */ internal class DefaultJsonArrayInstance( channelObject: DefaultRealtimeObject, + internal val value: JsonArray, ) : DefaultInstance(channelObject), JsonArrayInstance { override fun getType(): ValueType = ValueType.JSON_ARRAY override fun compactJson(): JsonArray { - channelObject.throwIfInvalidAccessApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS11a + return value // RTTS7a2 } override fun asJsonArray(): JsonArrayInstance = this override fun value(): JsonArray { - channelObject.throwIfInvalidAccessApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS4a + return value // RTINS4c; RTTS10c non-null } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultJsonObjectInstance.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultJsonObjectInstance.kt index 3ce012fd6..8c6155770 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultJsonObjectInstance.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultJsonObjectInstance.kt @@ -6,26 +6,27 @@ import io.ably.lib.liveobjects.ValueType import io.ably.lib.liveobjects.instance.DefaultInstance /** - * Default implementation of [JsonObjectInstance], a read-only primitive view that only adds - * a type-narrowed, non-null [value]; left unimplemented for now. + * Default implementation of [JsonObjectInstance], a read-only primitive view bound to its + * extracted value (RTINS2a). * * Spec: RTTS10c */ internal class DefaultJsonObjectInstance( channelObject: DefaultRealtimeObject, + internal val value: JsonObject, ) : DefaultInstance(channelObject), JsonObjectInstance { override fun getType(): ValueType = ValueType.JSON_OBJECT override fun compactJson(): JsonObject { - channelObject.throwIfInvalidAccessApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS11a + return value // RTTS7a1 } override fun asJsonObject(): JsonObjectInstance = this override fun value(): JsonObject { - channelObject.throwIfInvalidAccessApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS4a + return value // RTINS4c; RTTS10c non-null } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultLiveCounterInstance.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultLiveCounterInstance.kt index 60aae7cc4..1f00ca2d3 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultLiveCounterInstance.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultLiveCounterInstance.kt @@ -6,60 +6,61 @@ import io.ably.lib.liveobjects.Subscription import io.ably.lib.liveobjects.ValueType import io.ably.lib.liveobjects.instance.DefaultInstance import io.ably.lib.liveobjects.instance.InstanceListener -import io.ably.lib.liveobjects.onceSubscription +import io.ably.lib.liveobjects.value.livecounter.InternalLiveCounter import java.util.concurrent.CompletableFuture /** - * Default implementation of [LiveCounterInstance], adding counter operations and subscribe - * on top of [DefaultInstance]; all left unimplemented for now. + * Default implementation of [LiveCounterInstance], bound to a specific [InternalLiveCounter] + * (RTINS2a). Operations dereference the wrapped counter in O(1) - no path resolution. * * Spec: RTTS10b */ internal class DefaultLiveCounterInstance( channelObject: DefaultRealtimeObject, + internal val counter: InternalLiveCounter, ) : DefaultInstance(channelObject), LiveCounterInstance { override fun getType(): ValueType = ValueType.LIVE_COUNTER override fun compactJson(): JsonPrimitive { - channelObject.throwIfInvalidAccessApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS11a + return JsonPrimitive(counter.value()) // RTPO13d; RTTS7a3 narrowed to JsonPrimitive } override fun asLiveCounter(): LiveCounterInstance = this - override fun getId(): String = TODO("Not yet implemented") + override fun getId(): String = counter.objectId // RTINS3a; RTTS10b non-null override fun value(): Double { - channelObject.throwIfInvalidAccessApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS4a + return counter.value() // RTINS4b via RTLC5c; RTTS10b non-null } override fun increment(): CompletableFuture { - channelObject.throwIfInvalidWriteApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidWriteApiConfiguration() // RTINS14b + return channelObject.asyncVoidApi { counter.increment(1) } // RTINS14a1 default, RTINS14c -> RTLC12 } override fun increment(amount: Number): CompletableFuture { - channelObject.throwIfInvalidWriteApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidWriteApiConfiguration() // RTINS14b + return channelObject.asyncVoidApi { counter.increment(amount) } // RTINS14c -> RTLC12 } override fun decrement(): CompletableFuture { - channelObject.throwIfInvalidWriteApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidWriteApiConfiguration() // RTINS15b + return channelObject.asyncVoidApi { counter.decrement(1) } // RTINS15a1 default, RTINS15c -> RTLC13 } override fun decrement(amount: Number): CompletableFuture { - channelObject.throwIfInvalidWriteApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidWriteApiConfiguration() // RTINS15b + return channelObject.asyncVoidApi { counter.decrement(amount) } // RTINS15c -> RTLC13 } override fun subscribe(listener: InstanceListener): Subscription { - channelObject.throwIfInvalidAccessApiConfiguration() - // TODO - subscribe logic goes here - return onceSubscription { - // TODO - remove InstanceListener - } + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS16b + // RTINS16c is satisfied by construction: primitive instances don't declare subscribe (RTTS10c) + // RTINS16d - identity-based: follows the wrapped counter wherever it sits in the graph + // (RTINS16g); pure registration, no side effects (RTINS16h) + return counter.subscribe(listener) } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultLiveMapInstance.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultLiveMapInstance.kt index 22d9c3f10..b48d78197 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultLiveMapInstance.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultLiveMapInstance.kt @@ -7,72 +7,84 @@ import io.ably.lib.liveobjects.ValueType import io.ably.lib.liveobjects.instance.DefaultInstance import io.ably.lib.liveobjects.instance.Instance import io.ably.lib.liveobjects.instance.InstanceListener -import io.ably.lib.liveobjects.onceSubscription +import io.ably.lib.liveobjects.instance.toInstance import io.ably.lib.liveobjects.value.LiveMapValue +import io.ably.lib.liveobjects.value.livemap.InternalLiveMap +import java.util.AbstractMap import java.util.concurrent.CompletableFuture /** - * Default implementation of [LiveMapInstance], adding map reads, writes and subscribe on top - * of [DefaultInstance]; all left unimplemented for now. + * Default implementation of [LiveMapInstance], bound to a specific [InternalLiveMap] + * (RTINS2a). Operations dereference the wrapped map in O(1) - no path resolution. + * + * No type-guard branches are needed: the wrapped value is statically an InternalLiveMap + * (RTTS9d - a matching cast returns a view that always matches), so the RTINS wrong-type + * clauses are unreachable by construction. * * Spec: RTTS10a */ internal class DefaultLiveMapInstance( channelObject: DefaultRealtimeObject, + internal val map: InternalLiveMap, ) : DefaultInstance(channelObject), LiveMapInstance { override fun getType(): ValueType = ValueType.LIVE_MAP override fun compactJson(): JsonObject { - channelObject.throwIfInvalidAccessApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS11a + return map.compactJson() // RTINS11b -> RTPO13c/RTPO14b; RTTS7a1 narrowed to JsonObject } override fun asLiveMap(): LiveMapInstance = this - override fun getId(): String = TODO("Not yet implemented") + override fun getId(): String = map.objectId // RTINS3a; RTTS10a non-null @Suppress("RedundantNullableReturnType") override fun get(key: String): Instance? { - channelObject.throwIfInvalidAccessApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS5b + return map.get(key)?.toInstance(channelObject) // RTINS5c - null result stays null } override fun entries(): Iterable> { - channelObject.throwIfInvalidAccessApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS6a + // RTINS6b. Deviation from RTLM11d3a (documented in the plan): entries whose value resolves + // to null (dangling reference) or to an UNKNOWN leaf are skipped - a typed Iterable cannot + // carry a null Instance. keys() intentionally does not skip them (RTLM12 delegation). + return map.entries().mapNotNull { (key, resolved) -> + resolved?.toInstance(channelObject)?.let { AbstractMap.SimpleImmutableEntry(key, it) } + } } override fun keys(): Iterable { - channelObject.throwIfInvalidAccessApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS7a + return map.keys().toList() // RTINS7b - via RTLM12 } override fun values(): Iterable { - channelObject.throwIfInvalidAccessApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS8a + return entries().map { it.value } // RTINS8b } override fun size(): Long { - channelObject.throwIfInvalidAccessApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS9a + return map.size() // RTINS9b - via RTLM10d; RTTS10a non-null } override fun set(key: String, value: LiveMapValue): CompletableFuture { - channelObject.throwIfInvalidWriteApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidWriteApiConfiguration() // RTINS12b + return channelObject.asyncVoidApi { map.set(key, value) } // RTINS12c -> RTLM20 } override fun remove(key: String): CompletableFuture { - channelObject.throwIfInvalidWriteApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidWriteApiConfiguration() // RTINS13b + return channelObject.asyncVoidApi { map.remove(key) } // RTINS13c -> RTLM21 } override fun subscribe(listener: InstanceListener): Subscription { - channelObject.throwIfInvalidAccessApiConfiguration() - // TODO - subscribe logic goes here - return onceSubscription { - // TODO - remove InstanceListener - } + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS16b + // RTINS16c is satisfied by construction: primitive instances don't declare subscribe (RTTS10c) + // RTINS16d - identity-based: follows the wrapped map wherever it sits in the graph + // (RTINS16g); pure registration, no side effects (RTINS16h) + return map.subscribe(listener) } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultNumberInstance.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultNumberInstance.kt index 27910de7b..8da55d26e 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultNumberInstance.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultNumberInstance.kt @@ -6,26 +6,27 @@ import io.ably.lib.liveobjects.ValueType import io.ably.lib.liveobjects.instance.DefaultInstance /** - * Default implementation of [NumberInstance], a read-only primitive view that only adds a - * type-narrowed, non-null [value]; left unimplemented for now. + * Default implementation of [NumberInstance], a read-only primitive view bound to its + * extracted value (RTINS2a). * * Spec: RTTS10c */ internal class DefaultNumberInstance( channelObject: DefaultRealtimeObject, + internal val value: Number, ) : DefaultInstance(channelObject), NumberInstance { override fun getType(): ValueType = ValueType.NUMBER override fun compactJson(): JsonPrimitive { - channelObject.throwIfInvalidAccessApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS11a + return JsonPrimitive(value) // RTTS7a3 } override fun asNumber(): NumberInstance = this override fun value(): Number { - channelObject.throwIfInvalidAccessApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS4a + return value // RTINS4c; RTTS10c non-null } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultStringInstance.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultStringInstance.kt index 998608e9b..19efc683f 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultStringInstance.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/instance/types/DefaultStringInstance.kt @@ -6,26 +6,27 @@ import io.ably.lib.liveobjects.ValueType import io.ably.lib.liveobjects.instance.DefaultInstance /** - * Default implementation of [StringInstance], a read-only primitive view that only adds a - * type-narrowed, non-null [value]; left unimplemented for now. + * Default implementation of [StringInstance], a read-only primitive view bound to its + * extracted value (RTINS2a). * * Spec: RTTS10c */ internal class DefaultStringInstance( channelObject: DefaultRealtimeObject, + internal val value: String, ) : DefaultInstance(channelObject), StringInstance { override fun getType(): ValueType = ValueType.STRING override fun compactJson(): JsonPrimitive { - channelObject.throwIfInvalidAccessApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS11a + return JsonPrimitive(value) // RTTS7a3 } override fun asString(): StringInstance = this override fun value(): String { - channelObject.throwIfInvalidAccessApiConfiguration() - TODO("Not yet implemented") + channelObject.throwIfInvalidAccessApiConfiguration() // RTINS4a + return value // RTINS4c; RTTS10c non-null } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/DefaultPathObject.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/DefaultPathObject.kt index 4cc3a38c8..40f1b27ff 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/DefaultPathObject.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/DefaultPathObject.kt @@ -1,11 +1,13 @@ package io.ably.lib.liveobjects.path import com.google.gson.JsonElement +import com.google.gson.JsonPrimitive import io.ably.lib.liveobjects.DefaultRealtimeObject +import io.ably.lib.liveobjects.ROOT_OBJECT_ID import io.ably.lib.liveobjects.Subscription import io.ably.lib.liveobjects.ValueType import io.ably.lib.liveobjects.instance.Instance -import io.ably.lib.liveobjects.onceSubscription +import io.ably.lib.liveobjects.instance.toInstance import io.ably.lib.liveobjects.path.types.BinaryPathObject import io.ably.lib.liveobjects.path.types.BooleanPathObject import io.ably.lib.liveobjects.path.types.DefaultBinaryPathObject @@ -23,15 +25,17 @@ import io.ably.lib.liveobjects.path.types.LiveMapPathObject import io.ably.lib.liveobjects.path.types.NumberPathObject import io.ably.lib.liveobjects.path.types.StringPathObject import io.ably.lib.liveobjects.value.ResolvedValue +import io.ably.lib.liveobjects.value.livemap.InternalLiveMap +import io.ably.lib.liveobjects.value.livemap.toCompactJsonElement import io.ably.lib.liveobjects.value.valueType /** * Default implementation of [PathObject], the untyped node in the path-addressed view of * the LiveObjects graph. * - * This is a skeleton. The `as*` casts return a typed view of the same position; the - * operations that require resolving the path against the live objects graph are left - * unimplemented for now and will be filled in as the path-based API is built out. + * The `as*` casts return a typed view of the same position without resolving it; operations + * that need a value re-resolve the stored path against the live objects graph on every call + * via [resolveValueAtPath], so a PathObject never holds a stale reference. * * Spec: RTPO1, RTPO2, RTTS3 */ @@ -48,20 +52,24 @@ internal open class DefaultPathObject( } override fun instance(): Instance? { - channelObject.throwIfInvalidAccessApiConfiguration() - val resolvedValue = resolveValueAtPath(path) ?: return null // unresolved path -> no instance + channelObject.throwIfInvalidAccessApiConfiguration() // RTPO8a + val resolvedValue = resolveValueAtPath(path) ?: return null // RTPO8e - unresolved path -> no instance return when (resolvedValue) { - is ResolvedValue.Leaf -> null // primitives have no Instance; only live objects do - // TODO - wrap the resolved live object (LiveMap/LiveCounter) in an Instance - is ResolvedValue.MapRef, is ResolvedValue.CounterRef -> TODO("Not yet implemented") + is ResolvedValue.Leaf -> null // RTPO8d - primitives have no Instance here; only live objects do + // RTPO8c - wrap the resolved live object in its typed Instance (RTTS6e: primitive + // *PathObject sub-types inherit this and resolve to leaves, so they return null) + is ResolvedValue.MapRef, is ResolvedValue.CounterRef -> resolvedValue.toInstance(channelObject) } } override fun compactJson(): JsonElement? { - channelObject.throwIfInvalidAccessApiConfiguration() - resolveValueAtPath(path) ?: return null // unresolved path -> null - // TODO - build the compacted JSON snapshot (LiveMap -> JsonObject, LiveCounter -> number, leaf -> JSON value) - TODO("Not yet implemented") + channelObject.throwIfInvalidAccessApiConfiguration() // RTPO14a / RTO25 + return when (val resolved = resolveValueAtPath(path)) { + null -> null // RTPO3c1 - unresolved path + is ResolvedValue.MapRef -> resolved.map.compactJson() // RTPO13c + is ResolvedValue.CounterRef -> JsonPrimitive(resolved.counter.value()) // RTPO13d + is ResolvedValue.Leaf -> resolved.data.toCompactJsonElement() // RTPO13e, RTPO14b1 + } } override fun exists(): Boolean { @@ -88,15 +96,29 @@ internal open class DefaultPathObject( override fun subscribe(listener: PathObjectListener): Subscription = subscribe(listener, null) override fun subscribe(listener: PathObjectListener, options: PathObjectSubscriptionOptions?): Subscription { - channelObject.throwIfInvalidAccessApiConfiguration() - // TODO - subscribe logic goes here - return onceSubscription { - // TODO - remove PathObjectListener from list - } + channelObject.throwIfInvalidAccessApiConfiguration() // RTPO19b + // depth validation happens in PathObjectSubscriptionOptions(int) - RTPO19c1a + val segments = PathSegments.parseStored(path) // empty stored path = root = zero segments + // RTPO19f - pure registration, no side effects on channel state (RTPO19g) + return channelObject.pathObjectSubscriptionRegister.subscribe(segments, listener, options?.depth) } + /** + * RTPO3 path resolution against the local objects graph, evaluated freshly at call time. + * Returns null on resolution failure; read callers degrade per RTPO3c1, write callers + * throw 92005 per RTPO3c2. + */ protected fun resolveValueAtPath(path: String): ResolvedValue? { - // TODO - resolve the path against the live objects graph and return the value at that position - TODO("Not yet implemented") + // root is always present and always an InternalLiveMap (RTO3b); the pool never replaces + // the root instance (RTO4b2, RTO5c2a), so looking it up per call is equivalent to + // holding the RTPO2b root reference + val root = channelObject.objectsPool.get(ROOT_OBJECT_ID) as InternalLiveMap + var current: ResolvedValue = ResolvedValue.MapRef(root) + // parseStored: an empty stored path is the root itself - zero segments (RTPO3b) + for (segment in PathSegments.parseStored(path)) { + val map = (current as? ResolvedValue.MapRef)?.map ?: return null // RTPO3a1 - non-map mid-path + current = map.get(segment) ?: return null // RTPO3a2 - via RTLM5 + } + return current // RTPO3a3 } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/PathObjectSubscriptionRegister.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/PathObjectSubscriptionRegister.kt new file mode 100644 index 000000000..22d9030e9 --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/PathObjectSubscriptionRegister.kt @@ -0,0 +1,81 @@ +package io.ably.lib.liveobjects.path + +import io.ably.lib.liveobjects.DefaultRealtimeObject +import io.ably.lib.liveobjects.Subscription +import io.ably.lib.liveobjects.message.ObjectMessage +import io.ably.lib.liveobjects.onceSubscription +import io.ably.lib.util.Log +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong + +/** + * Registry for PathObject subscriptions and path-event dispatch. One per RealtimeObject. + * Mirrors ably-js pathobjectsubscriptionregister.ts. + * + * Subscriptions may be added/removed from any thread (RTPO19 is non-blocking); dispatch always + * happens on the sequential scope via LiveObject notifications. + * + * Spec: RTO24, RTO24a + */ +internal class PathObjectSubscriptionRegister(private val channelObject: DefaultRealtimeObject) { + + private val tag = "PathObjectSubscriptionRegister" + + private class SubscriptionEntry( + val listener: PathObjectListener, + val depth: Int?, // RTPO19c1 - null = infinite depth + val segments: List, // copied at subscribe time + ) + + private val subscriptions = ConcurrentHashMap() + private val nextId = AtomicLong() + + /** Registers a subscription for [segments]. Spec: RTPO19f */ + internal fun subscribe(segments: List, listener: PathObjectListener, depth: Int?): Subscription { + val id = nextId.getAndIncrement() + subscriptions[id] = SubscriptionEntry(listener, depth, segments.toList()) + return onceSubscription { subscriptions.remove(id) } + } + + /** + * Dispatches one path event: each subscription covering any candidate path is notified at + * most once, at the first (most-preferred) covered candidate. Listener errors are caught and + * logged without affecting other subscriptions (RTO24b2c). + * + * Spec: RTO24b2b + */ + internal fun notifyPathEvent(candidatePaths: List>, message: ObjectMessage?) { + for (entry in subscriptions.values) { + val chosen = candidatePaths.firstOrNull { covers(entry, it) } ?: continue // RTO24b2b + try { + entry.listener.onUpdated( + DefaultPathObjectSubscriptionEvent( + DefaultPathObject(channelObject, PathSegments.join(chosen)), // RTO24b2b1 / RTPO19e1 + message, // RTO24b2b2 / RTPO19e2 + ) + ) + } catch (t: Throwable) { + Log.e(tag, "Error in PathObject subscription listener; path=$chosen", t) // RTO24b2c + } + } + } + + /** Drops all subscriptions; called when the owning RealtimeObject is disposed. */ + internal fun dispose() = subscriptions.clear() + + /** + * A subscription covers [eventPath] iff its path is a prefix of it (exact match included) + * and the relative depth is within the subscription's depth window. + * + * Spec: RTO24c1 (worked examples RTO24c2) + */ + private fun covers(entry: SubscriptionEntry, eventPath: List): Boolean { + val subPath = entry.segments + if (subPath.size > eventPath.size) return false + for (i in subPath.indices) { + if (eventPath[i] != subPath[i]) return false + } + val depth = entry.depth ?: return true // null = infinite depth + return eventPath.size - subPath.size + 1 <= depth + } +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/PathSegments.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/PathSegments.kt new file mode 100644 index 000000000..37169a447 --- /dev/null +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/PathSegments.kt @@ -0,0 +1,76 @@ +package io.ably.lib.liveobjects.path + +/** + * Dot-delimited path <-> segment list conversions. A dot inside a segment is escaped as `\.` + * (RTPO4b); parsing honours the escape (RTPO6b). Mirrors ably-js pathobject.ts#at / #_escapePath. + * + * Root convention: the root PathObject stores the empty string, which represents ZERO segments + * (RTPO4c) - unlike ably-js, which stores segment arrays and never parses a stored path. Every + * helper below therefore treats an empty *stored base path* as zero segments via [parseStored]; + * [parse] itself is only ever given user-supplied sub-paths (where "" means one empty segment, + * matching ably-js `at("")`) or non-empty stored paths. + */ +internal object PathSegments { + + /** + * RTPO6b - split on unescaped dots; `\.` yields a literal dot; any other `\x` keeps the + * backslash; a trailing lone `\` is kept. `""` parses to one empty segment (ably-js parity). + * Manual scanner (no regex lookbehind), ported from ably-js pathobject.ts#at. + */ + internal fun parse(path: String): List = parse(path, strict = false) + + private fun parse(path: String, strict: Boolean): List { + val segments = mutableListOf() + val currentSegment = StringBuilder() + var escaping = false + for (char in path) { + if (escaping) { + // user-supplied paths keep the escape character unless it escapes a dot, replicating + // ably-js behaviour where only escaped dots are unescaped; stored paths were produced + // by [join], which escapes both '.' and '\', so strict mode unescapes both + if (char != '.' && !(strict && char == '\\')) currentSegment.append('\\') + currentSegment.append(char) + escaping = false + continue + } + when (char) { + '\\' -> escaping = true + '.' -> { + segments.add(currentSegment.toString()) + currentSegment.setLength(0) + } + else -> currentSegment.append(char) + } + } + if (escaping) { + currentSegment.append('\\') + } + segments.add(currentSegment.toString()) + return segments + } + + /** + * RTPO4a/RTPO4b - join segments, escaping dots inside segments. Empty list -> "" (RTPO4c). + * Backslashes are escaped too (deviation from ably-js `_escapePath`, which escapes only dots): + * ably-js stores segment ARRAYS and its escaped string is display-only, but here the joined + * string IS the storage and gets re-parsed by [parseStored] on every resolution. Without + * doubling backslashes, a key ending in `\` collides with the escaped-dot separator + * (`["a\", "b"]` -> `a\.b` -> re-parses as `["a.b"]`), breaking lookups and subscriptions. + */ + internal fun join(segments: List): String = + segments.joinToString(".") { it.replace("\\", "\\\\").replace(".", "\\.") } + + /** + * Stored-path parsing: empty stored path = root = zero segments. Use for `this.path`, never + * raw [parse]. Stored paths only ever come from [join], so this inverts join's full escaping + * (`\\` -> `\` as well as `\.` -> `.`). + */ + internal fun parseStored(path: String): List = + if (path.isEmpty()) emptyList() else parse(path, strict = true) + + /** RTPO5c - append one raw key (escaping it) to an existing stored path. */ + internal fun appendKey(path: String, key: String): String = join(parseStored(path) + key) + + /** RTPO6c - append a dot-delimited sub-path to an existing stored path. */ + internal fun appendPath(path: String, subPath: String): String = join(parseStored(path) + parse(subPath)) +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultBinaryPathObject.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultBinaryPathObject.kt index 35ac94c5d..4a22649c3 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultBinaryPathObject.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultBinaryPathObject.kt @@ -3,11 +3,13 @@ package io.ably.lib.liveobjects.path.types import io.ably.lib.liveobjects.DefaultRealtimeObject import io.ably.lib.liveobjects.ValueType import io.ably.lib.liveobjects.path.DefaultPathObject +import io.ably.lib.liveobjects.value.ResolvedValue import io.ably.lib.liveobjects.value.valueType +import java.util.Base64 /** * Default implementation of [BinaryPathObject], a terminal primitive view that only adds a - * type-narrowed [value]; left unimplemented for now. + * type-narrowed [value]. * * Spec: RTTS6c */ @@ -18,8 +20,8 @@ internal class DefaultBinaryPathObject( override fun value(): ByteArray? { channelObject.throwIfInvalidAccessApiConfiguration() - if (resolveValueAtPath(path)?.valueType() != ValueType.BINARY) return null // not a Binary value at this path -> no value - // TODO - extract the primitive value from the resolved leaf, narrowed to ByteArray (base64-decoded) - TODO("Not yet implemented") + val resolved = resolveValueAtPath(path) ?: return null + if (resolved.valueType() != ValueType.BINARY) return null // RTTS6c - exact type only + return Base64.getDecoder().decode((resolved as ResolvedValue.Leaf).data.bytes) // wire form is base64 (OD2d) } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultBooleanPathObject.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultBooleanPathObject.kt index e8554f780..d261d5a62 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultBooleanPathObject.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultBooleanPathObject.kt @@ -3,11 +3,12 @@ package io.ably.lib.liveobjects.path.types import io.ably.lib.liveobjects.DefaultRealtimeObject import io.ably.lib.liveobjects.ValueType import io.ably.lib.liveobjects.path.DefaultPathObject +import io.ably.lib.liveobjects.value.ResolvedValue import io.ably.lib.liveobjects.value.valueType /** * Default implementation of [BooleanPathObject], a terminal primitive view that only adds a - * type-narrowed [value]; left unimplemented for now. + * type-narrowed [value]. * * Spec: RTTS6c */ @@ -18,8 +19,8 @@ internal class DefaultBooleanPathObject( override fun value(): Boolean? { channelObject.throwIfInvalidAccessApiConfiguration() - if (resolveValueAtPath(path)?.valueType() != ValueType.BOOLEAN) return null // not a Boolean at this path -> no value - // TODO - extract the primitive value from the resolved leaf, narrowed to Boolean - TODO("Not yet implemented") + val resolved = resolveValueAtPath(path) ?: return null + if (resolved.valueType() != ValueType.BOOLEAN) return null // RTTS6c - exact type only + return (resolved as ResolvedValue.Leaf).data.boolean } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultJsonArrayPathObject.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultJsonArrayPathObject.kt index fa40c460f..b2614de04 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultJsonArrayPathObject.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultJsonArrayPathObject.kt @@ -4,11 +4,12 @@ import com.google.gson.JsonArray import io.ably.lib.liveobjects.DefaultRealtimeObject import io.ably.lib.liveobjects.ValueType import io.ably.lib.liveobjects.path.DefaultPathObject +import io.ably.lib.liveobjects.value.ResolvedValue import io.ably.lib.liveobjects.value.valueType /** * Default implementation of [JsonArrayPathObject], a terminal primitive view that only adds - * a type-narrowed [value]; left unimplemented for now. + * a type-narrowed [value]. * * Spec: RTTS6c */ @@ -19,8 +20,8 @@ internal class DefaultJsonArrayPathObject( override fun value(): JsonArray? { channelObject.throwIfInvalidAccessApiConfiguration() - if (resolveValueAtPath(path)?.valueType() != ValueType.JSON_ARRAY) return null // not a JSON array at this path -> no value - // TODO - extract the primitive value from the resolved leaf, narrowed to JsonArray - TODO("Not yet implemented") + val resolved = resolveValueAtPath(path) ?: return null + if (resolved.valueType() != ValueType.JSON_ARRAY) return null // RTTS6c - exact type only + return (resolved as ResolvedValue.Leaf).data.json!!.asJsonArray } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultJsonObjectPathObject.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultJsonObjectPathObject.kt index e9362fcfe..db26d06ae 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultJsonObjectPathObject.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultJsonObjectPathObject.kt @@ -4,11 +4,12 @@ import com.google.gson.JsonObject import io.ably.lib.liveobjects.DefaultRealtimeObject import io.ably.lib.liveobjects.ValueType import io.ably.lib.liveobjects.path.DefaultPathObject +import io.ably.lib.liveobjects.value.ResolvedValue import io.ably.lib.liveobjects.value.valueType /** * Default implementation of [JsonObjectPathObject], a terminal primitive view that only adds - * a type-narrowed [value]; left unimplemented for now. + * a type-narrowed [value]. * * Spec: RTTS6c */ @@ -19,8 +20,8 @@ internal class DefaultJsonObjectPathObject( override fun value(): JsonObject? { channelObject.throwIfInvalidAccessApiConfiguration() - if (resolveValueAtPath(path)?.valueType() != ValueType.JSON_OBJECT) return null // not a JSON object at this path -> no value - // TODO - extract the primitive value from the resolved leaf, narrowed to JsonObject - TODO("Not yet implemented") + val resolved = resolveValueAtPath(path) ?: return null + if (resolved.valueType() != ValueType.JSON_OBJECT) return null // RTTS6c - exact type only + return (resolved as ResolvedValue.Leaf).data.json!!.asJsonObject } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultLiveCounterPathObject.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultLiveCounterPathObject.kt index 6e4e320ca..b71173896 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultLiveCounterPathObject.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultLiveCounterPathObject.kt @@ -11,7 +11,7 @@ import java.util.concurrent.CompletableFuture * Default implementation of [LiveCounterPathObject]. * * Counters are terminal nodes (no navigation), so this only adds the counter read/write - * operations on top of [DefaultPathObject]; they are left unimplemented for now. + * operations on top of [DefaultPathObject]. * * Spec: RTTS6b */ @@ -20,50 +20,46 @@ internal class DefaultLiveCounterPathObject( path: String, ) : DefaultPathObject(channelObject, path), LiveCounterPathObject { - override fun value(): Double? { + override fun value(): Double? { // RTTS6b channelObject.throwIfInvalidAccessApiConfiguration() - if (resolveValueAtPath(path) !is ResolvedValue.CounterRef) return null // not a LiveCounter (or unresolved) -> null - // TODO - return the resolved counter's value - TODO("Not yet implemented") + val counter = (resolveValueAtPath(path) as? ResolvedValue.CounterRef)?.counter + ?: return null // not a LiveCounter (or unresolved) -> null + return counter.value() // RTPO7c via RTLC5c } override fun increment(): CompletableFuture { - channelObject.throwIfInvalidWriteApiConfiguration() - val resolvedValue = resolveValueAtPath(path) ?: throw pathNotResolvedError(path) + channelObject.throwIfInvalidWriteApiConfiguration() // RTPO17b / RTO26 + val resolvedValue = resolveValueAtPath(path) ?: throw pathNotResolvedError(path) // RTPO17c / RTPO3c2 if (resolvedValue !is ResolvedValue.CounterRef) { - throw typeMismatchError("Cannot increment a non-LiveCounter object at path: \"$path\"") + throw typeMismatchError("Cannot increment a non-LiveCounter object at path: \"$path\"") // RTPO17e } - // TODO - delegate the COUNTER_INC (amount 1) to the resolved LiveCounter - TODO("Not yet implemented") + return channelObject.asyncVoidApi { resolvedValue.counter.increment(1) } // RTPO17a1 default, RTPO17d -> RTLC12 } override fun increment(amount: Number): CompletableFuture { - channelObject.throwIfInvalidWriteApiConfiguration() - val resolvedValue = resolveValueAtPath(path) ?: throw pathNotResolvedError(path) + channelObject.throwIfInvalidWriteApiConfiguration() // RTPO17b / RTO26 + val resolvedValue = resolveValueAtPath(path) ?: throw pathNotResolvedError(path) // RTPO17c / RTPO3c2 if (resolvedValue !is ResolvedValue.CounterRef) { - throw typeMismatchError("Cannot increment a non-LiveCounter object at path: \"$path\"") + throw typeMismatchError("Cannot increment a non-LiveCounter object at path: \"$path\"") // RTPO17e } - // TODO - delegate the COUNTER_INC to the resolved LiveCounter - TODO("Not yet implemented") + return channelObject.asyncVoidApi { resolvedValue.counter.increment(amount) } // RTPO17d -> RTLC12 } override fun decrement(): CompletableFuture { - channelObject.throwIfInvalidWriteApiConfiguration() - val resolvedValue = resolveValueAtPath(path) ?: throw pathNotResolvedError(path) + channelObject.throwIfInvalidWriteApiConfiguration() // RTPO18b / RTO26 + val resolvedValue = resolveValueAtPath(path) ?: throw pathNotResolvedError(path) // RTPO18c / RTPO3c2 if (resolvedValue !is ResolvedValue.CounterRef) { - throw typeMismatchError("Cannot decrement a non-LiveCounter object at path: \"$path\"") + throw typeMismatchError("Cannot decrement a non-LiveCounter object at path: \"$path\"") // RTPO18e } - // TODO - delegate the COUNTER_INC (negated amount 1) to the resolved LiveCounter - TODO("Not yet implemented") + return channelObject.asyncVoidApi { resolvedValue.counter.decrement(1) } // RTPO18a1 default, RTPO18d -> RTLC13 } override fun decrement(amount: Number): CompletableFuture { - channelObject.throwIfInvalidWriteApiConfiguration() - val resolvedValue = resolveValueAtPath(path) ?: throw pathNotResolvedError(path) + channelObject.throwIfInvalidWriteApiConfiguration() // RTPO18b / RTO26 + val resolvedValue = resolveValueAtPath(path) ?: throw pathNotResolvedError(path) // RTPO18c / RTPO3c2 if (resolvedValue !is ResolvedValue.CounterRef) { - throw typeMismatchError("Cannot decrement a non-LiveCounter object at path: \"$path\"") + throw typeMismatchError("Cannot decrement a non-LiveCounter object at path: \"$path\"") // RTPO18e } - // TODO - delegate the COUNTER_INC (negated amount) to the resolved LiveCounter - TODO("Not yet implemented") + return channelObject.asyncVoidApi { resolvedValue.counter.decrement(amount) } // RTPO18d -> RTLC13 } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultLiveMapPathObject.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultLiveMapPathObject.kt index 8c6a561d5..a4809e67e 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultLiveMapPathObject.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultLiveMapPathObject.kt @@ -3,15 +3,17 @@ package io.ably.lib.liveobjects.path.types import io.ably.lib.liveobjects.DefaultRealtimeObject import io.ably.lib.liveobjects.path.DefaultPathObject import io.ably.lib.liveobjects.path.PathObject +import io.ably.lib.liveobjects.path.PathSegments import io.ably.lib.liveobjects.pathNotResolvedError import io.ably.lib.liveobjects.typeMismatchError import io.ably.lib.liveobjects.value.LiveMapValue import io.ably.lib.liveobjects.value.ResolvedValue +import java.util.AbstractMap import java.util.concurrent.CompletableFuture /** * Default implementation of [LiveMapPathObject], adding map navigation and read/write - * operations on top of [DefaultPathObject]; all left unimplemented for now. + * operations on top of [DefaultPathObject]. * * Spec: RTTS6a */ @@ -20,55 +22,60 @@ internal class DefaultLiveMapPathObject( path: String, ) : DefaultPathObject(channelObject, path), LiveMapPathObject { - override fun get(key: String): PathObject = TODO("Not yet implemented") + // RTPO5c, RTPO5d - purely navigational, no resolution; returns the untyped base node (RTTS3h) + override fun get(key: String): PathObject = + DefaultPathObject(channelObject, PathSegments.appendKey(path, key)) - override fun at(path: String): PathObject = TODO("Not yet implemented") + // RTPO6b, RTPO6c, RTPO6d - purely navigational, dot-delimited with backslash-escaped dots + override fun at(path: String): PathObject = + DefaultPathObject(channelObject, PathSegments.appendPath(this.path, path)) - override fun entries(): Iterable> { - channelObject.throwIfInvalidAccessApiConfiguration() - if (resolveValueAtPath(path) !is ResolvedValue.MapRef) return emptyList() // not a LiveMap (or unresolved) -> empty - // TODO - iterate the resolved map's entries, yielding (key, child PathObject) - TODO("Not yet implemented") + override fun entries(): Iterable> { // RTPO9 + channelObject.throwIfInvalidAccessApiConfiguration() // RTPO9a + val map = (resolveValueAtPath(path) as? ResolvedValue.MapRef)?.map + ?: return emptyList() // RTPO9d - not a LiveMap (or unresolved) -> empty + // RTPO9c - derive from the map's keys at call time; child paths as if by get() + return map.keys().map { key -> + AbstractMap.SimpleImmutableEntry(key, get(key)) + } } - override fun keys(): Iterable { - channelObject.throwIfInvalidAccessApiConfiguration() - if (resolveValueAtPath(path) !is ResolvedValue.MapRef) return emptyList() // not a LiveMap (or unresolved) -> empty - // TODO - return the resolved map's keys - TODO("Not yet implemented") + override fun keys(): Iterable { // RTPO10 + channelObject.throwIfInvalidAccessApiConfiguration() // RTPO10a + val map = (resolveValueAtPath(path) as? ResolvedValue.MapRef)?.map + ?: return emptyList() // RTPO10d - not a LiveMap (or unresolved) -> empty + return map.keys().toList() // RTPO10c - via RTLM12 } - override fun values(): Iterable { - channelObject.throwIfInvalidAccessApiConfiguration() - if (resolveValueAtPath(path) !is ResolvedValue.MapRef) return emptyList() // not a LiveMap (or unresolved) -> empty - // TODO - return a child PathObject for each entry of the resolved map - TODO("Not yet implemented") + override fun values(): Iterable { // RTPO11 + channelObject.throwIfInvalidAccessApiConfiguration() // RTPO11a + val map = (resolveValueAtPath(path) as? ResolvedValue.MapRef)?.map + ?: return emptyList() // RTPO11d - not a LiveMap (or unresolved) -> empty + return map.keys().map { key -> get(key) } // RTPO11c - child paths as if by get() } - override fun size(): Long? { - channelObject.throwIfInvalidAccessApiConfiguration() - if (resolveValueAtPath(path) !is ResolvedValue.MapRef) return null // not a LiveMap (or unresolved) -> null - // TODO - return the resolved map's size - TODO("Not yet implemented") + override fun size(): Long? { // RTPO12 + channelObject.throwIfInvalidAccessApiConfiguration() // RTPO12a + val map = (resolveValueAtPath(path) as? ResolvedValue.MapRef)?.map + ?: return null // RTPO12d - not a LiveMap (or unresolved) -> null + return map.size() // RTPO12c - via RTLM10d } override fun set(key: String, value: LiveMapValue): CompletableFuture { - channelObject.throwIfInvalidWriteApiConfiguration() - val resolvedValue = resolveValueAtPath(path) ?: throw pathNotResolvedError(path) + channelObject.throwIfInvalidWriteApiConfiguration() // RTPO15b / RTO26 + val resolvedValue = resolveValueAtPath(path) ?: throw pathNotResolvedError(path) // RTPO15c / RTPO3c2 if (resolvedValue !is ResolvedValue.MapRef) { - throw typeMismatchError("Cannot set a key on a non-LiveMap object at path: \"$path\"") + throw typeMismatchError("Cannot set a key on a non-LiveMap object at path: \"$path\"") // RTPO15e } - // TODO - delegate the MAP_SET to the resolved LiveMap - TODO("Not yet implemented") + return channelObject.asyncVoidApi { resolvedValue.map.set(key, value) } // RTPO15d -> RTLM20 } override fun remove(key: String): CompletableFuture { - channelObject.throwIfInvalidWriteApiConfiguration() - val resolvedValue = resolveValueAtPath(path) ?: throw pathNotResolvedError(path) + channelObject.throwIfInvalidWriteApiConfiguration() // RTPO16b / RTO26 + val resolvedValue = resolveValueAtPath(path) ?: throw pathNotResolvedError(path) // RTPO16c / RTPO3c2 if (resolvedValue !is ResolvedValue.MapRef) { - throw typeMismatchError("Cannot remove a key from a non-LiveMap object at path: \"$path\"") + throw typeMismatchError("Cannot remove a key from a non-LiveMap object at path: \"$path\"") // RTPO16e } - // TODO - delegate the MAP_REMOVE to the resolved LiveMap - TODO("Not yet implemented") + return channelObject.asyncVoidApi { resolvedValue.map.remove(key) } // RTPO16d -> RTLM21 } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultNumberPathObject.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultNumberPathObject.kt index bc64dd28c..f4ad5b9f8 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultNumberPathObject.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultNumberPathObject.kt @@ -3,11 +3,12 @@ package io.ably.lib.liveobjects.path.types import io.ably.lib.liveobjects.DefaultRealtimeObject import io.ably.lib.liveobjects.ValueType import io.ably.lib.liveobjects.path.DefaultPathObject +import io.ably.lib.liveobjects.value.ResolvedValue import io.ably.lib.liveobjects.value.valueType /** * Default implementation of [NumberPathObject], a terminal primitive view that only adds a - * type-narrowed [value]; left unimplemented for now. + * type-narrowed [value]. * * Spec: RTTS6c */ @@ -18,8 +19,8 @@ internal class DefaultNumberPathObject( override fun value(): Number? { channelObject.throwIfInvalidAccessApiConfiguration() - if (resolveValueAtPath(path)?.valueType() != ValueType.NUMBER) return null // not a Number at this path -> no value - // TODO - extract the primitive value from the resolved leaf, narrowed to Number - TODO("Not yet implemented") + val resolved = resolveValueAtPath(path) ?: return null + if (resolved.valueType() != ValueType.NUMBER) return null // RTTS6c - exact type only + return (resolved as ResolvedValue.Leaf).data.number } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultStringPathObject.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultStringPathObject.kt index 4275c84c4..9988af033 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultStringPathObject.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/path/types/DefaultStringPathObject.kt @@ -3,11 +3,12 @@ package io.ably.lib.liveobjects.path.types import io.ably.lib.liveobjects.DefaultRealtimeObject import io.ably.lib.liveobjects.ValueType import io.ably.lib.liveobjects.path.DefaultPathObject +import io.ably.lib.liveobjects.value.ResolvedValue import io.ably.lib.liveobjects.value.valueType /** * Default implementation of [StringPathObject], a terminal primitive view that only adds a - * type-narrowed [value]; left unimplemented for now. + * type-narrowed [value]. * * Spec: RTTS6c */ @@ -18,8 +19,8 @@ internal class DefaultStringPathObject( override fun value(): String? { channelObject.throwIfInvalidAccessApiConfiguration() - if (resolveValueAtPath(path)?.valueType() != ValueType.STRING) return null // not a String at this path -> no value - // TODO - extract the primitive value from the resolved leaf, narrowed to String - TODO("Not yet implemented") + val resolved = resolveValueAtPath(path) ?: return null + if (resolved.valueType() != ValueType.STRING) return null // RTTS6c - exact type only + return (resolved as ResolvedValue.Leaf).data.string } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/BaseRealtimeLiveObject.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/BaseRealtimeLiveObject.kt index bb1847308..8751e1311 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/BaseRealtimeLiveObject.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/BaseRealtimeLiveObject.kt @@ -1,26 +1,58 @@ package io.ably.lib.liveobjects.value +import io.ably.lib.liveobjects.DefaultRealtimeObject import io.ably.lib.liveobjects.ObjectsOperationSource +import io.ably.lib.liveobjects.ObjectsPool +import io.ably.lib.liveobjects.ROOT_OBJECT_ID +import io.ably.lib.liveobjects.message.ObjectMessage import io.ably.lib.liveobjects.message.WireObjectMessage import io.ably.lib.liveobjects.message.WireObjectOperation import io.ably.lib.liveobjects.message.WireObjectState +import io.ably.lib.liveobjects.message.toPublicMessage import io.ably.lib.liveobjects.objectError -import io.ably.lib.liveobjects.value.livecounter.noOpCounterUpdate -import io.ably.lib.liveobjects.value.livemap.noOpMapUpdate +import io.ably.lib.liveobjects.value.livemap.InternalLiveMap import io.ably.lib.util.Clock import io.ably.lib.util.Log -import io.ably.lib.util.SystemClock internal enum class ObjectType(val value: String) { Map("map"), Counter("counter") } -// Spec: RTLO4b4b -// TODO - Check what to do about `ObjectUpdate` field, whether we really need to keep it or not -internal data class ObjectUpdate(val data: Any?) +/** Change type for a LiveMap key diff. Spec: RTLM18b */ +internal enum class MapChange { Updated, Removed } -internal val ObjectUpdate.noOp get() = this.data == null +/** + * An update emitted for a LiveObject data change. Spec: RTLO4b4 (LiveObjectUpdate) + * - [objectMessage] - the source wire message that caused the update, if any (RTLO4b4d) + * - [tombstone] - true when the update results from tombstoning this object (RTLO4b4e) + */ +internal sealed class ObjectUpdate { + abstract val objectMessage: WireObjectMessage? + abstract val tombstone: Boolean + + /** RTLO4b4b - no-op update; nothing changed, nothing is emitted. */ + object NoOp : ObjectUpdate() { + override val objectMessage: WireObjectMessage? get() = null + override val tombstone: Boolean get() = false + } + + /** RTLM18 - LiveMapUpdate: per-key diff. Spec: RTLM18a, RTLM18b */ + internal data class MapUpdate( + val update: kotlin.collections.Map, // RTLM18b + override val objectMessage: WireObjectMessage? = null, + override val tombstone: Boolean = false, + ) : ObjectUpdate() + + /** RTLC11 - LiveCounterUpdate: amount delta. Spec: RTLC11a, RTLC11b */ + internal data class CounterUpdate( + val amount: Double, // RTLC11b1 + override val objectMessage: WireObjectMessage? = null, + override val tombstone: Boolean = false, + ) : ObjectUpdate() +} + +internal val ObjectUpdate.noOp get() = this is ObjectUpdate.NoOp /** * Provides common functionality and base implementation for LiveMap and LiveCounter. @@ -30,13 +62,17 @@ internal val ObjectUpdate.noOp get() = this.data == null * This should also be included in logging */ internal abstract class BaseRealtimeObject( - internal val objectId: String, // // RTLO3a + internal val objectId: String, // RTLO3a internal val objectType: ObjectType, - internal val clock: Clock = SystemClock.INSTANCE, + internal val realtimeObject: DefaultRealtimeObject, ) { protected open val tag = "BaseRealtimeObject" + internal val clock: Clock get() = realtimeObject.clock + + private val objectsPool: ObjectsPool get() = realtimeObject.objectsPool + internal val siteTimeserials = mutableMapOf() // RTLO3b internal var createOperationIsMerged = false // RTLO3c @@ -46,6 +82,61 @@ internal abstract class BaseRealtimeObject( private var tombstonedAt: Long? = null + /** + * Reverse references: parent InternalLiveMap objectId -> set of keys at which that map + * references this object. Keyed by objectId per RTLO3f (RTLO3f1 permits direct references; + * ids avoid map-to-map reference cycles and survive pool replacement). Only mutated and + * traversed on the sequential scope, so a plain mutable map is safe. + * Spec: RTLO3f, RTLO3f2 + */ + internal val parentReferences = mutableMapOf>() + + /** Records that [parent] references this object at [key]. Spec: RTLO4g */ + internal fun addParentReference(parent: InternalLiveMap, key: String) { + parentReferences.getOrPut(parent.objectId) { mutableSetOf() }.add(key) // RTLO4g1, RTLO4g2 + } + + /** Removes the recorded reference from [parent] at [key]. Spec: RTLO4h */ + internal fun removeParentReference(parent: InternalLiveMap, key: String) { + val keys = parentReferences[parent.objectId] ?: return // RTLO4h1 + keys.remove(key) // RTLO4h2 + if (keys.isEmpty()) { + parentReferences.remove(parent.objectId) // RTLO4h3 + } + } + + /** Spec: RTO5c10a */ + internal fun clearParentReferences() = parentReferences.clear() + + /** + * All key-paths from the root InternalLiveMap to this object: one per simple path in the + * parent-reference graph, cycle-safe, order unspecified. Iterative DFS walking upward via + * [parentReferences], resolving parent ids through the pool (stale ids are skipped). + * Spec: RTLO4f (RTLO4f1..f4) + */ + internal fun getFullPaths(): List> { + val paths = mutableListOf>() + // (object, path-so-far, visited objectIds on this branch) + val stack = ArrayDeque, Set>>() + stack.addLast(Triple(this, emptyList(), emptySet())) + while (stack.isNotEmpty()) { + val (obj, currentPath, visited) = stack.removeLast() + if (obj.objectId in visited) continue // RTLO4f2 - simple paths only, skip cycles + val newVisited = visited + obj.objectId + if (obj.objectId == ROOT_OBJECT_ID) { + paths.add(currentPath) // RTLO4f2 - the empty path when this object is root itself + continue + } + for ((parentId, keys) in obj.parentReferences) { + val parent = objectsPool.get(parentId) ?: continue // stale reference - parent left the pool + for (key in keys) { + stack.addLast(Triple(parent, listOf(key) + currentPath, newVisited)) + } + } + } + return paths // RTLO4f3 - each simple path exactly once + } + /** * This is invoked by ObjectMessage having updated data with parent `ProtocolMessageAction` as `object_sync` * @return an update describing the changes @@ -62,10 +153,7 @@ internal abstract class BaseRealtimeObject( if (isTombstoned) { // this object is tombstoned. this is a terminal state which can't be overridden. skip the rest of object state message processing - if (objectType == ObjectType.Map) { - return noOpMapUpdate - } - return noOpCounterUpdate + return ObjectUpdate.NoOp // RTLM6e1, RTLC6e1 } return applyObjectState(wireObjectState, wireObjectMessage) // RTLM6, RTLC6 } @@ -127,17 +215,27 @@ internal abstract class BaseRealtimeObject( } /** - * Marks the object as tombstoned. + * Marks the object as tombstoned. The returned update carries `tombstone = true` and the + * source message (RTLO4e5..e8); the caller emits it via notifyUpdated. */ - internal fun tombstone(serialTimestamp: Long?): ObjectUpdate { + internal fun tombstone(serialTimestamp: Long?, message: WireObjectMessage?): ObjectUpdate { if (serialTimestamp == null) { - Log.w(tag, "Tombstoning object $objectId without serial timestamp, using local timestamp instead") + Log.w(tag, "Tombstoning object $objectId without serial timestamp, using local timestamp instead") // RTLO6b1 } - isTombstoned = true - tombstonedAt = serialTimestamp?: clock.currentTimeMillis() - val update = clearData() - // TODO - Emit object lifecycle event for deletion - return update + isTombstoned = true // RTLO4e2 + tombstonedAt = serialTimestamp ?: clock.currentTimeMillis() // RTLO4e3, RTLO6a, RTLO6b + // RTLO4e5..e7 - stamp tombstone + source message on the diff update. Tombstoning an + // already-empty object yields an empty diff, but the update must still be emitted (the + // tombstone flag drives listener teardown per RTLO4b4c3c; ably-js diffs are never noop), + // so synthesize an empty typed update in that case. + return when (val update = clearData()) { // RTLO4e4 + is ObjectUpdate.MapUpdate -> update.copy(tombstone = true, objectMessage = message) + is ObjectUpdate.CounterUpdate -> update.copy(tombstone = true, objectMessage = message) + ObjectUpdate.NoOp -> when (objectType) { + ObjectType.Map -> ObjectUpdate.MapUpdate(emptyMap(), message, tombstone = true) + ObjectType.Counter -> ObjectUpdate.CounterUpdate(0.0, message, tombstone = true) + } + } // RTLO4e8 } /** @@ -203,11 +301,54 @@ internal abstract class BaseRealtimeObject( abstract fun clearData(): ObjectUpdate /** - * Notifies subscribers about changes made to this object. Propagates updates through the - * appropriate manager after converting the generic update map to type-specific update objects. + * Notifies subscribers about a change to this object: instance listeners first + * (RTLO4b4c3a), then path-based subscriptions via the bubbling dispatch (RTLO4b4c3b -> + * RTO24b). Tombstone updates additionally deregister all instance listeners afterwards + * (RTLO4b4c3c); path subscriptions are unaffected (RTLO4b4c3c1). + * * Spec: RTLO4b4c */ - abstract fun notifyUpdated(update: ObjectUpdate) + internal fun notifyUpdated(update: ObjectUpdate) { + if (update.noOp) { + return // RTLO4b4c1 + } + Log.v(tag, "Object $objectId updated: $update") + val publicMessage = update.objectMessage + ?.takeIf { it.operation != null } // sync messages never surface publicly (RTPO19e2/RTINS16e2) + ?.toPublicMessage(realtimeObject.channelName) // PAOM3 + notifyInstanceSubscriptions(update, publicMessage) // RTLO4b4c3a + notifyPathSubscriptions(update, publicMessage) // RTLO4b4c3b + if (update.tombstone) { + deregisterInstanceListeners() // RTLO4b4c3c + } + } + + /** Emits the update to this object's instance listeners as an InstanceSubscriptionEvent. */ + protected abstract fun notifyInstanceSubscriptions(update: ObjectUpdate, message: ObjectMessage?) + + /** Deregisters all instance listeners - tombstone teardown (RTLO4b4c3c). */ + protected abstract fun deregisterInstanceListeners() + + /** + * Path-based subscription dispatch: one notifyPathEvent per path-to-this-object, with the + * object's own path as the most-preferred candidate and, for map updates, one deeper + * candidate per changed key. + * + * Spec: RTO24b (RTO24b1, RTO24b2, RTO24b2a1, RTO24b2a2) + */ + private fun notifyPathSubscriptions(update: ObjectUpdate, message: ObjectMessage?) { + val pathsToThis = getFullPaths() // RTO24b1 + if (pathsToThis.isEmpty()) { + return // orphaned object (not reachable from root) - no path events + } + for (pathToThis in pathsToThis) { // RTO24b2 + val candidates = mutableListOf(pathToThis) // RTO24b2a1 - most preferred first + if (update is ObjectUpdate.MapUpdate) { + update.update.keys.forEach { candidates.add(pathToThis + it) } // RTO24b2a2 + } + realtimeObject.pathObjectSubscriptionRegister.notifyPathEvent(candidates, message) + } + } /** * Called during garbage collection intervals to clean up expired entries. diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/ResolvedValue.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/ResolvedValue.kt index 34002837d..a7c930fce 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/ResolvedValue.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/ResolvedValue.kt @@ -2,16 +2,18 @@ package io.ably.lib.liveobjects.value import io.ably.lib.liveobjects.ValueType import io.ably.lib.liveobjects.message.WireObjectData +import io.ably.lib.liveobjects.value.livecounter.InternalLiveCounter +import io.ably.lib.liveobjects.value.livemap.InternalLiveMap /** * The result of resolving a path segment / map entry against the objects - * graph: either a node view of a live object, or a primitive leaf carried as - * wire ObjectData. + * graph: either a reference to an internal live object from the pool, or a + * primitive leaf carried as wire ObjectData. */ internal sealed interface ResolvedValue { - data class MapRef(val map: LiveMap) : ResolvedValue // TODO: LiveMap will be replaced by InternalLiveMap - data class CounterRef(val counter: LiveCounter) : ResolvedValue // TODO: LiveCounter will be replaced by InternalLiveCounter - data class Leaf(val data: WireObjectData) : ResolvedValue + data class MapRef(val map: InternalLiveMap) : ResolvedValue // RTLM5d2f2 + data class CounterRef(val counter: InternalLiveCounter) : ResolvedValue // RTLM5d2f2 + data class Leaf(val data: WireObjectData) : ResolvedValue // RTLM5d2b..e } /** diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/DefaultLiveCounter.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/DefaultLiveCounter.kt index 4ec0d3edf..398df76cc 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/DefaultLiveCounter.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/DefaultLiveCounter.kt @@ -1,6 +1,15 @@ package io.ably.lib.liveobjects.value.livecounter +import io.ably.lib.liveobjects.DefaultRealtimeObject +import io.ably.lib.liveobjects.invalidInputError +import io.ably.lib.liveobjects.message.WireCounterCreate +import io.ably.lib.liveobjects.message.WireCounterCreateWithObjectId +import io.ably.lib.liveobjects.message.WireObjectMessage +import io.ably.lib.liveobjects.message.WireObjectOperation +import io.ably.lib.liveobjects.message.WireObjectOperationAction +import io.ably.lib.liveobjects.serialization.gson import io.ably.lib.liveobjects.value.LiveCounter +import io.ably.lib.liveobjects.value.ObjectType /** * Default implementation of the [LiveCounter] value type - an immutable holder for @@ -11,14 +20,40 @@ import io.ably.lib.liveobjects.value.LiveCounter * takes the initial count; the count is retained internally with no public accessor * (Spec: RTLCV3d). * - * This is currently a skeleton: it only retains the initial value. Producing the - * `COUNTER_CREATE` operation/message from this count is not yet implemented. - * * Spec: RTLCV1, RTLCV2, RTLCV3 */ internal class DefaultLiveCounter( internal val initialCount: Number, ) : LiveCounter() { - // TODO - build the COUNTER_CREATE ObjectMessage from `initialCount`, mirroring - // ably-js LiveCounterValueType.createCounterCreateMessage. Spec: RTLCV4 + + /** + * Evaluates this value type into a COUNTER_CREATE ObjectMessage. Mirrors ably-js + * `LiveCounterValueType.createCounterCreateMessage`. The caller publishes the message + * (evaluation itself has no side effects on the channel). + * + * Spec: RTLCV4 + */ + internal suspend fun createCounterCreateMessage(realtimeObject: DefaultRealtimeObject): WireObjectMessage { + // RTLCV4a - validation deferred from create() to evaluation time + if (initialCount.toDouble().isNaN() || initialCount.toDouble().isInfinite()) { + throw invalidInputError("Counter value should be a valid number") + } + + val counterCreate = WireCounterCreate(count = initialCount.toDouble()) // RTLCV4b, RTLCV4b1 + val initialValueJSONString = gson.toJson(counterCreate) // RTLCV4c + // RTLCV4d..f - nonce, server time and RTO14 objectId derivation + val (objectId, nonce) = realtimeObject.getObjectIdStringWithNonce(ObjectType.Counter, initialValueJSONString) + + return WireObjectMessage( + operation = WireObjectOperation( + action = WireObjectOperationAction.CounterCreate, // RTLCV4g1 + objectId = objectId, // RTLCV4g2 + counterCreateWithObjectId = WireCounterCreateWithObjectId( + nonce = nonce, // RTLCV4g3 + initialValue = initialValueJSONString, // RTLCV4g4 + derivedFrom = counterCreate, // RTLCV4g5 - local use only (@Transient on the wire type) + ), + ) + ) // RTLCV4h + } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/InternalLiveCounter.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/InternalLiveCounter.kt index 65de1eef8..ca43021d7 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/InternalLiveCounter.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/InternalLiveCounter.kt @@ -2,28 +2,23 @@ package io.ably.lib.liveobjects.value.livecounter import io.ably.lib.liveobjects.* import io.ably.lib.liveobjects.DefaultRealtimeObject -import io.ably.lib.liveobjects.adapter.AblyClientAdapter +import io.ably.lib.liveobjects.instance.DefaultInstanceSubscriptionEvent +import io.ably.lib.liveobjects.instance.InstanceListener +import io.ably.lib.liveobjects.instance.types.DefaultLiveCounterInstance import io.ably.lib.liveobjects.invalidInputError import io.ably.lib.liveobjects.message.* -import io.ably.lib.liveobjects.message.WireCounterInc -import io.ably.lib.liveobjects.message.WireObjectMessage -import io.ably.lib.liveobjects.message.WireObjectOperation -import io.ably.lib.liveobjects.message.WireObjectOperationAction -import io.ably.lib.liveobjects.message.WireObjectState import io.ably.lib.liveobjects.value.BaseRealtimeObject import io.ably.lib.liveobjects.value.ObjectType import io.ably.lib.liveobjects.value.ObjectUpdate -import io.ably.lib.liveobjects.value.noOp import java.util.concurrent.atomic.AtomicReference -import io.ably.lib.util.Log /** * @spec RTLC1/RTLC2 - LiveCounter implementation extends BaseRealtimeObject */ internal class InternalLiveCounter private constructor( objectId: String, - private val realtimeObject: DefaultRealtimeObject, -) : BaseRealtimeObject(objectId, ObjectType.Counter, realtimeObject.clock) { + realtimeObject: DefaultRealtimeObject, +) : BaseRealtimeObject(objectId, ObjectType.Counter, realtimeObject) { override val tag = "LiveCounter" @@ -38,18 +33,17 @@ internal class InternalLiveCounter private constructor( */ private val liveCounterManager = LiveCounterManager(this) - private val channelName = realtimeObject.channelName - private val adapter: AblyClientAdapter get() = realtimeObject.adapter - internal suspend fun increment(amount: Number) = incrementAsync(amount.toDouble()) + // RTLC13c - negating the amount lets increment's validation and publish path cover decrement internal suspend fun decrement(amount: Number) = incrementAsync(-amount.toDouble()) internal fun value(): Double { return data.get() } - internal fun subscribe(listener: LiveCounterChangeListener): Subscription { + /** Identity-based subscription to this counter's updates. Spec: RTINS16d, RTLO4b */ + internal fun subscribe(listener: InstanceListener): Subscription { return liveCounterManager.subscribe(listener) } @@ -75,29 +69,27 @@ internal class InternalLiveCounter private constructor( } override fun applyObjectState(wireObjectState: WireObjectState, message: WireObjectMessage): ObjectUpdate { - return liveCounterManager.applyState(wireObjectState, message.serialTimestamp) + return liveCounterManager.applyState(wireObjectState, message) } override fun applyObjectOperation(operation: WireObjectOperation, message: WireObjectMessage): Boolean { - return liveCounterManager.applyOperation(operation, message.serialTimestamp) + return liveCounterManager.applyOperation(operation, message) } override fun clearData(): ObjectUpdate { return liveCounterManager.calculateUpdateFromDataDiff(data.get(), 0.0).apply { this@InternalLiveCounter.data.set(0.0) } } - override fun notifyUpdated(update: ObjectUpdate) { - if (update.noOp) { - return - } - Log.v(tag, "Object $objectId updated: $update") - - // TODO - Emit a proper LiveCounterChangeEvent once the Instance/ObjectMessage subscription - // pipeline is wired up. ObjectUpdate is not a LiveCounterChangeEvent, so casting it (as was - // done previously) always throws ClassCastException; emission is deferred until then. - liveCounterManager.notify(update as LiveCounterChangeEvent) + override fun notifyInstanceSubscriptions(update: ObjectUpdate, message: ObjectMessage?) { + // RTINS16e1, RTINS16e2 - the event wraps a fresh instance bound to this counter (the spec + // requires "an Instance wrapping the underlying LiveObject", not a specific wrapper identity) + liveCounterManager.notify( + DefaultInstanceSubscriptionEvent(DefaultLiveCounterInstance(realtimeObject, this), message) + ) } + override fun deregisterInstanceListeners() = liveCounterManager.offAll() // RTLO4b4c3c + override fun onGCInterval(gcGracePeriod: Long) { // Nothing to GC for a counter object return @@ -111,13 +103,5 @@ internal class InternalLiveCounter private constructor( internal fun zeroValue(objectId: String, realtimeObjects: DefaultRealtimeObject): InternalLiveCounter { return InternalLiveCounter(objectId, realtimeObjects) } - - /** - * Creates initial value payload for counter creation. - * Spec: RTLCV4b - */ - internal fun initialValue(count: Number): WireCounterCreate { - return WireCounterCreate(count = count.toDouble()) - } } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/LiveCounterChangeCoordinator.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/LiveCounterChangeCoordinator.kt index e148bb55b..50795d299 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/LiveCounterChangeCoordinator.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/LiveCounterChangeCoordinator.kt @@ -8,7 +8,7 @@ import io.ably.lib.liveobjects.value.ObjectUpdate import io.ably.lib.util.EventEmitter import io.ably.lib.util.Log -internal val noOpCounterUpdate = ObjectUpdate(null) +internal val noOpCounterUpdate: ObjectUpdate = ObjectUpdate.NoOp /** * Interface for handling live counter changes by notifying subscribers of updates. @@ -22,27 +22,28 @@ internal interface HandlesLiveCounterChange { fun notify(update: LiveCounterChangeEvent) } -internal interface LiveCounterChangeListener : InstanceListener - internal interface LiveCounterChangeEvent : InstanceSubscriptionEvent internal abstract class LiveCounterChangeCoordinator: HandlesLiveCounterChange { private val counterChangeEmitter = LiveCounterChangeEmitter() - fun subscribe(listener: LiveCounterChangeListener): Subscription { + fun subscribe(listener: InstanceListener): Subscription { counterChangeEmitter.on(listener) return onceSubscription { counterChangeEmitter.off(listener) } } + /** Deregisters all instance listeners - tombstone teardown. Spec: RTLO4b4c3c */ + internal fun offAll() = counterChangeEmitter.off() + override fun notify(update: LiveCounterChangeEvent) = counterChangeEmitter.emit(update) } -private class LiveCounterChangeEmitter : EventEmitter() { +private class LiveCounterChangeEmitter : EventEmitter() { private val tag = "LiveCounterChangeEmitter" - override fun apply(listener: LiveCounterChangeListener?, event: LiveCounterChangeEvent, vararg args: Any?) { + override fun apply(listener: InstanceListener?, event: LiveCounterChangeEvent?, vararg args: Any?) { try { event?.let { listener?.onUpdated(it) } ?: Log.w(tag, "Null event passed to LiveCounterChange listener callback") diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/LiveCounterManager.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/LiveCounterManager.kt index a08bb5844..b3f7eab59 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/LiveCounterManager.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livecounter/LiveCounterManager.kt @@ -1,6 +1,7 @@ package io.ably.lib.liveobjects.value.livecounter import io.ably.lib.liveobjects.message.WireCounterInc +import io.ably.lib.liveobjects.message.WireObjectMessage import io.ably.lib.liveobjects.message.WireObjectOperation import io.ably.lib.liveobjects.message.WireObjectOperationAction import io.ably.lib.liveobjects.message.WireObjectState @@ -17,38 +18,44 @@ internal class LiveCounterManager(private val liveCounter: InternalLiveCounter): /** * @spec RTLC6 - Overrides counter data with state from sync */ - internal fun applyState(wireObjectState: WireObjectState, serialTimestamp: Long?): ObjectUpdate { - val previousData = liveCounter.data.get() - + internal fun applyState(wireObjectState: WireObjectState, message: WireObjectMessage): ObjectUpdate { if (wireObjectState.tombstone) { - liveCounter.tombstone(serialTimestamp) - } else { - // override data for this object with data from the object state - liveCounter.createOperationIsMerged = false // RTLC6b - liveCounter.data.set(wireObjectState.counter?.count ?: 0.0) // RTLC6c - - // RTLC6d - wireObjectState.createOp?.let { createOp -> - mergeInitialDataFromCreateOperation(createOp) - } + // RTLC6f, RTLC6f2 - tombstone update returned as-is (carries tombstone flag + message) + return liveCounter.tombstone(message.serialTimestamp, message) } - return calculateUpdateFromDataDiff(previousData, liveCounter.data.get()) + val previousData = liveCounter.data.get() // RTLC6g - only the override branch needs it + + // override data for this object with data from the object state + liveCounter.createOperationIsMerged = false // RTLC6b + liveCounter.data.set(wireObjectState.counter?.count ?: 0.0) // RTLC6c + + // RTLC6d - merge result is discarded; only the outer diff is returned + wireObjectState.createOp?.let { createOp -> + mergeInitialDataFromCreateOperation(createOp, message) + } + + // RTLC6h - diff between previous and new data, stamped with the source message + return when (val diff = calculateUpdateFromDataDiff(previousData, liveCounter.data.get())) { + is ObjectUpdate.CounterUpdate -> diff.copy(objectMessage = message) + else -> diff // NoOp stays NoOp + } } /** * @spec RTLC7 - Applies operations to LiveCounter + * @spec RTLC7f1 - [message] is the source ObjectMessage that contains the operation */ - internal fun applyOperation(operation: WireObjectOperation, serialTimestamp: Long?): Boolean { + internal fun applyOperation(operation: WireObjectOperation, message: WireObjectMessage): Boolean { return when (operation.action) { WireObjectOperationAction.CounterCreate -> { - val update = applyCounterCreate(operation) // RTLC7d1 + val update = applyCounterCreate(operation, message) // RTLC7d1 liveCounter.notifyUpdated(update) // RTLC7d1a true // RTLC7d1b } WireObjectOperationAction.CounterInc -> { if (operation.counterInc != null) { - val update = applyCounterInc(operation.counterInc) // RTLC7d5 + val update = applyCounterInc(operation.counterInc, message) // RTLC7d5 liveCounter.notifyUpdated(update) // RTLC7d5a true // RTLC7d5b } else { @@ -56,8 +63,8 @@ internal class LiveCounterManager(private val liveCounter: InternalLiveCounter): } } WireObjectOperationAction.ObjectDelete -> { - val update = liveCounter.tombstone(serialTimestamp) - liveCounter.notifyUpdated(update) + val update = liveCounter.tombstone(message.serialTimestamp, message) // RTLC7d4 + liveCounter.notifyUpdated(update) // RTLC7d4c true // RTLC7d4b } else -> { @@ -69,8 +76,9 @@ internal class LiveCounterManager(private val liveCounter: InternalLiveCounter): /** * @spec RTLC8 - Applies counter create operation + * @spec RTLC8a2 - [message] is the source ObjectMessage that contains the operation */ - private fun applyCounterCreate(operation: WireObjectOperation): ObjectUpdate { + private fun applyCounterCreate(operation: WireObjectOperation, message: WireObjectMessage): ObjectUpdate { if (liveCounter.createOperationIsMerged) { // RTLC8b // There can't be two different create operation for the same object id, because the object id @@ -80,32 +88,33 @@ internal class LiveCounterManager(private val liveCounter: InternalLiveCounter): tag, "Skipping applying COUNTER_CREATE op on a counter instance as it was already applied before; objectId=$objectId" ) - return noOpCounterUpdate // RTLC8c + return noOpCounterUpdate // RTLC8b } - return mergeInitialDataFromCreateOperation(operation) // RTLC8c + return mergeInitialDataFromCreateOperation(operation, message) // RTLC8c, RTLC8e } /** * @spec RTLC9 - Applies counter increment operation + * @spec RTLC9a3 - [message] is the source ObjectMessage that contains the operation */ - private fun applyCounterInc(wireCounterInc: WireCounterInc): ObjectUpdate { + private fun applyCounterInc(wireCounterInc: WireCounterInc, message: WireObjectMessage): ObjectUpdate { val amount = wireCounterInc.number val previousValue = liveCounter.data.get() liveCounter.data.set(previousValue + amount) // RTLC9f - return ObjectUpdate(amount) + return ObjectUpdate.CounterUpdate(amount, message) // RTLC9g } internal fun calculateUpdateFromDataDiff(prevData: Double, newData: Double): ObjectUpdate { // A zero delta means the value did not change (e.g. clearing an already-zero counter). - // Return the no-op update so notifyUpdated() short-circuits and no event is emitted. - return if (newData == prevData) noOpCounterUpdate else ObjectUpdate(newData - prevData) + // Return the no-op update so notifyUpdated() short-circuits and no event is emitted. Spec: RTLC14b + return if (newData == prevData) noOpCounterUpdate else ObjectUpdate.CounterUpdate(newData - prevData) } /** * @spec RTLC16 - Merges initial data from create operation */ - private fun mergeInitialDataFromCreateOperation(operation: WireObjectOperation): ObjectUpdate { + private fun mergeInitialDataFromCreateOperation(operation: WireObjectOperation, message: WireObjectMessage): ObjectUpdate { // if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case. // note that it is intentional to SUM the incoming count from the create op. // if we got here, it means that current counter instance is missing the initial value in its data reference, @@ -114,9 +123,9 @@ internal class LiveCounterManager(private val liveCounter: InternalLiveCounter): ?: operation.counterCreate?.count ?: 0.0 val previousValue = liveCounter.data.get() - liveCounter.data.set(previousValue + count) // RTLC16 - liveCounter.createOperationIsMerged = true // RTLC16 - return ObjectUpdate(count) + liveCounter.data.set(previousValue + count) // RTLC16a + liveCounter.createOperationIsMerged = true // RTLC16b + return ObjectUpdate.CounterUpdate(count, message) // RTLC16c } internal fun validate(state: WireObjectState) { diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/DefaultLiveMap.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/DefaultLiveMap.kt index dd90a6f7b..e155e0140 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/DefaultLiveMap.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/DefaultLiveMap.kt @@ -1,7 +1,21 @@ package io.ably.lib.liveobjects.value.livemap +import io.ably.lib.liveobjects.DefaultRealtimeObject +import io.ably.lib.liveobjects.invalidInputError +import io.ably.lib.liveobjects.message.WireMapCreate +import io.ably.lib.liveobjects.message.WireMapCreateWithObjectId +import io.ably.lib.liveobjects.message.WireObjectData +import io.ably.lib.liveobjects.message.WireObjectMessage +import io.ably.lib.liveobjects.message.WireObjectOperation +import io.ably.lib.liveobjects.message.WireObjectOperationAction +import io.ably.lib.liveobjects.message.WireObjectsMapEntry +import io.ably.lib.liveobjects.message.WireObjectsMapSemantics +import io.ably.lib.liveobjects.serialization.gson import io.ably.lib.liveobjects.value.LiveMap import io.ably.lib.liveobjects.value.LiveMapValue +import io.ably.lib.liveobjects.value.ObjectType +import io.ably.lib.liveobjects.value.livecounter.DefaultLiveCounter +import java.util.Base64 /** * Default implementation of the [LiveMap] value type - an immutable holder for the @@ -12,16 +26,82 @@ import io.ably.lib.liveobjects.value.LiveMapValue * the initial entries map; the entries are retained internally with no public * accessor (Spec: RTLMV3d). * - * This is currently a skeleton: it only retains the initial value. Producing the - * `MAP_CREATE` operation/message from these entries (including nested object create - * messages for nested [LiveMap]/[LiveCounter] value types) is not yet implemented. - * * Spec: RTLMV1, RTLMV2, RTLMV3 */ internal class DefaultLiveMap( internal val entries: Map, ) : LiveMap() { - // TODO - build the MAP_CREATE ObjectMessage (plus nested object create messages) - // from `entries`, mirroring ably-js LiveMapValueType.createMapCreateMessage. - // Spec: RTLMV4 + + /** + * Evaluates this value type into `[nested creates..., MAP_CREATE]` - depth-first, with this + * map's own MAP_CREATE last (RTLMV4k). Mirrors ably-js + * `LiveMapValueType.createMapCreateMessage` / `_getMapCreate`. The caller publishes the + * messages (evaluation itself has no side effects on the channel). + * + * RTLMV4a (entries must be a Dict) and the RTLMV4c value-type check are compile-time + * impossible with `Map` - recorded as typed-SDK deviations. + * + * Spec: RTLMV4 + */ + internal suspend fun createMapCreateMessages(realtimeObject: DefaultRealtimeObject): List { + if (entries.keys.any { it.isEmpty() }) { // RTLMV4b - String typing is compile-time; emptiness is ours + throw invalidInputError("Map keys should not be empty") + } + + val nestedCreateMessages = mutableListOf() + val mapEntries = mutableMapOf() + for ((key, value) in entries) { // RTLMV4d + val objectData: WireObjectData = when { + value.isLiveCounter -> { // RTLMV4d1 + val msg = (value.asLiveCounter as DefaultLiveCounter).createCounterCreateMessage(realtimeObject) + nestedCreateMessages.add(msg) + WireObjectData(objectId = msg.operation!!.objectId) + } + value.isLiveMap -> { // RTLMV4d2 - recursive, depth-first + val msgs = (value.asLiveMap as DefaultLiveMap).createMapCreateMessages(realtimeObject) + nestedCreateMessages.addAll(msgs) + // last message is the nested map's own MAP_CREATE (RTLMV4k) + WireObjectData(objectId = msgs.last().operation!!.objectId) + } + else -> fromPrimitiveLiveMapValue(value) // RTLMV4d3..d7 + } + mapEntries[key] = WireObjectsMapEntry(tombstone = false, data = objectData) + } + + val mapCreate = WireMapCreate(semantics = WireObjectsMapSemantics.LWW, entries = mapEntries) // RTLMV4e + // RTLMV4f - the @JsonAdapter WireObjectDataJsonSerializer performs the OD4 wire encoding + val initialValueJSONString = gson.toJson(mapCreate) + // RTLMV4g..i - nonce, server time and RTO14 objectId derivation + val (objectId, nonce) = realtimeObject.getObjectIdStringWithNonce(ObjectType.Map, initialValueJSONString) + + val mapCreateMsg = WireObjectMessage( + operation = WireObjectOperation( + action = WireObjectOperationAction.MapCreate, // RTLMV4j1 + objectId = objectId, // RTLMV4j2 + mapCreateWithObjectId = WireMapCreateWithObjectId( + nonce = nonce, // RTLMV4j3 + initialValue = initialValueJSONString, // RTLMV4j4 + derivedFrom = mapCreate, // RTLMV4j5 - local use only (@Transient on the wire type) + ), + ) + ) + return nestedCreateMessages + mapCreateMsg // RTLMV4k + } +} + +/** + * Converts a primitive [LiveMapValue] to its wire ObjectData form. Shared by the MAP_SET path + * (RTLM20e7b..f) and value-type evaluation (RTLMV4d3..d7). LiveMap/LiveCounter value types are + * handled by the callers before reaching here. + */ +internal fun fromPrimitiveLiveMapValue(value: LiveMapValue): WireObjectData { + return when { + value.isBoolean -> WireObjectData(boolean = value.asBoolean) // RTLM20e7e + value.isBinary -> WireObjectData(bytes = Base64.getEncoder().encodeToString(value.asBinary)) // RTLM20e7f + value.isNumber -> WireObjectData(number = value.asNumber.toDouble()) // RTLM20e7d + value.isString -> WireObjectData(string = value.asString) // RTLM20e7c + value.isJsonObject -> WireObjectData(json = value.asJsonObject) // RTLM20e7b + value.isJsonArray -> WireObjectData(json = value.asJsonArray) // RTLM20e7b + else -> throw IllegalArgumentException("Unsupported value type") // unreachable - LiveMapValue union is closed + } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/InternalLiveMap.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/InternalLiveMap.kt index 0cf81043f..c61e00eca 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/InternalLiveMap.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/InternalLiveMap.kt @@ -2,20 +2,19 @@ package io.ably.lib.liveobjects.value.livemap import io.ably.lib.liveobjects.* import io.ably.lib.liveobjects.ObjectsPool -import io.ably.lib.liveobjects.adapter.AblyClientAdapter +import io.ably.lib.liveobjects.instance.DefaultInstanceSubscriptionEvent +import io.ably.lib.liveobjects.instance.InstanceListener +import io.ably.lib.liveobjects.instance.types.DefaultLiveMapInstance import io.ably.lib.liveobjects.message.* -import io.ably.lib.liveobjects.message.WireObjectMessage -import io.ably.lib.liveobjects.message.WireObjectOperation -import io.ably.lib.liveobjects.message.WireObjectOperationAction -import io.ably.lib.liveobjects.message.WireObjectState -import io.ably.lib.liveobjects.message.WireObjectsMapSemantics import io.ably.lib.liveobjects.value.* import io.ably.lib.liveobjects.value.BaseRealtimeObject import io.ably.lib.liveobjects.value.ObjectType import io.ably.lib.liveobjects.value.ObjectUpdate -import io.ably.lib.liveobjects.value.noOp -import io.ably.lib.util.Log -import java.util.Base64 +import io.ably.lib.liveobjects.value.livecounter.DefaultLiveCounter +import com.google.gson.JsonElement +import com.google.gson.JsonNull +import com.google.gson.JsonObject +import com.google.gson.JsonPrimitive import java.util.concurrent.ConcurrentHashMap import java.util.AbstractMap @@ -24,9 +23,9 @@ import java.util.AbstractMap */ internal class InternalLiveMap private constructor( objectId: String, - private val realtimeObject: DefaultRealtimeObject, + realtimeObject: DefaultRealtimeObject, internal val semantics: WireObjectsMapSemantics = WireObjectsMapSemantics.LWW -) : BaseRealtimeObject(objectId, ObjectType.Map, realtimeObject.clock) { +) : BaseRealtimeObject(objectId, ObjectType.Map, realtimeObject) { override val tag = "LiveMap" @@ -43,27 +42,25 @@ internal class InternalLiveMap private constructor( */ private val liveMapManager = LiveMapManager(this) - private val channelName = realtimeObject.channelName - private val adapter: AblyClientAdapter get() = realtimeObject.adapter internal val objectsPool: ObjectsPool get() = realtimeObject.objectsPool - internal fun get(keyName: String): LiveMapValue? { + /** Spec: RTLM5 */ + internal fun get(keyName: String): ResolvedValue? { if (isTombstoned) { - return null + return null // RTLM5e } - data[keyName]?.let { liveMapEntry -> - return liveMapEntry.getResolvedValue(objectsPool) - } - return null // RTLM5d1 + return data[keyName]?.getResolvedValue(objectsPool) // RTLM5d1, RTLM5d2 } - internal fun entries(): Iterable> { - return sequence> { + /** + * Non-tombstoned entries (RTLM11d1). A non-tombstoned entry whose objectId reference does not + * resolve is still yielded with a null value (RTLM11d3a). + */ + internal fun entries(): Iterable> { + return sequence> { for ((key, entry) in data.entries) { - val value = entry.getResolvedValue(objectsPool) // RTLM11d, RTLM11d2 - value?.let { - yield(AbstractMap.SimpleImmutableEntry(key, it)) - } + if (entry.isEntryOrRefTombstoned(objectsPool)) continue // RTLM11d1 + yield(AbstractMap.SimpleImmutableEntry(key, entry.getResolvedValue(objectsPool))) // RTLM11d3, RTLM11d3a } }.asIterable() } @@ -77,7 +74,7 @@ internal class InternalLiveMap private constructor( }.asIterable() } - internal fun values(): Iterable { + internal fun values(): Iterable { val iterableEntries = entries() return sequence { for (entry in iterableEntries) { @@ -90,36 +87,89 @@ internal class InternalLiveMap private constructor( return data.values.count { !it.isEntryOrRefTombstoned(objectsPool) }.toLong() // RTLM10d } + /** + * Recursively compacted JSON snapshot. Cycles are emitted as {"objectId": } markers + * (RTPO14b2); binary leaves stay base64 strings - already the wire form (RTPO14b1). + * Visited ids are added before iterating and never removed (ably-js parity: a map referenced + * twice on sibling branches yields the marker for the second sibling too). + * + * Spec: RTPO13c (structure), RTPO14b (JSON differences) + */ + internal fun compactJson(visited: MutableSet = mutableSetOf()): JsonObject { + val result = JsonObject() + visited.add(objectId) + for ((key, resolved) in entries()) { // RTPO13c1 - tombstoned entries excluded via entries() + when (resolved) { + is ResolvedValue.MapRef -> + if (resolved.map.objectId in visited) { + result.add(key, JsonObject().apply { addProperty("objectId", resolved.map.objectId) }) // RTPO14b2 + } else { + result.add(key, resolved.map.compactJson(visited)) // RTPO13c2 + } + is ResolvedValue.CounterRef -> result.addProperty(key, resolved.counter.value()) // RTPO13c3 + is ResolvedValue.Leaf -> result.add(key, resolved.data.toCompactJsonElement()) // RTPO13c4, RTPO14b1 + null -> { + // dangling reference (RTLM11d3a) - omitted from the snapshot, matching the ably-js + // observable result (undefined-valued keys are dropped by JSON serialisation) + } + } + } + return result + } + internal suspend fun set(keyName: String, value: LiveMapValue) = setAsync(keyName, value) internal suspend fun remove(keyName: String) = removeAsync(keyName) override fun validate(state: WireObjectState) = liveMapManager.validate(state) - internal fun subscribe(listener: LiveMapChangeListener): Subscription { + /** Identity-based subscription to this map's updates. Spec: RTINS16d, RTLO4b */ + internal fun subscribe(listener: InstanceListener): Subscription { return liveMapManager.subscribe(listener) } private suspend fun setAsync(keyName: String, value: LiveMapValue) { - // Validate input parameters + // RTLM20e1 / RTLMV4b - validate input parameters if (keyName.isEmpty()) { throw invalidInputError("Map key should not be empty") } + // RTLM20e7 - the wire value; LiveMap/LiveCounter value types are evaluated into their + // *_CREATE messages first (RTLM20e7g1) and referenced by objectId (RTLM20e7g2) + val createMessages: List + val objectData: WireObjectData + when { + value.isLiveCounter -> { // RTLM20e7g + val msg = (value.asLiveCounter as DefaultLiveCounter).createCounterCreateMessage(realtimeObject) // RTLCV4 + createMessages = listOf(msg) + objectData = WireObjectData(objectId = msg.operation!!.objectId) // RTLM20e7g2 + } + value.isLiveMap -> { // RTLM20e7g + val msgs = (value.asLiveMap as DefaultLiveMap).createMapCreateMessages(realtimeObject) // RTLMV4 + createMessages = msgs + objectData = WireObjectData(objectId = msgs.last().operation!!.objectId) // RTLM20e7g2 + } + else -> { + createMessages = emptyList() + objectData = fromPrimitiveLiveMapValue(value) // RTLM20e7b..f + } + } + // RTLM20e - Create ObjectMessage with the MAP_SET operation - val msg = WireObjectMessage( + val mapSetMsg = WireObjectMessage( operation = WireObjectOperation( - action = WireObjectOperationAction.MapSet, - objectId = objectId, + action = WireObjectOperationAction.MapSet, // RTLM20e2 + objectId = objectId, // RTLM20e3 mapSet = WireMapSet( - key = keyName, - value = fromLiveMapValue(value) + key = keyName, // RTLM20e6 + value = objectData // RTLM20e7 ) ) ) - // RTLM20h - publish and apply locally on ACK - realtimeObject.publishAndApply(arrayOf(msg)) + // RTLM20h - publish (nested creates first, RTLM20h1; single message otherwise, RTLM20h2) + // and apply locally on ACK + realtimeObject.publishAndApply((createMessages + mapSetMsg).toTypedArray()) } private suspend fun removeAsync(keyName: String) { @@ -142,31 +192,36 @@ internal class InternalLiveMap private constructor( } override fun applyObjectState(wireObjectState: WireObjectState, message: WireObjectMessage): ObjectUpdate { - return liveMapManager.applyState(wireObjectState, message.serialTimestamp) + return liveMapManager.applyState(wireObjectState, message) } override fun applyObjectOperation(operation: WireObjectOperation, message: WireObjectMessage): Boolean { - return liveMapManager.applyOperation(operation, message.serial, message.serialTimestamp) + return liveMapManager.applyOperation(operation, message) } override fun clearData(): ObjectUpdate { - clearTimeserial = null // RTLM4 + // RTLO4e9 - before the data is reset, drop the parent references this map holds on objects + // referenced by its entries. Covers RTO4b resets, detached/failed clears and object + // tombstoning in one place (ably-js does the same in LiveMap.clearData). + for ((key, entry) in data) { + val refId = entry.data?.objectId ?: continue + objectsPool.get(refId)?.removeParentReference(this, key) // RTLO4e9a, RTLO4e9b + } + clearTimeserial = null // RTLM4d return liveMapManager.calculateUpdateFromDataDiff(data.toMap(), emptyMap()) .apply { this@InternalLiveMap.data.clear() } } - override fun notifyUpdated(update: ObjectUpdate) { - if (update.noOp) { - return - } - Log.v(tag, "Object $objectId updated: $update") - - // TODO - Emit a proper LiveMapChangeEvent once the Instance/ObjectMessage subscription - // pipeline is wired up. ObjectUpdate is not a LiveMapChangeEvent, so casting it (as was - // done previously) always throws ClassCastException; emission is deferred until then. - liveMapManager.notify(update as LiveMapChangeEvent) + override fun notifyInstanceSubscriptions(update: ObjectUpdate, message: ObjectMessage?) { + // RTINS16e1, RTINS16e2 - the event wraps a fresh instance bound to this map (the spec + // requires "an Instance wrapping the underlying LiveObject", not a specific wrapper identity) + liveMapManager.notify( + DefaultInstanceSubscriptionEvent(DefaultLiveMapInstance(realtimeObject, this), message) + ) } + override fun deregisterInstanceListeners() = liveMapManager.offAll() // RTLO4b4c3c + override fun onGCInterval(gcGracePeriod: Long) { data.entries.removeIf { (_, entry) -> entry.isEligibleForGc(gcGracePeriod, clock) } } @@ -179,45 +234,18 @@ internal class InternalLiveMap private constructor( internal fun zeroValue(objectId: String, objects: DefaultRealtimeObject): InternalLiveMap { return InternalLiveMap(objectId, objects) } - - /** - * Creates a MapCreate payload from map entries. - * Spec: RTLMV4e - */ - internal fun initialValue(entries: MutableMap): WireMapCreate { - return WireMapCreate( - semantics = WireObjectsMapSemantics.LWW, - entries = entries.mapValues { (_, value) -> - WireObjectsMapEntry( - tombstone = false, - data = fromLiveMapValue(value) - ) - } - ) - } - - /** - * Spec: RTLM20e7 - */ - private fun fromLiveMapValue(value: LiveMapValue): WireObjectData { - return when { - value.isLiveMap || value.isLiveCounter -> - WireObjectData(objectId = (value.value as BaseRealtimeObject).objectId) - value.isBoolean -> - WireObjectData(boolean = value.asBoolean) - value.isBinary -> - WireObjectData(bytes = Base64.getEncoder().encodeToString(value.asBinary)) - value.isNumber -> - WireObjectData(number = value.asNumber.toDouble()) - value.isString -> - WireObjectData(string = value.asString) - value.isJsonObject -> - WireObjectData(json = value.asJsonObject) - value.isJsonArray -> - WireObjectData(json = value.asJsonArray) - else -> - throw IllegalArgumentException("Unsupported value type") - } - } } } + +/** + * Compact-JSON form of a primitive wire leaf: primitives become JsonPrimitives, binary + * stays base64-encoded (RTPO14b1), embedded JSON passes through. + */ +internal fun WireObjectData.toCompactJsonElement(): JsonElement = when { + string != null -> JsonPrimitive(string) + number != null -> JsonPrimitive(number) + boolean != null -> JsonPrimitive(boolean) + bytes != null -> JsonPrimitive(bytes) // base64 kept encoded - RTPO14b1 + json != null -> json!! + else -> JsonNull.INSTANCE // unreachable for a valid Leaf (isInvalid-filtered) +} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapChangeCoordinator.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapChangeCoordinator.kt index 4848f37b8..a98b1da6d 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapChangeCoordinator.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapChangeCoordinator.kt @@ -8,7 +8,7 @@ import io.ably.lib.liveobjects.value.ObjectUpdate import io.ably.lib.util.EventEmitter import io.ably.lib.util.Log -internal val noOpMapUpdate = ObjectUpdate(null) +internal val noOpMapUpdate: ObjectUpdate = ObjectUpdate.NoOp /** * Interface for handling live map changes by notifying subscribers of updates. @@ -22,27 +22,28 @@ internal interface HandlesLiveMapChange { fun notify(update: LiveMapChangeEvent) } -internal interface LiveMapChangeListener : InstanceListener - internal interface LiveMapChangeEvent : InstanceSubscriptionEvent internal abstract class LiveMapChangeCoordinator: HandlesLiveMapChange { private val mapChangeEmitter = LiveMapChangeEmitter() - fun subscribe(listener: LiveMapChangeListener): Subscription { + fun subscribe(listener: InstanceListener): Subscription { mapChangeEmitter.on(listener) return onceSubscription { mapChangeEmitter.off(listener) } } + /** Deregisters all instance listeners - tombstone teardown. Spec: RTLO4b4c3c */ + internal fun offAll() = mapChangeEmitter.off() + override fun notify(update: LiveMapChangeEvent) = mapChangeEmitter.emit(update) } -private class LiveMapChangeEmitter : EventEmitter() { +private class LiveMapChangeEmitter : EventEmitter() { private val tag = "LiveMapChangeEmitter" - override fun apply(listener: LiveMapChangeListener?, event: LiveMapChangeEvent?, vararg args: Any?) { + override fun apply(listener: InstanceListener?, event: LiveMapChangeEvent?, vararg args: Any?) { try { event?.let { listener?.onUpdated(it) } ?: Log.w(tag, "Null event passed to LiveMapChange listener callback") diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapEntry.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapEntry.kt index 15f388a12..a8cf44ce7 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapEntry.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapEntry.kt @@ -2,11 +2,11 @@ package io.ably.lib.liveobjects.value.livemap import io.ably.lib.liveobjects.ObjectsPool import io.ably.lib.liveobjects.message.WireObjectData -import io.ably.lib.liveobjects.value.* -import io.ably.lib.liveobjects.value.BaseRealtimeObject +import io.ably.lib.liveobjects.message.isInvalid import io.ably.lib.liveobjects.value.ObjectType +import io.ably.lib.liveobjects.value.ResolvedValue +import io.ably.lib.liveobjects.value.livecounter.InternalLiveCounter import io.ably.lib.util.Clock -import java.util.Base64 /** * @spec RTLM3 - Map data structure storing entries @@ -37,34 +37,28 @@ internal fun LiveMapEntry.isEntryOrRefTombstoned(objectsPool: ObjectsPool): Bool } /** - * Returns value as is if object data stores a primitive type or - * a reference to another RealtimeObject from the pool if it stores an objectId. + * Resolves this entry to the internal graph view: a primitive leaf as wire ObjectData, + * or a reference to another internal object from the pool. + * Spec: RTLM5d2 */ -internal fun LiveMapEntry.getResolvedValue(objectsPool: ObjectsPool): LiveMapValue? { +internal fun LiveMapEntry.getResolvedValue(objectsPool: ObjectsPool): ResolvedValue? { if (isTombstoned) { return null } // RTLM5d2h - - data?.let { d -> // RTLM5d2b, RTLM5d2c, RTLM5d2d, RTLM5d2e - d.string?.let { return LiveMapValue.of(it) } - d.number?.let { return LiveMapValue.of(it) } - d.boolean?.let { return LiveMapValue.of(it) } - d.bytes?.let { return LiveMapValue.of(Base64.getDecoder().decode(it)) } - d.json?.let { parsed -> - return when { - parsed.isJsonObject -> LiveMapValue.of(parsed.asJsonObject) - parsed.isJsonArray -> LiveMapValue.of(parsed.asJsonArray) - else -> null - } + val d = data ?: return null // RTLM5d2g + d.objectId?.let { refId -> // RTLM5d2f - has an objectId reference + val refObject = objectsPool.get(refId) ?: return null // RTLM5d2f1 + if (refObject.isTombstoned) { + return null // tombstoned objects must not be surfaced to the end users (RTLM14c behaviour) } - d.objectId?.let { refId -> // RTLM5d2f - has an objectId reference - objectsPool.get(refId)?.let { refObject -> - if (refObject.isTombstoned) { - return null // tombstoned objects must not be surfaced to the end users - } - return fromRealtimeObject(refObject) // RTLM5d2f2 - } + // RTLM5d2f2 - safe casts by construction: the pool only ever contains these two subclasses + return when (refObject.objectType) { + ObjectType.Map -> ResolvedValue.MapRef(refObject as InternalLiveMap) + ObjectType.Counter -> ResolvedValue.CounterRef(refObject as InternalLiveCounter) } } - return null // RTLM5d2g, RTLM5d2f1 + // RTLM5d2b..e - primitive leaf; keep the wire form, typed narrowing (incl. base64 decode + // for bytes) happens at the PathObject/Instance layer + if (d.isInvalid()) return null // RTLM5d2g + return ResolvedValue.Leaf(d) } /** @@ -74,10 +68,3 @@ internal fun LiveMapEntry.isEligibleForGc(gcGracePeriod: Long, clock: Clock): Bo val currentTime = clock.currentTimeMillis() return isTombstoned && tombstonedAt?.let { currentTime - it >= gcGracePeriod } == true } - -private fun fromRealtimeObject(realtimeObject: BaseRealtimeObject): LiveMapValue { - return when (realtimeObject.objectType) { - ObjectType.Map -> LiveMapValue.of(realtimeObject as LiveMap) - ObjectType.Counter -> LiveMapValue.of(realtimeObject as LiveCounter) - } -} diff --git a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapManager.kt b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapManager.kt index d35e535a4..ea915fb92 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapManager.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/liveobjects/value/livemap/LiveMapManager.kt @@ -1,15 +1,10 @@ package io.ably.lib.liveobjects.value.livemap import io.ably.lib.liveobjects.message.* -import io.ably.lib.liveobjects.message.WireMapSet -import io.ably.lib.liveobjects.message.WireObjectOperation -import io.ably.lib.liveobjects.message.WireObjectOperationAction -import io.ably.lib.liveobjects.message.WireObjectState import io.ably.lib.liveobjects.objectError +import io.ably.lib.liveobjects.value.MapChange import io.ably.lib.liveobjects.value.ObjectUpdate -import io.ably.lib.liveobjects.value.noOp import io.ably.lib.util.Log -import kotlin.collections.iterator internal class LiveMapManager(private val liveMap: InternalLiveMap): LiveMapChangeCoordinator() { @@ -20,50 +15,58 @@ internal class LiveMapManager(private val liveMap: InternalLiveMap): LiveMapChan /** * @spec RTLM6 - Overrides object data with state from sync */ - internal fun applyState(wireObjectState: WireObjectState, serialTimestamp: Long?): ObjectUpdate { - val previousData = liveMap.data.toMap() - + internal fun applyState(wireObjectState: WireObjectState, message: WireObjectMessage): ObjectUpdate { if (wireObjectState.tombstone) { - liveMap.tombstone(serialTimestamp) - } else { - // override data for this object with data from the object state - liveMap.createOperationIsMerged = false // RTLM6b - liveMap.data.clear() - - liveMap.clearTimeserial = wireObjectState.map?.clearTimeserial // RTLM6i - - wireObjectState.map?.entries?.forEach { (key, entry) -> - liveMap.data[key] = LiveMapEntry( - isTombstoned = entry.tombstone ?: false, - tombstonedAt = if (entry.tombstone == true) entry.serialTimestamp - ?: liveMap.clock.currentTimeMillis() else null, - timeserial = entry.timeserial, - data = entry.data - ) - } // RTLM6c - - // RTLM6d - wireObjectState.createOp?.let { createOp -> - mergeInitialDataFromCreateOperation(createOp) - } + // RTLM6f, RTLM6f2 - tombstone update returned as-is (carries tombstone flag + message) + return liveMap.tombstone(message.serialTimestamp, message) } - return calculateUpdateFromDataDiff(previousData, liveMap.data.toMap()) + val previousData = liveMap.data.toMap() // RTLM6g - only the override branch needs it + + // override data for this object with data from the object state. + // Parent references are deliberately not maintained per-entry here: applyState only runs + // during sync, and ObjectsManager rebuilds all parent references once the sync sequence + // ends (RTO5c10). Any future non-sync caller of applyObjectSync must rebuild them too. + liveMap.createOperationIsMerged = false // RTLM6b + liveMap.data.clear() + + liveMap.clearTimeserial = wireObjectState.map?.clearTimeserial // RTLM6i + + wireObjectState.map?.entries?.forEach { (key, entry) -> + liveMap.data[key] = LiveMapEntry( + isTombstoned = entry.tombstone ?: false, + tombstonedAt = if (entry.tombstone == true) entry.serialTimestamp + ?: liveMap.clock.currentTimeMillis() else null, // RTLM6c1 + timeserial = entry.timeserial, + data = entry.data + ) + } // RTLM6c + + // RTLM6d - merge result is discarded; only the outer diff is returned + wireObjectState.createOp?.let { createOp -> + mergeInitialDataFromCreateOperation(createOp, message) + } + + // RTLM6h - diff between previous and new data, stamped with the source message + return when (val diff = calculateUpdateFromDataDiff(previousData, liveMap.data.toMap())) { + is ObjectUpdate.MapUpdate -> diff.copy(objectMessage = message) + else -> diff // NoOp stays NoOp + } } /** * @spec RTLM15 - Applies operations to LiveMap */ - internal fun applyOperation(operation: WireObjectOperation, serial: String?, serialTimestamp: Long?): Boolean { + internal fun applyOperation(operation: WireObjectOperation, message: WireObjectMessage): Boolean { return when (operation.action) { WireObjectOperationAction.MapCreate -> { - val update = applyMapCreate(operation) // RTLM15d1 + val update = applyMapCreate(operation, message) // RTLM15d1 liveMap.notifyUpdated(update) // RTLM15d1a true // RTLM15d1b } WireObjectOperationAction.MapSet -> { if (operation.mapSet != null) { - val update = applyMapSet(operation.mapSet, serial) // RTLM15d6 + val update = applyMapSet(operation.mapSet, message.serial, message) // RTLM15d6 liveMap.notifyUpdated(update) // RTLM15d6a true // RTLM15d6b } else { @@ -72,7 +75,7 @@ internal class LiveMapManager(private val liveMap: InternalLiveMap): LiveMapChan } WireObjectOperationAction.MapRemove -> { if (operation.mapRemove != null) { - val update = applyMapRemove(operation.mapRemove, serial, serialTimestamp) // RTLM15d7 + val update = applyMapRemove(operation.mapRemove, message.serial, message.serialTimestamp, message) // RTLM15d7 liveMap.notifyUpdated(update) // RTLM15d7a true // RTLM15d7b } else { @@ -80,12 +83,12 @@ internal class LiveMapManager(private val liveMap: InternalLiveMap): LiveMapChan } } WireObjectOperationAction.ObjectDelete -> { - val update = liveMap.tombstone(serialTimestamp) - liveMap.notifyUpdated(update) + val update = liveMap.tombstone(message.serialTimestamp, message) // RTLM15d5 + liveMap.notifyUpdated(update) // RTLM15d5c true // RTLM15d5b } WireObjectOperationAction.MapClear -> { - val update = applyMapClear(serial) // RTLM15d8 + val update = applyMapClear(message) // RTLM15d8 liveMap.notifyUpdated(update) // RTLM15d8a true // RTLM15d8b } @@ -98,8 +101,9 @@ internal class LiveMapManager(private val liveMap: InternalLiveMap): LiveMapChan /** * @spec RTLM16 - Applies map create operation + * @spec RTLM16a2 - [message] is the source ObjectMessage that contains the operation */ - private fun applyMapCreate(operation: WireObjectOperation): ObjectUpdate { + private fun applyMapCreate(operation: WireObjectOperation, message: WireObjectMessage): ObjectUpdate { if (liveMap.createOperationIsMerged) { // RTLM16b // There can't be two different create operation for the same object id, because the object id @@ -114,7 +118,7 @@ internal class LiveMapManager(private val liveMap: InternalLiveMap): LiveMapChan validateMapSemantics(getEffectiveMapCreate(operation)?.semantics) // RTLM16c - return mergeInitialDataFromCreateOperation(operation) // RTLM16d + return mergeInitialDataFromCreateOperation(operation, message) // RTLM16d } /** @@ -123,6 +127,7 @@ internal class LiveMapManager(private val liveMap: InternalLiveMap): LiveMapChan private fun applyMapSet( wireMapSet: WireMapSet, // RTLM7d3 timeSerial: String?, // RTLM7d2 + message: WireObjectMessage, // RTLM7d4 ): ObjectUpdate { // RTLM7h - skip if operation is older than the last MAP_CLEAR val clearSerial = liveMap.clearTimeserial @@ -149,7 +154,7 @@ internal class LiveMapManager(private val liveMap: InternalLiveMap): LiveMapChan } // RTLM7g - wireMapSet.value.objectId?.let { + val newRefObject = wireMapSet.value.objectId?.let { // this MAP_SET op is setting a key to point to another object via its object id, // but it is possible that we don't have the corresponding object in the pool yet (for example, we haven't seen the *_CREATE op for it). // we don't want to return undefined from this map's .get() method even if we don't have the object, @@ -158,6 +163,10 @@ internal class LiveMapManager(private val liveMap: InternalLiveMap): LiveMapChan } if (existingEntry != null) { + // RTLM7a3 - drop the parent reference held via the entry being overwritten + existingEntry.data?.objectId?.let { oldRefId -> + liveMap.objectsPool.get(oldRefId)?.removeParentReference(liveMap, wireMapSet.key) // RTLM7a3a, RTLM7a3b + } // RTLM7a2 - Replace existing entry with new one instead of mutating liveMap.data[wireMapSet.key] = LiveMapEntry( isTombstoned = false, // RTLM7a2c @@ -173,7 +182,10 @@ internal class LiveMapManager(private val liveMap: InternalLiveMap): LiveMapChan ) } - return ObjectUpdate(mapOf(wireMapSet.key to "updated")) + // RTLM7g2 - record the reverse reference for the newly referenced object + newRefObject?.addParentReference(liveMap, wireMapSet.key) + + return ObjectUpdate.MapUpdate(mapOf(wireMapSet.key to MapChange.Updated), message) // RTLM7f } /** @@ -183,6 +195,7 @@ internal class LiveMapManager(private val liveMap: InternalLiveMap): LiveMapChan wireMapRemove: WireMapRemove, // RTLM8c4 timeSerial: String?, // RTLM8c2 timeStamp: Long?, // RTLM8c3 + message: WireObjectMessage, // RTLM8c5 ): ObjectUpdate { // RTLM8g - skip if operation is older than the last MAP_CLEAR val clearSerial = liveMap.clearTimeserial @@ -215,6 +228,10 @@ internal class LiveMapManager(private val liveMap: InternalLiveMap): LiveMapChan } if (existingEntry != null) { + // RTLM8a3 - drop the parent reference held via the entry being removed + existingEntry.data?.objectId?.let { oldRefId -> + liveMap.objectsPool.get(oldRefId)?.removeParentReference(liveMap, wireMapRemove.key) // RTLM8a3a, RTLM8a3b + } // RTLM8a2 - Replace existing entry with new one instead of mutating liveMap.data[wireMapRemove.key] = LiveMapEntry( isTombstoned = true, // RTLM8a2c @@ -231,13 +248,15 @@ internal class LiveMapManager(private val liveMap: InternalLiveMap): LiveMapChan ) } - return ObjectUpdate(mapOf(wireMapRemove.key to "removed")) + return ObjectUpdate.MapUpdate(mapOf(wireMapRemove.key to MapChange.Removed), message) // RTLM8e } /** * @spec RTLM24 - Applies MAP_CLEAR operation to LiveMap + * @spec RTLM24a2 - [message] is the source ObjectMessage that contains the operation */ - private fun applyMapClear(timeSerial: String?): ObjectUpdate { + private fun applyMapClear(message: WireObjectMessage): ObjectUpdate { + val timeSerial = message.serial // RTLM24a1 val clearSerial = liveMap.clearTimeserial // RTLM24c - skip if existing clear serial is strictly newer than incoming op serial @@ -251,21 +270,25 @@ internal class LiveMapManager(private val liveMap: InternalLiveMap): LiveMapChan "Updating clearTimeserial; previous=$clearSerial, new=$timeSerial; objectId=$objectId") liveMap.clearTimeserial = timeSerial // RTLM24d - val update = mutableMapOf() + val update = mutableMapOf() // RTLM24e - remove all entries whose serial is older than (or equal to missing) the clear serial liveMap.data.entries.removeIf { val (key, entry) = it val entrySerial = entry.timeserial - if (entrySerial == null || (timeSerial != null && timeSerial > entrySerial)) { - update[key] = "removed" - true + if (entrySerial == null || (timeSerial != null && timeSerial > entrySerial)) { // RTLM24e1 + // RTLM24e1c - drop the parent reference held via the cleared entry + entry.data?.objectId?.let { refId -> + liveMap.objectsPool.get(refId)?.removeParentReference(liveMap, key) // RTLM24e1c1, RTLM24e1c2 + } + update[key] = MapChange.Removed // RTLM24e1b + true // RTLM24e1a } else { false } } - return ObjectUpdate(update) + return ObjectUpdate.MapUpdate(update, message) // RTLM24f } /** @@ -292,56 +315,52 @@ internal class LiveMapManager(private val liveMap: InternalLiveMap): LiveMapChan private fun getEffectiveMapCreate(operation: WireObjectOperation): WireMapCreate? = operation.mapCreateWithObjectId?.derivedFrom ?: operation.mapCreate - private fun mergeInitialDataFromCreateOperation(operation: WireObjectOperation): ObjectUpdate { + private fun mergeInitialDataFromCreateOperation(operation: WireObjectOperation, message: WireObjectMessage): ObjectUpdate { val effectiveMapCreate = getEffectiveMapCreate(operation) - if (effectiveMapCreate?.entries.isNullOrEmpty()) { // no map entries in MAP_CREATE op - return noOpMapUpdate + if (effectiveMapCreate == null) { + // no MAP_CREATE payload - the initial value is implicitly an empty map, nothing to merge. + // createOperationIsMerged is intentionally NOT set (ably-js parity), so a later + // payload-bearing create op for this object could still merge. + return ObjectUpdate.MapUpdate(emptyMap(), message) } - val aggregatedUpdate = mutableListOf() + val aggregated = mutableMapOf() // RTLM23a // in order to apply MAP_CREATE op for an existing map, we should merge their underlying entries keys. // we can do this by iterating over entries from MAP_CREATE op and apply changes on per-key basis as if we had MAP_SET, MAP_REMOVE operations. - effectiveMapCreate?.entries?.forEach { (key, entry) -> + effectiveMapCreate.entries.forEach { (key, entry) -> // for a MAP_CREATE operation we must use the serial value available on an entry, instead of a serial on a message val opTimeserial = entry.timeserial val update = if (entry.tombstone == true) { // RTLM23a2 - entry in MAP_CREATE op is removed, try to apply MAP_REMOVE op - applyMapRemove(WireMapRemove(key), opTimeserial, entry.serialTimestamp) + applyMapRemove(WireMapRemove(key), opTimeserial, entry.serialTimestamp, message) } else { // RTLM23a1 - entry in MAP_CREATE op is not removed, try to set it via MAP_SET op - applyMapSet(WireMapSet(key, entry.data ?: throw objectError("MAP_SET operation without data")), opTimeserial) + applyMapSet(WireMapSet(key, entry.data ?: throw objectError("MAP_SET operation without data")), opTimeserial, message) } - // skip noop updates - if (update.noOp) { - return@forEach + // RTLM23c - merge per-key results, skipping noop updates + if (update is ObjectUpdate.MapUpdate) { + aggregated.putAll(update.update) } - - aggregatedUpdate.add(update) } liveMap.createOperationIsMerged = true // RTLM23b - // TODO - This will need some rework as per new spec, we have commented out old code. - // Maybe we no more calculate the difference, so we might get rid of the code. - return ObjectUpdate(aggregatedUpdate) -// return ObjectUpdate( -// aggregatedUpdate.map { it.update }.fold(emptyMap()) { acc, map -> acc + map } -// ) + return ObjectUpdate.MapUpdate(aggregated, objectMessage = message) // RTLM23c } internal fun calculateUpdateFromDataDiff( prevData: Map, newData: Map ): ObjectUpdate { - val update = mutableMapOf() + val update = mutableMapOf() // Check for removed entries for ((key, prevEntry) in prevData) { if (!prevEntry.isTombstoned && !newData.containsKey(key)) { - update[key] = "removed" + update[key] = MapChange.Removed // RTLM22b1 } } @@ -350,7 +369,7 @@ internal class LiveMapManager(private val liveMap: InternalLiveMap): LiveMapChan if (!prevData.containsKey(key)) { // if property does not exist in current map, but new data has it as non-tombstoned property - got updated if (!newEntry.isTombstoned) { - update[key] = "updated" + update[key] = MapChange.Updated // RTLM22b2 } // otherwise, if new data has this prop tombstoned - do nothing, as property didn't exist anyway continue @@ -362,12 +381,12 @@ internal class LiveMapManager(private val liveMap: InternalLiveMap): LiveMapChan // compare tombstones first if (prevEntry.isTombstoned && !newEntry.isTombstoned) { // prev prop is tombstoned, but new is not. it means prop was updated to a meaningful value - update[key] = "updated" + update[key] = MapChange.Updated continue } if (!prevEntry.isTombstoned && newEntry.isTombstoned) { // prev prop is not tombstoned, but new is. it means prop was removed - update[key] = "removed" + update[key] = MapChange.Removed continue } if (prevEntry.isTombstoned && newEntry.isTombstoned) { @@ -378,15 +397,15 @@ internal class LiveMapManager(private val liveMap: InternalLiveMap): LiveMapChan // both props exist and are not tombstoned, need to compare values to see if it was changed val valueChanged = prevEntry.data != newEntry.data if (valueChanged) { - update[key] = "updated" + update[key] = MapChange.Updated // RTLM22b3 continue } } // An empty diff means nothing actually changed (e.g. clearing an already-empty root // map on a channel with no objects). Return the no-op update so notifyUpdated() - // short-circuits and no change event is emitted. Spec: RTLM/RTO4b. - return if (update.isEmpty()) noOpMapUpdate else ObjectUpdate(update) + // short-circuits and no change event is emitted. Spec: RTLM22b/RTO4b. + return if (update.isEmpty()) noOpMapUpdate else ObjectUpdate.MapUpdate(update) } internal fun validate(state: WireObjectState) { diff --git a/liveobjects/src/test/kotlin/io/ably/lib/liveobjects/unit/PathSegmentsTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/liveobjects/unit/PathSegmentsTest.kt new file mode 100644 index 000000000..31bd0f4bd --- /dev/null +++ b/liveobjects/src/test/kotlin/io/ably/lib/liveobjects/unit/PathSegmentsTest.kt @@ -0,0 +1,96 @@ +package io.ably.lib.liveobjects.unit + +import io.ably.lib.liveobjects.path.PathSegments +import org.junit.Assert.assertEquals +import org.junit.Test + +/** + * Encodes the stored-vs-supplied path invariants that the root convention relies on + * (empty stored path = root = zero segments, while a user-supplied "" is one empty segment, + * matching ably-js `at("")`). Spec: RTPO4a, RTPO4b, RTPO4c, RTPO5c, RTPO6b, RTPO6c + */ +class PathSegmentsTest { + + @Test + fun testAppendKeyToRootYieldsBareKey() { + // RTPO5c - appending to the root's empty stored path must not produce a leading dot + assertEquals("a", PathSegments.appendKey("", "a")) + } + + @Test + fun testAppendKeyEscapesDotsInKey() { + // RTPO4b - a raw key containing dots is a single segment; its dots get escaped + assertEquals("a.b\\.c", PathSegments.appendKey("a", "b.c")) + } + + @Test + fun testParseHonoursEscapedDots() { + // RTPO6b - `\.` is a literal dot inside a segment, not a separator + assertEquals(listOf("a.b", "c"), PathSegments.parse("a\\.b.c")) + } + + @Test + fun testParseStoredEmptyIsRoot() { + // RTPO4c - the root PathObject stores "" which represents ZERO segments + assertEquals(emptyList(), PathSegments.parseStored("")) + } + + @Test + fun testParseEmptyIsOneEmptySegment() { + // user-supplied "" means one empty segment, matching ably-js `at("")` + assertEquals(listOf(""), PathSegments.parse("")) + } + + @Test + fun testJoinEmptyListIsRootPath() { + // RTPO4c - inverse of parseStored("") + assertEquals("", PathSegments.join(emptyList())) + } + + @Test + fun testAppendPathSplitsOnUnescapedDots() { + // RTPO6c - a supplied sub-path is dot-delimited; escaped dots stay within their segment + assertEquals("a.b.c", PathSegments.appendPath("a", "b.c")) + assertEquals("a.b\\.c", PathSegments.appendPath("a", "b\\.c")) + assertEquals("b.c", PathSegments.appendPath("", "b.c")) + } + + @Test + fun testNonDotEscapeKeepsBackslash() { + // ably-js parity: only escaped dots are unescaped; `\x` keeps its backslash + assertEquals(listOf("a\\b", "c"), PathSegments.parse("a\\b.c")) + // a trailing lone backslash is kept + assertEquals(listOf("a\\"), PathSegments.parse("a\\")) + } + + @Test + fun testBackslashKeysRoundTrip() { + // join escapes backslashes (unlike ably-js display-only _escapePath) because the joined + // string IS the storage here and gets re-parsed. A key ending in `\` must not collide + // with the escaped-dot separator. + val stored = PathSegments.appendKey(PathSegments.appendKey("", "a\\"), "b") + assertEquals("a\\\\.b", stored) + assertEquals(listOf("a\\", "b"), PathSegments.parseStored(stored)) + + // a single key containing `\.` (chars a,\,.,b) stays one segment through the round-trip + val storedSingle = PathSegments.appendKey("", "a\\.b") + assertEquals("a\\\\\\.b", storedSingle) + assertEquals(listOf("a\\.b"), PathSegments.parseStored(storedSingle)) + } + + @Test + fun testUserParseKeepsDoubleBackslash() { + // stored-path strict unescaping must not leak into user-supplied sub-path parsing: + // ably-js `at("a\\b")` keeps both backslashes, and so does parse() + assertEquals(listOf("a\\\\b"), PathSegments.parse("a\\\\b")) + } + + @Test + fun testStoredPathRoundTrip() { + // join(parseStored(p)) == p for stored paths produced by appendKey/appendPath + val stored = PathSegments.appendKey(PathSegments.appendKey("", "a.b"), "c") + assertEquals("a\\.b.c", stored) + assertEquals(listOf("a.b", "c"), PathSegments.parseStored(stored)) + assertEquals(stored, PathSegments.join(PathSegments.parseStored(stored))) + } +} diff --git a/uts/build.gradle.kts b/uts/build.gradle.kts index 9fbc28e44..4eca7f9c4 100644 --- a/uts/build.gradle.kts +++ b/uts/build.gradle.kts @@ -44,21 +44,11 @@ tasks.withType().configureEach { tasks.register("runUtsUnitTests") { filter { includeTestsMatching("io.ably.lib.uts.unit.*") - // liveobjects has no SDK implementation yet, so these translate-only tests fail at - // runtime with "LiveObjects plugin hasn't been installed". Exclude them from the run - // (they still compile via compileTestKotlin). - // TODO: This should be removed once liveobjects implementation is in place. - excludeTestsMatching("io.ably.lib.uts.unit.liveobjects.*") } } tasks.register("runUtsIntegrationTests") { filter { includeTestsMatching("io.ably.lib.uts.integration.*") - // liveobjects has no SDK implementation yet — exclude the translate-only liveobjects - // tests (standard + proxy) from the run; they still compile via compileTestKotlin. - // TODO: This should be removed once liveobjects implementation is in place. - excludeTestsMatching("io.ably.lib.uts.integration.standard.liveobjects.*") - excludeTestsMatching("io.ably.lib.uts.integration.proxy.liveobjects.*") } }