refactored globals into classes

This commit is contained in:
Ladd Hoffman 2024-12-25 16:13:48 -06:00
parent 8f97517075
commit a5fb49475b
16 changed files with 649 additions and 403 deletions

View File

@ -35,7 +35,8 @@ Then if their clocks drift relative to ours, we can seek consensus among a broad
But at that point just run ntpd. Can still do consensus to verify
but probably no need to implement custom time synchronization protocol.
Wait NTP is centralized isn't it, not peer to peer...
Apparently PTP, Precision Time Protocol, is a thing.
PTP affords for a layer of user defined priority for best clock selection.
## Peering
@ -118,7 +119,7 @@ To demonstrate the example application, you can open multiple terminals, and in
export DEBUG="*,-express"
export RHIZOME_REQUEST_BIND_PORT=4000
export RHIZOME_PUBLISH_BIND_PORT=4001
export RHIZOME_SEED_PEERS='127.0.0.1:4002, 127.0.0.1:4004'
export RHIZOME_SEED_PEERS='localhost:4002, localhost:4004'
export RHIZOME_HTTP_API_PORT=3000
export RHIZOME_PEER_ID=peer1
npm run example-app
@ -128,7 +129,7 @@ npm run example-app
export DEBUG="*,-express"
export RHIZOME_REQUEST_BIND_PORT=4002
export RHIZOME_PUBLISH_BIND_PORT=4003
export RHIZOME_SEED_PEERS='127.0.0.1:4000, 127.0.0.1:4004'
export RHIZOME_SEED_PEERS='localhost:4000, localhost:4004'
export RHIZOME_HTTP_API_PORT=3001
export RHIZOME_PEER_ID=peer2
npm run example-app
@ -138,7 +139,7 @@ npm run example-app
export DEBUG="*,-express"
export RHIZOME_REQUEST_BIND_PORT=4004
export RHIZOME_PUBLISH_BIND_PORT=4005
export RHIZOME_SEED_PEERS='127.0.0.1:4000, 127.0.0.1:4002'
export RHIZOME_SEED_PEERS='localhost:4000, localhost:4002'
export RHIZOME_HTTP_API_PORT=3002
export RHIZOME_PEER_ID=peer3
npm run example-app

60
__tests__/run.ts Normal file
View File

@ -0,0 +1,60 @@
import Debug from 'debug';
import {RhizomeNode, RhizomeNodeConfig} from "../src/node";
import {TypedCollection} from '../src/typed-collection';
const debug = Debug('test:run');
type User = {
id?: string;
name: string;
nameLong?: string;
email?: string;
age: number;
};
class App extends RhizomeNode {
constructor(config?: Partial<RhizomeNodeConfig>) {
super(config);
const users = new TypedCollection<User>("users");
users.rhizomeConnect(this);
}
}
describe('Run', () => {
let app: App;
beforeAll(async () => {
app = new App({
// TODO expose more conveniently as test config options
httpPort: 5000,
httpEnable: true,
requestBindPort: 5001,
publishBindPort: 5002,
});
await app.start();
});
afterAll(async () => {
debug('attempting to stop app');
await app.stop();
});
it('can put a new user', async () => {
const res = await fetch('http://localhost:5000/users', {
method: 'PUT',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({
name: "Peon",
id: "peon-1",
age: 263
})
});
const data = await res.json();
expect(data).toMatchObject({
properties: {
name: "Peon",
id: "peon-1",
age: 263
}
});
});
});

View File

