fixed a small bug with global seedPeers
This commit is contained in:
parent
20b71c5d2f
commit
cb09606590
|
@ -1,13 +1,22 @@
|
||||||
import {PeerAddress} from '../src/types.js';
|
import {parseAddressList, PeerAddress} from '../src/peers.js';
|
||||||
|
|
||||||
describe('PeerAddress', () => {
|
describe('PeerAddress', () => {
|
||||||
it('toString()', () => {
|
it('toString()', () => {
|
||||||
const addr = new PeerAddress('localhost', 1000);
|
const addr = new PeerAddress('localhost', 1000);
|
||||||
expect(addr.toAddrString()).toBe("localhost:1000");
|
expect(addr.toAddrString()).toBe("localhost:1000");
|
||||||
});
|
});
|
||||||
|
|
||||||
it('fromString()', () => {
|
it('fromString()', () => {
|
||||||
const addr = PeerAddress.fromString("localhost:1000");
|
const addr = PeerAddress.fromString("localhost:1000");
|
||||||
expect(addr.addr).toBe("localhost");
|
expect(addr.addr).toBe("localhost");
|
||||||
expect(addr.port).toBe(1000);
|
expect(addr.port).toBe(1000);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('parseAddressList()', () => {
|
||||||
|
const input = "255.255.255.255:99999, 0.0.0.0:0";
|
||||||
|
const result = parseAddressList(input);
|
||||||
|
expect(result).toHaveLength(2);
|
||||||
|
expect(result[0].isEqual(new PeerAddress("255.255.255.255", 99999))).toBeTruthy();
|
||||||
|
expect(result[1].isEqual(new PeerAddress("0.0.0.0", 0))).toBeTruthy();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
|
@ -16,8 +16,13 @@ describe('Run', () => {
|
||||||
// Make the apps use the same pubsub topic so they can talk to each other
|
// Make the apps use the same pubsub topic so they can talk to each other
|
||||||
pubSubTopic: apps[0].config.pubSubTopic,
|
pubSubTopic: apps[0].config.pubSubTopic,
|
||||||
});
|
});
|
||||||
|
debug('app[0].config.seedPeers before adding:', JSON.stringify(apps[0].config.seedPeers));
|
||||||
apps[0].config.seedPeers.push(apps[1].myRequestAddr);
|
apps[0].config.seedPeers.push(apps[1].myRequestAddr);
|
||||||
|
debug('app[0].config.seedPeers after adding:', JSON.stringify(apps[0].config.seedPeers));
|
||||||
|
debug('app[1].config.seedPeers before adding:', JSON.stringify(apps[1].config.seedPeers));
|
||||||
apps[1].config.seedPeers.push(apps[0].myRequestAddr);
|
apps[1].config.seedPeers.push(apps[0].myRequestAddr);
|
||||||
|
debug('app[1].config.seedPeers after adding:', JSON.stringify(apps[1].config.seedPeers));
|
||||||
|
debug('app[0].config.seedPeers after adding:', JSON.stringify(apps[0].config.seedPeers));
|
||||||
|
|
||||||
await Promise.all(apps.map((app) => app.start()));
|
await Promise.all(apps.map((app) => app.start()));
|
||||||
});
|
});
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
import {randomUUID} from "crypto";
|
import {randomUUID} from "crypto";
|
||||||
import {PeerAddress} from "./types.js";
|
|
||||||
|
|
||||||
// _HOST refers to the address from an external perspective
|
// _HOST refers to the address from an external perspective
|
||||||
// _ADDR refers to the interface address from the service's perspective
|
// _ADDR refers to the interface address from the service's perspective
|
||||||
|
@ -8,6 +7,9 @@ export const LEVEL_DB_DIR = process.env.RHIZOME_LEVEL_DB_DIR ?? './data';
|
||||||
export const CREATOR = process.env.USER!;
|
export const CREATOR = process.env.USER!;
|
||||||
export const PEER_ID = process.env.RHIZOME_PEER_ID || randomUUID();
|
export const PEER_ID = process.env.RHIZOME_PEER_ID || randomUUID();
|
||||||
export const ADDRESS = process.env.RHIZOME_ADDRESS ?? 'localhost';
|
export const ADDRESS = process.env.RHIZOME_ADDRESS ?? 'localhost';
|
||||||
|
|
||||||
|
export const SEED_PEERS = process.env.RHIZOME_SEED_PEERS || '';
|
||||||
|
|
||||||
export const REQUEST_BIND_ADDR = process.env.RHIZOME_REQUEST_BIND_ADDR || ADDRESS;
|
export const REQUEST_BIND_ADDR = process.env.RHIZOME_REQUEST_BIND_ADDR || ADDRESS;
|
||||||
export const REQUEST_BIND_PORT = parseInt(process.env.RHIZOME_REQUEST_BIND_PORT || '4000');
|
export const REQUEST_BIND_PORT = parseInt(process.env.RHIZOME_REQUEST_BIND_PORT || '4000');
|
||||||
export const REQUEST_BIND_HOST = process.env.RHIZOME_REQUEST_BIND_HOST || REQUEST_BIND_ADDR;
|
export const REQUEST_BIND_HOST = process.env.RHIZOME_REQUEST_BIND_HOST || REQUEST_BIND_ADDR;
|
||||||
|
@ -17,8 +19,5 @@ export const PUBLISH_BIND_HOST = process.env.RHIZOME_PUBLISH_BIND_HOST || PUBLIS
|
||||||
export const HTTP_API_ADDR = process.env.RHIZOME_HTTP_API_ADDR || ADDRESS || 'localhost';
|
export const HTTP_API_ADDR = process.env.RHIZOME_HTTP_API_ADDR || ADDRESS || 'localhost';
|
||||||
export const HTTP_API_PORT = parseInt(process.env.RHIZOME_HTTP_API_PORT || '3000');
|
export const HTTP_API_PORT = parseInt(process.env.RHIZOME_HTTP_API_PORT || '3000');
|
||||||
export const HTTP_API_ENABLE = process.env.RHIZOME_HTTP_API_ENABLE === 'true';
|
export const HTTP_API_ENABLE = process.env.RHIZOME_HTTP_API_ENABLE === 'true';
|
||||||
export const SEED_PEERS: PeerAddress[] = (process.env.RHIZOME_SEED_PEERS || '').split(',')
|
|
||||||
.filter(x => !!x)
|
|
||||||
.map((peer: string) => PeerAddress.fromString(peer));
|
|
||||||
|
|
||||||
export const PUB_SUB_TOPIC = process.env.RHIZOME_PUB_SUB_TOPIC || `deltas-${randomUUID()}`;
|
export const PUB_SUB_TOPIC = process.env.RHIZOME_PUB_SUB_TOPIC || `deltas-${randomUUID()}`;
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import {randomUUID} from "crypto";
|
import {randomUUID} from "crypto";
|
||||||
import microtime from 'microtime';
|
import microtime from 'microtime';
|
||||||
import {CreatorID, HostID, PeerAddress, Timestamp, TransactionID} from "./types.js";
|
import {CreatorID, HostID, Timestamp, TransactionID} from "./types.js";
|
||||||
|
import {PeerAddress} from "./peers.js";
|
||||||
|
|
||||||
export type DeltaID = string;
|
export type DeltaID = string;
|
||||||
|
|
||||||
|
|
|
@ -3,10 +3,9 @@ import {CREATOR, HTTP_API_ADDR, HTTP_API_ENABLE, HTTP_API_PORT, PEER_ID, PUB_SUB
|
||||||
import {DeltaStream} from './deltas.js';
|
import {DeltaStream} from './deltas.js';
|
||||||
import {HttpServer} from './http/index.js';
|
import {HttpServer} from './http/index.js';
|
||||||
import {Lossless} from './lossless.js';
|
import {Lossless} from './lossless.js';
|
||||||
import {Peers} from './peers.js';
|
import {parseAddressList, PeerAddress, Peers} from './peers.js';
|
||||||
import {PubSub} from './pub-sub.js';
|
import {PubSub} from './pub-sub.js';
|
||||||
import {RequestReply} from './request-reply.js';
|
import {RequestReply} from './request-reply.js';
|
||||||
import {PeerAddress} from './types.js';
|
|
||||||
const debug = Debug('rz:rhizome-node');
|
const debug = Debug('rz:rhizome-node');
|
||||||
|
|
||||||
export type RhizomeNodeConfig = {
|
export type RhizomeNodeConfig = {
|
||||||
|
@ -48,7 +47,7 @@ export class RhizomeNode {
|
||||||
httpAddr: HTTP_API_ADDR,
|
httpAddr: HTTP_API_ADDR,
|
||||||
httpPort: HTTP_API_PORT,
|
httpPort: HTTP_API_PORT,
|
||||||
httpEnable: HTTP_API_ENABLE,
|
httpEnable: HTTP_API_ENABLE,
|
||||||
seedPeers: SEED_PEERS,
|
seedPeers: parseAddressList(SEED_PEERS),
|
||||||
peerId: PEER_ID,
|
peerId: PEER_ID,
|
||||||
creator: CREATOR,
|
creator: CREATOR,
|
||||||
pubSubTopic: PUB_SUB_TOPIC,
|
pubSubTopic: PUB_SUB_TOPIC,
|
||||||
|
@ -107,5 +106,6 @@ export class RhizomeNode {
|
||||||
await this.pubSub.stop();
|
await this.pubSub.stop();
|
||||||
await this.requestReply.stop();
|
await this.requestReply.stop();
|
||||||
await this.httpServer.stop();
|
await this.httpServer.stop();
|
||||||
|
debug(`[${this.config.peerId}]`, 'stopped');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
53
src/peers.ts
53
src/peers.ts
|
@ -1,13 +1,44 @@
|
||||||
import Debug from 'debug';
|
import Debug from 'debug';
|
||||||
import {Message} from 'zeromq';
|
import {Message} from 'zeromq';
|
||||||
import {SEED_PEERS} from "./config.js";
|
|
||||||
import {Delta} from "./delta.js";
|
import {Delta} from "./delta.js";
|
||||||
import {RhizomeNode} from "./node.js";
|
import {RhizomeNode} from "./node.js";
|
||||||
import {Subscription} from './pub-sub.js';
|
import {Subscription} from './pub-sub.js';
|
||||||
import {PeerRequest, RequestSocket, ResponseSocket} from "./request-reply.js";
|
import {PeerRequest, RequestSocket, ResponseSocket} from "./request-reply.js";
|
||||||
import {PeerAddress} from "./types.js";
|
|
||||||
const debug = Debug('rz:peers');
|
const debug = Debug('rz:peers');
|
||||||
|
|
||||||
|
export class PeerAddress {
|
||||||
|
addr: string;
|
||||||
|
port: number;
|
||||||
|
|
||||||
|
constructor(addr: string, port: number) {
|
||||||
|
this.addr = addr;
|
||||||
|
this.port = port;
|
||||||
|
}
|
||||||
|
|
||||||
|
static fromString(addrString: string): PeerAddress {
|
||||||
|
const [addr, port] = addrString.trim().split(':');
|
||||||
|
return new PeerAddress(addr, parseInt(port));
|
||||||
|
}
|
||||||
|
|
||||||
|
toAddrString() {
|
||||||
|
return `${this.addr}:${this.port}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
toJSON() {
|
||||||
|
return this.toAddrString();
|
||||||
|
}
|
||||||
|
|
||||||
|
isEqual(other: PeerAddress) {
|
||||||
|
return this.addr === other.addr && this.port === other.port;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
export function parseAddressList(input: string): PeerAddress[] {
|
||||||
|
return input.split(',')
|
||||||
|
.filter(x => !!x)
|
||||||
|
.map((peer: string) => PeerAddress.fromString(peer));
|
||||||
|
}
|
||||||
|
|
||||||
export enum RequestMethods {
|
export enum RequestMethods {
|
||||||
GetPublishAddress,
|
GetPublishAddress,
|
||||||
AskForDeltas
|
AskForDeltas
|
||||||
|
@ -26,7 +57,7 @@ class Peer {
|
||||||
this.rhizomeNode = rhizomeNode;
|
this.rhizomeNode = rhizomeNode;
|
||||||
this.reqAddr = reqAddr;
|
this.reqAddr = reqAddr;
|
||||||
this.isSelf = reqAddr.isEqual(this.rhizomeNode.myRequestAddr);
|
this.isSelf = reqAddr.isEqual(this.rhizomeNode.myRequestAddr);
|
||||||
this.isSeedPeer = !!SEED_PEERS.find((seedPeer) => reqAddr.isEqual(seedPeer));
|
this.isSeedPeer = this.rhizomeNode.config.seedPeers.some((seedPeer) => reqAddr.isEqual(seedPeer));
|
||||||
}
|
}
|
||||||
|
|
||||||
async request(method: RequestMethods): Promise<Message> {
|
async request(method: RequestMethods): Promise<Message> {
|
||||||
|
@ -44,6 +75,9 @@ class Peer {
|
||||||
debug(`[${this.rhizomeNode.config.peerId}]`, `received publish addr ${this.publishAddr.toAddrString()} from peer ${this.reqAddr.toAddrString()}`);
|
debug(`[${this.rhizomeNode.config.peerId}]`, `received publish addr ${this.publishAddr.toAddrString()} from peer ${this.reqAddr.toAddrString()}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
debug(`[${this.rhizomeNode.config.peerId}]`, `subscribing to peer ${this.reqAddr.toAddrString()}`);
|
||||||
|
|
||||||
|
// ZeroMQ subscription
|
||||||
this.subscription = this.rhizomeNode.pubSub.subscribe(
|
this.subscription = this.rhizomeNode.pubSub.subscribe(
|
||||||
this.publishAddr,
|
this.publishAddr,
|
||||||
this.rhizomeNode.config.pubSubTopic,
|
this.rhizomeNode.config.pubSubTopic,
|
||||||
|
@ -121,17 +155,24 @@ export class Peers {
|
||||||
}
|
}
|
||||||
|
|
||||||
async subscribeToSeeds() {
|
async subscribeToSeeds() {
|
||||||
SEED_PEERS.forEach(async (addr, idx) => {
|
const {seedPeers} = this.rhizomeNode.config;
|
||||||
debug(`[${this.rhizomeNode.config.peerId}]`, `SEED PEERS[${idx}]=${addr.toAddrString()}`);
|
debug(`[${this.rhizomeNode.config.peerId}]`, `subscribeToSeeds, seedPeers: ${JSON.stringify(seedPeers)}`);
|
||||||
|
seedPeers.forEach(async (addr, idx) => {
|
||||||
const peer = this.addPeer(addr);
|
const peer = this.addPeer(addr);
|
||||||
|
|
||||||
|
debug(`[${this.rhizomeNode.config.peerId}]`, `SEED PEERS[${idx}]=${addr.toAddrString()}, isSelf:`, peer.isSelf);
|
||||||
|
if (!peer.isSelf) {
|
||||||
await peer.subscribeDeltas();
|
await peer.subscribeDeltas();
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
//! TODO Expect abysmal scaling properties with this function
|
//! TODO Expect abysmal scaling properties with this function
|
||||||
async askAllPeersForDeltas() {
|
async askAllPeersForDeltas() {
|
||||||
this.peers.filter(({isSelf}) => !isSelf)
|
this.peers
|
||||||
.forEach(async (peer, idx) => {
|
.forEach(async (peer, idx) => {
|
||||||
|
debug(`[${this.rhizomeNode.config.peerId}]`, `peer ${peer.reqAddr.toAddrString()} isSelf`, peer.isSelf);
|
||||||
|
if (peer.isSelf) return;
|
||||||
debug(`[${this.rhizomeNode.config.peerId}]`, `Asking peer ${idx} for deltas`);
|
debug(`[${this.rhizomeNode.config.peerId}]`, `Asking peer ${idx} for deltas`);
|
||||||
const deltas = await peer.askForDeltas();
|
const deltas = await peer.askForDeltas();
|
||||||
debug(`[${this.rhizomeNode.config.peerId}]`, `received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`);
|
debug(`[${this.rhizomeNode.config.peerId}]`, `received ${deltas.length} deltas from ${peer.reqAddr.toAddrString()}`);
|
||||||
|
|
|
@ -9,7 +9,7 @@ import Debug from 'debug';
|
||||||
import {Libp2p, createLibp2p} from 'libp2p';
|
import {Libp2p, createLibp2p} from 'libp2p';
|
||||||
import {Publisher, Subscriber} from 'zeromq';
|
import {Publisher, Subscriber} from 'zeromq';
|
||||||
import {RhizomeNode} from './node.js';
|
import {RhizomeNode} from './node.js';
|
||||||
import {PeerAddress} from './types.js';
|
import {PeerAddress} from './peers.js';
|
||||||
const debug = Debug('rz:pub-sub');
|
const debug = Debug('rz:pub-sub');
|
||||||
|
|
||||||
export type SubscribedMessageHandler = (sender: PeerAddress, msg: string) => void;
|
export type SubscribedMessageHandler = (sender: PeerAddress, msg: string) => void;
|
||||||
|
@ -50,6 +50,8 @@ export class Subscription {
|
||||||
debug(`[${this.pubSub.rhizomeNode.config.peerId}]`, `ZeroMQ subscribtion received msg: ${msgStr}`);
|
debug(`[${this.pubSub.rhizomeNode.config.peerId}]`, `ZeroMQ subscribtion received msg: ${msgStr}`);
|
||||||
this.cb(senderStr, msgStr);
|
this.cb(senderStr, msgStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
debug(`[${this.pubSub.rhizomeNode.config.peerId}]`, `done waiting for subscription socket for topic ${this.topic}`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,12 +91,12 @@ export class PubSub {
|
||||||
});
|
});
|
||||||
|
|
||||||
this.libp2p.addEventListener("peer:discovery", (event) => {
|
this.libp2p.addEventListener("peer:discovery", (event) => {
|
||||||
debug(`[${this.rhizomeNode.config.peerId}]`, `found peer: ${JSON.stringify(event.detail, null, 2)}`);
|
debug(`[${this.rhizomeNode.config.peerId}]`, `found peer: ${JSON.stringify(event.detail)}`);
|
||||||
this.libp2p?.dial(event.detail.multiaddrs);
|
this.libp2p?.dial(event.detail.multiaddrs);
|
||||||
});
|
});
|
||||||
|
|
||||||
this.libp2p.addEventListener("peer:connect", (event) => {
|
this.libp2p.addEventListener("peer:connect", (event) => {
|
||||||
debug(`[${this.rhizomeNode.config.peerId}]`, `connected to peer: ${JSON.stringify(event.detail, null, 2)}`);
|
debug(`[${this.rhizomeNode.config.peerId}]`, `connected to peer: ${JSON.stringify(event.detail)}`);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,8 +2,7 @@ import Debug from 'debug';
|
||||||
import {EventEmitter} from 'node:events';
|
import {EventEmitter} from 'node:events';
|
||||||
import {Message, Reply, Request} from 'zeromq';
|
import {Message, Reply, Request} from 'zeromq';
|
||||||
import {RhizomeNode} from './node.js';
|
import {RhizomeNode} from './node.js';
|
||||||
import {RequestMethods} from './peers.js';
|
import {PeerAddress, RequestMethods} from './peers.js';
|
||||||
import {PeerAddress} from './types.js';
|
|
||||||
const debug = Debug('rz:request-reply');
|
const debug = Debug('rz:request-reply');
|
||||||
|
|
||||||
export type PeerRequest = {
|
export type PeerRequest = {
|
||||||
|
|
28
src/types.ts
28
src/types.ts
|
@ -18,31 +18,3 @@ export type ViewMany<T> = {
|
||||||
[key: DomainEntityID]: T;
|
[key: DomainEntityID]: T;
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: Move to ./peers.ts
|
|
||||||
export class PeerAddress {
|
|
||||||
addr: string;
|
|
||||||
port: number;
|
|
||||||
|
|
||||||
constructor(addr: string, port: number) {
|
|
||||||
this.addr = addr;
|
|
||||||
this.port = port;
|
|
||||||
}
|
|
||||||
|
|
||||||
static fromString(addrString: string): PeerAddress {
|
|
||||||
const [addr, port] = addrString.trim().split(':');
|
|
||||||
return new PeerAddress(addr, parseInt(port));
|
|
||||||
}
|
|
||||||
|
|
||||||
toAddrString() {
|
|
||||||
return `${this.addr}:${this.port}`;
|
|
||||||
}
|
|
||||||
|
|
||||||
toJSON() {
|
|
||||||
return this.toAddrString();
|
|
||||||
}
|
|
||||||
|
|
||||||
isEqual(other: PeerAddress) {
|
|
||||||
return this.addr === other.addr && this.port === other.port;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue