Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task/open tracing #26

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
709 changes: 708 additions & 1 deletion common/config/rush/pnpm-lock.yaml

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions services/monolith/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
version: "3.8"
services:
zipkin:
image: openzipkin/zipkin-slim:2.23.2
ports:
- '9411:9411'
14 changes: 13 additions & 1 deletion services/monolith/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,19 @@
"swagger-ui-express": "~4.1.6",
"jose": "~3.6.2",
"dotenv": "~8.2.0",
"pino-std-serializers": "~3.2.0"
"pino-std-serializers": "~3.2.0",
"@opentelemetry/api": "~0.18.0",
"@opentelemetry/node": "~0.18.0",
"@opentelemetry/tracing": "~0.18.0",
"@opentelemetry/plugin-http": "~0.18.0",
"@opentelemetry/plugin-express": "~0.14.0",
"@opentelemetry/instrumentation": "~0.18.0",
"@opentelemetry/context-async-hooks": "~0.18.0",
"@opentelemetry/exporter-zipkin": "~0.18.0",
"@opentelemetry/exporter-collector-grpc": "~0.18.0",
"@aws/otel-aws-xray-propagator": "~0.13.0",
"@aws/otel-aws-xray-id-generator": "~0.13.1",
"@google-cloud/opentelemetry-cloud-trace-exporter": "~0.9.0"
},
"devDependencies": {
"@abitia/eslint-config": "~1.1.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { MikroOrmModule, MikroOrmModuleSyncOptions } from '@mikro-orm/nestjs';
import { DynamicModule, Module } from '@nestjs/common';
import { APP_INTERCEPTOR } from '@nestjs/core';

import { EVENT_BUS, EventBus, EventBusCompositeCoordinator, nestJsInMemoryEventBusProvider } from '../Core/EventBus';
import { LOGGER, Logger, NestJsLoggerAdapter, nestJsLoggerProvider } from '../Core/Logger';
Expand Down
11 changes: 11 additions & 0 deletions services/monolith/src/Core/EventBus/InMemoryEventBus.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import { context, SpanKind, SpanStatusCode } from '@opentelemetry/api';

import { Logger } from '../Logger';
import { tracer } from '../OpenTracing';

import { Event } from './Event';
import { EventBus, EventBusSubscriber } from './EventBus';
Expand All @@ -22,10 +25,18 @@ export class InMemoryEventBus implements EventBus {
const eventSubscribers = this.subscribers[event.constructor.name] ?? [];

for(const subscriber of eventSubscribers) {
const span = tracer.startSpan('event-handler', {
kind: SpanKind.CONSUMER,
}, context.active());
span.setAttribute('event.name', event.name);
try {
await subscriber(event);
span.setStatus({ code: SpanStatusCode.OK });
} catch (error) {
this.logger?.error(`Failed to run subscriber for event ${event.name}`, error);
span.setStatus({ code: SpanStatusCode.ERROR, message: error?.message });
} finally {
span.end();
}
}
}
Expand Down
17 changes: 12 additions & 5 deletions services/monolith/src/Core/Logger/PinoLogger.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { Logger as PinoInstance } from 'pino';

import { getActiveSpan } from '../OpenTracing';

import { Logger } from './Logger';

/**
Expand Down Expand Up @@ -28,10 +30,15 @@ export class PinoLogger implements Logger {
}

private _doLog(logType: string, message: string, obj?: Record<string, unknown>): void {
if(obj) {
this.pino[logType](obj, message);
} else {
this.pino[logType](message);
}
const spanContext = getActiveSpan();

const logObj = {
traceId: spanContext?.traceId,
spanId: spanContext?.spanId,
traceFlags: spanContext?.traceFlags,
...obj,
};

this.pino[logType](logObj, message);
}
}
40 changes: 40 additions & 0 deletions services/monolith/src/Core/OpenTracing/LoggerSpanExporter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { ReadableSpan, SpanExporter } from '@opentelemetry/tracing';

import { Logger } from '../Logger';

const CODE_OK = 1;

export class LoggerSpanExporter implements SpanExporter {
public constructor(private readonly logger: Logger) {}

public export(spans: ReadableSpan[], resultCallback: (result) => void): void {
for (const span of spans) {
this.logger.info(span.name, this.exportInfo(span));
}

resultCallback({ code: CODE_OK });
}

public shutdown(): Promise<void> {
return Promise.resolve();
}

private exportInfo(span): Record<string, unknown> {
return {
traceId: span.spanContext.traceId,
parentId: span.parentSpanId,
name: span.name,
id: span.spanContext.spanId,
kind: span.kind,
timestamp: this.hrTimeToMicroseconds(span.startTime),
duration: this.hrTimeToMicroseconds(span.duration),
attributes: span.attributes,
status: span.status,
events: span.events,
};
}

private hrTimeToMicroseconds(hrTime): number {
return Math.round(hrTime[0] * 1e6 + hrTime[1] / 1e3);
}
}
6 changes: 6 additions & 0 deletions services/monolith/src/Core/OpenTracing/getActiveSpan.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { getSpanContext, SpanContext, context, trace } from '@opentelemetry/api';

export const tracer = trace.getTracer('monolith');

export const getActiveSpan = (): SpanContext | undefined =>
getSpanContext(context.active());
2 changes: 2 additions & 0 deletions services/monolith/src/Core/OpenTracing/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './getActiveSpan';
export * from './LoggerSpanExporter';
4 changes: 3 additions & 1 deletion services/monolith/src/Core/Outbox/MikroOrm/MikroOrmOutbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { EntityManager } from '@mikro-orm/core';

import { Event } from '../../EventBus';
import { Logger } from '../../Logger';
import { getActiveSpan } from '../../OpenTracing';
import { Outbox } from '../Outbox';

import { newOutboxMessageId, OutboxMessageEntity } from './OutboxMessageEntity';
Expand Down Expand Up @@ -29,10 +30,11 @@ export class MikroOrmOutbox implements Outbox {
this.logger?.info(`Persisting event ${event.name}`);
}

const tracingContext = getActiveSpan();
const message = new OutboxMessageEntity(
newOutboxMessageId(),
event.name,
JSON.stringify(event),
JSON.stringify({ tracingContext, event }),
);

this.em.persist(message);
Expand Down
38 changes: 29 additions & 9 deletions services/monolith/src/Core/Outbox/MikroOrm/MikroOrmOutboxWorker.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,31 @@
import { EntityManager } from '@mikro-orm/core';
import { context, ROOT_CONTEXT, setSpanContext, SpanKind } from '@opentelemetry/api';

import { Event, EventBus } from '../../EventBus';
import { Logger } from '../../Logger';
import { tracer } from '../../OpenTracing';

import { OutboxMessageEntity } from './OutboxMessageEntity';

const sleep = (ms: number): Promise<void> => new Promise(resolve => setTimeout(resolve, ms));

const adaptMessageToEvent = (message: OutboxMessageEntity): Event => {
const payload = JSON.parse(message.eventPayload);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const adaptMessageToEvent = (eventName: string, event: any): Event => {
Object.defineProperty(event, 'name', {
value: eventName,
enumerable: false,
writable: false,
});

Object.defineProperty(payload, 'constructor', {
Object.defineProperty(event, 'constructor', {
value: {
name: message.eventName,
name: eventName,
},
enumerable: false,
writable: false,
});

return payload;
return event;
};

type MikroOrmOutboxWorkerOptions = {
Expand Down Expand Up @@ -83,12 +90,25 @@ export class MikroOrmOutboxWorker {
}

for(const message of messages) {
const event = adaptMessageToEvent(message);
const payload = JSON.parse(message.eventPayload);
const rootCtx = setSpanContext(ROOT_CONTEXT, payload.tracingContext);

await context.with(rootCtx, async () => {
const span = tracer.startSpan('outbox-worker', {
kind: SpanKind.CONSUMER,
});

const event = adaptMessageToEvent(message.eventName, payload.event);

span.setAttribute('event.name', event.name);

this.eventBus.publish(event);

this.eventBus.publish(event);
message.markAsProcessed();
await this.em.persistAndFlush(message);

message.markAsProcessed();
await this.em.persistAndFlush(message);
span.end();
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ it('should serialize and persist the event', async () => {
id: expect.any(String),
createdAt: expect.any(Date),
eventName: 'DummyEvent',
eventPayload: JSON.stringify(event),
eventPayload: JSON.stringify({ event }),
} as OutboxMessageEntity);
});
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const eventBusMock = {
const dummyMessage = new OutboxMessageEntity(
'test-id',
'TestEvent',
JSON.stringify({ prop1: 'test' }),
JSON.stringify({ event: { prop1: 'test' } }),
);

const logger = new TestLogger();
Expand Down
15 changes: 14 additions & 1 deletion services/monolith/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
/* istanbul ignore file */
// open telemetry doesn't register express.js handlers if import is not on top
// eslint-disable-next-line import/order
import { provider as otelProvider } from './telemetry';
import { once } from 'events';
import * as path from 'path';

Expand All @@ -9,12 +12,15 @@ import { AbstractHttpAdapter } from '@nestjs/core';
import { NestFactoryStatic } from '@nestjs/core/nest-factory';
import { ExpressAdapter } from '@nestjs/platform-express';
import { DocumentBuilder, SwaggerModule } from '@nestjs/swagger';
import { SimpleSpanProcessor } from '@opentelemetry/tracing';
import { config as configureDotenv } from 'dotenv';

import { AccountContextModule } from './AccountContext/AccountContextModule';
import { AuctionContextModule } from './AuctionContext/AuctionContextModule';
import { EVENT_BUS, EventBus, EventBusCompositeCoordinator } from './Core/EventBus';
import { NestJsLoggerAdapter } from './Core/Logger';
import { NestJsLoggerAdapter, PinoLogger } from './Core/Logger';
import { pinoFactory } from './Core/Logger/pinoFactory';
import { LoggerSpanExporter } from './Core/OpenTracing';

patchNestjsSwagger();

Expand Down Expand Up @@ -86,6 +92,13 @@ async function createModule(
throw error;
}

const isProd = process.env.NODE_ENV === 'production';
if(!isProd) {
const logger = new PinoLogger(pinoFactory());
const spanExporter = new LoggerSpanExporter(logger);
otelProvider.addSpanProcessor(new SimpleSpanProcessor(spanExporter));
}

const factoryOptions = { abortOnError: true };
const express = new ExpressAdapter();

Expand Down
55 changes: 55 additions & 0 deletions services/monolith/src/telemetry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { TraceExporter } from '@google-cloud/opentelemetry-cloud-trace-exporter';
import { context, trace } from '@opentelemetry/api';
import { AsyncHooksContextManager } from '@opentelemetry/context-async-hooks';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { NodeTracerProvider } from '@opentelemetry/node';
import { BatchSpanProcessor } from '@opentelemetry/tracing';


// diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.VERBOSE);

const contextManager = new AsyncHooksContextManager();
contextManager.enable();

// provider.addSpanProcessor(
// new BatchSpanProcessor(
// new ZipkinExporter({
// serviceName: 'monolith',
// url: 'http://localhost:9411/api/v2/spans',
// }),
// ),
// );

const provider = new NodeTracerProvider();

const gcpExporter = new TraceExporter({
keyFile: './service_account_key.json',
});

provider.addSpanProcessor(new BatchSpanProcessor(gcpExporter));

provider.register({ contextManager });

context.setGlobalContextManager(contextManager);

trace.setGlobalTracerProvider(provider);

registerInstrumentations({
tracerProvider: provider,
instrumentations: [
{
plugins: {
express: {
enabled: true,
path: '@opentelemetry/plugin-express',
},
},
},
],
});

console.log('OpenTelemetry initialized');

export {
provider,
};