@ -3,12 +3,11 @@
// It should enable operations like removing a property removes the value from the entities in the collection
// It could then be further extended with e.g. table semantics like filter, sort, join
import {randomUUID} from "node:crypto";
import EventEmitter from "node:events";
import { deltasAccepted, publishDelta, subscribeDeltas } from "./deltas";
import {RhizomeNode} from "./node";
import {Entity, EntityProperties, EntityPropertiesDeltaBuilder} from "./object-layer";
import {Delta} from "./types";
import { randomUUID } from "node:crypto";
import {myRequestAddr} from "./peers";
// type Property = {
// name: string,
@ -24,13 +23,24 @@ import {myRequestAddr} from "./peers";
// }
export class Collection {
rhizomeNode?: RhizomeNode;
name: string;
entities = new Map<string, Entity>();
eventStream = new EventEmitter();
constructor() {
subscribeDeltas((delta: Delta) => {
constructor(name: string) {
this.name = name;
}
rhizomeConnect(rhizomeNode: RhizomeNode) {
this.rhizomeNode = rhizomeNode;
rhizomeNode.deltaStream.subscribeDeltas((delta: Delta) => {
// TODO: Make sure this is the kind of delta we're looking for
this.applyDelta(delta);
});
rhizomeNode.httpApi.serveCollection(this);
}
// Applies the javascript rules for updating object values,
@ -45,7 +55,7 @@ export class Collection {
entity.id = entityId;
eventType = 'create';
}
const deltaBulider = new EntityPropertiesDeltaBuilder(entityId);
const deltaBulider = new EntityPropertiesDeltaBuilder(this.rhizomeNode!, entityId);
if (!properties) {
// Let's interpret this as entity deletion
@ -135,9 +145,9 @@ export class Collection {
const deltas: Delta[] = [];
const entity = this.updateEntity(entityId, properties, true, deltas);
deltas.forEach(async (delta: Delta) => {
delta.receivedFrom = myRequestAddr;
deltasAccepted.push(delta);
await publishDelta(delta);
delta.receivedFrom = this.rhizomeNode!.myRequestAddr;
this.rhizomeNode!.deltaStream.deltasAccepted.push(delta);
await this.rhizomeNode!.deltaStream.publishDelta(delta);
});
return entity;
}
@ -146,8 +156,8 @@ export class Collection {
const deltas: Delta[] = [];
this.updateEntity(entityId, undefined, true, deltas);
deltas.forEach(async (delta: Delta) => {
deltasAccepted.push(delta);
await publishDelta(delta);
this.rhizomeNode!.deltaStream.deltasAccepted.push(delta);
await this.rhizomeNode!.deltaStream.publishDelta(delta);
});
}

View File

@ -6,7 +6,7 @@ import {PeerAddress} from "./types";
export const LEVEL_DB_DIR = process.env.RHIZOME_LEVEL_DB_DIR ?? './data';
export const CREATOR = process.env.USER!;
export const HOST_ID = process.env.RHIZOME_PEER_ID || randomUUID();
export const PEER_ID = process.env.RHIZOME_PEER_ID || randomUUID();
export const ADDRESS = process.env.RHIZOME_ADDRESS ?? 'localhost';
export const REQUEST_BIND_ADDR = process.env.RHIZOME_REQUEST_BIND_ADDR || ADDRESS;
export const REQUEST_BIND_PORT = parseInt(process.env.RHIZOME_REQUEST_BIND_PORT || '4000');
@ -14,7 +14,7 @@ export const REQUEST_BIND_HOST = process.env.RHIZOME_REQUEST_BIND_HOST || REQUES
export const PUBLISH_BIND_ADDR = process.env.RHIZOME_PUBLISH_BIND_ADDR || ADDRESS;
export const PUBLISH_BIND_PORT = parseInt(process.env.RHIZOME_PUBLISH_BIND_PORT || '4001');
export const PUBLISH_BIND_HOST = process.env.RHIZOME_PUBLISH_BIND_HOST || PUBLISH_BIND_ADDR;
export const HTTP_API_ADDR = process.env.RHIZOME_HTTP_API_ADDR || ADDRESS || '127.0.0.1';
export const HTTP_API_ADDR = process.env.RHIZOME_HTTP_API_ADDR || ADDRESS || 'localhost';
export const HTTP_API_PORT = parseInt(process.env.RHIZOME_HTTP_API_PORT || '3000');
export const HTTP_API_ENABLE = process.env.RHIZOME_HTTP_API_ENABLE === 'true';
export const SEED_PEERS: PeerAddress[] = (process.env.RHIZOME_SEED_PEERS || '').split(',')

0
src/context.ts Normal file
View File

View File

@ -1,103 +1,94 @@
import Debug from 'debug';
import EventEmitter from 'node:events';
import objectHash from 'object-hash';
import {myRequestAddr} from './peers';
import {publishSock, subscribeSock} from './pub-sub';
import {Decision, Delta, PeerAddress} from './types';
import Debug from 'debug';
import {RhizomeNode} from './node';
import {Decision, Delta} from './types';
const debug = Debug('deltas');
export const deltaStream = new EventEmitter();
export class DeltaStream {
rhizomeNode: RhizomeNode;
deltaStream = new EventEmitter();
deltasProposed: Delta[] = [];
deltasAccepted: Delta[] = [];
deltasRejected: Delta[] = [];
deltasDeferred: Delta[] = [];
hashesReceived = new Set<string>();
export const deltasProposed: Delta[] = [];
export const deltasAccepted: Delta[] = [];
export const deltasRejected: Delta[] = [];
export const deltasDeferred: Delta[] = [];
constructor(rhizomeNode: RhizomeNode) {
this.rhizomeNode = rhizomeNode;
}
export const hashesReceived = new Set<string>();
export function applyPolicy(delta: Delta): Decision {
applyPolicy(delta: Delta): Decision {
return !!delta && Decision.Accept;
}
export function receiveDelta(delta: Delta) {
receiveDelta(delta: Delta) {
// Deduplication: if we already received this delta, disregard it
const hash = objectHash(delta);
if (!hashesReceived.has(hash)) {
hashesReceived.add(hash);
deltasProposed.push(delta);
if (!this.hashesReceived.has(hash)) {
this.hashesReceived.add(hash);
this.deltasProposed.push(delta);
}
}
export function ingestDelta(delta: Delta) {
const decision = applyPolicy(delta);
ingestDelta(delta: Delta) {
const decision = this.applyPolicy(delta);
switch (decision) {
case Decision.Accept:
deltasAccepted.push(delta);
deltaStream.emit('delta', {delta});
this.deltasAccepted.push(delta);
this.deltaStream.emit('delta', {delta});
break;
case Decision.Reject:
deltasRejected.push(delta);
this.deltasRejected.push(delta);
break;
case Decision.Defer:
deltasDeferred.push(delta);
this.deltasDeferred.push(delta);
break;
}
}
export function ingestNext(): boolean {
const delta = deltasProposed.shift();
ingestNext(): boolean {
const delta = this.deltasProposed.shift();
if (!delta) {
return false;
}
ingestDelta(delta);
this.ingestDelta(delta);
return true;
}
export function ingestAll() {
while (ingestNext());
ingestAll() {
while (this.ingestNext());
}
export function ingestNextDeferred(): boolean {
const delta = deltasDeferred.shift();
ingestNextDeferred(): boolean {
const delta = this.deltasDeferred.shift();
if (!delta) {
return false;
}
ingestDelta(delta);
this.ingestDelta(delta);
return true;
}
export function ingestAllDeferred() {
while (ingestNextDeferred());
ingestAllDeferred() {
while (this.ingestNextDeferred());
}
export function subscribeDeltas(fn: (delta: Delta) => void) {
deltaStream.on('delta', ({delta}) => {
subscribeDeltas(fn: (delta: Delta) => void) {
this.deltaStream.on('delta', ({delta}) => {
fn(delta);
});
}
export async function publishDelta(delta: Delta) {
async publishDelta(delta: Delta) {
debug(`Publishing delta: ${JSON.stringify(delta)}`);
await publishSock.send(["deltas", myRequestAddr.toAddrString(), serializeDelta(delta)]);
await this.rhizomeNode.pubSub.publish("deltas", this.serializeDelta(delta));
}
function serializeDelta(delta: Delta) {
serializeDelta(delta: Delta) {
return JSON.stringify(delta);
}
function deserializeDelta(input: string) {
deserializeDelta(input: string) {
return JSON.parse(input);
}
export async function runDeltas() {
for await (const [topic, sender, msg] of subscribeSock) {
if (topic.toString() !== "deltas") {
continue;
}
const delta = deserializeDelta(msg.toString());
delta.receivedFrom = PeerAddress.fromString(sender.toString());
debug(`Received delta: ${JSON.stringify(delta)}`);
ingestDelta(delta);
}
}

View File

@ -1,12 +1,7 @@
import {HTTP_API_ENABLE} from "./config";
import {runDeltas} from "./deltas";
import {runHttpApi} from "./http-api";
import {Entity} from "./object-layer";
import {askAllPeersForDeltas, subscribeToSeeds} from "./peers";
import {bindPublish, } from "./pub-sub";
import {bindReply, runRequestHandlers} from "./request-reply";
import {TypedCollection} from "./typed-collection";
import Debug from 'debug';
import {RhizomeNode} from "./node";
import {Entity} from "./object-layer";
import {TypedCollection} from "./typed-collection";
const debug = Debug('example-app');
// As an app we want to be able to write and read data.
@ -23,21 +18,9 @@ type User = {
};
(async () => {
const users = new TypedCollection<User>();
await bindPublish();
await bindReply();
if (HTTP_API_ENABLE) {
runHttpApi({users});
}
runDeltas();
runRequestHandlers();
await new Promise((resolve) => setTimeout(resolve, 500));
subscribeToSeeds();
await new Promise((resolve) => setTimeout(resolve, 500));
askAllPeersForDeltas();
await new Promise((resolve) => setTimeout(resolve, 1000));
const rhizomeNode = new RhizomeNode();
const users = new TypedCollection<User>("users");
users.rhizomeConnect(rhizomeNode);
users.onUpdate((u: Entity) => {
debug('User updated:', u);
@ -47,6 +30,9 @@ type User = {
debug('New user!:', u);
});
await rhizomeNode.start()
const taliesin = users.put(undefined, {
// id: 'taliesin-1',
name: 'Taliesin',

View File

@ -1,25 +1,22 @@
import Debug from "debug";
import express from "express";
import {FSWatcher} from "fs";
import {readdirSync, readFileSync, watch} from "fs";
import {Server} from "http";
import path, {join} from "path";
import {Converter} from "showdown";
import {Collection} from "./collection";
import {HTTP_API_ADDR, HTTP_API_PORT} from "./config";
import {deltasAccepted} from "./deltas";
import {peers} from "./peers";
import {RhizomeNode} from "./node";
import {Delta} from "./types";
const debug = Debug('http-api');
type CollectionsToServe = {
[key: string]: Collection;
};
const docConverter = new Converter({
completeHTMLDocument: true,
// simpleLineBreaks: true,
tables: true,
tasklists: true
});
const htmlDocFromMarkdown = (md: string): string => docConverter.makeHtml(md);
type mdFileInfo = {
@ -31,6 +28,8 @@ type mdFileInfo = {
class MDFiles {
files = new Map<string, mdFileInfo>();
readme?: mdFileInfo;
dirWatcher?: FSWatcher;
readmeWatcher?: FSWatcher;
readFile(name: string) {
const md = readFileSync(join('./markdown', `${name}.md`)).toString();
@ -66,7 +65,7 @@ class MDFiles {
}
watchDir() {
watch('./markdown', null, (eventType, filename) => {
this.dirWatcher = watch('./markdown', null, (eventType, filename) => {
if (!filename) return;
if (!filename.endsWith(".md")) return;
@ -91,7 +90,7 @@ class MDFiles {
}
watchReadme() {
watch('./README.md', null, (eventType, filename) => {
this.readmeWatcher = watch('./README.md', null, (eventType, filename) => {
if (!filename) return;
switch (eventType) {
@ -104,28 +103,40 @@ class MDFiles {
}
});
}
close() {
this.dirWatcher?.close();
this.readmeWatcher?.close();
}
}
export function runHttpApi(collections?: CollectionsToServe) {
const app = express();
app.use(express.json());
export class HttpApi {
rhizomeNode: RhizomeNode;
app = express();
mdFiles = new MDFiles();
server?: Server;
// Get list of markdown files
const mdFiles = new MDFiles();
mdFiles.readDir();
mdFiles.readReadme();
mdFiles.watchDir();
mdFiles.watchReadme();
constructor(rhizomeNode: RhizomeNode) {
this.rhizomeNode = rhizomeNode;
this.app.use(express.json());
}
start() {
// Scan and watch for markdown files
this.mdFiles.readDir();
this.mdFiles.readReadme();
this.mdFiles.watchDir();
this.mdFiles.watchReadme();
// Serve README
app.get('/html/README', (_req: express.Request, res: express.Response) => {
const html = mdFiles.getReadmeHTML();
this.app.get('/html/README', (_req: express.Request, res: express.Response) => {
const html = this.mdFiles.getReadmeHTML();
res.setHeader('content-type', 'text/html').send(html);
});
// Serve markdown files as html
app.get('/html/:name', (req: express.Request, res: express.Response) => {
let html = mdFiles.getHtml(req.params.name);
this.app.get('/html/:name', (req: express.Request, res: express.Response) => {
let html = this.mdFiles.getHtml(req.params.name);
if (!html) {
res.status(404);
html = htmlDocFromMarkdown('# 404\n\n## [Index](/html)');
@ -138,69 +149,31 @@ export function runHttpApi(collections?: CollectionsToServe) {
{
let md = `# Files\n\n`;
md += `[README](/html/README)\n\n`;
for (const name of mdFiles.list()) {
for (const name of this.mdFiles.list()) {
md += `- [${name}](./${name})\n`;
}
const html = htmlDocFromMarkdown(md);
app.get('/html', (_req: express.Request, res: express.Response) => {
this.app.get('/html', (_req: express.Request, res: express.Response) => {
res.setHeader('content-type', 'text/html').send(html);
});
}
// Set up API routes
if (collections) {
for (const [name, collection] of Object.entries(collections)) {
debug(`collection: ${name}`);
// Get the ID of all domain entities
app.get(`/${name}/ids`, (_req: express.Request, res: express.Response) => {
res.json({ids: collection.getIds()});
});
// Get a single domain entity by ID
app.get(`/${name}/:id`, (req: express.Request, res: express.Response) => {
const {params: {id}} = req;
const ent = collection.get(id);
res.json(ent);
});
// Add a new domain entity
// TODO: schema validation
app.put(`/${name}`, (req: express.Request, res: express.Response) => {
const {body: properties} = req;
const ent = collection.put(properties.id, properties);
res.json(ent);
});
// Update a domain entity
app.put(`/${name}/:id`, (req: express.Request, res: express.Response) => {
const {body: properties, params: {id}} = req;
if (properties.id && properties.id !== id) {
res.status(400).json({error: "ID Mismatch", param: id, property: properties.id});
return;
}
const ent = collection.put(id, properties);
res.json(ent);
});
}
}
app.get("/deltas", (_req: express.Request, res: express.Response) => {
// TODO: streaming
res.json(deltasAccepted);
// Serve list of all deltas accepted
// TODO: This won't scale well
this.app.get("/deltas", (_req: express.Request, res: express.Response) => {
res.json(this.rhizomeNode.deltaStream.deltasAccepted);
});
// Get the number of deltas ingested by this node
app.get("/deltas/count", (_req: express.Request, res: express.Response) => {
res.json(deltasAccepted.length);
this.app.get("/deltas/count", (_req: express.Request, res: express.Response) => {
res.json(this.rhizomeNode.deltaStream.deltasAccepted.length);
});
// Get the list of peers seen by this node (including itself)
app.get("/peers", (_req: express.Request, res: express.Response) => {
res.json(peers.map(({reqAddr, publishAddr, isSelf, isSeedPeer}) => {
const deltasAcceptedCount = deltasAccepted
this.app.get("/peers", (_req: express.Request, res: express.Response) => {
res.json(this.rhizomeNode.peers.peers.map(({reqAddr, publishAddr, isSelf, isSeedPeer}) => {
const deltasAcceptedCount = this.rhizomeNode.deltaStream.deltasAccepted
.filter((delta: Delta) => {
return delta.receivedFrom?.addr == reqAddr.addr &&
delta.receivedFrom?.port == reqAddr.port;
@ -220,11 +193,53 @@ export function runHttpApi(collections?: CollectionsToServe) {
});
// Get the number of peers seen by this node (including itself)
app.get("/peers/count", (_req: express.Request, res: express.Response) => {
res.json(peers.length);
this.app.get("/peers/count", (_req: express.Request, res: express.Response) => {
res.json(this.rhizomeNode.peers.peers.length);
});
app.listen(HTTP_API_PORT, HTTP_API_ADDR, () => {
debug(`HTTP API bound to http://${HTTP_API_ADDR}:${HTTP_API_PORT}`);
const {httpAddr, httpPort} = this.rhizomeNode.config;
this.server = this.app.listen(httpPort, httpAddr, () => {
debug(`HTTP API bound to ${httpAddr}:${httpPort}`);
});
}
serveCollection(collection: Collection) {
const {name} = collection;
// Get the ID of all domain entities
this.app.get(`/${name}/ids`, (_req: express.Request, res: express.Response) => {
res.json({ids: collection.getIds()});
});
// Get a single domain entity by ID
this.app.get(`/${name}/:id`, (req: express.Request, res: express.Response) => {
const {params: {id}} = req;
const ent = collection.get(id);
res.json(ent);
});
// Add a new domain entity
// TODO: schema validation
this.app.put(`/${name}`, (req: express.Request, res: express.Response) => {
const {body: properties} = req;
const ent = collection.put(properties.id, properties);
res.json(ent);
});
// Update a domain entity
this.app.put(`/${name}/:id`, (req: express.Request, res: express.Response) => {
const {body: properties, params: {id}} = req;
if (properties.id && properties.id !== id) {
res.status(400).json({error: "ID Mismatch", param: id, property: properties.id});
return;
}
const ent = collection.put(id, properties);
res.json(ent);
});
}
async stop() {
this.server?.close();
this.mdFiles.close();
}
}

82
src/node.ts Normal file
View File

@ -0,0 +1,82 @@
import Debug from 'debug';
import {CREATOR, HTTP_API_ADDR, HTTP_API_ENABLE, HTTP_API_PORT, PEER_ID, PUBLISH_BIND_ADDR, PUBLISH_BIND_HOST, PUBLISH_BIND_PORT, REQUEST_BIND_ADDR, REQUEST_BIND_HOST, REQUEST_BIND_PORT, SEED_PEERS} from './config';
import {DeltaStream} from './deltas';
import {HttpApi} from './http-api';
import {Peers} from './peers';
import {PubSub} from './pub-sub';
import {RequestReply} from './request-reply';
import {PeerAddress} from './types';
import {Collection} from './collection';
const debug = Debug('rhizome-node');
export type RhizomeNodeConfig = {
requestBindAddr: string;
requestBindHost: string;
requestBindPort: number;
publishBindAddr: string;
publishBindHost: string;
publishBindPort: number;
httpAddr: string;
httpPort: number;
httpEnable: boolean;
seedPeers: PeerAddress[];
peerId: string;
creator: string; // TODO each host should be able to support multiple users
};
// So that we can run more than one instance in the same process (for testing)
export class RhizomeNode {
config: RhizomeNodeConfig;
pubSub: PubSub;
requestReply: RequestReply;
httpApi: HttpApi;
deltaStream: DeltaStream;
peers: Peers;
myRequestAddr: PeerAddress;
myPublishAddr: PeerAddress;
constructor(config?: Partial<RhizomeNodeConfig>) {
this.config = {
requestBindAddr: REQUEST_BIND_ADDR,
requestBindHost: REQUEST_BIND_HOST,
requestBindPort: REQUEST_BIND_PORT,
publishBindAddr: PUBLISH_BIND_ADDR,
publishBindHost: PUBLISH_BIND_HOST,
publishBindPort: PUBLISH_BIND_PORT,
httpAddr: HTTP_API_ADDR,
httpPort: HTTP_API_PORT,
httpEnable: HTTP_API_ENABLE,
seedPeers: SEED_PEERS,
peerId: PEER_ID,
creator: CREATOR,
...config
};
debug('config', this.config);
this.myRequestAddr = new PeerAddress(this.config.requestBindHost, this.config.requestBindPort);
this.myPublishAddr = new PeerAddress(this.config.publishBindHost, this.config.publishBindPort);
this.pubSub = new PubSub(this);
this.requestReply = new RequestReply(this);
this.httpApi = new HttpApi(this);
this.deltaStream = new DeltaStream(this);
this.peers = new Peers(this);
}
async start() {
this.pubSub.start();
this.requestReply.start();
if (this.config.httpEnable) {
this.httpApi.start();
}
await new Promise((resolve) => setTimeout(resolve, 500));
this.peers.subscribeToSeeds();
await new Promise((resolve) => setTimeout(resolve, 500));
this.peers.askAllPeersForDeltas();
await new Promise((resolve) => setTimeout(resolve, 1000));
}
async stop() {
await this.pubSub.stop();
await this.requestReply.stop();
await this.httpApi.stop();
}
}

View File

@ -7,7 +7,7 @@
// - As typescript interfaces?
// - As typescript classes?
import { CREATOR, HOST_ID } from "./config";
import {RhizomeNode} from "./node";
import {Delta, PropertyTypes} from "./types";
export type EntityProperties = {
@ -29,10 +29,10 @@ export class Entity {
export class EntityPropertiesDeltaBuilder {
delta: Delta;
constructor(entityId: string) {
constructor(rhizomeNode: RhizomeNode, entityId: string) {
this.delta = {
creator: CREATOR,
host: HOST_ID,
creator: rhizomeNode.config.creator,
host: rhizomeNode.config.peerId,
pointers: [{
localContext: 'id',
target: entityId,

View File

@ -1,9 +1,10 @@
import {PUBLISH_BIND_HOST, PUBLISH_BIND_PORT, REQUEST_BIND_HOST, REQUEST_BIND_PORT, SEED_PEERS} from "./config";
import {deltasAccepted, ingestAll, receiveDelta} from "./deltas";
import {connectSubscribe} from "./pub-sub";
import {PeerRequest, registerRequestHandler, RequestSocket, ResponseSocket} from "./request-reply";
import {Delta, PeerAddress} from "./types";
import Debug from 'debug';
import {Message} from 'zeromq';
import {SEED_PEERS} from "./config";
import {RhizomeNode} from "./node";
import {Subscription} from './pub-sub';
import {PeerRequest, RequestSocket, ResponseSocket} from "./request-reply";
import {Delta, PeerAddress} from "./types";
const debug = Debug('peers');
export enum PeerMethods {
@ -11,48 +12,50 @@ export enum PeerMethods {
AskForDeltas
}
export const myRequestAddr = new PeerAddress(REQUEST_BIND_HOST, REQUEST_BIND_PORT);
export const myPublishAddr = new PeerAddress(PUBLISH_BIND_HOST, PUBLISH_BIND_PORT);
registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => {
debug('inspecting peer request');
switch (req.method) {
case PeerMethods.GetPublishAddress: {
debug('it\'s a request for our publish address');
await res.send(myPublishAddr.toAddrString());
break;
}
case PeerMethods.AskForDeltas: {
debug('it\'s a request for deltas');
// TODO: stream these rather than
// trying to write them all in one message
await res.send(JSON.stringify(deltasAccepted));
break;
}
}
});
class Peer {
rhizomeNode: RhizomeNode;
reqAddr: PeerAddress;
reqSock: RequestSocket;
reqSock?: RequestSocket;
publishAddr: PeerAddress | undefined;
isSelf: boolean;
isSeedPeer: boolean;
constructor(addr: string, port: number) {
this.reqAddr = new PeerAddress(addr, port);
this.reqSock = new RequestSocket(addr, port);
this.isSelf = addr === myRequestAddr.addr && port === myRequestAddr.port;
this.isSeedPeer = !!SEED_PEERS.find((seedPeer) =>
addr === seedPeer.addr && port === seedPeer.port);
subscription?: Subscription;
constructor(rhizomeNode: RhizomeNode, reqAddr: PeerAddress) {
this.rhizomeNode = rhizomeNode;
this.reqAddr = reqAddr;
this.isSelf = reqAddr.isEqual(this.rhizomeNode.myRequestAddr);
this.isSeedPeer = !!SEED_PEERS.find((seedPeer) => reqAddr.isEqual(seedPeer));
}
async subscribe() {
async request(method: PeerMethods): Promise<Message> {
if (!this.reqSock) {
this.reqSock = new RequestSocket(this.reqAddr);
}
return this.reqSock.request(method);
}
async subscribeDeltas() {
if (!this.publishAddr) {
const res = await this.reqSock.request(PeerMethods.GetPublishAddress);
// TODO: input validation
debug(`requesting publish addr from peer ${this.reqAddr.toAddrString()}`);
const res = await this.request(PeerMethods.GetPublishAddress);
this.publishAddr = PeerAddress.fromString(res.toString());
connectSubscribe(this.publishAddr!);
debug(`received publish addr ${this.publishAddr.toAddrString()} from peer ${this.reqAddr.toAddrString()}`);
}
this.subscription = this.rhizomeNode.pubSub.subscribe(
this.publishAddr,
"deltas",
(sender, msg) => {
const delta = this.rhizomeNode.deltaStream.deserializeDelta(msg.toString());
delta.receivedFrom = sender;
debug(`Received delta: ${JSON.stringify(delta)}`);
this.rhizomeNode.deltaStream.ingestDelta(delta);
});
this.subscription.start();
}
async askForDeltas(): Promise<Delta[]> {
// TODO as a first approximation we are trying to cram the entire history
// of accepted deltas, into one (potentially gargantuan) json message.
@ -60,42 +63,69 @@ class Peer {
// Third pass should find a way to reduce the number of deltas transmitted.
// TODO: requestTimeout
const res = await this.reqSock.request(PeerMethods.AskForDeltas);
const res = await this.request(PeerMethods.AskForDeltas);
const deltas = JSON.parse(res.toString());
return deltas;
}
}
export const peers: Peer[] = [];
export class Peers {
rhizomeNode: RhizomeNode;
peers: Peer[] = [];
peers.push(new Peer(myRequestAddr.addr, myRequestAddr.port));
constructor(rhizomeNode: RhizomeNode) {
this.rhizomeNode = rhizomeNode;
function newPeer(addr: string, port: number) {
const peer = new Peer(addr, port);
peers.push(peer);
// Add self to the list of peers, but don't connect
this.addPeer(this.rhizomeNode.myRequestAddr);
this.rhizomeNode.requestReply.registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => {
debug('inspecting peer request');
switch (req.method) {
case PeerMethods.GetPublishAddress: {
debug('it\'s a request for our publish address');
await res.send(this.rhizomeNode.myPublishAddr.toAddrString());
break;
}
case PeerMethods.AskForDeltas: {
debug('it\'s a request for deltas');
// TODO: stream these rather than
// trying to write them all in one message
await res.send(JSON.stringify(this.rhizomeNode.deltaStream.deltasAccepted));
break;
}
}
});
}
addPeer(addr: PeerAddress): Peer {
const peer = new Peer(this.rhizomeNode, addr);
this.peers.push(peer);
debug('added peer', addr);
return peer;
}
export async function subscribeToSeeds() {
SEED_PEERS.forEach(async ({addr, port}, idx) => {
debug(`SEED PEERS[${idx}]=${addr}:${port}`);
const peer = newPeer(addr, port);
await peer.subscribe();
async subscribeToSeeds() {
SEED_PEERS.forEach(async (addr, idx) => {
debug(`SEED PEERS[${idx}]=${addr.toAddrString()}`);
const peer = this.addPeer(addr);
await peer.subscribeDeltas();
});
}
//! TODO Expect abysmal scaling properties with this function
export async function askAllPeersForDeltas() {
peers
.filter(({isSelf}) => !isSelf)
async askAllPeersForDeltas() {
this.peers.filter(({isSelf}) => !isSelf)
.forEach(async (peer, idx) => {
debug(`Asking peer ${idx} for deltas`);
const deltas = await peer.askForDeltas();
debug(`received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`);
for (const delta of deltas) {
delta.receivedFrom = peer.reqAddr;
receiveDelta(delta);
this.rhizomeNode.deltaStream.receiveDelta(delta);
}
ingestAll();
this.rhizomeNode.deltaStream.ingestAll();
});
}
}

View File

@ -1,23 +1,79 @@
import {Publisher, Subscriber} from 'zeromq';
import {PUBLISH_BIND_ADDR, PUBLISH_BIND_PORT} from './config';
import {PeerAddress} from './types';
import Debug from 'debug';
import {Message, Publisher, Subscriber} from 'zeromq';
import {RhizomeNode} from './node';
import {PeerAddress} from './types';
const debug = Debug('pub-sub');
export const publishSock = new Publisher();
export const subscribeSock = new Subscriber();
export type SubscribedMessageHandler = (sender: PeerAddress, msg: Message) => void;
export async function bindPublish() {
const addrStr = `tcp://${PUBLISH_BIND_ADDR}:${PUBLISH_BIND_PORT}`;
await publishSock.bind(addrStr);
debug(`Publishing socket bound to ${addrStr}`);
// TODO: Allow subscribing to multiple topics on one socket
export class Subscription {
sock = new Subscriber();
topic: string;
publishAddr: PeerAddress;
publishAddrStr: string;
cb: SubscribedMessageHandler;
constructor(publishAddr: PeerAddress, topic: string, cb: SubscribedMessageHandler) {
this.cb = cb;
this.topic = topic;
this.publishAddr = publishAddr;
this.publishAddrStr = `tcp://${this.publishAddr.toAddrString()}`;
}
export function connectSubscribe(publishAddr: PeerAddress) {
// TODO: peer discovery
const addrStr = `tcp://${publishAddr.toAddrString()}`;
debug('connectSubscribe', {addrStr});
subscribeSock.connect(addrStr);
subscribeSock.subscribe("deltas");
debug(`Subscribing to ${addrStr}`);
async start() {
this.sock.connect(this.publishAddrStr);
this.sock.subscribe(this.topic);
debug(`Subscribing to ${this.topic} topic on ${this.publishAddrStr}`);
for await (const [, sender, msg] of this.sock) {
const senderAddr = PeerAddress.fromString(sender.toString());
this.cb(senderAddr, msg);
}
}
}
export class PubSub {
rhizomeNode: RhizomeNode;
publishSock: Publisher;
publishAddrStr: string;
subscriptions: Subscription[] = [];
constructor(rhizomeNode: RhizomeNode) {
this.rhizomeNode = rhizomeNode;
this.publishSock = new Publisher();
const {publishBindAddr, publishBindPort} = this.rhizomeNode.config;
this.publishAddrStr = `tcp://${publishBindAddr}:${publishBindPort}`;
}
async start() {
await this.publishSock.bind(this.publishAddrStr);
debug(`Publishing socket bound to ${this.publishAddrStr}`);
}
async publish(topic: string, msg: string) {
await this.publishSock.send([
topic,
this.rhizomeNode.myRequestAddr.toAddrString(),
msg
]);
}
subscribe(publishAddr: PeerAddress, topic: string, cb: SubscribedMessageHandler): Subscription {
const subscription = new Subscription(publishAddr, topic, cb);
this.subscriptions.push(subscription);
return subscription;
}
async stop() {
await this.publishSock.unbind(this.publishAddrStr);
this.publishSock.close();
this.publishSock = new Publisher();
for (const subscription of this.subscriptions) {
subscription.sock.close();
debug('subscription socket is closed?', subscription.sock.closed);
}
}
}

View File

@ -1,15 +0,0 @@
import { Query, QueryResult, } from './types';
import { deltasAccepted } from './deltas';
import { applyFilter } from './filter';
// export const queryResultMemo = new Map<Query, QueryResult>();
export function issueQuery(query: Query): QueryResult {
const deltas = applyFilter(deltasAccepted, query.filterExpr);
return {
deltas
// TODO: Materialized view / state collapse snapshot
};
}

View File

@ -1,8 +1,9 @@
import {Request, Reply, Message} from 'zeromq';
import { REQUEST_BIND_PORT, REQUEST_BIND_ADDR} from './config';
import {EventEmitter} from 'node:events';
import {PeerMethods} from './peers';
import Debug from 'debug';
import {RhizomeNode} from './node';
import {PeerAddress} from './types';
const debug = Debug('request-reply');
export type PeerRequest = {
@ -11,34 +12,30 @@ export type PeerRequest = {
export type RequestHandler = (req: PeerRequest, res: ResponseSocket) => void;
export const replySock = new Reply();
const requestStream = new EventEmitter();
// TODO: Retain handle to request socket for each peer, so we only need to open once
export class RequestSocket {
sock = new Request();
export async function bindReply() {
const addrStr = `tcp://${REQUEST_BIND_ADDR}:${REQUEST_BIND_PORT}`;
await replySock.bind(addrStr);
debug(`Reply socket bound to ${addrStr}`);
constructor(addr: PeerAddress) {
const addrStr = `tcp://${addr.addr}:${addr.port}`;
this.sock.connect(addrStr);
debug(`Request socket connecting to ${addrStr}`);
}
export async function runRequestHandlers() {
for await (const [msg] of replySock) {
debug(`Received message`, {msg: msg.toString()});
const req = peerRequestFromMsg(msg);
requestStream.emit('request', req);
async request(method: PeerMethods): Promise<Message> {
const req: PeerRequest = {
method
};
await this.sock.send(JSON.stringify(req));
// Wait for a response.
// TODO: Timeout
// TODO: Retry
// this.sock.receiveTimeout = ...
const [res] = await this.sock.receive();
return res;
}
}
function peerRequestFromMsg(msg: Message): PeerRequest | null {
let req: PeerRequest | null = null;
try {
const obj = JSON.parse(msg.toString());
req = {...obj};
} catch(e) {
debug('error receiving command', e);
}
return req;
}
export class ResponseSocket {
sock: Reply;
constructor(sock: Reply) {
@ -53,30 +50,54 @@ export class ResponseSocket {
}
}
export function registerRequestHandler(handler: RequestHandler) {
requestStream.on('request', (req) => {
const res = new ResponseSocket(replySock);
function peerRequestFromMsg(msg: Message): PeerRequest | null {
let req: PeerRequest | null = null;
try {
const obj = JSON.parse(msg.toString());
req = {...obj};
} catch (e) {
debug('error receiving command', e);
}
return req;
}
export class RequestReply {
rhizomeNode: RhizomeNode;
replySock = new Reply();
requestStream = new EventEmitter();
requestBindAddrStr: string;
constructor(rhizomeNode: RhizomeNode) {
this.rhizomeNode = rhizomeNode;
const {requestBindAddr, requestBindPort} = this.rhizomeNode.config;
this.requestBindAddrStr = `tcp://${requestBindAddr}:${requestBindPort}`;
}
// Listen for incoming requests
async start() {
await this.replySock.bind(this.requestBindAddrStr);
debug(`Reply socket bound to ${this.requestBindAddrStr}`);
for await (const [msg] of this.replySock) {
debug(`Received message`, {msg: msg.toString()});
const req = peerRequestFromMsg(msg);
this.requestStream.emit('request', req);
}
}
// Add a top level handler for incoming requests.
// Each handler will get a copy of every message.
registerRequestHandler(handler: RequestHandler) {
this.requestStream.on('request', (req) => {
const res = new ResponseSocket(this.replySock);
handler(req, res);
});
}
export class RequestSocket {
sock = new Request();
constructor(host: string, port: number) {
const addrStr = `tcp://${host}:${port}`;
this.sock.connect(addrStr);
debug(`Request socket connecting to ${addrStr}`);
}
async request(method: PeerMethods): Promise<Message> {
const req: PeerRequest = {
method
};
await this.sock.send(JSON.stringify(req));
// Wait for a response.
// TODO: Timeout
// TODO: Retry
// this.sock.receiveTimeout = ...
const [res] = await this.sock.receive();
return res;
async stop() {
await this.replySock.unbind(this.requestBindAddrStr);
this.replySock.close();
this.replySock = new Reply();
}
}

View File

@ -5,6 +5,7 @@ export class TypedCollection<T extends EntityProperties> extends Collection {
put(id: string | undefined, properties: T): Entity {
return super.put(id, properties);
}
get(id: string): Entity | undefined {
return super.get(id);
}

View File

@ -46,19 +46,27 @@ export type Properties = {[key: string]: PropertyTypes};
export class PeerAddress {
addr: string;
port: number;
constructor(addr: string, port: number) {
this.addr = addr;
this.port = port;
}
static fromString(addrString: string): PeerAddress {
const [addr, port] = addrString.trim().split(':');
return new PeerAddress(addr, parseInt(port));
}
toAddrString() {
return `${this.addr}:${this.port}`;
}
toJSON() {
return this.toAddrString();
}
isEqual(other: PeerAddress) {
return this.addr === other.addr && this.port === other.port;
}
};