transaction improvements

This commit is contained in:
Ladd Hoffman 2024-12-30 01:23:11 -06:00
parent 066c03f690
commit 198c996231
11 changed files with 322 additions and 222 deletions

View File

@ -1,7 +1,7 @@
import Debug from 'debug'; import Debug from 'debug';
import {RhizomeNode} from "../src/node";
import {Entity} from "../src/entity";
import {Collection} from "../src/collection"; import {Collection} from "../src/collection";
import {Entity} from "../src/entity";
import {RhizomeNode} from "../src/node";
const debug = Debug('example-app'); const debug = Debug('example-app');
// As an app we want to be able to write and read data. // As an app we want to be able to write and read data.
@ -36,11 +36,17 @@ type User = {
await rhizomeNode.start(); await rhizomeNode.start();
// Let's use the rhizomic database for some more things. // TODO: Use the rhizomic database for some more things.
// Like what? // Like what?
// - Logging // - Logging
// - Chat // - Chat
//
// TODO: Allow configuration regarding read/write concern i.e.
// if we perform a read immediately do we see the value we wrote?
// Intuition says yes, we want that-- but how do we expose the propagation status?
// Insert a "user" record
const taliesinData: User = { const taliesinData: User = {
id: 'taliesin-1', id: 'taliesin-1',
name: 'Taliesin', name: 'Taliesin',
@ -48,10 +54,13 @@ type User = {
age: Math.floor(Math.random() * 1000) age: Math.floor(Math.random() * 1000)
}; };
const taliesinPutResult = await users.put(undefined, taliesinData);
{ {
const result = JSON.stringify(taliesinPutResult); const taliesinPutResult = await users.put(undefined, taliesinData);
const resolvedUser = {
id: taliesinPutResult.id,
...taliesinPutResult.properties
} as User;
const result = JSON.stringify(resolvedUser);
const expected = JSON.stringify(taliesinData); const expected = JSON.stringify(taliesinData);
if (result === expected) { if (result === expected) {
@ -63,32 +72,17 @@ type User = {
} }
} }
// TODO: Allow configuration regarding read/write concern i.e. // Read back what we wrote
// if we perform a read immediately do we see the value we wrote?
// Intuition says yes, we want that-- but how do we expose the propagation status?
{
const resolved = users.resolve('taliesin-1'); const resolved = users.resolve('taliesin-1');
if (!resolved) throw new Error('unable to resolve entity we just created'); if (!resolved) throw new Error('unable to resolve entity we just created');
debug('resolved', resolved);
const resolvedUser = { const resolvedUser = {
id: resolved.id, id: resolved.id,
...resolved.properties ...resolved.properties
} as User; } as User;
/*
function sortKeys (o: {[key: string]: unknown}): {[key: string]: unknown} {
const r: {[key: string]: unknown} = {};
r.id = o.id;
Object.keys(o).sort().forEach((key) => {
if (key === "id") return;
r[key] = o[key];
})
return r;
}
*/
const result = JSON.stringify(resolvedUser); const result = JSON.stringify(resolvedUser);
const expected = JSON.stringify(taliesinData); const expected = JSON.stringify(taliesinData);
@ -99,6 +93,7 @@ type User = {
`\n\nExpected \n${expected}` + `\n\nExpected \n${expected}` +
`\nReceived\n${result}`); `\nReceived\n${result}`);
} }
}
})(); })();

View File

