refactored lossy for clarity, added separate class for last write wins

This commit is contained in:
Ladd Hoffman 2025-01-01 22:21:16 -06:00
parent 5161ab0a85
commit 4eec3b294c
8 changed files with 351 additions and 172 deletions

View File

@ -0,0 +1,57 @@
import Debug from "debug";
import {Delta} from "../src/delta";
import {LastWriteWins} from "../src/last-write-wins";
import {Lossless} from "../src/lossless";
import {RhizomeNode} from "../src/node";
const debug = Debug('test:last-write-wins');
describe('Last write wins', () => {
describe('given that two separate writes occur', () => {
const node = new RhizomeNode();
const lossless = new Lossless(node);
const lossy = new LastWriteWins(lossless);
beforeAll(() => {
lossless.ingestDelta(new Delta({
creator: 'a',
host: 'h',
pointers: [{
localContext: "vegetable",
target: "broccoli",
targetContext: "want"
}, {
localContext: "desire",
target: 95,
}]
}));
lossless.ingestDelta(new Delta({
creator: 'a',
host: 'h',
pointers: [{
localContext: "vegetable",
target: "broccoli",
targetContext: "want"
}, {
localContext: "want",
target: 90,
}]
}));
});
it('our resolver should return the most recently written value', () => {
const result = lossy.resolve(["broccoli"]);
debug('result', result);
expect(result).toMatchObject({
broccoli: {
id: "broccoli",
properties: {
want: 90
}
}
});
});
});
});

View File

