refactored rollup for code clarity
Gitea Actions Demo / Explore-Gitea-Actions (push) Failing after 35s Details

This commit is contained in:
Ladd Hoffman 2024-05-03 14:18:20 -05:00
parent 69c869a693
commit d3f4740422
14 changed files with 375 additions and 249 deletions

View File

@ -5,13 +5,15 @@ const {
appState,
proposalEventIds,
} = require('../util/db');
const { submitRollup, resetBatch } = require('./rollup');
const { submitRollup } = require('./rollup');
const { resetBatchItems } = require('./rollup/batch-items');
const {
BOT_INSTANCE_ID,
ETH_NETWORK,
} = process.env;
// TODO: Refactor into separate files
const handleCommand = async (client, roomId, event) => {
// Don't handle unhelpful events (ones that aren't text messages, are redacted, or sent by us)
if (event.content?.msgtype !== 'm.text') return;
@ -43,6 +45,7 @@ const handleCommand = async (client, roomId, event) => {
try {
const proposalEventId = await proposalEventIds.get(proposalIndex);
const proposalEventUri = `https://matrix.to/#/${roomId}/${proposalEventId}`;
// TODO: Send HTML message
const content = {
body: `Proposal ${proposalIndex}: ${proposalEventUri}`,
msgtype: 'm.text',
@ -67,7 +70,7 @@ const handleCommand = async (client, roomId, event) => {
console.log(`!resetBatch roomId ${roomId} instanceId ${instanceId}`);
if (instanceId === BOT_INSTANCE_ID) {
console.log('!resetBatch');
const batchItems = await resetBatch();
const batchItems = await resetBatchItems();
await client.replyText(roomId, event, `Reset batch, now contains ${batchItems.length} items`);
}
}

View File

@ -25,6 +25,7 @@ const start = () => {
console.log('post.content:', post.content);
// Send matrix room event
// TODO: Send HTML message
let message = `Proposal ${proposalIndex}\n\n${post.content}`;
if (post.embeddedData && Object.entries(post.embeddedData).length) {
message += `\n\n${JSON.stringify(post.embeddedData, null, 2)}`;

View File

@ -0,0 +1,65 @@
const { rollup } = require('../../util/contracts');
const { applicationData, matrixPools } = require('../../util/db');
const read = require('../../util/forum/read');
const { initiateMatrixPool } = require('./matrix-pools/initiate');
let batchItems;
const initializeBatchItems = async () => {
try {
batchItems = await applicationData.get('batchItems');
} catch (e) {
batchItems = [];
}
};
const getBatchItems = () => batchItems;
const addBatchItem = async (postId) => {
batchItems.push(postId);
await applicationData.put('batchItems', batchItems);
};
const clearBatchItems = async (itemsToClear) => {
batchItems = batchItems.filter((item) => !itemsToClear.includes(item));
await applicationData.put('batchItems', batchItems);
};
const resetBatchItems = async () => {
batchItems = [];
// Read from Rollup.items
const itemCount = await rollup.itemCount();
const promises = [];
for (let i = 0; i < itemCount; i += 1) {
promises.push(rollup.items(i));
}
const batchItemsInfo = await Promise.all(promises);
batchItems = batchItemsInfo.map((x) => x.postId);
await applicationData.put('batchItems', batchItems);
// Make sure there's a matrix pool for each batch item.
// If there's not, then let's start one.
await Promise.each(batchItemsInfo, async ({ postId, sender, fee }) => {
let post;
try {
post = await read(postId);
} catch (e) {
console.error(`Post ID ${postId} not found`);
return;
}
try {
await matrixPools.get(postId);
} catch (e) {
await initiateMatrixPool(postId, post, sender, fee);
}
});
return batchItems;
};
module.exports = {
initializeBatchItems,
getBatchItems,
addBatchItem,
clearBatchItems,
resetBatchItems,
};

View File

@ -0,0 +1,29 @@
const { rollup, wallet } = require('../../util/contracts');
let batchWorker;
const getCurrentBatchWorker = () => batchWorker;
const initializeBatchWorker = async () => {
batchWorker = await rollup.batchWorker();
console.log('At startup, batch worker:', batchWorker);
rollup.on('BatchWorkerAssigned', async (batchWorker_) => {
batchWorker = batchWorker_;
console.log('Batch worker assigned:', batchWorker);
if (batchWorker === await wallet.getAddress()) {
console.log('This instance is the new batch worker');
}
});
};
const setBatchWorker = (batchWorker_) => {
batchWorker = batchWorker_;
};
module.exports = {
getCurrentBatchWorker,
initializeBatchWorker,
setBatchWorker,
};

View File

@ -0,0 +1,31 @@
const read = require('../../util/forum/read');
const { matrixPools } = require('../../util/db');
const computeAuthorWeights = async (batchItems_) => {
const weights = {};
await Promise.each(batchItems_, async (postId) => {
const post = await read(postId);
const matrixPool = await matrixPools.get(postId);
const { fee, result: { votePasses, quorumMet } } = matrixPool;
post.authors.forEach(({ authorAddress, weightPPM }) => {
weights[authorAddress] = weights[authorAddress] ?? 0;
if (votePasses && quorumMet) {
// scale by matrix pool outcome and strength
weights[authorAddress] += weightPPM * fee;
}
// TODO: Rewards for policing
// TODO: Propagation via references
});
});
// Rescale author weights so they sum to 1000000
const sumOfWeights = Object.values(weights).reduce((t, v) => t + v, 0);
const scaledWeights = Object.values(weights)
.map((weight) => Math.floor((weight * 1000000) / sumOfWeights));
const sumOfScaledWeights = Object.values(scaledWeights).reduce((t, v) => t + v, 0);
scaledWeights[0] += 1000000 - sumOfScaledWeights;
const authors = Object.keys(weights)
.map((authorAddress, i) => ({ authorAddress, weightPPM: scaledWeights[i] }));
return authors;
};
module.exports = computeAuthorWeights;

View File

@ -0,0 +1,23 @@
const {
dao,
} = require('../../util/contracts');
const computeMatrixPoolResult = async (matrixPool) => {
// This should already contain all the info we need to evaluate the outcome
const { stakes, quorum, winRatio } = matrixPool;
const stakedFor = stakes
.filter((x) => x.inFavor)
.reduce((total, { amount }) => total + amount, 0);
const stakedAgainst = stakes
.filter((x) => !x.inFavor)
.reduce((total, { amount }) => total + amount, 0);
const votePasses = stakedFor * winRatio[1] >= (stakedFor + stakedAgainst) * winRatio[0];
const totalSupply = Number(await dao.totalSupply());
const quorumMet = (stakedFor + stakedAgainst) * quorum[1] >= totalSupply * quorum[0];
const result = {
stakedFor, stakedAgainst, totalSupply, votePasses, quorumMet,
};
return result;
};
module.exports = computeMatrixPoolResult;

View File

@ -0,0 +1,9 @@
const {
ROLLUP_BATCH_SIZE,
ROLLUP_AVAILABILITY_STAKE_DURATION,
} = process.env;
module.exports = {
rollupBatchSize: ROLLUP_BATCH_SIZE || 10,
availabilityStakeDuration: ROLLUP_AVAILABILITY_STAKE_DURATION || 600,
};

View File

@ -1,248 +1,24 @@
const Promise = require('bluebird');
const { v4: uuidv4 } = require('uuid');
const { isEqual } = require('lodash');
const { registerDecider } = require('../validation-pools');
const { registerMatrixEventHandler, sendMatrixEvent, sendMatrixText } = require('../../matrix-bot');
const { registerMatrixEventHandler, sendMatrixText } = require('../../matrix-bot');
const { matrixPools, matrixUserToAuthorAddress, applicationData } = require('../../util/db');
const {
rollup, wallet, work2, dao,
rollup, wallet,
} = require('../../util/contracts');
const read = require('../../util/forum/read');
const write = require('../../util/forum/write');
const addPostWithRetry = require('../../util/add-post-with-retry');
const callWithRetry = require('../../util/call-contract-method-with-retry');
const { availabilityStakeDuration } = require('./config');
const {
ROLLUP_BATCH_SIZE,
ROLLUP_AVAILABILITY_STAKE_DURATION,
} = process.env;
stakeRollupAvailability, getBatchPostAuthorWeights, authorsMatch, validatePost,
} = require('./utils');
const computeMatrixPoolResult = require('./compute-matrix-pool-result');
const { initializeBatchItems } = require('./batch-items');
const submitRollup = require('./submit-rollup');
const { getCurrentBatchWorker, initializeBatchWorker } = require('./batch-worker');
const { initiateMatrixPool } = require('./matrix-pools/initiate');
const rollupBatchSize = ROLLUP_BATCH_SIZE || 10;
const availabilityStakeDuration = ROLLUP_AVAILABILITY_STAKE_DURATION || 600;
let batchWorker;
let batchItems;
const stakeRollupAvailability = async () => {
const currentRep = await dao.balanceOf(await wallet.getAddress());
if (currentRep) {
await callWithRetry(() => dao.stakeAvailability(
rollup.target,
currentRep,
availabilityStakeDuration,
));
}
};
const getBatchPostAuthorWeights = async (batchItems_) => {
const weights = {};
await Promise.each(batchItems_, async (postId) => {
const post = await read(postId);
const matrixPool = await matrixPools.get(postId);
const { fee, result: { votePasses, quorumMet } } = matrixPool;
post.authors.forEach(({ authorAddress, weightPPM }) => {
weights[authorAddress] = weights[authorAddress] ?? 0;
if (votePasses && quorumMet) {
// scale by matrix pool outcome and strength
weights[authorAddress] += weightPPM * fee;
}
// TODO: Rewards for policing
// TODO: Propagation via references
});
});
// Rescale author weights so they sum to 1000000
const sumOfWeights = Object.values(weights).reduce((t, v) => t + v, 0);
const scaledWeights = Object.values(weights)
.map((weight) => Math.floor((weight * 1000000) / sumOfWeights));
const sumOfScaledWeights = Object.values(scaledWeights).reduce((t, v) => t + v, 0);
scaledWeights[0] += 1000000 - sumOfScaledWeights;
const authors = Object.keys(weights)
.map((authorAddress, i) => ({ authorAddress, weightPPM: scaledWeights[i] }));
return authors;
};
const submitRollup = async () => {
if (!batchItems.length) {
return { batchItems: [] };
}
const authors = await getBatchPostAuthorWeights(batchItems);
// TODO: Compute citations as aggregate of the citations of posts in the batch
const citations = [];
const content = `Batch of ${batchItems.length} items`;
const embeddedData = {
batchItems,
nonce: uuidv4().replaceAll('-', ''),
};
const sender = await wallet.getAddress();
const contentToVerify = `${content}\n\n${JSON.stringify(embeddedData, null, 2)}`;
const signature = await wallet.signMessage(contentToVerify);
// Write to the forum database
const { hash: batchPostId } = await write({
sender, authors, citations, content, embeddedData, signature,
});
// Add rollup post on-chain
await addPostWithRetry(authors, batchPostId, citations);
// Stake our availability to be the next rollup worker
await stakeRollupAvailability();
// Call Rollup.submitBatch
console.log('Submitting batch', { batchPostId, batchItems, authors });
const poolDuration = 60;
await callWithRetry(() => rollup.submitBatch(batchPostId, batchItems, poolDuration));
// Send matrix event
await sendMatrixEvent('io.dgov.rollup.submit', { batchPostId, batchItems, authors });
// Clear the batch in preparation for next batch
batchItems = [];
await applicationData.put('batchItems', batchItems);
return {
batchPostId,
batchItems,
authors,
};
};
const evaluateMatrixPoolOutcome = async (postId, { dryRun } = {}) => {
const matrixPool = await matrixPools.get(postId);
// This should already contain all the info we need to evaluate the outcome
const { stakes, quorum, winRatio } = matrixPool;
const stakedFor = stakes
.filter((x) => x.inFavor)
.reduce((total, { amount }) => total + amount, 0);
const stakedAgainst = stakes
.filter((x) => !x.inFavor)
.reduce((total, { amount }) => total + amount, 0);
const votePasses = stakedFor * winRatio[1] >= (stakedFor + stakedAgainst) * winRatio[0];
const totalSupply = Number(await dao.totalSupply());
const quorumMet = (stakedFor + stakedAgainst) * quorum[1] >= totalSupply * quorum[0];
const result = {
stakedFor, stakedAgainst, totalSupply, votePasses, quorumMet,
};
if (!dryRun) {
console.log(`Matrix pool for post ${postId} outcome evaluated`, result);
matrixPool.result = result;
await matrixPools.put(postId, matrixPool);
sendMatrixEvent('io.dgov.pool.result', { postId, result });
batchItems.push(postId);
await applicationData.put('batchItems', batchItems);
let submitBatch = false;
if (batchWorker === '0x0000000000000000000000000000000000000000') {
// If there's no batch worker, we should stake our availability
// and then submit the batch immediately.
console.log('There is no batch worker assigned. Staking availability and submitting first batch.');
submitBatch = true;
} else if (batchWorker === await wallet.getAddress()) {
// If we are the batch worker, we should wait an appropriate amout of time /
// number of matrix pools before submitting a batch.
if (batchItems.length === rollupBatchSize) {
console.log(`Batch size = ${batchItems.length}. Submitting batch.`);
submitBatch = true;
}
}
if (submitBatch) {
await stakeRollupAvailability();
await submitRollup();
}
}
return result;
};
const authorsMatch = async (authors, expectedAuthors) => {
if (expectedAuthors.length !== authors.length) return false;
return authors.every(({ authorAddress, weightPPM }) => {
const expectedAuthor = expectedAuthors.find((x) => x.authorAddress === authorAddress);
return weightPPM === expectedAuthor.weightPPM;
});
};
const validateWorkEvidence = async (sender, post) => {
let valid = false;
if (sender === work2.target) {
const expectedContent = 'This is a work evidence post';
valid = post.content.startsWith(expectedContent);
}
console.log(`Work evidence ${valid ? 'matched' : 'did not match'} the expected content`);
return valid;
};
const validatePost = async ({
sender, post, postId, roomId, eventId, ...params
}) => {
const currentRep = Number(await dao.balanceOf(await wallet.getAddress()));
const valid = await validateWorkEvidence(sender, post);
const stake = { amount: currentRep, account: await wallet.getAddress(), inFavor: valid };
sendMatrixEvent('io.dgov.pool.stake', { postId, amount: currentRep, inFavor: valid });
const matrixPool = {
postId,
roomId,
eventId,
...params,
stakes: [stake],
};
console.log('matrixPool', matrixPool);
await matrixPools.put(postId, matrixPool);
};
const initiateMatrixPool = async (postId, post, sender, fee) => {
const duration = 20;
const quorum = [1, 3];
const winRatio = [1, 2];
const params = {
sender,
fee: Number(fee),
duration,
quorum,
winRatio,
};
console.log('sending matrix pool start event');
const { roomId, eventId } = await sendMatrixEvent('io.dgov.pool.start', {
postId,
...params,
});
console.log('sent matrix pool start event');
// Register our own stake and send a message
await validatePost({
sender, post, postId, roomId, eventId, ...params,
});
// Since we're assuming responsibility as the batch worker,
// set a timeout to evaulate the outcome
setTimeout(() => evaluateMatrixPoolOutcome(postId), duration * 1000);
};
const resetBatch = async () => {
batchItems = [];
// Read from Rollup.items
const itemCount = await rollup.itemCount();
const promises = [];
for (let i = 0; i < itemCount; i += 1) {
promises.push(rollup.items(i));
}
const batchItemsInfo = await Promise.all(promises);
batchItems = batchItemsInfo.map((x) => x.postId);
await applicationData.put('batchItems', batchItems);
// Make sure there's a matrix pool for each batch item.
// If there's not, then let's start one.
await Promise.each(batchItemsInfo, async ({ postId, sender, fee }) => {
let post;
try {
post = await read(postId);
} catch (e) {
console.error(`Post ID ${postId} not found`);
return;
}
try {
await matrixPools.get(postId);
} catch (e) {
await initiateMatrixPool(postId, post, sender, fee);
}
});
return batchItems;
};
const start = async () => {
console.log('registering validation pool decider for rollup');
registerDecider(async (pool, post) => {
@ -268,6 +44,7 @@ const start = async () => {
});
// Even if we're not the current batch worker, keep track of batch items
initializeBatchItems();
try {
batchItems = await applicationData.get('batchItems');
} catch (e) {
@ -275,24 +52,16 @@ const start = async () => {
}
// Check for an assigned batch worker
batchWorker = await rollup.batchWorker();
console.log('At startup, batch worker:', batchWorker);
await initializeBatchWorker();
// Stake availability and set an interval to maintain it
await stakeRollupAvailability();
setInterval(stakeRollupAvailability, availabilityStakeDuration * 1000);
rollup.on('BatchWorkerAssigned', async (batchWorker_) => {
batchWorker = batchWorker_;
console.log('Batch worker assigned:', batchWorker);
if (batchWorker === await wallet.getAddress()) {
console.log('This instance is the new batch worker');
}
});
/// `sender` is the address that called Rollup.addItem on chain, i.e. the Work2 contract.
rollup.on('BatchItemAdded', async (postId, sender, fee) => {
// If we are the batch worker or there is no batch worker, initiate a matrix pool
const batchWorker = getCurrentBatchWorker();
if (batchWorker === await wallet.getAddress()
|| batchWorker === '0x0000000000000000000000000000000000000000') {
let post;
@ -378,7 +147,7 @@ const start = async () => {
break;
}
// Compare batch worker's result with ours to verify and provide early warning
const expectedResult = await evaluateMatrixPoolOutcome(postId, { dryRun: true });
const expectedResult = await computeMatrixPoolResult(matrixPool);
if (!isEqual(result, expectedResult)) {
sendMatrixText(`Unexpected result for matrix pool, for post ${postId}. Result sent by ${event.sender}\n\n`
+ `received ${JSON.stringify(result)}\n`
@ -420,5 +189,4 @@ const start = async () => {
module.exports = {
start,
submitRollup,
resetBatch,
};

View File

@ -0,0 +1,44 @@
const { sendMatrixEvent } = require('../../../matrix-bot');
const { wallet } = require('../../../util/contracts');
const { matrixPools } = require('../../../util/db');
const { addBatchItem, getBatchItems } = require('../batch-items');
const { getCurrentBatchWorker } = require('../batch-worker');
const computeMatrixPoolResult = require('../compute-matrix-pool-result');
const { rollupBatchSize } = require('../config');
const submitRollup = require('../submit-rollup');
const { stakeRollupAvailability } = require('../utils');
const evaluateMatrixPoolOutcome = async (postId) => {
const matrixPool = await matrixPools.get(postId);
const result = await computeMatrixPoolResult(matrixPool);
console.log(`Matrix pool for post ${postId} outcome evaluated`, result);
matrixPool.result = result;
await matrixPools.put(postId, matrixPool);
sendMatrixEvent('io.dgov.pool.result', { postId, result });
await addBatchItem(postId);
let submitBatch = false;
const batchWorker = getCurrentBatchWorker();
if (batchWorker === '0x0000000000000000000000000000000000000000') {
// If there's no batch worker, we should stake our availability
// and then submit the batch immediately.
console.log('There is no batch worker assigned. Staking availability and submitting first batch.');
submitBatch = true;
} else if (batchWorker === await wallet.getAddress()) {
// If we are the batch worker, we should wait an appropriate amout of time /
// number of matrix pools before submitting a batch.
const batchItems = getBatchItems();
if (batchItems.length === rollupBatchSize) {
console.log(`Batch size = ${batchItems.length}. Submitting batch.`);
submitBatch = true;
}
}
if (submitBatch) {
await stakeRollupAvailability();
await submitRollup();
}
return result;
};
module.exports = evaluateMatrixPoolOutcome;

View File

@ -0,0 +1,40 @@
const { sendMatrixEvent } = require('../../../matrix-bot');
const { validatePost } = require('../utils');
const evaluateMatrixPoolOutcome = require('./evaluate');
const initiateMatrixPool = async (postId, post, sender, fee) => {
const duration = 20;
const quorum = [1, 3];
const winRatio = [1, 2];
const params = {
sender,
fee: Number(fee),
duration,
quorum,
winRatio,
};
console.log('sending matrix pool start event');
const { roomId, eventId } = await sendMatrixEvent('io.dgov.pool.start', {
postId,
...params,
});
console.log('sent matrix pool start event');
// Register our own stake and send a message
await validatePost({
sender, post, postId, roomId, eventId, ...params,
});
// Since we're assuming responsibility as the batch worker,
// set a timeout to evaulate the outcome
setTimeout(
() => {
evaluateMatrixPoolOutcome(postId);
},
duration * 1000,
);
};
module.exports = {
initiateMatrixPool,
};

View File

@ -0,0 +1,51 @@
const { v4: uuidv4 } = require('uuid');
const write = require('../../util/forum/write');
const addPostWithRetry = require('../../util/add-post-with-retry');
const callWithRetry = require('../../util/call-with-retry');
const { getBatchItems, clearBatchItems } = require('./batch-items');
const computeAuthorWeights = require('./compute-author-weights');
const { wallet, rollup } = require('../../util/contracts');
const { sendMatrixEvent } = require('../../matrix-bot');
const { stakeRollupAvailability } = require('./utils');
const submitRollup = async () => {
const batchItems = getBatchItems();
if (!batchItems.length) {
return { batchItems: [] };
}
const authors = await computeAuthorWeights(batchItems);
// TODO: Compute citations as aggregate of the citations of posts in the batch
const citations = [];
const content = `Batch of ${batchItems.length} items`;
const embeddedData = {
batchItems,
nonce: uuidv4().replaceAll('-', ''),
};
const sender = await wallet.getAddress();
const contentToVerify = `${content}\n\n${JSON.stringify(embeddedData, null, 2)}`;
const signature = await wallet.signMessage(contentToVerify);
// Write to the forum database
const { hash: batchPostId } = await write({
sender, authors, citations, content, embeddedData, signature,
});
// Add rollup post on-chain
await addPostWithRetry(authors, batchPostId, citations);
// Stake our availability to be the next rollup worker
await stakeRollupAvailability();
// Call Rollup.submitBatch
console.log('Submitting batch', { batchPostId, batchItems, authors });
const poolDuration = 60;
await callWithRetry(() => rollup.submitBatch(batchPostId, batchItems, poolDuration));
// Send matrix event
await sendMatrixEvent('io.dgov.rollup.submit', { batchPostId, batchItems, authors });
// Clear the batch in preparation for next batch
await clearBatchItems(batchItems);
return {
batchPostId,
batchItems,
authors,
};
};
module.exports = submitRollup;

View File

@ -0,0 +1,62 @@
const { sendMatrixEvent } = require('../../matrix-bot');
const callWithRetry = require('../../util/call-with-retry');
const {
rollup, wallet, dao,
work2,
} = require('../../util/contracts');
const { matrixPools } = require('../../util/db');
const { availabilityStakeDuration } = require('./config');
const stakeRollupAvailability = async () => {
const currentRep = await dao.balanceOf(await wallet.getAddress());
if (currentRep) {
await callWithRetry(() => dao.stakeAvailability(
rollup.target,
currentRep,
availabilityStakeDuration,
));
}
};
const authorsMatch = async (authors, expectedAuthors) => {
if (expectedAuthors.length !== authors.length) return false;
return authors.every(({ authorAddress, weightPPM }) => {
const expectedAuthor = expectedAuthors.find((x) => x.authorAddress === authorAddress);
return weightPPM === expectedAuthor.weightPPM;
});
};
const validateWorkEvidence = async (sender, post) => {
let valid = false;
if (sender === work2.target) {
const expectedContent = 'This is a work evidence post';
valid = post.content.startsWith(expectedContent);
}
console.log(`Work evidence ${valid ? 'matched' : 'did not match'} the expected content`);
return valid;
};
const validatePost = async ({
sender, post, postId, roomId, eventId, ...params
}) => {
const currentRep = Number(await dao.balanceOf(await wallet.getAddress()));
const valid = await validateWorkEvidence(sender, post);
const stake = { amount: currentRep, account: await wallet.getAddress(), inFavor: valid };
sendMatrixEvent('io.dgov.pool.stake', { postId, amount: currentRep, inFavor: valid });
const matrixPool = {
postId,
roomId,
eventId,
...params,
stakes: [stake],
};
console.log('matrixPool', matrixPool);
await matrixPools.put(postId, matrixPool);
};
module.exports = {
stakeRollupAvailability,
authorsMatch,
validateWorkEvidence,
validatePost,
};

View File

@ -1,4 +1,4 @@
const callWithRetry = require('./call-contract-method-with-retry');
const callWithRetry = require('./call-with-retry');
const { dao } = require('./contracts');
const addPostWithRetry = async (authors, hash, citations) => {