From 4eec3b294cf72c8ca11a502bfdd17f789591fac1 Mon Sep 17 00:00:00 2001 From: Ladd Date: Wed, 1 Jan 2025 22:21:16 -0600 Subject: [PATCH] refactored lossy for clarity, added separate class for last write wins --- __tests__/last-write-wins.ts | 57 +++++++++++++++ __tests__/lossless.ts | 14 ++-- __tests__/lossy.ts | 89 ++++++++++++++---------- src/collection.ts | 27 +++----- src/last-write-wins.ts | 104 +++++++++++++++++++++++++++ src/lossless.ts | 62 +++++++++++------ src/lossy.ts | 131 +++++++++++++---------------------- src/transactions.ts | 39 ++++++++--- 8 files changed, 351 insertions(+), 172 deletions(-) create mode 100644 __tests__/last-write-wins.ts create mode 100644 src/last-write-wins.ts diff --git a/__tests__/last-write-wins.ts b/__tests__/last-write-wins.ts new file mode 100644 index 0000000..37ccaa8 --- /dev/null +++ b/__tests__/last-write-wins.ts @@ -0,0 +1,57 @@ +import Debug from "debug"; +import {Delta} from "../src/delta"; +import {LastWriteWins} from "../src/last-write-wins"; +import {Lossless} from "../src/lossless"; +import {RhizomeNode} from "../src/node"; +const debug = Debug('test:last-write-wins'); + +describe('Last write wins', () => { + + describe('given that two separate writes occur', () => { + const node = new RhizomeNode(); + const lossless = new Lossless(node); + + const lossy = new LastWriteWins(lossless); + + beforeAll(() => { + lossless.ingestDelta(new Delta({ + creator: 'a', + host: 'h', + pointers: [{ + localContext: "vegetable", + target: "broccoli", + targetContext: "want" + }, { + localContext: "desire", + target: 95, + }] + })); + + lossless.ingestDelta(new Delta({ + creator: 'a', + host: 'h', + pointers: [{ + localContext: "vegetable", + target: "broccoli", + targetContext: "want" + }, { + localContext: "want", + target: 90, + }] + })); + }); + + it('our resolver should return the most recently written value', () => { + const result = lossy.resolve(["broccoli"]); + debug('result', result); + expect(result).toMatchObject({ + broccoli: { + id: "broccoli", + properties: { + want: 90 + } + } + }); + }); + }); +}); diff --git a/__tests__/lossless.ts b/__tests__/lossless.ts index 25a5b20..d365389 100644 --- a/__tests__/lossless.ts +++ b/__tests__/lossless.ts @@ -37,7 +37,7 @@ describe('Lossless', () => { expect(lossless.view()).toMatchObject({ keanu: { referencedAs: ["actor"], - properties: { + propertyDeltas: { roles: [{ creator: "a", host: "h", @@ -53,7 +53,7 @@ describe('Lossless', () => { }, neo: { referencedAs: ["role"], - properties: { + propertyDeltas: { actor: [{ creator: "a", host: "h", @@ -69,7 +69,7 @@ describe('Lossless', () => { }, the_matrix: { referencedAs: ["film"], - properties: { + propertyDeltas: { cast: [{ creator: "a", host: "h", @@ -114,7 +114,7 @@ describe('Lossless', () => { expect(lossless.view()).toMatchObject({ ace: { referencedAs: ["1", "14"], - properties: { + propertyDeltas: { value: [{ creator: 'A', host: 'H', @@ -141,7 +141,7 @@ describe('Lossless', () => { expect(lossless.view(undefined, filter)).toMatchObject({ ace: { referencedAs: ["1"], - properties: { + propertyDeltas: { value: [{ creator: 'A', host: 'H', @@ -156,7 +156,7 @@ describe('Lossless', () => { expect(lossless.view(["ace"], filter)).toMatchObject({ ace: { referencedAs: ["1"], - properties: { + propertyDeltas: { value: [{ creator: 'A', host: 'H', @@ -168,5 +168,7 @@ describe('Lossless', () => { } }); }); + + // TODO: Test with transactions, say A1 -- B -- A2 }); }); diff --git a/__tests__/lossy.ts b/__tests__/lossy.ts index 1e29c14..605a103 100644 --- a/__tests__/lossy.ts +++ b/__tests__/lossy.ts @@ -1,13 +1,60 @@ -import {RhizomeNode} from "../src/node.js"; +import Debug from 'debug'; import {Delta, PointerTarget} from "../src/delta.js"; -import {Lossless, LosslessViewMany} from "../src/lossless.js"; -import {Lossy, lastValueFromLosslessViewOne, valueFromCollapsedDelta } from "../src/lossy.js"; +import {lastValueFromDeltas} from "../src/last-write-wins.js"; +import {Lossless, LosslessViewOne} from "../src/lossless.js"; +import {Lossy, valueFromCollapsedDelta} from "../src/lossy.js"; +import {RhizomeNode} from "../src/node.js"; +const debug = Debug('test:lossy'); + +type Role = { + actor: PointerTarget, + film: PointerTarget, + role: PointerTarget +}; + +type Summary = { + roles: Role[]; +}; + + +function initializer(): Summary { + return { + roles: [] + }; +} + +// TODO: Add more rigor to this example approach to generating a summary. +// it's really not CRDT, it likely depends on the order of the pointers. +// TODO: Prove with failing test + +const reducer = (acc: Summary, cur: LosslessViewOne): Summary => { + if (cur.referencedAs.includes("role")) { + const {delta, value: actor} = lastValueFromDeltas("actor", cur.propertyDeltas["actor"]) ?? {}; + if (!delta) throw new Error('expected to find delta'); + if (!actor) throw new Error('expected to find actor'); + const film = valueFromCollapsedDelta("film", delta); + if (!film) throw new Error('expected to find film'); + acc.roles.push({ + role: cur.id, + actor, + film + }); + } + + return acc; +} + +const resolver = (acc: Summary): Summary => { + return acc; +} + describe('Lossy', () => { - describe('se a provided function to resolve entity views', () => { + describe('use a provided initializer, reducer, and resolver to resolve entity views', () => { const node = new RhizomeNode(); const lossless = new Lossless(node); - const lossy = new Lossy(lossless); + + const lossy = new Lossy(lossless, initializer, reducer, resolver); beforeAll(() => { lossless.ingestDelta(new Delta({ @@ -36,36 +83,8 @@ describe('Lossy', () => { }); it('example summary', () => { - type Role = { - actor: PointerTarget, - film: PointerTarget, - role: PointerTarget - }; - - type Summary = { - roles: Role[]; - }; - - const resolver = (losslessView: LosslessViewMany): Summary => { - const roles: Role[] = []; - for (const [id, ent] of Object.entries(losslessView)) { - if (ent.referencedAs.includes("role")) { - const {delta, value: actor} = lastValueFromLosslessViewOne(ent, "actor") ?? {}; - if (!delta) continue; // TODO: panic - if (!actor) continue; // TODO: panic - const film = valueFromCollapsedDelta(delta, "film"); - if (!film) continue; // TODO: panic - roles.push({ - role: id, - actor, - film - }); - } - } - return {roles}; - } - - const result = lossy.resolve(resolver); + const result = lossy.resolve(); + debug('result', result); expect(result).toEqual({ roles: [{ film: "the_matrix", diff --git a/src/collection.ts b/src/collection.ts index 6ed7d7b..b03b8e6 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -6,9 +6,9 @@ import Debug from 'debug'; import {randomUUID} from "node:crypto"; import EventEmitter from "node:events"; -import {Delta, DeltaFilter} from "./delta.js"; +import {Delta} from "./delta.js"; import {Entity, EntityProperties} from "./entity.js"; -import {Lossy, ResolvedViewOne, Resolver} from "./lossy.js"; +import {LastWriteWins, ResolvedViewOne} from './last-write-wins.js'; import {RhizomeNode} from "./node.js"; import {DomainEntityID} from "./types.js"; const debug = Debug('rz:collection'); @@ -17,7 +17,7 @@ export class Collection { rhizomeNode?: RhizomeNode; name: string; eventStream = new EventEmitter(); - lossy?: Lossy; + lossy?: LastWriteWins; constructor(name: string) { this.name = name; @@ -32,7 +32,7 @@ export class Collection { rhizomeConnect(rhizomeNode: RhizomeNode) { this.rhizomeNode = rhizomeNode; - this.lossy = new Lossy(this.rhizomeNode.lossless); + this.lossy = new LastWriteWins(this.rhizomeNode.lossless); // Listen for completed transactions, and emit updates to event stream this.rhizomeNode.lossless.eventStream.on("updated", (id) => { @@ -58,7 +58,6 @@ export class Collection { newProperties: EntityProperties, creator: string, host: string, - resolver?: Resolver ): { transactionDelta: Delta | undefined, deltas: Delta[] @@ -67,7 +66,7 @@ export class Collection { let oldProperties: EntityProperties = {}; if (entityId) { - const entity = this.resolve(entityId, resolver); + const entity = this.resolve(entityId); if (entity) { oldProperties = entity.properties; } @@ -155,7 +154,6 @@ export class Collection { async put( entityId: DomainEntityID | undefined, properties: EntityProperties, - resolver?: Resolver ): Promise { if (!this.rhizomeNode) throw new Error('collection not connecte to rhizome'); @@ -173,7 +171,6 @@ export class Collection { properties, this.rhizomeNode?.config.creator, this.rhizomeNode?.config.peerId, - resolver, ); const ingested = new Promise((resolve) => { @@ -204,21 +201,19 @@ export class Collection { await ingested; - const res = this.resolve(entityId, resolver); + const res = this.resolve(entityId); if (!res) throw new Error("could not get what we just put!"); return res; } - resolve( - id: string, - resolver?: Resolver, - deltaFilter?: DeltaFilter - ): T | undefined { + resolve( + id: string + ): ResolvedViewOne | undefined { if (!this.rhizomeNode) throw new Error('collection not connected to rhizome'); if (!this.lossy) throw new Error('lossy view not initialized'); - const res = this.lossy.resolve(resolver, [id], deltaFilter) || {}; + const res = this.lossy.resolve([id]) || {}; - return res[id] as T; + return res[id]; } } diff --git a/src/last-write-wins.ts b/src/last-write-wins.ts new file mode 100644 index 0000000..7a4d70b --- /dev/null +++ b/src/last-write-wins.ts @@ -0,0 +1,104 @@ +import Debug from 'debug'; +import {Lossy, valueFromCollapsedDelta} from './lossy.js'; + +import {EntityProperties} from "./entity.js"; +import {CollapsedDelta, Lossless, LosslessViewOne} from "./lossless.js"; +import {DomainEntityID, PropertyID, PropertyTypes, Timestamp, ViewMany} from "./types.js"; + +const debug = Debug('rz:lossy:last-write-wins'); + +type TimestampedProperty = { + value: PropertyTypes, + timeUpdated: Timestamp +}; + +type TimestampedProperties = { + [key: PropertyID]: TimestampedProperty +}; + +export type LossyViewOne = { + id: DomainEntityID; + properties: T; +}; + +export type LossyViewMany = ViewMany>; + +export type ResolvedViewOne = LossyViewOne; +export type ResolvedViewMany = ViewMany; + +type Accumulator = LossyViewMany; +type Result = LossyViewMany; + +// Function for resolving a value for an entity by last write wins +export function lastValueFromDeltas( + key: string, + deltas?: CollapsedDelta[] +): { + delta?: CollapsedDelta, + value?: string | number, + timeUpdated?: number +} | undefined { + const res: { + delta?: CollapsedDelta, + value?: string | number, + timeUpdated?: number + } = {}; + res.timeUpdated = 0; + + for (const delta of deltas || []) { + const value = valueFromCollapsedDelta(key, delta); + if (value === undefined) continue; + if (res.timeUpdated && delta.timeCreated < res.timeUpdated) continue; + res.delta = delta; + res.value = value; + res.timeUpdated = delta.timeCreated; + } + + return res; +} + +function initializer(): Accumulator { + return {}; +}; + +function reducer(acc: Accumulator, cur: LosslessViewOne): Accumulator { + if (!acc[cur.id]) { + acc[cur.id] = {id: cur.id, properties: {}}; + } + + for (const [key, deltas] of Object.entries(cur.propertyDeltas)) { + debug(`reducer: looking for value for key ${key}`); + const {value, timeUpdated} = lastValueFromDeltas(key, deltas) || {}; + debug(`reducer: key ${key} value ${value} timeUpdated ${timeUpdated}`); + if (!value || !timeUpdated) continue; + + if (timeUpdated > (acc[cur.id].properties[key]?.timeUpdated || 0)) { + acc[cur.id].properties[key] = { + value, + timeUpdated + }; + } + } + return acc; +}; + +function resolver(cur: Accumulator): Result { + const res: Result = {}; + + for (const [id, ent] of Object.entries(cur)) { + res[id] = {id, properties: {}}; + for (const [key, {value}] of Object.entries(ent.properties)) { + res[id].properties[key] = value; + } + } + + return res; +}; + +export class LastWriteWins extends Lossy { + constructor( + readonly lossless: Lossless, + ) { + super(lossless, initializer, reducer, resolver); + } +} diff --git a/src/lossless.ts b/src/lossless.ts index 86aeb4f..54fae05 100644 --- a/src/lossless.ts +++ b/src/lossless.ts @@ -3,10 +3,10 @@ import Debug from 'debug'; import EventEmitter from 'events'; -import {Delta, DeltaFilter, DeltaNetworkImage} from './delta.js'; +import {Delta, DeltaFilter, DeltaID, DeltaNetworkImage} from './delta.js'; +import {RhizomeNode} from './node.js'; import {Transactions} from './transactions.js'; import {DomainEntityID, PropertyID, PropertyTypes, TransactionID, ViewMany} from "./types.js"; -import {RhizomeNode} from './node.js'; const debug = Debug('rz:lossless'); export type CollapsedPointer = {[key: PropertyID]: PropertyTypes}; @@ -16,8 +16,9 @@ export type CollapsedDelta = Omit & { }; export type LosslessViewOne = { + id: DomainEntityID, referencedAs: string[]; - properties: { + propertyDeltas: { [key: PropertyID]: CollapsedDelta[] } }; @@ -27,12 +28,9 @@ export type LosslessViewMany = ViewMany; class LosslessEntityMap extends Map {}; class LosslessEntity { - id: DomainEntityID; properties = new Map>(); - constructor(id: DomainEntityID) { - this.id = id; - } + constructor(readonly lossless: Lossless, readonly id: DomainEntityID) {} addDelta(delta: Delta) { const targetContexts = delta.pointers @@ -48,6 +46,7 @@ class LosslessEntity { } propertyDeltas.add(delta); + debug(`[${this.lossless.rhizomeNode.config.peerId}]`, `entity ${this.id} added delta:`, JSON.stringify(delta)); } } @@ -71,12 +70,12 @@ export class Lossless { constructor(readonly rhizomeNode: RhizomeNode) { this.transactions = new Transactions(this); - this.transactions.eventStream.on("completed", (transactionId) => { + this.transactions.eventStream.on("completed", (transactionId, deltaIds) => { debug(`[${this.rhizomeNode.config.peerId}]`, `Completed transaction ${transactionId}`); const transaction = this.transactions.get(transactionId); if (!transaction) return; for (const id of transaction.entityIds) { - this.eventStream.emit("updated", id); + this.eventStream.emit("updated", id, deltaIds); } }); } @@ -91,7 +90,7 @@ export class Lossless { let ent = this.domainEntities.get(target); if (!ent) { - ent = new LosslessEntity(target); + ent = new LosslessEntity(this, target); this.domainEntities.set(target, ent); } @@ -116,28 +115,49 @@ export class Lossless { if (!transactionId) { // No transaction -- we can issue an update event immediately for (const id of targets) { - this.eventStream.emit("updated", id); + this.eventStream.emit("updated", id, [delta.id]); } } return transactionId; } + viewSpecific(entityId: DomainEntityID, deltaIds: DeltaID[], deltaFilter?: DeltaFilter): LosslessViewOne | undefined { + debug(`[${this.rhizomeNode.config.peerId}]`, `viewSpecific, deltaIds:`, JSON.stringify(deltaIds)); + const combinedFilter = (delta: Delta) => { + debug(`[${this.rhizomeNode.config.peerId}]`, `combinedFilter, deltaIds:`, JSON.stringify(deltaIds)); + if (!deltaIds.includes(delta.id)) { + debug(`[${this.rhizomeNode.config.peerId}]`, `Excluding delta ${delta.id} because it's not in the requested list of deltas`); + return false; + } + if (!deltaFilter) return true; + return deltaFilter(delta); + }; + const res = this.view([entityId], (delta) => combinedFilter(delta)); + return res[entityId]; + } + view(entityIds?: DomainEntityID[], deltaFilter?: DeltaFilter): LosslessViewMany { const view: LosslessViewMany = {}; entityIds = entityIds ?? Array.from(this.domainEntities.keys()); + for (const id of entityIds) { const ent = this.domainEntities.get(id); if (!ent) continue; + const referencedAs = new Set(); - const properties: { + const propertyDeltas: { [key: PropertyID]: CollapsedDelta[] } = {}; for (const [key, deltas] of ent.properties.entries()) { - properties[key] = properties[key] || []; + propertyDeltas[key] = propertyDeltas[key] || []; for (const delta of deltas) { + if (deltaFilter && !deltaFilter(delta)) { + continue; + } + // If this delta is part of a transaction, // we need to be able to wait for the whole transaction. if (delta.transactionId) { @@ -148,11 +168,6 @@ export class Lossless { } } - if (deltaFilter) { - const include = deltaFilter(delta); - if (!include) continue; - } - const pointers: CollapsedPointer[] = []; for (const {localContext, target} of delta.pointers) { @@ -162,20 +177,21 @@ export class Lossless { } } - const collapsedDelta: CollapsedDelta = { + propertyDeltas[key].push({ ...delta, pointers - }; - - properties[key].push(collapsedDelta); + }); } } view[ent.id] = { + id: ent.id, referencedAs: Array.from(referencedAs.values()), - properties + propertyDeltas }; } + + debug(`[${this.rhizomeNode.config.peerId}]`, `Returning view:`, JSON.stringify(view, null, 2)); return view; } diff --git a/src/lossy.ts b/src/lossy.ts index cf212ce..760c1a7 100644 --- a/src/lossy.ts +++ b/src/lossy.ts @@ -5,36 +5,20 @@ // We can achieve this via functional expression, encoded as JSON-Logic. // Fields in the output can be described as transformations -// import Debug from 'debug'; -import {DeltaFilter} from "./delta.js"; -import {CollapsedDelta, Lossless, LosslessViewMany, LosslessViewOne} from "./lossless.js"; -import {DomainEntityID, PropertyID, PropertyTypes, Timestamp, ViewMany} from "./types.js"; -// const debug = Debug('rz:lossy'); +import Debug from 'debug'; +import {DeltaFilter, DeltaID} from "./delta.js"; +import {CollapsedDelta, Lossless, LosslessViewOne} from "./lossless.js"; +import {DomainEntityID} from "./types.js"; +const debug = Debug('rz:lossy'); -type TimestampedProperty = { - value: PropertyTypes, - timeUpdated: Timestamp -}; - -export type LossyViewOne = { - id: DomainEntityID; - properties: { - [key: PropertyID]: T - }; -}; - -export type LossyViewMany = ViewMany>; - -export type ResolvedViewOne = LossyViewOne; -export type ResolvedViewMany = ViewMany; - -export type Resolver = - (losslessView: LosslessViewMany) => T; +export type Initializer = (v: LosslessViewOne) => Accumulator; +export type Reducer = (acc: Accumulator, cur: LosslessViewOne) => Accumulator; +export type Resolver = (cur: Accumulator) => Result; // Extract a particular value from a delta's pointers export function valueFromCollapsedDelta( - delta: CollapsedDelta, - key: string + key: string, + delta: CollapsedDelta ): string | number | undefined { for (const pointer of delta.pointers) { for (const [k, value] of Object.entries(pointer)) { @@ -45,75 +29,56 @@ export function valueFromCollapsedDelta( } } - -// Function for resolving a value for an entity by last write wins -export function lastValueFromLosslessViewOne( - ent: LosslessViewOne, - key: string -): { - delta?: CollapsedDelta, - value?: string | number, - timeUpdated?: number -} | undefined { - const res: { - delta?: CollapsedDelta, - value?: string | number, - timeUpdated?: number - } = {}; - res.timeUpdated = 0; - - for (const delta of ent.properties[key] || []) { - const value = valueFromCollapsedDelta(delta, key); - if (value === undefined) continue; - if (delta.timeCreated < res.timeUpdated) continue; - res.delta = delta; - res.value = value; - res.timeUpdated = delta.timeCreated; - } - - return res; -} - // TODO: Incremental updates of lossy models. For example, with last-write-wins, // we keep the timeUpdated for each field. A second stage resolver can rearrange // the data structure to a preferred shape and may discard the timeUpdated info. -export class Lossy { - lossless: Lossless; +export class Lossy { + deltaFilter?: DeltaFilter; + accumulator?: Accumulator; - constructor(lossless: Lossless) { - this.lossless = lossless; + constructor( + readonly lossless: Lossless, + readonly initializer: Initializer, + readonly reducer: Reducer, + readonly resolver: Resolver, + ) { + this.lossless.eventStream.on("updated", (id, deltaIds) => { + debug(`[${this.lossless.rhizomeNode.config.peerId}] entity ${id} updated, deltaIds:`, + JSON.stringify(deltaIds)); + this.ingestUpdate(id, deltaIds); + }); + } + + ingestUpdate(id: DomainEntityID, deltaIds: DeltaID[]) { + debug(`[${this.lossless.rhizomeNode.config.peerId}] prior to ingesting update, deltaIds:`, deltaIds); + const losslessPartial = this.lossless.viewSpecific(id, deltaIds, this.deltaFilter); + + debug(`[${this.lossless.rhizomeNode.config.peerId}] prior to ingesting update, lossless partial:`, + JSON.stringify(losslessPartial, null, 2)); + + if (!losslessPartial) return; + + const latest = this.accumulator || this.initializer(losslessPartial); + this.accumulator = this.reducer(latest, losslessPartial); + + debug(`[${this.lossless.rhizomeNode.config.peerId}] after ingesting update, entity ${id} accumulator:`, + JSON.stringify(this.accumulator, null, 2)); } // Using the lossless view of some given domain entities, // apply a filter to the deltas composing that lossless view, // and then apply a supplied resolver function which receives // the filtered lossless view as input. - // TODO: Cache things! - resolve(fn?: Resolver | Resolver, entityIds?: DomainEntityID[], deltaFilter?: DeltaFilter): T { - if (!fn) { - fn = (v) => this.defaultResolver(v); + // resolve(fn?: Resolver | Resolver, entityIds?: DomainEntityID[], deltaFilter?: DeltaFilter): T { + resolve(entityIds?: DomainEntityID[]): Result | undefined { + if (!entityIds) { + entityIds = Array.from(this.lossless.domainEntities.keys()); } - const losslessView = this.lossless.view(entityIds, deltaFilter); - return fn(losslessView) as T; + + if (!this.accumulator) return undefined; + + return this.resolver(this.accumulator); } - - defaultResolver(losslessView: LosslessViewMany): ResolvedViewMany { - const resolved: ResolvedViewMany = {}; - - // debug(`[${this.lossless.rhizomeNode.config.peerId}]`, 'Default resolver, lossless view', JSON.stringify(losslessView)); - for (const [id, ent] of Object.entries(losslessView)) { - resolved[id] = {id, properties: {}}; - - for (const key of Object.keys(ent.properties)) { - const {value} = lastValueFromLosslessViewOne(ent, key) || {}; - - // debug(`[${this.lossless.rhizomeNode.config.peerId}]`, `[ ${key} ] = ${value}`); - resolved[id].properties[key] = value; - } - } - return resolved; - }; - } // Generate a rule diff --git a/src/transactions.ts b/src/transactions.ts index d2ca167..e6db31a 100644 --- a/src/transactions.ts +++ b/src/transactions.ts @@ -1,8 +1,8 @@ import Debug from "debug"; import EventEmitter from "events"; import {Delta, DeltaID} from "./delta.js"; -import {DomainEntityID, TransactionID} from "./types.js"; import {Lossless} from "./lossless.js"; +import {DomainEntityID, TransactionID} from "./types.js"; const debug = Debug('rz:transactions'); function getDeltaTransactionId(delta: Delta): TransactionID | undefined { @@ -53,6 +53,19 @@ export class Transaction { size?: number; receivedDeltaIds = new Set(); entityIds = new Set(); + resolved: Promise; + + constructor(readonly transactions: Transactions, readonly id: TransactionID) { + this.resolved = new Promise((resolve) => { + this.transactions.eventStream.on("completed", (transactionId) => { + if (transactionId === this.id) resolve(true); + }); + }); + } + + getReceivedDeltaIds() { + return Array.from(this.receivedDeltaIds.values()); + } } export class Transactions { @@ -68,13 +81,14 @@ export class Transactions { getOrInit(id: TransactionID): Transaction { let t = this.transactions.get(id); if (!t) { - t = new Transaction(); + t = new Transaction(this, id); this.transactions.set(id, t); } return t; } ingestDelta(delta: Delta, targets: DomainEntityID[]): TransactionID | undefined { + // This delta may be part of a transaction { const transactionId = getDeltaTransactionId(delta); if (transactionId) { @@ -83,7 +97,6 @@ export class Transactions { t.entityIds.add(id); } - // This delta is part of a transaction // Add this to the delta's data structure for quick reference delta.transactionId = transactionId; @@ -92,25 +105,25 @@ export class Transactions { // Notify that the transaction is complete if (this.isComplete(transactionId)) { - this.eventStream.emit("completed", transactionId); + this.eventStream.emit("completed", t.id, t.getReceivedDeltaIds()); } return transactionId; } } + // This delta may describe a transaction { const {transactionId, size} = getTransactionSize(delta) || {}; if (transactionId && size) { - // This delta describes a transaction - debug(`[${this.lossless.rhizomeNode.config.peerId}]`, `Transaction ${transactionId} has size ${size}`); this.setSize(transactionId, size as number); // Check if the transaction is complete if (this.isComplete(transactionId)) { - this.eventStream.emit("completed", transactionId); + const t = this.getOrInit(transactionId); + this.eventStream.emit("completed", t.id, t.getReceivedDeltaIds()); } return transactionId; @@ -124,8 +137,16 @@ export class Transactions { } isComplete(id: TransactionID) { - const t = this.getOrInit(id); - return t.size !== undefined && t.receivedDeltaIds.size === t.size; + const t = this.get(id); + if (!t) return false; + if (t.size === undefined) return false; + return t.receivedDeltaIds.size === t.size; + } + + async waitFor(id: TransactionID) { + const t = this.get(id); + if (!t) return; + await t.resolved; } setSize(id: TransactionID, size: number) {