@ -37,7 +37,7 @@ describe('Lossless', () => {
expect(lossless.view()).toMatchObject({ expect(lossless.view()).toMatchObject({
keanu: { keanu: {
referencedAs: ["actor"], referencedAs: ["actor"],
properties: { propertyDeltas: {
roles: [{ roles: [{
creator: "a", creator: "a",
host: "h", host: "h",
@ -53,7 +53,7 @@ describe('Lossless', () => {
}, },
neo: { neo: {
referencedAs: ["role"], referencedAs: ["role"],
properties: { propertyDeltas: {
actor: [{ actor: [{
creator: "a", creator: "a",
host: "h", host: "h",
@ -69,7 +69,7 @@ describe('Lossless', () => {
}, },
the_matrix: { the_matrix: {
referencedAs: ["film"], referencedAs: ["film"],
properties: { propertyDeltas: {
cast: [{ cast: [{
creator: "a", creator: "a",
host: "h", host: "h",
@ -114,7 +114,7 @@ describe('Lossless', () => {
expect(lossless.view()).toMatchObject({ expect(lossless.view()).toMatchObject({
ace: { ace: {
referencedAs: ["1", "14"], referencedAs: ["1", "14"],
properties: { propertyDeltas: {
value: [{ value: [{
creator: 'A', creator: 'A',
host: 'H', host: 'H',
@ -141,7 +141,7 @@ describe('Lossless', () => {
expect(lossless.view(undefined, filter)).toMatchObject({ expect(lossless.view(undefined, filter)).toMatchObject({
ace: { ace: {
referencedAs: ["1"], referencedAs: ["1"],
properties: { propertyDeltas: {
value: [{ value: [{
creator: 'A', creator: 'A',
host: 'H', host: 'H',
@ -156,7 +156,7 @@ describe('Lossless', () => {
expect(lossless.view(["ace"], filter)).toMatchObject({ expect(lossless.view(["ace"], filter)).toMatchObject({
ace: { ace: {
referencedAs: ["1"], referencedAs: ["1"],
properties: { propertyDeltas: {
value: [{ value: [{
creator: 'A', creator: 'A',
host: 'H', host: 'H',
@ -168,5 +168,7 @@ describe('Lossless', () => {
} }
}); });
}); });
// TODO: Test with transactions, say A1 -- B -- A2
}); });
}); });

View File

@ -1,13 +1,60 @@
import {RhizomeNode} from "../src/node.js"; import Debug from 'debug';
import {Delta, PointerTarget} from "../src/delta.js"; import {Delta, PointerTarget} from "../src/delta.js";
import {Lossless, LosslessViewMany} from "../src/lossless.js"; import {lastValueFromDeltas} from "../src/last-write-wins.js";
import {Lossy, lastValueFromLosslessViewOne, valueFromCollapsedDelta } from "../src/lossy.js"; import {Lossless, LosslessViewOne} from "../src/lossless.js";
import {Lossy, valueFromCollapsedDelta} from "../src/lossy.js";
import {RhizomeNode} from "../src/node.js";
const debug = Debug('test:lossy');
type Role = {
actor: PointerTarget,
film: PointerTarget,
role: PointerTarget
};
type Summary = {
roles: Role[];
};
function initializer(): Summary {
return {
roles: []
};
}
// TODO: Add more rigor to this example approach to generating a summary.
// it's really not CRDT, it likely depends on the order of the pointers.
// TODO: Prove with failing test
const reducer = (acc: Summary, cur: LosslessViewOne): Summary => {
if (cur.referencedAs.includes("role")) {
const {delta, value: actor} = lastValueFromDeltas("actor", cur.propertyDeltas["actor"]) ?? {};
if (!delta) throw new Error('expected to find delta');
if (!actor) throw new Error('expected to find actor');
const film = valueFromCollapsedDelta("film", delta);
if (!film) throw new Error('expected to find film');
acc.roles.push({
role: cur.id,
actor,
film
});
}
return acc;
}
const resolver = (acc: Summary): Summary => {
return acc;
}
describe('Lossy', () => { describe('Lossy', () => {
describe('se a provided function to resolve entity views', () => { describe('use a provided initializer, reducer, and resolver to resolve entity views', () => {
const node = new RhizomeNode(); const node = new RhizomeNode();
const lossless = new Lossless(node); const lossless = new Lossless(node);
const lossy = new Lossy(lossless);
const lossy = new Lossy(lossless, initializer, reducer, resolver);
beforeAll(() => { beforeAll(() => {
lossless.ingestDelta(new Delta({ lossless.ingestDelta(new Delta({
@ -36,36 +83,8 @@ describe('Lossy', () => {
}); });
it('example summary', () => { it('example summary', () => {
type Role = { const result = lossy.resolve();
actor: PointerTarget, debug('result', result);
film: PointerTarget,
role: PointerTarget
};
type Summary = {
roles: Role[];
};
const resolver = (losslessView: LosslessViewMany): Summary => {
const roles: Role[] = [];
for (const [id, ent] of Object.entries(losslessView)) {
if (ent.referencedAs.includes("role")) {
const {delta, value: actor} = lastValueFromLosslessViewOne(ent, "actor") ?? {};
if (!delta) continue; // TODO: panic
if (!actor) continue; // TODO: panic
const film = valueFromCollapsedDelta(delta, "film");
if (!film) continue; // TODO: panic
roles.push({
role: id,
actor,
film
});
}
}
return {roles};
}
const result = lossy.resolve<Summary>(resolver);
expect(result).toEqual({ expect(result).toEqual({
roles: [{ roles: [{
film: "the_matrix", film: "the_matrix",

View File

@ -6,9 +6,9 @@
import Debug from 'debug'; import Debug from 'debug';
import {randomUUID} from "node:crypto"; import {randomUUID} from "node:crypto";
import EventEmitter from "node:events"; import EventEmitter from "node:events";
import {Delta, DeltaFilter} from "./delta.js"; import {Delta} from "./delta.js";
import {Entity, EntityProperties} from "./entity.js"; import {Entity, EntityProperties} from "./entity.js";
import {Lossy, ResolvedViewOne, Resolver} from "./lossy.js"; import {LastWriteWins, ResolvedViewOne} from './last-write-wins.js';
import {RhizomeNode} from "./node.js"; import {RhizomeNode} from "./node.js";
import {DomainEntityID} from "./types.js"; import {DomainEntityID} from "./types.js";
const debug = Debug('rz:collection'); const debug = Debug('rz:collection');
@ -17,7 +17,7 @@ export class Collection {
rhizomeNode?: RhizomeNode; rhizomeNode?: RhizomeNode;
name: string; name: string;
eventStream = new EventEmitter(); eventStream = new EventEmitter();
lossy?: Lossy; lossy?: LastWriteWins;
constructor(name: string) { constructor(name: string) {
this.name = name; this.name = name;
@ -32,7 +32,7 @@ export class Collection {
rhizomeConnect(rhizomeNode: RhizomeNode) { rhizomeConnect(rhizomeNode: RhizomeNode) {
this.rhizomeNode = rhizomeNode; this.rhizomeNode = rhizomeNode;
this.lossy = new Lossy(this.rhizomeNode.lossless); this.lossy = new LastWriteWins(this.rhizomeNode.lossless);
// Listen for completed transactions, and emit updates to event stream // Listen for completed transactions, and emit updates to event stream
this.rhizomeNode.lossless.eventStream.on("updated", (id) => { this.rhizomeNode.lossless.eventStream.on("updated", (id) => {
@ -58,7 +58,6 @@ export class Collection {
newProperties: EntityProperties, newProperties: EntityProperties,
creator: string, creator: string,
host: string, host: string,
resolver?: Resolver
): { ): {
transactionDelta: Delta | undefined, transactionDelta: Delta | undefined,
deltas: Delta[] deltas: Delta[]
@ -67,7 +66,7 @@ export class Collection {
let oldProperties: EntityProperties = {}; let oldProperties: EntityProperties = {};
if (entityId) { if (entityId) {
const entity = this.resolve(entityId, resolver); const entity = this.resolve(entityId);
if (entity) { if (entity) {
oldProperties = entity.properties; oldProperties = entity.properties;
} }
@ -155,7 +154,6 @@ export class Collection {
async put( async put(
entityId: DomainEntityID | undefined, entityId: DomainEntityID | undefined,
properties: EntityProperties, properties: EntityProperties,
resolver?: Resolver
): Promise<ResolvedViewOne> { ): Promise<ResolvedViewOne> {
if (!this.rhizomeNode) throw new Error('collection not connecte to rhizome'); if (!this.rhizomeNode) throw new Error('collection not connecte to rhizome');
@ -173,7 +171,6 @@ export class Collection {
properties, properties,
this.rhizomeNode?.config.creator, this.rhizomeNode?.config.creator,
this.rhizomeNode?.config.peerId, this.rhizomeNode?.config.peerId,
resolver,
); );
const ingested = new Promise<boolean>((resolve) => { const ingested = new Promise<boolean>((resolve) => {
@ -204,21 +201,19 @@ export class Collection {
await ingested; await ingested;
const res = this.resolve(entityId, resolver); const res = this.resolve(entityId);
if (!res) throw new Error("could not get what we just put!"); if (!res) throw new Error("could not get what we just put!");
return res; return res;
} }
resolve<T = ResolvedViewOne>( resolve(
id: string, id: string
resolver?: Resolver, ): ResolvedViewOne | undefined {
deltaFilter?: DeltaFilter
): T | undefined {
if (!this.rhizomeNode) throw new Error('collection not connected to rhizome'); if (!this.rhizomeNode) throw new Error('collection not connected to rhizome');
if (!this.lossy) throw new Error('lossy view not initialized'); if (!this.lossy) throw new Error('lossy view not initialized');
const res = this.lossy.resolve(resolver, [id], deltaFilter) || {}; const res = this.lossy.resolve([id]) || {};
return res[id] as T; return res[id];
} }
} }

104
src/last-write-wins.ts Normal file
View File

@ -0,0 +1,104 @@
import Debug from 'debug';
import {Lossy, valueFromCollapsedDelta} from './lossy.js';
import {EntityProperties} from "./entity.js";
import {CollapsedDelta, Lossless, LosslessViewOne} from "./lossless.js";
import {DomainEntityID, PropertyID, PropertyTypes, Timestamp, ViewMany} from "./types.js";
const debug = Debug('rz:lossy:last-write-wins');
type TimestampedProperty = {
value: PropertyTypes,
timeUpdated: Timestamp
};
type TimestampedProperties = {
[key: PropertyID]: TimestampedProperty
};
export type LossyViewOne<T = TimestampedProperties> = {
id: DomainEntityID;
properties: T;
};
export type LossyViewMany<T = TimestampedProperties> = ViewMany<LossyViewOne<T>>;
export type ResolvedViewOne = LossyViewOne<EntityProperties>;
export type ResolvedViewMany = ViewMany<ResolvedViewOne>;
type Accumulator = LossyViewMany<TimestampedProperties>;
type Result = LossyViewMany<EntityProperties>;
// Function for resolving a value for an entity by last write wins
export function lastValueFromDeltas(
key: string,
deltas?: CollapsedDelta[]
): {
delta?: CollapsedDelta,
value?: string | number,
timeUpdated?: number
} | undefined {
const res: {
delta?: CollapsedDelta,
value?: string | number,
timeUpdated?: number
} = {};
res.timeUpdated = 0;
for (const delta of deltas || []) {
const value = valueFromCollapsedDelta(key, delta);
if (value === undefined) continue;
if (res.timeUpdated && delta.timeCreated < res.timeUpdated) continue;
res.delta = delta;
res.value = value;
res.timeUpdated = delta.timeCreated;
}
return res;
}
function initializer(): Accumulator {
return {};
};
function reducer(acc: Accumulator, cur: LosslessViewOne): Accumulator {
if (!acc[cur.id]) {
acc[cur.id] = {id: cur.id, properties: {}};
}
for (const [key, deltas] of Object.entries(cur.propertyDeltas)) {
debug(`reducer: looking for value for key ${key}`);
const {value, timeUpdated} = lastValueFromDeltas(key, deltas) || {};
debug(`reducer: key ${key} value ${value} timeUpdated ${timeUpdated}`);
if (!value || !timeUpdated) continue;
if (timeUpdated > (acc[cur.id].properties[key]?.timeUpdated || 0)) {
acc[cur.id].properties[key] = {
value,
timeUpdated
};
}
}
return acc;
};
function resolver(cur: Accumulator): Result {
const res: Result = {};
for (const [id, ent] of Object.entries(cur)) {
res[id] = {id, properties: {}};
for (const [key, {value}] of Object.entries(ent.properties)) {
res[id].properties[key] = value;
}
}
return res;
};
export class LastWriteWins extends Lossy<Accumulator, Result> {
constructor(
readonly lossless: Lossless,
) {
super(lossless, initializer, reducer, resolver);
}
}

View File

@ -3,10 +3,10 @@
import Debug from 'debug'; import Debug from 'debug';
import EventEmitter from 'events'; import EventEmitter from 'events';
import {Delta, DeltaFilter, DeltaNetworkImage} from './delta.js'; import {Delta, DeltaFilter, DeltaID, DeltaNetworkImage} from './delta.js';
import {RhizomeNode} from './node.js';
import {Transactions} from './transactions.js'; import {Transactions} from './transactions.js';
import {DomainEntityID, PropertyID, PropertyTypes, TransactionID, ViewMany} from "./types.js"; import {DomainEntityID, PropertyID, PropertyTypes, TransactionID, ViewMany} from "./types.js";
import {RhizomeNode} from './node.js';
const debug = Debug('rz:lossless'); const debug = Debug('rz:lossless');
export type CollapsedPointer = {[key: PropertyID]: PropertyTypes}; export type CollapsedPointer = {[key: PropertyID]: PropertyTypes};
@ -16,8 +16,9 @@ export type CollapsedDelta = Omit<DeltaNetworkImage, 'pointers'> & {
}; };
export type LosslessViewOne = { export type LosslessViewOne = {
id: DomainEntityID,
referencedAs: string[]; referencedAs: string[];
properties: { propertyDeltas: {
[key: PropertyID]: CollapsedDelta[] [key: PropertyID]: CollapsedDelta[]
} }
}; };
@ -27,12 +28,9 @@ export type LosslessViewMany = ViewMany<LosslessViewOne>;
class LosslessEntityMap extends Map<DomainEntityID, LosslessEntity> {}; class LosslessEntityMap extends Map<DomainEntityID, LosslessEntity> {};
class LosslessEntity { class LosslessEntity {
id: DomainEntityID;
properties = new Map<PropertyID, Set<Delta>>(); properties = new Map<PropertyID, Set<Delta>>();
constructor(id: DomainEntityID) { constructor(readonly lossless: Lossless, readonly id: DomainEntityID) {}
this.id = id;
}
addDelta(delta: Delta) { addDelta(delta: Delta) {
const targetContexts = delta.pointers const targetContexts = delta.pointers
@ -48,6 +46,7 @@ class LosslessEntity {
} }
propertyDeltas.add(delta); propertyDeltas.add(delta);
debug(`[${this.lossless.rhizomeNode.config.peerId}]`, `entity ${this.id} added delta:`, JSON.stringify(delta));
} }
} }
@ -71,12 +70,12 @@ export class Lossless {
constructor(readonly rhizomeNode: RhizomeNode) { constructor(readonly rhizomeNode: RhizomeNode) {
this.transactions = new Transactions(this); this.transactions = new Transactions(this);
this.transactions.eventStream.on("completed", (transactionId) => { this.transactions.eventStream.on("completed", (transactionId, deltaIds) => {
debug(`[${this.rhizomeNode.config.peerId}]`, `Completed transaction ${transactionId}`); debug(`[${this.rhizomeNode.config.peerId}]`, `Completed transaction ${transactionId}`);
const transaction = this.transactions.get(transactionId); const transaction = this.transactions.get(transactionId);
if (!transaction) return; if (!transaction) return;
for (const id of transaction.entityIds) { for (const id of transaction.entityIds) {
this.eventStream.emit("updated", id); this.eventStream.emit("updated", id, deltaIds);
} }
}); });
} }
@ -91,7 +90,7 @@ export class Lossless {
let ent = this.domainEntities.get(target); let ent = this.domainEntities.get(target);
if (!ent) { if (!ent) {
ent = new LosslessEntity(target); ent = new LosslessEntity(this, target);
this.domainEntities.set(target, ent); this.domainEntities.set(target, ent);
} }
@ -116,28 +115,49 @@ export class Lossless {
if (!transactionId) { if (!transactionId) {
// No transaction -- we can issue an update event immediately // No transaction -- we can issue an update event immediately
for (const id of targets) { for (const id of targets) {
this.eventStream.emit("updated", id); this.eventStream.emit("updated", id, [delta.id]);
} }
} }
return transactionId; return transactionId;
} }
viewSpecific(entityId: DomainEntityID, deltaIds: DeltaID[], deltaFilter?: DeltaFilter): LosslessViewOne | undefined {
debug(`[${this.rhizomeNode.config.peerId}]`, `viewSpecific, deltaIds:`, JSON.stringify(deltaIds));
const combinedFilter = (delta: Delta) => {
debug(`[${this.rhizomeNode.config.peerId}]`, `combinedFilter, deltaIds:`, JSON.stringify(deltaIds));
if (!deltaIds.includes(delta.id)) {
debug(`[${this.rhizomeNode.config.peerId}]`, `Excluding delta ${delta.id} because it's not in the requested list of deltas`);
return false;
}
if (!deltaFilter) return true;
return deltaFilter(delta);
};
const res = this.view([entityId], (delta) => combinedFilter(delta));
return res[entityId];
}
view(entityIds?: DomainEntityID[], deltaFilter?: DeltaFilter): LosslessViewMany { view(entityIds?: DomainEntityID[], deltaFilter?: DeltaFilter): LosslessViewMany {
const view: LosslessViewMany = {}; const view: LosslessViewMany = {};
entityIds = entityIds ?? Array.from(this.domainEntities.keys()); entityIds = entityIds ?? Array.from(this.domainEntities.keys());
for (const id of entityIds) { for (const id of entityIds) {
const ent = this.domainEntities.get(id); const ent = this.domainEntities.get(id);
if (!ent) continue; if (!ent) continue;
const referencedAs = new Set<string>(); const referencedAs = new Set<string>();
const properties: { const propertyDeltas: {
[key: PropertyID]: CollapsedDelta[] [key: PropertyID]: CollapsedDelta[]
} = {}; } = {};
for (const [key, deltas] of ent.properties.entries()) { for (const [key, deltas] of ent.properties.entries()) {
properties[key] = properties[key] || []; propertyDeltas[key] = propertyDeltas[key] || [];
for (const delta of deltas) { for (const delta of deltas) {
if (deltaFilter && !deltaFilter(delta)) {
continue;
}
// If this delta is part of a transaction, // If this delta is part of a transaction,
// we need to be able to wait for the whole transaction. // we need to be able to wait for the whole transaction.
if (delta.transactionId) { if (delta.transactionId) {
@ -148,11 +168,6 @@ export class Lossless {
} }
} }
if (deltaFilter) {
const include = deltaFilter(delta);
if (!include) continue;
}
const pointers: CollapsedPointer[] = []; const pointers: CollapsedPointer[] = [];
for (const {localContext, target} of delta.pointers) { for (const {localContext, target} of delta.pointers) {
@ -162,20 +177,21 @@ export class Lossless {
} }
} }
const collapsedDelta: CollapsedDelta = { propertyDeltas[key].push({
...delta, ...delta,
pointers pointers
}; });
properties[key].push(collapsedDelta);
} }
} }
view[ent.id] = { view[ent.id] = {
id: ent.id,
referencedAs: Array.from(referencedAs.values()), referencedAs: Array.from(referencedAs.values()),
properties propertyDeltas
}; };
} }
debug(`[${this.rhizomeNode.config.peerId}]`, `Returning view:`, JSON.stringify(view, null, 2));
return view; return view;
} }

View File

@ -5,36 +5,20 @@
// We can achieve this via functional expression, encoded as JSON-Logic. // We can achieve this via functional expression, encoded as JSON-Logic.
// Fields in the output can be described as transformations // Fields in the output can be described as transformations
// import Debug from 'debug'; import Debug from 'debug';
import {DeltaFilter} from "./delta.js"; import {DeltaFilter, DeltaID} from "./delta.js";
import {CollapsedDelta, Lossless, LosslessViewMany, LosslessViewOne} from "./lossless.js"; import {CollapsedDelta, Lossless, LosslessViewOne} from "./lossless.js";
import {DomainEntityID, PropertyID, PropertyTypes, Timestamp, ViewMany} from "./types.js"; import {DomainEntityID} from "./types.js";
// const debug = Debug('rz:lossy'); const debug = Debug('rz:lossy');
type TimestampedProperty = { export type Initializer<Accumulator> = (v: LosslessViewOne) => Accumulator;
value: PropertyTypes, export type Reducer<Accumulator> = (acc: Accumulator, cur: LosslessViewOne) => Accumulator;
timeUpdated: Timestamp export type Resolver<Accumulator, Result> = (cur: Accumulator) => Result;
};
export type LossyViewOne<T = TimestampedProperty> = {
id: DomainEntityID;
properties: {
[key: PropertyID]: T
};
};
export type LossyViewMany<T> = ViewMany<LossyViewOne<T>>;
export type ResolvedViewOne = LossyViewOne<PropertyTypes>;
export type ResolvedViewMany = ViewMany<ResolvedViewOne>;
export type Resolver<T = ResolvedViewMany> =
(losslessView: LosslessViewMany) => T;
// Extract a particular value from a delta's pointers // Extract a particular value from a delta's pointers
export function valueFromCollapsedDelta( export function valueFromCollapsedDelta(
delta: CollapsedDelta, key: string,
key: string delta: CollapsedDelta
): string | number | undefined { ): string | number | undefined {
for (const pointer of delta.pointers) { for (const pointer of delta.pointers) {
for (const [k, value] of Object.entries(pointer)) { for (const [k, value] of Object.entries(pointer)) {
@ -45,76 +29,57 @@ export function valueFromCollapsedDelta(
} }
} }
// Function for resolving a value for an entity by last write wins
export function lastValueFromLosslessViewOne(
ent: LosslessViewOne,
key: string
): {
delta?: CollapsedDelta,
value?: string | number,
timeUpdated?: number
} | undefined {
const res: {
delta?: CollapsedDelta,
value?: string | number,
timeUpdated?: number
} = {};
res.timeUpdated = 0;
for (const delta of ent.properties[key] || []) {
const value = valueFromCollapsedDelta(delta, key);
if (value === undefined) continue;
if (delta.timeCreated < res.timeUpdated) continue;
res.delta = delta;
res.value = value;
res.timeUpdated = delta.timeCreated;
}
return res;
}
// TODO: Incremental updates of lossy models. For example, with last-write-wins, // TODO: Incremental updates of lossy models. For example, with last-write-wins,
// we keep the timeUpdated for each field. A second stage resolver can rearrange // we keep the timeUpdated for each field. A second stage resolver can rearrange
// the data structure to a preferred shape and may discard the timeUpdated info. // the data structure to a preferred shape and may discard the timeUpdated info.
export class Lossy { export class Lossy<Accumulator, Result> {
lossless: Lossless; deltaFilter?: DeltaFilter;
accumulator?: Accumulator;
constructor(lossless: Lossless) { constructor(
this.lossless = lossless; readonly lossless: Lossless,
readonly initializer: Initializer<Accumulator>,
readonly reducer: Reducer<Accumulator>,
readonly resolver: Resolver<Accumulator, Result>,
) {
this.lossless.eventStream.on("updated", (id, deltaIds) => {
debug(`[${this.lossless.rhizomeNode.config.peerId}] entity ${id} updated, deltaIds:`,
JSON.stringify(deltaIds));
this.ingestUpdate(id, deltaIds);
});
}
ingestUpdate(id: DomainEntityID, deltaIds: DeltaID[]) {
debug(`[${this.lossless.rhizomeNode.config.peerId}] prior to ingesting update, deltaIds:`, deltaIds);
const losslessPartial = this.lossless.viewSpecific(id, deltaIds, this.deltaFilter);
debug(`[${this.lossless.rhizomeNode.config.peerId}] prior to ingesting update, lossless partial:`,
JSON.stringify(losslessPartial, null, 2));
if (!losslessPartial) return;
const latest = this.accumulator || this.initializer(losslessPartial);
this.accumulator = this.reducer(latest, losslessPartial);
debug(`[${this.lossless.rhizomeNode.config.peerId}] after ingesting update, entity ${id} accumulator:`,
JSON.stringify(this.accumulator, null, 2));
} }
// Using the lossless view of some given domain entities, // Using the lossless view of some given domain entities,
// apply a filter to the deltas composing that lossless view, // apply a filter to the deltas composing that lossless view,
// and then apply a supplied resolver function which receives // and then apply a supplied resolver function which receives
// the filtered lossless view as input. // the filtered lossless view as input.
// TODO: Cache things! // resolve<T = ResolvedViewOne>(fn?: Resolver<T> | Resolver, entityIds?: DomainEntityID[], deltaFilter?: DeltaFilter): T {
resolve<T = ResolvedViewOne>(fn?: Resolver<T> | Resolver, entityIds?: DomainEntityID[], deltaFilter?: DeltaFilter): T { resolve(entityIds?: DomainEntityID[]): Result | undefined {
if (!fn) { if (!entityIds) {
fn = (v) => this.defaultResolver(v); entityIds = Array.from(this.lossless.domainEntities.keys());
}
const losslessView = this.lossless.view(entityIds, deltaFilter);
return fn(losslessView) as T;
} }
defaultResolver(losslessView: LosslessViewMany): ResolvedViewMany { if (!this.accumulator) return undefined;
const resolved: ResolvedViewMany = {};
// debug(`[${this.lossless.rhizomeNode.config.peerId}]`, 'Default resolver, lossless view', JSON.stringify(losslessView)); return this.resolver(this.accumulator);
for (const [id, ent] of Object.entries(losslessView)) {
resolved[id] = {id, properties: {}};
for (const key of Object.keys(ent.properties)) {
const {value} = lastValueFromLosslessViewOne(ent, key) || {};
// debug(`[${this.lossless.rhizomeNode.config.peerId}]`, `[ ${key} ] = ${value}`);
resolved[id].properties[key] = value;
} }
} }
return resolved;
};
}
// Generate a rule // Generate a rule
// Apply the rule -- When? // Apply the rule -- When?

View File

@ -1,8 +1,8 @@
import Debug from "debug"; import Debug from "debug";
import EventEmitter from "events"; import EventEmitter from "events";
import {Delta, DeltaID} from "./delta.js"; import {Delta, DeltaID} from "./delta.js";
import {DomainEntityID, TransactionID} from "./types.js";
import {Lossless} from "./lossless.js"; import {Lossless} from "./lossless.js";
import {DomainEntityID, TransactionID} from "./types.js";
const debug = Debug('rz:transactions'); const debug = Debug('rz:transactions');
function getDeltaTransactionId(delta: Delta): TransactionID | undefined { function getDeltaTransactionId(delta: Delta): TransactionID | undefined {
@ -53,6 +53,19 @@ export class Transaction {
size?: number; size?: number;
receivedDeltaIds = new Set<DeltaID>(); receivedDeltaIds = new Set<DeltaID>();
entityIds = new Set<DomainEntityID>(); entityIds = new Set<DomainEntityID>();
resolved: Promise<boolean>;
constructor(readonly transactions: Transactions, readonly id: TransactionID) {
this.resolved = new Promise((resolve) => {
this.transactions.eventStream.on("completed", (transactionId) => {
if (transactionId === this.id) resolve(true);
});
});
}
getReceivedDeltaIds() {
return Array.from(this.receivedDeltaIds.values());
}
} }
export class Transactions { export class Transactions {
@ -68,13 +81,14 @@ export class Transactions {
getOrInit(id: TransactionID): Transaction { getOrInit(id: TransactionID): Transaction {
let t = this.transactions.get(id); let t = this.transactions.get(id);
if (!t) { if (!t) {
t = new Transaction(); t = new Transaction(this, id);
this.transactions.set(id, t); this.transactions.set(id, t);
} }
return t; return t;
} }
ingestDelta(delta: Delta, targets: DomainEntityID[]): TransactionID | undefined { ingestDelta(delta: Delta, targets: DomainEntityID[]): TransactionID | undefined {
// This delta may be part of a transaction
{ {
const transactionId = getDeltaTransactionId(delta); const transactionId = getDeltaTransactionId(delta);
if (transactionId) { if (transactionId) {
@ -83,7 +97,6 @@ export class Transactions {
t.entityIds.add(id); t.entityIds.add(id);
} }
// This delta is part of a transaction
// Add this to the delta's data structure for quick reference // Add this to the delta's data structure for quick reference
delta.transactionId = transactionId; delta.transactionId = transactionId;
@ -92,25 +105,25 @@ export class Transactions {
// Notify that the transaction is complete // Notify that the transaction is complete
if (this.isComplete(transactionId)) { if (this.isComplete(transactionId)) {
this.eventStream.emit("completed", transactionId); this.eventStream.emit("completed", t.id, t.getReceivedDeltaIds());
} }
return transactionId; return transactionId;
} }
} }
// This delta may describe a transaction
{ {
const {transactionId, size} = getTransactionSize(delta) || {}; const {transactionId, size} = getTransactionSize(delta) || {};
if (transactionId && size) { if (transactionId && size) {
// This delta describes a transaction
debug(`[${this.lossless.rhizomeNode.config.peerId}]`, `Transaction ${transactionId} has size ${size}`); debug(`[${this.lossless.rhizomeNode.config.peerId}]`, `Transaction ${transactionId} has size ${size}`);
this.setSize(transactionId, size as number); this.setSize(transactionId, size as number);
// Check if the transaction is complete // Check if the transaction is complete
if (this.isComplete(transactionId)) { if (this.isComplete(transactionId)) {
this.eventStream.emit("completed", transactionId); const t = this.getOrInit(transactionId);
this.eventStream.emit("completed", t.id, t.getReceivedDeltaIds());
} }
return transactionId; return transactionId;
@ -124,8 +137,16 @@ export class Transactions {
} }
isComplete(id: TransactionID) { isComplete(id: TransactionID) {
const t = this.getOrInit(id); const t = this.get(id);
return t.size !== undefined && t.receivedDeltaIds.size === t.size; if (!t) return false;
if (t.size === undefined) return false;
return t.receivedDeltaIds.size === t.size;
}
async waitFor(id: TransactionID) {
const t = this.get(id);
if (!t) return;
await t.resolved;
} }
setSize(id: TransactionID, size: number) { setSize(id: TransactionID, size: number) {