peers are able to sync

Ladd Hoffman 2024-12-22 09:13:44 -06:00
parent cfe410484e
commit 518bc4eb44
9 changed files with 271 additions and 195 deletions

.gitignore

@@ -1,2 +1,4 @@
dist/
node_modules/
*.swp
*.swo

3 binary files not shown.

collection-layer.ts

@@ -2,33 +2,176 @@
// It should enable operations like removing a property removes the value from the entities in the collection
// It could then be further extended with e.g. table semantics like filter, sort, join
-type Property = {
-  name: string,
-  type: number | string;
-}
-class EntityType {
-  name: string;
-  properties?: Property[];
-  constructor(name: string) {
-    this.name = name;
-  }
-}
-class Entity {
-  type: EntityType;
-  properties?: object;
-  constructor(type: EntityType) {
-    this.type = type;
-  }
-}
-class Collection {
-  entities = new Map<string, Entity>();
-  // update(entityId, properties)
-  // ...
-}
-export class Collections {
-  collections = new Map<string, Collection>();
-}

import EventEmitter from "node:events";
import { publishDelta, subscribeDeltas } from "./deltas";
import { Entity, EntityProperties, EntityPropertiesDeltaBuilder } from "./object-layer";
import { Delta } from "./types";
import { randomUUID } from "node:crypto";

// type Property = {
//   name: string,
//   type: number | string;
// }

// class EntityType {
//   name: string;
//   properties?: Property[];
//   constructor(name: string) {
//     this.name = name;
//   }
// }

// class Entity {
//   type: EntityType;
//   properties?: object;
//   constructor(type: EntityType) {
//     this.type = type;
//   }
// }

// class Collection {
//   // update(entityId, properties)
//   // ...
// }

// export class Collections {
//   collections = new Map<string, Collection>();
// }

export class Collection {
entities = new Map<string, Entity>();
eventStream = new EventEmitter();
constructor() {
console.log('COLLECTION SUBSCRIBING TO DELTA STREAM');
subscribeDeltas((delta: Delta) => {
// TODO: Make sure this is the kind of delta we're looking for
console.log('COLLECTION RECEIVED DELTA');
this.applyDelta(delta);
});
this.eventStream.on('create', (entity: Entity) => {
console.log(`new entity!`, entity);
});
}
// Applies the javascript rules for updating object values,
// e.g. set to `undefined` to delete a property
updateEntity(entityId?: string, properties?: object, local = false, deltas?: Delta[]): Entity {
let entity: Entity | undefined;
let eventType: 'create' | 'update' | 'delete' | undefined;
entityId = entityId ?? randomUUID();
entity = this.entities.get(entityId);
if (!entity) {
entity = new Entity(entityId);
entity.id = entityId;
eventType = 'create';
}
const deltaBuilder = new EntityPropertiesDeltaBuilder(entityId);
if (!properties) {
// Let's interpret this as entity deletion
this.entities.delete(entityId);
// TODO: prepare and publish a delta
// TODO: execute hooks
eventType = 'delete';
} else {
let anyChanged = false;
Object.entries(properties).forEach(([key, value]) => {
let changed = false;
if (entity.properties && entity.properties[key] !== value) {
entity.properties[key] = value;
changed = true;
}
if (local && changed) {
// If this is a change, let's generate a delta
deltaBuilder.add(key, value);
// We append to the array the caller may provide
// We can update this count as we receive network confirmation for deltas
entity.ahead += 1;
}
anyChanged = anyChanged || changed;
});
// We've noted that we may be ahead of the server, let's update our
// local image of this entity.
//* In principle, this system can recreate past or alternative states.
//* At worst, by replaying all the deltas up to a particular point.
//* Some sort of checkpointing strategy would probably be helpful.
//* Furthermore, if we can implement reversible transformations,
//* it would then be efficient to calculate the state of the system with
//* specific deltas removed. We could use it to extract a measurement
//* of the effects of some deltas' inclusion or exclusion, the
//* evaluation of which may lend evidence to some possible arguments.
this.entities.set(entityId, entity);
if (anyChanged) {
deltas?.push(deltaBuilder.delta);
eventType = eventType || 'update';
}
}
if (eventType) {
this.eventStream.emit(eventType, entity);
}
return entity;
}
// We can update our local image of the entity, but we should annotate it
// to indicate that we have not yet received any confirmation of this delta
// having been propagated.
// Later when we receive deltas regarding this entity we can detect when
// we have received back an image that matches our target.
// So we need a function to generate one or more deltas for each call to put/
// maybe we stage them and wait for a call to commit() that initiates the
// assembly and transmission of one or more deltas
applyDelta(delta: Delta) {
// TODO: handle delta representing entity deletion
console.log('applying delta:', delta);
const idPtr = delta.pointers.find(({localContext}) => localContext === 'id');
if (!idPtr) {
console.error('encountered delta with no entity id', delta);
return;
}
const properties: EntityProperties = {};
delta.pointers.filter(({localContext}) => localContext !== 'id')
.forEach(({localContext: key, target: value}) => {
properties[key] = value;
}, {});
const entityId = idPtr.target as string;
// TODO: Handle the scenario where this update has been superseded by a newer one locally
this.updateEntity(entityId, properties);
}
onCreate(cb: (entity: Entity) => void) {
this.eventStream.on('create', (entity: Entity) => {
cb(entity);
});
}
onUpdate(cb: (entity: Entity) => void) {
this.eventStream.on('update', (entity: Entity) => {
cb(entity);
});
}
put(entityId: string | undefined, properties: object): Entity {
const deltas: Delta[] = [];
const entity = this.updateEntity(entityId, properties, true, deltas);
deltas.forEach(async (delta: Delta) => {
await publishDelta(delta);
});
return entity;
}
del(entityId: string) {
const deltas: Delta[] = [];
this.updateEntity(entityId, undefined, true, deltas);
deltas.forEach(async (delta: Delta) => {
await publishDelta(delta);
});
}
get(id: string): Entity | undefined {
return this.entities.get(id);
}
getIds(): string[] {
return Array.from(this.entities.keys());
}
}
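For orientation, a minimal usage sketch of the new Collection API above (the collection name and the properties are made up; put, get, getIds, onCreate, and onUpdate are the methods defined in this file):

import { Collection } from "./collection-layer";

const notes = new Collection();

// Optimistic local write: put() updates the local image immediately and
// publishes one delta covering the changed properties.
const note = notes.put(undefined, { title: 'hello', body: 'first note' });

notes.onCreate((entity) => console.log('created', entity.id));
notes.onUpdate((entity) => console.log('updated', entity.properties));

// Reads are served from the local image; deltas arriving from peers are
// applied asynchronously through applyDelta().
console.log(notes.get(note.id)?.properties);
console.log(notes.getIds());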

deltas.ts

@@ -10,7 +10,7 @@ export const deltasRejected: Delta[] = [];
export const deltasDeferred: Delta[] = [];

export function applyPolicy(delta: Delta): Decision {
  return !!delta && Decision.Accept;
}

export function receiveDelta(delta: Delta) {
@@ -18,19 +18,19 @@ export function receiveDelta(delta: Delta) {
}

export function ingestDelta(delta: Delta) {
  const decision = applyPolicy(delta);
  switch (decision) {
    case Decision.Accept:
      deltasAccepted.push(delta);
      deltaStream.emit('delta', { delta });
      break;
    case Decision.Reject:
      deltasRejected.push(delta);
      break;
    case Decision.Defer:
      deltasDeferred.push(delta);
      break;
  }
}

export function ingestNext(): boolean {
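The deltas moving through receiveDelta and ingestDelta are the same pointer bundles that collection-layer's applyDelta consumes: one pointer whose localContext is 'id' names the entity, and each remaining pointer carries a property value. A rough sketch of that shape (the real Delta type in ./types presumably carries more fields, such as creator and host, so the cast below is only illustrative):

import { receiveDelta, ingestAll } from "./deltas";
import { Delta } from "./types";

// Hypothetical delta asserting that entity 'taliesin-1' has name 'Taliesin'.
const delta = {
  pointers: [
    { localContext: 'id', target: 'taliesin-1' },
    { localContext: 'name', target: 'Taliesin' },
  ],
} as Delta;

receiveDelta(delta); // queue it, as peers.ts does when pulling from a peer
ingestAll();         // then run pending deltas through applyPolicy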

Server entry point

@@ -2,10 +2,11 @@
import express from "express";
import { bindPublish, } from "./pub-sub";
-import { runDeltas } from "./deltas";
import { deltasAccepted, deltasProposed, runDeltas } from "./deltas";
-import { Entities, Entity } from "./object-layer";
import { Entity } from "./object-layer";
import { Collection } from "./collection-layer";
import { bindReply, runRequestHandlers } from "./request-reply";
-import { subscribeToSeeds } from "./peers";
import { askAllPeersForDeltas, subscribeToSeeds } from "./peers";
import { ENABLE_HTTP_API, HTTP_API_ADDR, HTTP_API_PORT } from "./config";
@@ -25,7 +26,7 @@ type UserProperties = {
};

class Users {
-  db = new Entities();
  db = new Collection();

  create(properties: UserProperties): Entity {
    // We provide undefined for the id, to let the database generate it
    // This call returns the id
@@ -47,12 +48,23 @@ class Users {
}

(async () => {
  const users = new Users();
  const app = express()

  app.get("/ids", (req: express.Request, res: express.Response) => {
    res.json({ ids: users.getIds()});
  });
app.get("/deltas", (req: express.Request, res: express.Response) => {
// TODO: streaming
res.json(deltasAccepted);
});
app.get("/deltas/count", (req: express.Request, res: express.Response) => {
// TODO: streaming
res.json(deltasAccepted.length);
});
  if (ENABLE_HTTP_API) {
    app.listen(HTTP_API_PORT, HTTP_API_ADDR, () => {
      console.log(`HTTP API bound to http://${HTTP_API_ADDR}:${HTTP_API_PORT}`);
@@ -63,21 +75,30 @@ class Users {
  await bindReply();
  runDeltas();
  runRequestHandlers();

-  await new Promise((resolve) => setTimeout(resolve, 200));
  await new Promise((resolve) => setTimeout(resolve, 500));

  subscribeToSeeds();

-  await new Promise((resolve) => setTimeout(resolve, 200));
  await new Promise((resolve) => setTimeout(resolve, 500));

  askAllPeersForDeltas();

  await new Promise((resolve) => setTimeout(resolve, 1000));

-  const users = new Users();
  setInterval(() => {
    console.log('deltasProposed count', deltasProposed.length,
      'deltasAccepted count', deltasAccepted.length);
  }, 5000)
  const taliesin = users.upsert({
-    id: 'taliesin-1',
    // id: 'taliesin-1',
    name: 'Taliesin',
    nameLong: 'Taliesin (Ladd)',
    age: Math.floor(Math.random() * 1000)
  });

-  taliesin.onUpdate((u: Entity) => {
-    console.log('User updated', u);
  users.db.onUpdate((u: Entity) => {
    console.log('User updated:', u);
  });

  users.db.onCreate((u: Entity) => {
    console.log('New user!:', u);
  });

  // TODO: Allow configuration regarding read/write concern i.e.
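The new /deltas endpoints above give a quick way to check whether two nodes have converged. A small sketch of polling them from another process (the base URL is an assumption; in practice it comes from HTTP_API_ADDR and HTTP_API_PORT, and requires ENABLE_HTTP_API):

// Assumes Node 18+ (global fetch) and an ES module context (top-level await).
const base = 'http://localhost:3000'; // placeholder for HTTP_API_ADDR:HTTP_API_PORT

const { ids } = await (await fetch(`${base}/ids`)).json();          // { ids: string[] }
const count = await (await fetch(`${base}/deltas/count`)).json();   // number
console.log(`node has ${ids.length} entities from ${count} accepted deltas`);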

object-layer.ts

@@ -7,15 +7,10 @@
// - As typescript interfaces?
// - As typescript classes?

-import EventEmitter from "node:events";
import { CREATOR, HOST } from "./config";
-import { publishDelta, subscribeDeltas } from "./deltas";
import { Delta, PropertyTypes } from "./types";
-import { randomUUID } from "node:crypto";

-const entityEventStream = new EventEmitter();

-type EntityProperties = {
export type EntityProperties = {
  [key: string]: PropertyTypes
};
@@ -27,20 +22,11 @@ export class Entity {
    this.id = id;
    this.properties = {};
  }
-  onUpdate(cb: (entity: Entity) => void) {
-    // TODO: This doesn't seem like it will scale well.
-    entityEventStream.on('update', (entity: Entity) => {
-      if (entity.id === this.id) {
-        cb(entity);
-      }
-    });
-  }
}

-const entities = new Map<string, Entity>();

// TODO: Use leveldb for storing view snapshots

-class EntityPropertiesDeltaBuilder {
export class EntityPropertiesDeltaBuilder {
  delta: Delta;
  constructor(entityId: string) {
    this.delta = {
@@ -58,123 +44,3 @@ class EntityPropertiesDeltaBuilder {
  }
}
// Applies the javascript rules for updating object values,
// e.g. set to `undefined` to delete a property
function updateEntity(entityId?: string, properties?: object, local = false, deltas?: Delta[]): Entity {
let entity: Entity | undefined;
let eventType: 'create' | 'update' | 'delete' | undefined;
entityId = entityId ?? randomUUID();
entity = entities.get(entityId);
if (!entity) {
entity = new Entity(entityId);
entity.id = entityId;
eventType = 'create';
}
const deltaBulider = new EntityPropertiesDeltaBuilder(entityId);
if (!properties) {
// Let's interpret this as entity deletion
entities.delete(entityId);
// TODO: prepare and publish a delta
// TODO: execute hooks
eventType = 'delete';
} else {
let anyChanged = false;
Object.entries(properties).forEach(([key, value]) => {
let changed = false;
if (entity.properties && entity.properties[key] !== value) {
entity.properties[key] = value;
changed = true;
}
if (local && changed) {
// If this is a change, let's generate a delta
deltaBulider.add(key, value);
// We append to the array the caller may provide
// We can update this count as we receive network confirmation for deltas
entity.ahead += 1;
}
anyChanged = anyChanged || changed;
});
// We've noted that we may be ahead of the server, let's update our
// local image of this entity.
//* In principle, this system can recreate past or alternative states.
//* At worst, by replaying all the deltas up to a particular point.
//* Some sort of checkpointing strategy would probably be helpful.
//* Furthermore, if we can implement reversible transformations,
//* it would then be efficient to calculate the state of the system with
//* specific deltas removed. We could use it to extract a measurement
//* of the effects of some deltas' inclusion or exclusion, the
//* evaluation of which may lend evidence to some possible arguments.
entities.set(entityId, entity);
if (anyChanged) {
deltas?.push(deltaBulider.delta);
eventType = eventType || 'update';
}
}
if (eventType) {
entityEventStream.emit(eventType, entity);
}
return entity;
}
// We can update our local image of the entity, but we should annotate it
// to indicate that we have not yet received any confirmation of this delta
// having been propagated.
// Later when we receive deltas regarding this entity we can detect when
// we have received back an image that matches our target.
// So we need a function to generate one or more deltas for each call to put/
// maybe we stage them and wait for a call to commit() that initiates the
// assembly and transmission of one or more deltas
function applyDelta(delta: Delta) {
// TODO: handle delta representing entity deletion
const idPtr = delta.pointers.find(({localContext}) => localContext === 'id');
if (!idPtr) {
console.error('encountered delta with no entity id', delta);
return;
}
const properties: EntityProperties = {};
delta.pointers.filter(({localContext}) => localContext !== 'id')
.forEach(({localContext: key, target: value}) => {
properties[key] = value;
}, {});
const entityId = idPtr.target as string;
// TODO: Handle the scenario where this update has been superceded by a newer one locally
updateEntity(entityId, properties);
}
subscribeDeltas((delta: Delta) => {
// TODO: Make sure this is the kind of delta we're looking for
applyDelta(delta);
});
export class Entities {
constructor() {
entityEventStream.on('create', (entity: Entity) => {
console.log(`new entity!`, entity);
});
}
put(entityId: string | undefined, properties: object): Entity {
const deltas: Delta[] = [];
const entity = updateEntity(entityId, properties, true, deltas);
deltas.forEach(async (delta: Delta) => {
await publishDelta(delta);
});
return entity;
}
del(entityId: string) {
const deltas: Delta[] = [];
updateEntity(entityId, undefined, true, deltas);
deltas.forEach(async (delta: Delta) => {
await publishDelta(delta);
});
}
get(id: string): Entity | undefined {
return entities.get(id);
}
getIds(): string[] {
return Array.from(entities.keys());
}
}

peers.ts

@@ -3,11 +3,32 @@ import { registerRequestHandler, PeerRequest, ResponseSocket } from "./request-r
import { RequestSocket, } from "./request-reply";
import { SEED_PEERS } from "./config";
import {connectSubscribe} from "./pub-sub";
import {deltasAccepted, deltasProposed, ingestAll, receiveDelta} from "./deltas";
import {Delta} from "./types";

export enum PeerMethods {
  GetPublishAddress,
  AskForDeltas
}
registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => {
console.log('inspecting peer request');
switch (req.method) {
case PeerMethods.GetPublishAddress: {
console.log('it\'s a request for our publish address');
await res.send(publishAddr);
break;
}
case PeerMethods.AskForDeltas: {
console.log('it\'s a request for deltas');
// TODO: stream these rather than
// trying to write them all in one message
await res.send(JSON.stringify(deltasAccepted));
break;
}
}
});
export type PeerAddress = {
  addr: string,
  port: number
@@ -33,23 +54,46 @@ class Peer {
      connectSubscribe(addr, port);
    }
  }
async askForDeltas(): Promise<Delta[]> {
// TODO as a first approximation we are trying to cram the entire history
// of accepted deltas into one (potentially gargantuan) JSON message.
// A second approximation would be to stream the deltas.
// Third pass should find a way to reduce the number of deltas transmitted.
// TODO: requestTimeout
const res = await this.reqSock.request(PeerMethods.AskForDeltas);
const deltas = JSON.parse(res.toString());
return deltas;
}
}
const peers: Peer[] = [];
function newPeer(addr: string, port: number) {
const peer = new Peer(addr, port);
peers.push(peer);
return peer;
}
export async function subscribeToSeeds() {
  SEED_PEERS.forEach(async ({addr, port}, idx) => {
    console.log(`SEED PEERS[${idx}]=${addr}:${port}`);
-    const peer = new Peer(addr, port);
    const peer = newPeer(addr, port);
    await peer.subscribe();
  });
}
-registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => {
-  console.log('inspecting peer request');
-  switch (req.method) {
-    case PeerMethods.GetPublishAddress:
-      console.log('it\'s a request for our publish address');
-      await res.send(publishAddr);
-      break;
-  }
-});

//! TODO Expect abysmal scaling properties with this function
export async function askAllPeersForDeltas() {
  peers.forEach(async (peer, idx) => {
    console.log(`Asking peer ${idx} for deltas`);
    const deltas = await peer.askForDeltas();
    console.log('received deltas:', deltas);
    for (const delta of deltas) {
      receiveDelta(delta);
    }
    console.log('deltasProposed count', deltasProposed.length);
    console.log('deltasAccepted count', deltasAccepted.length);
    ingestAll();
  });
}
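Taken together with the entry-point changes, the sync this commit enables is pull-based: a node subscribes to its seed peers, waits briefly, then pulls each peer's full accepted-delta history and ingests it locally. A condensed sketch of that startup sequence (the delay values mirror the arbitrary ones used in the entry point, and the ordering omits the publish/reply socket setup shown there):

import { runDeltas } from "./deltas";
import { subscribeToSeeds, askAllPeersForDeltas } from "./peers";

runDeltas();                                   // start the local delta loop, as the entry point does
subscribeToSeeds();                            // SUB-connect to each seed peer's publish address
await new Promise((r) => setTimeout(r, 500));  // give the sockets time to connect
await askAllPeersForDeltas();                  // request each peer's accepted deltas and ingest them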