@ -6,7 +6,7 @@
import Debug from 'debug'; import Debug from 'debug';
import {randomUUID} from "node:crypto"; import {randomUUID} from "node:crypto";
import EventEmitter from "node:events"; import EventEmitter from "node:events";
import {Delta, DeltaID} from "./delta"; import {Delta, DeltaFilter} from "./delta";
import {Entity, EntityProperties} from "./entity"; import {Entity, EntityProperties} from "./entity";
import {Lossy, ResolvedViewOne, Resolver} from "./lossy"; import {Lossy, ResolvedViewOne, Resolver} from "./lossy";
import {RhizomeNode} from "./node"; import {RhizomeNode} from "./node";
@ -17,6 +17,7 @@ export class Collection {
rhizomeNode?: RhizomeNode; rhizomeNode?: RhizomeNode;
name: string; name: string;
eventStream = new EventEmitter(); eventStream = new EventEmitter();
lossy?: Lossy;
constructor(name: string) { constructor(name: string) {
this.name = name; this.name = name;
@ -31,10 +32,15 @@ export class Collection {
rhizomeConnect(rhizomeNode: RhizomeNode) { rhizomeConnect(rhizomeNode: RhizomeNode) {
this.rhizomeNode = rhizomeNode; this.rhizomeNode = rhizomeNode;
rhizomeNode.deltaStream.subscribeDeltas((delta: Delta) => { this.lossy = new Lossy(this.rhizomeNode.lossless);
// TODO: Make sure this is the kind of delta we're looking for
debug(`collection ${this.name} received delta ${JSON.stringify(delta)}`); // Listen for completed transactions, and emit updates to event stream
this.ingestDelta(delta); this.rhizomeNode.lossless.eventStream.on("updated", (id) => {
// TODO: Filter so we only get members of our collection
// TODO: Reslover / Delta Filter?
const res = this.resolve(id);
this.eventStream.emit("update", res);
}); });
rhizomeNode.httpServer.httpApi.serveCollection(this); rhizomeNode.httpServer.httpApi.serveCollection(this);
@ -42,15 +48,6 @@ export class Collection {
debug(`connected ${this.name} to rhizome`); debug(`connected ${this.name} to rhizome`);
} }
ingestDelta(delta: Delta) {
if (!this.rhizomeNode) return;
const updated = this.rhizomeNode.lossless.ingestDelta(delta);
this.eventStream.emit('ingested', delta);
this.eventStream.emit('updated', updated);
}
// Applies the javascript rules for updating object values, // Applies the javascript rules for updating object values,
// e.g. set to `undefined` to delete a property. // e.g. set to `undefined` to delete a property.
// This function is here instead of Entity so that it can: // This function is here instead of Entity so that it can:
@ -62,7 +59,10 @@ export class Collection {
creator: string, creator: string,
host: string, host: string,
resolver?: Resolver resolver?: Resolver
): Delta[] { ): {
transactionDelta: Delta | undefined,
deltas: Delta[]
} {
const deltas: Delta[] = []; const deltas: Delta[] = [];
let oldProperties: EntityProperties = {}; let oldProperties: EntityProperties = {};
@ -86,10 +86,6 @@ export class Collection {
creator, creator,
host, host,
pointers: [{ pointers: [{
localContext: "_transaction",
target: transactionId,
targetContext: "deltas"
}, {
localContext: this.name, localContext: this.name,
target: entityId, target: entityId,
targetContext: key targetContext: key
@ -101,8 +97,11 @@ export class Collection {
} }
}); });
let transactionDelta: Delta | undefined;
if (deltas.length > 1) {
// We can generate a separate delta describing this transaction // We can generate a separate delta describing this transaction
const transactionDelta = new Delta({ transactionDelta = new Delta({
creator, creator,
host, host,
pointers: [{ pointers: [{
@ -115,7 +114,17 @@ export class Collection {
}] }]
}); });
return [transactionDelta, ...deltas]; // Also need to annotate the deltas with the transactionId
for (const delta of deltas) {
delta.pointers.unshift({
localContext: "_transaction",
target: transactionId,
targetContext: "deltas"
});
}
}
return {transactionDelta, deltas};
} }
onCreate(cb: (entity: Entity) => void) { onCreate(cb: (entity: Entity) => void) {
@ -159,7 +168,7 @@ export class Collection {
entityId = randomUUID(); entityId = randomUUID();
} }
const deltas = this.generateDeltas( const {transactionDelta, deltas} = this.generateDeltas(
entityId, entityId,
properties, properties,
this.rhizomeNode?.config.creator, this.rhizomeNode?.config.creator,
@ -167,67 +176,47 @@ export class Collection {
resolver, resolver,
); );
debug(`put ${entityId} generated deltas:`, JSON.stringify(deltas)); const ingested = new Promise<boolean>((resolve) => {
this.rhizomeNode!.lossless.eventStream.on("updated", (id: DomainEntityID) => {
// Here we set up a listener so we can wait for all our deltas to be if (id === entityId) resolve(true);
// ingested into our lossless view before proceeding.
// TODO: Hoist this into a more generic transaction mechanism.
//
const allIngested = new Promise<boolean>((resolve) => {
const ingestedIds = new Set<DeltaID>();
this.eventStream.on('ingested', (delta: Delta) => {
// TODO: timeout
if (deltas.map(({id}) => id).includes(delta.id)) {
ingestedIds.add(delta.id);
if (ingestedIds.size === deltas.length) {
resolve(true);
}
}
}) })
}); });
deltas.forEach(async (delta: Delta) => { if (transactionDelta) {
deltas.unshift(transactionDelta);
}
deltas.forEach(async (delta: Delta) => {
// record this delta just as if we had received it from a peer // record this delta just as if we had received it from a peer
delta.receivedFrom = this.rhizomeNode!.myRequestAddr; delta.receivedFrom = this.rhizomeNode!.myRequestAddr;
this.rhizomeNode!.deltaStream.deltasAccepted.push(delta); this.rhizomeNode!.deltaStream.deltasAccepted.push(delta);
// publish the delta to our subscribed peers // publish the delta to our subscribed peers
await this.rhizomeNode!.deltaStream.publishDelta(delta); await this.rhizomeNode!.deltaStream.publishDelta(delta);
debug(`published delta ${JSON.stringify(delta)}`);
// ingest the delta as though we had received it from a peer // ingest the delta as though we had received it from a peer
this.ingestDelta(delta); this.rhizomeNode!.lossless.ingestDelta(delta);
}); });
// Return updated view of this entity // Return updated view of this entity
// Let's wait for an event notifying us that the entity has been updated. // Let's wait for an event notifying us that the entity has been updated.
// This means all of our deltas have been applied. // This means all of our deltas have been applied.
await allIngested; await ingested;
const res = this.resolve(entityId, resolver); const res = this.resolve(entityId, resolver);
if (!res) throw new Error("could not get what we just put!"); if (!res) throw new Error("could not get what we just put!");
this.eventStream.emit("update", res);
return res; return res;
} }
resolve<T = ResolvedViewOne>(id: string, resolver?: Resolver): T | undefined { resolve<T = ResolvedViewOne>(
// Now with lossy view approach, instead of just returning what we id: string,
// already have, let's compute our view now. resolver?: Resolver,
// return this.entities.resolve(id); deltaFilter?: DeltaFilter
// TODO: Caching ): T | undefined {
if (!this.rhizomeNode) return undefined; if (!this.rhizomeNode) return undefined;
const lossy = new Lossy(this.rhizomeNode.lossless); const res = this.lossy?.resolve(resolver, [id], deltaFilter) || {};
// TODO: deltaFilter
const res = lossy.resolve(resolver, [id]);
debug('lossy view', res);
return res[id] as T; return res[id] as T;
} }

View File

@ -1,6 +1,6 @@
import microtime from 'microtime';
import {randomUUID} from "crypto"; import {randomUUID} from "crypto";
import {PeerAddress, Timestamp} from "./types"; import microtime from 'microtime';
import {CreatorID, HostID, PeerAddress, Timestamp, TransactionID} from "./types";
export type DeltaID = string; export type DeltaID = string;
@ -12,22 +12,39 @@ export type Pointer = {
targetContext?: string; targetContext?: string;
}; };
export class Delta { export class DeltaNetworkImage {
id: DeltaID; id: DeltaID;
timeCreated: Timestamp;
host: HostID;
creator: CreatorID;
pointers: Pointer[];
constructor({id, timeCreated, host, creator, pointers}: DeltaNetworkImage) {
this.id = id;
this.host = host;
this.creator = creator;
this.timeCreated = timeCreated;
this.pointers = pointers;
}
};
export class Delta extends DeltaNetworkImage {
receivedFrom?: PeerAddress; receivedFrom?: PeerAddress;
timeReceived: Timestamp; timeReceived: Timestamp;
timeCreated: Timestamp; transactionId?: TransactionID;
creator: string;
host: string; // TODO: Verify the following assumption:
pointers: Pointer[] = []; // We're assuming that you only call this constructor when
constructor(delta: Omit<Delta, "id" | "timeReceived" | "timeCreated">) { // actually creating a new delta.
this.id = randomUUID(); // When receiving one from the network, you can
this.timeCreated = microtime.now(); constructor({host, creator, pointers}: Partial<DeltaNetworkImage>) {
// TODO: Verify that when receiving a delta from the network we can
// retain the delta's id.
const id = randomUUID();
const timeCreated = microtime.now();
if (!host || !creator || !pointers) throw new Error('uninitializied values');
super({id, timeCreated, host, creator, pointers});
this.timeCreated = timeCreated;
this.timeReceived = this.timeCreated; this.timeReceived = this.timeCreated;
this.creator = delta.creator;
this.host = delta.host;
this.receivedFrom = delta.receivedFrom;
this.pointers = delta.pointers;
} }
} }

View File

@ -1,7 +1,7 @@
import Debug from 'debug'; import Debug from 'debug';
import EventEmitter from 'node:events'; import EventEmitter from 'node:events';
import objectHash from 'object-hash'; import objectHash from 'object-hash';
import {Delta} from './delta'; import {Delta, DeltaNetworkImage} from './delta';
import {RhizomeNode} from './node'; import {RhizomeNode} from './node';
const debug = Debug('deltas'); const debug = Debug('deltas');
@ -91,7 +91,8 @@ export class DeltaStream {
} }
serializeDelta(delta: Delta): string { serializeDelta(delta: Delta): string {
return JSON.stringify(delta); const deltaNetworkImage = new DeltaNetworkImage(delta);
return JSON.stringify(deltaNetworkImage);
} }
deserializeDelta(input: string): Delta { deserializeDelta(input: string): Delta {

View File

@ -2,13 +2,15 @@
// We can maintain a record of all the targeted entities, and the deltas that targeted them // We can maintain a record of all the targeted entities, and the deltas that targeted them
import Debug from 'debug'; import Debug from 'debug';
import {Delta, DeltaFilter, DeltaID} from './delta'; import EventEmitter from 'events';
import {Delta, DeltaFilter, DeltaNetworkImage} from './delta';
import {Transactions} from './transactions';
import {DomainEntityID, PropertyID, PropertyTypes, TransactionID, ViewMany} from "./types"; import {DomainEntityID, PropertyID, PropertyTypes, TransactionID, ViewMany} from "./types";
const debug = Debug('lossless'); const debug = Debug('lossless');
export type CollapsedPointer = {[key: PropertyID]: PropertyTypes}; export type CollapsedPointer = {[key: PropertyID]: PropertyTypes};
export type CollapsedDelta = Omit<Delta, 'pointers'> & { export type CollapsedDelta = Omit<DeltaNetworkImage, 'pointers'> & {
pointers: CollapsedPointer[]; pointers: CollapsedPointer[];
}; };
@ -21,9 +23,9 @@ export type LosslessViewOne = {
export type LosslessViewMany = ViewMany<LosslessViewOne>; export type LosslessViewMany = ViewMany<LosslessViewOne>;
class DomainEntityMap extends Map<DomainEntityID, DomainEntity> {}; class LosslessEntityMap extends Map<DomainEntityID, LosslessEntity> {};
class DomainEntity { class LosslessEntity {
id: DomainEntityID; id: DomainEntityID;
properties = new Map<PropertyID, Set<Delta>>(); properties = new Map<PropertyID, Set<Delta>>();
@ -44,7 +46,6 @@ class DomainEntity {
this.properties.set(targetContext, propertyDeltas); this.properties.set(targetContext, propertyDeltas);
} }
debug(`adding delta for entity ${this.id}`);
propertyDeltas.add(delta); propertyDeltas.add(delta);
} }
} }
@ -61,50 +62,24 @@ class DomainEntity {
} }
} }
class Transaction {
size?: number;
receivedDeltaIds = new Set<DeltaID>();
}
class Transactions {
transactions = new Map<TransactionID, Transaction>();
getOrInit(id: TransactionID): Transaction {
let t = this.transactions.get(id);
if (!t) {
t = new Transaction();
this.transactions.set(id, t);
}
return t;
}
receivedDelta(id: TransactionID, deltaId: DeltaID) {
const t = this.getOrInit(id);
t.receivedDeltaIds.add(deltaId);
}
isComplete(id: TransactionID) {
const t = this.getOrInit(id);
return t.size !== undefined && t.receivedDeltaIds.size === t.size;
}
setSize(id: TransactionID, size: number) {
const t = this.getOrInit(id);
t.size = size;
}
get ids() {
return Array.from(this.transactions.keys());
}
}
export class Lossless { export class Lossless {
domainEntities = new DomainEntityMap(); domainEntities = new LosslessEntityMap();
transactions = new Transactions(); transactions = new Transactions();
referencedAs = new Map<string, Set<DomainEntityID>>(); referencedAs = new Map<string, Set<DomainEntityID>>();
// referencingAs = new Map<string, Set<DomainEntityID>>(); eventStream = new EventEmitter();
ingestDelta(delta: Delta) { constructor() {
this.transactions.eventStream.on("completed", (transactionId) => {
debug(`completed transaction ${transactionId}`);
const transaction = this.transactions.get(transactionId);
if (!transaction) return;
for (const id of transaction.entityIds) {
this.eventStream.emit("updated", id);
}
});
}
ingestDelta(delta: Delta): TransactionID | undefined {
const targets = delta.pointers const targets = delta.pointers
.filter(({targetContext}) => !!targetContext) .filter(({targetContext}) => !!targetContext)
.map(({target}) => target) .map(({target}) => target)
@ -114,7 +89,7 @@ export class Lossless {
let ent = this.domainEntities.get(target); let ent = this.domainEntities.get(target);
if (!ent) { if (!ent) {
ent = new DomainEntity(target); ent = new LosslessEntity(target);
this.domainEntities.set(target, ent); this.domainEntities.set(target, ent);
} }
@ -134,43 +109,15 @@ export class Lossless {
} }
} }
const {target: transactionId} = delta.pointers.find(({ const transactionId = this.transactions.ingestDelta(delta, targets);
localContext,
target,
targetContext
}) =>
localContext === "_transaction" &&
typeof target === "string" &&
targetContext === "deltas"
) || {};
if (transactionId) { if (!transactionId) {
// This delta is part of a transaction // No transaction -- we can issue an update event immediately
this.transactions.receivedDelta(transactionId as string, delta.id); for (const id of targets) {
} else { this.eventStream.emit("updated", id);
const {target: transactionId} = delta.pointers.find(({
localContext,
target,
targetContext
}) =>
localContext === "_transaction" &&
typeof target === "string" &&
targetContext === "size"
) || {};
if (transactionId) {
// This delta describes a transaction
const {target: size} = delta.pointers.find(({
localContext,
target
}) =>
localContext === "size" &&
typeof target === "number"
) || {};
this.transactions.setSize(transactionId as string, size as number);
} }
} }
return transactionId;
} }
view(entityIds?: DomainEntityID[], deltaFilter?: DeltaFilter): LosslessViewMany { view(entityIds?: DomainEntityID[], deltaFilter?: DeltaFilter): LosslessViewMany {
@ -180,8 +127,6 @@ export class Lossless {
const ent = this.domainEntities.get(id); const ent = this.domainEntities.get(id);
if (!ent) continue; if (!ent) continue;
debug(`domain entity ${id}`, JSON.stringify(ent));
const referencedAs = new Set<string>(); const referencedAs = new Set<string>();
const properties: { const properties: {
[key: PropertyID]: CollapsedDelta[] [key: PropertyID]: CollapsedDelta[]
@ -191,6 +136,14 @@ export class Lossless {
properties[key] = properties[key] || []; properties[key] = properties[key] || [];
for (const delta of deltas) { for (const delta of deltas) {
// If this delta is part of a transaction,
// we need to be able to wait for the whole transaction.
if (delta.transactionId) {
if (!this.transactions.isComplete(delta.transactionId)) {
// TODO: Test this condition
continue;
}
}
if (deltaFilter) { if (deltaFilter) {
const include = deltaFilter(delta); const include = deltaFilter(delta);

View File

@ -60,7 +60,6 @@ export function lastValueFromLosslessViewOne(
value?: string | number, value?: string | number,
timeUpdated?: number timeUpdated?: number
} = {}; } = {};
debug(`trying to get last value for ${key} from ${JSON.stringify(ent.properties[key])}`);
res.timeUpdated = 0; res.timeUpdated = 0;
for (const delta of ent.properties[key] || []) { for (const delta of ent.properties[key] || []) {
@ -92,7 +91,9 @@ function defaultResolver(losslessView: LosslessViewMany): ResolvedViewMany {
return resolved; return resolved;
}; };
// TODO: Incremental updates of lossy models. For example, with last-write-wins,
// we keep the timeUpdated for each field. A second stage resolver can rearrange
// the data structure to a preferred shape and may discard the timeUpdated info.
export class Lossy { export class Lossy {
lossless: Lossless; lossless: Lossless;
@ -104,6 +105,7 @@ export class Lossy {
// apply a filter to the deltas composing that lossless view, // apply a filter to the deltas composing that lossless view,
// and then apply a supplied resolver function which receives // and then apply a supplied resolver function which receives
// the filtered lossless view as input. // the filtered lossless view as input.
// TODO: Cache things!
resolve<T = ResolvedViewOne>(fn?: Resolver<T> | Resolver, entityIds?: DomainEntityID[], deltaFilter?: DeltaFilter): T { resolve<T = ResolvedViewOne>(fn?: Resolver<T> | Resolver, entityIds?: DomainEntityID[], deltaFilter?: DeltaFilter): T {
if (!fn) { if (!fn) {
fn = defaultResolver; fn = defaultResolver;

View File

@ -31,7 +31,7 @@ export class RhizomeNode {
requestReply: RequestReply; requestReply: RequestReply;
httpServer: HttpServer; httpServer: HttpServer;
deltaStream: DeltaStream; deltaStream: DeltaStream;
lossless = new Lossless(); lossless: Lossless;
peers: Peers; peers: Peers;
myRequestAddr: PeerAddress; myRequestAddr: PeerAddress;
myPublishAddr: PeerAddress; myPublishAddr: PeerAddress;
@ -66,9 +66,13 @@ export class RhizomeNode {
this.httpServer = new HttpServer(this); this.httpServer = new HttpServer(this);
this.deltaStream = new DeltaStream(this); this.deltaStream = new DeltaStream(this);
this.peers = new Peers(this); this.peers = new Peers(this);
this.lossless = new Lossless();
} }
async start() { async start() {
// Connect our lossless view to the delta stream
this.deltaStream.subscribeDeltas((delta) => this.lossless.ingestDelta(delta));
// Start ZeroMQ publish and reply sockets // Start ZeroMQ publish and reply sockets
this.pubSub.start(); this.pubSub.start();
this.requestReply.start(); this.requestReply.start();

View File

@ -1,11 +1,11 @@
import Debug from 'debug'; import Debug from 'debug';
import {Message} from 'zeromq'; import {Message} from 'zeromq';
import {SEED_PEERS} from "./config"; import {SEED_PEERS} from "./config";
import {Delta} from "./delta";
import {RhizomeNode} from "./node"; import {RhizomeNode} from "./node";
import {Subscription} from './pub-sub'; import {Subscription} from './pub-sub';
import {PeerRequest, RequestSocket, ResponseSocket} from "./request-reply"; import {PeerRequest, RequestSocket, ResponseSocket} from "./request-reply";
import {PeerAddress} from "./types"; import {PeerAddress} from "./types";
import {Delta} from "./delta";
const debug = Debug('peers'); const debug = Debug('peers');
export enum RequestMethods { export enum RequestMethods {
@ -92,7 +92,9 @@ export class Peers {
debug('it\'s a request for deltas'); debug('it\'s a request for deltas');
// TODO: stream these rather than // TODO: stream these rather than
// trying to write them all in one message // trying to write them all in one message
await res.send(JSON.stringify(this.rhizomeNode.deltaStream.deltasAccepted)); const deltas = this.rhizomeNode.deltaStream.deltasAccepted;
debug(`sending ${deltas.length} deltas`);
await res.send(JSON.stringify(deltas));
break; break;
} }
} }

View File

@ -45,7 +45,6 @@ export class ResponseSocket {
if (typeof msg === 'object') { if (typeof msg === 'object') {
msg = JSON.stringify(msg); msg = JSON.stringify(msg);
} }
debug('sending reply', {msg});
await this.sock.send(msg); await this.sock.send(msg);
} }
} }

136
src/transactions.ts Normal file
View File

@ -0,0 +1,136 @@
import Debug from "debug";
import EventEmitter from "events";
import {Delta, DeltaID} from "./delta";
import {DomainEntityID, TransactionID} from "./types";
const debug = Debug("transactions");
function getDeltaTransactionId(delta: Delta): TransactionID | undefined {
const {target: transactionId} = delta.pointers.find(({
localContext,
target,
targetContext
}) =>
localContext === "_transaction" &&
typeof target === "string" &&
targetContext === "deltas"
) || {};
if (transactionId && typeof transactionId === "string") {
return transactionId;
}
}
function getTransactionSize(delta: Delta): {
transactionId: TransactionID,
size: number
} | undefined {
const {target: transactionId} = delta.pointers.find(({
localContext,
target,
targetContext
}) =>
localContext === "_transaction" &&
typeof target === "string" &&
targetContext === "size"
) || {};
if (transactionId && typeof transactionId === "string") {
// This delta describes a transaction
const {target: size} = delta.pointers.find(({
localContext,
target
}) =>
localContext === "size" &&
typeof target === "number"
) || {};
return {transactionId, size: size as number};
}
}
export class Transaction {
size?: number;
receivedDeltaIds = new Set<DeltaID>();
entityIds = new Set<DomainEntityID>();
}
export class Transactions {
transactions = new Map<TransactionID, Transaction>();
eventStream = new EventEmitter();
get(id: TransactionID): Transaction | undefined {
return this.transactions.get(id);
}
getOrInit(id: TransactionID): Transaction {
let t = this.transactions.get(id);
if (!t) {
t = new Transaction();
this.transactions.set(id, t);
}
return t;
}
ingestDelta(delta: Delta, targets: DomainEntityID[]): TransactionID | undefined {
{
const transactionId = getDeltaTransactionId(delta);
if (transactionId) {
const t = this.getOrInit(transactionId);
for (const id of targets) {
t.entityIds.add(id);
}
// This delta is part of a transaction
// Add this to the delta's data structure for quick reference
delta.transactionId = transactionId;
// Update our transaction tracking
this.receivedDelta(transactionId, delta.id);
// Notify that the transaction is complete
if (this.isComplete(transactionId)) {
this.eventStream.emit("completed", transactionId);
}
return transactionId;
}
}
{
const {transactionId, size} = getTransactionSize(delta) || {};
if (transactionId && size) {
// This delta describes a transaction
debug(`transaction ${transactionId} has size ${size}`);
this.setSize(transactionId, size as number);
// Check if the transaction is complete
if (this.isComplete(transactionId)) {
this.eventStream.emit("completed", transactionId);
}
return transactionId;
}
}
}
receivedDelta(id: TransactionID, deltaId: DeltaID) {
const t = this.getOrInit(id);
t.receivedDeltaIds.add(deltaId);
}
isComplete(id: TransactionID) {
const t = this.getOrInit(id);
return t.size !== undefined && t.receivedDeltaIds.size === t.size;
}
setSize(id: TransactionID, size: number) {
const t = this.getOrInit(id);
t.size = size;
}
get ids() {
return Array.from(this.transactions.keys());
}
}

View File

@ -9,6 +9,8 @@ export type PropertyTypes = string | number | undefined;
export type DomainEntityID = string; export type DomainEntityID = string;
export type PropertyID = string; export type PropertyID = string;
export type TransactionID = string; export type TransactionID = string;
export type HostID = string;
export type CreatorID = string;
export type Timestamp = number; export type Timestamp = number;