Skip to content

Commit

Permalink
FIX: buffered metric emit return value
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael O'Brien committed Jul 9, 2024
1 parent 4fa99d5 commit e45b9cf
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 38 deletions.
2 changes: 2 additions & 0 deletions dist/cjs/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,10 @@ type BufferElt = {
dimensions: string;
metric: string;
namespace: string;
spans: Span[];
sum: number;
timestamp: number;
elapsed: number;
};
export declare class CustomMetrics {
private consistent;
Expand Down
31 changes: 21 additions & 10 deletions dist/cjs/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -133,27 +133,36 @@ class CustomMetrics {
let key = this.getBufferKey(namespace, metricName, dimensions);
let buffers = (this.buffers = this.buffers || {});
let timestamp = Math.floor((options.timestamp || Date.now()) / 1000);
let elapsed = (buffer.elapsed || this.spans[0].period / this.spans[0].samples);
let elt = (buffers[key] = buffers[key] || {
count: 0,
sum: 0,
timestamp: timestamp + (buffer.elapsed || this.spans[0].period / this.spans[0].samples),
timestamp: timestamp + elapsed,
elapsed: elapsed,
namespace: namespace,
metric: metricName,
dimensions,
spans: [{ points: [{ count: 0, sum: 0 }] }],
});
let current = elt.spans[0].points.at(-1);
current.count += point.count;
current.sum += point.sum;
elt.count += point.count;
elt.sum += point.sum;
if (buffer.force ||
(buffer.sum && elt.sum >= buffer.sum) ||
(buffer.count && elt.count >= buffer.count) ||
timestamp >= elt.timestamp) {
options = Object.assign({}, options, { timestamp: timestamp * 1000 });
await this.emitDimensionedMetric(namespace, metricName, elt, dimensions, options);
delete buffers[key];
let metric = await this.emitDimensionedMetric(namespace, metricName, elt, dimensions, options);
elt.count = elt.sum = 0;
elt.spans = metric.spans;
elt.timestamp = timestamp + (buffer.elapsed || this.spans[0].period / this.spans[0].samples);
return metric;
}
CustomMetrics.saveInstance({ key }, this);
return {
spans: [{ points: [{ count: elt.count, sum: elt.sum }] }],
spans: elt.spans,
metric: metricName,
namespace: namespace,
owner: options.owner || this.owner,
Expand Down Expand Up @@ -223,17 +232,19 @@ class CustomMetrics {
for (let elt of Object.values(this.buffers)) {
await this.flushElt(elt);
}
this.buffers = null;
}
async flushElt(elt) {
let point = { count: elt.count, sum: elt.sum, timestamp: elt.timestamp };
let timestamp = Date.now() / 1000;
let now = Date.now() / 1000;
let timestamp = now;
if (timestamp > elt.timestamp) {
timestamp = elt.timestamp;
}
await this.emitDimensionedMetric(elt.namespace, elt.metric, point, elt.dimensions, {
timestamp: timestamp * 1000
let metric = await this.emitDimensionedMetric(elt.namespace, elt.metric, elt, elt.dimensions, {
timestamp: timestamp * 1000,
});
elt.count = elt.sum = 0;
elt.spans = metric.spans;
elt.timestamp = now + (elt.elapsed || this.spans[0].period / this.spans[0].samples);
}
getBufferKey(namespace, metricName, dimensions) {
return `${namespace}|${metricName}|${JSON.stringify(dimensions)}`;
Expand All @@ -254,7 +265,7 @@ class CustomMetrics {
}
let span;
if (options.start) {
span = metric.spans.find((s) => (s.end - s.period) <= options.start / 1000);
span = metric.spans.find((s) => s.end - s.period <= options.start / 1000);
if (!span) {
span = metric.spans[metric.spans.length - 1];
period = span.period;
Expand Down
2 changes: 2 additions & 0 deletions dist/mjs/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,10 @@ type BufferElt = {
dimensions: string;
metric: string;
namespace: string;
spans: Span[];
sum: number;
timestamp: number;
elapsed: number;
};
export declare class CustomMetrics {
private consistent;
Expand Down
31 changes: 21 additions & 10 deletions dist/mjs/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,27 +141,36 @@ export class CustomMetrics {
let key = this.getBufferKey(namespace, metricName, dimensions);
let buffers = (this.buffers = this.buffers || {});
let timestamp = Math.floor((options.timestamp || Date.now()) / 1000);
let elapsed = (buffer.elapsed || this.spans[0].period / this.spans[0].samples);
let elt = (buffers[key] = buffers[key] || {
count: 0,
sum: 0,
timestamp: timestamp + (buffer.elapsed || this.spans[0].period / this.spans[0].samples),
timestamp: timestamp + elapsed,
elapsed: elapsed,
namespace: namespace,
metric: metricName,
dimensions,
spans: [{ points: [{ count: 0, sum: 0 }] }],
});
let current = elt.spans[0].points.at(-1);
current.count += point.count;
current.sum += point.sum;
elt.count += point.count;
elt.sum += point.sum;
if (buffer.force ||
(buffer.sum && elt.sum >= buffer.sum) ||
(buffer.count && elt.count >= buffer.count) ||
timestamp >= elt.timestamp) {
options = Object.assign({}, options, { timestamp: timestamp * 1000 });
await this.emitDimensionedMetric(namespace, metricName, elt, dimensions, options);
delete buffers[key];
let metric = await this.emitDimensionedMetric(namespace, metricName, elt, dimensions, options);
elt.count = elt.sum = 0;
elt.spans = metric.spans;
elt.timestamp = timestamp + (buffer.elapsed || this.spans[0].period / this.spans[0].samples);
return metric;
}
CustomMetrics.saveInstance({ key }, this);
return {
spans: [{ points: [{ count: elt.count, sum: elt.sum }] }],
spans: elt.spans,
metric: metricName,
namespace: namespace,
owner: options.owner || this.owner,
Expand Down Expand Up @@ -231,17 +240,19 @@ export class CustomMetrics {
for (let elt of Object.values(this.buffers)) {
await this.flushElt(elt);
}
this.buffers = null;
}
async flushElt(elt) {
let point = { count: elt.count, sum: elt.sum, timestamp: elt.timestamp };
let timestamp = Date.now() / 1000;
let now = Date.now() / 1000;
let timestamp = now;
if (timestamp > elt.timestamp) {
timestamp = elt.timestamp;
}
await this.emitDimensionedMetric(elt.namespace, elt.metric, point, elt.dimensions, {
timestamp: timestamp * 1000
let metric = await this.emitDimensionedMetric(elt.namespace, elt.metric, elt, elt.dimensions, {
timestamp: timestamp * 1000,
});
elt.count = elt.sum = 0;
elt.spans = metric.spans;
elt.timestamp = now + (elt.elapsed || this.spans[0].period / this.spans[0].samples);
}
getBufferKey(namespace, metricName, dimensions) {
return `${namespace}|${metricName}|${JSON.stringify(dimensions)}`;
Expand All @@ -262,7 +273,7 @@ export class CustomMetrics {
}
let span;
if (options.start) {
span = metric.spans.find((s) => (s.end - s.period) <= options.start / 1000);
span = metric.spans.find((s) => s.end - s.period <= options.start / 1000);
if (!span) {
span = metric.spans[metric.spans.length - 1];
period = span.period;
Expand Down
66 changes: 48 additions & 18 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type SpanDef = {
*/
export const DefaultSpans: SpanDef[] = [
{period: 5 * 60, samples: 10}, // 300, 5 mins, interval: 30 secs
{period: 60 * 60, samples: 12}, // 3600, 1 hr, interval: 5 mins
{period: 60 * 60, samples: 12}, // 3600, 1 hr, interval: 5 mins
{period: 24 * 60 * 60, samples: 12}, // 86400, 24 hrs, interval: 2 hrs
{period: 7 * 24 * 60 * 60, samples: 14}, // 604,800, 7 days, interval: 1/2 day
{period: 28 * 24 * 60 * 60, samples: 14}, // 2,419,200, 28 days, interval: 2 days
Expand All @@ -54,11 +54,11 @@ export type Metric = {
}

export type Point = {
count: number // Count of values in sum
count: number // Count of values in sum
max?: number
min?: number
pvalues?: number[]
sum: number // Aggregated values
sum: number // Aggregated values
// Never stored
timestamp?: number
}
Expand Down Expand Up @@ -155,8 +155,10 @@ type BufferElt = {
dimensions: string
metric: string
namespace: string
spans: Span[] // Persisted spans from when metric is flushed
sum: number
timestamp: number
elapsed: number
}
type BufferMap = {
[key: string]: BufferElt
Expand Down Expand Up @@ -318,6 +320,13 @@ export class CustomMetrics {

/*
Buffer a metric for specific dimensions
Point values are uniquely buffered in elements indexed by a (namespace, metric, dimensions) key.
Buffered values are flushed when a sum, count or timespan parameter is exceeded.
The accumulated buffered values are returned as a metric result until the buffered values are
flushed. These values will be only the buffered values for this Lambda.
When the metric is flushed, the full persisted metric is returned with all spans.
This means returned metrics will typically be just accumulated buffered values and won't reflect
other lambdas until flushed. If you need consistent return values -- use query().
*/
async bufferMetric(
namespace: string,
Expand All @@ -330,14 +339,23 @@ export class CustomMetrics {
let key = this.getBufferKey(namespace, metricName, dimensions)
let buffers = (this.buffers = this.buffers || {})
let timestamp = Math.floor((options.timestamp || Date.now()) / 1000)
let elapsed = (buffer.elapsed || this.spans[0].period / this.spans[0].samples)
let elt: BufferElt = (buffers[key] = buffers[key] || {
count: 0,
sum: 0,
timestamp: timestamp + (buffer.elapsed || this.spans[0].period / this.spans[0].samples),
timestamp: timestamp + elapsed,
elapsed: elapsed,
namespace: namespace,
metric: metricName,
dimensions,
spans: [{points: [{count: 0, sum: 0}]}] as Span[],
})
/*
Add point value to the lowest span and to the elt (to manage when to persist)
*/
let current = elt.spans[0].points.at(-1)
current.count += point.count
current.sum += point.sum
elt.count += point.count
elt.sum += point.sum

Expand All @@ -348,12 +366,17 @@ export class CustomMetrics {
timestamp >= elt.timestamp
) {
options = Object.assign({}, options, {timestamp: timestamp * 1000})
await this.emitDimensionedMetric(namespace, metricName, elt, dimensions, options)
delete buffers[key]
let metric = await this.emitDimensionedMetric(namespace, metricName, elt, dimensions, options)
// Reset tallies and save higher spans to return for future buffered metrics
elt.count = elt.sum = 0
elt.spans = metric.spans
elt.timestamp = timestamp + (buffer.elapsed || this.spans[0].period / this.spans[0].samples)
return metric
}
CustomMetrics.saveInstance({key}, this)

return {
spans: [{points: [{count: elt.count, sum: elt.sum}]}],
spans: elt.spans,
metric: metricName,
namespace: namespace,
owner: options.owner || this.owner,
Expand Down Expand Up @@ -459,20 +482,21 @@ export class CustomMetrics {
for (let elt of Object.values(this.buffers)) {
await this.flushElt(elt)
}
this.buffers = null
}

async flushElt(elt: BufferElt) {
let point = {count: elt.count, sum: elt.sum, timestamp: elt.timestamp}

// Choose current time if before the buffer expires, otherwise choose the buffer expiry time
let timestamp = Date.now() / 1000
let now = Date.now() / 1000
let timestamp = now
if (timestamp > elt.timestamp) {
timestamp = elt.timestamp
}
await this.emitDimensionedMetric(elt.namespace, elt.metric, point, elt.dimensions, {
timestamp: timestamp * 1000
let metric = await this.emitDimensionedMetric(elt.namespace, elt.metric, elt, elt.dimensions, {
timestamp: timestamp * 1000,
})
elt.count = elt.sum = 0
elt.spans = metric.spans
elt.timestamp = now + (elt.elapsed || this.spans[0].period / this.spans[0].samples)
}

getBufferKey(namespace: string, metricName: string, dimensions: string): string {
Expand Down Expand Up @@ -514,7 +538,7 @@ export class CustomMetrics {
/*
Map the period that encompasses the start
*/
span = metric.spans.find((s) => (s.end - s.period) <= options.start / 1000)
span = metric.spans.find((s) => s.end - s.period <= options.start / 1000)
if (!span) {
span = metric.spans[metric.spans.length - 1]
period = span.period
Expand Down Expand Up @@ -542,7 +566,7 @@ export class CustomMetrics {
if (metric && span) {
if (options.start) {
let interval = span.period / span.samples
let end = span.points.length - Math.ceil((span.end - (options.start/1000 + period)) / interval)
let end = span.points.length - Math.ceil((span.end - (options.start / 1000 + period)) / interval)
let front = end - Math.round(period / interval)
span.end -= (span.points.length - end) * interval
span.points = span.points.slice(front, end)
Expand Down Expand Up @@ -655,7 +679,13 @@ export class CustomMetrics {
Process the metric points. This is used for graphs that need all the data points.
This calculates the avg, max, min, sum, count, pvalues and per-point timestamps.
*/
private calculateSeries(metric: Metric, span: Span, statistic: string, owner: string, timestamp: number): MetricQueryResult {
private calculateSeries(
metric: Metric,
span: Span,
statistic: string,
owner: string,
timestamp: number
): MetricQueryResult {
let points: MetricQueryPoint[] = []
let interval = span.period / span.samples

Expand Down Expand Up @@ -756,7 +786,7 @@ export class CustomMetrics {
/* istanbul ignore next */
let points = span.points || []

let queryRecurse = queryPeriod && span.period < queryPeriod && si + 1 < metric.spans.length
let queryRecurse = queryPeriod && span.period < queryPeriod && si + 1 < metric.spans.length

// Just for safety, should not happen
/* istanbul ignore next */
Expand Down Expand Up @@ -800,7 +830,7 @@ export class CustomMetrics {
// Querying and not at the terminal period. Must recurse and aggregate all spans up to the target span.
this.addValue(metric, timestamp, point, si + 1, queryPeriod)
return
}
}
if (point.count) {
if (points.length == 0) {
start = span.end = this.getTimestamp(span, timestamp)
Expand Down

0 comments on commit e45b9cf

Please sign in to comment.