Skip to content

Commit

Permalink
Merge pull request #6 from Famcache/refactor/messaging
Browse files Browse the repository at this point in the history
BREAKING CHANGE: Moved messaging related methods to separate module
  • Loading branch information
shahen94 authored Jun 18, 2024
2 parents 64c54b6 + b0075ac commit c5df836
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 81 deletions.
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,23 @@ await client.del('key');

To publish data to the topic:
```ts
client.publish('topic', 'data');
client.messaging.publish('topic', 'data');
```

#### Subscribe to the topic

To subscribe to the topic:
```ts
client.subscribe('topic', (data) => {
client.messaging.subscribe('topic', (data) => {
// ...
});
```

#### Unsubscribe from the topic

To unsubscribe from the topic:
```ts
client.unsubscribe('topic');
client.messaging.unsubscribe('topic');
```

## API Reference
Expand Down Expand Up @@ -129,22 +129,22 @@ Gets a value from the cache.

Deletes a value from the cache.

#### `client.publish(topic, data)`
#### `client.messaging.publish(topic, data)`

Publishes data to the topic

- **topic** (string): Topic name
- **data** (string): Payload that will be send to the subscribers

#### `client.subscribe(topic, callback)`
#### `client.messaging.subscribe(topic, callback)`

Subscribes to the topic

- **topic** (string): Topic name
- **callback** (Function): Callback function that will be invoked when message will be received for this topic


#### `client.unsubscribe(topic)`
#### `client.messaging.unsubscribe(topic)`

Unsubscribes from the topic

Expand Down
28 changes: 0 additions & 28 deletions example/client.ts

This file was deleted.

16 changes: 16 additions & 0 deletions example/pubsub/publish.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import Famcache from '../../src';

const client = new Famcache({
host: 'localhost',
port: 3577,
});

client
.connect()
.then(() => {
client.messaging.publish('topic1', 'interesting');
})
.catch((e) => {
console.log('Failed to connect');
});

18 changes: 18 additions & 0 deletions example/pubsub/subscribe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import Famcache from '../../src';

const client = new Famcache({
host: 'localhost',
port: 3577,
});

client
.connect()
.then(() => {
client.messaging.subscribe('topic1', (data) => {
console.log('Received', data);
});
})
.catch((e) => {
console.log('Failed to connect');
});

16 changes: 16 additions & 0 deletions example/pubsub/unsubscribe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import Famcache from '../../src';

const client = new Famcache({
host: 'localhost',
port: 3577,
});

client
.connect()
.then(() => {
client.messaging.unsubscribe('topic1');
})
.catch((e) => {
console.log('Failed to connect');
});

50 changes: 8 additions & 42 deletions src/famcache.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
import { Socket } from 'net';
import { randomUUID } from 'crypto';
import type { ConnectionParams } from './params';
import type { QueueResolver, SubscribeCallback } from './types';
import type { IMessaging, QueueResolver } from './types';
import {
CacheQuery,
get,
set,
del,
publish,
unsubscribe,
subscribe,
Messaging,
MessagingEvent,
} from './transport';
import { Messaging } from './modules';

class Famcache {
private socket: Socket;
private params: ConnectionParams;
private queue: Map<string, QueueResolver>;
private listeners: Map<string, SubscribeCallback[]>;
public messaging: IMessaging;

constructor(params: ConnectionParams) {
this.socket = new Socket();
this.queue = new Map();
this.listeners = new Map();
this.messaging = new Messaging(this.socket);

this.params = params;
}
Expand All @@ -35,16 +33,10 @@ class Famcache {
this.socket.on('data', (data) => {
const payload = data.toString();

if (Messaging.isMessagingEvent(payload)) {
const message = Messaging.fromEvent(payload);
if (MessagingEvent.isMessagingEvent(payload)) {
const message = MessagingEvent.fromEvent(payload);

if (!this.listeners.has(message.topic)) {
return;
}

this.listeners
.get(message.topic)
?.forEach((callback) => callback(message.data));
(this.messaging as Messaging).trigger(message);
return;
}

Expand Down Expand Up @@ -112,32 +104,6 @@ class Famcache {
this.queue.set(queryId, { resolve: () => resolve(), reject });
});
}

publish(topic: string, data: string) {
const queryId = this.genId();

this.socket.write(publish(queryId, topic, data));
}

subscribe(topic: string, callback: SubscribeCallback) {
const queryId = this.genId();

this.socket.write(subscribe(queryId, topic));

const listeners = this.listeners.get(topic);

if (!listeners) {
this.listeners.set(topic, [callback]);
} else {
listeners.push(callback);
}
}

unsubscribe(topic: string) {
const queryId = this.genId();

this.socket.write(unsubscribe(queryId, topic));
}
}

export default Famcache;
1 change: 1 addition & 0 deletions src/modules/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './messaging';
54 changes: 54 additions & 0 deletions src/modules/messaging.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { randomUUID } from "crypto";
import { Socket } from "net";
import { MessagingEvent, publish, subscribe, unsubscribe } from "../transport";
import { IMessaging, SubscribeCallback } from "../types";

export class Messaging implements IMessaging {
private socket: Socket;
public listeners: Map<string, SubscribeCallback[]>;

constructor(socket: Socket) {
this.socket = socket;
this.listeners = new Map();
}

private genId() {
return randomUUID();
}

publish(topic: string, data: string) {
const queryId = this.genId();

this.socket.write(publish(queryId, topic, data));
}

subscribe(topic: string, callback: SubscribeCallback) {
const queryId = this.genId();

this.socket.write(subscribe(queryId, topic));

const listeners = this.listeners.get(topic);

if (!listeners) {
this.listeners.set(topic, [callback]);
} else {
listeners.push(callback);
}
}

unsubscribe(topic: string) {
const queryId = this.genId();

this.socket.write(unsubscribe(queryId, topic));

this.listeners.delete(topic);
}

trigger(message: MessagingEvent) {
if (!this.listeners.has(message.topic)) {
return;
}
this.listeners.get(message.topic)
?.forEach((callback) => callback(message.data));
}
}
2 changes: 1 addition & 1 deletion src/transport/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export * from './cache-query';
export * from './commands';
export * from './messaging';
export * from './messaging-event';
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
export class Messaging {
export class MessagingEvent {
static isMessagingEvent(event: string): boolean {
return event.startsWith('MESSAGE ');
}

static fromEvent(event: string): Messaging {
static fromEvent(event: string): MessagingEvent {
const [, topic, data] = event.split(' ');

return new Messaging(topic, data);
return new MessagingEvent(topic, data);
}

public topic: string;
Expand All @@ -16,4 +16,5 @@ export class Messaging {
this.topic = topic;
this.data = data;
}

}
6 changes: 6 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,9 @@ export type QueueResolver = {
};

export type SubscribeCallback = (data: string) => void;

export type IMessaging = {
publish(topic: string, data: string): void;
subscribe(topic: string, callback: SubscribeCallback): void;
unsubscribe(topic: string): void;
}

0 comments on commit c5df836

Please sign in to comment.