diff --git a/README.md b/README.md index c900319..be9f85c 100644 --- a/README.md +++ b/README.md @@ -80,23 +80,23 @@ await client.del('key'); To publish data to the topic: ```ts -client.publish('topic', 'data'); +client.messaging.publish('topic', 'data'); ``` #### Subscribe to the topic To subscribe to the topic: ```ts -client.subscribe('topic', (data) => { +client.messaging.subscribe('topic', (data) => { // ... }); ``` #### Unsubscribe from the topic - + To unsubscribe from the topic: ```ts -client.unsubscribe('topic'); +client.messaging.unsubscribe('topic'); ``` ## API Reference @@ -129,14 +129,14 @@ Gets a value from the cache. Deletes a value from the cache. -#### `client.publish(topic, data)` +#### `client.messaging.publish(topic, data)` Publishes data to the topic - **topic** (string): Topic name - **data** (string): Payload that will be send to the subscribers -#### `client.subscribe(topic, callback)` +#### `client.messaging.subscribe(topic, callback)` Subscribes to the topic @@ -144,7 +144,7 @@ Subscribes to the topic - **callback** (Function): Callback function that will be invoked when message will be received for this topic -#### `client.unsubscribe(topic)` +#### `client.messaging.unsubscribe(topic)` Unsubscribes from the topic diff --git a/example/client.ts b/example/client.ts deleted file mode 100644 index 54acd3f..0000000 --- a/example/client.ts +++ /dev/null @@ -1,28 +0,0 @@ -import Famcache from '../src'; - -const client = new Famcache({ - host: 'localhost', - port: 3577, -}); - -client - .connect() - .then(() => { - console.log('Connected!'); - - client.subscribe('topic1', (data) => { - console.log('topic1 received data: ', data); - }); - - client.set('key', '10', 3000) - .then(() => { - return client.get('key'); - }) - .then((data) => { - console.log('Received', data); - }); - }) - .catch((e) => { - console.log('Failed to connect'); - }); - diff --git a/example/pubsub/publish.ts b/example/pubsub/publish.ts new file mode 100644 index 0000000..a92e9de --- /dev/null +++ b/example/pubsub/publish.ts @@ -0,0 +1,16 @@ +import Famcache from '../../src'; + +const client = new Famcache({ + host: 'localhost', + port: 3577, +}); + +client + .connect() + .then(() => { + client.messaging.publish('topic1', 'interesting'); + }) + .catch((e) => { + console.log('Failed to connect'); + }); + diff --git a/example/pubsub/subscribe.ts b/example/pubsub/subscribe.ts new file mode 100644 index 0000000..5b3d6ac --- /dev/null +++ b/example/pubsub/subscribe.ts @@ -0,0 +1,18 @@ +import Famcache from '../../src'; + +const client = new Famcache({ + host: 'localhost', + port: 3577, +}); + +client + .connect() + .then(() => { + client.messaging.subscribe('topic1', (data) => { + console.log('Received', data); + }); + }) + .catch((e) => { + console.log('Failed to connect'); + }); + diff --git a/example/pubsub/unsubscribe.ts b/example/pubsub/unsubscribe.ts new file mode 100644 index 0000000..20432e2 --- /dev/null +++ b/example/pubsub/unsubscribe.ts @@ -0,0 +1,16 @@ +import Famcache from '../../src'; + +const client = new Famcache({ + host: 'localhost', + port: 3577, +}); + +client + .connect() + .then(() => { + client.messaging.unsubscribe('topic1'); + }) + .catch((e) => { + console.log('Failed to connect'); + }); + diff --git a/src/famcache.ts b/src/famcache.ts index 2a36b3f..3e0bf24 100644 --- a/src/famcache.ts +++ b/src/famcache.ts @@ -1,28 +1,26 @@ import { Socket } from 'net'; import { randomUUID } from 'crypto'; import type { ConnectionParams } from './params'; -import type { QueueResolver, SubscribeCallback } from './types'; +import type { IMessaging, QueueResolver } from './types'; import { CacheQuery, get, set, del, - publish, - unsubscribe, - subscribe, - Messaging, + MessagingEvent, } from './transport'; +import { Messaging } from './modules'; class Famcache { private socket: Socket; private params: ConnectionParams; private queue: Map; - private listeners: Map; + public messaging: IMessaging; constructor(params: ConnectionParams) { this.socket = new Socket(); this.queue = new Map(); - this.listeners = new Map(); + this.messaging = new Messaging(this.socket); this.params = params; } @@ -35,16 +33,10 @@ class Famcache { this.socket.on('data', (data) => { const payload = data.toString(); - if (Messaging.isMessagingEvent(payload)) { - const message = Messaging.fromEvent(payload); + if (MessagingEvent.isMessagingEvent(payload)) { + const message = MessagingEvent.fromEvent(payload); - if (!this.listeners.has(message.topic)) { - return; - } - - this.listeners - .get(message.topic) - ?.forEach((callback) => callback(message.data)); + (this.messaging as Messaging).trigger(message); return; } @@ -112,32 +104,6 @@ class Famcache { this.queue.set(queryId, { resolve: () => resolve(), reject }); }); } - - publish(topic: string, data: string) { - const queryId = this.genId(); - - this.socket.write(publish(queryId, topic, data)); - } - - subscribe(topic: string, callback: SubscribeCallback) { - const queryId = this.genId(); - - this.socket.write(subscribe(queryId, topic)); - - const listeners = this.listeners.get(topic); - - if (!listeners) { - this.listeners.set(topic, [callback]); - } else { - listeners.push(callback); - } - } - - unsubscribe(topic: string) { - const queryId = this.genId(); - - this.socket.write(unsubscribe(queryId, topic)); - } } export default Famcache; diff --git a/src/modules/index.ts b/src/modules/index.ts new file mode 100644 index 0000000..d2d3b8c --- /dev/null +++ b/src/modules/index.ts @@ -0,0 +1 @@ +export * from './messaging'; \ No newline at end of file diff --git a/src/modules/messaging.ts b/src/modules/messaging.ts new file mode 100644 index 0000000..dbe9e6a --- /dev/null +++ b/src/modules/messaging.ts @@ -0,0 +1,54 @@ +import { randomUUID } from "crypto"; +import { Socket } from "net"; +import { MessagingEvent, publish, subscribe, unsubscribe } from "../transport"; +import { IMessaging, SubscribeCallback } from "../types"; + +export class Messaging implements IMessaging { + private socket: Socket; + public listeners: Map; + + constructor(socket: Socket) { + this.socket = socket; + this.listeners = new Map(); + } + + private genId() { + return randomUUID(); + } + + publish(topic: string, data: string) { + const queryId = this.genId(); + + this.socket.write(publish(queryId, topic, data)); + } + + subscribe(topic: string, callback: SubscribeCallback) { + const queryId = this.genId(); + + this.socket.write(subscribe(queryId, topic)); + + const listeners = this.listeners.get(topic); + + if (!listeners) { + this.listeners.set(topic, [callback]); + } else { + listeners.push(callback); + } + } + + unsubscribe(topic: string) { + const queryId = this.genId(); + + this.socket.write(unsubscribe(queryId, topic)); + + this.listeners.delete(topic); + } + + trigger(message: MessagingEvent) { + if (!this.listeners.has(message.topic)) { + return; + } + this.listeners.get(message.topic) + ?.forEach((callback) => callback(message.data)); + } +} \ No newline at end of file diff --git a/src/transport/index.ts b/src/transport/index.ts index f2ed488..a88aa86 100644 --- a/src/transport/index.ts +++ b/src/transport/index.ts @@ -1,3 +1,3 @@ export * from './cache-query'; export * from './commands'; -export * from './messaging'; +export * from './messaging-event'; diff --git a/src/transport/messaging.ts b/src/transport/messaging-event.ts similarity index 69% rename from src/transport/messaging.ts rename to src/transport/messaging-event.ts index 23a6807..7e323e1 100644 --- a/src/transport/messaging.ts +++ b/src/transport/messaging-event.ts @@ -1,12 +1,12 @@ -export class Messaging { +export class MessagingEvent { static isMessagingEvent(event: string): boolean { return event.startsWith('MESSAGE '); } - static fromEvent(event: string): Messaging { + static fromEvent(event: string): MessagingEvent { const [, topic, data] = event.split(' '); - return new Messaging(topic, data); + return new MessagingEvent(topic, data); } public topic: string; @@ -16,4 +16,5 @@ export class Messaging { this.topic = topic; this.data = data; } + } diff --git a/src/types.ts b/src/types.ts index 24c9209..e1d432c 100644 --- a/src/types.ts +++ b/src/types.ts @@ -6,3 +6,9 @@ export type QueueResolver = { }; export type SubscribeCallback = (data: string) => void; + +export type IMessaging = { + publish(topic: string, data: string): void; + subscribe(topic: string, callback: SubscribeCallback): void; + unsubscribe(topic: string): void; +} \ No newline at end of file