Skip to content

Commit

Permalink
add realtime notification support
Browse files Browse the repository at this point in the history
  • Loading branch information
KambangSinclaire committed Mar 28, 2024
1 parent 7ab4d40 commit e62a1e4
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 24 deletions.
23 changes: 19 additions & 4 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
name: Create and publish kingscorp notification Docker image to docker hub
name: Create and publish kings Notifications Service Docker image to docker hub and deploy to server

on:
push:
branches: [ main ]
branches: [main]
pull_request:
branches: [ main ]
branches: [main]

env:
REGISTRY: docker.io
Expand All @@ -29,4 +29,19 @@ jobs:
dockerfile: Dockerfile
registry: ${{ env.REGISTRY }}
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Setting up SSH keys
run: |
eval "$(ssh-agent -s)"
ssh-add - <<< "${{ secrets.PRIVATE_KEY }}"
- name: Deploying Kings Notification service
uses: appleboy/ssh-action@master
with:
host: ${{secrets.SERVER_HOST}}
username: ${{ secrets.SERVER_USER }}
key: ${{ secrets.PRIVATE_KEY }}
port: 22
script: |
cd /home/kingscorp-soft
docker pull kambang/kingscorp:notification
docker compose -f docker-compose-dev.yml up -d
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# compiled output
/dist
/node_modules

*.html
# Logs
logs
*.log
Expand Down
30 changes: 30 additions & 0 deletions src/cache/redis/cache.utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/* eslint-disable prettier/prettier */
import { RedisService } from 'src/cache/redis/redis.service';

export class CacheManager {

static async set(key: string, data: any) {
try {
const cache = RedisService.client;
let prevData = await CacheManager.get(key);
prevData = prevData ? prevData : {};
const newData = { ...prevData, ...data };
return await cache.set(key.toString(), JSON.stringify(newData));
} catch (error) {
console.log("Error in setting to cache ", error?.message);
return false;
}
}

static async get(key: string) {
try {
const cache = RedisService.client;
const response = await cache.get(key.toString());
return JSON.parse(response);
} catch (error) {
console.log("Error in getting from cache ", error?.message);
return false;
}
}

}
37 changes: 18 additions & 19 deletions src/modules/notifications/notifications.controller.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import { Controller, Sse, Post, Body, Res } from '@nestjs/common';
import { interval, of } from 'rxjs';
import { Controller, Sse, Post, Body, Res, MessageEvent, Param } from '@nestjs/common';
import { Observable, interval, of } from 'rxjs';
import { map } from 'rxjs/operators';
import { INotification } from 'src/models/interfaces/notification.interface';
import { NotificationsService } from './notifications.service';
import { Response } from 'express';
import { EventPattern } from '@nestjs/microservices';
import { MICROSERVICE_EVENTS } from 'src/common/constants/microservice.constants';
import { CacheManager } from 'src/cache/redis/cache.utils';

@Controller('notifications')
export class NotificationsController {

private data: any = {};

constructor(private notificationsService: NotificationsService) { }

@Post("send")
Expand All @@ -23,29 +22,29 @@ export class NotificationsController {
@Post("push/send")
async setData(@Body() payload: any) {
const res = await this.notificationsService.push(payload);
this.data = payload;
return res;
}

@EventPattern(MICROSERVICE_EVENTS.PRODUCT_THRESHOLD_REACHED)
async handler(data: Record<string, unknown>) {
// this.data = data;
console.log('The streamed data is => ', this.data);
async handler(data: Record<string, any>) {
await CacheManager.set(data?.organization?.owner_id ?? "notification", data);
}

@Sse('pull')
@EventPattern(MICROSERVICE_EVENTS.PRODUCT_THRESHOLD_REACHED)
async sse() {
this.getData();
return interval(5000).pipe(map((_) => ({ data: { num: this.data } })));
@Sse('pull/:id')
sse(@Param() param: { id: string }): Observable<MessageEvent> {
let data = {};
const notificationsData = interval(5000).pipe(map((_) => {
this.getCachedData(param.id).then((value) => {
data = value;
})
return ({ data });
}));
return notificationsData;
}

private getData() {
let num = this.data?.num ?? 1;
num += 10;
this.data.num = num;
console.log('This is now Data',this.data);
return num;
private async getCachedData(key: string) {
const data = await CacheManager.get(key ?? "notification");
return data;
}

}

0 comments on commit e62a1e4

Please sign in to comment.