added last-write-wins resolver

This commit is contained in:
Ladd Hoffman 2024-12-29 14:35:30 -06:00
parent 870c1a62b6
commit 2e0672e04c
16 changed files with 298 additions and 135 deletions

4
__tests__/query.ts Normal file
View File

@ -0,0 +1,4 @@
describe.skip('Query', () => {
it('can use a json logic expression to filter the queries', () => {});
it('can use a json logic expression to implement a lossy resolver', () => {});
});

View File

@ -1,7 +1,7 @@
import Debug from 'debug'; import Debug from 'debug';
import {RhizomeNode} from "../src/node"; import {RhizomeNode} from "../src/node";
import {Entity} from "../src/entity"; import {Entity} from "../src/entity";
import {TypedCollection} from "../src/typed-collection"; import {Collection} from "../src/collection";
const debug = Debug('example-app'); const debug = Debug('example-app');
// As an app we want to be able to write and read data. // As an app we want to be able to write and read data.
@ -19,7 +19,7 @@ type User = {
(async () => { (async () => {
const rhizomeNode = new RhizomeNode(); const rhizomeNode = new RhizomeNode();
const users = new TypedCollection<User>("user"); const users = new Collection("user");
users.rhizomeConnect(rhizomeNode); users.rhizomeConnect(rhizomeNode);
users.onUpdate((u: Entity) => { users.onUpdate((u: Entity) => {
@ -37,27 +37,63 @@ type User = {
// - Logging // - Logging
// - Chat // - Chat
// //
const taliesinData: User = {
const taliesin = await users.put(undefined, {
id: 'taliesin-1', id: 'taliesin-1',
name: 'Taliesin', name: 'Taliesin',
nameLong: 'Taliesin (Ladd)', nameLong: 'Taliesin (Ladd)',
age: Math.floor(Math.random() * 1000) age: Math.floor(Math.random() * 1000)
}); };
const taliesinPutResult = await users.put(undefined, taliesinData);
{
const result = JSON.stringify(taliesinPutResult);
const expected = JSON.stringify(taliesinData);
if (result === expected) {
debug('Put result matches expected: ' + expected);
} else {
debug(`Put result does not match expected.` +
`\n\nExpected \n${expected}` +
`\nReceived\n${result}`);
}
}
// TODO: Allow configuration regarding read/write concern i.e. // TODO: Allow configuration regarding read/write concern i.e.
// if we perform a read immediately do we see the value we wrote? // if we perform a read immediately do we see the value we wrote?
// Intuition says yes, we want that-- but how do we expose the propagation status? // Intuition says yes, we want that-- but how do we expose the propagation status?
const result = users.get(taliesin.id); const resolved = users.resolve('taliesin-1');
const matches: boolean = JSON.stringify(result) === JSON.stringify(taliesin); if (!resolved) throw new Error('unable to resolve entity we just created');
if (matches) {
debug('Result matches expected: ' + JSON.stringify(taliesin)); const resolvedUser = {
} else { id: resolved.id,
debug(`Result does not match expected.` + ...resolved.properties
`\n\nExpected \n${JSON.stringify(taliesin)}` + } as User;
`\nReceived\n${JSON.stringify(result)}`);
/*
function sortKeys (o: {[key: string]: unknown}): {[key: string]: unknown} {
const r: {[key: string]: unknown} = {};
r.id = o.id;
Object.keys(o).sort().forEach((key) => {
if (key === "id") return;
r[key] = o[key];
})
return r;
} }
*/
const result = JSON.stringify(resolvedUser);
const expected = JSON.stringify(taliesinData);
if (result === expected) {
debug('Get result matches expected: ' + expected);
} else {
debug(`Get result does not match expected.` +
`\n\nExpected \n${expected}` +
`\nReceived\n${result}`);
}
})(); })();

54
package-lock.json generated
View File

@ -1,31 +1,34 @@
{ {
"name": "rhizome-node", "name": "rhizome-node",
"version": "1.0.0", "version": "0.1.0",
"lockfileVersion": 3, "lockfileVersion": 3,
"requires": true, "requires": true,
"packages": { "packages": {
"": { "": {
"name": "rhizome-node", "name": "rhizome-node",
"version": "1.0.0", "version": "0.1.0",
"license": "Unlicense", "license": "Unlicense",
"dependencies": { "dependencies": {
"@types/bluebird": "^3.5.42",
"@types/debug": "^4.1.12",
"@types/json-logic-js": "^2.0.8",
"@types/object-hash": "^3.0.6",
"debug": "^4.4.0", "debug": "^4.4.0",
"express": "^4.21.2", "express": "^4.21.2",
"json-logic-js": "^2.0.5", "json-logic-js": "^2.0.5",
"level": "^9.0.0", "level": "^9.0.0",
"microtime": "^3.1.1",
"object-hash": "^3.0.0", "object-hash": "^3.0.0",
"showdown": "^2.1.0", "showdown": "^2.1.0",
"util": "./util/",
"zeromq": "^6.1.2" "zeromq": "^6.1.2"
}, },
"devDependencies": { "devDependencies": {
"@eslint/js": "^9.17.0", "@eslint/js": "^9.17.0",
"@types/bluebird": "^3.5.42",
"@types/debug": "^4.1.12",
"@types/express": "^5.0.0", "@types/express": "^5.0.0",
"@types/jest": "^29.5.14", "@types/jest": "^29.5.14",
"@types/json-logic-js": "^2.0.8",
"@types/microtime": "^2.1.2",
"@types/node": "^22.10.2", "@types/node": "^22.10.2",
"@types/object-hash": "^3.0.6",
"@types/showdown": "^2.0.6", "@types/showdown": "^2.0.6",
"eslint": "^9.17.0", "eslint": "^9.17.0",
"eslint-config-airbnb-base-typescript": "^1.1.0", "eslint-config-airbnb-base-typescript": "^1.1.0",
@ -1438,6 +1441,7 @@
"version": "3.5.42", "version": "3.5.42",
"resolved": "https://registry.npmjs.org/@types/bluebird/-/bluebird-3.5.42.tgz", "resolved": "https://registry.npmjs.org/@types/bluebird/-/bluebird-3.5.42.tgz",
"integrity": "sha512-Jhy+MWRlro6UjVi578V/4ZGNfeCOcNCp0YaFNIUGFKlImowqwb1O/22wDVk3FDGMLqxdpOV3qQHD5fPEH4hK6A==", "integrity": "sha512-Jhy+MWRlro6UjVi578V/4ZGNfeCOcNCp0YaFNIUGFKlImowqwb1O/22wDVk3FDGMLqxdpOV3qQHD5fPEH4hK6A==",
"dev": true,
"license": "MIT" "license": "MIT"
}, },
"node_modules/@types/body-parser": { "node_modules/@types/body-parser": {
@ -1465,6 +1469,7 @@
"version": "4.1.12", "version": "4.1.12",
"resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.12.tgz", "resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.12.tgz",
"integrity": "sha512-vIChWdVG3LG1SMxEvI/AK+FWJthlrqlTu7fbrlywTkkaONwk/UAGaULXRlf8vkzFBLVm0zkMdCquhL5aOjhXPQ==", "integrity": "sha512-vIChWdVG3LG1SMxEvI/AK+FWJthlrqlTu7fbrlywTkkaONwk/UAGaULXRlf8vkzFBLVm0zkMdCquhL5aOjhXPQ==",
"dev": true,
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"@types/ms": "*" "@types/ms": "*"
@ -1562,6 +1567,7 @@
"version": "2.0.8", "version": "2.0.8",
"resolved": "https://registry.npmjs.org/@types/json-logic-js/-/json-logic-js-2.0.8.tgz", "resolved": "https://registry.npmjs.org/@types/json-logic-js/-/json-logic-js-2.0.8.tgz",
"integrity": "sha512-WgNsDPuTPKYXl0Jh0IfoCoJoAGGYZt5qzpmjuLSEg7r0cKp/kWtWp0HAsVepyPSPyXiHo6uXp/B/kW/2J1fa2Q==", "integrity": "sha512-WgNsDPuTPKYXl0Jh0IfoCoJoAGGYZt5qzpmjuLSEg7r0cKp/kWtWp0HAsVepyPSPyXiHo6uXp/B/kW/2J1fa2Q==",
"dev": true,
"license": "MIT" "license": "MIT"
}, },
"node_modules/@types/json-schema": { "node_modules/@types/json-schema": {
@ -1579,6 +1585,13 @@
"license": "MIT", "license": "MIT",
"peer": true "peer": true
}, },
"node_modules/@types/microtime": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/@types/microtime/-/microtime-2.1.2.tgz",
"integrity": "sha512-d5odaV/0jPwfehN1t+y7+TcbGxECQLtl7mVETpMaYA0SnlhyKQKgWPCRetbSJVP7i2Kzx8CuTDgDs2kjS1MCOw==",
"dev": true,
"license": "MIT"
},
"node_modules/@types/mime": { "node_modules/@types/mime": {
"version": "1.3.5", "version": "1.3.5",
"resolved": "https://registry.npmjs.org/@types/mime/-/mime-1.3.5.tgz", "resolved": "https://registry.npmjs.org/@types/mime/-/mime-1.3.5.tgz",
@ -1590,6 +1603,7 @@
"version": "0.7.34", "version": "0.7.34",
"resolved": "https://registry.npmjs.org/@types/ms/-/ms-0.7.34.tgz", "resolved": "https://registry.npmjs.org/@types/ms/-/ms-0.7.34.tgz",
"integrity": "sha512-nG96G3Wp6acyAgJqGasjODb+acrI7KltPiRxzHPXnP3NgI28bpQDRv53olbqGXbfcgF5aiiHmO3xpwEpS5Ld9g==", "integrity": "sha512-nG96G3Wp6acyAgJqGasjODb+acrI7KltPiRxzHPXnP3NgI28bpQDRv53olbqGXbfcgF5aiiHmO3xpwEpS5Ld9g==",
"dev": true,
"license": "MIT" "license": "MIT"
}, },
"node_modules/@types/node": { "node_modules/@types/node": {
@ -1605,6 +1619,7 @@
"version": "3.0.6", "version": "3.0.6",
"resolved": "https://registry.npmjs.org/@types/object-hash/-/object-hash-3.0.6.tgz", "resolved": "https://registry.npmjs.org/@types/object-hash/-/object-hash-3.0.6.tgz",
"integrity": "sha512-fOBV8C1FIu2ELinoILQ+ApxcUKz4ngq+IWUYrxSGjXzzjUALijilampwkMgEtJ+h2njAW3pi853QpzNVCHB73w==", "integrity": "sha512-fOBV8C1FIu2ELinoILQ+ApxcUKz4ngq+IWUYrxSGjXzzjUALijilampwkMgEtJ+h2njAW3pi853QpzNVCHB73w==",
"dev": true,
"license": "MIT" "license": "MIT"
}, },
"node_modules/@types/qs": { "node_modules/@types/qs": {
@ -6907,6 +6922,26 @@
"node": ">=8.6" "node": ">=8.6"
} }
}, },
"node_modules/microtime": {
"version": "3.1.1",
"resolved": "https://registry.npmjs.org/microtime/-/microtime-3.1.1.tgz",
"integrity": "sha512-to1r7o24cDsud9IhN6/8wGmMx5R2kT0w2Xwm5okbYI3d1dk6Xv0m+Z+jg2vS9pt+ocgQHTCtgs/YuyJhySzxNg==",
"hasInstallScript": true,
"license": "MIT",
"dependencies": {
"node-addon-api": "^5.0.0",
"node-gyp-build": "^4.4.0"
},
"engines": {
"node": ">= 14.13.0"
}
},
"node_modules/microtime/node_modules/node-addon-api": {
"version": "5.1.0",
"resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-5.1.0.tgz",
"integrity": "sha512-eh0GgfEkpnoWDq+VY8OyvYhFEzBk6jIYbRKdIlyTiAXIVJ8PyBaKb0rp7oDtoddbdoHWhq8wwr+XZ81F1rpNdA==",
"license": "MIT"
},
"node_modules/mime": { "node_modules/mime": {
"version": "1.6.0", "version": "1.6.0",
"resolved": "https://registry.npmjs.org/mime/-/mime-1.6.0.tgz", "resolved": "https://registry.npmjs.org/mime/-/mime-1.6.0.tgz",
@ -8974,6 +9009,10 @@
"integrity": "sha512-jk1+QP6ZJqyOiuEI9AEWQfju/nB2Pw466kbA0LEZljHwKeMgd9WrAEgEGxjPDD2+TNbbb37rTyhEfrCXfuKXnA==", "integrity": "sha512-jk1+QP6ZJqyOiuEI9AEWQfju/nB2Pw466kbA0LEZljHwKeMgd9WrAEgEGxjPDD2+TNbbb37rTyhEfrCXfuKXnA==",
"license": "MIT" "license": "MIT"
}, },
"node_modules/util": {
"resolved": "util",
"link": true
},
"node_modules/util-deprecate": { "node_modules/util-deprecate": {
"version": "1.0.2", "version": "1.0.2",
"resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz",
@ -9280,6 +9319,7 @@
"node": ">= 10", "node": ">= 10",
"pnpm": ">= 9" "pnpm": ">= 9"
} }
} },
"util": {}
} }
} }

View File

@ -13,25 +13,32 @@
"jest": { "jest": {
"testEnvironment": "node", "testEnvironment": "node",
"preset": "ts-jest", "preset": "ts-jest",
"roots": ["__tests__/"] "roots": [
"./__tests__/"
],
"testMatch": [
"**/__tests__/**/*"
]
}, },
"author": "Taliesin (Ladd) <ladd@dgov.io>", "author": "Taliesin (Ladd) <ladd@dgov.io>",
"license": "Unlicense", "license": "Unlicense",
"dependencies": { "dependencies": {
"@types/bluebird": "^3.5.42",
"@types/debug": "^4.1.12",
"@types/json-logic-js": "^2.0.8",
"@types/object-hash": "^3.0.6",
"debug": "^4.4.0", "debug": "^4.4.0",
"express": "^4.21.2", "express": "^4.21.2",
"json-logic-js": "^2.0.5", "json-logic-js": "^2.0.5",
"level": "^9.0.0", "level": "^9.0.0",
"microtime": "^3.1.1",
"object-hash": "^3.0.0", "object-hash": "^3.0.0",
"showdown": "^2.1.0", "showdown": "^2.1.0",
"zeromq": "^6.1.2", "util": "./util/",
"util": "./util/" "zeromq": "^6.1.2"
}, },
"devDependencies": { "devDependencies": {
"@types/bluebird": "^3.5.42",
"@types/debug": "^4.1.12",
"@types/json-logic-js": "^2.0.8",
"@types/microtime": "^2.1.2",
"@types/object-hash": "^3.0.6",
"@eslint/js": "^9.17.0", "@eslint/js": "^9.17.0",
"@types/express": "^5.0.0", "@types/express": "^5.0.0",
"@types/jest": "^29.5.14", "@types/jest": "^29.5.14",

12
scratch/jsonlogic.ts Normal file
View File

@ -0,0 +1,12 @@
import { apply } from 'json-logic-js';
console.log(apply({"map":[
{"var":"integers"},
{"*":[{"var":""},2]}
]}, {"integers":[1,2,3,4,5]}));
console.log(apply({"reduce":[
{"var":"integers"},
{"+":[{"var":"current"}, {"var":"accumulator"}]},
0
]}, {"integers":[1,2,3,4,5]}));

View File

@ -9,7 +9,7 @@ import EventEmitter from "node:events";
import {Delta, DeltaID} from "./delta"; import {Delta, DeltaID} from "./delta";
import {Entity, EntityProperties} from "./entity"; import {Entity, EntityProperties} from "./entity";
import {LosslessViewMany} from "./lossless"; import {LosslessViewMany} from "./lossless";
import {firstValueFromLosslessViewOne, Lossy, LossyViewMany, LossyViewOne} from "./lossy"; import {lastValueFromLosslessViewOne, Lossy, ResolvedViewMany, ResolvedViewOne, Resolver} from "./lossy";
import {RhizomeNode} from "./node"; import {RhizomeNode} from "./node";
import {DomainEntityID} from "./types"; import {DomainEntityID} from "./types";
const debug = Debug('collection'); const debug = Debug('collection');
@ -17,22 +17,12 @@ const debug = Debug('collection');
export class Collection { export class Collection {
rhizomeNode?: RhizomeNode; rhizomeNode?: RhizomeNode;
name: string; name: string;
entities = new Map<string, Entity>();
eventStream = new EventEmitter(); eventStream = new EventEmitter();
constructor(name: string) { constructor(name: string) {
this.name = name; this.name = name;
} }
ingestDelta(delta: Delta) {
if (!this.rhizomeNode) return;
const updated = this.rhizomeNode.lossless.ingestDelta(delta);
this.eventStream.emit('ingested', delta);
this.eventStream.emit('updated', updated);
}
// Instead of trying to update our final view of the entity with every incoming delta, // Instead of trying to update our final view of the entity with every incoming delta,
// let's try this: // let's try this:
// - keep a lossless view (of everything) // - keep a lossless view (of everything)
@ -53,39 +43,24 @@ export class Collection {
debug(`connected ${this.name} to rhizome`); debug(`connected ${this.name} to rhizome`);
} }
onCreate(cb: (entity: Entity) => void) { ingestDelta(delta: Delta) {
// TODO: Trigger for changes received from peers if (!this.rhizomeNode) return;
this.eventStream.on('create', (entity: Entity) => {
cb(entity);
});
}
onUpdate(cb: (entity: Entity) => void) { const updated = this.rhizomeNode.lossless.ingestDelta(delta);
// TODO: Trigger for changes received from peers
this.eventStream.on('update', (entity: Entity) => {
cb(entity);
});
}
defaultResolver(losslessView: LosslessViewMany): LossyViewMany { this.eventStream.emit('ingested', delta);
const resolved: LossyViewMany = {}; this.eventStream.emit('updated', updated);
debug('default resolver, lossless view', JSON.stringify(losslessView));
for (const [id, ent] of Object.entries(losslessView)) {
resolved[id] = {id, properties: {}};
for (const key of Object.keys(ent.properties)) {
const {value} = firstValueFromLosslessViewOne(ent, key) || {};
debug(`[ ${key} ] = ${value}`);
resolved[id].properties[key] = value;
}
}
return resolved;
} }
// Applies the javascript rules for updating object values, // Applies the javascript rules for updating object values,
// e.g. set to `undefined` to delete a property // e.g. set to `undefined` to delete a property.
// This function is here instead of Entity so that it can:
// - read the current state in order to build its delta
// - include the collection name in the delta it produces
generateDeltas( generateDeltas(
entityId: DomainEntityID, entityId: DomainEntityID,
newProperties: EntityProperties, newProperties: EntityProperties,
resolver?: Resolver,
creator?: string, creator?: string,
host?: string host?: string
): Delta[] { ): Delta[] {
@ -93,7 +68,7 @@ export class Collection {
let oldProperties: EntityProperties = {}; let oldProperties: EntityProperties = {};
if (entityId) { if (entityId) {
const entity = this.get(entityId); const entity = this.resolve(entityId, resolver);
if (entity) { if (entity) {
oldProperties = entity.properties; oldProperties = entity.properties;
} }
@ -123,17 +98,39 @@ export class Collection {
return deltas; return deltas;
} }
onCreate(cb: (entity: Entity) => void) {
// TODO: Trigger for changes received from peers
this.eventStream.on('create', (entity: Entity) => {
cb(entity);
});
}
onUpdate(cb: (entity: Entity) => void) {
// TODO: Trigger for changes received from peers
this.eventStream.on('update', (entity: Entity) => {
cb(entity);
});
}
getIds(): string[] {
if (!this.rhizomeNode) return [];
return Array.from(this.rhizomeNode.lossless.domainEntities.keys());
}
// THIS PUT SHOULD CORRESOND TO A PARTICULAR MATERIALIZED VIEW...
// How can we encode that?
// Well, we have a way to do that, we just need the same particular inputs.
// We take a resolver as an optional argument.
async put( async put(
entityId: DomainEntityID | undefined, entityId: DomainEntityID | undefined,
properties: EntityProperties properties: EntityProperties,
): Promise<LossyViewOne> { resolver?: Resolver
// const deltas: Delta[] = []; ): Promise<ResolvedViewOne> {
// const entity = this.updateEntity(entityId, properties, true, deltas); // For convenience, we allow setting id via properties.id
if (!entityId && !!properties.id && typeof properties.id === 'string') {
// THIS PUT SHOULD CORRESOND TO A PARTICULAR MATERIALIZED VIEW... entityId = properties.id;
// How can we encode that? }
// Well, we have a way to do that, we just need the same particular inputs // Generate an ID if none is provided
if (!entityId) { if (!entityId) {
entityId = randomUUID(); entityId = randomUUID();
} }
@ -141,12 +138,17 @@ export class Collection {
const deltas = this.generateDeltas( const deltas = this.generateDeltas(
entityId, entityId,
properties, properties,
resolver,
this.rhizomeNode?.config.creator, this.rhizomeNode?.config.creator,
this.rhizomeNode?.config.peerId, this.rhizomeNode?.config.peerId,
); );
debug(`put ${entityId} generated deltas:`, JSON.stringify(deltas)); debug(`put ${entityId} generated deltas:`, JSON.stringify(deltas));
// Here we set up a listener so we can wait for all our deltas to be
// ingested into our lossless view before proceeding.
// TODO: Hoist this into a more generic transaction mechanism.
const allIngested = new Promise<boolean>((resolve) => { const allIngested = new Promise<boolean>((resolve) => {
const ingestedIds = new Set<DeltaID>(); const ingestedIds = new Set<DeltaID>();
this.eventStream.on('ingested', (delta: Delta) => { this.eventStream.on('ingested', (delta: Delta) => {
@ -160,7 +162,6 @@ export class Collection {
}) })
}); });
// updateEntity may have generated some deltas for us to store and publish
deltas.forEach(async (delta: Delta) => { deltas.forEach(async (delta: Delta) => {
// record this delta just as if we had received it from a peer // record this delta just as if we had received it from a peer
@ -181,7 +182,7 @@ export class Collection {
await allIngested; await allIngested;
const res = this.get(entityId); const res = this.resolve(entityId, resolver);
if (!res) throw new Error("could not get what we just put!"); if (!res) throw new Error("could not get what we just put!");
this.eventStream.emit("update", res); this.eventStream.emit("update", res);
@ -189,18 +190,41 @@ export class Collection {
return res; return res;
} }
get(id: string): LossyViewOne | undefined { // TODO: default should probably be last write wins
// Now with lossy view approach, instead of just returning what we already have, defaultResolver(losslessView: LosslessViewMany): ResolvedViewMany {
// let's compute our view now. const resolved: ResolvedViewMany = {};
// return this.entities.get(id);
if (!this.rhizomeNode) return undefined; // debug('default resolver, lossless view', JSON.stringify(losslessView));
const lossy = new Lossy(this.rhizomeNode.lossless); for (const [id, ent] of Object.entries(losslessView)) {
const res = lossy.resolve((view) => this.defaultResolver(view), [id]); resolved[id] = {id, properties: {}};
return res[id];
for (const key of Object.keys(ent.properties)) {
const {value} = lastValueFromLosslessViewOne(ent, key) || {};
// debug(`[ ${key} ] = ${value}`);
resolved[id].properties[key] = value;
}
}
return resolved;
} }
getIds(): string[] { resolve(id: string, resolver?: Resolver): ResolvedViewOne | undefined {
if (!this.rhizomeNode) return []; // Now with lossy view approach, instead of just returning what we
return Array.from(this.rhizomeNode.lossless.domainEntities.keys()); // already have, let's compute our view now.
// return this.entities.resolve(id);
// TODO: Caching
if (!this.rhizomeNode) return undefined;
if (!resolver) {
debug('using default resolver');
resolver = (view) => this.defaultResolver(view);
}
const lossy = new Lossy(this.rhizomeNode.lossless);
const res = lossy.resolve(resolver, [id]);
debug('lossy view', res);
return res[id];
} }
} }

View File

@ -1,5 +1,6 @@
import microtime from 'microtime';
import {randomUUID} from "crypto"; import {randomUUID} from "crypto";
import {PeerAddress} from "./types"; import {PeerAddress, Timestamp} from "./types";
export type DeltaID = string; export type DeltaID = string;
@ -14,11 +15,15 @@ export type Pointer = {
export class Delta { export class Delta {
id: DeltaID; id: DeltaID;
receivedFrom?: PeerAddress; receivedFrom?: PeerAddress;
timeReceived: Timestamp;
timeCreated: Timestamp;
creator: string; creator: string;
host: string; host: string;
pointers: Pointer[] = []; pointers: Pointer[] = [];
constructor(delta: Omit<Delta, "id">) { constructor(delta: Omit<Delta, "id" | "timeReceived" | "timeCreated">) {
this.id = randomUUID(); this.id = randomUUID();
this.timeCreated = microtime.now();
this.timeReceived = this.timeCreated;
this.creator = delta.creator; this.creator = delta.creator;
this.host = delta.host; this.host = delta.host;
this.receivedFrom = delta.receivedFrom; this.receivedFrom = delta.receivedFrom;

View File

@ -42,7 +42,7 @@ export class DeltaStream {
switch (decision) { switch (decision) {
case Decision.Accept: case Decision.Accept:
this.deltasAccepted.push(delta); this.deltasAccepted.push(delta);
this.deltaStream.emit('delta', {delta}); this.deltaStream.emit('delta', delta);
break; break;
case Decision.Reject: case Decision.Reject:
this.deltasRejected.push(delta); this.deltasRejected.push(delta);
@ -80,7 +80,7 @@ export class DeltaStream {
} }
subscribeDeltas(fn: (delta: Delta) => void) { subscribeDeltas(fn: (delta: Delta) => void) {
this.deltaStream.on('delta', ({delta}) => { this.deltaStream.on('delta', (delta) => {
fn(delta); fn(delta);
}); });
} }
@ -90,11 +90,12 @@ export class DeltaStream {
await this.rhizomeNode.pubSub.publish("deltas", this.serializeDelta(delta)); await this.rhizomeNode.pubSub.publish("deltas", this.serializeDelta(delta));
} }
serializeDelta(delta: Delta) { serializeDelta(delta: Delta): string {
return JSON.stringify(delta); return JSON.stringify(delta);
} }
deserializeDelta(input: string) { deserializeDelta(input: string): Delta {
// TODO: Input validation
return JSON.parse(input); return JSON.parse(input);
} }
} }

View File

@ -7,6 +7,7 @@
// - As typescript interfaces? // - As typescript interfaces?
// - As typescript classes? // - As typescript classes?
import {Collection} from "./collection";
import {PropertyTypes} from "./types"; import {PropertyTypes} from "./types";
export type EntityProperties = { export type EntityProperties = {
@ -19,6 +20,11 @@ export class Entity {
constructor( constructor(
readonly id: string, readonly id: string,
readonly collection?: Collection
) {} ) {}
}
async save() {
if (!this.collection) throw new Error('to save this entity you must specify the collection');
return this.collection.put(this.id, this.properties);
}
}

View File

@ -61,7 +61,7 @@ export class HttpApi {
// Get a single domain entity by ID // Get a single domain entity by ID
this.router.get(`/${name}/:id`, (req: express.Request, res: express.Response) => { this.router.get(`/${name}/:id`, (req: express.Request, res: express.Response) => {
const {params: {id}} = req; const {params: {id}} = req;
const ent = collection.get(id); const ent = collection.resolve(id);
if (!ent) { if (!ent) {
res.status(404).send({error: "Not Found"}); res.status(404).send({error: "Not Found"});
return; return;

View File

@ -3,7 +3,7 @@
import Debug from 'debug'; import Debug from 'debug';
import {Delta, DeltaFilter} from './delta'; import {Delta, DeltaFilter} from './delta';
import {DomainEntityID, PropertyID, PropertyTypes} from "./types"; import {DomainEntityID, PropertyID, PropertyTypes, ViewMany} from "./types";
const debug = Debug('lossless'); const debug = Debug('lossless');
export type CollapsedPointer = {[key: string]: PropertyTypes}; export type CollapsedPointer = {[key: string]: PropertyTypes};
@ -19,9 +19,7 @@ export type LosslessViewOne = {
} }
}; };
export type LosslessViewMany = { export type LosslessViewMany = ViewMany<LosslessViewOne>;
[key: DomainEntityID]: LosslessViewOne;
};
class DomainEntityMap extends Map<DomainEntityID, DomainEntity> {}; class DomainEntityMap extends Map<DomainEntityID, DomainEntity> {};
@ -66,7 +64,7 @@ class DomainEntity {
export class Lossless { export class Lossless {
domainEntities = new DomainEntityMap(); domainEntities = new DomainEntityMap();
ingestDelta(delta: Delta): LosslessViewMany { ingestDelta(delta: Delta) {
const targets = delta.pointers const targets = delta.pointers
.filter(({targetContext}) => !!targetContext) .filter(({targetContext}) => !!targetContext)
.map(({target}) => target) .map(({target}) => target)
@ -86,8 +84,6 @@ export class Lossless {
debug('after add, domain entity:', JSON.stringify(ent)); debug('after add, domain entity:', JSON.stringify(ent));
} }
return this.view(targets);
} }
//TODO: json logic -- view(deltaFilter?: FilterExpr) { //TODO: json logic -- view(deltaFilter?: FilterExpr) {

View File

@ -6,21 +6,30 @@
// Fields in the output can be described as transformations // Fields in the output can be described as transformations
import Debug from 'debug'; import Debug from 'debug';
import {CollapsedDelta, Lossless, LosslessViewMany, LosslessViewOne} from "./lossless";
import {DomainEntityID, Properties} from "./types";
import {DeltaFilter} from "./delta"; import {DeltaFilter} from "./delta";
import {CollapsedDelta, Lossless, LosslessViewMany, LosslessViewOne} from "./lossless";
import {DomainEntityID, PropertyID, PropertyTypes, Timestamp, ViewMany} from "./types";
const debug = Debug('lossy'); const debug = Debug('lossy');
export type LossyViewOne<T = Properties> = { type TimestampedProperty = {
value: PropertyTypes,
timeUpdated: Timestamp
};
export type LossyViewOne<T = TimestampedProperty> = {
id: DomainEntityID; id: DomainEntityID;
properties: T; properties: {
[key: PropertyID]: T
};
}; };
export type LossyViewMany = { export type LossyViewMany<T> = ViewMany<LossyViewOne<T>>;
[key: DomainEntityID]: LossyViewOne;
};
type Resolver<T = LosslessViewMany> = (losslessView: LosslessViewMany) => T; export type ResolvedViewOne = LossyViewOne<PropertyTypes>;
export type ResolvedViewMany = ViewMany<ResolvedViewOne>;
export type Resolver<T = ResolvedViewMany> =
(losslessView: LosslessViewMany) => T;
// Extract a particular value from a delta's pointers // Extract a particular value from a delta's pointers
export function valueFromCollapsedDelta( export function valueFromCollapsedDelta(
@ -44,13 +53,42 @@ export function firstValueFromLosslessViewOne(
delta: CollapsedDelta, delta: CollapsedDelta,
value: string | number value: string | number
} | undefined { } | undefined {
debug(`trying to get value for ${key} from ${JSON.stringify(ent.properties[key])}`); debug(`trying to get first value for ${key} from ${JSON.stringify(ent.properties[key])}`);
for (const delta of ent.properties[key] || []) { for (const delta of ent.properties[key] || []) {
const value = valueFromCollapsedDelta(delta, key); const value = valueFromCollapsedDelta(delta, key);
if (value) return {delta, value}; if (value) return {delta, value};
} }
} }
// Function for resolving a value for an entity by last write wins
export function lastValueFromLosslessViewOne(
ent: LosslessViewOne,
key: string
): {
delta?: CollapsedDelta,
value?: string | number,
timeUpdated?: number
} | undefined {
const res: {
delta?: CollapsedDelta,
value?: string | number,
timeUpdated?: number
} = {};
debug(`trying to get last value for ${key} from ${JSON.stringify(ent.properties[key])}`);
res.timeUpdated = 0;
for (const delta of ent.properties[key] || []) {
const value = valueFromCollapsedDelta(delta, key);
if (value === undefined) continue;
if (delta.timeCreated < res.timeUpdated) continue;
res.delta = delta;
res.value = value;
res.timeUpdated = delta.timeCreated;
}
return res;
}
export class Lossy { export class Lossy {
lossless: Lossless; lossless: Lossless;

View File

@ -1,10 +0,0 @@
import {Collection} from './collection';
import {EntityProperties} from './entity';
import {LossyViewOne} from './lossy';
import {DomainEntityID} from './types';
export class TypedCollection<T extends EntityProperties> extends Collection {
async put(id: DomainEntityID | undefined, properties: T): Promise<LossyViewOne> {
return super.put(id, properties);
}
}

View File

@ -9,8 +9,13 @@ export type PropertyTypes = string | number | undefined;
export type DomainEntityID = string; export type DomainEntityID = string;
export type PropertyID = string; export type PropertyID = string;
export type Properties = {[key: PropertyID]: PropertyTypes}; export type Timestamp = number;
export type ViewMany<T> = {
[key: DomainEntityID]: T;
};
// TODO: Move to ./peers.ts
export class PeerAddress { export class PeerAddress {
addr: string; addr: string;
port: number; port: number;

View File

@ -4,15 +4,22 @@
"module": "CommonJS", "module": "CommonJS",
"esModuleInterop": true, "esModuleInterop": true,
"moduleResolution": "Node", "moduleResolution": "Node",
"sourceMap": true, "sourceMap": false,
"baseUrl": ".", "baseUrl": ".",
"outDir": "dist", "outDir": "dist",
"importsNotUsedAsValues": "remove", "importsNotUsedAsValues": "remove",
"strict": true, "strict": true,
"esModuleInterop": true,
"skipLibCheck": true, "skipLibCheck": true,
"forceConsistentCasingInFileNames": true "forceConsistentCasingInFileNames": true
}, },
"include": ["src/**/*", "examples/**/*", "__tests__/**/*"], "include": [
"exclude": ["node_modules"] "src/**/*",
"util/**/*",
"examples/**/*",
"scratch/**/*",
"__tests__/**/*"
],
"exclude": [
"node_modules"
]
} }

View File

@ -1,13 +1,5 @@
import {RhizomeNode, RhizomeNodeConfig} from "../src/node"; import {RhizomeNode, RhizomeNodeConfig} from "../src/node";
import {TypedCollection} from "../src/typed-collection"; import {Collection} from "../src/collection";
type User = {
id?: string;
name: string;
nameLong?: string;
email?: string;
age: number;
};
const start = 5000; const start = 5000;
const range = 5000; const range = 5000;
@ -25,7 +17,7 @@ export class App extends RhizomeNode {
...config, ...config,
}); });
const users = new TypedCollection<User>("user"); const users = new Collection("user");
users.rhizomeConnect(this); users.rhizomeConnect(this);
const {httpAddr, httpPort} = this.config; const {httpAddr, httpPort} = this.config;