RPC Call (Point to Point) #105
Unanswered
ganikurnia
asked this question in
Q&A
Replies: 2 comments
-
You should be able to follow a guide from any other client library. For example this one: https://rabbitmq.com/tutorials/tutorial-six-python.html But we should add a higher level RPC api to make it easier. |
Beta Was this translation helpful? Give feedback.
0 replies
-
I've been able to use create-amqp-rpc.tsimport type {
AMQPChannel,
AMQPClient,
AMQPConsumer,
AMQPMessage,
AMQPQueue,
AMQPWebSocketClient,
} from "@cloudamqp/amqp-client";
import { AMQPError } from "@cloudamqp/amqp-client";
import { v4 as uuid } from "uuid";
import { getLogger } from "~/services/logging";
export type RpcClientFunc = (
rpcTopic: string,
data: string | Uint8Array | ArrayBuffer | Buffer | null,
props?: { timeoutMs?: number; reportTiming?: boolean }
) => Promise<AMQPMessage>;
export async function consumerCancelClose(
consumer: AMQPConsumer
): Promise<void> {
try {
await consumer.cancel();
} catch (err) {
consumer.setClosed(
err instanceof Error || err instanceof AMQPError
? err
: new Error("consumer.setClosed() failed")
);
}
}
interface AmqpRpc {
initialize(): Promise<AmqpRpc>;
wait(): Promise<void>;
clientFunc: RpcClientFunc;
}
export function createAmqpRpc(init: {
amqpClient: AMQPWebSocketClient | AMQPClient;
rpcExchangeName: string;
signal: AbortSignal;
}): AmqpRpc {
let amqpRpc: AmqpRpc;
const logger = getLogger("~/services/create-amqp-rpc.ts:createAmqpRpc");
const rpcExchangeName = init.rpcExchangeName;
const signal = init.signal;
let ch: AMQPChannel;
let q: AMQPQueue;
let replyRoutingKey: string;
let connected: Promise<void>;
const rpcMap = new Map<
string,
{
resolve: (value: AMQPMessage) => void;
reject: (reason?: any) => void;
startMs?: number;
}
>();
async function initializeImpl() {
ch = await init.amqpClient.channel();
try {
signal.throwIfAborted();
const replyExchangeName = `${rpcExchangeName}_response`;
await ch.exchangeDeclare(rpcExchangeName, "topic", {
durable: true,
});
await ch.exchangeDeclare(replyExchangeName, "topic", {
durable: true,
});
signal.throwIfAborted();
q = await ch.queue("", {
durable: false,
exclusive: true,
autoDelete: true,
});
signal.throwIfAborted();
logger.info("Binding to RPC topics");
replyRoutingKey = q.name;
await q.bind(replyExchangeName, replyRoutingKey);
signal.throwIfAborted();
} catch (e) {
if (!ch.closed) {
await ch.close(signal.aborted ? "Canceled" : "");
}
throw e;
}
return;
}
async function connectedImpl() {
try {
const messageCallback = (msg: AMQPMessage) => {
const correlationId = msg.properties.correlationId;
if (correlationId == null) return;
const value = rpcMap.get(correlationId);
if (value == null) {
return;
}
if (value.startMs != null) {
const roundTripTime = performance.now() - value.startMs;
console.debug(
`Timing for RPC response was ${roundTripTime.toFixed(2)} ms`
);
}
value.resolve(msg);
};
const consumer = await q.subscribe({ noAck: false }, (msg) => {
if (signal.aborted) {
return;
}
messageCallback(msg);
});
logger.info("AMQP RPC topic subscription started");
signal.onabort = () => {
if (!init.amqpClient.closed) {
consumerCancelClose(consumer).catch((err) =>
logger.error({ err }, "error in consumerCancelClose")
);
}
};
if (signal.aborted) {
await consumerCancelClose(consumer);
return;
}
try {
await consumer.wait(); // will block until consumer is canceled or throw an error if server closed channel/connection
if (signal.aborted) {
logger.info("AMQP RPC client stopping");
return;
}
} catch (err) {
if (
err instanceof Error &&
err.message === "RPC Connection closed by client"
) {
logger.info("AMQP RPC topic subscription ended");
} else {
logger.error({ err }, "AMQP RPC topic subscription ended");
}
}
} finally {
if (!ch.closed) {
await ch.close(signal.aborted ? "Canceled" : "");
}
}
}
amqpRpc = {
async initialize(): Promise<AmqpRpc> {
if (init.amqpClient.closed) {
throw new Error("AMQP Client is closed");
}
await initializeImpl();
connected = connectedImpl();
return amqpRpc;
},
wait() {
return connected;
},
async clientFunc(
rpcTopic,
data,
{ timeoutMs = 200, reportTiming = true } = {}
) {
const correlationId = uuid();
const rpcPromise = new Promise<AMQPMessage>((resolve, reject) => {
ch.basicPublish(init.rpcExchangeName, rpcTopic, data, {
replyTo: replyRoutingKey,
correlationId,
expiration: (timeoutMs / 2).toString(),
headers: { "Content-Type": "application/x-protobuf" },
userId: ch.connection.username,
}).then(
(confirmId) => {
rpcMap.set(correlationId, {
resolve,
reject,
startMs: reportTiming ? performance.now() : undefined,
});
},
(reason) => reject(reason)
);
});
let timer: NodeJS.Timeout | number | undefined;
const timerPromise = new Promise<AMQPMessage>((_resolve, reject) => {
timer = setTimeout(
() => reject(new Error("AMQP RPC timeout")),
timeoutMs
);
});
try {
return await Promise.race([rpcPromise, timerPromise]);
} finally {
clearTimeout(timer);
}
},
};
return amqpRpc;
} All the edge cases probably aren't handled yet, but it's a start. |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
How to make RPC calls using existing functions, because I found no explicit examples, other than just functions in the library
Beta Was this translation helpful? Give feedback.
All reactions