From b7921a6edfe440144ee5c94c3e775625a6c536d4 Mon Sep 17 00:00:00 2001 From: Ladd Date: Tue, 24 Dec 2024 13:41:31 -0600 Subject: [PATCH] converted logging to use debug library. added markdown watch and reread --- README.md | 52 +++++++-- markdown/006-lossless-representation.md | 12 -- markdown/006-lossless.md | 1 + package.json | 4 +- src/deltas.ts | 8 +- src/example-app.ts | 11 +- src/http-api.ts | 145 ++++++++++++++++++++---- src/main.ts | 45 -------- src/peers.ts | 18 +-- src/pub-sub.ts | 8 +- src/request-reply.ts | 12 +- 11 files changed, 204 insertions(+), 112 deletions(-) delete mode 100644 markdown/006-lossless-representation.md delete mode 100644 src/main.ts diff --git a/README.md b/README.md index 8680956..d792796 100644 --- a/README.md +++ b/README.md @@ -1,61 +1,93 @@ ## Setup -- Install nodejs -- Install [nvm](https://nvm.sh) +Install [`nvm`](https://nvm.sh) -## Install +Clone repo +```bash +git clone https://gitea.dgov.io/ladd/rhizome +``` +Use `nvm` to install and activate the target nodejs version ```bash nvm install +``` + +Install nodejs packages +```bash npm install ``` ## Build +Compile Typescript ```bash -npx tsc -# npm run build # also works +npm run build +``` -# npx tsc --watch # is useful during development +During development, it's useful to run the compiler in watch mode: +```bash +npm run build:watch ``` ## Run -To demonstrate the example application, you can open multiple terminals. In each terminal execute something like the following. +To demonstrate the example application, you can open multiple terminals, and in each terminal execute something like the following: ```bash +export DEBUG="*,-express" export RHIZOME_REQUEST_BIND_PORT=4000 export RHIZOME_PUBLISH_BIND_PORT=4001 export RHIZOME_SEED_PEERS='127.0.0.1:4002, 127.0.0.1:4004' export RHIZOME_HTTP_API_PORT=3000 export RHIZOME_PEER_ID=peer1 -node dist/example-app.js +npm run example-app ``` ```bash +export DEBUG="*,-express" export RHIZOME_REQUEST_BIND_PORT=4002 export RHIZOME_PUBLISH_BIND_PORT=4003 export RHIZOME_SEED_PEERS='127.0.0.1:4000, 127.0.0.1:4004' +export RHIZOME_HTTP_API_PORT=3001 export RHIZOME_PEER_ID=peer2 -node dist/example-app.js +npm run example-app ``` ```bash +export DEBUG="*,-express" export RHIZOME_REQUEST_BIND_PORT=4004 export RHIZOME_PUBLISH_BIND_PORT=4005 export RHIZOME_SEED_PEERS='127.0.0.1:4000, 127.0.0.1:4002' +export RHIZOME_HTTP_API_PORT=3002 export RHIZOME_PEER_ID=peer3 -node dist/example-app.js +npm run example-app ``` In a separate terminal, you can use `curl` to interact with an instance. `jq` is helpful for formatting the json responses. +Query the number of peers seen by a given node (including itself) ```bash curl -s http://localhost:3000/peers/count | jq +``` + +Query the list of peers seen by a given node (including itself) +```bash curl -s http://localhost:3000/peers | jq +``` + +Query the number of deltas ingested by this node +```bash curl -s http://localhost:3000/deltas/count | jq +``` + +Query the list of deltas ingested by this node +```bash curl -s http://localhost:3000/deltas | jq ``` +# Project Management + +- [] Item 1 +- [] Item 2 diff --git a/markdown/006-lossless-representation.md b/markdown/006-lossless-representation.md deleted file mode 100644 index fff973e..0000000 --- a/markdown/006-lossless-representation.md +++ /dev/null @@ -1,12 +0,0 @@ - - -> myk: -> I think so far this seems mostly on point, but I'd focus on building the bridge between Domain Entity (lossy representation) <-> Lossless Representation <-> Delta[] I think -> the tricky stuff comes in with, like, how do you take an undifferentiated stream of deltas, a query and a schema -> and filter / merge those deltas into the lossless tree structure you need in order to then reduce into a lossy domain node -> if that part of the flow works then the rest becomes fairly self-evident -> a "lossless representation" is basically a DAG/Tree that starts with a root node whose properties each contain the deltas that assign values to them, where the delta may have a pointer up to "this" and then down to some related domain node, which gets interpolated into the tree instead of just referenced, and it has its properties contain the deltas that target it, etc -> so you need both the ID of the root node (the thing being targeted by one or more deltas) as well as the scehma to apply to determine which contexts on that target to include (target_context effectively becomes a property on the domain entity, right?), as well as which schema to apply to included referenced entities, etc. -> so it's what keeps you from returning the whole stream of deltas, while still allowing you to follow arbitrary edges - - diff --git a/markdown/006-lossless.md b/markdown/006-lossless.md index 9807436..ec20b55 100644 --- a/markdown/006-lossless.md +++ b/markdown/006-lossless.md @@ -88,3 +88,4 @@ Lossy transformation: actors: [{role: neo, base_salary: 1000000, salary_currency: "usd"}], }, } + diff --git a/package.json b/package.json index 11353cf..a5b6d8e 100644 --- a/package.json +++ b/package.json @@ -3,7 +3,9 @@ "version": "1.0.0", "description": "Rhizomatic database engine node", "scripts": { - "start": "node --experimental-strip-types --experimental-transform-types src/main.ts", + "example-app": "node dist/example-app.js", + "build": "tsc", + "build:watch": "tsc --watch", "lint": "eslint", "test": "jest" }, diff --git a/src/deltas.ts b/src/deltas.ts index 53c28c1..021ac62 100644 --- a/src/deltas.ts +++ b/src/deltas.ts @@ -2,7 +2,9 @@ import EventEmitter from 'node:events'; import objectHash from 'object-hash'; import {myRequestAddr} from './peers'; import {publishSock, subscribeSock} from './pub-sub'; -import {Decision, Delta, PeerAddress, Properties} from './types'; +import {Decision, Delta, PeerAddress} from './types'; +import Debug from 'debug'; +const debug = Debug('deltas'); export const deltaStream = new EventEmitter(); @@ -75,7 +77,7 @@ export function subscribeDeltas(fn: (delta: Delta) => void) { } export async function publishDelta(delta: Delta) { - console.log(`Publishing delta: ${JSON.stringify(delta)}`); + debug(`Publishing delta: ${JSON.stringify(delta)}`); await publishSock.send(["deltas", myRequestAddr.toAddrString(), serializeDelta(delta)]); } @@ -94,7 +96,7 @@ export async function runDeltas() { } const delta = deserializeDelta(msg.toString()); delta.receivedFrom = PeerAddress.fromString(sender.toString()); - console.log(`Received delta: ${JSON.stringify(delta)}`); + debug(`Received delta: ${JSON.stringify(delta)}`); ingestDelta(delta); } } diff --git a/src/example-app.ts b/src/example-app.ts index 35b2bf8..d767986 100644 --- a/src/example-app.ts +++ b/src/example-app.ts @@ -1,4 +1,3 @@ -import {Collection} from "./collection"; import {HTTP_API_ENABLE} from "./config"; import {runDeltas} from "./deltas"; import {runHttpApi} from "./http-api"; @@ -7,6 +6,8 @@ import {askAllPeersForDeltas, subscribeToSeeds} from "./peers"; import {bindPublish, } from "./pub-sub"; import {bindReply, runRequestHandlers} from "./request-reply"; import {TypedCollection} from "./typed-collection"; +import Debug from 'debug'; +const debug = Debug('example-app'); // As an app we want to be able to write and read data. // The data is whatever shape we define it to be in a given context. @@ -39,11 +40,11 @@ type User = { await new Promise((resolve) => setTimeout(resolve, 1000)); users.onUpdate((u: Entity) => { - console.log('User updated:', u); + debug('User updated:', u); }); users.onCreate((u: Entity) => { - console.log('New user!:', u); + debug('New user!:', u); }); const taliesin = users.put(undefined, { @@ -60,9 +61,9 @@ type User = { const result = users.get(taliesin.id); const matches: boolean = JSON.stringify(result) === JSON.stringify(taliesin); if (matches) { - console.log('Result matches expected: ' + JSON.stringify(taliesin)); + debug('Result matches expected: ' + JSON.stringify(taliesin)); } else { - console.log(`Result does not match expected.` + + debug(`Result does not match expected.` + `\n\nExpected \n${JSON.stringify(taliesin)}` + `\nReceived\n${JSON.stringify(result)}`); } diff --git a/src/http-api.ts b/src/http-api.ts index f84ee9b..15ee4bd 100644 --- a/src/http-api.ts +++ b/src/http-api.ts @@ -1,13 +1,13 @@ +import Debug from "debug"; import express from "express"; +import {readdirSync, readFileSync, watch} from "fs"; +import path, {join} from "path"; +import {Converter} from "showdown"; +import {Collection} from "./collection"; import {HTTP_API_ADDR, HTTP_API_PORT} from "./config"; import {deltasAccepted} from "./deltas"; import {peers} from "./peers"; import {Delta} from "./types"; -import {readdirSync, readFileSync} from "fs"; -import Debug from "debug"; -import {Collection} from "./collection"; -import {Converter} from "showdown"; -import path from "path"; const debug = Debug('http-api'); type CollectionsToServe = { @@ -20,30 +20,128 @@ const docConverter = new Converter({ }); const htmlDocFromMarkdown = (md: string): string => docConverter.makeHtml(md); +type mdFileInfo = { + name: string, + md: string, + html: string +}; + +class MDFiles { + files = new Map(); + readme?: mdFileInfo; + + readFile(name: string) { + const md = readFileSync(join('./markdown', `${name}.md`)).toString(); + const html = htmlDocFromMarkdown(md); + this.files.set(name, {name, md, html}); + } + + readReadme() { + const md = readFileSync('./README.md').toString(); + const html = htmlDocFromMarkdown(md); + this.readme = {name: 'README', md, html}; + } + + getReadmeHTML() { + return this.readme?.html; + } + + getHtml(name: string): string | undefined { + return this.files.get(name)?.html; + } + + list(): string[] { + return Array.from(this.files.keys()); + } + + readDir() { + // Read list of markdown files from directory and + // render each markdown file as html + readdirSync('./markdown/') + .filter((f) => f.endsWith('.md')) + .map((name) => path.parse(name).name) + .forEach((name) => this.readFile(name)); + } + + watchDir() { + watch('./markdown', null, (eventType, filename) => { + if (!filename) return; + if (!filename.endsWith(".md")) return; + + const name = path.parse(filename).name; + + switch (eventType) { + case 'rename': { + debug(`file ${name} renamed`); + // Remove it from memory and re-scan everything + this.files.delete(name); + this.readDir(); + break; + } + case 'change': { + debug(`file ${name} changed`); + // Re-read this file + this.readFile(name) + break; + } + } + }); + } + + watchReadme() { + watch('./README.md', null, (eventType, filename) => { + if (!filename) return; + + switch (eventType) { + case 'change': { + debug(`README file changed`); + // Re-read this file + this.readReadme() + break; + } + } + }); + } +} + export function runHttpApi(collections?: CollectionsToServe) { const app = express(); app.use(express.json()); - // Convert markdown to HTML and serve it - const mdFiles = readdirSync('./markdown/') - .filter((f) => f.endsWith('.md')) - .map((name) => path.parse(name).name); + // Get list of markdown files + const mdFiles = new MDFiles(); + mdFiles.readDir(); + mdFiles.readReadme(); + mdFiles.watchDir(); + mdFiles.watchReadme(); - debug('mdFiles:', mdFiles); - - app.get('/html', (_req: express.Request, res: express.Response) => { - let md = `# Files\n\n`; - for (const name of mdFiles) { - md += `- [${name}](./${name})\n`; - } - const html = htmlDocFromMarkdown(md); + // Serve README + app.get('/html/README', (_req: express.Request, res: express.Response) => { + const html = mdFiles.getReadmeHTML(); res.setHeader('content-type', 'text/html').send(html); }); - for (const name of mdFiles) { - const md = readFileSync(`./markdown/${name}.md`).toString(); + // Serve markdown files as html + app.get('/html/:name', (req: express.Request, res: express.Response) => { + let html = mdFiles.getHtml(req.params.name); + if (!html) { + res.status(404); + html = htmlDocFromMarkdown('# 404\n\n## [Index](/html)'); + } + res.setHeader('content-type', 'text/html'); + res.send(html); + }); + + // Serve index + { + let md = `# Files\n\n`; + md += `[README](/html/README)\n\n`; + for (const name of mdFiles.list()) { + md += `- [${name}](./${name})\n`; + } const html = htmlDocFromMarkdown(md); - app.get(`/html/${name}`, (_req: express.Request, res: express.Response) => { + + app.get('/html', (_req: express.Request, res: express.Response) => { res.setHeader('content-type', 'text/html').send(html); }); } @@ -54,16 +152,20 @@ export function runHttpApi(collections?: CollectionsToServe) { for (const [name, collection] of Object.entries(collections)) { debug(`collection: ${name}`); + // Get the ID of all domain entities app.get(`/${name}/ids`, (_req: express.Request, res: express.Response) => { res.json({ids: collection.getIds()}); }); + // Add a new domain entity + // TODO: schema validation app.put(`/${name}`, (req: express.Request, res: express.Response) => { const {body: properties} = req; const ent = collection.put(undefined, properties); res.json(ent); }); + // Update a domain entity app.put(`/${name}/:id`, (req: express.Request, res: express.Response) => { const {body: properties, params: {id}} = req; if (properties.id && properties.id !== id) { @@ -81,10 +183,12 @@ export function runHttpApi(collections?: CollectionsToServe) { res.json(deltasAccepted); }); + // Get the number of deltas ingested by this node app.get("/deltas/count", (_req: express.Request, res: express.Response) => { res.json(deltasAccepted.length); }); + // Get the list of peers seen by this node (including itself) app.get("/peers", (_req: express.Request, res: express.Response) => { res.json(peers.map(({reqAddr, publishAddr, isSelf, isSeedPeer}) => { const deltasAcceptedCount = deltasAccepted @@ -106,6 +210,7 @@ export function runHttpApi(collections?: CollectionsToServe) { })); }); + // Get the number of peers seen by this node (including itself) app.get("/peers/count", (_req: express.Request, res: express.Response) => { res.json(peers.length); }); diff --git a/src/main.ts b/src/main.ts deleted file mode 100644 index b5905d0..0000000 --- a/src/main.ts +++ /dev/null @@ -1,45 +0,0 @@ -import express from "express"; -import { runDeltas } from "./deltas"; -import {HTTP_API_ENABLE, HTTP_API_ADDR, HTTP_API_PORT} from "./config"; - -const app = express() - -app.get("/", (req: express.Request, res: express.Response) => { - res.json({ message: "Welcome to the Express + TypeScript Server!" }); -}); - -if (HTTP_API_ENABLE) { - app.listen(HTTP_API_PORT, HTTP_API_ADDR, () => { - console.log(`HTTP API bound to http://${HTTP_API_ADDR}:${HTTP_API_PORT}`); - }); -} - -// TODO: Endpoint: Receive a delta -// -// TODO: Websockets -// TODO: UDP -// TODO: ZeroMQ -// -// TODO: Endpoint: Query (materialized view) -// TODO: Endpoint: Info about peers -// TODO: Propagate information about peers (~gossip / or maybe just same as other kinds of deltas) -// So we dogfood the delta data structure and the distributed architecture -// -// -// TODO: Collections of functions -// How are we defining functions? -// Transformations? -// Inputs, calculations, outputs; -// Tx/Rx/Store/Retrieve/Compute; -// Schedule? -// -// -// What assumptions, if any, can we or do we want to make about our operating envoronment/situation? -// How much continuity dare we hope for? -// It's going to depend on the use case - -// You simply want a formula for expressing your confidence in things - -// That can be encoded as deltas - -runDeltas(); diff --git a/src/peers.ts b/src/peers.ts index 3986667..3dfc0a3 100644 --- a/src/peers.ts +++ b/src/peers.ts @@ -1,8 +1,10 @@ import {PUBLISH_BIND_HOST, PUBLISH_BIND_PORT, REQUEST_BIND_HOST, REQUEST_BIND_PORT, SEED_PEERS} from "./config"; -import {deltasAccepted, deltasProposed, ingestAll, receiveDelta} from "./deltas"; +import {deltasAccepted, ingestAll, receiveDelta} from "./deltas"; import {connectSubscribe} from "./pub-sub"; import {PeerRequest, registerRequestHandler, RequestSocket, ResponseSocket} from "./request-reply"; import {Delta, PeerAddress} from "./types"; +import Debug from 'debug'; +const debug = Debug('peers'); export enum PeerMethods { GetPublishAddress, @@ -13,15 +15,15 @@ export const myRequestAddr = new PeerAddress(REQUEST_BIND_HOST, REQUEST_BIND_POR export const myPublishAddr = new PeerAddress(PUBLISH_BIND_HOST, PUBLISH_BIND_PORT); registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => { - console.log('inspecting peer request'); + debug('inspecting peer request'); switch (req.method) { case PeerMethods.GetPublishAddress: { - console.log('it\'s a request for our publish address'); + debug('it\'s a request for our publish address'); await res.send(myPublishAddr.toAddrString()); break; } case PeerMethods.AskForDeltas: { - console.log('it\'s a request for deltas'); + debug('it\'s a request for deltas'); // TODO: stream these rather than // trying to write them all in one message await res.send(JSON.stringify(deltasAccepted)); @@ -41,7 +43,7 @@ class Peer { this.reqSock = new RequestSocket(addr, port); this.isSelf = addr === myRequestAddr.addr && port === myRequestAddr.port; this.isSeedPeer = !!SEED_PEERS.find((seedPeer) => - addr === seedPeer.addr && port === seedPeer.port); + addr === seedPeer.addr && port === seedPeer.port); } async subscribe() { if (!this.publishAddr) { @@ -76,7 +78,7 @@ function newPeer(addr: string, port: number) { export async function subscribeToSeeds() { SEED_PEERS.forEach(async ({addr, port}, idx) => { - console.log(`SEED PEERS[${idx}]=${addr}:${port}`); + debug(`SEED PEERS[${idx}]=${addr}:${port}`); const peer = newPeer(addr, port); await peer.subscribe(); }); @@ -87,9 +89,9 @@ export async function askAllPeersForDeltas() { peers .filter(({isSelf}) => !isSelf) .forEach(async (peer, idx) => { - console.log(`Asking peer ${idx} for deltas`); + debug(`Asking peer ${idx} for deltas`); const deltas = await peer.askForDeltas(); - console.log(`received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`); + debug(`received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`); for (const delta of deltas) { delta.receivedFrom = peer.reqAddr; receiveDelta(delta); diff --git a/src/pub-sub.ts b/src/pub-sub.ts index 7fa0d6d..4356241 100644 --- a/src/pub-sub.ts +++ b/src/pub-sub.ts @@ -1,6 +1,8 @@ import {Publisher, Subscriber} from 'zeromq'; import {PUBLISH_BIND_ADDR, PUBLISH_BIND_PORT} from './config'; import {PeerAddress} from './types'; +import Debug from 'debug'; +const debug = Debug('pub-sub'); export const publishSock = new Publisher(); export const subscribeSock = new Subscriber(); @@ -8,14 +10,14 @@ export const subscribeSock = new Subscriber(); export async function bindPublish() { const addrStr = `tcp://${PUBLISH_BIND_ADDR}:${PUBLISH_BIND_PORT}`; await publishSock.bind(addrStr); - console.log(`Publishing socket bound to ${addrStr}`); + debug(`Publishing socket bound to ${addrStr}`); } export function connectSubscribe(publishAddr: PeerAddress) { // TODO: peer discovery const addrStr = `tcp://${publishAddr.toAddrString()}`; - console.log('connectSubscribe', {addrStr}); + debug('connectSubscribe', {addrStr}); subscribeSock.connect(addrStr); subscribeSock.subscribe("deltas"); - console.log(`Subscribing to ${addrStr}`); + debug(`Subscribing to ${addrStr}`); } diff --git a/src/request-reply.ts b/src/request-reply.ts index 02bea79..6a5bb33 100644 --- a/src/request-reply.ts +++ b/src/request-reply.ts @@ -2,6 +2,8 @@ import { Request, Reply, Message } from 'zeromq'; import { REQUEST_BIND_PORT, REQUEST_BIND_ADDR} from './config'; import { EventEmitter } from 'node:events'; import { PeerMethods } from './peers'; +import Debug from 'debug'; +const debug = Debug('request-reply'); export type PeerRequest = { method: PeerMethods; @@ -15,12 +17,12 @@ const requestStream = new EventEmitter(); export async function bindReply() { const addrStr = `tcp://${REQUEST_BIND_ADDR}:${REQUEST_BIND_PORT}`; await replySock.bind(addrStr); - console.log(`Reply socket bound to ${addrStr}`); + debug(`Reply socket bound to ${addrStr}`); } export async function runRequestHandlers() { for await (const [msg] of replySock) { - console.log(`Received message`, {msg: msg.toString()}); + debug(`Received message`, {msg: msg.toString()}); const req = peerRequestFromMsg(msg); requestStream.emit('request', req); } @@ -32,7 +34,7 @@ function peerRequestFromMsg(msg: Message): PeerRequest | null { const obj = JSON.parse(msg.toString()); req = {...obj}; } catch(e) { - console.log('error receiving command', e); + debug('error receiving command', e); } return req; } @@ -46,7 +48,7 @@ export class ResponseSocket { if (typeof msg === 'object') { msg = JSON.stringify(msg); } - console.log('sending reply', {msg}); + debug('sending reply', {msg}); await this.sock.send(msg); } } @@ -63,7 +65,7 @@ export class RequestSocket { constructor(host: string, port: number) { const addrStr = `tcp://${host}:${port}`; this.sock.connect(addrStr); - console.log(`Request socket connecting to ${addrStr}`); + debug(`Request socket connecting to ${addrStr}`); } async request(method: PeerMethods): Promise { const req: PeerRequest = {