dao-governance-framework/forum-network/public/classes/forum-node.js

113 lines
3.8 KiB
JavaScript

import { Actor } from './actor.js';
import { Action } from './action.js';
import {
Message, PostMessage, PeerMessage, messageFromJSON,
} from './message.js';
import { CryptoUtil } from './crypto.js';
import { ForumView } from './forum-view.js';
import { PrioritizedQueue } from './prioritized-queue.js';
export class ForumNode extends Actor {
constructor(name, scene) {
super(name, scene);
this.forumView = new ForumView();
this.queue = new PrioritizedQueue();
this.actions = {
storePost: new Action('store post', scene),
peerMessage: new Action('peer message', scene),
};
}
// Generate a signing key pair and connect to the network
async initialize(forumNetwork) {
this.keyPair = await CryptoUtil.generateAsymmetricKey();
this.forumNetwork = forumNetwork.addNode(this);
this.status.set('Initialized');
return this;
}
// Send a message to all other nodes in the network
async broadcast(message) {
await message.sign(this.keyPair);
const otherForumNodes = this.forumNetwork
.listNodes()
.filter((forumNode) => forumNode.keyPair.publicKey !== this.keyPair.publicKey);
for (const forumNode of otherForumNodes) {
// For now just call receiveMessage on the target node
this.actions.peerMessage.log(this, forumNode, null, message.content);
await forumNode.receiveMessage(JSON.stringify(message.toJSON()));
}
}
// Perform minimal processing to ingest a message.
// Enqueue it for further processing.
async receiveMessage(messageStr) {
const messageJson = JSON.parse(messageStr);
const senderReputation = this.forumView.getReputation(messageJson.publicKey) || 0;
this.queue.add(messageJson, senderReputation);
}
// Process next highest priority message in the queue
async processNextMessage() {
const messageJson = this.queue.pop();
if (!messageJson) {
return null;
}
return this.processMessage(messageJson);
}
// Process a message from the queue
async processMessage(messageJson) {
try {
await Message.verify(messageJson);
} catch (e) {
this.actions.processMessage.log(this, this, 'invalid signature', messageJson, '-x');
console.log(`${this.name}: received message with invalid signature`);
return;
}
const { publicKey } = messageJson;
const message = messageFromJSON(messageJson);
console.log(`${this.name}: processMessage`, message);
if (message instanceof PostMessage) {
await this.processPostMessage(publicKey, message.content);
} else if (message instanceof PeerMessage) {
await this.processPeerMessage(publicKey, message.content);
} else {
// Unknown message type
// Penalize sender for wasting our time
console.log(`${this.name}: penalizing sender for unknown message type ${message.type}`);
this.forumView.incrementReputation(message.publicKey, -1);
}
}
// Process an incoming post, received by whatever means
processPost(authorId, post, stake) {
if (!post.id) {
post.id = CryptoUtil.randomUUID();
}
this.actions.storePost.log(this, this, null, { authorId, post, stake });
this.forumView.addPost(authorId, post.id, post, stake);
}
// Process a post we received in a message
async processPostMessage(authorId, { post, stake }) {
this.processPost(authorId, post, stake);
await this.broadcast(
new PeerMessage({
posts: [{ authorId, post, stake }],
}),
);
}
// Process a message we receive from a peer
async processPeerMessage(peerId, { posts }) {
// We are trusting that the peer verified the signatures of the posts they're forwarding.
// We could instead have the peer forward the signed messages and re-verify them.
for (const { authorId, post, stake } of posts) {
this.processPost(authorId, post, stake);
}
}
}