diff --git a/.gitignore b/.gitignore index d5f19d8..d320cbd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,5 @@ node_modules package-lock.json +pnpm-lock.yaml +yarn.lock +SANDBOX diff --git a/README.md b/README.md index f4b10d2..d2bc51f 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,174 @@ # kappa-core +## **kappa-core WIP rewrite** ## + +kappa-core is a database abstraction for append-only logs and add-only sets. A kappa-core is a container for pairs of sources and views, called *flows*. In each flow, data flows from a source into a view. Sources have a pull function that fetches new messages since the last pull. Views have a map function which is called for each batch of messages from the source. + +kappa-core itself is dependencyless, but this module also contains stateful source handlers for [hypercores](https://github.com/mafintosh/hypercore), [multifeed](https://github.com/kappa-core/multifeed), and [corestore](https://github.com/andrewosh/corestore). + +## API + +`const { Kappa } = require('kappa-core')` + +#### `const kappa = new Kappa()` + +Create a new kappa core. + +#### `kappa.use(name, source, view)` + +Register a flow. + +* `name` (string) the name of the flow, has to be unique per kappa core +* `source` object with properties: + * `open: function (flow, cb)` *(optional)* Handler to call on open. `flow` is the current flow object (see below for docs). Call `cb` when done with opening. + * `close: function (cb)`: *(optional)* Handler to call on close. Has to call `cb`. + * `pull: function (next)`: **(required)** Handler to pull new messages from the view. Should call `next` with either nothing or an object that looks like this: + ```javascript + { + error: Error, + messages: [], // array of messages + finished: true, // if set to false, signal that more messages are pending + onindexed: function (cb) { + // will be called when the view finished indexing + // call cb after the source state is updated + // may return a state object with, by convention, the following keys: + cb(null, { + totalBlocks: Number, + indexedBlocks: Number, + prevIndexedBlocks: Number + }) + } + } + ``` + * `reset: function (cb)`: **(required)** Handler to reset internal state. This is called when a full reindex is necessary. This means that the next pull ought to start at the beginning. + * `storeVersion: function (version, cb)`: **(required)** Handler to store the flow version number. + * `fetchVersion: function (cb)`: **(required)** Handler to fetch the version stored with `storeVersion`. + * See the `SimpleState` docs below how to easily implement the `reset`, `storeVersion` and `fetchVersion` methods. + +* `view` object with properties: + * `open: function (flow, cb)` *(optional)* Handler to call on open. `flow` is the current flow object (see below for docs). Call `cb` when done with opening. + * `close: function (cb)`: *(optional)* Handler to call on close. Has to call `cb`. + * `map: function (messages, next)` **(required)** Handler for each batch of messages. Call `next` when done indexing this batch of messages. + * `reset: function (cb)`: **(required)** Handler to delete all indexed data. This is called by the Kappa core when a complete reindex is necessary. The `map` function will receive messages from the start on afterwards. + * `version: int` The view version. If the version is increased, the Kappa core will clear and restart the indexing for this view after the next reopening of the core. Defaults to `1`. + +Both `source` and `view` can have an `api` property with an object of functions. The functions are exposed on `kappa.view[name]` / `kappa.source[name]`. Their `this` object refers to the flow they are part of, and their first parameter is the `kappa`. Other parameters are passed through. + +The source has to track its state, so that subsequent calls to `pull()` do not return the same messages. Use the `onindexed` callback to update state. How to track its state is up to the source implementation. kappa-core provides a `SimpleState` helper to simplify this, see its documentation below. + +There are several source handlers included in kappa-core (TODO: document sources). See the tests and sources directories. + +#### `kappa.reset(name, cb)` + +Reset a specific flow, to restart indexing. This is equal to reopening the kappa-core with a changed view version for this flow. + +#### `kappa.ready(names, cb)` + +Call `cb` exactly once, after all flows with a name in the `names` array have finished processing. If `names` is empty, all flows will be awaited. This `names` is a string, the flow of this name will be awaited. If the requested flows are already ready, `cb` is called immediately. + +#### `kappa.pause()` + +Pause processing of all flows + +#### `kappa.resume()` + +Resume processing of all flows + +## Flow + +When calling `kappa.use()` a new *Flow* is created. A Flow is the combination of a source and a view - where the data flows from the source into the view. The `Flow` object is passed to sources and views in their `open` handler. It has this public API: + +* `flow.name`: (string) A name that uniquely identifies this flow within the Kappa core. +* `flow.update()`: Signal to the flow that the source has new data available. Youwant to call this from a source when the source has new data. If the Kappa core is not paused, this will cause the `pull` handler to be called. +* `flow.ready(cb)`: Calls `cb` (with no arguments) when this flow has finished processing all messages. `cb` is called immediately if the flow is already finished. +* `flow.getState()`: Get the current indexing state. Returns an object: + ```javascript + { + status: 'idle' | 'running' | 'paused' | 'error', + error: null | Error, + // ... other keys as returned by the source + // by convention this should include the following keys: + totalBlocks: Number, + indexedBlocks: Number, + prevIndexedBlocks: Number + } + ``` +* `flow.view`: Object with the view's API functions +* `flow.source`: Object with the source's API functions + +## SimpleState + +`kappa-core` exports a `SimpleState` class that can be used by sources for a simple state handling. It persists state either in-memory, and supports a [LevelDB](https://github.com/Level/level) (or compatible) option for persistence. + +Example: + +```javascript +const { Kappa, SimpleState } = require('kappa-core') +function createSource (opts) { + const state = new SimpleState({ db: opts.db }) + return { + pull (next) { + // get your current state + state.get((err, state) => { + if (err) return next() + // fetch messages from your data source + fetchMessages(state, ({ messages, finished, nextState }) => { + // call next with an onindexed handler + next({ + messages, + finished, + onindexed (cb) { + // store the new state + state.put(nextState, cb) + } + }) + }) + }) + }, + fetchVersion: state.fetchVersion, + storeVersion: state.storeVersion, + reset (cb) { + state.put('', cb) + } + } +} +``` + +## Sources + +#### [hypercore](https://github.com/mafintosh/hypercore) + +```javascript +const createHypercoreSource = require('kappa-core/sources/hypercore') +const source = createHypercoreSource({ feed, db }) +``` + +where `feed` is a hypercore instance and `db` is a levelup instance (for persisting state) + +#### [multifeed](https://github.com/kappa-core/multifeed) + +```javascript +const createMultifeedSource = require('kappa-core/sources/multifeed') +const source = createMultifeedSource({ feeds, db }) +``` + +where `feeds` is a multifeed instance and `db` is a levelup instance (for persisting state) + +This source exposes an API method `feed (key)` that return a feed by key from the underlying multifeed. + +#### [corestore](https://github.com/andrewosh/corestore) + +```javascript +const createCorestoreSource = require('kappa-core/sources/corestore') +const source = createCorestoreSource({ store, db }) +``` + +where `store` is a corestore instance and `db` is a levelup instance (for persisting state) + +This source exposes an API method `feed (key)` that return a feed by key from the underlying corestore. + +--- + > kappa-core is a minimal peer-to-peer database, based on append-only logs and materialized views. ## Introduction diff --git a/index.js b/index.js index 4c95c4f..3ff6a60 100644 --- a/index.js +++ b/index.js @@ -1,129 +1,6 @@ -var inherits = require('inherits') -var EventEmitter = require('events').EventEmitter -var hypercore = require('hypercore') -var multifeed = require('multifeed') -var indexer = require('multifeed-index') +const Kappa = require('./kappa') +const SimpleState = require('./sources/util/state') module.exports = Kappa - -function Kappa (storage, opts) { - if (!(this instanceof Kappa)) return new Kappa(storage, opts) - if (!opts) opts = {} - - this._logs = opts.multifeed || multifeed(hypercore, storage, opts) - this._indexes = {} - - this.api = {} -} - -inherits(Kappa, EventEmitter) - -Kappa.prototype.use = function (name, version, view) { - var self = this - if (typeof version !== 'number') { - view = version - version = undefined - } - var idx = indexer(Object.assign({}, view, { - log: this._logs, - version: version, - maxBatch: view.maxBatch || 10, - batch: view.map - })) - idx.on('error', function (err) { - self.emit('error', err) - }) - if (view.indexed) idx.on('indexed', view.indexed) - this._indexes[name] = idx - this.api[name] = {} - this.api[name].ready = idx.ready.bind(idx) - for (var key in view.api) { - if (typeof view.api[key] === 'function') this.api[name][key] = view.api[key].bind(idx, this) - else this.api[name][key] = view.api[key] - } -} - -Kappa.prototype.feeds = function () { - return this._logs.feeds() -} - -Kappa.prototype.ready = function (viewNames, cb) { - if (typeof viewNames === 'function') { - cb = viewNames - viewNames = [] - } - - if (typeof viewNames === 'string') viewNames = [viewNames] - if (viewNames.length === 0) { - viewNames = Object.keys(this._indexes) - } - - var pending = viewNames.length + 1 - var self = this - this._logs.ready(function () { - for (var i = 0; i < viewNames.length; i++) { - self._indexes[viewNames[i]].ready(done) - } - done() - }) - - function done () { - if (!--pending) cb() - } -} - -Kappa.prototype.pause = function (viewNames, cb) { - if (typeof viewNames === 'function') { - cb = viewNames - viewNames = [] - } - cb = cb || noop - - if (!viewNames) viewNames = [] - if (typeof viewNames === 'string') viewNames = [viewNames] - if (viewNames.length === 0) { - viewNames = Object.keys(this._indexes) - } - - var pending = viewNames.length + 1 - var self = this - this._logs.ready(function () { - for (var i = 0; i < viewNames.length; i++) { - self._indexes[viewNames[i]].pause(done) - } - done() - }) - - function done () { - if (!--pending) cb() - } -} - -Kappa.prototype.resume = function (viewNames) { - if (!viewNames) viewNames = [] - if (typeof viewNames === 'string') viewNames = [viewNames] - if (viewNames.length === 0) { - viewNames = Object.keys(this._indexes) - } - - var self = this - this._logs.ready(function () { - for (var i = 0; i < viewNames.length; i++) { - self._indexes[viewNames[i]].resume() - } - }) -} - -Kappa.prototype.writer = function (name, cb) { - this._logs.writer(name, cb) -} - -Kappa.prototype.feed = function (key) { - return this._logs.feed(key) -} - -Kappa.prototype.replicate = function (opts) { - return this._logs.replicate(opts) -} - -function noop () {} +module.exports.Kappa = Kappa +module.exports.SimpleState = SimpleState diff --git a/kappa-graph.svg b/kappa-graph.svg new file mode 100644 index 0000000..e89eb12 --- /dev/null +++ b/kappa-graph.svg @@ -0,0 +1,3 @@ + + +kappa.source(name, createSource. opts)kappa.source<br><i>(name, createSource. opts)</i>kappa.view(name, handlers)kappa.view<br><i>(name, handlers)</i>kappa.connect(sourcename, viewname)kappa.connect<br><i>(sourcename, viewname)</i>view: viewname[Not supported by viewer]flow: sourcename~viewnameflow: <i style="font-weight: normal">sourcename~viewname</i>ready()ready()pause()pause()resume()resume()statestatesource: sourcename[Not supported by viewer]createSource()createSource()pull(state)pull(<i>state</i>)transform(msgs)transform(<i>msgs</i>)filter(msgs)filter(<i>msgs</i>)transform(msgs)transform(<i>msgs</i>)map(msgs)map(<i>msgs</i>)source instancesource instancekappa.source(name, createSource, opts)kappa.source<br><i>(name, createSource, opts)</i>source: sourcename[Not supported by viewer]createSource()createSource()kappa.view(name, handlers)kappa.view<br><i>(name, handlers)</i>view: viewname[Not supported by viewer]map(msgs)map(<i>msgs</i>)apiapiapiapikappa.ready()kappa.ready()kappa.pause()kappa.pause()kappa.resume()kappa.resume() \ No newline at end of file diff --git a/kappa.js b/kappa.js new file mode 100644 index 0000000..d9352a0 --- /dev/null +++ b/kappa.js @@ -0,0 +1,399 @@ +const thunky = require('thunky') +const { EventEmitter } = require('events') + +const Status = { + Closed: 'closed', + Ready: 'ready', + Running: 'running', + Paused: 'paused', + Closing: 'closing', + Error: 'error' +} + +module.exports = class Kappa extends EventEmitter { + /** + * Create a kappa core. + * @constructor + */ + constructor (opts = {}) { + super() + this.flows = {} + this.status = Status.Ready + // APIs + this.view = {} + this.source = {} + } + + // This is here for backwards compatibility. + get api () { return this.view } + + use (name, source, view, opts = {}) { + opts.status = opts.status || this.status + opts.context = opts.context || this + const flow = new Flow(name, source, view, opts) + this.flows[name] = flow + this.view[name] = flow.view + this.source[name] = flow.source + flow.on('error', err => this.emit('error', err, flow)) + flow.on('state-update', state => this.emit('state-update', name, state)) + + if (this.status !== Status.Paused) { + process.nextTick(() => flow.open(err => { + if (err) this.emit('error', err) + })) + } + + this.emit('flow', name) + return flow + } + + pause () { + this.status = Status.Paused + this._forEach(flow => flow.pause()) + } + + resume () { + if (this.status !== Status.Paused) return + this._forEach(flow => flow.resume()) + this.status = Status.Ready + } + + reset (names, cb) { + this._forEachAsync((flow, next) => { + flow.reset(next) + }, names, cb) + } + + getState () { + const state = {} + this._forEach(flow => (state[flow.name] = flow.getState())) + return state + } + + ready (names, cb) { + this._forEachAsync((flow, next) => { + flow.ready(next) + }, names, cb) + } + + close (cb) { + this._forEachAsync((flow, next) => { + flow.close(next) + }, cb) + } + + _forEach (fn, names) { + if (typeof names === 'string') names = [names] + if (!names) names = Object.keys(this.flows) + for (const name of names) { + if (!this.flows[name]) continue + fn(this.flows[name]) + } + } + + _forEachAsync (fn, names, cb) { + if (typeof names === 'function') { + cb = names + names = null + } + cb = once(cb) + let pending = 1 + this._forEach(flow => { + ++pending + fn(flow, done) + }, names) + done() + function done (err) { + if (err) return cb(err) + if (--pending === 0) cb() + } + } +} + +class Flow extends EventEmitter { + constructor (name, source, view, opts) { + super() + + this.opts = opts + this.name = name + + if (!view.version) view.version = 1 + + // TODO: Backward-compatibility only. Remove. + if (view.clearIndex && !view.reset) { + view.reset = view.clearIndex.bind(view) + } + + this._view = view + this._source = source + + this._context = opts.context + this._indexingState = {} + + // Assign view and source apis + this.view = bindApi(view.api, this._context) + this.view.ready = cb => this.ready(cb) + this.source = bindApi(source.api, this._context) + + // Create the list of funtions through which messages run between pull and map. + this._transform = new Pipeline() + if (this._source.transform) this._transform.push(this._source.transform.bind(this._source)) + if (this.opts.transform) this._transform.push(this.opts.transform) + if (this._view.transform) this._transform.push(this._view.transform.bind(this._view)) + + this.opened = false + this.open = thunky(this._open.bind(this)) + this._state = new State() + } + + get version () { + return this._view.version + } + + _open (cb = noop) { + if (this.opened) return cb() + const self = this + let done = false + let pending = 1 + if (this._view.open) ++pending && this._view.open(this, onopen) + if (this._source.open) ++pending && this._source.open(this, onopen) + onopen() + + function onopen (err) { + if (err) return ondone(err) + if (--pending !== 0) return + if (!self._source.fetchVersion) return ondone() + + self._source.fetchVersion((err, version) => { + if (err) return ondone(err) + if (!version) { + self._source.storeVersion(self.version, ondone) + } else if (version !== self.version) { + self.reset(() => { + self._source.storeVersion(self.version, ondone) + }) + } else { + ondone() + } + }) + } + + function ondone (err) { + if (done) return + done = true + if (err) return cb(err) + self._setState(Status.Ready) + self.opened = true + self._run() + cb() + } + } + + close (cb) { + const self = this + this.pause() + let state = this._state.state + this._setState(Status.Closing) + + if (state === Status.Running) return this.once('ready', close) + else close() + + function close () { + let pending = 1 + if (self._source.close) ++pending && self._source.close(done) + if (self._view.close) ++pending && self._view.close(done) + done() + function done () { + if (--pending !== 0) return + self._setState(Status.Closed) + self.opened = false + cb() + } + } + } + + ready (cb) { + const self = this + if (!this.opened) return this.open(() => this.ready(cb)) + + setImmediate(() => { + if (this.source.ready) this.source.ready(onsourceready) + else if (this._source.ready) this._source.ready(onsourceready) + else onsourceready() + }) + + function onsourceready () { + process.nextTick(() => { + if (self._state.state === Status.Ready) process.nextTick(cb) + else self.once('ready', cb) + }) + } + } + + pause () { + this._setState(Status.Paused) + } + + resume () { + if (this._state.state !== Status.Paused) return + if (!this.opened) return this.open() + this._setState(Status.Ready) + this._run() + } + + reset (cb = noop) { + const self = this + const paused = this._state.state === Status.Paused + this.pause() + let pending = 1 + process.nextTick(() => { + if (this._view.reset) ++pending && this._view.reset(done) + if (this._source.reset) ++pending && this._source.reset(done) + done() + }) + function done () { + if (--pending !== 0) return + if (!paused) self.resume() + cb() + } + } + + update () { + if (!this.opened) return + this.incomingUpdate = true + process.nextTick(this._run.bind(this)) + } + + getState () { + return this._state.get() + } + + _setState (state, context) { + this._state.set(state, context) + this.emit('state-update', this._state.get()) + if (state === Status.Error) { + this.emit('error', context && context.error) + } + } + + _run () { + if (this._state.state !== Status.Ready) return + const self = this + + this._setState(Status.Running) + this._source.pull(onbatch) + + function onbatch (result) { + // If set to paused while pulling, drop the result and don't update state. + if (self._state.state === Status.Paused) return + if (!result) return close() + + let { error, messages, finished, onindexed } = result + + if (error) return close(error) + if (!messages) return close() + messages = messages.filter(m => m) + if (!messages.length) return close() + + self._transform.run(messages, messages => { + if (!messages.length) return close() + // TODO: Handle timeout? + self._view.map(messages, err => { + close(err, messages, finished, onindexed) + }) + }) + } + + function close (err, messages, finished, onindexed) { + if (err) return finish(err) + if (messages && messages.length && self._view.indexed) { + self._view.indexed(messages) + } + if (onindexed) { + onindexed((err, context) => finish(err, finished, context)) + } else { + finish(null, finished) + } + } + + function finish (err, finished = true, context) { + if (err) { + self._setState(Status.Error, { error: err }) + } else if (self._state.state !== Status.Closing) { + self._setState(Status.Ready, context) + } + + if (self._state.state === Status.Ready && (self.incomingUpdate || !finished)) { + self.incomingUpdate = false + process.nextTick(self._run.bind(self)) + } else { + self.emit('ready') + } + + } + } +} + +class State { + constructor () { + this.state = Status.Closed + this.context = null + } + + set (state, context) { + this.state = state + if (context) this.context = { ...this.context, ...context } + } + + get () { + return Object.assign({ status: this.state }, this.context || {}) + } +} + +// Utils + +function bindApi (api, ...binds) { + if (!api) return {} + for (let [key, value] of Object.entries(api)) { + if (typeof value !== 'function') continue + api[key] = value.bind(api, ...binds) + } + return api +} + +class Pipeline { + constructor () { + this.fns = [] + } + + push (fn) { + this.fns.push(fn) + } + + run (messages, final) { + runThrough(messages, this.fns, final) + } +} + +function runThrough (state, fns, final) { + fns = fns.filter(f => f) + next(state) + function next (state) { + const fn = fns.shift() + if (!fn) return final(state) + fn(state, nextState => { + process.nextTick(next, nextState) + }) + } +} + +function noop () {} + +function once (fn) { + let called = false + return (...args) => { + if (called) return + called = true + fn(...args) + } +} diff --git a/package.json b/package.json index 82069e1..55d0d73 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "kappa-core", "description": "Minimal peer-to-peer database, based on kappa architecture.", - "author": "Stephen Whitmore ", + "author": "Stephen Whitmore , Franz Heinzmann ", "version": "4.0.1", "repository": { "url": "git://github.com/noffle/kappa-core.git" @@ -15,15 +15,18 @@ }, "keywords": [], "dependencies": { - "hypercore": "^7.4.0", - "inherits": "^2.0.4", - "multifeed": "^4.0.0", - "multifeed-index": "^3.3.2" + "thunky": "^1.1.0" }, "devDependencies": { + "corestore": "^5.0.0", + "hypercore": "^8", + "hypercore-protocol": "^7.10.0", + "level-mem": "^5.0.1", + "multifeed": "^5", + "nanoiterator": "^1.2.0", "random-access-memory": "^3.1.1", "standard": "~12.0.1", - "tape": "^4.11.0" + "tape": "^5.0" }, "license": "ISC" } diff --git a/sources/corestore.js b/sources/corestore.js new file mode 100644 index 0000000..5f50c0c --- /dev/null +++ b/sources/corestore.js @@ -0,0 +1,45 @@ +const hypercoreSource = require('./hypercore') +const { mergePull, mergeReset } = require('./util/merge') +const SimpleState = require('./util/state') + +module.exports = function corestoreSource (opts) { + const state = opts.state || new SimpleState(opts) + const store = opts.store + const feeds = new Set() + const sources = [] + return { + open (flow, cb) { + store.ready(() => { + store.list().forEach(feed => _onfeed(flow, feed)) + store.on('feed', feed => _onfeed(flow, feed)) + cb() + }) + }, + pull (next) { + mergePull(sources, next) + }, + reset (next) { + mergeReset(sources, next) + }, + fetchVersion: state.fetchVersion, + storeVersion: state.storeVersion, + api: { + feed (kappa, key) { + return store.get({ key }) + } + } + } + + function _onfeed (flow, feed) { + if (feeds.has(feed)) return + feeds.add(feed) + const source = hypercoreSource({ + feed, + state + }) + sources.push(source) + source.open(flow, () => { + flow.update() + }) + } +} diff --git a/sources/hypercore.js b/sources/hypercore.js new file mode 100644 index 0000000..37f42e2 --- /dev/null +++ b/sources/hypercore.js @@ -0,0 +1,72 @@ +const SimpleState = require('./util/state') + +module.exports = (...args) => new HypercoreSource(...args) + +class HypercoreSource { + constructor (opts = {}) { + this.feed = opts.feed + this.maxBatch = opts.maxBatch || 50 + this.state = opts.state || new SimpleState(opts) + } + + open (flow, cb) { + this.flow = flow + this.feed.on('append', () => flow.update()) + this.feed.on('download', () => flow.update()) + this.feed.ready(cb) + } + + fetchVersion (cb) { + this.state.fetchVersion(cb) + } + + storeVersion (version, cb) { + this.state.storeVersion(version, cb) + } + + get name () { + return this.feed.key.toString('hex') + } + + pull (cb) { + this.state.get(this.name, (err, seq) => { + if (err) return cb(err) + return this._pull(seq, cb) + }) + } + + reset (cb) { + this.state.put(this.name, 0, cb) + } + + _pull (at, next) { + const self = this + const feed = this.feed + const len = feed.length + const to = Math.min(len, at + this.maxBatch) + + if (!(to > at)) return next() + + if (!feed.has(at, to)) { + return next({ finished: true }) + } + + feed.getBatch(at, to, { wait: false }, (err, res) => { + if (err) return next() + + res = res.map((node, i) => ({ + key: feed.key.toString('hex'), + seq: at + i, + value: node + })) + + next({ + messages: res, + finished: to === len, + onindexed (cb) { + self.state.put(self.name, to, cb) + } + }) + }) + } +} diff --git a/sources/multifeed.js b/sources/multifeed.js new file mode 100644 index 0000000..d2aeae7 --- /dev/null +++ b/sources/multifeed.js @@ -0,0 +1,43 @@ +const hypercoreSource = require('./hypercore') +const { mergePull, mergeReset } = require('./util/merge') +const SimpleState = require('./util/state') + +module.exports = function multifeedSource (opts) { + const state = new SimpleState(opts) + const feeds = opts.feeds + const sources = [] + + return { + open (flow, next) { + feeds.ready(() => { + feeds.feeds().forEach(feed => onfeed(flow, feed)) + feeds.on('feed', feed => onfeed(flow, feed)) + next() + }) + }, + pull (next) { + mergePull(sources, next) + }, + reset (next) { + mergeReset(sources, next) + }, + fetchVersion: state.fetchVersion, + storeVersion: state.storeVersion, + api: { + feed (kappa, key) { + return feeds.feed(key) + } + } + } + + function onfeed (flow, feed) { + const source = hypercoreSource({ + feed, + state + }) + sources.push(source) + source.open(flow, () => { + flow.update() + }) + } +} diff --git a/sources/util/merge.js b/sources/util/merge.js new file mode 100644 index 0000000..40e672c --- /dev/null +++ b/sources/util/merge.js @@ -0,0 +1,41 @@ +exports.mergePull = function (sources, next) { + if (!sources.length) return process.nextTick(next) + let results = [] + let pending = sources.length + sources.forEach(source => source.pull(onresult)) + function onresult (result) { + if (result) results.push(result) + if (--pending === 0) onfinish() + } + function onfinish () { + if (!results.length) return next() + let messages = [] + let finished = true + for (let result of results) { + if (result.messages) Array.prototype.push.apply(messages, result.messages) + if (!result.finished) finished = false + } + next({ + messages, + finished, + onindexed + }) + } + function onindexed (cb) { + let fns = results.map(r => r.onindexed).filter(f => f) + if (!fns.length) return cb() + let pending = fns.length + fns.forEach(fn => fn(done)) + function done () { + if (--pending === 0) cb() + } + } +} + +exports.mergeReset = function (sources, cb) { + let pending = sources.length + sources.forEach(source => source.reset(done)) + function done () { + if (--pending === 0) cb() + } +} diff --git a/sources/util/state.js b/sources/util/state.js new file mode 100644 index 0000000..ed219d5 --- /dev/null +++ b/sources/util/state.js @@ -0,0 +1,81 @@ +module.exports = class SimpleState { + constructor (opts = {}) { + this.db = opts.db || new FakeDB() + this._prefix = opts.prefix || '' + this._STATE = this._prefix + '!state!' + this._VERSION = this._prefix + '!version!' + // Bind public methods so that they can be passed on directly. + this.get = this.get.bind(this) + this.put = this.put.bind(this) + this.storeVersion = this.storeVersion.bind(this) + this.fetchVersion = this.fetchVersion.bind(this) + } + + prefix (prefix) { + return new SimpleState({ + db: this.db, + prefix: this._prefix + '/' + prefix + }) + } + + reset (cb) { + } + + get (name, cb) { + if (!cb) return this.get('', name) + const key = this._STATE + name + getInt(this.db, key, cb) + } + + put (name, seq, cb) { + if (!cb) return this.put('', name, seq) + const key = this._STATE + name + putInt(this.db, key, seq, cb) + } + + storeVersion (version, cb) { + putInt(this.db, this._VERSION, version, cb) + } + + fetchVersion (cb) { + getInt(this.db, this._VERSION, cb) + } +} + +function getInt (db, key, cb) { + db.get(key, (err, value) => { + if (err && err.type !== 'NotFoundError') return cb(err) + if (!value) return cb(null, 0) + value = Number(value) + cb(null, value) + }) +} + +function putInt (db, key, int, cb) { + const value = String(int) + db.put(key, value, cb || noop) +} + +function noop () {} + +class FakeDB { + constructor () { + this.state = {} + } + + put (key, value, cb) { + this.state[key] = value + process.nextTick(cb) + } + + get (key, cb) { + if (typeof this.state[key] === 'undefined') { + const err = new Error('Key not found') + err.type = 'NotFoundError' + err.notFound = true + process.nextTick(cb, err) + } else { + process.nextTick(cb, null, this.state[key]) + } + } +} diff --git a/test/basic.js b/test/basic.js index f3247c7..7547a8d 100644 --- a/test/basic.js +++ b/test/basic.js @@ -1,36 +1,248 @@ -var test = require('tape') -var ram = require('random-access-memory') -var kappa = require('..') - -test('simple view', function (t) { - var core = kappa(ram, { valueEncoding: 'json' }) - - var sum = 0 - - var sumview = { - api: { - get: function (core, cb) { - this.ready(function () { - cb(null, sum) - }) - } - }, - map: function (msgs, next) { - msgs.forEach(function (msg) { - if (typeof msg.value === 'number') sum += msg.value +const tape = require('tape') +const { Kappa } = require('..') +const { runAll } = require('./lib/util') +const createSimpleView = require('./lib/simple-view') +const createSimpleSource = require('./lib/simple-source') + +tape('simple source', t => { + const kappa = new Kappa() + + kappa.use('view1', createSimpleSource(), createSimpleView()) + kappa.use('view2', createSimpleSource(), createSimpleView()) + kappa.source.view1.push(1) + kappa.source.view1.push(2) + kappa.source.view2.push(3) + kappa.source.view2.push(4) + + runAll([ + cb => kappa.view.view1.collect((err, res) => { + t.error(err) + t.deepEqual(res, [1, 2]) + cb() + }), + cb => kappa.view.view2.collect((err, res) => { + t.error(err) + t.deepEqual(res, [3, 4]) + cb() + }), + cb => t.end() + ]) +}) + +tape('finished handling', t => { + const kappa = new Kappa() + t.plan(5) + + let msgs = ['a', 'b', 'c'] + let i = 0 + kappa.use('foo', { + pull (next) { + let finished + if (i !== msgs.length - 1) finished = false + next({ + messages: [msgs[i]], + finished, + onindexed: (cb) => { + t.pass('onindexed ' + i) + i = i + 1 + cb() + } }) - next() } - } + }, createSimpleView()) - core.use('sum', sumview) + runAll([ + cb => kappa.view.foo.collect((err, res) => { + t.error(err) + t.deepEqual(res, ['a', 'b', 'c']) + cb() + }), + cb => t.end() + ]) +}) - core.writer('default', function (err, feed) { - feed.append(1, function (err) { - core.api.sum.get(function (err, value) { - t.equals(1, value) - t.end() +tape('error on pull', t => { + const kappa = new Kappa() + let msgs = ['a'] + let i = 0 + kappa.use('foo', { + pull (next) { + if (i === 1) return next({ error: new Error('pull error') }) + if (i > 1) t.fail('pull after error') + next({ + messages: msgs, + finished: false, + onindexed: (cb) => { + t.pass('onindexed ' + i) + i++ + cb() + } }) - }) + } + }, createSimpleView()) + kappa.once('error', err => { + t.equal(err.message, 'pull error') + t.equal(kappa.flows.foo.getState().status, 'error') + t.end() + }) +}) + +tape('error on map', t => { + const kappa = new Kappa() + kappa.use('foo', createSimpleSource(), { + map (messages, next) { + next(new Error('map error')) + } + }) + kappa.source.foo.push('a') + kappa.once('error', err => { + t.equal(err.message, 'map error') + t.equal(kappa.flows.foo.getState().status, 'error') + t.end() + }) + kappa.ready(() => { + t.fail('no ready on error') + }) +}) + +tape('state update', t => { + const kappa = new Kappa() + const foo = kappa.use('foo', createSimpleSource(), createSimpleView()) + let state + foo.on('state-update', newState => { + state = newState + }) + foo.source.push([1, 2]) + process.nextTick(() => { + foo.source.push([3, 4]) + }) + runAll([ + cb => setTimeout(cb, 0), + cb => foo.view.collect((err, res) => { + t.error(err, 'no error') + t.deepEqual(res, [1, 2, 3, 4], 'result matches') + t.deepEqual(state, { + status: 'ready', + totalBlocks: 4, + indexedBlocks: 4, + prevIndexedBlocks: 2 + }, 'state matches') + t.deepEqual(state, foo.getState()) + cb() + }), + cb => { + kappa.once('error', err => { + t.equal(err.message, 'bad') + process.nextTick(cb) + }) + foo.source.error(new Error('bad')) + }, + cb => { + t.equal(state.status, 'error') + t.equal(state.error.message, 'bad') + t.equal(foo.getState().status, 'error') + t.equal(foo.getState().error.message, 'bad') + cb() + }, + cb => t.end() + ]) +}) + +tape('reset', t => { + const kappa = new Kappa() + const foo = kappa.use('foo', createSimpleSource(), createSimpleView()) + foo.source.push(1) + foo.source.push(2) + foo.source.push(3) + runAll([ + cb => foo.view.collect((err, res) => { + t.error(err) + t.deepEqual(res, [1, 2, 3]) + t.equal(kappa.view.foo.clearedCount(), 0) + cb() + }), + cb => { + kappa.reset('foo', cb) + }, + cb => foo.view.collect((err, res) => { + t.error(err) + t.deepEqual(res, [1, 2, 3]) + t.equal(kappa.view.foo.clearedCount(), 1) + cb() + }), + cb => t.end() + ]) +}) + +tape('open close', t => { + t.plan(5) + const kappa = new Kappa() + let i = 0 + kappa.use('foo', { + pull (next) { + t.pass('pull') + return next({ + messages: [++i, ++i], + finished: true, + onindexed (cb) { + t.pass('onindexed') + cb() + } + }) + }, + open (flow, cb) { + t.pass('open') + cb() + }, + close (cb) { + t.pass('close') + cb() + } + }, createSimpleView()) + + runAll([ + cb => kappa.ready(cb), + cb => kappa.close(cb), + cb => { + t.pass('closed!') + cb() + } + ]) +}) + +tape('open error', t => { + const kappa = new Kappa() + kappa.use('foo', { + open (flow, cb) { + cb(new Error('open error')) + }, + pull (next) { next() } + }, createSimpleView()) + kappa.use('bar', { + open (flow, cb) { + cb() + }, + pull (next) { next() } + }, createSimpleView()) + kappa.on('error', err => { + t.equal(err.message, 'open error') + t.equal(kappa.flows.foo.opened, false) + t.equal(kappa.flows.bar.opened, true) + t.end() + }) +}) + +tape('fetch version error', t => { + const kappa = new Kappa() + kappa.use('foo', { + fetchVersion (cb) { + cb(new Error('fetch version error')) + }, + pull (next) { next() } + }, createSimpleView()) + kappa.on('error', err => { + t.equal(err.message, 'fetch version error') + t.equal(kappa.flows.foo.opened, false) + t.end() }) }) diff --git a/test/corestore.js b/test/corestore.js new file mode 100644 index 0000000..0ed3af6 --- /dev/null +++ b/test/corestore.js @@ -0,0 +1,46 @@ +const tape = require('tape') +const { Kappa } = require('..') +const Corestore = require('corestore') +const ram = require('random-access-memory') +const mem = require('level-mem') +const corestoreSource = require('../sources/corestore') + +tape('corestore source', t => { + const kappa = new Kappa() + + const store = new Corestore(ram) + store.ready(() => { + const core1 = store.default({ valueEncoding: 'json' }) + const core2 = store.get({ valueEncoding: 'json' }) + const db = mem() + + kappa.use('view', corestoreSource({ store, db }), createSimpleView()) + + core1.append(1) + core2.append(2) + core1.append(3) + + setImmediate(() => { + kappa.view.view.collect((err, res) => { + t.error(err) + t.deepEqual(res.sort(), [1, 2, 3]) + t.end() + }) + }) + }) +}) + +function createSimpleView () { + let res = [] + return { + map (msgs, next) { + res = res.concat(msgs.map(msg => msg.value)) + next() + }, + api: { + collect (kappa, cb) { + this.ready(() => cb(null, res)) + } + } + } +} diff --git a/test/hypercore.js b/test/hypercore.js new file mode 100644 index 0000000..23c493f --- /dev/null +++ b/test/hypercore.js @@ -0,0 +1,134 @@ +const tape = require('tape') +const { Kappa } = require('..') +const hypercore = require('hypercore') +const mem = require('level-mem') +const ram = require('random-access-memory') +const hypercoreSource = require('../sources/hypercore') +const { runAll } = require('./lib/util') + +tape('hypercore source', t => { + const kappa = new Kappa() + + const core1 = hypercore(ram, { valueEncoding: 'json' }) + const statedb = mem() + + let res = [] + kappa.use('view', hypercoreSource({ feed: core1, db: statedb }), { + map (msgs, next) { + res = res.concat(msgs.map(msg => msg.value)) + next() + }, + api: { + collect (kappa, cb) { + this.ready(() => cb(null, res)) + } + } + }) + + core1.append(1) + core1.append(2) + core1.append(3) + + setImmediate(() => { + kappa.view.view.collect((err, res) => { + t.error(err) + t.deepEqual(res, [1, 2, 3]) + t.end() + }) + }) +}) + +tape('versions', t => { + const feed = hypercore(ram, { valueEncoding: 'json' }) + const sourceState = mem() + const viewState = mem() + + function createKappa (feed, version) { + const kappa = new Kappa() + const source = hypercoreSource({ feed, db: sourceState }) + const view = makeSimpleView(viewState, version) + const opts = { + transform (msgs, next) { + next(msgs.map(msg => msg.value)) + } + } + kappa.use('foo', source, view, opts) + return kappa + } + + feed.append('a') + feed.append('b') + + let kappa = createKappa(feed, 1) + + runAll([ + cb => setImmediate(cb), + cb => { + kappa.view.foo.collect((err, res) => { + t.error(err) + t.deepEqual(res, ['av1', 'bv1'], 'first round ok') + cb() + }) + }, + cb => { + kappa.pause() + kappa = createKappa(feed, 1) + cb() + }, + cb => { + kappa.view.foo.collect((err, res) => { + t.error(err) + t.deepEqual(res, ['av1', 'bv1'], 'second round ok') + cb() + }) + }, + cb => { + kappa.pause() + kappa = createKappa(feed, 2) + cb() + }, + cb => { + kappa.view.foo.collect((err, res) => { + t.error(err) + t.deepEqual(res, ['av2', 'bv2'], 'second round ok') + cb() + }) + }, + cb => t.end() + ]) +}) + +function makeSimpleView (db, version) { + let clears = 0 + const view = { + map (msgs, next) { + msgs = msgs.map(str => { + return str + 'v' + version + }) + db.get('msgs', (err, value) => { + if (err && !err.notFound) return next() + value = value ? JSON.parse(value) : [] + value = value.concat(msgs) + db.put('msgs', JSON.stringify(value), next) + }) + }, + version, + reset (cb) { + clears = clears + 1 + db.put('msgs', JSON.stringify([]), cb) + }, + api: { + collect (kappa, cb) { + this.ready(() => { + db.get('msgs', (err, value) => { + cb(err, value ? JSON.parse(value) : []) + }) + }) + }, + clearedCount (kappa) { + return clears + } + } + } + return view +} diff --git a/test/lib/simple-source.js b/test/lib/simple-source.js new file mode 100644 index 0000000..66c0b23 --- /dev/null +++ b/test/lib/simple-source.js @@ -0,0 +1,56 @@ +module.exports = (opts) => new SimpleSource(opts) + +class SimpleSource { + constructor (opts = {}) { + this.buf = opts.data || [] + this.cursor = 0 + this.flow = null + this.error = null + this.maxBatch = opts.maxBatch || 10 + } + + open (flow, next) { + this.flow = flow + next() + } + + pull (next) { + if (this.error) return next({ error: this.error }) + const len = this.buf.length + const end = Math.min(this.cursor + this.maxBatch, len) + const messages = this.buf.slice(this.cursor, end) + const lastState = this.cursor + next({ + messages, + finished: end === len, + onindexed: cb => { + this.cursor = end + cb(null, { + totalBlocks: this.buf.length, + indexedBlocks: end, + prevIndexedBlocks: lastState + }) + } + }) + } + + reset (cb) { + this.cursor = 0 + cb() + } + + get api () { + const self = this + return { + push (kappa, value) { + if (!Array.isArray(value)) value = [value] + self.buf.push(...value) + if (self.flow) self.flow.update() + }, + error (kappa, err) { + self.error = err + if (self.flow) self.flow.update() + } + } + } +} diff --git a/test/lib/simple-view.js b/test/lib/simple-view.js new file mode 100644 index 0000000..955f2f7 --- /dev/null +++ b/test/lib/simple-view.js @@ -0,0 +1,27 @@ +module.exports = function createSimpleView () { + let res = [] + let clears = 0 + const view = { + map (msgs, next) { + res = res.concat(msgs) + process.nextTick(next) + }, + reset (cb) { + clears = clears + 1 + res = [] + cb() + }, + api: { + collect (kappa, cb) { + this.ready(() => cb(null, res)) + }, + count (kappa) { + return res.eength + }, + clearedCount (kappa) { + return clears + } + } + } + return view +} diff --git a/test/lib/util.js b/test/lib/util.js new file mode 100644 index 0000000..6dd1adb --- /dev/null +++ b/test/lib/util.js @@ -0,0 +1,21 @@ +exports.runAll = function runAll (ops) { + return new Promise((resolve, reject) => { + runNext(ops.shift()) + function runNext (op) { + op(err => { + if (err) return reject(err) + let next = ops.shift() + if (!next) return resolve() + return runNext(next) + }) + } + }) +} + +exports.replicate = function replicate (a, b, opts, cb) { + if (typeof opts === 'function') return replicate(a, b, null, opts) + if (!opts) opts = { live: true } + const stream = a.replicate(true, opts) + stream.pipe(b.replicate(false, opts)).pipe(stream) + setImmediate(cb) +} diff --git a/test/multifeed.js b/test/multifeed.js new file mode 100644 index 0000000..d745dfe --- /dev/null +++ b/test/multifeed.js @@ -0,0 +1,63 @@ +const tape = require('tape') +const ram = require('random-access-memory') +const multifeed = require('multifeed') +const mem = require('level-mem') + +const { Kappa } = require('..') +const createMultifeedSource = require('../sources/multifeed') +const { runAll } = require('./lib/util') + +tape('multifeed', async t => { + const feeds = multifeed(ram, { valueEncoding: 'json' }) + const kappa = new Kappa() + const db = mem() + + kappa.use('sum', createMultifeedSource({ feeds, db }), createSumView()) + + var feed1, feed2 + + await runAll([ + cb => feeds.writer('default', (err, feed) => { + t.error(err) + feed1 = feed + cb() + }), + cb => feeds.writer('second', (err, feed) => { + t.error(err) + feed2 = feed + cb() + }), + cb => feed1.append(1, cb), + cb => feed1.append(1, cb), + cb => feed2.append(3, cb), + cb => { + kappa.view.sum.get(function (err, value) { + t.error(err) + t.equals(5, value) + cb() + }) + } + ]) + + t.end() +}) + +function createSumView () { + let sum = 0 + const sumview = { + api: { + get: function (kappa, cb) { + this.ready(function () { + cb(null, sum) + }) + } + }, + map: function (msgs, next) { + msgs.forEach(function (msg) { + if (typeof msg.value === 'number') sum += msg.value + }) + next() + } + } + return sumview +}