Skip to content

Commit

Permalink
Turns pipeline parameter into a function, updates README.
Browse files Browse the repository at this point in the history
  • Loading branch information
lamoglia committed Sep 19, 2017
1 parent a1a5a87 commit 714bace
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 26 deletions.
9 changes: 9 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
The MIT License (MIT)

Copyright (c) 2017 Fellipe Lamoglia

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
110 changes: 97 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,117 @@
Easily publish reactive aggregations from meteor collections.

### Installation
`
```bash
meteor add lamoglia:publish-aggregation
`
```

### Usage

Import `buildAggregator` function:

`
```javascript
import { buildAggregator } from 'meteor/lamoglia:publish-aggregation';
`
```

Name and publish the aggregation pipeline:


`
Meteor.publish('publication.name', buildAggregator(Collection, pipeline, options));
`
```javascript
Meteor.publish('publication.name', buildAggregator(collection, pipelineCreator, options));
```

Subscribe from client

```javascript
Meteor.subscribe('publication.name');
```

Now your Mini Mongo will receive live updates from the server with your aggregated data under the collection with name provided by `options.collectionName`, or the provided collection name if this option was not set.

### Parameters

- **collection**: *Mongo.Collection* - The collection to extract the data from.
- **pipelineCreator**: *Function* - Function that returns the pipeline for the aggregation query. It is a function instead of a simple array so it is possible to provide dynamic parameters such as the current logged user (`Meteor.userId()`)
- **options**: *Object* - An optional object with options, as described below:

#### Options

The `options` object has the following properties:

- **collectionName**: *String* - The name of the collection under which the results will appear on MiniMongo. If not set, it will default to the provided collection name.
- **transform**: *Function* - Transformation function that receives each document from the aggregation result and outputs a reshaped object. Set false if no transformation is required.
- **singleValueField**: *String* - If you are publishing a result with a single changing field like a count or sum, provide the name of this field for improved performance. If your aggregation pipeline returns only one field other than the aggregation _id, it will be used by default.
- **pastPeriod**: *Object* - Object with the following properties:
- **field**: *String* - The ISODate field name from the collection.
- **millis**: *Number* - Number of milliseconds from now to filter the results. Example: if you want only the documents with the field timestamp with values at most one hour from now, set "pastPeriod.field" to "timestamp" and "pastPeriod.millis" to "3600000".
- **rateLimitMillis**: *Number* - Value representing the maximum frequency of execution of the aggregation query, in milliseconds. If you possibly have many changes in the aggregated collection, it is best to provide a rate limit to avoid the query being run more times than your user can see the changes on screen. Set 0 to disable rate limiting.

You should set the **pastPeriod** property only if you want to filter the collection's documents by some (ISODate) field within the last N milliseconds. Common use cases: "Publish the number of messages by sender received in the last 24h", "Publish the sum of 'critical' alerts started in the last hour". If isn't set, no filtering will be done.


### Default options

- **collectionName** - The name of provided collection (Meteor.Collection._name)
- **transform** - false
- **singleValueField** - false
- **pastPeriod** - false
- **rateLimitMillis** - 500

### Example

Given a `Messages` collection in the following format

```javascript
{
_id: '59c119f2666c4eb0a695d8dd',
sender_id: '59c119d5666c4eb0a695d8db',
sender_name: 'Rick',
recipient_id: '59c119d6666c4eb0a695d8dc',
message: 'And that\'s the wayyyyyy the news goes!',
sent_at: ISODate("2017-09-14T20:22:41.188Z"),
}
```

To publish a collection with the number of messages received in the last day grouped by sender:

```javascript
const defaultOptions = {
collectionName: collection._name,
transform: false,
singleValueField: false,
pastPeriod: false,
rateLimitMillis: 500,
import { Meteor } from 'meteor/meteor';
import { buildAggregator } from 'meteor/lamoglia:publish-aggregation';
import { Messages } from '../../messages/messages.js';

const pipelineCreator = () => ([
{ $match: { recipient_id: { $eq: Meteor.userId() } } },
{ $group: { _id: { sender_id: '$sender_id' }, count: { $sum: 1 }, sender_name: { $first: '$sender_name' } } },
]);

const options = {
collectionName: 'messagecounts',
transform: (doc) => ({ ...doc, sender_id: doc._id.sender_id }),
singleValueField: 'count',
pastPeriod: {
millis: 60 * 60 * 1000,
field: 'sent_at',
},
};

Meteor.publish('message.counts.by.sender', buildAggregator(Messages, pipelineCreator, options));
```

