From 4636c30703711abc25248cb513041ebda8ccfdeb Mon Sep 17 00:00:00 2001 From: Kristine Harutyunyan Date: Fri, 14 Jun 2024 15:22:42 +0400 Subject: [PATCH 1/3] feat(Messaging): Moved messaging related methods to separate module --- src/famcache.ts | 40 +++-------------- src/modules/index.ts | 1 + src/modules/messaging.ts | 44 +++++++++++++++++++ src/transport/index.ts | 2 +- .../{messaging.ts => messaging-event.ts} | 7 +-- src/types.ts | 6 +++ 6 files changed, 63 insertions(+), 37 deletions(-) create mode 100644 src/modules/index.ts create mode 100644 src/modules/messaging.ts rename src/transport/{messaging.ts => messaging-event.ts} (69%) diff --git a/src/famcache.ts b/src/famcache.ts index 2a36b3f..c9b6715 100644 --- a/src/famcache.ts +++ b/src/famcache.ts @@ -1,28 +1,28 @@ import { Socket } from 'net'; import { randomUUID } from 'crypto'; import type { ConnectionParams } from './params'; -import type { QueueResolver, SubscribeCallback } from './types'; +import type { IMessaging, QueueResolver, SubscribeCallback } from './types'; import { CacheQuery, get, set, del, - publish, - unsubscribe, - subscribe, - Messaging, + MessagingEvent, } from './transport'; +import { Messaging } from './modules'; class Famcache { private socket: Socket; private params: ConnectionParams; private queue: Map; private listeners: Map; + public mesaging: IMessaging; constructor(params: ConnectionParams) { this.socket = new Socket(); this.queue = new Map(); this.listeners = new Map(); + this.mesaging = new Messaging(this.socket); this.params = params; } @@ -35,8 +35,8 @@ class Famcache { this.socket.on('data', (data) => { const payload = data.toString(); - if (Messaging.isMessagingEvent(payload)) { - const message = Messaging.fromEvent(payload); + if (MessagingEvent.isMessagingEvent(payload)) { + const message = MessagingEvent.fromEvent(payload); if (!this.listeners.has(message.topic)) { return; @@ -112,32 +112,6 @@ class Famcache { this.queue.set(queryId, { resolve: () => resolve(), reject }); }); } - - publish(topic: string, data: string) { - const queryId = this.genId(); - - this.socket.write(publish(queryId, topic, data)); - } - - subscribe(topic: string, callback: SubscribeCallback) { - const queryId = this.genId(); - - this.socket.write(subscribe(queryId, topic)); - - const listeners = this.listeners.get(topic); - - if (!listeners) { - this.listeners.set(topic, [callback]); - } else { - listeners.push(callback); - } - } - - unsubscribe(topic: string) { - const queryId = this.genId(); - - this.socket.write(unsubscribe(queryId, topic)); - } } export default Famcache; diff --git a/src/modules/index.ts b/src/modules/index.ts new file mode 100644 index 0000000..d2d3b8c --- /dev/null +++ b/src/modules/index.ts @@ -0,0 +1 @@ +export * from './messaging'; \ No newline at end of file diff --git a/src/modules/messaging.ts b/src/modules/messaging.ts new file mode 100644 index 0000000..f5c3c3f --- /dev/null +++ b/src/modules/messaging.ts @@ -0,0 +1,44 @@ +import { randomUUID } from "crypto"; +import { Socket } from "net"; +import { publish, subscribe, unsubscribe } from "../transport"; +import { IMessaging, SubscribeCallback } from "../types"; + +export class Messaging implements IMessaging { + private socket: Socket; + private listeners: Map; + + constructor(socket: Socket) { + this.socket = socket; + this.listeners = new Map(); + } + + private genId() { + return randomUUID(); + } + + publish(topic: string, data: string) { + const queryId = this.genId(); + + this.socket.write(publish(queryId, topic, data)); + } + + subscribe(topic: string, callback: SubscribeCallback) { + const queryId = this.genId(); + + this.socket.write(subscribe(queryId, topic)); + + const listeners = this.listeners.get(topic); + + if (!listeners) { + this.listeners.set(topic, [callback]); + } else { + listeners.push(callback); + } + } + + unsubscribe(topic: string) { + const queryId = this.genId(); + + this.socket.write(unsubscribe(queryId, topic)); + } +} \ No newline at end of file diff --git a/src/transport/index.ts b/src/transport/index.ts index f2ed488..a88aa86 100644 --- a/src/transport/index.ts +++ b/src/transport/index.ts @@ -1,3 +1,3 @@ export * from './cache-query'; export * from './commands'; -export * from './messaging'; +export * from './messaging-event'; diff --git a/src/transport/messaging.ts b/src/transport/messaging-event.ts similarity index 69% rename from src/transport/messaging.ts rename to src/transport/messaging-event.ts index 23a6807..7e323e1 100644 --- a/src/transport/messaging.ts +++ b/src/transport/messaging-event.ts @@ -1,12 +1,12 @@ -export class Messaging { +export class MessagingEvent { static isMessagingEvent(event: string): boolean { return event.startsWith('MESSAGE '); } - static fromEvent(event: string): Messaging { + static fromEvent(event: string): MessagingEvent { const [, topic, data] = event.split(' '); - return new Messaging(topic, data); + return new MessagingEvent(topic, data); } public topic: string; @@ -16,4 +16,5 @@ export class Messaging { this.topic = topic; this.data = data; } + } diff --git a/src/types.ts b/src/types.ts index 24c9209..e1d432c 100644 --- a/src/types.ts +++ b/src/types.ts @@ -6,3 +6,9 @@ export type QueueResolver = { }; export type SubscribeCallback = (data: string) => void; + +export type IMessaging = { + publish(topic: string, data: string): void; + subscribe(topic: string, callback: SubscribeCallback): void; + unsubscribe(topic: string): void; +} \ No newline at end of file From 85327c81b6b2892240a06c3998ce70e8404c23aa Mon Sep 17 00:00:00 2001 From: Kristine Harutyunyan Date: Tue, 18 Jun 2024 16:28:47 +0400 Subject: [PATCH 2/3] fix(Messaging): fix messaging publish, subscribe, unsubscribe --- README.md | 12 ++++++------ example/client.ts | 28 ---------------------------- example/pubsub/publish.ts | 16 ++++++++++++++++ example/pubsub/subscribe.ts | 18 ++++++++++++++++++ example/pubsub/unsubscribe.ts | 16 ++++++++++++++++ src/famcache.ts | 16 ++++------------ src/modules/messaging.ts | 12 ++++++++++-- 7 files changed, 70 insertions(+), 48 deletions(-) delete mode 100644 example/client.ts create mode 100644 example/pubsub/publish.ts create mode 100644 example/pubsub/subscribe.ts create mode 100644 example/pubsub/unsubscribe.ts diff --git a/README.md b/README.md index c900319..68daa81 100644 --- a/README.md +++ b/README.md @@ -80,14 +80,14 @@ await client.del('key'); To publish data to the topic: ```ts -client.publish('topic', 'data'); +client.messaging.publish('topic', 'data'); ``` #### Subscribe to the topic To subscribe to the topic: ```ts -client.subscribe('topic', (data) => { +client.messaging.subscribe('topic', (data) => { // ... }); ``` @@ -96,7 +96,7 @@ client.subscribe('topic', (data) => { To unsubscribe from the topic: ```ts -client.unsubscribe('topic'); +client.messaging.unsubscribe('topic'); ``` ## API Reference @@ -129,14 +129,14 @@ Gets a value from the cache. Deletes a value from the cache. -#### `client.publish(topic, data)` +#### `client.messaging.publish(topic, data)` Publishes data to the topic - **topic** (string): Topic name - **data** (string): Payload that will be send to the subscribers -#### `client.subscribe(topic, callback)` +#### `client.messaging.subscribe(topic, callback)` Subscribes to the topic @@ -144,7 +144,7 @@ Subscribes to the topic - **callback** (Function): Callback function that will be invoked when message will be received for this topic -#### `client.unsubscribe(topic)` +#### `client.messaging.unsubscribe(topic)` Unsubscribes from the topic diff --git a/example/client.ts b/example/client.ts deleted file mode 100644 index 54acd3f..0000000 --- a/example/client.ts +++ /dev/null @@ -1,28 +0,0 @@ -import Famcache from '../src'; - -const client = new Famcache({ - host: 'localhost', - port: 3577, -}); - -client - .connect() - .then(() => { - console.log('Connected!'); - - client.subscribe('topic1', (data) => { - console.log('topic1 received data: ', data); - }); - - client.set('key', '10', 3000) - .then(() => { - return client.get('key'); - }) - .then((data) => { - console.log('Received', data); - }); - }) - .catch((e) => { - console.log('Failed to connect'); - }); - diff --git a/example/pubsub/publish.ts b/example/pubsub/publish.ts new file mode 100644 index 0000000..a92e9de --- /dev/null +++ b/example/pubsub/publish.ts @@ -0,0 +1,16 @@ +import Famcache from '../../src'; + +const client = new Famcache({ + host: 'localhost', + port: 3577, +}); + +client + .connect() + .then(() => { + client.messaging.publish('topic1', 'interesting'); + }) + .catch((e) => { + console.log('Failed to connect'); + }); + diff --git a/example/pubsub/subscribe.ts b/example/pubsub/subscribe.ts new file mode 100644 index 0000000..5b3d6ac --- /dev/null +++ b/example/pubsub/subscribe.ts @@ -0,0 +1,18 @@ +import Famcache from '../../src'; + +const client = new Famcache({ + host: 'localhost', + port: 3577, +}); + +client + .connect() + .then(() => { + client.messaging.subscribe('topic1', (data) => { + console.log('Received', data); + }); + }) + .catch((e) => { + console.log('Failed to connect'); + }); + diff --git a/example/pubsub/unsubscribe.ts b/example/pubsub/unsubscribe.ts new file mode 100644 index 0000000..20432e2 --- /dev/null +++ b/example/pubsub/unsubscribe.ts @@ -0,0 +1,16 @@ +import Famcache from '../../src'; + +const client = new Famcache({ + host: 'localhost', + port: 3577, +}); + +client + .connect() + .then(() => { + client.messaging.unsubscribe('topic1'); + }) + .catch((e) => { + console.log('Failed to connect'); + }); + diff --git a/src/famcache.ts b/src/famcache.ts index c9b6715..3e0bf24 100644 --- a/src/famcache.ts +++ b/src/famcache.ts @@ -1,7 +1,7 @@ import { Socket } from 'net'; import { randomUUID } from 'crypto'; import type { ConnectionParams } from './params'; -import type { IMessaging, QueueResolver, SubscribeCallback } from './types'; +import type { IMessaging, QueueResolver } from './types'; import { CacheQuery, get, @@ -15,14 +15,12 @@ class Famcache { private socket: Socket; private params: ConnectionParams; private queue: Map; - private listeners: Map; - public mesaging: IMessaging; + public messaging: IMessaging; constructor(params: ConnectionParams) { this.socket = new Socket(); this.queue = new Map(); - this.listeners = new Map(); - this.mesaging = new Messaging(this.socket); + this.messaging = new Messaging(this.socket); this.params = params; } @@ -38,13 +36,7 @@ class Famcache { if (MessagingEvent.isMessagingEvent(payload)) { const message = MessagingEvent.fromEvent(payload); - if (!this.listeners.has(message.topic)) { - return; - } - - this.listeners - .get(message.topic) - ?.forEach((callback) => callback(message.data)); + (this.messaging as Messaging).trigger(message); return; } diff --git a/src/modules/messaging.ts b/src/modules/messaging.ts index f5c3c3f..180ed87 100644 --- a/src/modules/messaging.ts +++ b/src/modules/messaging.ts @@ -1,11 +1,11 @@ import { randomUUID } from "crypto"; import { Socket } from "net"; -import { publish, subscribe, unsubscribe } from "../transport"; +import { MessagingEvent, publish, subscribe, unsubscribe } from "../transport"; import { IMessaging, SubscribeCallback } from "../types"; export class Messaging implements IMessaging { private socket: Socket; - private listeners: Map; + public listeners: Map; constructor(socket: Socket) { this.socket = socket; @@ -41,4 +41,12 @@ export class Messaging implements IMessaging { this.socket.write(unsubscribe(queryId, topic)); } + + trigger(message: MessagingEvent) { + if (!this.listeners.has(message.topic)) { + return; + } + this.listeners.get(message.topic) + ?.forEach((callback) => callback(message.data)); + } } \ No newline at end of file From b0075ac82e3f76d5833501f482af4d068111ccee Mon Sep 17 00:00:00 2001 From: Kristine Harutyunyan Date: Tue, 18 Jun 2024 16:53:22 +0400 Subject: [PATCH 3/3] fix(Messaging): Fix memory leak on unsubscribe --- README.md | 2 +- src/modules/messaging.ts | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 68daa81..be9f85c 100644 --- a/README.md +++ b/README.md @@ -93,7 +93,7 @@ client.messaging.subscribe('topic', (data) => { ``` #### Unsubscribe from the topic - + To unsubscribe from the topic: ```ts client.messaging.unsubscribe('topic'); diff --git a/src/modules/messaging.ts b/src/modules/messaging.ts index 180ed87..dbe9e6a 100644 --- a/src/modules/messaging.ts +++ b/src/modules/messaging.ts @@ -40,6 +40,8 @@ export class Messaging implements IMessaging { const queryId = this.genId(); this.socket.write(unsubscribe(queryId, topic)); + + this.listeners.delete(topic); } trigger(message: MessagingEvent) {