converted logging to use debug library. added markdown watch and reread

This commit is contained in:
Ladd Hoffman 2024-12-24 13:41:31 -06:00
parent 165732d7cd
commit b7921a6edf
11 changed files with 204 additions and 112 deletions

View File

@ -1,61 +1,93 @@
## Setup ## Setup
- Install nodejs Install [`nvm`](https://nvm.sh)
- Install [nvm](https://nvm.sh)
## Install Clone repo
```bash
git clone https://gitea.dgov.io/ladd/rhizome
```
Use `nvm` to install and activate the target nodejs version
```bash ```bash
nvm install nvm install
```
Install nodejs packages
```bash
npm install npm install
``` ```
## Build ## Build
Compile Typescript
```bash ```bash
npx tsc npm run build
# npm run build # also works ```
# npx tsc --watch # is useful during development During development, it's useful to run the compiler in watch mode:
```bash
npm run build:watch
``` ```
## Run ## Run
To demonstrate the example application, you can open multiple terminals. In each terminal execute something like the following. To demonstrate the example application, you can open multiple terminals, and in each terminal execute something like the following:
```bash ```bash
export DEBUG="*,-express"
export RHIZOME_REQUEST_BIND_PORT=4000 export RHIZOME_REQUEST_BIND_PORT=4000
export RHIZOME_PUBLISH_BIND_PORT=4001 export RHIZOME_PUBLISH_BIND_PORT=4001
export RHIZOME_SEED_PEERS='127.0.0.1:4002, 127.0.0.1:4004' export RHIZOME_SEED_PEERS='127.0.0.1:4002, 127.0.0.1:4004'
export RHIZOME_HTTP_API_PORT=3000 export RHIZOME_HTTP_API_PORT=3000
export RHIZOME_PEER_ID=peer1 export RHIZOME_PEER_ID=peer1
node dist/example-app.js npm run example-app
``` ```
```bash ```bash
export DEBUG="*,-express"
export RHIZOME_REQUEST_BIND_PORT=4002 export RHIZOME_REQUEST_BIND_PORT=4002
export RHIZOME_PUBLISH_BIND_PORT=4003 export RHIZOME_PUBLISH_BIND_PORT=4003
export RHIZOME_SEED_PEERS='127.0.0.1:4000, 127.0.0.1:4004' export RHIZOME_SEED_PEERS='127.0.0.1:4000, 127.0.0.1:4004'
export RHIZOME_HTTP_API_PORT=3001
export RHIZOME_PEER_ID=peer2 export RHIZOME_PEER_ID=peer2
node dist/example-app.js npm run example-app
``` ```
```bash ```bash
export DEBUG="*,-express"
export RHIZOME_REQUEST_BIND_PORT=4004 export RHIZOME_REQUEST_BIND_PORT=4004
export RHIZOME_PUBLISH_BIND_PORT=4005 export RHIZOME_PUBLISH_BIND_PORT=4005
export RHIZOME_SEED_PEERS='127.0.0.1:4000, 127.0.0.1:4002' export RHIZOME_SEED_PEERS='127.0.0.1:4000, 127.0.0.1:4002'
export RHIZOME_HTTP_API_PORT=3002
export RHIZOME_PEER_ID=peer3 export RHIZOME_PEER_ID=peer3
node dist/example-app.js npm run example-app
``` ```
In a separate terminal, you can use `curl` to interact with an instance. In a separate terminal, you can use `curl` to interact with an instance.
`jq` is helpful for formatting the json responses. `jq` is helpful for formatting the json responses.
Query the number of peers seen by a given node (including itself)
```bash ```bash
curl -s http://localhost:3000/peers/count | jq curl -s http://localhost:3000/peers/count | jq
```
Query the list of peers seen by a given node (including itself)
```bash
curl -s http://localhost:3000/peers | jq curl -s http://localhost:3000/peers | jq
```
Query the number of deltas ingested by this node
```bash
curl -s http://localhost:3000/deltas/count | jq curl -s http://localhost:3000/deltas/count | jq
```
Query the list of deltas ingested by this node
```bash
curl -s http://localhost:3000/deltas | jq curl -s http://localhost:3000/deltas | jq
``` ```
# Project Management
- [] Item 1
- [] Item 2

View File

@ -1,12 +0,0 @@
> myk:
> I think so far this seems mostly on point, but I'd focus on building the bridge between Domain Entity (lossy representation) <-> Lossless Representation <-> Delta[] I think
> the tricky stuff comes in with, like, how do you take an undifferentiated stream of deltas, a query and a schema
> and filter / merge those deltas into the lossless tree structure you need in order to then reduce into a lossy domain node
> if that part of the flow works then the rest becomes fairly self-evident
> a "lossless representation" is basically a DAG/Tree that starts with a root node whose properties each contain the deltas that assign values to them, where the delta may have a pointer up to "this" and then down to some related domain node, which gets interpolated into the tree instead of just referenced, and it has its properties contain the deltas that target it, etc
> so you need both the ID of the root node (the thing being targeted by one or more deltas) as well as the scehma to apply to determine which contexts on that target to include (target_context effectively becomes a property on the domain entity, right?), as well as which schema to apply to included referenced entities, etc.
> so it's what keeps you from returning the whole stream of deltas, while still allowing you to follow arbitrary edges

View File

@ -88,3 +88,4 @@ Lossy transformation:
actors: [{role: neo, base_salary: 1000000, salary_currency: "usd"}], actors: [{role: neo, base_salary: 1000000, salary_currency: "usd"}],
}, },
} }

View File

@ -3,7 +3,9 @@
"version": "1.0.0", "version": "1.0.0",
"description": "Rhizomatic database engine node", "description": "Rhizomatic database engine node",
"scripts": { "scripts": {
"start": "node --experimental-strip-types --experimental-transform-types src/main.ts", "example-app": "node dist/example-app.js",
"build": "tsc",
"build:watch": "tsc --watch",
"lint": "eslint", "lint": "eslint",
"test": "jest" "test": "jest"
}, },

View File

@ -2,7 +2,9 @@ import EventEmitter from 'node:events';
import objectHash from 'object-hash'; import objectHash from 'object-hash';
import {myRequestAddr} from './peers'; import {myRequestAddr} from './peers';
import {publishSock, subscribeSock} from './pub-sub'; import {publishSock, subscribeSock} from './pub-sub';
import {Decision, Delta, PeerAddress, Properties} from './types'; import {Decision, Delta, PeerAddress} from './types';
import Debug from 'debug';
const debug = Debug('deltas');
export const deltaStream = new EventEmitter(); export const deltaStream = new EventEmitter();
@ -75,7 +77,7 @@ export function subscribeDeltas(fn: (delta: Delta) => void) {
} }
export async function publishDelta(delta: Delta) { export async function publishDelta(delta: Delta) {
console.log(`Publishing delta: ${JSON.stringify(delta)}`); debug(`Publishing delta: ${JSON.stringify(delta)}`);
await publishSock.send(["deltas", myRequestAddr.toAddrString(), serializeDelta(delta)]); await publishSock.send(["deltas", myRequestAddr.toAddrString(), serializeDelta(delta)]);
} }
@ -94,7 +96,7 @@ export async function runDeltas() {
} }
const delta = deserializeDelta(msg.toString()); const delta = deserializeDelta(msg.toString());
delta.receivedFrom = PeerAddress.fromString(sender.toString()); delta.receivedFrom = PeerAddress.fromString(sender.toString());
console.log(`Received delta: ${JSON.stringify(delta)}`); debug(`Received delta: ${JSON.stringify(delta)}`);
ingestDelta(delta); ingestDelta(delta);
} }
} }

View File

@ -1,4 +1,3 @@
import {Collection} from "./collection";
import {HTTP_API_ENABLE} from "./config"; import {HTTP_API_ENABLE} from "./config";
import {runDeltas} from "./deltas"; import {runDeltas} from "./deltas";
import {runHttpApi} from "./http-api"; import {runHttpApi} from "./http-api";
@ -7,6 +6,8 @@ import {askAllPeersForDeltas, subscribeToSeeds} from "./peers";
import {bindPublish, } from "./pub-sub"; import {bindPublish, } from "./pub-sub";
import {bindReply, runRequestHandlers} from "./request-reply"; import {bindReply, runRequestHandlers} from "./request-reply";
import {TypedCollection} from "./typed-collection"; import {TypedCollection} from "./typed-collection";
import Debug from 'debug';
const debug = Debug('example-app');
// As an app we want to be able to write and read data. // As an app we want to be able to write and read data.
// The data is whatever shape we define it to be in a given context. // The data is whatever shape we define it to be in a given context.
@ -39,11 +40,11 @@ type User = {
await new Promise((resolve) => setTimeout(resolve, 1000)); await new Promise((resolve) => setTimeout(resolve, 1000));
users.onUpdate((u: Entity) => { users.onUpdate((u: Entity) => {
console.log('User updated:', u); debug('User updated:', u);
}); });
users.onCreate((u: Entity) => { users.onCreate((u: Entity) => {
console.log('New user!:', u); debug('New user!:', u);
}); });
const taliesin = users.put(undefined, { const taliesin = users.put(undefined, {
@ -60,9 +61,9 @@ type User = {
const result = users.get(taliesin.id); const result = users.get(taliesin.id);
const matches: boolean = JSON.stringify(result) === JSON.stringify(taliesin); const matches: boolean = JSON.stringify(result) === JSON.stringify(taliesin);
if (matches) { if (matches) {
console.log('Result matches expected: ' + JSON.stringify(taliesin)); debug('Result matches expected: ' + JSON.stringify(taliesin));
} else { } else {
console.log(`Result does not match expected.` + debug(`Result does not match expected.` +
`\n\nExpected \n${JSON.stringify(taliesin)}` + `\n\nExpected \n${JSON.stringify(taliesin)}` +
`\nReceived\n${JSON.stringify(result)}`); `\nReceived\n${JSON.stringify(result)}`);
} }

View File

@ -1,13 +1,13 @@
import Debug from "debug";
import express from "express"; import express from "express";
import {readdirSync, readFileSync, watch} from "fs";
import path, {join} from "path";
import {Converter} from "showdown";
import {Collection} from "./collection";
import {HTTP_API_ADDR, HTTP_API_PORT} from "./config"; import {HTTP_API_ADDR, HTTP_API_PORT} from "./config";
import {deltasAccepted} from "./deltas"; import {deltasAccepted} from "./deltas";
import {peers} from "./peers"; import {peers} from "./peers";
import {Delta} from "./types"; import {Delta} from "./types";
import {readdirSync, readFileSync} from "fs";
import Debug from "debug";
import {Collection} from "./collection";
import {Converter} from "showdown";
import path from "path";
const debug = Debug('http-api'); const debug = Debug('http-api');
type CollectionsToServe = { type CollectionsToServe = {
@ -20,30 +20,128 @@ const docConverter = new Converter({
}); });
const htmlDocFromMarkdown = (md: string): string => docConverter.makeHtml(md); const htmlDocFromMarkdown = (md: string): string => docConverter.makeHtml(md);
type mdFileInfo = {
name: string,
md: string,
html: string
};
class MDFiles {
files = new Map<string, mdFileInfo>();
readme?: mdFileInfo;
readFile(name: string) {
const md = readFileSync(join('./markdown', `${name}.md`)).toString();
const html = htmlDocFromMarkdown(md);
this.files.set(name, {name, md, html});
}
readReadme() {
const md = readFileSync('./README.md').toString();
const html = htmlDocFromMarkdown(md);
this.readme = {name: 'README', md, html};
}
getReadmeHTML() {
return this.readme?.html;
}
getHtml(name: string): string | undefined {
return this.files.get(name)?.html;
}
list(): string[] {
return Array.from(this.files.keys());
}
readDir() {
// Read list of markdown files from directory and
// render each markdown file as html
readdirSync('./markdown/')
.filter((f) => f.endsWith('.md'))
.map((name) => path.parse(name).name)
.forEach((name) => this.readFile(name));
}
watchDir() {
watch('./markdown', null, (eventType, filename) => {
if (!filename) return;
if (!filename.endsWith(".md")) return;
const name = path.parse(filename).name;
switch (eventType) {
case 'rename': {
debug(`file ${name} renamed`);
// Remove it from memory and re-scan everything
this.files.delete(name);
this.readDir();
break;
}
case 'change': {
debug(`file ${name} changed`);
// Re-read this file
this.readFile(name)
break;
}
}
});
}
watchReadme() {
watch('./README.md', null, (eventType, filename) => {
if (!filename) return;
switch (eventType) {
case 'change': {
debug(`README file changed`);
// Re-read this file
this.readReadme()
break;
}
}
});
}
}
export function runHttpApi(collections?: CollectionsToServe) { export function runHttpApi(collections?: CollectionsToServe) {
const app = express(); const app = express();
app.use(express.json()); app.use(express.json());
// Convert markdown to HTML and serve it // Get list of markdown files
const mdFiles = readdirSync('./markdown/') const mdFiles = new MDFiles();
.filter((f) => f.endsWith('.md')) mdFiles.readDir();
.map((name) => path.parse(name).name); mdFiles.readReadme();
mdFiles.watchDir();
mdFiles.watchReadme();
debug('mdFiles:', mdFiles); // Serve README
app.get('/html/README', (_req: express.Request, res: express.Response) => {
app.get('/html', (_req: express.Request, res: express.Response) => { const html = mdFiles.getReadmeHTML();
let md = `# Files\n\n`;
for (const name of mdFiles) {
md += `- [${name}](./${name})\n`;
}
const html = htmlDocFromMarkdown(md);
res.setHeader('content-type', 'text/html').send(html); res.setHeader('content-type', 'text/html').send(html);
}); });
for (const name of mdFiles) { // Serve markdown files as html
const md = readFileSync(`./markdown/${name}.md`).toString(); app.get('/html/:name', (req: express.Request, res: express.Response) => {
let html = mdFiles.getHtml(req.params.name);
if (!html) {
res.status(404);
html = htmlDocFromMarkdown('# 404\n\n## [Index](/html)');
}
res.setHeader('content-type', 'text/html');
res.send(html);
});
// Serve index
{
let md = `# Files\n\n`;
md += `[README](/html/README)\n\n`;
for (const name of mdFiles.list()) {
md += `- [${name}](./${name})\n`;
}
const html = htmlDocFromMarkdown(md); const html = htmlDocFromMarkdown(md);
app.get(`/html/${name}`, (_req: express.Request, res: express.Response) => {
app.get('/html', (_req: express.Request, res: express.Response) => {
res.setHeader('content-type', 'text/html').send(html); res.setHeader('content-type', 'text/html').send(html);
}); });
} }
@ -54,16 +152,20 @@ export function runHttpApi(collections?: CollectionsToServe) {
for (const [name, collection] of Object.entries(collections)) { for (const [name, collection] of Object.entries(collections)) {
debug(`collection: ${name}`); debug(`collection: ${name}`);
// Get the ID of all domain entities
app.get(`/${name}/ids`, (_req: express.Request, res: express.Response) => { app.get(`/${name}/ids`, (_req: express.Request, res: express.Response) => {
res.json({ids: collection.getIds()}); res.json({ids: collection.getIds()});
}); });
// Add a new domain entity
// TODO: schema validation
app.put(`/${name}`, (req: express.Request, res: express.Response) => { app.put(`/${name}`, (req: express.Request, res: express.Response) => {
const {body: properties} = req; const {body: properties} = req;
const ent = collection.put(undefined, properties); const ent = collection.put(undefined, properties);
res.json(ent); res.json(ent);
}); });
// Update a domain entity
app.put(`/${name}/:id`, (req: express.Request, res: express.Response) => { app.put(`/${name}/:id`, (req: express.Request, res: express.Response) => {
const {body: properties, params: {id}} = req; const {body: properties, params: {id}} = req;
if (properties.id && properties.id !== id) { if (properties.id && properties.id !== id) {
@ -81,10 +183,12 @@ export function runHttpApi(collections?: CollectionsToServe) {
res.json(deltasAccepted); res.json(deltasAccepted);
}); });
// Get the number of deltas ingested by this node
app.get("/deltas/count", (_req: express.Request, res: express.Response) => { app.get("/deltas/count", (_req: express.Request, res: express.Response) => {
res.json(deltasAccepted.length); res.json(deltasAccepted.length);
}); });
// Get the list of peers seen by this node (including itself)
app.get("/peers", (_req: express.Request, res: express.Response) => { app.get("/peers", (_req: express.Request, res: express.Response) => {
res.json(peers.map(({reqAddr, publishAddr, isSelf, isSeedPeer}) => { res.json(peers.map(({reqAddr, publishAddr, isSelf, isSeedPeer}) => {
const deltasAcceptedCount = deltasAccepted const deltasAcceptedCount = deltasAccepted
@ -106,6 +210,7 @@ export function runHttpApi(collections?: CollectionsToServe) {
})); }));
}); });
// Get the number of peers seen by this node (including itself)
app.get("/peers/count", (_req: express.Request, res: express.Response) => { app.get("/peers/count", (_req: express.Request, res: express.Response) => {
res.json(peers.length); res.json(peers.length);
}); });

View File

@ -1,45 +0,0 @@
import express from "express";
import { runDeltas } from "./deltas";
import {HTTP_API_ENABLE, HTTP_API_ADDR, HTTP_API_PORT} from "./config";
const app = express()
app.get("/", (req: express.Request, res: express.Response) => {
res.json({ message: "Welcome to the Express + TypeScript Server!" });
});
if (HTTP_API_ENABLE) {
app.listen(HTTP_API_PORT, HTTP_API_ADDR, () => {
console.log(`HTTP API bound to http://${HTTP_API_ADDR}:${HTTP_API_PORT}`);
});
}
// TODO: Endpoint: Receive a delta
//
// TODO: Websockets
// TODO: UDP
// TODO: ZeroMQ
//
// TODO: Endpoint: Query (materialized view)
// TODO: Endpoint: Info about peers
// TODO: Propagate information about peers (~gossip / or maybe just same as other kinds of deltas)
// So we dogfood the delta data structure and the distributed architecture
//
//
// TODO: Collections of functions
// How are we defining functions?
// Transformations?
// Inputs, calculations, outputs;
// Tx/Rx/Store/Retrieve/Compute;
// Schedule?
//
//
// What assumptions, if any, can we or do we want to make about our operating envoronment/situation?
// How much continuity dare we hope for?
// It's going to depend on the use case
// You simply want a formula for expressing your confidence in things
// That can be encoded as deltas
runDeltas();

View File

@ -1,8 +1,10 @@
import {PUBLISH_BIND_HOST, PUBLISH_BIND_PORT, REQUEST_BIND_HOST, REQUEST_BIND_PORT, SEED_PEERS} from "./config"; import {PUBLISH_BIND_HOST, PUBLISH_BIND_PORT, REQUEST_BIND_HOST, REQUEST_BIND_PORT, SEED_PEERS} from "./config";
import {deltasAccepted, deltasProposed, ingestAll, receiveDelta} from "./deltas"; import {deltasAccepted, ingestAll, receiveDelta} from "./deltas";
import {connectSubscribe} from "./pub-sub"; import {connectSubscribe} from "./pub-sub";
import {PeerRequest, registerRequestHandler, RequestSocket, ResponseSocket} from "./request-reply"; import {PeerRequest, registerRequestHandler, RequestSocket, ResponseSocket} from "./request-reply";
import {Delta, PeerAddress} from "./types"; import {Delta, PeerAddress} from "./types";
import Debug from 'debug';
const debug = Debug('peers');
export enum PeerMethods { export enum PeerMethods {
GetPublishAddress, GetPublishAddress,
@ -13,15 +15,15 @@ export const myRequestAddr = new PeerAddress(REQUEST_BIND_HOST, REQUEST_BIND_POR
export const myPublishAddr = new PeerAddress(PUBLISH_BIND_HOST, PUBLISH_BIND_PORT); export const myPublishAddr = new PeerAddress(PUBLISH_BIND_HOST, PUBLISH_BIND_PORT);
registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => { registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => {
console.log('inspecting peer request'); debug('inspecting peer request');
switch (req.method) { switch (req.method) {
case PeerMethods.GetPublishAddress: { case PeerMethods.GetPublishAddress: {
console.log('it\'s a request for our publish address'); debug('it\'s a request for our publish address');
await res.send(myPublishAddr.toAddrString()); await res.send(myPublishAddr.toAddrString());
break; break;
} }
case PeerMethods.AskForDeltas: { case PeerMethods.AskForDeltas: {
console.log('it\'s a request for deltas'); debug('it\'s a request for deltas');
// TODO: stream these rather than // TODO: stream these rather than
// trying to write them all in one message // trying to write them all in one message
await res.send(JSON.stringify(deltasAccepted)); await res.send(JSON.stringify(deltasAccepted));
@ -41,7 +43,7 @@ class Peer {
this.reqSock = new RequestSocket(addr, port); this.reqSock = new RequestSocket(addr, port);
this.isSelf = addr === myRequestAddr.addr && port === myRequestAddr.port; this.isSelf = addr === myRequestAddr.addr && port === myRequestAddr.port;
this.isSeedPeer = !!SEED_PEERS.find((seedPeer) => this.isSeedPeer = !!SEED_PEERS.find((seedPeer) =>
addr === seedPeer.addr && port === seedPeer.port); addr === seedPeer.addr && port === seedPeer.port);
} }
async subscribe() { async subscribe() {
if (!this.publishAddr) { if (!this.publishAddr) {
@ -76,7 +78,7 @@ function newPeer(addr: string, port: number) {
export async function subscribeToSeeds() { export async function subscribeToSeeds() {
SEED_PEERS.forEach(async ({addr, port}, idx) => { SEED_PEERS.forEach(async ({addr, port}, idx) => {
console.log(`SEED PEERS[${idx}]=${addr}:${port}`); debug(`SEED PEERS[${idx}]=${addr}:${port}`);
const peer = newPeer(addr, port); const peer = newPeer(addr, port);
await peer.subscribe(); await peer.subscribe();
}); });
@ -87,9 +89,9 @@ export async function askAllPeersForDeltas() {
peers peers
.filter(({isSelf}) => !isSelf) .filter(({isSelf}) => !isSelf)
.forEach(async (peer, idx) => { .forEach(async (peer, idx) => {
console.log(`Asking peer ${idx} for deltas`); debug(`Asking peer ${idx} for deltas`);
const deltas = await peer.askForDeltas(); const deltas = await peer.askForDeltas();
console.log(`received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`); debug(`received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`);
for (const delta of deltas) { for (const delta of deltas) {
delta.receivedFrom = peer.reqAddr; delta.receivedFrom = peer.reqAddr;
receiveDelta(delta); receiveDelta(delta);

View File

@ -1,6 +1,8 @@
import {Publisher, Subscriber} from 'zeromq'; import {Publisher, Subscriber} from 'zeromq';
import {PUBLISH_BIND_ADDR, PUBLISH_BIND_PORT} from './config'; import {PUBLISH_BIND_ADDR, PUBLISH_BIND_PORT} from './config';
import {PeerAddress} from './types'; import {PeerAddress} from './types';
import Debug from 'debug';
const debug = Debug('pub-sub');
export const publishSock = new Publisher(); export const publishSock = new Publisher();
export const subscribeSock = new Subscriber(); export const subscribeSock = new Subscriber();
@ -8,14 +10,14 @@ export const subscribeSock = new Subscriber();
export async function bindPublish() { export async function bindPublish() {
const addrStr = `tcp://${PUBLISH_BIND_ADDR}:${PUBLISH_BIND_PORT}`; const addrStr = `tcp://${PUBLISH_BIND_ADDR}:${PUBLISH_BIND_PORT}`;
await publishSock.bind(addrStr); await publishSock.bind(addrStr);
console.log(`Publishing socket bound to ${addrStr}`); debug(`Publishing socket bound to ${addrStr}`);
} }
export function connectSubscribe(publishAddr: PeerAddress) { export function connectSubscribe(publishAddr: PeerAddress) {
// TODO: peer discovery // TODO: peer discovery
const addrStr = `tcp://${publishAddr.toAddrString()}`; const addrStr = `tcp://${publishAddr.toAddrString()}`;
console.log('connectSubscribe', {addrStr}); debug('connectSubscribe', {addrStr});
subscribeSock.connect(addrStr); subscribeSock.connect(addrStr);
subscribeSock.subscribe("deltas"); subscribeSock.subscribe("deltas");
console.log(`Subscribing to ${addrStr}`); debug(`Subscribing to ${addrStr}`);
} }

View File

@ -2,6 +2,8 @@ import { Request, Reply, Message } from 'zeromq';
import { REQUEST_BIND_PORT, REQUEST_BIND_ADDR} from './config'; import { REQUEST_BIND_PORT, REQUEST_BIND_ADDR} from './config';
import { EventEmitter } from 'node:events'; import { EventEmitter } from 'node:events';
import { PeerMethods } from './peers'; import { PeerMethods } from './peers';
import Debug from 'debug';
const debug = Debug('request-reply');
export type PeerRequest = { export type PeerRequest = {
method: PeerMethods; method: PeerMethods;
@ -15,12 +17,12 @@ const requestStream = new EventEmitter();
export async function bindReply() { export async function bindReply() {
const addrStr = `tcp://${REQUEST_BIND_ADDR}:${REQUEST_BIND_PORT}`; const addrStr = `tcp://${REQUEST_BIND_ADDR}:${REQUEST_BIND_PORT}`;
await replySock.bind(addrStr); await replySock.bind(addrStr);
console.log(`Reply socket bound to ${addrStr}`); debug(`Reply socket bound to ${addrStr}`);
} }
export async function runRequestHandlers() { export async function runRequestHandlers() {
for await (const [msg] of replySock) { for await (const [msg] of replySock) {
console.log(`Received message`, {msg: msg.toString()}); debug(`Received message`, {msg: msg.toString()});
const req = peerRequestFromMsg(msg); const req = peerRequestFromMsg(msg);
requestStream.emit('request', req); requestStream.emit('request', req);
} }
@ -32,7 +34,7 @@ function peerRequestFromMsg(msg: Message): PeerRequest | null {
const obj = JSON.parse(msg.toString()); const obj = JSON.parse(msg.toString());
req = {...obj}; req = {...obj};
} catch(e) { } catch(e) {
console.log('error receiving command', e); debug('error receiving command', e);
} }
return req; return req;
} }
@ -46,7 +48,7 @@ export class ResponseSocket {
if (typeof msg === 'object') { if (typeof msg === 'object') {
msg = JSON.stringify(msg); msg = JSON.stringify(msg);
} }
console.log('sending reply', {msg}); debug('sending reply', {msg});
await this.sock.send(msg); await this.sock.send(msg);
} }
} }
@ -63,7 +65,7 @@ export class RequestSocket {
constructor(host: string, port: number) { constructor(host: string, port: number) {
const addrStr = `tcp://${host}:${port}`; const addrStr = `tcp://${host}:${port}`;
this.sock.connect(addrStr); this.sock.connect(addrStr);
console.log(`Request socket connecting to ${addrStr}`); debug(`Request socket connecting to ${addrStr}`);
} }
async request(method: PeerMethods): Promise<Message> { async request(method: PeerMethods): Promise<Message> {
const req: PeerRequest = { const req: PeerRequest = {