rhizome/src/collection.ts

235 lines
7.0 KiB
TypeScript
Raw Normal View History

2024-12-22 14:17:44 -06:00
// A basic collection of entities
// This may be extended to house a collection of objects that all follow a common schema.
2024-12-21 21:16:18 -06:00
// It should support schema-level operations, e.g. removing a property should remove that value from every entity in the collection
// It could then be further extended with e.g. table semantics like filter, sort, join
import Debug from 'debug';
2024-12-25 16:13:48 -06:00
import {randomUUID} from "node:crypto";
2024-12-22 09:13:44 -06:00
import EventEmitter from "node:events";
import {Delta, DeltaID} from "./delta";
import {Entity, EntityProperties} from "./entity";
2024-12-29 17:50:20 -06:00
import {Lossy, ResolvedViewOne, Resolver} from "./lossy";
2024-12-25 16:13:48 -06:00
import {RhizomeNode} from "./node";
import {DomainEntityID} from "./types";
const debug = Debug('collection');
2024-12-22 09:13:44 -06:00
/**
 * A named collection of entities backed by a rhizome node.
 *
 * The collection stores no entity state itself. Deltas are handed to the
 * connected node's lossless view (`ingestDelta`), and entity views are computed
 * on demand from that lossless view through a `Lossy` resolver (`resolve`).
 */
export class Collection {
  /** Assigned by `rhizomeConnect`; while unset, reads return empty results and `put` throws. */
  rhizomeNode?: RhizomeNode;
  name: string;
  /**
   * Internal event bus. `ingestDelta` emits 'ingested' (with the delta) and
   * 'updated' (with whatever the lossless view returned); `put` emits 'update'
   * (with the resolved view).
   */
  eventStream = new EventEmitter();

  constructor(name: string) {
    this.name = name;
  }

  // Instead of trying to update our final view of the entity with every incoming delta,
  // let's try this:
  // - keep a lossless view (of everything)
  // - build a lossy view when needed
  // This approach is simplistic, but can then be optimized and enhanced.

  /**
   * Attaches this collection to a rhizome node: remembers the node, feeds every
   * delta from the node's delta stream into `ingestDelta`, and registers the
   * collection with the node's HTTP API.
   */
  rhizomeConnect(rhizomeNode: RhizomeNode) {
    this.rhizomeNode = rhizomeNode;

    rhizomeNode.deltaStream.subscribeDeltas((delta: Delta) => {
      // TODO: Make sure this is the kind of delta we're looking for
      debug(`collection ${this.name} received delta ${JSON.stringify(delta)}`);
      this.ingestDelta(delta);
    });

    rhizomeNode.httpServer.httpApi.serveCollection(this);

    debug(`connected ${this.name} to rhizome`);
  }

  /**
   * Passes a delta to the node's lossless view, then emits 'ingested' followed
   * by 'updated' on `eventStream`. Does nothing when no node is connected.
   */
  ingestDelta(delta: Delta) {
    if (!this.rhizomeNode) return;

    const updated = this.rhizomeNode.lossless.ingestDelta(delta);

    this.eventStream.emit('ingested', delta);
    this.eventStream.emit('updated', updated);
  }

  /**
   * Builds the deltas that describe setting `newProperties` on an entity.
   *
   * Applies the javascript rules for updating object values,
   * e.g. set to `undefined` to delete a property.
   * This function is here instead of Entity so that it can:
   * - read the current state in order to build its delta
   * - include the collection name in the delta it produces
   *
   * @param entityId - Entity being written; when truthy, its current view is
   *   resolved so unchanged properties can be skipped.
   * @param newProperties - Property values to write. A property named "id" is ignored.
   * @param creator - Creator recorded on each delta. Property deltas are only
   *   produced when both `creator` and `host` are truthy.
   * @param host - Host recorded on each delta.
   * @param resolver - Optional resolver used to compute the current view.
   * @returns The transaction delta (carrying the number of property deltas)
   *   followed by one delta per changed property.
   */
  generateDeltas(
    entityId: DomainEntityID,
    newProperties: EntityProperties,
    creator: string,
    host: string,
    resolver?: Resolver
  ): Delta[] {
    const deltas: Delta[] = [];
    let oldProperties: EntityProperties = {};

    if (entityId) {
      const entity = this.resolve(entityId, resolver);
      if (entity) {
        oldProperties = entity.properties;
      }
    }

    // Generate a transaction ID
    const transactionId = `transaction-${randomUUID()}`;

    // Generate a delta for each changed property
    for (const [key, value] of Object.entries(newProperties)) {
      // Disallow property named "id"
      if (key === 'id') continue;
      // Skip values that are unchanged (strict equality), and skip everything
      // when the delta could not be attributed to a host and creator.
      if (oldProperties[key] === value || !host || !creator) continue;

      deltas.push(new Delta({
        creator,
        host,
        pointers: [{
          localContext: "_transaction",
          target: transactionId,
          targetContext: "deltas"
        }, {
          localContext: this.name,
          target: entityId,
          targetContext: key
        }, {
          localContext: key,
          target: value
        }]
      }));
    }

    // We can generate a separate delta describing this transaction
    const transactionDelta = new Delta({
      creator,
      host,
      pointers: [{
        localContext: "_transaction",
        target: transactionId,
        targetContext: "size"
      }, {
        localContext: "size",
        target: deltas.length
      }]
    });

    return [transactionDelta, ...deltas];
  }

