From 518bc4eb44ddbf91c727cb273ded23659f5f6c89 Mon Sep 17 00:00:00 2001 From: Ladd Date: Sun, 22 Dec 2024 09:13:44 -0600 Subject: [PATCH] peers are able to sync --- .gitignore | 2 + src/.collection-layer.ts.swp | Bin 12288 -> 0 bytes src/.example-app.ts.swp | Bin 12288 -> 0 bytes src/.object-layer.ts.swp | Bin 16384 -> 0 bytes src/collection-layer.ts | 189 ++++++++++++++++++++++++++++++----- src/deltas.ts | 28 +++--- src/example-app.ts | 43 ++++++-- src/object-layer.ts | 138 +------------------------ src/peers.ts | 66 ++++++++++-- 9 files changed, 271 insertions(+), 195 deletions(-) delete mode 100644 src/.collection-layer.ts.swp delete mode 100644 src/.example-app.ts.swp delete mode 100644 src/.object-layer.ts.swp diff --git a/.gitignore b/.gitignore index 1eae0cf..5a541bd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ dist/ node_modules/ +*.swp +*.swo diff --git a/src/.collection-layer.ts.swp b/src/.collection-layer.ts.swp deleted file mode 100644 index 956593202e3f477b90830dd34bf24a3ffefc54a9..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 12288 zcmeI2zi!k(5XLu@hyVgnA)!fV66xZ*00NN&C=f+}Ac_D5LTJz4+&S30*6v;|$KQfS z;02%o0WFAvjyHgk8lHe2_}2C%;S?!~s3MJ}PrKfmnQ#2tZBBN(b$fA%&PTHX*I^-! zA5)$F``qZaJPmc!U z$h)0uaRd`!0=p$JE>54CZH26plM{6G$gSNL;CLp$1egF5U;<2l2`~XBzy$U%0bh)X zXXt-l{S1@!I5slYw>&TbCcp%k025#WOn?b60Vco%m;e)C0(+2v#2fzm2ZZh4cc2^4b?6$@g2tfG)9=t1Urc}rFaajO1egF5U;<2l2{3{GC-6iNZGM03=mKfg zrzKe|IB!$kY0%}3uahrMP0tIpUKr~s&ZKi&eslClh)}HAq$rB2Nv}wxS5r#+)DISu z1{Ky6%KB8fX+)Ikm~UfxrA#ReN1cX7!HmdvQy2L`p-?0Y^H`0IM)SSc8#@)&=BO&S zjQKRuX-glM=E$^H@%42`91Wfqm3fuEjj@(_Fb>10aafz93gR}VH!Bi@yZ1uk*5H3l zL(fCwsy*Gy+se}8y7p*#LxeCI)<<{P7;MTS_RRFM1jLl4wxX4u#nh5o_ev)!p?>PS zq@qqlUbZtuPURBY5If4!2Z&EIuWW-Hw!cBECe?LW*DdQRU1o(1mf>6WI?bxwtl=P$ z98w=pxrTj*Kds5EhvL@csQW`xESrKn&C?oOflW+a)FdQ*di Y^dz&4R&b0>zl_UsqshfxJYOXK0?SS}r~m)} diff --git a/src/.example-app.ts.swp b/src/.example-app.ts.swp deleted file mode 100644 index cfefe4b28db95d7903d5610bcfea8b6e7e44410c..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 12288 zcmeHNO^h7H6)qD(0 zX=bWky?XC^e{Ie77N0!+I6dee7I@t%#GhXt4xfDM9r5?O4vA0(fnC1Z%gWt<-_@jq zc+m>B(~S}6T=9Nj$9wU$ zeybU%8K@bk8K@bk8K@bk8K@bk8K@bk8K@bk8Mp=+koblFx?c$Km+N6Xe*Z83|NrqV zLVN_g3Y-U4fZKqtZx-TR;2Gd);3PP1K<*H5f}rffn|Vz|J)$N zzktiYm%w|#JHXq(bHJUz=f4! zjfaqCc%3^Ft#hB%1_x)N`K`xJu0G*wld5Q_1`{u3orR_`JqPhZ`b>26Lc)E+0iD@f zQ)&AiH|=f1o+_{0*vp0euJ7CB-3N>e6>AlNCAtSJ_2Wp#A^TxG^vhkos24-CjA)5|I8d~-v}I3g`4nkKB9w8^w7BSU5+O{+z{%+M&_q#(9C zus_y}J*Fg$6FHRmX(LUhkH7*bRRb%Ta5{+7F-e#s15)EL2TB?aCxt4_xt1}VO%CHsba~hHI*|&*5EfTsKtc6ox zgR&%m5y3)}GAA=*C5}$HEziq3bTl%`Od7QG2yIpM$su>>acM^WAdKVGb01R~1@YKx z(A{)+adEMM`-vhs<`Z!=L=tq13?lCY{+6w=(3}ZWHj7miV6C*Nb=a-uK`j?&Va*cw zV?1YHF5<$C{fIZoI;)E|AUCBlZgnk<$4awjD?bi5*m@wNvGYO-?Ce0v2}KNWo3QZsVuCmu6Jjh z(f*ww?lT;DvlCXJNwcq3L6eXME@{Y?fp_$Y%ns@*Y4EDWz zmFxFoz5Fr9?UkJbcO##QyS#?*q@G0mnUliuo^zkF$xwi1q_iLOtijF&8?^Fshw{nu z>>okH-se2mpWHFqwroY@=Ne^Q(hGzz@<2tHQoB@H4(+u&a=;aYUlh&!mX$&$oHo(C zcQjku*ic#M`rDInyTHIl>12Y6*%pSjrfZ7DS!+Xa-L_E26(-8CykX|k{`}~psT%fv zK0juJ?()I)^e*B`eHn(xGaG2_Mt}+&&*=H5-8|S(0arqU{$GeSZOw&jhYAxKu zah9ZNj9I(^f?ONKAcH#ghd!|#M^2yZUL#g%CDD8ov9wK{W6GwRiR_l?3%NWvm*&$-~f*wK_VZsk# zb!1zv;S0vuvkc#{gZ5zuyL+5);<6WF>cdz!w$cCH6n7~Be+77pQ$t&{qjBzPvt>s; zYnw1lX@J2-bAB5(;8|hrp>0h&2UtZoVGN(D>}=Ygo$i`am4oQOc7C-|JDI8q%3N#~ zPY{(eaqm1Ht6MG!sz+vHuOgc#Kvk+T@|qRFB}M)7hTx zrmK6`!+IeS5y=rn2q8yAi69b0BDr7$93bKVToB5Ezkm>MKoJ)LNRi-x_};6k?w++B zD~JP#UgeW#y6flF_kOD0t4VMF{R=0>J>3Hy{@(0)e}8B&_|`K|dN1zY;YBhC=<(J5 zsNG%fyIL#H`!qG2&W7Qql98H@lR!=D%%4tr4=O+37s;{8y1BmkI8BR&frf$C$Uu?z z_uak2yW{o)`>E$Gdv=Sr>^$=t2Wgr&3^WWh3^WWh3^WWh3^WWh3^WY7z@J}_bplTT=YSR92yi#>F5vgC!??g_fo0$ba1gi| z_~Z5H3p@hc3A_{73B0t;^L__>2`GTufIpxB^Ehxna0Bo?6l^wt1He9DJ3u=8t1H}G z+13R9!oFzBczk3i%}uSA{*jURi6G|@WjMM}tQP>e9N?v^8=dp=Y`_51i4 zh^EoFOQ51$>Xry&KPoiOlL>Xse(6x3atNI&KkSFTFr7rEM#-8Ay5a#P{G^BiQD`N? zoa9mG@=W@lp~?dI(NYEvFNc_p&z0N_4Q@s7m>`dFi5O(`een|W1Xv! zSP%1IMbfb%L&I#8WU51)uR|Oei*S@iJS_xWQ<-4~QpLKTWFsb3cS?~?6M;I(a+=B? zDt|RiLTD-;K!OY+48~UGG)sE27mZ2op^DOe5mhwL(aJKNcSLWDc~kt}hK7()#EY@a z6)CM#id1HKNNdfYJIq#%vk;3kgHl4QfwT~6Vb@Me%aorfq9v6Y0%sUMS6PfTty#iK z&5I}jpCCgHEQ=V*Se=^<`fH(Ioa8FNR1!;2Y-L!W94Z-p;fVfvaP|ElbI1^nvxem5oBgE%ive^33Np&V*;RR^0BfbtMA?- z!n3T4pINn6)wmm>3J>dlpX9uTPVN`YNt&S!aG3rEF z2ElBe6{=#%suU8Z3Pz>$GGtppGnUS^qggI#DKO(D4^8*ohwK2AC{VtW52N>Gq^%T- z{0p-#M#?aKyOLz83mwc$f7ou*sFtvmDRJnq4T&`7gw>S8ZVH77T;yEC;7j9FVd56Z zQ@LuHl|iFRi$@n{5Op}-PhMCpbBMmC=p0z;AcHZkGk&@8iQWwc%P;19&iptfT% zEvCsreKs*WD-dLZWOU}t!qGN2BmI^#&xn>4S)3Ow)<8fQx>&(c>TJq2|$&6wKHrD+7>IIcV>*QEBdFwI3)M0iM%iDjl8!pLL1 z72A?WiZ;q7Do>#2Eq^UDIS629WQn`#X*0{KH80&2(W%_<+gvLA%lV8+yTy364 z2?Bq9&l)ki5Iw0yZ*p+NW@aIrJX((DDLS0aaY7h%leq0%tm8tzGwE6YE|zIkpT|IS!>H!bUAbicQhal9AX$ztx`cvvHaydk%VKOO8aH zNrx@_bf-x+U2kvqHrq`XJqkKK#ou+UBfG;yPv>ClezlfRMn4=>bJSN=mmThboCEF1 zZT5*d#61$u#5`tQ>DkM@<1Mu>{VTlYQMlvrWS!6UNs+m8n%OBh!eVyLtayvZQ7*)> zx{CRppsDHHjROEP-qz}q1!Yi_4^7$F-=$WsIubQl!a;G6;9!l!1MY#9CNvkF7O@3z zmVpe0@+m}lkMjSUkc-YDSEc;_n9Uo1i9G)~;OoHCz+-?9d;<6|a02)ca1C$~IsXOV zBfu`;ZNQI^&u;=B1Qvk3z{|+pp9DS+ECNS>S>Ua}3&__m02@F890guNUj75%dEooN zcC?(>|>h+0Bg8>l+7%XNv0 zX8qjN=ED^aD%`ok6+xsP$G$KvY&KFmiC;E}?Nk{t@;uaiT%u}cFj#>aG9zQ^0IKZ3DCYeK*H%l8#*pp`ORbJ<|HfL=qN=I8wbu1luL8It zK&9x?O3L+<$VsHSQ8H+srBaPyY4_R5g1vdVHSw}#jw>f#Oo+`{Nh6mgC{#S46ovJH zie)rDN|k_`ebHf*OJ64?5;ZEdo4OR8NpVPB4RP8u6s}&&g9tEf;-49ODnEy>M~`~sysPSQJE23 ztaJqvuA^?2p2~by1(|+G75MgKO~$mYmp=p*RH*_oRj#KJ&BdEjWzIq=CTFMh&}G)= z3{zzp3LY(FF;ox-zkyRV-I2t&i7_#ZsxIaYlfuP5=Fda|!!19+q?ar%4z9j5sz>nB zNWjt#Q1B-^Fy@B}Y^F3x1naYc;f<-9mUX$S7x}&z)b-zlW+|Jqa1N>j%`$iLy?}ZXmI(IjF7Ikr_cx&Az_S! z62*G(); +// class EntityType { +// name: string; +// properties?: Property[]; +// constructor(name: string) { +// this.name = name; +// } +// } + +// class Entity { +// type: EntityType; +// properties?: object; +// constructor(type: EntityType) { +// this.type = type; +// } +// } + +// class Collection { + // update(entityId, properties) // ... -} +// } -export class Collections { - collections = new Map(); +// export class Collections { +// collections = new Map(); +// } + + +export class Collection { + entities = new Map(); + eventStream = new EventEmitter(); + constructor() { + console.log('COLLECTION SUBSCRIBING TO DELTA STREAM'); + subscribeDeltas((delta: Delta) => { + // TODO: Make sure this is the kind of delta we're looking for + console.log('COLLECTION RECEIVED DELTA'); + this.applyDelta(delta); + }); + this.eventStream.on('create', (entity: Entity) => { + console.log(`new entity!`, entity); + }); + } + + // Applies the javascript rules for updating object values, + // e.g. set to `undefined` to delete a property + updateEntity(entityId?: string, properties?: object, local = false, deltas?: Delta[]): Entity { + let entity: Entity | undefined; + let eventType: 'create' | 'update' | 'delete' | undefined; + entityId = entityId ?? randomUUID(); + entity = this.entities.get(entityId); + if (!entity) { + entity = new Entity(entityId); + entity.id = entityId; + eventType = 'create'; + } + const deltaBulider = new EntityPropertiesDeltaBuilder(entityId); + + if (!properties) { + // Let's interpret this as entity deletion + this.entities.delete(entityId); + // TODO: prepare and publish a delta + // TODO: execute hooks + eventType = 'delete'; + } else { + let anyChanged = false; + Object.entries(properties).forEach(([key, value]) => { + let changed = false; + if (entity.properties && entity.properties[key] !== value) { + entity.properties[key] = value; + changed = true; + } + if (local && changed) { + // If this is a change, let's generate a delta + deltaBulider.add(key, value); + // We append to the array the caller may provide + // We can update this count as we receive network confirmation for deltas + entity.ahead += 1; + } + anyChanged = anyChanged || changed; + }); + // We've noted that we may be ahead of the server, let's update our + // local image of this entity. + //* In principle, this system can recreate past or alternative states. + //* At worst, by replaying all the deltas up to a particular point. + //* Some sort of checkpointing strategy would probably be helpful. + //* Furthermore, if we can implement reversible transformations, + //* it would then be efficient to calculate the state of the system with + //* specific deltas removed. We could use it to extract a measurement + //* of the effects of some deltas' inclusion or exclusion, the + //* evaluation of which may lend evidence to some possible arguments. + + this.entities.set(entityId, entity); + if (anyChanged) { + deltas?.push(deltaBulider.delta); + eventType = eventType || 'update'; + } + } + if (eventType) { + this.eventStream.emit(eventType, entity); + } + return entity; + } + // We can update our local image of the entity, but we should annotate it + // to indicate that we have not yet received any confirmation of this delta + // having been propagated. + // Later when we receive deltas regarding this entity we can detect when + // we have received back an image that matches our target. + + // So we need a function to generate one or more deltas for each call to put/ + // maybe we stage them and wait for a call to commit() that initiates the + // assembly and transmission of one or more deltas + + applyDelta(delta: Delta) { + // TODO: handle delta representing entity deletion + console.log('applying delta:', delta); + const idPtr = delta.pointers.find(({localContext}) => localContext === 'id'); + if (!idPtr) { + console.error('encountered delta with no entity id', delta); + return; + } + const properties: EntityProperties = {}; + delta.pointers.filter(({localContext}) => localContext !== 'id') + .forEach(({localContext: key, target: value}) => { + properties[key] = value; + }, {}); + const entityId = idPtr.target as string; + // TODO: Handle the scenario where this update has been superceded by a newer one locally + this.updateEntity(entityId, properties); + } + + onCreate(cb: (entity: Entity) => void) { + this.eventStream.on('create', (entity: Entity) => { + cb(entity); + }); + } + onUpdate(cb: (entity: Entity) => void) { + this.eventStream.on('update', (entity: Entity) => { + cb(entity); + }); + } + put(entityId: string | undefined, properties: object): Entity { + const deltas: Delta[] = []; + const entity = this.updateEntity(entityId, properties, true, deltas); + deltas.forEach(async (delta: Delta) => { + await publishDelta(delta); + }); + return entity; + } + del(entityId: string) { + const deltas: Delta[] = []; + this.updateEntity(entityId, undefined, true, deltas); + deltas.forEach(async (delta: Delta) => { + await publishDelta(delta); + }); + } + get(id: string): Entity | undefined { + return this.entities.get(id); + } + getIds(): string[] { + return Array.from(this.entities.keys()); + } } diff --git a/src/deltas.ts b/src/deltas.ts index 0a6d844..848eff7 100644 --- a/src/deltas.ts +++ b/src/deltas.ts @@ -10,7 +10,7 @@ export const deltasRejected: Delta[] = []; export const deltasDeferred: Delta[] = []; export function applyPolicy(delta: Delta): Decision { - return !!delta && Decision.Accept; + return !!delta && Decision.Accept; } export function receiveDelta(delta: Delta) { @@ -18,19 +18,19 @@ export function receiveDelta(delta: Delta) { } export function ingestDelta(delta: Delta) { - const decision = applyPolicy(delta); - switch (decision) { - case Decision.Accept: - deltasAccepted.push(delta); - deltaStream.emit('delta', { delta }); - break; - case Decision.Reject: - deltasRejected.push(delta); - break; - case Decision.Defer: - deltasDeferred.push(delta); - break; - } + const decision = applyPolicy(delta); + switch (decision) { + case Decision.Accept: + deltasAccepted.push(delta); + deltaStream.emit('delta', { delta }); + break; + case Decision.Reject: + deltasRejected.push(delta); + break; + case Decision.Defer: + deltasDeferred.push(delta); + break; + } } export function ingestNext(): boolean { diff --git a/src/example-app.ts b/src/example-app.ts index ff07db4..55a796f 100644 --- a/src/example-app.ts +++ b/src/example-app.ts @@ -2,10 +2,11 @@ import express from "express"; import { bindPublish, } from "./pub-sub"; -import { runDeltas } from "./deltas"; -import { Entities, Entity } from "./object-layer"; +import { deltasAccepted, deltasProposed, runDeltas } from "./deltas"; +import { Entity } from "./object-layer"; +import { Collection } from "./collection-layer"; import { bindReply, runRequestHandlers } from "./request-reply"; -import { subscribeToSeeds } from "./peers"; +import { askAllPeersForDeltas, subscribeToSeeds } from "./peers"; import { ENABLE_HTTP_API, HTTP_API_ADDR, HTTP_API_PORT } from "./config"; @@ -25,7 +26,7 @@ type UserProperties = { }; class Users { - db = new Entities(); + db = new Collection(); create(properties: UserProperties): Entity { // We provide undefined for the id, to let the database generate it // This call returns the id @@ -47,12 +48,23 @@ class Users { } (async () => { - const app = express() + const users = new Users(); + const app = express() app.get("/ids", (req: express.Request, res: express.Response) => { res.json({ ids: users.getIds()}); }); + app.get("/deltas", (req: express.Request, res: express.Response) => { + // TODO: streaming + res.json(deltasAccepted); + }); + + app.get("/deltas/count", (req: express.Request, res: express.Response) => { + // TODO: streaming + res.json(deltasAccepted.length); + }); + if (ENABLE_HTTP_API) { app.listen(HTTP_API_PORT, HTTP_API_ADDR, () => { console.log(`HTTP API bound to http://${HTTP_API_ADDR}:${HTTP_API_PORT}`); @@ -63,21 +75,30 @@ class Users { await bindReply(); runDeltas(); runRequestHandlers(); - await new Promise((resolve) => setTimeout(resolve, 200)); + await new Promise((resolve) => setTimeout(resolve, 500)); subscribeToSeeds(); - await new Promise((resolve) => setTimeout(resolve, 200)); + await new Promise((resolve) => setTimeout(resolve, 500)); + askAllPeersForDeltas(); + await new Promise((resolve) => setTimeout(resolve, 1000)); - const users = new Users(); + setInterval(() => { + console.log('deltasProposed count', deltasProposed.length, + 'deltasAccepted count', deltasAccepted.length); + }, 5000) const taliesin = users.upsert({ - id: 'taliesin-1', + // id: 'taliesin-1', name: 'Taliesin', nameLong: 'Taliesin (Ladd)', age: Math.floor(Math.random() * 1000) }); - taliesin.onUpdate((u: Entity) => { - console.log('User updated', u); + users.db.onUpdate((u: Entity) => { + console.log('User updated:', u); + }); + + users.db.onCreate((u: Entity) => { + console.log('New user!:', u); }); // TODO: Allow configuration regarding read/write concern i.e. diff --git a/src/object-layer.ts b/src/object-layer.ts index f7eaadc..0df5a3b 100644 --- a/src/object-layer.ts +++ b/src/object-layer.ts @@ -7,15 +7,10 @@ // - As typescript interfaces? // - As typescript classes? -import EventEmitter from "node:events"; import { CREATOR, HOST } from "./config"; -import { publishDelta, subscribeDeltas } from "./deltas"; import { Delta, PropertyTypes } from "./types"; -import { randomUUID } from "node:crypto"; -const entityEventStream = new EventEmitter(); - -type EntityProperties = { +export type EntityProperties = { [key: string]: PropertyTypes }; @@ -27,20 +22,11 @@ export class Entity { this.id = id; this.properties = {}; } - onUpdate(cb: (entity: Entity) => void) { - // TODO: This doesn't seem like it will scale well. - entityEventStream.on('update', (entity: Entity) => { - if (entity.id === this.id) { - cb(entity); - } - }); - } } -const entities = new Map(); // TODO: Use leveldb for storing view snapshots -class EntityPropertiesDeltaBuilder { +export class EntityPropertiesDeltaBuilder { delta: Delta; constructor(entityId: string) { this.delta = { @@ -58,123 +44,3 @@ class EntityPropertiesDeltaBuilder { } } -// Applies the javascript rules for updating object values, -// e.g. set to `undefined` to delete a property -function updateEntity(entityId?: string, properties?: object, local = false, deltas?: Delta[]): Entity { - let entity: Entity | undefined; - let eventType: 'create' | 'update' | 'delete' | undefined; - entityId = entityId ?? randomUUID(); - entity = entities.get(entityId); - if (!entity) { - entity = new Entity(entityId); - entity.id = entityId; - eventType = 'create'; - } - const deltaBulider = new EntityPropertiesDeltaBuilder(entityId); - - if (!properties) { - // Let's interpret this as entity deletion - entities.delete(entityId); - // TODO: prepare and publish a delta - // TODO: execute hooks - eventType = 'delete'; - } else { - let anyChanged = false; - Object.entries(properties).forEach(([key, value]) => { - let changed = false; - if (entity.properties && entity.properties[key] !== value) { - entity.properties[key] = value; - changed = true; - } - if (local && changed) { - // If this is a change, let's generate a delta - deltaBulider.add(key, value); - // We append to the array the caller may provide - // We can update this count as we receive network confirmation for deltas - entity.ahead += 1; - } - anyChanged = anyChanged || changed; - }); - // We've noted that we may be ahead of the server, let's update our - // local image of this entity. - //* In principle, this system can recreate past or alternative states. - //* At worst, by replaying all the deltas up to a particular point. - //* Some sort of checkpointing strategy would probably be helpful. - //* Furthermore, if we can implement reversible transformations, - //* it would then be efficient to calculate the state of the system with - //* specific deltas removed. We could use it to extract a measurement - //* of the effects of some deltas' inclusion or exclusion, the - //* evaluation of which may lend evidence to some possible arguments. - - entities.set(entityId, entity); - if (anyChanged) { - deltas?.push(deltaBulider.delta); - eventType = eventType || 'update'; - } - } - if (eventType) { - entityEventStream.emit(eventType, entity); - } - return entity; -} - -// We can update our local image of the entity, but we should annotate it -// to indicate that we have not yet received any confirmation of this delta -// having been propagated. -// Later when we receive deltas regarding this entity we can detect when -// we have received back an image that matches our target. - -// So we need a function to generate one or more deltas for each call to put/ -// maybe we stage them and wait for a call to commit() that initiates the -// assembly and transmission of one or more deltas - -function applyDelta(delta: Delta) { - // TODO: handle delta representing entity deletion - const idPtr = delta.pointers.find(({localContext}) => localContext === 'id'); - if (!idPtr) { - console.error('encountered delta with no entity id', delta); - return; - } - const properties: EntityProperties = {}; - delta.pointers.filter(({localContext}) => localContext !== 'id') - .forEach(({localContext: key, target: value}) => { - properties[key] = value; - }, {}); - const entityId = idPtr.target as string; - // TODO: Handle the scenario where this update has been superceded by a newer one locally - updateEntity(entityId, properties); -} - -subscribeDeltas((delta: Delta) => { - // TODO: Make sure this is the kind of delta we're looking for - applyDelta(delta); -}); - -export class Entities { - constructor() { - entityEventStream.on('create', (entity: Entity) => { - console.log(`new entity!`, entity); - }); - } - put(entityId: string | undefined, properties: object): Entity { - const deltas: Delta[] = []; - const entity = updateEntity(entityId, properties, true, deltas); - deltas.forEach(async (delta: Delta) => { - await publishDelta(delta); - }); - return entity; - } - del(entityId: string) { - const deltas: Delta[] = []; - updateEntity(entityId, undefined, true, deltas); - deltas.forEach(async (delta: Delta) => { - await publishDelta(delta); - }); - } - get(id: string): Entity | undefined { - return entities.get(id); - } - getIds(): string[] { - return Array.from(entities.keys()); - } -} diff --git a/src/peers.ts b/src/peers.ts index a496946..588e70b 100644 --- a/src/peers.ts +++ b/src/peers.ts @@ -3,11 +3,32 @@ import { registerRequestHandler, PeerRequest, ResponseSocket } from "./request-r import { RequestSocket, } from "./request-reply"; import { SEED_PEERS } from "./config"; import {connectSubscribe} from "./pub-sub"; +import {deltasAccepted, deltasProposed, ingestAll, receiveDelta} from "./deltas"; +import {Delta} from "./types"; export enum PeerMethods { GetPublishAddress, + AskForDeltas } +registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => { + console.log('inspecting peer request'); + switch (req.method) { + case PeerMethods.GetPublishAddress: { + console.log('it\'s a request for our publish address'); + await res.send(publishAddr); + break; + } + case PeerMethods.AskForDeltas: { + console.log('it\'s a request for deltas'); + // TODO: stream these rather than + // trying to write them all in one message + await res.send(JSON.stringify(deltasAccepted)); + break; + } + } +}); + export type PeerAddress = { addr: string, port: number @@ -33,23 +54,46 @@ class Peer { connectSubscribe(addr, port); } } + async askForDeltas(): Promise { + // TODO as a first approximation we are trying to cram the entire history + // of accepted deltas, into one (potentially gargantuan) json message. + // A second approximation would be to stream the deltas. + // Third pass should find a way to reduce the number of deltas transmitted. + + // TODO: requestTimeout + const res = await this.reqSock.request(PeerMethods.AskForDeltas); + const deltas = JSON.parse(res.toString()); + return deltas; + } +} + +const peers: Peer[] = []; + +function newPeer(addr: string, port: number) { + const peer = new Peer(addr, port); + peers.push(peer); + return peer; } export async function subscribeToSeeds() { SEED_PEERS.forEach(async ({addr, port}, idx) => { console.log(`SEED PEERS[${idx}]=${addr}:${port}`); - const peer = new Peer(addr, port); + const peer = newPeer(addr, port); await peer.subscribe(); }); } -registerRequestHandler(async (req: PeerRequest, res: ResponseSocket) => { - console.log('inspecting peer request'); - switch (req.method) { - case PeerMethods.GetPublishAddress: - console.log('it\'s a request for our publish address'); - await res.send(publishAddr); - break; - } -}); - +//! TODO Expect abysmal scaling properties with this function +export async function askAllPeersForDeltas() { + peers.forEach(async (peer, idx) => { + console.log(`Asking peer ${idx} for deltas`); + const deltas = await peer.askForDeltas(); + console.log('received deltas:', deltas); + for (const delta of deltas) { + receiveDelta(delta); + } + console.log('deltasProposed count', deltasProposed.length); + console.log('deltasAccepted count', deltasAccepted.length); + ingestAll(); + }); +}