Subscribe to this publication on the client

```javascript
Meteor.subscribe('message.counts.by.sender');
```

The resulting collection (messagecounts) will have the following format at mini mongo:

```javascript
{
_id: '59c119f3666c4eb0a695d8df',
sender_id: '59c119d5666c4eb0a695d8db',
sender_name: 'Rick',
count: 42,
}
```

If no messages are sent within an hour, the counter will eventually get down to zero.
31 changes: 19 additions & 12 deletions aggregator.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const getPipelineMatchStage = (pipeline) => {
return false;
};

buildAggregator = (collection, pipeline, options) => function() {
buildAggregator = (collection, pipelineCreator, options) => function() {
const self = this;

const defaultOptions = {
Expand All @@ -30,8 +30,9 @@ buildAggregator = (collection, pipeline, options) => function() {
pastPeriod: false,
rateLimitMillis: 500,
};
const currentOptions = _.extend(defaultOptions, options);
const currentOptions = _.extendOwn(defaultOptions, options);

const pipeline = pipelineCreator();
let ready = false;
let interval = false;
let oldestDocument = false;
Expand All @@ -54,10 +55,10 @@ buildAggregator = (collection, pipeline, options) => function() {
}

let update = () => {
const { collectionName, transform } = currentOptions;
const { collectionName, transform, pastPeriod, singleValueField } = currentOptions;

if (currentOptions.pastPeriod.millis) {
matchStage.$match[currentOptions.pastPeriod.field] = { $gt: new Date(Date.now() - currentOptions.pastPeriod.millis) };
if (pastPeriod.millis) {
matchStage.$match[pastPeriod.field] = { $gt: new Date(Date.now() - pastPeriod.millis) };
}
const results = aggregateQuery(pipeline);
const resultOids = [];
Expand All @@ -67,14 +68,20 @@ buildAggregator = (collection, pipeline, options) => function() {
const transformedDocument = transform ? transform(doc) : doc;

if (published[oid]) {
if (currentOptions.singleValueField && published[oid] !== doc[currentOptions.singleValueField]) {
self.changed(collectionName, oid, transformedDocument);
published[oid] = doc[currentOptions.singleValueField];
if (singleValueField) {
if (published[oid] !== doc[singleValueField]) {
self.changed(collectionName, oid, transformedDocument);
published[oid] = doc[singleValueField];
}
} else {
published[oid] = true;
}
} else {
self.added(collectionName, oid, transformedDocument);
if (currentOptions.singleValueField) {
published[oid] = doc[currentOptions.singleValueField];
if (singleValueField) {
published[oid] = doc[singleValueField];
} else {
published[oid] = true;
}
}
});
Expand Down Expand Up @@ -111,7 +118,7 @@ buildAggregator = (collection, pipeline, options) => function() {
}

if (oldestDocument) {
const nextUpdate = currentOptions.pastPeriod.millis - (currentTime.getTime() - oldestDocument.timestamp.getTime());
const nextUpdate = currentOptions.pastPeriod.millis - (currentTime.getTime() - oldestDocument[currentOptions.pastPeriod.field].getTime());
interval = Meteor.setTimeout(() => {
update();
updateTimeout();
Expand All @@ -124,7 +131,7 @@ buildAggregator = (collection, pipeline, options) => function() {
if (!ready) {
return;
}
if (currentOptions.pastPeriod && ((Date.now() - doc.timestamp.getTime()) > currentOptions.pastPeriod.millis)) {
if (currentOptions.pastPeriod && ((Date.now() - doc[currentOptions.pastPeriod.field].getTime()) > currentOptions.pastPeriod.millis)) {
return;
}
if (currentOptions.pastPeriod && (!oldestDocument || (doc[currentOptions.pastPeriod.field] < oldestDocument[currentOptions.pastPeriod.field]))) {
Expand Down
2 changes: 1 addition & 1 deletion package.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package.describe({
name: 'lamoglia:publish-aggregation',
version: '0.0.1',
summary: 'Easily publish collection aggregations and "last period" aggregations.',
summary: 'Easily publish collection aggregations.',
git: 'https://github.com/lamoglia/publish-aggregation',
documentation: 'README.md'
});
Expand Down

0 comments on commit 714bace

Please sign in to comment.