  /** Registers `cb` for 'create' events on `eventStream`. */
  onCreate(cb: (entity: Entity) => void) {
    // TODO: Trigger for changes received from peers
    this.eventStream.on('create', (entity: Entity) => {
      cb(entity);
    });
  }

  /** Registers `cb` for 'update' events on `eventStream` (emitted by `put`). */
  onUpdate(cb: (entity: Entity) => void) {
    // TODO: Trigger for changes received from peers
    this.eventStream.on('update', (entity: Entity) => {
      cb(entity);
    });
  }

  /**
   * Lists the ids the lossless view has recorded under this collection's name.
   * Returns an empty array when disconnected or when nothing is recorded.
   */
  getIds(): string[] {
    if (!this.rhizomeNode) return [];
    const set = this.rhizomeNode.lossless.referencedAs.get(this.name);
    if (!set) return [];
    return Array.from(set.values());
  }

  // THIS PUT SHOULD CORRESPOND TO A PARTICULAR MATERIALIZED VIEW...
  // How can we encode that?
  // Well, we have a way to do that, we just need the same particular inputs.
  // We take a resolver as an optional argument.

  /**
   * Writes `properties` to an entity and returns the entity's resolved view.
   *
   * Each generated delta is recorded as accepted, published to peers, and then
   * ingested locally. The returned promise settles only after every generated
   * delta has been ingested.
   *
   * @param entityId - Target entity. Falls back to a string `properties.id`,
   *   and otherwise to a freshly generated UUID.
   * @param properties - Property values to write.
   * @param resolver - Optional resolver used both to diff against the current
   *   view and to compute the returned view.
   * @returns The resolved view of the entity after the write.
   * @throws Error when the collection is not connected, when publishing a
   *   delta fails, or when the entity cannot be resolved after the write.
   */
  async put(
    entityId: DomainEntityID | undefined,
    properties: EntityProperties,
    resolver?: Resolver
  ): Promise<ResolvedViewOne> {
    // Narrow once so the rest of the method needs no `!` / `?.`.
    const node = this.rhizomeNode;
    if (!node) throw new Error('collection not connected to rhizome');

    // For convenience, we allow setting id via properties.id
    if (!entityId && !!properties.id && typeof properties.id === 'string') {
      entityId = properties.id;
    }

    // Generate an ID if none is provided
    if (!entityId) {
      entityId = randomUUID();
    }

    const deltas = this.generateDeltas(
      entityId,
      properties,
      node.config.creator,
      node.config.peerId,
      resolver,
    );

    debug(`put ${entityId} generated deltas:`, JSON.stringify(deltas));

    // Here we set up a listener so we can wait for all our deltas to be
    // ingested into our lossless view before proceeding.
    // TODO: Hoist this into a more generic transaction mechanism.
    // TODO: timeout
    const pendingIds = new Set<DeltaID>(deltas.map(({id}) => id));
    let signalAllIngested: () => void = () => {};
    const allIngested = new Promise<void>((resolve) => {
      signalAllIngested = resolve;
    });
    const onIngested = (delta: Delta) => {
      // Only deltas generated by this call count towards completion.
      if (pendingIds.delete(delta.id) && pendingIds.size === 0) {
        signalAllIngested();
      }
    };
    this.eventStream.on('ingested', onIngested);

    try {
      // Awaiting the whole batch means a failed publish rejects this call
      // instead of surfacing as an unhandled rejection while we wait forever.
      await Promise.all(deltas.map(async (delta: Delta) => {
        // record this delta just as if we had received it from a peer
        delta.receivedFrom = node.myRequestAddr;
        node.deltaStream.deltasAccepted.push(delta);

        // publish the delta to our subscribed peers
        await node.deltaStream.publishDelta(delta);
        debug(`published delta ${JSON.stringify(delta)}`);

        // ingest the delta as though we had received it from a peer
        this.ingestDelta(delta);
      }));

      // Let's wait for an event notifying us that the entity has been updated.
      // This means all of our deltas have been applied.
      await allIngested;
    } finally {
      // Always detach, otherwise every put would leave a listener behind.
      this.eventStream.off('ingested', onIngested);
    }

    // Return updated view of this entity
    const res = this.resolve(entityId, resolver);
    if (!res) throw new Error("could not get what we just put!");
    this.eventStream.emit("update", res);
    return res;
  }

  /**
   * Computes the current view of one entity from the node's lossless view.
   *
   * @param id - Entity id to resolve.
   * @param resolver - Optional resolver passed through to `Lossy.resolve`.
   * @returns The view stored under `id` in the resolver output, or `undefined`
   *   when disconnected or when the output has no entry for `id`.
   */
  resolve<T = ResolvedViewOne>(id: string, resolver?: Resolver): T | undefined {
    // Now with lossy view approach, instead of just returning what we
    // already have, let's compute our view now.
    // return this.entities.resolve(id);
    // TODO: Caching
    if (!this.rhizomeNode) return undefined;

    const lossy = new Lossy(this.rhizomeNode.lossless);
    // TODO: deltaFilter
    const res = lossy.resolve(resolver, [id]);
    debug('lossy view', res);

    return res[id] as T;
  }
}