Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
lamoglia committed Sep 19, 2017
0 parents commit a1a5a87
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 0 deletions.
1 change: 1 addition & 0 deletions .npm/package/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
node_modules
7 changes: 7 additions & 0 deletions .npm/package/README
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
This directory and the files immediately inside it are automatically generated
when you change this package's NPM dependencies. Commit the files in this
directory (npm-shrinkwrap.json, .gitignore, and this README) to source control
so that others run the same versions of sub-dependencies.

You should NOT check in the node_modules directory that Meteor automatically
creates; if you are using git, the .gitignore file tells git to ignore it.
14 changes: 14 additions & 0 deletions .npm/package/npm-shrinkwrap.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions .versions
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
## Publish Aggregation

Easily publish reactive aggregations from meteor collections.

### Installation
`
meteor add lamoglia:publish-aggregation
`

### Usage

Import `buildAggregator` function:

`
import { buildAggregator } from 'meteor/lamoglia:publish-aggregation';
`

Name and publish the aggregation pipeline:


`
Meteor.publish('publication.name', buildAggregator(Collection, pipeline, options));
`

### Default options

```javascript
const defaultOptions = {
collectionName: collection._name,
transform: false,
singleValueField: false,
pastPeriod: false,
rateLimitMillis: 500,
};
```
158 changes: 158 additions & 0 deletions aggregator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import { Meteor } from 'meteor/meteor';
import { Mongo } from 'meteor/mongo';
import { check } from 'meteor/check';
import { _ } from 'underscore';
import sum from 'hash-sum';

const getOid = (hashToOidMap, doc) => {
const oidKey = sum(doc._id);
if (!hashToOidMap[oidKey]) {
hashToOidMap[oidKey] = new Mongo.ObjectID()._str;
}
return hashToOidMap[oidKey];
};

const getPipelineMatchStage = (pipeline) => {
const matchStages = pipeline.filter((stage) => stage.hasOwnProperty('$match'));
if (matchStages.length) {
return matchStages[0];
}
return false;
};

buildAggregator = (collection, pipeline, options) => function() {
const self = this;

const defaultOptions = {
collectionName: collection._name,
transform: false,
singleValueField: false,
pastPeriod: false,
rateLimitMillis: 500,
};
const currentOptions = _.extend(defaultOptions, options);

let ready = false;
let interval = false;
let oldestDocument = false;
const hashToOidMap = {};
const published = {};
let matchStage = false;
const rawCollection = collection.rawCollection();
const aggregateQuery = Meteor.wrapAsync(rawCollection.aggregate, rawCollection);

if (currentOptions.pastPeriod) {
matchStage = getPipelineMatchStage(pipeline);
if (!matchStage) {
matchStage = { '$match': { } };
pipeline.push(matchStage);
}
}

if (!currentOptions.singleValueField && Object.keys(pipeline.$group).length === 2) {
currentOptions.singleValueField = Object.keys(pipeline.$group).filter(k => k !== '_id')[0];
}

let update = () => {
const { collectionName, transform } = currentOptions;

if (currentOptions.pastPeriod.millis) {
matchStage.$match[currentOptions.pastPeriod.field] = { $gt: new Date(Date.now() - currentOptions.pastPeriod.millis) };
}
const results = aggregateQuery(pipeline);
const resultOids = [];
results.forEach((doc) => {
const oid = getOid(hashToOidMap, doc);
resultOids.push(oid);
const transformedDocument = transform ? transform(doc) : doc;

if (published[oid]) {
if (currentOptions.singleValueField && published[oid] !== doc[currentOptions.singleValueField]) {
self.changed(collectionName, oid, transformedDocument);
published[oid] = doc[currentOptions.singleValueField];
}
} else {
self.added(collectionName, oid, transformedDocument);
if (currentOptions.singleValueField) {
published[oid] = doc[currentOptions.singleValueField];
}
}
});

Object.keys(published).forEach((oid) => {
if (resultOids.indexOf(oid) < 0) {
self.removed(collectionName, oid);
delete published[oid];
}
});
};

if (currentOptions.rateLimitMillis) {
update = _.throttle(Meteor.bindEnvironment(update), currentOptions.rateLimitMillis);
}

const updateTimeout = () => {
const currentTime = new Date();
const query = {};
const queryOptions = {
limit: 1,
fields: {},
sort: {},
};

query[currentOptions.pastPeriod.field] = { $gt: new Date(currentTime.getTime() - currentOptions.pastPeriod.millis) };
queryOptions.fields[currentOptions.pastPeriod.field] = 1;
queryOptions.sort[currentOptions.pastPeriod.field] = 1;

oldestDocument = collection.find(query, queryOptions).fetch()[0];

if (interval) {
Meteor.clearInterval(interval);
}

if (oldestDocument) {
const nextUpdate = currentOptions.pastPeriod.millis - (currentTime.getTime() - oldestDocument.timestamp.getTime());
interval = Meteor.setTimeout(() => {
update();
updateTimeout();
}, nextUpdate);
}
};

const handle = collection.find({}).observeChanges({
added(id, doc) {
if (!ready) {
return;
}
if (currentOptions.pastPeriod && ((Date.now() - doc.timestamp.getTime()) > currentOptions.pastPeriod.millis)) {
return;
}
if (currentOptions.pastPeriod && (!oldestDocument || (doc[currentOptions.pastPeriod.field] < oldestDocument[currentOptions.pastPeriod.field]))) {
updateTimeout();
}
update();
},
removed(id) {
if (currentOptions.pastPeriod && (!oldestDocument || (id === oldestDocument._id))) {
updateTimeout();
}
update();
},
});

update();

if (currentOptions.pastPeriod) {
updateTimeout();
}

self.ready();
ready = true;

self.onStop(() => {
if (interval) {
Meteor.clearInterval(interval);
}
handle.stop();
});
};
19 changes: 19 additions & 0 deletions package.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Package.describe({
name: 'lamoglia:publish-aggregation',
version: '0.0.1',
summary: 'Easily publish collection aggregations and "last period" aggregations.',
git: 'https://github.com/lamoglia/publish-aggregation',
documentation: 'README.md'
});

Package.onUse(function(api) {
api.use('meteor');
api.use(['[email protected]', '[email protected]']);
api.addFiles(['aggregator.js'], 'server');
api.export(['buildAggregator']);
});


Npm.depends({
'hash-sum': '1.0.2'
});

0 comments on commit a1a5a87

Please sign in to comment.