pretty neat
This commit is contained in:
parent
1c98efa181
commit
cfe410484e
|
@ -0,0 +1,2 @@
|
||||||
|
dist/
|
||||||
|
node_modules/
|
|
@ -0,0 +1,7 @@
|
||||||
|
import eslint from '@eslint/js';
|
||||||
|
import tseslint from 'typescript-eslint';
|
||||||
|
|
||||||
|
export default tseslint.config(
|
||||||
|
eslint.configs.recommended,
|
||||||
|
tseslint.configs.recommended,
|
||||||
|
);
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,29 @@
|
||||||
|
{
|
||||||
|
"name": "rhizome-node",
|
||||||
|
"version": "1.0.0",
|
||||||
|
"description": "Rhizomatic database engine node",
|
||||||
|
"scripts": {
|
||||||
|
"start": "node --experimental-strip-types --experimental-transform-types src/main.ts",
|
||||||
|
"lint": "eslint",
|
||||||
|
"test": "echo \"Error: no test specified\" && exit 1"
|
||||||
|
},
|
||||||
|
"author": "",
|
||||||
|
"license": "Unlicense",
|
||||||
|
"dependencies": {
|
||||||
|
"@types/bluebird": "^3.5.42",
|
||||||
|
"@types/json-logic-js": "^2.0.8",
|
||||||
|
"express": "^4.21.2",
|
||||||
|
"json-logic-js": "^2.0.5",
|
||||||
|
"level": "^9.0.0",
|
||||||
|
"zeromq": "^6.1.2"
|
||||||
|
},
|
||||||
|
"devDependencies": {
|
||||||
|
"@eslint/js": "^9.17.0",
|
||||||
|
"@types/express": "^5.0.0",
|
||||||
|
"@types/node": "^22.10.2",
|
||||||
|
"eslint": "^9.17.0",
|
||||||
|
"eslint-config-airbnb-base-typescript": "^1.1.0",
|
||||||
|
"typescript": "^5.7.2",
|
||||||
|
"typescript-eslint": "^8.18.0"
|
||||||
|
}
|
||||||
|
}
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1,24 @@
|
||||||
|
To install
|
||||||
|
|
||||||
|
npm install
|
||||||
|
|
||||||
|
To build
|
||||||
|
|
||||||
|
npx tsc
|
||||||
|
|
||||||
|
To demonstrate the example application, you can open multiple terminals. In each terminal execute something like the following.
|
||||||
|
|
||||||
|
export REQUEST_BIND_PORT=4000
|
||||||
|
export PUBLISH_BIND_PORT=4001
|
||||||
|
export SEED_PEERS='127.0.0.1:4002, 127.0.0.1:4004'
|
||||||
|
node dist/example-app.js
|
||||||
|
|
||||||
|
export REQUEST_BIND_PORT=4002
|
||||||
|
export PUBLISH_BIND_PORT=4003
|
||||||
|
export SEED_PEERS='127.0.0.1:4000, 127.0.0.1:4004'
|
||||||
|
node dist/example-app.js
|
||||||
|
|
||||||
|
export REQUEST_BIND_PORT=4004
|
||||||
|
export PUBLISH_BIND_PORT=4005
|
||||||
|
export SEED_PEERS='127.0.0.1:4000, 127.0.0.1:4002'
|
||||||
|
node dist/example-app.js
|
|
@ -0,0 +1,34 @@
|
||||||
|
// The goal here is to house a collection of objects that all follow a common schema.
|
||||||
|
// It should enable operations like removing a property removes the value from the entities in the collection
|
||||||
|
// It could then be further extended with e.g. table semantics like filter, sort, join
|
||||||
|
|
||||||
|
type Property = {
|
||||||
|
name: string,
|
||||||
|
type: number | string;
|
||||||
|
}
|
||||||
|
|
||||||
|
class EntityType {
|
||||||
|
name: string;
|
||||||
|
properties?: Property[];
|
||||||
|
constructor(name: string) {
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class Entity {
|
||||||
|
type: EntityType;
|
||||||
|
properties?: object;
|
||||||
|
constructor(type: EntityType) {
|
||||||
|
this.type = type;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class Collection {
|
||||||
|
entities = new Map<string, Entity>();
|
||||||
|
// update(entityId, properties)
|
||||||
|
// ...
|
||||||
|
}
|
||||||
|
|
||||||
|
export class Collections {
|
||||||
|
collections = new Map<string, Collection>();
|
||||||
|
}
|
|
@ -0,0 +1,17 @@
|
||||||
|
export const LEVEL_DB_DIR = process.env.RHIZOME_LEVEL_DB_DIR ?? './data';
|
||||||
|
export const CREATOR = process.env.USER!;
|
||||||
|
export const HOST = process.env.HOST!;
|
||||||
|
export const ADDRESS = process.env.ADDRESS ?? '127.0.0.1';
|
||||||
|
export const REQUEST_BIND_PORT = parseInt(process.env.REQUEST_BIND_PORT || '4000');
|
||||||
|
export const PUBLISH_BIND_PORT = parseInt(process.env.PUBLISH_BIND_PORT || '4001');
|
||||||
|
export const REQUEST_BIND_ADDR = process.env.ADDRESS || '127.0.0.1';
|
||||||
|
export const PUBLISH_BIND_ADDR = process.env.ADDRESS || '127.0.0.1';
|
||||||
|
export const HTTP_API_PORT = parseInt(process.env.HTTP_API_PORT || '3000');
|
||||||
|
export const HTTP_API_ADDR = process.env.ADDRESS || '127.0.0.1';
|
||||||
|
export const ENABLE_HTTP_API = process.env.ENABLE_HTTP_API === 'true';
|
||||||
|
export const SEED_PEERS = (process.env.SEED_PEERS || '').split(',')
|
||||||
|
.filter(x => !!x)
|
||||||
|
.map((peer: string) => {
|
||||||
|
const [addr, port] = peer.trim().split(':');
|
||||||
|
return {addr, port: parseInt(port)};
|
||||||
|
});
|
|
@ -0,0 +1,91 @@
|
||||||
|
import EventEmitter from 'node:events';
|
||||||
|
import { Delta, Decision } from './types';
|
||||||
|
import { publishSock, subscribeSock } from './pub-sub';
|
||||||
|
|
||||||
|
export const deltaStream = new EventEmitter();
|
||||||
|
|
||||||
|
export const deltasProposed: Delta[] = [];
|
||||||
|
export const deltasAccepted: Delta[] = [];
|
||||||
|
export const deltasRejected: Delta[] = [];
|
||||||
|
export const deltasDeferred: Delta[] = [];
|
||||||
|
|
||||||
|
export function applyPolicy(delta: Delta): Decision {
|
||||||
|
return !!delta && Decision.Accept;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function receiveDelta(delta: Delta) {
|
||||||
|
deltasProposed.push(delta);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function ingestDelta(delta: Delta) {
|
||||||
|
const decision = applyPolicy(delta);
|
||||||
|
switch (decision) {
|
||||||
|
case Decision.Accept:
|
||||||
|
deltasAccepted.push(delta);
|
||||||
|
deltaStream.emit('delta', { delta });
|
||||||
|
break;
|
||||||
|
case Decision.Reject:
|
||||||
|
deltasRejected.push(delta);
|
||||||
|
break;
|
||||||
|
case Decision.Defer:
|
||||||
|
deltasDeferred.push(delta);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function ingestNext(): boolean {
|
||||||
|
const delta = deltasProposed.shift();
|
||||||
|
if (!delta) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
ingestDelta(delta);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function ingestAll() {
|
||||||
|
while (ingestNext());
|
||||||
|
}
|
||||||
|
|
||||||
|
export function ingestNextDeferred(): boolean {
|
||||||
|
const delta = deltasDeferred.shift();
|
||||||
|
if (!delta) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
ingestDelta(delta);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function ingestAllDeferred() {
|
||||||
|
while (ingestNextDeferred());
|
||||||
|
}
|
||||||
|
|
||||||
|
export function subscribeDeltas(fn: (delta: Delta) => void) {
|
||||||
|
deltaStream.on('delta', ({delta}) => {
|
||||||
|
fn(delta);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function publishDelta(delta: Delta) {
|
||||||
|
console.log(`Publishing delta: ${JSON.stringify(delta)}`);
|
||||||
|
await publishSock.send(["deltas", serializeDelta(delta)])
|
||||||
|
}
|
||||||
|
|
||||||
|
function serializeDelta(delta: Delta) {
|
||||||
|
return JSON.stringify(delta);
|
||||||
|
}
|
||||||
|
|
||||||
|
function deserializeDelta(input: string) {
|
||||||
|
return JSON.parse(input);
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function runDeltas() {
|
||||||
|
for await (const [topic, msg] of subscribeSock) {
|
||||||
|
if (topic.toString() !== "deltas") {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const delta = deserializeDelta(msg.toString());
|
||||||
|
console.log(`Received delta: ${JSON.stringify(delta)}`);
|
||||||
|
ingestDelta(delta);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
// We can start to use deltas to express relational data in a given context
|
||||||
|
|
||||||
|
import express from "express";
|
||||||
|
import { bindPublish, } from "./pub-sub";
|
||||||
|
import { runDeltas } from "./deltas";
|
||||||
|
import { Entities, Entity } from "./object-layer";
|
||||||
|
import { bindReply, runRequestHandlers } from "./request-reply";
|
||||||
|
import { subscribeToSeeds } from "./peers";
|
||||||
|
import { ENABLE_HTTP_API, HTTP_API_ADDR, HTTP_API_PORT } from "./config";
|
||||||
|
|
||||||
|
|
||||||
|
// As an app we want to be able to write and read data.
|
||||||
|
// The data is whatever shape we define it to be in a given context.
|
||||||
|
// So we want access to an API that is integrated with our declarations of
|
||||||
|
// e.g. entities and their properties.
|
||||||
|
|
||||||
|
// This implies at least one layer on top of the underlying primitive deltas.
|
||||||
|
|
||||||
|
type UserProperties = {
|
||||||
|
id?: string;
|
||||||
|
name: string;
|
||||||
|
nameLong?: string;
|
||||||
|
email?: string;
|
||||||
|
age: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
class Users {
|
||||||
|
db = new Entities();
|
||||||
|
create(properties: UserProperties): Entity {
|
||||||
|
// We provide undefined for the id, to let the database generate it
|
||||||
|
// This call returns the id
|
||||||
|
const user = this.db.put(undefined, properties);
|
||||||
|
console.log(`Users.create(${user.id}, ${JSON.stringify(properties)}`);
|
||||||
|
return user;
|
||||||
|
}
|
||||||
|
upsert(properties: UserProperties): Entity {
|
||||||
|
const user = this.db.put(properties.id, properties);
|
||||||
|
console.log(`Users.upsert(${user.id}, ${JSON.stringify(properties)}`);
|
||||||
|
return user;
|
||||||
|
}
|
||||||
|
getOne(id: string): Entity | undefined {
|
||||||
|
return this.db.get(id);
|
||||||
|
}
|
||||||
|
getIds(): string[] {
|
||||||
|
return this.db.getIds();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
(async () => {
|
||||||
|
const app = express()
|
||||||
|
|
||||||
|
app.get("/ids", (req: express.Request, res: express.Response) => {
|
||||||
|
res.json({ ids: users.getIds()});
|
||||||
|
});
|
||||||
|
|
||||||
|
if (ENABLE_HTTP_API) {
|
||||||
|
app.listen(HTTP_API_PORT, HTTP_API_ADDR, () => {
|
||||||
|
console.log(`HTTP API bound to http://${HTTP_API_ADDR}:${HTTP_API_PORT}`);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
await bindPublish();
|
||||||
|
await bindReply();
|
||||||
|
runDeltas();
|
||||||
|
runRequestHandlers();
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||||
|
subscribeToSeeds();
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||||
|
|
||||||
|
const users = new Users();
|
||||||
|
|
||||||
|
const taliesin = users.upsert({
|
||||||
|
id: 'taliesin-1',
|
||||||
|
name: 'Taliesin',
|
||||||
|
nameLong: 'Taliesin (Ladd)',
|
||||||
|
age: Math.floor(Math.random() * 1000)
|
||||||
|
});
|
||||||
|
|
||||||
|
taliesin.onUpdate((u: Entity) => {
|
||||||
|
console.log('User updated', u);
|
||||||
|
});
|
||||||
|
|
||||||
|
// TODO: Allow configuration regarding read/write concern i.e.
|
||||||
|
// if we perform a read immediately do we see the value we wrote?
|
||||||
|
// Intuition says yes, we want that-- but how do we expose the propagation status?
|
||||||
|
|
||||||
|
const result = users.getOne(taliesin.id);
|
||||||
|
const matches: boolean = JSON.stringify(result) === JSON.stringify(taliesin);
|
||||||
|
console.log(`Result ${matches ? 'matches' : 'does not match'} expected.` +
|
||||||
|
`\n\nExpected \n${JSON.stringify(taliesin)}` +
|
||||||
|
`\nReceived\n${JSON.stringify(result)}`);
|
||||||
|
|
||||||
|
})();
|
||||||
|
|
|
@ -0,0 +1,16 @@
|
||||||
|
import { add_operation, apply } from 'json-logic-js';
|
||||||
|
import { Delta, DeltaContext } from '../types';
|
||||||
|
|
||||||
|
add_operation('in', (needle, haystack) => {
|
||||||
|
return [...haystack].includes(needle);
|
||||||
|
});
|
||||||
|
|
||||||
|
export function applyFilter(deltas: Delta[], filterExpr: JSON): Delta[] {
|
||||||
|
return deltas.filter(delta => {
|
||||||
|
const context: DeltaContext = {
|
||||||
|
...delta,
|
||||||
|
creatorAddress: [delta.creator, delta.host].join('@'),
|
||||||
|
};
|
||||||
|
return apply(filterExpr, context);
|
||||||
|
});
|
||||||
|
}
|
|
@ -0,0 +1,33 @@
|
||||||
|
import { FilterExpr } from "../types";
|
||||||
|
// import { map } from 'radash';
|
||||||
|
|
||||||
|
// A creator as seen by a host
|
||||||
|
type OriginPoint = {
|
||||||
|
creator: string;
|
||||||
|
host: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
class Party {
|
||||||
|
originPoints: OriginPoint[];
|
||||||
|
constructor(og: OriginPoint) {
|
||||||
|
this.originPoints = [og];
|
||||||
|
}
|
||||||
|
getAddress() {
|
||||||
|
const { creator, host } = this.originPoints[0];
|
||||||
|
return `${creator}@${host}`;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const knownParties = new Set<Party>();
|
||||||
|
export const countKnownParties = () => knownParties.size;
|
||||||
|
|
||||||
|
export function generateFilter(): FilterExpr {
|
||||||
|
// map(knownParties, (p: Party) => p.address]
|
||||||
|
//
|
||||||
|
|
||||||
|
const addresses = [...knownParties.values()].map(p => p.getAddress());
|
||||||
|
|
||||||
|
return {
|
||||||
|
'in': ['$creatorAddress', addresses]
|
||||||
|
};
|
||||||
|
};
|
|
@ -0,0 +1,45 @@
|
||||||
|
import express from "express";
|
||||||
|
import { runDeltas } from "./deltas";
|
||||||
|
import {ENABLE_HTTP_API, HTTP_API_ADDR, HTTP_API_PORT} from "./config";
|
||||||
|
|
||||||
|
const app = express()
|
||||||
|
|
||||||
|
app.get("/", (req: express.Request, res: express.Response) => {
|
||||||
|
res.json({ message: "Welcome to the Express + TypeScript Server!" });
|
||||||
|
});
|
||||||
|
|
||||||
|
if (ENABLE_HTTP_API) {
|
||||||
|
app.listen(HTTP_API_PORT, HTTP_API_ADDR, () => {
|
||||||
|
console.log(`HTTP API bound to http://${HTTP_API_ADDR}:${HTTP_API_PORT}`);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Endpoint: Receive a delta
|
||||||
|
//
|
||||||
|
// TODO: Websockets
|
||||||
|
// TODO: UDP
|
||||||
|
// TODO: ZeroMQ
|
||||||
|
//
|
||||||
|
// TODO: Endpoint: Query (materialized view)
|
||||||
|
// TODO: Endpoint: Info about peers
|
||||||
|
// TODO: Propagate information about peers (~gossip / or maybe just same as other kinds of deltas)
|
||||||
|
// So we dogfood the delta data structure and the distributed architecture
|
||||||
|
//
|
||||||
|
//
|
||||||
|
// TODO: Collections of functions
|
||||||
|
// How are we defining functions?
|
||||||
|
// Transformations?
|
||||||
|
// Inputs, calculations, outputs;
|
||||||
|
// Tx/Rx/Store/Retrieve/Compute;
|
||||||
|
// Schedule?
|
||||||
|
//
|
||||||
|
//
|
||||||
|
// What assumptions, if any, can we or do we want to make about our operating envoronment/situation?
|
||||||
|
// How much continuity dare we hope for?
|
||||||
|
// It's going to depend on the use case
|
||||||
|
|
||||||
|
// You simply want a formula for expressing your confidence in things
|
||||||
|
|
||||||
|
// That can be encoded as deltas
|
||||||
|
|
||||||
|
runDeltas();
|
|
@ -0,0 +1,180 @@
|
||||||
|
// The goal here is to provide a translation for
|
||||||
|
// entities and their properties
|
||||||
|
// to and from (sequences of) deltas.
|
||||||
|
|
||||||
|
// How can our caller define the entities and their properties?
|
||||||
|
// - As typescript types?
|
||||||
|
// - As typescript interfaces?
|
||||||
|
// - As typescript classes?
|
||||||
|
|
||||||
|
import EventEmitter from "node:events";
|
||||||
|
import { CREATOR, HOST } from "./config";
|
||||||
|
import { publishDelta, subscribeDeltas } from "./deltas";
|
||||||
|
import { Delta, PropertyTypes } from "./types";
|
||||||
|
import { randomUUID } from "node:crypto";
|
||||||
|
|
||||||
|
const entityEventStream = new EventEmitter();
|
||||||
|
|
||||||
|
type EntityProperties = {
|
||||||
|
[key: string]: PropertyTypes
|
||||||
|
};
|
||||||
|
|
||||||
|
export class Entity {
|
||||||
|
id: string;
|
||||||
|
properties: EntityProperties;
|
||||||
|
ahead = 0;
|
||||||
|
constructor(id: string) {
|
||||||
|
this.id = id;
|
||||||
|
this.properties = {};
|
||||||
|
}
|
||||||
|
onUpdate(cb: (entity: Entity) => void) {
|
||||||
|
// TODO: This doesn't seem like it will scale well.
|
||||||
|
entityEventStream.on('update', (entity: Entity) => {
|
||||||
|
if (entity.id === this.id) {
|
||||||
|
cb(entity);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const entities = new Map<string, Entity>();
|
||||||
|
// TODO: Use leveldb for storing view snapshots
|
||||||
|
|
||||||
|
class EntityPropertiesDeltaBuilder {
|
||||||
|
delta: Delta;
|
||||||
|
constructor(entityId: string) {
|
||||||
|
this.delta = {
|
||||||
|
creator: CREATOR,
|
||||||
|
host: HOST,
|
||||||
|
pointers: [{
|
||||||
|
localContext: 'id',
|
||||||
|
target: entityId,
|
||||||
|
targetContext: 'properties'
|
||||||
|
}]
|
||||||
|
};
|
||||||
|
}
|
||||||
|
add(localContext: string, target: PropertyTypes) {
|
||||||
|
this.delta.pointers.push({localContext, target});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Applies the javascript rules for updating object values,
|
||||||
|
// e.g. set to `undefined` to delete a property
|
||||||
|
function updateEntity(entityId?: string, properties?: object, local = false, deltas?: Delta[]): Entity {
|
||||||
|
let entity: Entity | undefined;
|
||||||
|
let eventType: 'create' | 'update' | 'delete' | undefined;
|
||||||
|
entityId = entityId ?? randomUUID();
|
||||||
|
entity = entities.get(entityId);
|
||||||
|
if (!entity) {
|
||||||
|
entity = new Entity(entityId);
|
||||||
|
entity.id = entityId;
|
||||||
|
eventType = 'create';
|
||||||
|
}
|
||||||
|
const deltaBulider = new EntityPropertiesDeltaBuilder(entityId);
|
||||||
|
|
||||||
|
if (!properties) {
|
||||||
|
// Let's interpret this as entity deletion
|
||||||
|
entities.delete(entityId);
|
||||||
|
// TODO: prepare and publish a delta
|
||||||
|
// TODO: execute hooks
|
||||||
|
eventType = 'delete';
|
||||||
|
} else {
|
||||||
|
let anyChanged = false;
|
||||||
|
Object.entries(properties).forEach(([key, value]) => {
|
||||||
|
let changed = false;
|
||||||
|
if (entity.properties && entity.properties[key] !== value) {
|
||||||
|
entity.properties[key] = value;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
if (local && changed) {
|
||||||
|
// If this is a change, let's generate a delta
|
||||||
|
deltaBulider.add(key, value);
|
||||||
|
// We append to the array the caller may provide
|
||||||
|
// We can update this count as we receive network confirmation for deltas
|
||||||
|
entity.ahead += 1;
|
||||||
|
}
|
||||||
|
anyChanged = anyChanged || changed;
|
||||||
|
});
|
||||||
|
// We've noted that we may be ahead of the server, let's update our
|
||||||
|
// local image of this entity.
|
||||||
|
//* In principle, this system can recreate past or alternative states.
|
||||||
|
//* At worst, by replaying all the deltas up to a particular point.
|
||||||
|
//* Some sort of checkpointing strategy would probably be helpful.
|
||||||
|
//* Furthermore, if we can implement reversible transformations,
|
||||||
|
//* it would then be efficient to calculate the state of the system with
|
||||||
|
//* specific deltas removed. We could use it to extract a measurement
|
||||||
|
//* of the effects of some deltas' inclusion or exclusion, the
|
||||||
|
//* evaluation of which may lend evidence to some possible arguments.
|
||||||
|
|
||||||
|
entities.set(entityId, entity);
|
||||||
|
if (anyChanged) {
|
||||||
|
deltas?.push(deltaBulider.delta);
|
||||||
|
eventType = eventType || 'update';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (eventType) {
|
||||||
|
entityEventStream.emit(eventType, entity);
|
||||||
|
}
|
||||||
|
return entity;
|
||||||
|
}
|
||||||
|
|
||||||
|
// We can update our local image of the entity, but we should annotate it
|
||||||
|
// to indicate that we have not yet received any confirmation of this delta
|
||||||
|
// having been propagated.
|
||||||
|
// Later when we receive deltas regarding this entity we can detect when
|
||||||
|
// we have received back an image that matches our target.
|
||||||
|
|
||||||
|
// So we need a function to generate one or more deltas for each call to put/
|
||||||
|
// maybe we stage them and wait for a call to commit() that initiates the
|
||||||
|
// assembly and transmission of one or more deltas
|
||||||
|
|
||||||
|
function applyDelta(delta: Delta) {
|
||||||
|
// TODO: handle delta representing entity deletion
|
||||||
|
const idPtr = delta.pointers.find(({localContext}) => localContext === 'id');
|
||||||
|
if (!idPtr) {
|
||||||
|
console.error('encountered delta with no entity id', delta);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const properties: EntityProperties = {};
|
||||||
|
delta.pointers.filter(({localContext}) => localContext !== 'id')
|
||||||
|
.forEach(({localContext: key, target: value}) => {
|
||||||
|
properties[key] = value;
|
||||||
|
}, {});
|
||||||
|
const entityId = idPtr.target as string;
|
||||||
|
// TODO: Handle the scenario where this update has been superceded by a newer one locally
|
||||||
|
updateEntity(entityId, properties);
|
||||||
|
}
|
||||||
|
|
||||||
|
subscribeDeltas((delta: Delta) => {
|
||||||
|
// TODO: Make sure this is the kind of delta we're looking for
|
||||||
|
applyDelta(delta);
|
||||||
|
});
|
||||||
|
|
||||||
|
export class Entities {
|
||||||
|
constructor() {
|
||||||
|
entityEventStream.on('create', (entity: Entity) => {
|
||||||
|
console.log(`new entity!`, entity);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
put(entityId: string | undefined, properties: object): Entity {
|
||||||
|
const deltas: Delta[] = [];
|
||||||
|
const entity = updateEntity(entityId, properties, true, deltas);
|
||||||
|
deltas.forEach(async (delta: Delta) => {
|
||||||
|
await publishDelta(delta);
|
||||||
|
});
|
||||||
|
return entity;
|
||||||
|
}
|
||||||
|
del(entityId: string) {
|
||||||
|
const deltas: Delta[] = [];
|
||||||
|
updateEntity(entityId, undefined, true, deltas);
|
||||||
|
deltas.forEach(async (delta: Delta) => {
|
||||||
|
await publishDelta(delta);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
get(id: string): Entity | undefined {
|
||||||
|
return entities.get(id);
|
||||||
|
}
|
||||||
|
getIds(): string[] {
|
||||||
|
return Array.from(entities.keys());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,55 @@
|
||||||
|
import { PUBLISH_BIND_ADDR, PUBLISH_BIND_PORT } from "./config";
|
||||||
|
import { registerRequestHandler, PeerRequest, ResponseSocket } from "./request-reply";
|
||||||
|
import { RequestSocket, } from "./request-reply";
|
||||||
|
import { SEED_PEERS } from "./config";
|
||||||
|
import {connectSubscribe} from "./pub-sub";
|
||||||
|
|
||||||
|
export enum PeerMethods {
|
||||||
|
GetPublishAddress,
|
||||||
|
}
|
||||||
|
|
||||||
|
export type PeerAddress = {
|
||||||
|
addr: string,
|
||||||
|
port: number
|
||||||
|
};
|
||||||
|
|
||||||
|
const publishAddr: PeerAddress = {
|
||||||
|
addr: PUBLISH_BIND_ADDR,
|
||||||
|
port: PUBLISH_BIND_PORT
|
||||||
|
};
|
||||||
|
|
||||||
|
class Peer {
|
||||||
|
reqSock: RequestSocket;
|
||||||
|
publishAddr: PeerAddress | undefined;
|
||||||
|
constructor(addr: string, port: number) {
|
||||||
|
this.reqSock = new RequestSocket(addr, port);
|
||||||
|
}
|
||||||
|
async subscribe() {
|
||||||
|
if (!this.publishAddr) {
|
||||||
|
const res = await this.reqSock.request(PeerMethods.GetPublishAddress);
|
||||||
|
// TODO: input validation
|
||||||
|
const {addr, port} = JSON.parse(res.toString());
|
||||||
|
this.publishAddr = {addr, port};
|
||||||
|
connectSubscribe(addr, port);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function subscribeToSeeds() {
|
||||||
|
SEED_PEERS.forEach(async ({addr, port}, idx) => {
|
||||||
|
console.log(`SEED PEERS[${idx}]=${addr}:${port}`);
|
||||||
|
const peer = new Peer(addr, port);
|
||||||
|
await peer.subscribe();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => {
|
||||||
|
console.log('inspecting peer request');
|
||||||
|
switch (req.method) {
|
||||||
|
case PeerMethods.GetPublishAddress:
|
||||||
|
console.log('it\'s a request for our publish address');
|
||||||
|
await res.send(publishAddr);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
import { Publisher, Subscriber } from 'zeromq';
|
||||||
|
import { PUBLISH_BIND_PORT, PUBLISH_BIND_ADDR} from './config';
|
||||||
|
|
||||||
|
export const publishSock = new Publisher();
|
||||||
|
export const subscribeSock = new Subscriber();
|
||||||
|
|
||||||
|
export async function bindPublish() {
|
||||||
|
const addrStr = `tcp://${PUBLISH_BIND_ADDR}:${PUBLISH_BIND_PORT}`;
|
||||||
|
await publishSock.bind(addrStr);
|
||||||
|
console.log(`Publishing socket bound to ${addrStr}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function connectSubscribe(host: string, port: number) {
|
||||||
|
// TODO: peer discovery
|
||||||
|
const addrStr = `tcp://${host}:${port}`;
|
||||||
|
subscribeSock.connect(addrStr);
|
||||||
|
subscribeSock.subscribe("deltas");
|
||||||
|
console.log(`Subscribing to ${addrStr}`);
|
||||||
|
}
|
|
@ -0,0 +1,15 @@
|
||||||
|
import { Query, QueryResult, } from './types';
|
||||||
|
import { deltasAccepted } from './deltas';
|
||||||
|
import { applyFilter } from './filter';
|
||||||
|
|
||||||
|
// export const queryResultMemo = new Map<Query, QueryResult>();
|
||||||
|
|
||||||
|
export function issueQuery(query: Query): QueryResult {
|
||||||
|
const deltas = applyFilter(deltasAccepted, query.filterExpr);
|
||||||
|
return {
|
||||||
|
deltas
|
||||||
|
// TODO: Materialized view / state collapse snapshot
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,80 @@
|
||||||
|
import { Request, Reply, Message } from 'zeromq';
|
||||||
|
import { REQUEST_BIND_PORT, REQUEST_BIND_ADDR} from './config';
|
||||||
|
import { EventEmitter } from 'node:events';
|
||||||
|
import { PeerMethods } from './peers';
|
||||||
|
|
||||||
|
export type PeerRequest = {
|
||||||
|
method: PeerMethods;
|
||||||
|
};
|
||||||
|
|
||||||
|
export type RequestHandler = (req: PeerRequest, res: ResponseSocket) => void;
|
||||||
|
|
||||||
|
export const replySock = new Reply();
|
||||||
|
const requestStream = new EventEmitter();
|
||||||
|
|
||||||
|
export async function bindReply() {
|
||||||
|
const addrStr = `tcp://${REQUEST_BIND_ADDR}:${REQUEST_BIND_PORT}`;
|
||||||
|
await replySock.bind(addrStr);
|
||||||
|
console.log(`Reply socket bound to ${addrStr}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function runRequestHandlers() {
|
||||||
|
for await (const [msg] of replySock) {
|
||||||
|
console.log(`Received message`, {msg: msg.toString()});
|
||||||
|
const req = peerRequestFromMsg(msg);
|
||||||
|
requestStream.emit('request', req);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function peerRequestFromMsg(msg: Message): PeerRequest | null {
|
||||||
|
let req: PeerRequest | null = null;
|
||||||
|
try {
|
||||||
|
const obj = JSON.parse(msg.toString());
|
||||||
|
req = {...obj};
|
||||||
|
} catch(e) {
|
||||||
|
console.log('error receiving command', e);
|
||||||
|
}
|
||||||
|
return req;
|
||||||
|
}
|
||||||
|
|
||||||
|
export class ResponseSocket {
|
||||||
|
sock: Reply;
|
||||||
|
constructor(sock: Reply) {
|
||||||
|
this.sock = sock;
|
||||||
|
}
|
||||||
|
async send(msg: object | string) {
|
||||||
|
if (typeof msg === 'object') {
|
||||||
|
msg = JSON.stringify(msg);
|
||||||
|
}
|
||||||
|
console.log('sending reply', {msg});
|
||||||
|
await this.sock.send(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function registerRequestHandler(handler: RequestHandler) {
|
||||||
|
requestStream.on('request', (req) => {
|
||||||
|
const res = new ResponseSocket(replySock);
|
||||||
|
handler(req, res);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
export class RequestSocket {
|
||||||
|
sock = new Request();
|
||||||
|
constructor(host: string, port: number) {
|
||||||
|
const addrStr = `tcp://${host}:${port}`;
|
||||||
|
this.sock.connect(addrStr);
|
||||||
|
console.log(`Request socket connecting to ${addrStr}`);
|
||||||
|
}
|
||||||
|
async request(method: PeerMethods): Promise<Message> {
|
||||||
|
const req: PeerRequest = {
|
||||||
|
method
|
||||||
|
};
|
||||||
|
await this.sock.send(JSON.stringify(req));
|
||||||
|
// Wait for a response.
|
||||||
|
// TODO: Timeout
|
||||||
|
// TODO: Retry
|
||||||
|
// this.sock.receiveTimeout = ...
|
||||||
|
const [res] = await this.sock.receive();
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,11 @@
|
||||||
|
import { Level } from 'level';
|
||||||
|
import { LEVEL_DB_DIR } from './config';
|
||||||
|
import path from 'path';
|
||||||
|
|
||||||
|
function newStore(name: string): Level {
|
||||||
|
return new Level<string, string>(path.join(LEVEL_DB_DIR, name));
|
||||||
|
}
|
||||||
|
|
||||||
|
export const queryResultStore = newStore('query-results');
|
||||||
|
|
||||||
|
export const deltasAcceptedStore = newStore('deltas-accepted');
|
|
@ -0,0 +1,39 @@
|
||||||
|
export type Pointer = {
|
||||||
|
localContext: string,
|
||||||
|
target: string | number | undefined,
|
||||||
|
targetContext?: string
|
||||||
|
};
|
||||||
|
|
||||||
|
export type Delta = {
|
||||||
|
creator: string,
|
||||||
|
host: string,
|
||||||
|
pointers: Pointer[],
|
||||||
|
}
|
||||||
|
|
||||||
|
export type DeltaContext = Delta & {
|
||||||
|
creatorAddress: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
export type Query = {
|
||||||
|
filterExpr: JSON
|
||||||
|
};
|
||||||
|
|
||||||
|
export type QueryResult = {
|
||||||
|
deltas: Delta[]
|
||||||
|
};
|
||||||
|
|
||||||
|
export enum Decision {
|
||||||
|
Accept,
|
||||||
|
Reject,
|
||||||
|
Defer
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
export type JSONLogic = object;
|
||||||
|
|
||||||
|
export type FilterExpr = JSONLogic;
|
||||||
|
|
||||||
|
export type FilterGenerator = () => FilterExpr;
|
||||||
|
|
||||||
|
export type PropertyTypes = string | number | undefined;
|
||||||
|
|
|
@ -0,0 +1,21 @@
|
||||||
|
{
|
||||||
|
"compilerOptions": {
|
||||||
|
"target": "ES6",
|
||||||
|
"module": "CommonJS",
|
||||||
|
"esModuleInterop": true,
|
||||||
|
"moduleResolution": "Node",
|
||||||
|
"sourceMap": true,
|
||||||
|
"baseUrl": ".",
|
||||||
|
"paths": {
|
||||||
|
"@/*": ["src/*"]
|
||||||
|
},
|
||||||
|
"outDir": "dist",
|
||||||
|
"importsNotUsedAsValues": "remove",
|
||||||
|
"strict": true,
|
||||||
|
"esModuleInterop": true,
|
||||||
|
"skipLibCheck": true,
|
||||||
|
"forceConsistentCasingInFileNames": true
|
||||||
|
},
|
||||||
|
"include": ["src/**/*"],
|
||||||
|
"exclude": ["node_modules"]
|
||||||
|
}
|
Loading…
Reference in New Issue