refactoring in preparation for adding more resolvers
This commit is contained in:
parent
e684eac932
commit
c6f6ece504
|
@ -1,8 +1,8 @@
|
|||
import Debug from 'debug';
|
||||
import {Delta, PointerTarget} from "../src/delta";
|
||||
import {lastValueFromDeltas} from "../src/last-write-wins";
|
||||
import {lastValueFromDeltas, valueFromCollapsedDelta} from "../src/last-write-wins";
|
||||
import {Lossless, LosslessViewOne} from "../src/lossless";
|
||||
import {Lossy, valueFromCollapsedDelta} from "../src/lossy";
|
||||
import {Lossy} from "../src/lossy";
|
||||
import {RhizomeNode} from "../src/node";
|
||||
const debug = Debug('test:lossy');
|
||||
|
||||
|
@ -16,45 +16,45 @@ type Summary = {
|
|||
roles: Role[];
|
||||
};
|
||||
|
||||
|
||||
function initializer(): Summary {
|
||||
return {
|
||||
roles: []
|
||||
};
|
||||
}
|
||||
|
||||
// TODO: Add more rigor to this example approach to generating a summary.
|
||||
// it's really not CRDT, it likely depends on the order of the pointers.
|
||||
// TODO: Prove with failing test
|
||||
|
||||
const reducer = (acc: Summary, cur: LosslessViewOne): Summary => {
|
||||
if (cur.referencedAs.includes("role")) {
|
||||
const {delta, value: actor} = lastValueFromDeltas("actor", cur.propertyDeltas["actor"]) ?? {};
|
||||
if (!delta) throw new Error('expected to find delta');
|
||||
if (!actor) throw new Error('expected to find actor');
|
||||
const film = valueFromCollapsedDelta("film", delta);
|
||||
if (!film) throw new Error('expected to find film');
|
||||
acc.roles.push({
|
||||
role: cur.id,
|
||||
actor,
|
||||
film
|
||||
});
|
||||
class Summarizer extends Lossy<Summary, Summary> {
|
||||
initializer(): Summary {
|
||||
return {
|
||||
roles: []
|
||||
};
|
||||
}
|
||||
|
||||
return acc;
|
||||
}
|
||||
// TODO: Add more rigor to this example approach to generating a summary.
|
||||
// it's really not CRDT, it likely depends on the order of the pointers.
|
||||
// TODO: Prove with failing test
|
||||
|
||||
const resolver = (acc: Summary): Summary => {
|
||||
return acc;
|
||||
}
|
||||
reducer(acc: Summary, cur: LosslessViewOne): Summary {
|
||||
if (cur.referencedAs.includes("role")) {
|
||||
const {delta, value: actor} = lastValueFromDeltas("actor", cur.propertyDeltas["actor"]) ?? {};
|
||||
if (!delta) throw new Error('expected to find delta');
|
||||
if (!actor) throw new Error('expected to find actor');
|
||||
const film = valueFromCollapsedDelta("film", delta);
|
||||
if (!film) throw new Error('expected to find film');
|
||||
acc.roles.push({
|
||||
role: cur.id,
|
||||
actor,
|
||||
film
|
||||
});
|
||||
}
|
||||
|
||||
return acc;
|
||||
}
|
||||
|
||||
resolver(acc: Summary): Summary {
|
||||
return acc;
|
||||
}
|
||||
}
|
||||
|
||||
describe('Lossy', () => {
|
||||
describe('use a provided initializer, reducer, and resolver to resolve entity views', () => {
|
||||
const node = new RhizomeNode();
|
||||
const lossless = new Lossless(node);
|
||||
|
||||
const lossy = new Lossy(lossless, initializer, reducer, resolver);
|
||||
const lossy = new Summarizer(lossless);
|
||||
|
||||
beforeAll(() => {
|
||||
lossless.ingestDelta(new Delta({
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
describe('Relational', () => {
|
||||
it.skip('Allows expressing a domain ontology as a relational schema', async () => {});
|
||||
|
||||
// Deltas can be filtered at time of view resolution, and
|
||||
// excluded if they violate schema constraints;
|
||||
// Ideally the sender minimizes this by locally validating against the constraints.
|
||||
// For cases where deltas conflict, there can be a resolution process,
|
||||
// with configurable parameters such as duration, quorum, and so on;
|
||||
// or a deterministic algorithm can be applied.
|
||||
|
||||
it.skip('Can validate a delta against a relational constraint', async () => {});
|
||||
it.skip('Can validate a delta against a set of relational constraints', async () => {});
|
||||
});
|
|
@ -1,7 +1,7 @@
|
|||
// import Debug from 'debug';
|
||||
import {EntityProperties} from "./entity";
|
||||
import {CollapsedDelta, Lossless, LosslessViewOne} from "./lossless";
|
||||
import {Lossy, valueFromCollapsedDelta} from './lossy';
|
||||
import {CollapsedDelta, LosslessViewOne} from "./lossless";
|
||||
import {Lossy} from './lossy';
|
||||
import {DomainEntityID, PropertyID, PropertyTypes, Timestamp, ViewMany} from "./types";
|
||||
// const debug = Debug('rz:lossy:last-write-wins');
|
||||
|
||||
|
@ -27,7 +27,21 @@ export type ResolvedViewMany = ViewMany<ResolvedViewOne>;
|
|||
type Accumulator = LossyViewMany<TimestampedProperties>;
|
||||
type Result = LossyViewMany<EntityProperties>;
|
||||
|
||||
// Function for resolving a value for an entity by last write wins
|
||||
// Extract a particular value from a delta's pointers
|
||||
export function valueFromCollapsedDelta(
|
||||
key: string,
|
||||
delta: CollapsedDelta
|
||||
): string | number | undefined {
|
||||
for (const pointer of delta.pointers) {
|
||||
for (const [k, value] of Object.entries(pointer)) {
|
||||
if (k === key && (typeof value === "string" || typeof value === "number")) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve a value for an entity by last write wins
|
||||
export function lastValueFromDeltas(
|
||||
key: string,
|
||||
deltas?: CollapsedDelta[]
|
||||
|
@ -55,46 +69,41 @@ export function lastValueFromDeltas(
|
|||
return res;
|
||||
}
|
||||
|
||||
function initializer(): Accumulator {
|
||||
return {};
|
||||
};
|
||||
|
||||
function reducer(acc: Accumulator, cur: LosslessViewOne): Accumulator {
|
||||
if (!acc[cur.id]) {
|
||||
acc[cur.id] = {id: cur.id, properties: {}};
|
||||
}
|
||||
|
||||
for (const [key, deltas] of Object.entries(cur.propertyDeltas)) {
|
||||
const {value, timeUpdated} = lastValueFromDeltas(key, deltas) || {};
|
||||
if (!value || !timeUpdated) continue;
|
||||
|
||||
if (timeUpdated > (acc[cur.id].properties[key]?.timeUpdated || 0)) {
|
||||
acc[cur.id].properties[key] = {
|
||||
value,
|
||||
timeUpdated
|
||||
};
|
||||
}
|
||||
}
|
||||
return acc;
|
||||
};
|
||||
|
||||
function resolver(cur: Accumulator): Result {
|
||||
const res: Result = {};
|
||||
|
||||
for (const [id, ent] of Object.entries(cur)) {
|
||||
res[id] = {id, properties: {}};
|
||||
for (const [key, {value}] of Object.entries(ent.properties)) {
|
||||
res[id].properties[key] = value;
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
};
|
||||
|
||||
export class LastWriteWins extends Lossy<Accumulator, Result> {
|
||||
constructor(
|
||||
readonly lossless: Lossless,
|
||||
) {
|
||||
super(lossless, initializer, reducer, resolver);
|
||||
initializer(): Accumulator {
|
||||
return {};
|
||||
}
|
||||
|
||||
reducer(acc: Accumulator, cur: LosslessViewOne): Accumulator {
|
||||
if (!acc[cur.id]) {
|
||||
acc[cur.id] = {id: cur.id, properties: {}};
|
||||
}
|
||||
|
||||
for (const [key, deltas] of Object.entries(cur.propertyDeltas)) {
|
||||
const {value, timeUpdated} = lastValueFromDeltas(key, deltas) || {};
|
||||
if (!value || !timeUpdated) continue;
|
||||
|
||||
if (timeUpdated > (acc[cur.id].properties[key]?.timeUpdated || 0)) {
|
||||
acc[cur.id].properties[key] = {
|
||||
value,
|
||||
timeUpdated
|
||||
};
|
||||
}
|
||||
}
|
||||
return acc;
|
||||
};
|
||||
|
||||
resolver(cur: Accumulator): Result {
|
||||
const res: Result = {};
|
||||
|
||||
for (const [id, ent] of Object.entries(cur)) {
|
||||
res[id] = {id, properties: {}};
|
||||
for (const [key, {value}] of Object.entries(ent.properties)) {
|
||||
res[id].properties[key] = value;
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
};
|
||||
}
|
||||
|
||||
|
|
29
src/lossy.ts
29
src/lossy.ts
|
@ -4,38 +4,21 @@
|
|||
|
||||
import Debug from 'debug';
|
||||
import {DeltaFilter, DeltaID} from "./delta";
|
||||
import {CollapsedDelta, Lossless, LosslessViewOne} from "./lossless";
|
||||
import {Lossless, LosslessViewOne} from "./lossless";
|
||||
import {DomainEntityID} from "./types";
|
||||
const debug = Debug('rz:lossy');
|
||||
|
||||
export type Initializer<Accumulator> = (v: LosslessViewOne) => Accumulator;
|
||||
export type Reducer<Accumulator> = (acc: Accumulator, cur: LosslessViewOne) => Accumulator;
|
||||
export type Resolver<Accumulator, Result> = (cur: Accumulator) => Result;
|
||||
|
||||
// Extract a particular value from a delta's pointers
|
||||
export function valueFromCollapsedDelta(
|
||||
key: string,
|
||||
delta: CollapsedDelta
|
||||
): string | number | undefined {
|
||||
for (const pointer of delta.pointers) {
|
||||
for (const [k, value] of Object.entries(pointer)) {
|
||||
if (k === key && (typeof value === "string" || typeof value === "number")) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We support incremental updates of lossy models.
|
||||
export class Lossy<Accumulator, Result> {
|
||||
export abstract class Lossy<Accumulator, Result> {
|
||||
deltaFilter?: DeltaFilter;
|
||||
accumulator?: Accumulator;
|
||||
|
||||
abstract initializer(v: LosslessViewOne): Accumulator;
|
||||
abstract reducer(acc: Accumulator, cur: LosslessViewOne): Accumulator;
|
||||
abstract resolver(cur: Accumulator): Result;
|
||||
|
||||
constructor(
|
||||
readonly lossless: Lossless,
|
||||
readonly initializer: Initializer<Accumulator>,
|
||||
readonly reducer: Reducer<Accumulator>,
|
||||
readonly resolver: Resolver<Accumulator, Result>,
|
||||
) {
|
||||
this.lossless.eventStream.on("updated", (id, deltaIds) => {
|
||||
debug(`[${this.lossless.rhizomeNode.config.peerId}] entity ${id} updated, deltaIds:`,
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
import {Collection} from "./collection";
|
||||
|
||||
|
||||
|
||||
|
||||
export class RelationalCollection extends Collection {
|
||||
// lossy?:
|
||||
|
||||
}
|
Loading…
Reference in New Issue