From a1a5a87f0e34365f7984cc7158423ac6eb648292 Mon Sep 17 00:00:00 2001 From: Fellipe Lamoglia Date: Tue, 19 Sep 2017 09:19:37 -0300 Subject: [PATCH] Initial commit --- .npm/package/.gitignore | 1 + .npm/package/README | 7 ++ .npm/package/npm-shrinkwrap.json | 14 +++ .versions | 12 +++ README.md | 35 +++++++ aggregator.js | 158 +++++++++++++++++++++++++++++++ package.js | 19 ++++ 7 files changed, 246 insertions(+) create mode 100644 .npm/package/.gitignore create mode 100644 .npm/package/README create mode 100644 .npm/package/npm-shrinkwrap.json create mode 100644 .versions create mode 100644 README.md create mode 100644 aggregator.js create mode 100644 package.js diff --git a/.npm/package/.gitignore b/.npm/package/.gitignore new file mode 100644 index 0000000..3c3629e --- /dev/null +++ b/.npm/package/.gitignore @@ -0,0 +1 @@ +node_modules diff --git a/.npm/package/README b/.npm/package/README new file mode 100644 index 0000000..3d49255 --- /dev/null +++ b/.npm/package/README @@ -0,0 +1,7 @@ +This directory and the files immediately inside it are automatically generated +when you change this package's NPM dependencies. Commit the files in this +directory (npm-shrinkwrap.json, .gitignore, and this README) to source control +so that others run the same versions of sub-dependencies. + +You should NOT check in the node_modules directory that Meteor automatically +creates; if you are using git, the .gitignore file tells git to ignore it. diff --git a/.npm/package/npm-shrinkwrap.json b/.npm/package/npm-shrinkwrap.json new file mode 100644 index 0000000..5e8bc28 --- /dev/null +++ b/.npm/package/npm-shrinkwrap.json @@ -0,0 +1,14 @@ +{ + "dependencies": { + "hash-sum": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/hash-sum/-/hash-sum-1.0.2.tgz", + "from": "hash-sum@1.0.2" + }, + "underscore": { + "version": "1.8.3", + "resolved": "https://registry.npmjs.org/underscore/-/underscore-1.8.3.tgz", + "from": "underscore@1.8.3" + } + } +} diff --git a/.versions b/.versions new file mode 100644 index 0000000..adc2ca4 --- /dev/null +++ b/.versions @@ -0,0 +1,12 @@ +babel-compiler@6.20.0 +babel-runtime@1.0.1 +ecmascript@0.8.2 +ecmascript-runtime@0.4.1 +ecmascript-runtime-client@0.4.3 +ecmascript-runtime-server@0.4.1 +lamoglia:period-aggregator@0.0.2 +meteor@1.7.2 +modules@0.10.0 +modules-runtime@0.8.0 +promise@0.9.0 +underscore@1.0.10 diff --git a/README.md b/README.md new file mode 100644 index 0000000..7dae8ea --- /dev/null +++ b/README.md @@ -0,0 +1,35 @@ +## Publish Aggregation + +Easily publish reactive aggregations from meteor collections. + +### Installation +` +meteor add lamoglia:publish-aggregation +` + +### Usage + +Import `buildAggregator` function: + +` +import { buildAggregator } from 'meteor/lamoglia:publish-aggregation'; +` + +Name and publish the aggregation pipeline: + + +` +Meteor.publish('publication.name', buildAggregator(Collection, pipeline, options)); +` + +### Default options + +```javascript +const defaultOptions = { + collectionName: collection._name, + transform: false, + singleValueField: false, + pastPeriod: false, + rateLimitMillis: 500, +}; +``` diff --git a/aggregator.js b/aggregator.js new file mode 100644 index 0000000..c0b8272 --- /dev/null +++ b/aggregator.js @@ -0,0 +1,158 @@ +import { Meteor } from 'meteor/meteor'; +import { Mongo } from 'meteor/mongo'; +import { check } from 'meteor/check'; +import { _ } from 'underscore'; +import sum from 'hash-sum'; + +const getOid = (hashToOidMap, doc) => { + const oidKey = sum(doc._id); + if (!hashToOidMap[oidKey]) { + hashToOidMap[oidKey] = new Mongo.ObjectID()._str; + } + return hashToOidMap[oidKey]; +}; + +const getPipelineMatchStage = (pipeline) => { + const matchStages = pipeline.filter((stage) => stage.hasOwnProperty('$match')); + if (matchStages.length) { + return matchStages[0]; + } + return false; +}; + +buildAggregator = (collection, pipeline, options) => function() { + const self = this; + + const defaultOptions = { + collectionName: collection._name, + transform: false, + singleValueField: false, + pastPeriod: false, + rateLimitMillis: 500, + }; + const currentOptions = _.extend(defaultOptions, options); + + let ready = false; + let interval = false; + let oldestDocument = false; + const hashToOidMap = {}; + const published = {}; + let matchStage = false; + const rawCollection = collection.rawCollection(); + const aggregateQuery = Meteor.wrapAsync(rawCollection.aggregate, rawCollection); + + if (currentOptions.pastPeriod) { + matchStage = getPipelineMatchStage(pipeline); + if (!matchStage) { + matchStage = { '$match': { } }; + pipeline.push(matchStage); + } + } + + if (!currentOptions.singleValueField && Object.keys(pipeline.$group).length === 2) { + currentOptions.singleValueField = Object.keys(pipeline.$group).filter(k => k !== '_id')[0]; + } + + let update = () => { + const { collectionName, transform } = currentOptions; + + if (currentOptions.pastPeriod.millis) { + matchStage.$match[currentOptions.pastPeriod.field] = { $gt: new Date(Date.now() - currentOptions.pastPeriod.millis) }; + } + const results = aggregateQuery(pipeline); + const resultOids = []; + results.forEach((doc) => { + const oid = getOid(hashToOidMap, doc); + resultOids.push(oid); + const transformedDocument = transform ? transform(doc) : doc; + + if (published[oid]) { + if (currentOptions.singleValueField && published[oid] !== doc[currentOptions.singleValueField]) { + self.changed(collectionName, oid, transformedDocument); + published[oid] = doc[currentOptions.singleValueField]; + } + } else { + self.added(collectionName, oid, transformedDocument); + if (currentOptions.singleValueField) { + published[oid] = doc[currentOptions.singleValueField]; + } + } + }); + + Object.keys(published).forEach((oid) => { + if (resultOids.indexOf(oid) < 0) { + self.removed(collectionName, oid); + delete published[oid]; + } + }); + }; + + if (currentOptions.rateLimitMillis) { + update = _.throttle(Meteor.bindEnvironment(update), currentOptions.rateLimitMillis); + } + + const updateTimeout = () => { + const currentTime = new Date(); + const query = {}; + const queryOptions = { + limit: 1, + fields: {}, + sort: {}, + }; + + query[currentOptions.pastPeriod.field] = { $gt: new Date(currentTime.getTime() - currentOptions.pastPeriod.millis) }; + queryOptions.fields[currentOptions.pastPeriod.field] = 1; + queryOptions.sort[currentOptions.pastPeriod.field] = 1; + + oldestDocument = collection.find(query, queryOptions).fetch()[0]; + + if (interval) { + Meteor.clearInterval(interval); + } + + if (oldestDocument) { + const nextUpdate = currentOptions.pastPeriod.millis - (currentTime.getTime() - oldestDocument.timestamp.getTime()); + interval = Meteor.setTimeout(() => { + update(); + updateTimeout(); + }, nextUpdate); + } + }; + + const handle = collection.find({}).observeChanges({ + added(id, doc) { + if (!ready) { + return; + } + if (currentOptions.pastPeriod && ((Date.now() - doc.timestamp.getTime()) > currentOptions.pastPeriod.millis)) { + return; + } + if (currentOptions.pastPeriod && (!oldestDocument || (doc[currentOptions.pastPeriod.field] < oldestDocument[currentOptions.pastPeriod.field]))) { + updateTimeout(); + } + update(); + }, + removed(id) { + if (currentOptions.pastPeriod && (!oldestDocument || (id === oldestDocument._id))) { + updateTimeout(); + } + update(); + }, + }); + + update(); + + if (currentOptions.pastPeriod) { + updateTimeout(); + } + + self.ready(); + ready = true; + + self.onStop(() => { + if (interval) { + Meteor.clearInterval(interval); + } + handle.stop(); + }); +}; diff --git a/package.js b/package.js new file mode 100644 index 0000000..4df2ef5 --- /dev/null +++ b/package.js @@ -0,0 +1,19 @@ +Package.describe({ + name: 'lamoglia:publish-aggregation', + version: '0.0.1', + summary: 'Easily publish collection aggregations and "last period" aggregations.', + git: 'https://github.com/lamoglia/publish-aggregation', + documentation: 'README.md' +}); + +Package.onUse(function(api) { + api.use('meteor'); + api.use(['ecmascript@0.8.2', 'underscore@1.0.10']); + api.addFiles(['aggregator.js'], 'server'); + api.export(['buildAggregator']); +}); + + +Npm.depends({ + 'hash-sum': '1.0.2' +});