From eac32ba19a782020e5afa58876a0bc3e6e0df860 Mon Sep 17 00:00:00 2001 From: notjuliet Date: Tue, 15 Oct 2024 11:27:08 +0200 Subject: [PATCH] rewrite --- .env.example | 3 - .gitignore | 1 + .../{feed/post.json => chat/message.json} | 18 +- lexicons/social/psky/chat/room.json | 59 ++++ package-lock.json | 10 +- package.json | 1 + src/db.ts | 135 --------- src/db/init.ts | 135 +++++++++ src/db/message.ts | 56 ++++ src/db/room.ts | 75 +++++ src/db/user.ts | 59 ++++ src/index.ts | 92 +++--- src/lib/lexicon.ts | 48 ++- src/lib/schemas.ts | 9 +- src/relay.ts | 282 +++++++++--------- src/routes.ts | 29 +- src/{utils.ts => utils/api.ts} | 0 src/{ => utils}/env.ts | 4 +- src/utils/types.ts | 26 ++ 19 files changed, 672 insertions(+), 370 deletions(-) rename lexicons/social/psky/{feed/post.json => chat/message.json} (65%) create mode 100644 lexicons/social/psky/chat/room.json delete mode 100644 src/db.ts create mode 100644 src/db/init.ts create mode 100644 src/db/message.ts create mode 100644 src/db/room.ts create mode 100644 src/db/user.ts rename src/{utils.ts => utils/api.ts} (100%) rename src/{ => utils}/env.ts (83%) create mode 100644 src/utils/types.ts diff --git a/.env.example b/.env.example index 3d326fc..e9ded87 100644 --- a/.env.example +++ b/.env.example @@ -1,6 +1,3 @@ NODE_ENV="test" PORT=3000 DB_PATH=":memory:" -DID="did:example:123" -PASSWORD="password" -SERVICE="https://bsky.social" \ No newline at end of file diff --git a/.gitignore b/.gitignore index 6bf27d2..dbdbaf3 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ node_modules dist .env *.db* +cursor.txt diff --git a/lexicons/social/psky/feed/post.json b/lexicons/social/psky/chat/message.json similarity index 65% rename from lexicons/social/psky/feed/post.json rename to lexicons/social/psky/chat/message.json index faa7196..f2c6c27 100644 --- a/lexicons/social/psky/feed/post.json +++ b/lexicons/social/psky/chat/message.json @@ -1,23 +1,25 @@ { "lexicon": 1, - "id": "social.psky.feed.post", + "id": "social.psky.chat.message", "defs": { "main": { "type": "record", - "description": "A Picosky post containing at most 256 graphemes.", + "description": "A Picosky message containing at most 2048 graphemes.", "key": "tid", "record": { "type": "object", - "required": [ - "text" - ], + "required": ["content", "room"], "properties": { - "text": { + "content": { "type": "string", - "maxLength": 2560, - "maxGraphemes": 256, + "maxLength": 20480, + "maxGraphemes": 2048, "description": "Text content." }, + "room": { + "type": "string", + "format": "at-uri" + }, "facets": { "type": "array", "description": "Annotations of text (mentions, URLs, hashtags, etc)", diff --git a/lexicons/social/psky/chat/room.json b/lexicons/social/psky/chat/room.json new file mode 100644 index 0000000..7fc46e7 --- /dev/null +++ b/lexicons/social/psky/chat/room.json @@ -0,0 +1,59 @@ +{ + "lexicon": 1, + "id": "social.psky.chat.room", + "defs": { + "main": { + "type": "record", + "description": "A Picosky room belonging to the user.", + "key": "tid", + "record": { + "type": "object", + "required": ["name"], + "properties": { + "name": { + "type": "string", + "maxGraphemes": 32, + "maxLength": 320 + }, + "languages": { + "type": "array", + "maxLength": 3, + "items": { "type": "string", "format": "language" } + }, + "topic": { + "type": "string", + "maxLength": 2560, + "maxGraphemes": 256, + "description": "Topic title of the room." + }, + "tags": { + "type": "array", + "maxLength": 20, + "items": { "type": "string" } + }, + "allowlist": { + "type": "ref", + "ref": "#modlistRef", + "description": "List of users allowed to send messages in the room." + }, + "denylist": { + "type": "ref", + "ref": "#modlistRef", + "description": "List of users disallowed to send messages in the room." + } + } + } + }, + "modlistRef": { + "type": "object", + "required": ["active", "users"], + "properties": { + "active": { "type": "boolean", "default": false }, + "users": { + "type": "array", + "items": { "type": "string", "format": "did" } + } + } + } + } +} diff --git a/package-lock.json b/package-lock.json index 9507ff2..6283bae 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,6 +9,7 @@ "version": "0.0.0", "license": "BSD-3-Clause", "dependencies": { + "@atcute/client": "^2.0.3", "@fastify/cors": "^10.0.1", "@fastify/rate-limit": "^10.1.0", "@fastify/websocket": "^11.0.1", @@ -44,8 +45,7 @@ "version": "2.0.3", "resolved": "https://registry.npmjs.org/@atcute/client/-/client-2.0.3.tgz", "integrity": "sha512-j9GryA5l+4F0BTQWa6/1XmsuSPSq+bqNCY3mrHUGD592hMqUZxgpYDLgRWL+719V287AW/56AwvFYlbjlENp7A==", - "license": "MIT", - "peer": true + "license": "MIT" }, "node_modules/@atcute/lex-cli": { "version": "1.0.2", @@ -1829,9 +1829,9 @@ } }, "node_modules/tschema": { - "version": "3.1.0", - "resolved": "https://registry.npmjs.org/tschema/-/tschema-3.1.0.tgz", - "integrity": "sha512-Q/+uKRKztF9e1vOUYo3sASLhfh5uPGp6su3LQjrZKqp520AclA+sIjt1+l+XgzQTjPpi90TEWdnC2abdNGRgAA==", + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/tschema/-/tschema-3.2.0.tgz", + "integrity": "sha512-WC0IUxN7vo7lFH4FoscD/A8rtE4mpYPNfEVUfjDdZ5ML6d2H4WcBY62rwxxLoN+B/2B4Ffi+FLkSFgdOhG8goA==", "license": "MIT", "engines": { "node": ">=14" diff --git a/package.json b/package.json index 5779013..7d8ca0a 100644 --- a/package.json +++ b/package.json @@ -10,6 +10,7 @@ "lex": "npx lex-cli generate ./lexicons/social/psky/**/*.json -o ./src/lib/lexicon.ts" }, "dependencies": { + "@atcute/client": "^2.0.3", "@fastify/cors": "^10.0.1", "@fastify/rate-limit": "^10.1.0", "@fastify/websocket": "^11.0.1", diff --git a/src/db.ts b/src/db.ts deleted file mode 100644 index 4fa4f23..0000000 --- a/src/db.ts +++ /dev/null @@ -1,135 +0,0 @@ -import SqliteDb from "better-sqlite3"; -import { - Kysely, - Migrator, - SqliteDialect, - Migration, - MigrationProvider, - CompiledQuery, -} from "kysely"; - -// Types - -export type DatabaseSchema = { - posts: Post; - accounts: Account; -}; - -export type Post = { - uri: string; - cid: string; - post: string; - facets: string | null; // JSON string - reply: string | null; // JSON string - account_did: string; - indexed_at: number; - updated_at: number | null; -}; - -export type Account = { - did: string; - handle: string; - nickname: string | null; -}; - -// Migrations - -const migrations: Record = {}; - -const migrationProvider: MigrationProvider = { - async getMigrations() { - return migrations; - }, -}; - -migrations["001"] = { - async up(db: Kysely) { - await db.schema - .createTable("posts") - .addColumn("uri", "text", (col) => col.primaryKey()) - .addColumn("post", "text", (col) => col.notNull()) - .addColumn("account_did", "text", (col) => col.notNull()) - .addColumn("indexed_at", "integer", (col) => col.notNull()) - .addForeignKeyConstraint( - "account_foreign", - ["account_did"], - "accounts", - ["did"], - (col) => col.onDelete("cascade"), - ) - .execute(); - - await db.schema - .createTable("accounts") - .addColumn("did", "text", (col) => col.primaryKey()) - .addColumn("handle", "text", (col) => col.notNull()) - .addColumn("nickname", "text") - .execute(); - }, - async down(db: Kysely) { - await db.schema.dropTable("posts").execute(); - await db.schema.dropTable("accounts").execute(); - }, -}; - -migrations["002"] = { - async up(db: Kysely) { - await db.schema.alterTable("posts").addColumn("facets", "text").execute(); - }, - async down(db: Kysely) { - await db.schema.dropTable("posts").execute(); - }, -}; - -migrations["003"] = { - async up(db: Kysely) { - await db.schema - .alterTable("posts") - .addColumn("updated_at", "integer") - .execute(); - }, - async down(db: Kysely) { - await db.schema.dropTable("posts").execute(); - }, -}; - -migrations["004"] = { - async up(db: Kysely) { - await db.schema - .alterTable("posts") - .addColumn("cid", "text", (col) => col.notNull()) - .execute(); - }, - async down(db: Kysely) { - await db.schema.dropTable("posts").execute(); - }, -}; - -migrations["005"] = { - async up(db: Kysely) { - await db.schema.alterTable("posts").addColumn("reply", "text").execute(); - }, - async down(db: Kysely) { - await db.schema.dropTable("posts").execute(); - }, -}; - -// APIs - -export const createDb = (location: string): Database => { - const db = new Kysely({ - dialect: new SqliteDialect({ - database: new SqliteDb(location), - }), - }); - db.executeQuery(CompiledQuery.raw("PRAGMA journal_mode = WAL")); - return db; -}; - -export const migrateToLatest = async (db: Database) => { - const migrator = new Migrator({ db, provider: migrationProvider }); - const { error } = await migrator.migrateToLatest(); - if (error) throw error; -}; - -export type Database = Kysely; diff --git a/src/db/init.ts b/src/db/init.ts new file mode 100644 index 0000000..48de318 --- /dev/null +++ b/src/db/init.ts @@ -0,0 +1,135 @@ +import SqliteDb from "better-sqlite3"; +import { + Kysely, + Migrator, + SqliteDialect, + Migration, + MigrationProvider, + CompiledQuery, + JSONColumnType, + ColumnType, +} from "kysely"; +import { FacetsInterface } from "../lib/schemas.js"; + +// Types + +export type DatabaseSchema = { + messages: MessageTable; + users: UserTable; + rooms: RoomTable; +}; + +export type MessageTable = { + uri: ColumnType; + cid: string; + did: ColumnType; + content: string; + room: ColumnType; + facets: JSONColumnType | null; + reply: JSONColumnType<{ uri: string; cid: string }> | null; + indexed_at: ColumnType; + updated_at: ColumnType | null; +}; + +export type RoomTable = { + uri: ColumnType; + cid: string; + owner_did: ColumnType; + name: string; + languages: JSONColumnType | null; + topic: string | null; + tags: JSONColumnType | null; + allowlist: JSONColumnType<{ active: boolean; users: string[] }> | null; + denylist: JSONColumnType<{ active: boolean; users: string[] }> | null; + updated_at: ColumnType; +}; + +export type UserTable = { + did: ColumnType; + handle: string; + active: ColumnType; + nickname: string | null; + updated_at: ColumnType; +}; + +// Migrations + +const migrations: Record = {}; + +const migrationProvider: MigrationProvider = { + async getMigrations() { + return migrations; + }, +}; + +migrations["001"] = { + async up(db: Kysely) { + await db.schema + .createTable("messages") + .addColumn("uri", "text", (col) => col.primaryKey()) + .addColumn("cid", "text", (col) => col.notNull()) + .addColumn("did", "text", (col) => + col.notNull().references("users.did").onDelete("cascade"), + ) + .addColumn("content", "text", (col) => col.notNull()) + .addColumn("room", "text", (col) => + col.notNull().references("rooms.uri").onDelete("cascade"), + ) + .addColumn("facets", "text") + .addColumn("reply", "text") + .addColumn("indexed_at", "integer", (col) => col.notNull()) + .addColumn("updated_at", "integer") + .execute(); + + await db.schema + .createTable("rooms") + .addColumn("uri", "text", (col) => col.primaryKey()) + .addColumn("cid", "text", (col) => col.notNull()) + .addColumn("name", "text", (col) => col.notNull()) + .addColumn("owner_did", "text", (col) => + col.notNull().references("users.did").onDelete("cascade"), + ) + .addColumn("languages", "text") + .addColumn("topic", "text") + .addColumn("tags", "text") + .addColumn("allowlist", "text") + .addColumn("denylist", "text") + .addColumn("updated_at", "integer", (col) => col.notNull()) + .execute(); + + await db.schema + .createTable("users") + .addColumn("did", "text", (col) => col.primaryKey()) + .addColumn("handle", "text", (col) => col.notNull()) + .addColumn("active", "integer", (col) => col.notNull().defaultTo(true)) + .addColumn("nickname", "text") + .addColumn("updated_at", "integer", (col) => col.notNull()) + .execute(); + }, + async down(db: Kysely) { + await db.schema.dropTable("messages").execute(); + await db.schema.dropTable("users").execute(); + await db.schema.dropTable("rooms").execute(); + }, +}; + +// APIs + +export const createDb = (location: string): Database => { + const db = new Kysely({ + dialect: new SqliteDialect({ + database: new SqliteDb(location), + }), + }); + db.executeQuery(CompiledQuery.raw("PRAGMA journal_mode = WAL")); + db.executeQuery(CompiledQuery.raw("PRAGMA foreign_keys = ON")); + return db; +}; + +export const migrateToLatest = async (db: Database) => { + const migrator = new Migrator({ db, provider: migrationProvider }); + const { error } = await migrator.migrateToLatest(); + if (error) throw error; +}; + +export type Database = Kysely; diff --git a/src/db/message.ts b/src/db/message.ts new file mode 100644 index 0000000..5aabc2b --- /dev/null +++ b/src/db/message.ts @@ -0,0 +1,56 @@ +import { countGrapheme } from "unicode-segmenter"; +import { ctx } from "../index.js"; +import { CHARLIMIT, GRAPHLIMIT } from "../utils/env.js"; +import { Message } from "../utils/types.js"; + +const validateMessage = (msg: string) => + countGrapheme(msg) <= GRAPHLIMIT && msg.length <= CHARLIMIT; + +const getMessage = async (uri: string) => { + return ctx.db + .selectFrom("messages") + .where("uri", "=", uri) + .selectAll() + .executeTakeFirst(); +}; + +const addMessage = async (msg: Message) => { + if (!validateMessage(msg.msg.content)) return; + await ctx.db + .insertInto("messages") + .values({ + uri: msg.uri, + cid: msg.cid, + did: msg.did, + room: msg.msg.room, + facets: JSON.stringify(msg.msg.facets) ?? null, + reply: JSON.stringify(msg.msg.reply) ?? null, + content: msg.msg.content, + indexed_at: Date.now(), + }) + .executeTakeFirst(); + ctx.logger.info(`Added message: ${msg.uri}`); +}; + +const updateMessage = async (msg: Message) => { + if (!validateMessage(msg.msg.content)) return; + (await getMessage(msg.uri)) ?? (await addMessage(msg)); + await ctx.db + .updateTable("messages") + .set({ + cid: msg.cid, + content: msg.msg.content, + facets: JSON.stringify(msg.msg.facets) ?? null, + updated_at: Date.now(), + }) + .where("uri", "=", msg.uri) + .executeTakeFirst(); + ctx.logger.info(`Updated message: ${msg.uri}`); +}; + +const deleteMessage = async (uri: string) => { + await ctx.db.deleteFrom("messages").where("uri", "=", uri).executeTakeFirst(); + ctx.logger.info(`Deleted message: ${uri}`); +}; + +export { getMessage, addMessage, updateMessage, deleteMessage }; diff --git a/src/db/room.ts b/src/db/room.ts new file mode 100644 index 0000000..77766a4 --- /dev/null +++ b/src/db/room.ts @@ -0,0 +1,75 @@ +import { countGrapheme } from "unicode-segmenter"; +import { ctx } from "../index.js"; +import { Room } from "../utils/types.js"; + +const validateRoomName = (name: string) => + countGrapheme(name) <= 32 && name.length <= 320; + +const validateTopic = (topic: string) => { + return countGrapheme(topic) <= 256 && topic.length <= 2560 ? topic : null; +}; + +const getRoom = async (uri: string) => { + return ctx.db + .selectFrom("rooms") + .where("uri", "=", uri) + .selectAll() + .executeTakeFirst(); +}; + +const getRoomByName = async (name: string, did: string) => { + return ctx.db + .selectFrom("rooms") + .where("owner_did", "=", did) + .where("name", "=", name) + .selectAll() + .executeTakeFirst(); +}; + +const addRoom = async (room: Room) => { + if (!validateRoomName) return; + if (await getRoomByName(room.room.name, room.owner)) return; + const res = await ctx.db + .insertInto("rooms") + .values({ + uri: room.uri, + cid: room.cid, + owner_did: room.owner, + name: room.room.name, + topic: room.room.topic && validateTopic(room.room.topic), + allowlist: JSON.stringify(room.room.allowlist) ?? null, + denylist: JSON.stringify(room.room.denylist) ?? null, + updated_at: Date.now(), + }) + .executeTakeFirst(); + ctx.logger.info(`Added room: ${room.uri}`); + return res; +}; + +const updateRoom = async (room: Room) => { + if (!validateRoomName) return; + const a = await getRoomByName(room.room.name, room.owner); + if (a && a.uri !== room.uri) return; + const res = (await getRoom(room.uri)) ?? (await addRoom(room)); + await ctx.db + .updateTable("rooms") + .set({ + name: room.room.name, + cid: room.cid, + topic: room.room.topic && validateTopic(room.room.topic), + allowlist: JSON.stringify(room.room.allowlist) ?? null, + denylist: JSON.stringify(room.room.denylist) ?? null, + updated_at: Date.now(), + }) + .where("uri", "=", room.uri) + .executeTakeFirst(); + ctx.logger.info(`Updated room: ${room.uri}`); + return res; +}; + +const deleteRoom = async (uri: string) => { + await ctx.db.deleteFrom("rooms").where("uri", "=", uri).executeTakeFirst(); + ctx.logger.info(`Deleted room: ${uri}`); +}; + +export { getRoom, addRoom, updateRoom, deleteRoom }; diff --git a/src/db/user.ts b/src/db/user.ts new file mode 100644 index 0000000..5eaafa5 --- /dev/null +++ b/src/db/user.ts @@ -0,0 +1,59 @@ +import { countGrapheme } from "unicode-segmenter"; +import { ctx } from "../index.js"; +import { resolveDid } from "../utils/api.js"; +import { User } from "../utils/types.js"; + +const validateNickname = (nickname: string | undefined) => { + return nickname && countGrapheme(nickname) <= 32 && nickname.length <= 320 ? + nickname + : null; +}; + +const getUser = async (did: string) => { + return ctx.db + .selectFrom("users") + .where("did", "=", did) + .selectAll() + .executeTakeFirst(); +}; + +const addUser = async (user: User) => { + const handle = await resolveDid(user.did); + await ctx.db + .insertInto("users") + .values({ + did: user.did, + handle: handle, + nickname: user.profile?.nickname ?? null, + updated_at: Date.now(), + }) + .returningAll() + .executeTakeFirst(); + ctx.logger.info(`Added user: ${user.did}`); +}; + +const updateUser = async (user: User) => { + (await getUser(user.did)) ?? (await addUser(user)); + const nickname = validateNickname(user.profile?.nickname); + await ctx.db + .updateTable("users") + .set({ + nickname: nickname, + updated_at: Date.now(), + }) + .where("did", "=", user.did) + .executeTakeFirst(); + ctx.logger.info(`Updated user: ${user.did}`); + return await getUser(user.did); +}; + +const deleteProfile = async (did: string) => { + await ctx.db + .updateTable("users") + .set({ nickname: null, updated_at: Date.now() }) + .where("did", "=", did) + .executeTakeFirst(); + ctx.logger.info(`Deleted user: ${did}`); +}; + +export { getUser, addUser, updateUser, deleteProfile }; diff --git a/src/index.ts b/src/index.ts index e0dc49a..f4b43d4 100755 --- a/src/index.ts +++ b/src/index.ts @@ -1,11 +1,10 @@ import { fastify, FastifyInstance } from "fastify"; import cors from "@fastify/cors"; import { pino } from "pino"; - -import { createDb, migrateToLatest } from "./db.js"; -import { env } from "./env.js"; +import { createDb, migrateToLatest } from "./db/init.js"; +import { env } from "./utils/env.js"; import { createRouter } from "./routes.js"; -import type { Database } from "./db.js"; +import type { Database } from "./db/init.js"; import { startJetstream } from "./relay.js"; import fastifyWebsocket from "@fastify/websocket"; @@ -14,62 +13,51 @@ export type AppContext = { logger: pino.Logger; }; -export class Server { - constructor( - public server: FastifyInstance, - public ctx: AppContext, - ) {} - - static async create() { - const logger = pino(); - - const db = createDb(env.DB_PATH); - await migrateToLatest(db); - - const server = fastify({ trustProxy: true }); - server.register(cors, { origin: "*" }); - server.register(fastifyWebsocket); - server.register(import("@fastify/rate-limit"), { - max: 300, - timeWindow: "1m", - }); - const ctx = { db, logger }; - createRouter(server, ctx); - startJetstream(server, ctx); - - server.listen({ port: env.PORT }, (err, address) => { - if (err) { - console.error(err); - close(); - } - logger.info(`Server (${env.NODE_ENV}) listening at ${address}`); - }); - - return new Server(server, ctx); - } - - async close() { - this.ctx.logger.info("sigint received, shutting down"); - return new Promise((resolve) => { - this.server.close(() => { - this.ctx.logger.info("server closed"); - resolve(); - }); - }); - } -} - const run = async () => { - const server = await Server.create(); + const logger = pino(); + + const db = createDb(env.DB_PATH); + await migrateToLatest(db); + + const server = fastify({ trustProxy: true }); + server.register(cors, { origin: "*" }); + server.register(fastifyWebsocket); + server.register(import("@fastify/rate-limit"), { + max: 300, + timeWindow: "1m", + }); + const ctx = { db, logger }; + createRouter(server, ctx); + startJetstream(server, ctx); + + server.listen({ port: env.PORT }, (err, address) => { + if (err) { + console.error(err); + close(server); + } + logger.info(`Server (${env.NODE_ENV}) listening at ${address}`); + }); const onCloseSignal = async () => { setTimeout(() => process.exit(1), 1000).unref(); - await server.close(); + await close(server); process.exit(); }; process.on("SIGINT", onCloseSignal); process.on("SIGTERM", onCloseSignal); + + return ctx; +}; + +const close = async (server: FastifyInstance) => { + ctx.logger.info("sigint received, shutting down"); + return new Promise((resolve) => { + server.close(() => { + ctx.logger.info("server closed"); + resolve(); + }); + }); }; -run(); +export const ctx = await run(); diff --git a/src/lib/lexicon.ts b/src/lib/lexicon.ts index c6704fe..d13e3f5 100644 --- a/src/lib/lexicon.ts +++ b/src/lib/lexicon.ts @@ -16,22 +16,55 @@ declare module "@atcute/client/lexicons" { } } - namespace SocialPskyFeedPost { - /** A Picosky post containing at most 256 graphemes. */ + namespace SocialPskyChatMessage { + /** A Picosky message containing at most 2048 graphemes. */ interface Record { - $type: "social.psky.feed.post"; + $type: "social.psky.chat.message"; /** * Text content. \ - * Maximum string length: 2560 \ - * Maximum grapheme length: 256 + * Maximum string length: 20480 \ + * Maximum grapheme length: 2048 */ - text: string; + content: string; + room: At.Uri; /** Annotations of text (mentions, URLs, hashtags, etc) */ facets?: SocialPskyRichtextFacet.Main[]; reply?: ComAtprotoRepoStrongRef.Main; } } + namespace SocialPskyChatRoom { + /** A Picosky room belonging to the user. */ + interface Record { + $type: "social.psky.chat.room"; + /** + * Maximum string length: 320 \ + * Maximum grapheme length: 32 + */ + name: string; + /** List of users allowed to send messages in the room. */ + allowlist?: ModlistRef; + /** List of users disallowed to send messages in the room. */ + denylist?: ModlistRef; + /** Maximum array length: 3 */ + languages?: string[]; + /** Maximum array length: 20 */ + tags?: string[]; + /** + * Topic title of the room. \ + * Maximum string length: 2560 \ + * Maximum grapheme length: 256 + */ + topic?: string; + } + interface ModlistRef { + [Brand.Type]?: "social.psky.chat.room#modlistRef"; + /** @default false */ + active: boolean; + users: At.DID[]; + } + } + namespace SocialPskyRichtextFacet { /** Annotation of a sub-string within rich text. */ interface Main { @@ -70,7 +103,8 @@ declare module "@atcute/client/lexicons" { interface Records { "social.psky.actor.profile": SocialPskyActorProfile.Record; - "social.psky.feed.post": SocialPskyFeedPost.Record; + "social.psky.chat.message": SocialPskyChatMessage.Record; + "social.psky.chat.room": SocialPskyChatRoom.Record; } interface Queries {} diff --git a/src/lib/schemas.ts b/src/lib/schemas.ts index 484c0ad..55e62c0 100644 --- a/src/lib/schemas.ts +++ b/src/lib/schemas.ts @@ -50,11 +50,12 @@ const FacetsSchema = t.optional( ); type FacetsInterface = t.Infer; -const GetPostsSchema = t.object({ +const GetMessagesSchema = t.object({ + uri: t.optional(t.string({ format: "uri" })), limit: t.integer({ minimum: 1, maximum: 100, default: 50 }), cursor: t.optional(t.integer({ minimum: 0 })), }); -type GetPostsInterface = t.Infer; +type GetMessagesInterface = t.Infer; -export { FacetsSchema, GetPostsSchema }; -export type { FacetsInterface, GetPostsInterface }; +export { FacetsSchema, GetMessagesSchema }; +export type { FacetsInterface, GetMessagesInterface }; diff --git a/src/relay.ts b/src/relay.ts index 34b8a82..79bc586 100644 --- a/src/relay.ts +++ b/src/relay.ts @@ -1,171 +1,171 @@ import { Jetstream } from "@skyware/jetstream"; +import fs from "node:fs"; import { FastifyInstance } from "fastify"; -import type { AppContext } from "./index.js"; -import { countGrapheme } from "unicode-segmenter"; -import { CHARLIMIT, GRAPHLIMIT } from "./env.js"; -import { resolveDid } from "./utils.js"; +import { AppContext } from "./index.js"; +import { deleteProfile, getUser, updateUser } from "./db/user.js"; +import { addRoom, deleteRoom, getRoom, updateRoom } from "./db/room.js"; +import { addMessage, deleteMessage, updateMessage } from "./db/message.js"; +import { Message, Room } from "./utils/types.js"; -// TODO: make it not horrible sorry rn im too lazy and i need sleep -// -const getIdentity = async (ctx: AppContext, did: string, nickname?: string) => { - const account = await ctx.db - .selectFrom("accounts") - .where("did", "=", did) - .selectAll() - .executeTakeFirst(); - const handle = account === undefined ? await resolveDid(did) : account.handle; - let res; - if (account === undefined) { - await ctx.db - .insertInto("accounts") - .values({ did: did, handle: handle, nickname: nickname }) - .execute() - .catch((err) => ctx.logger.error(err)); - } else if (nickname !== undefined) { - res = await ctx.db - .updateTable("accounts") - .set({ nickname: nickname }) - .where("did", "=", did) - .execute(); - } - return { nickname: account?.nickname, handle: handle }; -}; +// TODO: proper validation export function startJetstream(server: FastifyInstance, ctx: AppContext) { + let intervalID: NodeJS.Timeout; + const cursorFile = fs.readFileSync("cursor.txt", "utf8"); + if (cursorFile) ctx.logger.info(`Initiate jetstream at cursor ${cursorFile}`); + const jetstream = new Jetstream({ wantedCollections: ["social.psky.*"], endpoint: "wss://jetstream2.us-west.bsky.network/subscribe", + cursor: Number(cursorFile), }); - jetstream.on("error", (err) => console.error(err)); - - jetstream.onCreate("social.psky.actor.profile", async (event) => { - const nick = event.commit.record.nickname; - if (nick !== undefined && (countGrapheme(nick) > 32 || nick.length > 320)) - return; + jetstream.on("error", (err) => ctx.logger.error(err)); - await getIdentity(ctx, event.did, nick); - ctx.logger.info(`Created profile ${event.did}`); + jetstream.on("open", () => { + intervalID = setInterval(() => { + if (jetstream.cursor) { + fs.writeFile("cursor.txt", jetstream.cursor.toString(), (err) => { + if (err) console.log(err); + }); + } + }, 60000); }); - jetstream.onUpdate("social.psky.actor.profile", async (event) => { - const nick = event.commit.record.nickname; - if (nick !== undefined && (countGrapheme(nick) > 32 || nick.length > 320)) - return; - - await getIdentity(ctx, event.did, nick); - ctx.logger.info(`Created profile ${event.did}`); - }); - - jetstream.onDelete("social.psky.actor.profile", async (event) => { - await ctx.db - .updateTable("accounts") - .set({ nickname: "" }) - .where("did", "=", event.did) - .executeTakeFirst(); - ctx.logger.info(`Deleted profile: ${event.did}`); + jetstream.on("social.psky.actor.profile", async (event) => { + try { + if (event.commit.type === "d") await deleteProfile(event.did); + else await updateUser({ did: event.did, profile: event.commit.record }); + } catch (err) { + ctx.logger.error(err, JSON.stringify(event)); + } }); - jetstream.onCreate("social.psky.feed.post", async (event) => { - const uri = `at://${event.did}/${event.commit.collection}/${event.commit.rkey}`; - const post = event.commit.record.text; - const facets = event.commit.record.facets; - const reply = event.commit.record.reply; - if (countGrapheme(post) > GRAPHLIMIT || post.length > CHARLIMIT) return; - else if (!countGrapheme(post.trim())) return; - - const identity = await getIdentity(ctx, event.did); - - const timestamp = Date.now(); - const record = { - $type: "social.psky.feed.post#create", - did: event.did, - rkey: event.commit.rkey, - post: post, - facets: facets, - reply: reply, - handle: identity.handle, - nickname: identity.nickname, - indexedAt: timestamp, - }; - + jetstream.on("social.psky.chat.message", async (event) => { try { - const res = await ctx.db - .insertInto("posts") - .values({ + const uri = `at://${event.did}/${event.commit.collection}/${event.commit.rkey}`; + let record; + if (event.commit.type === "d") { + await deleteMessage(uri); + record = { $type: "social.psky.chat.message#delete", event: event }; + } else { + const user = await updateUser({ did: event.did }); + if (!user) return; + const room = getRoom(event.commit.record.room); + // TODO: fetch record from repo if room not found + if (!room) return; + const msg: Message = { uri: uri, cid: event.commit.cid, - post: post, - facets: facets ? JSON.stringify(facets) : null, - reply: reply ? JSON.stringify(reply) : null, - account_did: event.did, - indexed_at: timestamp, - }) - .executeTakeFirst(); - if (res === undefined) return; + did: event.did, + msg: event.commit.record, + }; + if (event.commit.type === "c") await addMessage(msg); + else await updateMessage(msg); + record = { + $type: + event.commit.type === "c" ? + "social.psky.chat.message#create" + : "social.psky.chat.message#update", + did: event.did, + rkey: event.commit.rkey, + cid: event.commit.cid, + content: event.commit.record.content, + room: event.commit.record.room, + facets: event.commit.record.facets, + reply: event.commit.record.reply, + handle: user.handle, + nickname: user.nickname, + indexedAt: Date.now(), + }; + } server.websocketServer.emit("message", JSON.stringify(record)); - ctx.logger.info(`Created post: ${uri}`); } catch (err) { - ctx.logger.error(err); + ctx.logger.error(err, JSON.stringify(event)); } }); - jetstream.onUpdate("social.psky.feed.post", async (event) => { - const uri = `at://${event.did}/${event.commit.collection}/${event.commit.rkey}`; - const facets = event.commit.record.facets; - const reply = event.commit.record.reply; - const timestamp = Date.now(); - await ctx.db - .updateTable("posts") - .set({ - post: event.commit.record.text, - cid: event.commit.cid, - facets: facets ? JSON.stringify(facets) : null, - reply: reply ? JSON.stringify(reply) : null, - updated_at: timestamp, - }) - .where("uri", "=", uri) - .executeTakeFirst(); - const record = { - $type: "social.psky.feed.post#update", - did: event.did, - rkey: event.commit.rkey, - post: event.commit.record.text, - facets: facets, - reply: reply, - updatedAt: timestamp, - }; - server.websocketServer.emit("message", JSON.stringify(record)); - ctx.logger.info(`Updated post: ${uri}`); + jetstream.on("social.psky.chat.room", async (event) => { + try { + const uri = `at://${event.did}/${event.commit.collection}/${event.commit.rkey}`; + let record; + if (event.commit.type === "d") { + await deleteRoom(uri); + record = { $type: "social.psky.chat.room#delete", event: event }; + } else { + const user = await updateUser({ did: event.did }); + if (!user) return; + const room: Room = { + uri: uri, + cid: event.commit.cid, + owner: event.did, + room: event.commit.record, + }; + const res = + event.commit.type === "c" ? + await addRoom(room) + : await updateRoom(room); + if (!res) return; + record = { + $type: + event.commit.type === "c" ? + "social.psky.chat.room#create" + : "social.psky.chat.room#update", + record: event, + }; + } + server.websocketServer.emit("message", JSON.stringify(record)); + } catch (err) { + ctx.logger.error(err, JSON.stringify(event)); + } }); - jetstream.onDelete("social.psky.feed.post", async (event) => { - const uri = `at://${event.did}/${event.commit.collection}/${event.commit.rkey}`; - await ctx.db.deleteFrom("posts").where("uri", "=", uri).executeTakeFirst(); - const record = { - $type: "social.psky.feed.post#delete", - did: event.did, - rkey: event.commit.rkey, - }; - server.websocketServer.emit("message", JSON.stringify(record)); - ctx.logger.info(`Deleted post: ${uri}`); + jetstream.on("identity", async (event) => { + try { + const user = await getUser(event.did); + if (user !== undefined && event.identity.handle !== user.handle) { + await ctx.db + .updateTable("users") + .set({ handle: event.identity.handle, updated_at: Date.now() }) + .where("did", "=", event.did) + .executeTakeFirstOrThrow(); + ctx.logger.info( + `Updated ${user.did}: ${user.handle} -> ${event.identity.handle}`, + ); + } + } catch (err) { + ctx.logger.error(err, JSON.stringify(event)); + } }); - jetstream.on("identity", async (event) => { - const identity = await ctx.db - .selectFrom("accounts") - .where("did", "=", event.did) - .select("handle") - .executeTakeFirst(); - if (identity !== undefined && event.identity.handle) { - await ctx.db - .updateTable("accounts") - .set({ did: event.did, handle: event.identity.handle }) - .where("did", "=", event.did) - .execute(); - ctx.logger.info( - `Updated handle: ${identity.handle} -> ${event.identity.handle}`, - ); + jetstream.on("account", async (event) => { + const did = event.account.did; + try { + const user = await getUser(event.did); + if (user === undefined) return; + if (!event.account.active && event.account.status === "deleted") { + await ctx.db + .deleteFrom("users") + .where("did", "=", did) + .executeTakeFirstOrThrow(); + ctx.logger.info(`Deleted account: ${did}`); + } else if (!event.account.active && event.account.status) { + await ctx.db + .updateTable("users") + .set({ active: false, updated_at: Date.now() }) + .where("did", "=", did) + .executeTakeFirstOrThrow(); + ctx.logger.info(`Disabled account (${event.account.status}): ${did}`); + } else if (event.account.active && !user.active) { + await ctx.db + .updateTable("users") + .set({ active: true, updated_at: Date.now() }) + .where("did", "=", did) + .executeTakeFirstOrThrow(); + ctx.logger.info(`Reactivated account: ${did}`); + } + } catch (err) { + ctx.logger.error(err, JSON.stringify(event)); } }); diff --git a/src/routes.ts b/src/routes.ts index 67c8163..4bfd58e 100644 --- a/src/routes.ts +++ b/src/routes.ts @@ -1,6 +1,6 @@ import { FastifyInstance } from "fastify"; import type { AppContext } from "./index.js"; -import { GetPostsInterface, GetPostsSchema } from "./lib/schemas.js"; +import { GetMessagesInterface, GetMessagesSchema } from "./lib/schemas.js"; let ipSet: Record = {}; const serverState = (sessionCount: number) => @@ -34,28 +34,31 @@ export const createRouter = (server: FastifyInstance, ctx: AppContext) => { }); }); - server.get<{ Querystring: GetPostsInterface }>( - "/posts", - { schema: { querystring: GetPostsSchema } }, + server.get<{ Querystring: GetMessagesInterface }>( + "/xrpc/social.psky.chat.getMessages", + { schema: { querystring: GetMessagesSchema } }, async (req, res) => { - const posts = await ctx.db - .selectFrom("posts") + //const { uri } = req.query; + const messages = await ctx.db + .selectFrom("messages") .orderBy("indexed_at", "desc") .limit(req.query.limit) .offset(req.query.cursor ?? 0) - .innerJoin("accounts", "posts.account_did", "accounts.did") - .selectAll() + .selectAll("messages") + .innerJoin("users", "messages.did", "users.did") + .select(["handle", "nickname"]) .execute(); const data = { - cursor: posts.length + (req.query.cursor ?? 0), - posts: posts.map((rec) => ({ + cursor: messages.length + (req.query.cursor ?? 0), + messages: messages.map((rec) => ({ did: rec.did, rkey: rec.uri.split("/").pop(), cid: rec.cid, - post: rec.post, - facets: rec.facets ? JSON.parse(rec.facets) : undefined, - reply: rec.reply ? JSON.parse(rec.reply) : undefined, + room: rec.room, + content: rec.content, + facets: rec.facets ?? undefined, + reply: rec.reply ?? undefined, handle: rec.handle, nickname: rec.nickname ?? undefined, indexedAt: rec.indexed_at, diff --git a/src/utils.ts b/src/utils/api.ts similarity index 100% rename from src/utils.ts rename to src/utils/api.ts diff --git a/src/env.ts b/src/utils/env.ts similarity index 83% rename from src/env.ts rename to src/utils/env.ts index 5682d63..8b4906b 100644 --- a/src/env.ts +++ b/src/utils/env.ts @@ -3,8 +3,8 @@ import { cleanEnv, port, str, testOnly } from "envalid"; dotenv.config(); -export const GRAPHLIMIT = 256; -export const CHARLIMIT = 2560; +export const GRAPHLIMIT = 2048; +export const CHARLIMIT = 20480; export const env = cleanEnv(process.env, { NODE_ENV: str({ diff --git a/src/utils/types.ts b/src/utils/types.ts new file mode 100644 index 0000000..aa351ba --- /dev/null +++ b/src/utils/types.ts @@ -0,0 +1,26 @@ +import { + SocialPskyActorProfile, + SocialPskyChatMessage, + SocialPskyChatRoom, +} from "@atcute/client/lexicons"; + +export interface Message { + uri: string; + cid: string; + did: string; + msg: SocialPskyChatMessage.Record; +} + +export interface Room { + uri: string; + cid: string; + owner: string; + room: SocialPskyChatRoom.Record; +} + +export interface User { + did: string; + handle?: string; + active?: boolean; + profile?: SocialPskyActorProfile.Record; +}