Skip to content

Commit

Permalink
wantedRooms websocket update
Browse files Browse the repository at this point in the history
  • Loading branch information
notjuliet committed Oct 19, 2024
1 parent f479cbb commit 9e05893
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 27 deletions.
7 changes: 6 additions & 1 deletion src/db/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,13 @@ const updateMessage = async (msg: Message) => {
};

const deleteMessage = async (uri: string) => {
await ctx.db.deleteFrom("messages").where("uri", "=", uri).executeTakeFirst();
const res = await ctx.db
.deleteFrom("messages")
.where("uri", "=", uri)
.returning("room as room")
.executeTakeFirst();
ctx.logger.info(`Deleted message: ${uri}`);
return res;
};

export { getMessage, addMessage, updateMessage, deleteMessage };
13 changes: 9 additions & 4 deletions src/lib/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,19 @@ const FacetsSchema = t.optional(
),
),
);
type FacetsInterface = t.Infer<typeof FacetsSchema>;
type FacetsI = t.Infer<typeof FacetsSchema>;

const GetMessagesSchema = t.object({
uri: t.optional(t.string({ format: "uri" })),
limit: t.integer({ minimum: 1, maximum: 100, default: 50 }),
cursor: t.optional(t.integer({ minimum: 0 })),
});
type GetMessagesInterface = t.Infer<typeof GetMessagesSchema>;
type GetMessagesI = t.Infer<typeof GetMessagesSchema>;

export { FacetsSchema, GetMessagesSchema };
export type { FacetsInterface, GetMessagesInterface };
const SocketQuerySchema = t.object({
wantedRooms: t.optional(t.array(t.string({ format: "uri" }))),
});
type SocketQueryI = t.Infer<typeof SocketQuerySchema>;

export { FacetsSchema, GetMessagesSchema, SocketQuerySchema };
export type { FacetsI, GetMessagesI, SocketQueryI };
3 changes: 2 additions & 1 deletion src/relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@ export function startJetstream(server: FastifyInstance, ctx: AppContext) {
const uri = `at://${event.did}/${event.commit.collection}/${event.commit.rkey}`;
let record;
if (event.commit.operation === "delete") {
await deleteMessage(uri);
const res = await deleteMessage(uri);
record = {
$type: "social.psky.chat.message#delete",
did: event.did,
rkey: event.commit.rkey,
room: res?.room,
};
} else {
let user = await getUser(event.did);
Expand Down
55 changes: 34 additions & 21 deletions src/routes.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { FastifyInstance } from "fastify";
import type { AppContext } from "./index.js";
import { GetMessagesInterface, GetMessagesSchema } from "./lib/schemas.js";
import {
GetMessagesI,
GetMessagesSchema,
SocketQueryI,
SocketQuerySchema,
} from "./lib/schemas.js";

let ipSet: Record<string, number> = {};
const serverState = (sessionCount: number) =>
Expand All @@ -11,30 +16,38 @@ export const createRouter = (server: FastifyInstance, ctx: AppContext) => {
const stream = server.websocketServer;
stream.setMaxListeners(0);

server.get("/subscribe", { websocket: true }, (socket, req) => {
if (!ipSet[req.ip]) {
ipSet[req.ip] = 1;
stream.emit("message", serverState(Object.keys(ipSet).length));
} else {
ipSet[req.ip] += 1;
}
socket.send(serverState(Object.keys(ipSet).length));
const callback = (data: any) => {
socket.send(String(data));
};
stream.on("message", callback);
socket.on("close", () => {
stream.removeListener("data", callback);
ipSet[req.ip] -= 1;
if (ipSet[req.ip] == 0) {
delete ipSet[req.ip];
server.get<{ Querystring: SocketQueryI }>(
"/subscribe",
{ schema: { querystring: SocketQuerySchema }, websocket: true },
(socket, req) => {
if (!ipSet[req.ip]) {
ipSet[req.ip] = 1;
stream.emit("message", serverState(Object.keys(ipSet).length));
} else {
ipSet[req.ip] += 1;
}
});
});
socket.send(serverState(Object.keys(ipSet).length));
const callback = (data: any) => {
if (
!req.query.wantedRooms ||
req.query.wantedRooms?.includes(JSON.parse(data).room)
)
socket.send(String(data));
};
stream.on("message", callback);
socket.on("close", () => {
stream.removeListener("data", callback);
ipSet[req.ip] -= 1;
if (ipSet[req.ip] == 0) {
delete ipSet[req.ip];
stream.emit("message", serverState(Object.keys(ipSet).length));
}
});
},
);
});

server.get<{ Querystring: GetMessagesInterface }>(
server.get<{ Querystring: GetMessagesI }>(
"/xrpc/social.psky.chat.getMessages",
{ schema: { querystring: GetMessagesSchema } },
async (req, res) => {
Expand Down

0 comments on commit 9e05893

Please sign in to comment.