From 6c8fcc70e68e241c6ec6f784dd9ee7d0b064365e Mon Sep 17 00:00:00 2001 From: Ben Packer Date: Tue, 23 Jan 2024 17:38:58 -0500 Subject: [PATCH] feat: implement everything --- .gitignore | 176 ++++++++++++++++ .vscode/settings.json | 12 ++ README.md | 181 ++++++++++++++++ bun.lockb | Bin 0 -> 13616 bytes index.ts | 1 + package.json | 33 +++ src/observeQuery.test.ts | 444 +++++++++++++++++++++++++++++++++++++++ src/observeQuery.ts | 155 ++++++++++++++ src/types.ts | 83 ++++++++ src/walSubject.test.ts | 373 ++++++++++++++++++++++++++++++++ src/walSubject.ts | 312 +++++++++++++++++++++++++++ tsconfig.build.json | 114 ++++++++++ tsconfig.json | 22 ++ 13 files changed, 1906 insertions(+) create mode 100644 .gitignore create mode 100644 .vscode/settings.json create mode 100644 README.md create mode 100755 bun.lockb create mode 100644 index.ts create mode 100644 package.json create mode 100644 src/observeQuery.test.ts create mode 100644 src/observeQuery.ts create mode 100644 src/types.ts create mode 100644 src/walSubject.test.ts create mode 100644 src/walSubject.ts create mode 100644 tsconfig.build.json create mode 100644 tsconfig.json diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..dbac942 --- /dev/null +++ b/.gitignore @@ -0,0 +1,176 @@ +# Based on https://raw.githubusercontent.com/github/gitignore/main/Node.gitignore +.tool-versions + +# Logs + +logs +_.log +npm-debug.log_ +yarn-debug.log* +yarn-error.log* +lerna-debug.log* +.pnpm-debug.log* + +# Caches + +.cache + +# Diagnostic reports (https://nodejs.org/api/report.html) + +report.[0-9]_.[0-9]_.[0-9]_.[0-9]_.json + +# Runtime data + +pids +_.pid +_.seed +*.pid.lock + +# Directory for instrumented libs generated by jscoverage/JSCover + +lib-cov + +# Coverage directory used by tools like istanbul + +coverage +*.lcov + +# nyc test coverage + +.nyc_output + +# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files) + +.grunt + +# Bower dependency directory 
(https://bower.io/) + +bower_components + +# node-waf configuration + +.lock-wscript + +# Compiled binary addons (https://nodejs.org/api/addons.html) + +build/Release + +# Dependency directories + +node_modules/ +jspm_packages/ + +# Snowpack dependency directory (https://snowpack.dev/) + +web_modules/ + +# TypeScript cache + +*.tsbuildinfo + +# Optional npm cache directory + +.npm + +# Optional eslint cache + +.eslintcache + +# Optional stylelint cache + +.stylelintcache + +# Microbundle cache + +.rpt2_cache/ +.rts2_cache_cjs/ +.rts2_cache_es/ +.rts2_cache_umd/ + +# Optional REPL history + +.node_repl_history + +# Output of 'npm pack' + +*.tgz + +# Yarn Integrity file + +.yarn-integrity + +# dotenv environment variable files + +.env +.env.development.local +.env.test.local +.env.production.local +.env.local + +# parcel-bundler cache (https://parceljs.org/) + +.parcel-cache + +# Next.js build output + +.next +out + +# Nuxt.js build / generate output + +.nuxt +dist + +# Gatsby files + +# Comment in the public line in if your project uses Gatsby and not Next.js + +# https://nextjs.org/blog/next-9-1#public-directory-support + +# public + +# vuepress build output + +.vuepress/dist + +# vuepress v2.x temp and cache directory + +.temp + +# Docusaurus cache and generated files + +.docusaurus + +# Serverless directories + +.serverless/ + +# FuseBox cache + +.fusebox/ + +# DynamoDB Local files + +.dynamodb/ + +# TernJS port file + +.tern-port + +# Stores VSCode versions used for testing VSCode extensions + +.vscode-test + +# yarn v2 + +.yarn/cache +.yarn/unplugged +.yarn/build-state.yml +.yarn/install-state.gz +.pnp.* + +# IntelliJ based IDEs +.idea + +# Finder (MacOS) folder config +.DS_Store diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..66a448e --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,12 @@ +{ + "cSpell.words": [ + "columnnames", + "columntypes", + "columnvalues", + "keynames", + "keytypes", + "keyvalues", + 
"kysely", + "oldkeys" + ] +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..9c23647 --- /dev/null +++ b/README.md @@ -0,0 +1,181 @@ +# kysely-pg-observables + +`kysely-pg-observables` makes reactive programming with Kysely and PostgreSQL easy and safe. + +It wraps PostgreSQL's `wal2json` plugins, and let's you interact with the results using the type-information +you have already defined for your Kysely database. + +Because it depends on `wal2json`, it only works with installations of PostgreSQL that support it, which fortunately +includes RDS, Google Cloud SQL, Supabase, default Patroni images, Crunchy images, and many others. +It does *not* work with Neon (or Vercel Postgres) yet. + +There are two main things you can do with `kysely-pg-observables`: +1. Create a type-safe RxJS `Subject` from a Kysely database +2. Create full observable's from Kysely queries + +# 1. Create a type-safe RxJS `Subject` from a Kysely database + +```typescript +import { getKyselyChanges } from "kysely-pg-observables"; + +const pool = new Pool({ + connectionString: process.env.DATABASE_URL +}); + +const dialect = new PostgresDialect({ + pool: testPool, +}); + +type MyDatabaseType = { + widgets: { + id: number; + name: string; + color: string; + active: boolean; + }; + tableWithOtherPrimaryKey: { + other_id: number; + name: string; + color: string; + }; +}; + +const db = new Kysely({ + dialect, +}); + +const changes = await getKyselyChanges( + testPool, + db, + ["widgets"] // the list of tables you actually care about changes for + { + otherPrimaryKey: ["otherId"], // the name of the primary key for tableWithOtherPrimaryKey + } +); + +// subject is an RxJs subject that you can subscribe to, operate on, create more observables from, etc. 
+const subject = changes.subject + +const subscription = subject.subscribe({ + next: (change) => { + // Change is typed as: + // { + // table: string; + // event: 'insert' | 'update' | 'delete'; + // row: Database[table] (if event is insert or update) + // identity: // a partial Database[table] containing only the primary keys of the deleted row + // } + } +}) + +// When ready, cancel the wal slot and subscription altogether +subscription.unsubscribe() + +// There is also changes.teardown() to fully tear everything down +``` + +`getKyselyChanges` takes 4 parameters: +- `pool`: a `pg.Pool` instance that is used to connect to the database +- `db`: a `Kysely` instance that is used to get the type information for the database +- `tables`: an array of table names that you want to get changes for +- `primaryKeyOverrides`: an object that maps tables to their primary keys + +## Why do I need to provide tables? + +In order to make the events generated by `wal2json` type-safe, we need to limit the tables that we get changes for +to only the tables that we know type information for. If we generated events for all tables, we would have to +filter for them inside of the application, but we don't have a list of tables (just a type of tables). + +## What is this primary key override business? + +For deleted records, the only information available is the value of the primary key(s) of the deleted record. +By default, we'll assume that any `id` column on a table is a primary key. If that is not true for any table, +just include its name and an array of columns which are the primary keys of the table, and we'll use that +for delete event's identities instead. + +# 2. Create full observable's from Kysely queries + +You can create full observables which re-run the query whenever the underlying data changes. 
+ +```typescript +import { observeQuery } from "kysely-pg-observables"; + +// setup otherwise similar as before +const { subject, teardown } = await getKyselyChanges( + testPool, + db, + ["widgets"], + undefined, + { slotId } +); + +const observable = await observeQuery( + observeQuery( + subject, + () => db.selectFrom('widgets').selectAll().execute(), + { + widgets: { + insert(row) { + return true; // Assuming we want to re-run the query for every insert + }, + update(row, lastResult) { + return lastResult.some((widget) => widget.id === row.id); // Re-run only if the updated widget was in the last result + }, + delete(identity, lastResult) { + return lastResult.some((widget) => widget.id === identity.id); // Re-run only if the deleted widget was in the last result + }, + }, + } + ) +); + +observable.subscribe(result => { + // This callback will fire every time the underlying data changes +}) +``` + +This can be used for a query of any complexity. Do whatever you want inside of your async function! +Just make sure that you take into account the way the underlying data can change and how that will influence your query results. + +## Usage With trpc + +Although trpc does not use RxJS under the hood, the resulting observable is compatible with the type expected by trpc subscriptions. 
+ +This means you cand do: +```typescript + allWidgets: procedure.subscription(() => { + return observeQuery( + subject, + () => db.selectFrom('widgets').selectAll().execute(), + { + widgets: { + insert(row) { + return true; // Assuming we want to re-run the query for every insert + }, + update(row, lastResult) { + return lastResult.some((widget) => widget.id === row.id); // Re-run only if the updated widget was in the last result + }, + delete(identity, lastResult) { + return lastResult.some((widget) => widget.id === identity.id); // Re-run only if the deleted widget was in the last result + }, + }, + } + ) + } + }) +``` + +Because the observable's are compatible, properly unsubscribing when the client disconnects is handled for you. + +## Sharing an observable between multiple trpc subscriptions clients + +If you want to share an observable between multiple clients, you can pass a resulting observable to the +[`multicast` function](https://rxjs.dev/guide/subject#multicasted-observables) and then create multiple observables from the same source. + +If you share an observable between multiple clients, and do *not* multi-cast it, each event from the observable will only +be fired once, and both clients will not get each event. I do not know who will get which one. + +# Roadmap + +- [ ] I'd like to support json-patch, with client-side utilities as well. 
+ diff --git a/bun.lockb b/bun.lockb new file mode 100755 index 0000000000000000000000000000000000000000..e679d90376d677b406c1d294d18d7a62ddcfeff3 GIT binary patch literal 13616 zcmeHOd0dQZ`=4qeA==O(S|q8NYTA`bQRkHU_AG@+*-jjx zL`Y?cM0Q14QaR@&UXRS^MzCkt{zZsBtxV1f>ak$%nyLHr$l*wal0@sLl_;#5!8aY$af7=Wk}CKih7QT z%R_y`0{}wXE6#6&R1xX{&kGS+)4U;%{D2XcV{00%TwcTz#tmZ&z*0e25KqXW(THB6 zFG|P>@rU+*)l2k2`>@c^Alh%hrv`em#k}d_G%#Ag35urCROjhOxw0k|l*uUHnY91e zyEG$J`9u@DbeHY-vordie7oQ!_i}zA<|Hyz9I*#$LY+J37jS zl%MRq#lZ7ot@E(-L;JE`jF`tdot5|FrD5~iZ>ypGt71@(nG0^l zOuGErqjQ^#ww+73amA~pLL<3iqJ@*5e^xhFhv0xKb8=T(H23>M@JG^iy;vi|s47fL@&{2UcLH3Yo&|cU-c4w1C0`v zD>x>v^V^*B%rVse(F~&zjVFWu6x6&frNylYZ0$W>>q+?8d#W%E{wslDro;+<$G|us zpB!9WB40PS2rt}rDx}?gXk{z59|0PjqCQm;yfqj+ z8t^dvXoKJ)<)2!9`Hr-U20SeOSnNA;z*_2(5PUA+Z2%8z0oNVH(G>!Mua~g@hyMVt z4@AU%v>o=CE(y^e05{fPKV6IhbGjr1{|au%{6j3x!LA6wn*(4R=q4h-{Sm)mKe{3W?%0(i*jH8CG8vl?H7Gp{`V#Z0gK zPa_$2-ziG>_$`Irqn7(Voxi?F=1AVA?DspjsaKx(%eiqap4qm2XU+Hhl1VD>?b5u&GLp4Y>{_dmdiQ(GiZ0^0<4vl5RDmZoO^s zF5i7-CbuPc$-a(mrnk=YnLOmmu<|-i6N%t<8XiR%?DK0NE+PrD6#rZ4u zYtDZdcka+}-Q&SZ>MxhQim1D_vbip7<=#bmuGUSDv3!5EehhfL~_gNp^JW_QP`n2b_PR_KMzSDEJ zcEaHj?;Q#%aVr*&DqKJx@aS+05Rq{s>kzr=aoZLY(sd`hUT)6Rpgp|*+srqSR%%BK z4wRJ)ugKbcJj!s?bP(_ zkyPzBHK*Ow177^ROM+MOJAgjLZ6K|3Oxs73O1Y%-QxbKKoDFhTe-~$QJS%gHe8a;# zdT~kf=D%_dUN|u*h4Fl$YmnvfS*+x=(SuaHJ$oOL%2 zT`;WdGJ8xUvrc9!^Xi->_7~esaw?YCSG(q1Fphbx9xK;PSh_Rw!tw-#+0}!7UA9`^ z<;2Jy?|_K-m+bS%O`m@Ir;K=3z5D8GR~s9092b?&S?98E=BoscAF}l{?)h%Ht0+6H z|M-!OnPrPktYa0n<~8oB&s7b#3{yCMb$s&DRta8wUJ#+n>pG{O+SEgMIJVEx#c68Q zyhAINn)4>|&D30Mi_+Cx2UM%cztK4TvMez@vDZz$^V-rY=2dZPtG12fuI{I%xdDiX zz2y0W+;rbc#fYNDC(oL1(Ul8+b9O_5D%j|Jh`COKyw zuj^I)Y)*M(d^=-w2~Ex-K2CG01TUQZbO>GHR#dV(LczMC-fzOsT4T*Xa{wqEMiM@R~Si#;= zi^5Ji4wI>TVODdb@7G-aIOW}Q|gBPE>MCi|3Q&+f-Jt60=aEi${yXTjpyfb)8T7E--&58|9 zh6l0^?fDS8T&E;)PTG~M1MY{m40NqHeW-Em%($T7-%lU)$p9i^FPwdK2%Ubla>dWD z>TQmtWSpz7_4mnMc3yoz`<$&Xh%&)&Ls_mNEb#sVG2wGg+uLuK+s4!N2& zn-#5_uglbxOYo|6u!0|E7TC=s4)M5e|_Kmza*PA804RrX|ZH@wWp%fp(p>A 
z;Kg$VBJ|ziTXS>9Uw`T7)-a($<}`iQ+OmD|wiBYb6I-U7joIa0ov)VtYUc+f&c5`c z53aB2(<}V8=lq#6NBk36(L1i`djk>iuR0-xl>WZ@z^Tx_i}tKLaDx4A+}OME{Zkgr zu&x^{n>S%j(!85*H)^im(fx#FAav5yK}s&axXH;uIFvwwPTI=iar-oD8POEu4^K23@3 z`FmBAN4qv{oV`Pm-flb_BKDH^Ii#nnR}~z5TM?kPEn}bscY(q!qp^C7u<%YDcebL+gTyy?kCdIo0`I8(b4vp59yc{Rs(P@;*rMPQG|=Xx*-D zzxwyjET4V(MVMgZ@F?}+x78a0>;kuJ?CsrFdM(A1*4-pKUbyi=)wG*?W?Z-yV4z?f z*rP&ePK>IT1h3>d7JZ)yZT(@R4dvD&wfp)V3{!g3OZ!IehVFv;go=chC)ldYvV}fd z9}8_a%kD}Yx&GqJ7bRLUt4EwlPu~76rbmQ1+DQC>XGBEkuQz%QzuHh7Bee1|&S&2p zsBs~_U{c}-C*H4Y9nHqJgr`D^FW?qZgr(2s0Cn!5CKe+DY{?p|H0)Cv+}p*uPg}HQ#71(Hu=ttB6rh1 zmrrlh+md&(Thf#tBF8HniI2LHSA=Vq@Zy;q5&Drw^+`wF<4)_Fo0zARCn7uzW<_^TeNn2B<|=x(bG7h zwdK8rQl8~Q$Kvg^)7IXS;2kW<>#;s?%5EFEgFeoB#{?;{H*MXbi*l?u4ng*5iC*V> z&&z+Grl36aWO~l^`=*05+XTm3mDo!8X(@a6)1DVJDkoGswYdbeD4&_D_~YRraZT@v z+eS+8YDx0)2PdD&+B_hG&*S=+yIpwL+N=h%q z+22hqvDHa9tF`=qigInRuKbYFQ#&MhwIz86$sTC?{kOqu&f7ZDerMb5pKBkh;>E4( z*7!zoZEX*`ybCn@)xRoPKaP%RFtNpNb!@*x`%EvrT^w2rA*=CjmFZpgldV1h7 zUcTkDD)Zp&y_Q-8U#z=je#Xz~{S;BrWKa% zbL?K7r15d_LVLflaT2_`NGKM%l7Fc3+ch7qKVN$8uA9c@JDxtN12yMA$bVbxYp^iA zeee6?qyJtI*>lapnX4bO_MCH0dh6=^Dz3iq+}-mUK*`0j!49{BEo{}B(&z=J@Lz2a2SPQdqL@Ew;s94>BN%cHw3$dr*3o;l$8K#JIAa|D8P zW;^OZ3}Vp+v>W?C{DSsiAFwZYkLRA4qCT`2b)#K)o`h#$*cY@3&jryw>0OqLas1(8!C77{@OHP%d1Q>GbZJM<}*#27)11=ECSBC-~|6HB6+6ex@d(}wC7 z){wX;1=%3BkRuS%Xe6?XL{Q11TPZPsUyKzaF;=oNKA@gy`Y8&HL|Z`(i)o4$koYqa zXC*5mb_R_^sF6slPc?5=|zq0l$%WI1={-H7pT^ zM9z^2F|ZBU08-j3aS$BwkEb2Gsx=jYQg!h%!-s zu!h9qk(e{k!(v(@8;RBeGRr*qM|SOT0D11>>!DeLkprn|EK6e5=BSv z=xH@9c%0X9RVQBbn?1BlO~!td$k`+Ukwn^oZRVmm`t>pW!pMo;^M#Sai2(ii!pLEb zIgR#(k;72{42gmy(SDR|6x-idz5OU_YJ47v7m}7C?bEaS0=4hgQjsdQZWCu}^sD6B2n9z~B zH4fuqa4mH&1y9^%jQ&oxvEnJ|8+Fw`$Js1t8NE74q|(9|u1+T+C(A>qLs zKF=>KR3PN?IRXZo&u2$>DpmBJ3{DB!b0xSY-`Au6bb*qhG^ zU~=Hp?cAFk5Tad@hLka==R{YTuv3yOn1?A2gWFXG%1M$GR6&=lzy}r}F`@b|ZR&RP zpBg4fom2s(Qz{3SAs%$ZcVtA1Q(B)z;qW3L2F6n;sh;{YQ$I<6$t1CFNg2~cDQ~z29R!g^ldNTktl~Nm&)M-rgd~pHelg3I~ zuL;9f7BF&1^&H{&%7Ua`qomm(`tJud%+$X?U>U>9q;%BZrl^SGZ^d1DVt+ROkw|hq 
pP%}@OFJKeN@EbcEdf;g+9diUL`R6;w*Gm){|7RNu(ALE literal 0 HcmV?d00001 diff --git a/index.ts b/index.ts new file mode 100644 index 0000000..f67b2c6 --- /dev/null +++ b/index.ts @@ -0,0 +1 @@ +console.log("Hello via Bun!"); \ No newline at end of file diff --git a/package.json b/package.json new file mode 100644 index 0000000..eda3db5 --- /dev/null +++ b/package.json @@ -0,0 +1,33 @@ +{ + "name": "kysely-pg-observables", + "main": "dist/index.js", + "devDependencies": { + "bun-types": "latest", + "@types/pg": "^8.10.9" + }, + "peerDependencies": { + "typescript": "^5.0.0", + "kysely": "^0.27.2", + "pg": "^8.11.3" + }, + "scripts": { + "compile": "tsc -p tsconfig.build.json" + }, + "files": [ + "dist" + ], + "repository": { + "type": "git", + "url": "git@github.com:ben-pr-p/kysely-access-control.git" + }, + "author": "Ben Packer ", + "license": "MIT", + "bugs": { + "url": "https://github.com/ben-pr-p/kysely-access-control/issues" + }, + "homepage": "https://github.com/ben-pr-p/kysely-access-control", + "dependencies": { + "rxjs": "^7.8.1", + "tiny-invariant": "^1.3.1" + } +} \ No newline at end of file diff --git a/src/observeQuery.test.ts b/src/observeQuery.test.ts new file mode 100644 index 0000000..84e070e --- /dev/null +++ b/src/observeQuery.test.ts @@ -0,0 +1,444 @@ +import { describe, test, expect, mock, beforeAll } from "bun:test"; +import { observeQuery } from "./observeQuery"; +import { getKyselyChanges } from "./walSubject"; +import { Pool } from "pg"; +import { ColumnType, Kysely, PostgresDialect } from "kysely"; + +const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + +const testPool = new Pool({ + connectionString: + process.env.DATABASE_URL || + "postgres://postgres:postgres@localhost:7432/postgres", +}); + +const dialect = new PostgresDialect({ + pool: testPool, +}); + +const db = new Kysely({ + dialect, +}); + +type TestDatabase = { + cards: { + id: ColumnType; + kind: string; + deck_id: ColumnType< + number | undefined, + 
number | undefined, + number | undefined + >; + created_at: ColumnType; + updated_at: ColumnType; + }; + decks: { + id: ColumnType; + name: string; + created_at: ColumnType; + updated_at: ColumnType; + }; + ignore_this_table: { + id: ColumnType; + kind: string; + created_at: Date; + updated_at: Date; + }; +}; + +describe("observeQuery", () => { + beforeAll(async () => { + await testPool.query(` + drop table if exists decks cascade; + create table decks ( + id serial primary key, + name text not null, + created_at timestamp not null default now(), + updated_at timestamp not null default now() + ); + + drop table if exists cards; + create table cards ( + id serial primary key, + kind text not null, + deck_id integer references decks (id), + created_at timestamp not null default now(), + updated_at timestamp not null default now() + ); + + drop table if exists ignore_this_table; + create table ignore_this_table ( + id serial primary key, + kind text not null, + created_at timestamp not null default now(), + updated_at timestamp not null default now() + ); + `); + }); + + test("query re-runs if the handler.insert returns true", async () => { + const uniqueKindForThisTest = Math.random().toString(); + const changes = await getKyselyChanges(testPool, db, ["cards"], undefined); + + const nextListener = mock(() => {}); + + const observable = observeQuery( + changes, + () => db.selectFrom("cards").selectAll().execute(), + { + cards: { + insert(row) { + return row.kind === uniqueKindForThisTest; + }, + }, + } + ); + + const subscription = observable.subscribe({ + next: nextListener, + }); + + await db + .insertInto("cards") + .values({ kind: uniqueKindForThisTest }) + .returning("id") + .execute(); + + await sleep(100); + + subscription.unsubscribe(); + await changes.teardown(); + + expect(nextListener).toHaveBeenCalledTimes(2); + }); + + test("query does not re-run if the handler.insert returns false", async () => { + const uniqueKindForThisTest = Math.random().toString(); 
+ const changes = await getKyselyChanges(testPool, db, ["cards"], undefined); + + const nextListener = mock(() => {}); + + const observable = observeQuery( + changes, + () => db.selectFrom("cards").selectAll().execute(), + { + cards: { + insert(row) { + return false; + }, + }, + } + ); + + const subscription = observable.subscribe({ + next: nextListener, + }); + + await db + .insertInto("cards") + .values({ kind: uniqueKindForThisTest }) + .returning("id") + .execute(); + + await sleep(100); + + subscription.unsubscribe(); + await changes.teardown(); + + expect(nextListener).toHaveBeenCalledTimes(1); // Should be called only once initially + }); + + test("query only re-runs once if we get two invalidation events while the query is running", async () => { + const uniqueKindForThisTest = Math.random().toString(); + const changes = await getKyselyChanges(testPool, db, ["cards"], undefined, { + // Fast poll interval for shorter test + pollInterval: 5, + }); + + const nextListener = mock(() => {}); + + const observable = observeQuery( + changes, + async () => { + const result = await db + .selectFrom("cards") + .selectAll() + .select((qb) => [qb.fn("pg_sleep", [qb.val(0.1)]).as("sleep")]) + .execute(); + + return result; + }, + { + cards: { + insert(row) { + return row.kind === uniqueKindForThisTest; + }, + }, + } + ); + + const subscription = observable.subscribe({ + next: nextListener, + }); + + await sleep(10); + + await db + .insertInto("cards") + .values({ kind: uniqueKindForThisTest }) + .returning("id") + .execute(); + + await sleep(10); + + // Query takes 2 seconds to run, so it still isn't done by this point + // after being queued in the first time + // As a result, since the above invalidation event is still queued + // We should expect just one more run to fire from both the above + // and this current insert + await db + .insertInto("cards") + .values({ kind: uniqueKindForThisTest }) + .returning("id") + .execute(); + + // Give it enough time to let the 
query run twice + await sleep(1000); + + subscription.unsubscribe(); + await changes.teardown(); + + expect(nextListener).toHaveBeenCalledTimes(2); + }); + + test("can make decisions based on the lastResult", async () => { + const uniqueKindForThisTest = Math.random().toString(); + const changes = await getKyselyChanges(testPool, db, ["cards"], undefined); + + const nextListener = mock(() => {}); + + await db.deleteFrom("cards").execute(); + + await db + .insertInto("cards") + .values({ kind: uniqueKindForThisTest }) + .execute(); + + await db.insertInto("cards").values({ kind: "dont include me!" }).execute(); + + const observable = observeQuery( + changes, + () => + db + .selectFrom("cards") + .selectAll() + .where("kind", "=", uniqueKindForThisTest) + .execute(), + { + cards: { + insert(row) { + return true; + }, + update(row, lastResult) { + return lastResult.some((card) => card.id === row.id); + }, + delete(row, lastResult) { + return true; + }, + }, + } + ); + + await sleep(100); + + const subscription = observable.subscribe({ + next: nextListener, + }); + + // Update a card that should trigger a rerun + await db + .updateTable("cards") + .set({ updated_at: new Date() }) + .where("kind", "=", uniqueKindForThisTest) + .execute(); + + await sleep(100); + + // Update a card that shouldn't trigger a rerun + await db + .updateTable("cards") + .set({ updated_at: new Date() }) + .where("kind", "!=", uniqueKindForThisTest) + .execute(); + + await sleep(100); + + subscription.unsubscribe(); + await changes.teardown(); + + expect(nextListener).toHaveBeenCalledTimes(2); + }); + + test("can make decisions based on the lastResult asynchronously", async () => { + const uniqueKindForThisTest = Math.random().toString(); + const changes = await getKyselyChanges(testPool, db, ["cards"], undefined); + + const nextListener = mock(() => {}); + + const testDeck = await db + .insertInto("decks") + .values({ name: "test deck" }) + .returning("id") + .executeTakeFirstOrThrow(); + + 
const otherDeck = await db + .insertInto("decks") + .values({ name: "other deck" }) + .returning("id") + .executeTakeFirstOrThrow(); + + await db + .insertInto("cards") + .values({ kind: uniqueKindForThisTest, deck_id: testDeck.id }) + .execute(); + + await db.insertInto("cards").values({ kind: "dont include me!" }).execute(); + + const observable = observeQuery( + changes, + () => + db + .selectFrom("cards") + .fullJoin("decks", "cards.deck_id", "decks.id") + .select((qb) => [qb.fn.count("cards.id").as("card_count")]) + .where("decks.name", "like", "test%") + .executeTakeFirstOrThrow(), + { + cards: { + async insert(row) { + if (row.deck_id) { + const doesNewRowDeckStartWithTest = await db + .selectFrom("decks") + .select("name") + .where("id", "=", row.deck_id) + .where("name", "like", "test%") + .executeTakeFirst(); + + if (doesNewRowDeckStartWithTest) { + return true; + } + } + + return false; + }, + update(row, lastResult) { + return true; + }, + delete(row, lastResult) { + return true; + }, + }, + } + ); + + await sleep(100); + + const subscription = observable.subscribe({ + next: nextListener, + }); + + // Insert a card that should trigger a rerun + await db + .insertInto("cards") + .values({ kind: uniqueKindForThisTest, deck_id: testDeck.id }) + .execute(); + + await sleep(100); + + // Insert a card that shouldn't trigger a rerun + await db + .insertInto("cards") + .values({ kind: uniqueKindForThisTest, deck_id: otherDeck.id }) + .execute(); + + await sleep(100); + + subscription.unsubscribe(); + await changes.teardown(); + + expect(nextListener).toHaveBeenCalledTimes(2); + }); + + test("unsubscribe makes the query stops running", async () => { + const uniqueKindForThisTest = Math.random().toString(); + + const changes = await getKyselyChanges(testPool, db, ["cards"], undefined); + + const runWhenQueryRuns = mock(() => {}); + + const observable = observeQuery( + changes, + async () => { + runWhenQueryRuns(); + + const result = await 
db.selectFrom("cards").selectAll().execute(); + + return result; + }, + { + cards: { + insert(row, lastResult) { + return true; + }, + update(row, lastResult) { + return true; + }, + delete(row, lastResult) { + return true; + }, + }, + } + ); + + await sleep(100); + + const subscription = observable.subscribe({ + next: () => {}, + }); + + await sleep(100); + + // Do an insert that should trigger a rerun + await db + .insertInto("cards") + .values({ kind: uniqueKindForThisTest }) + .execute(); + + await sleep(100); + subscription.unsubscribe(); + + // Do another insert that should trigger a rerun + await db + .insertInto("cards") + .values({ kind: uniqueKindForThisTest }) + .execute(); + + await sleep(100); + + // And another insert that should trigger a rerun + await db + .insertInto("cards") + .values({ kind: uniqueKindForThisTest }) + .execute(); + + await sleep(100); + + subscription.unsubscribe(); + await changes.teardown(); + + // One time for the original run, and another time for the first + // call before .unsubscribe() was called + // None after that! 
+ expect(runWhenQueryRuns).toHaveBeenCalledTimes(2); + }); +}); diff --git a/src/observeQuery.ts b/src/observeQuery.ts new file mode 100644 index 0000000..7ef311b --- /dev/null +++ b/src/observeQuery.ts @@ -0,0 +1,155 @@ +import { Observable } from "rxjs"; +import { + DeletePayload, + UpdatePayload, + InsertPayload, + MinimalKyselyDatabaseTemplate, + KyselyChanges, +} from "./types"; + +type TableMapHandler< + Database extends MinimalKyselyDatabaseTemplate, + Table extends keyof Database & string, + PrimaryKeyMap, + Result +> = { + insert?: ( + row: InsertPayload, + lastResult: Result + ) => boolean | Promise; + update?: ( + row: UpdatePayload, + lastResult: Result + ) => boolean | Promise; + delete?: ( + row: DeletePayload, + lastResult: Result + ) => boolean | Promise; +}; + +// type TableFunctionHandler< +// Database extends MinimalKyselyDatabaseTemplate, +// Table extends keyof Database & string, +// PrimaryKeyMap +// > = ( +// change: +// | { event: "insert"; row: InsertPayload } +// | { event: "update"; row: UpdatePayload } +// | { +// event: "delete"; +// identity: DeletePayload; +// } +// ) => boolean; + +type HandlerMap< + Database extends MinimalKyselyDatabaseTemplate, + IncludedTables extends keyof Database & string, + PrimaryKeyMap, + Result +> = { + [Table in IncludedTables]: TableMapHandler< + Database, + IncludedTables, + PrimaryKeyMap, + Result + >; +}; + +export const observeQuery = < + Database extends MinimalKyselyDatabaseTemplate, + IncludedTables extends keyof Database & string, + PrimaryKeyMap, + Result +>( + kyselyChanges: KyselyChanges, + query: () => Promise, + handlerMap: HandlerMap +): Observable => { + const { subject } = kyselyChanges; + + const observable = new Observable((subscriber) => { + let runningQuery: Promise | undefined = undefined; + let queueNextQuery: (() => Promise) | undefined; + let lastResult: undefined | Result = undefined; + + const runQueryAndCallNextIfExists = () => { + runningQuery = query() + .then((result) => { + 
// Query is no longer running + // Could be queued again + lastResult = result; + runningQuery = undefined; + + // Send the first result + subscriber.next(result); + + if (queueNextQuery) { + const toRun = queueNextQuery; + queueNextQuery = undefined; + runningQuery = toRun(); + } + }) + .catch((error) => { + subscriber.error(error); + }); + + return runningQuery; + }; + + // Kick off a first invocation + runQueryAndCallNextIfExists(); + + // Start subscribing after a transaction starts to fetch the first result + const subjectSubscription = subject.subscribe({ + next: async (change) => { + const valueAtHandlerMapForTable = handlerMap[change.table]; + + let shouldReRunQuery = false; + + if (typeof valueAtHandlerMapForTable === "object") { + const maybeHandler = valueAtHandlerMapForTable[change.event]; + if (maybeHandler) { + const handlerResult = await maybeHandler( + ("row" in change ? change.row : change.identity) as any, + lastResult as Result + ); + shouldReRunQuery = handlerResult; + } + } + + if (shouldReRunQuery) { + if (runningQuery === undefined && queueNextQuery === undefined) { + // There is no query running and no query already queued + // We should run the query immediately + runningQuery = runQueryAndCallNextIfExists(); + } else if ( + runningQuery !== undefined && + queueNextQuery === undefined + ) { + // We need to queue one up by setting queueNextQuery to the run function + // When the current query finishes, it will run queueNextQuery if defined + queueNextQuery = runQueryAndCallNextIfExists; + } else { + // We don't need to do anything - someone else is already waiting to start + // and they'll start with visibility of the current transaction we're reacting to + } + } else { + // There is no handler for this change + // Do nothing! 
We can't assume that the query needs to be re-run + } + }, + complete: () => { + subscriber.complete(); + }, + error: (error) => { + subscriber.error(error); + }, + }); + + return () => { + subjectSubscription.unsubscribe(); + }; + }); + + return observable; +}; diff --git a/src/types.ts b/src/types.ts new file mode 100644 index 0000000..328b33b --- /dev/null +++ b/src/types.ts @@ -0,0 +1,83 @@ +import { Kysely, Selectable } from "kysely"; +import { Subject } from "rxjs"; + +export type MinimalKyselyDatabaseTemplate = { + [table: string]: { + [column: string]: unknown; + }; +}; + +export type TableWithIdTemplate = { id: unknown }; + +type DatabaseWithOnlyPrimaryKeys = + // If the tables keys are undefined and table has an id + // then return the id + // Otherwise, there is no known primary key + NonStandardKeys extends undefined + ? Table extends TableWithIdTemplate + ? Pick + : {} + : // Otherwise, if the tables keys are specified and in the table, use them + NonStandardKeys extends readonly (keyof Table & string)[] + ? Pick + : never; + +export type PrimaryKeyOverrideInput< + Database, + IncludedTables extends keyof Database & string +> = { + readonly [table in IncludedTables]?: readonly (keyof Database[table] & + string)[]; +}; + +export type InsertPayload< + Database extends MinimalKyselyDatabaseTemplate, + Table extends keyof Database & string +> = Selectable; + +export type UpdatePayload< + Database extends MinimalKyselyDatabaseTemplate, + Table extends keyof Database & string +> = Selectable; + +export type DeletePayload< + Database extends MinimalKyselyDatabaseTemplate, + Table extends keyof Database & string, + PrimaryKeyMap +> = PrimaryKeyMap extends PrimaryKeyOverrideInput + ? 
Selectable< + DatabaseWithOnlyPrimaryKeys + > + : Selectable>; + +export type SubjectChangeEvent< + Database extends MinimalKyselyDatabaseTemplate, + IncludedTables extends keyof Database & string, + PrimaryKeyMap +> = + | { + table: IncludedTables; + event: "insert"; + row: InsertPayload; + } + | { + table: IncludedTables; + event: "update"; + row: UpdatePayload; + } + | { + table: IncludedTables; + event: "delete"; + identity: DeletePayload; + }; + +export type KyselyChanges< + Database extends MinimalKyselyDatabaseTemplate, + IncludedTables extends keyof Database & string, + PrimaryKeyMap +> = { + subject: Subject>; + db: Kysely; + teardown: () => Promise; + primaryKeyMap: PrimaryKeyMap; +}; diff --git a/src/walSubject.test.ts b/src/walSubject.test.ts new file mode 100644 index 0000000..24492fc --- /dev/null +++ b/src/walSubject.test.ts @@ -0,0 +1,373 @@ +import { Pool } from "pg"; +import { describe, test, expect, beforeAll, mock } from "bun:test"; +import invariant from "tiny-invariant"; +import { + GetObservableEventType, + getKyselyChanges, + listWalSlots, +} from "./walSubject"; +import { Kysely, PostgresDialect } from "kysely"; + +const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + +const testPool = new Pool({ + connectionString: + process.env.DATABASE_URL || + "postgres://postgres:postgres@localhost:7432/postgres", +}); + +const dialect = new PostgresDialect({ + pool: testPool, +}); + +const db = new Kysely({ + dialect, +}); + +type TestDatabase = { + widgets: { + id: number; + kind: string; + created_at: Date; + updated_at: Date; + }; + other_table_on_public: { + id: number; + kind: string; + created_at: Date; + updated_at: Date; + }; + "schema_two.widgets": { + id: number; + kind: string; + created_at: Date; + updated_at: Date; + }; + "schema_two.other_table_on_schema_two": { + id: number; + kind: string; + created_at: Date; + updated_at: Date; + }; +}; + +describe("walSubject", () => { + beforeAll(async () => { + await 
testPool.query(` + drop table if exists widgets; + create table widgets ( + id serial primary key, + kind text not null, + created_at timestamp not null default now(), + updated_at timestamp not null default now() + ); + + drop table if exists other_table_on_public; + create table other_table_on_public ( + id serial primary key, + kind text not null, + created_at timestamp not null default now(), + updated_at timestamp not null default now() + ); + + drop schema if exists schema_two cascade; + create schema schema_two; + create table schema_two.widgets ( + id serial primary key, + kind text not null, + created_at timestamp not null default now(), + updated_at timestamp not null default now() + ); + + create table schema_two.other_table_on_schema_two ( + id serial primary key, + kind text not null, + created_at timestamp not null default now(), + updated_at timestamp not null default now() + ); + `); + }); + + test("getKyselyChanges creates a temporary slot and teardown removes it", async () => { + const slotId = "basic_create_test"; + const { subject, teardown } = await getKyselyChanges( + testPool, + db, + ["widgets"], + undefined, + { slotId } + ); + + const slots = await listWalSlots(testPool); + const mySlot = slots.find((s) => s.slot_name === `app_slot_${slotId}`); + expect(mySlot).not.toBeUndefined(); + + await teardown(); + + const slotsAfterTeardown = await listWalSlots(testPool); + const mySlotAfterTeardown = slotsAfterTeardown.find( + (s) => s.slot_name === `app_slot_${slotId}` + ); + expect(mySlotAfterTeardown).toBeUndefined(); + }); + + test("can get changes from the subject", async () => { + const slotId = "basic_get_changes_test"; + const { subject, teardown } = await getKyselyChanges( + testPool, + db, + ["widgets"], + undefined, + { + slotId, + } + ); + + const listener = mock(() => {}); + + const subscription = subject.subscribe({ + next: listener, + }); + + await testPool.query("insert into widgets (kind) values ($1)", [ + "baseball", + ]); + + 
await sleep(100); + + await teardown(); + subscription.unsubscribe(); + + expect(listener).toHaveBeenCalled(); + }); + + test("changes from the subject follow a predictable shape", async () => { + const slotId = "basic_get_changes_shape_test"; + const { subject, teardown } = await getKyselyChanges( + testPool, + db, + ["widgets"], + undefined, + { + slotId, + } + ); + + let event: Parameters[0] | undefined; + const listener = mock((change) => { + event = change; + }); + + const subscription = subject.subscribe((change) => { + listener(change); + }); + + await testPool.query("insert into widgets (kind) values ($1)", [ + "baseball", + ]); + + await sleep(100); + + await teardown(); + subscription.unsubscribe(); + + expect(listener).toHaveBeenCalled(); + invariant(event); + expect(event.event).toBe("insert"); + expect(event.table).toBe("widgets"); + }); + + test("only get changes from the requested tables", async () => { + const slotId = "basic_get_changes_exclude_tables_test"; + const { subject, teardown } = await getKyselyChanges( + testPool, + db, + ["widgets"], + undefined, + { + slotId, + } + ); + + const changes: GetObservableEventType[] = []; + const listener = mock((change) => { + changes.push(change); + }); + + const subscription = subject.subscribe(listener); + + await testPool.query("insert into widgets (kind) values ($1)", [ + "baseball", + ]); + + await testPool.query( + "insert into other_table_on_public (kind) values ($1)", + ["basketball"] + ); + + await sleep(100); + + await teardown(); + subscription.unsubscribe(); + + expect(listener).toHaveBeenCalledTimes(1); + const basketballChange = changes.find( + // To verify that typescript is excluding this type, remove as any + // and get the type error + (change) => (change.table as any) === "other_table_on_public" + ); + expect(basketballChange).toBeUndefined(); + }); + + test("update payload follows a predictable shape", async () => { + const slotId = "update_get_changes_shape_test"; + + const 
insertResult = await testPool.query( + "insert into widgets (kind) values ($1) returning id", + ["baseball"] + ); + const insertedWidgetId = insertResult.rows[0].id; + + const { subject, teardown } = await getKyselyChanges( + testPool, + db, + ["widgets"], + undefined, + { + slotId, + } + ); + + await testPool.query("update widgets set kind = $1 where id = $2", [ + "basketball", + insertedWidgetId, + ]); + + let event: Parameters[0] | undefined; + const listener = mock((change) => { + event = change; + }); + + const subscription = subject.subscribe((change) => { + listener(change); + }); + + await sleep(100); + + await teardown(); + subscription.unsubscribe(); + + expect(listener).toHaveBeenCalled(); + invariant(event); + expect(event.event).toBe("update"); + expect(event.table).toBe("widgets"); + + invariant(event.event === "update"); + + expect(event.row.kind).toBe("basketball"); + }); + + test("delete payload follows a predictable shape and is typed to only include primary keys", async () => { + const slotId = "delete_get_changes_shape_test"; + + const insertResult = await testPool.query( + "insert into widgets (kind) values ($1) returning id", + ["baseball"] + ); + const insertedWidgetId = insertResult.rows[0].id; + + const { subject, teardown } = await getKyselyChanges( + testPool, + db, + ["widgets"], + undefined, + { + slotId, + } + ); + + await testPool.query("delete from widgets where id = $1", [ + insertedWidgetId, + ]); + + let event: Parameters[0] | undefined; + const listener = mock((change) => { + event = change; + }); + + const subscription = subject.subscribe((change) => { + listener(change); + }); + + await sleep(100); + + await teardown(); + subscription.unsubscribe(); + + expect(listener).toHaveBeenCalled(); + invariant(event); + expect(event.event).toBe("delete"); + expect(event.table).toBe("widgets"); + + invariant(event.event === "delete"); + + expect(event.identity.id).toBe(insertedWidgetId); + // To verify that delete only include primary 
keys, remove as any + // and see the error + expect((event.identity as any).kind).toBe(undefined); + }); + + test("delete primary key override changes the type of identity", async () => { + const slotId = "delete_primary_key_override_test"; + + // Assuming 'widgets' table has a composite primary key [id, kind] + const primaryKeyOverride = { widgets: ["id", "kind"] } as const; + + const insertResult = await testPool.query( + "insert into widgets (kind) values ($1) returning id, kind", + ["slinky"] + ); + const insertedWidget = insertResult.rows[0]; + + const { subject, teardown } = await getKyselyChanges( + testPool, + db, + ["widgets"], + primaryKeyOverride, + { + slotId, + } + ); + + await testPool.query("delete from widgets where id = $1 and kind = $2", [ + insertedWidget.id, + insertedWidget.kind, + ]); + + let event: Parameters[0] | undefined; + const listener = mock((change) => { + event = change; + }); + + const subscription = subject.subscribe((change) => { + listener(change); + }); + + await sleep(100); + + await teardown(); + subscription.unsubscribe(); + + expect(listener).toHaveBeenCalled(); + invariant(event); + expect(event.event).toBe("delete"); + expect(event.table).toBe("widgets"); + + invariant(event.event === "delete"); + + // Check that the identity includes both primary keys + expect(event.identity.id).toBe(insertedWidget.id); + // Expect it to be undefined, but expect Typescript to allow us to lie about it being defined + expect(event.identity.kind).toBeUndefined(); + }); +}); diff --git a/src/walSubject.ts b/src/walSubject.ts new file mode 100644 index 0000000..2f2a81d --- /dev/null +++ b/src/walSubject.ts @@ -0,0 +1,312 @@ +import { Pool, PoolClient } from "pg"; +import invariant from "tiny-invariant"; +import { Subject } from "rxjs"; +import { Kysely } from "kysely"; +import { + KyselyChanges, + MinimalKyselyDatabaseTemplate, + PrimaryKeyOverrideInput, + SubjectChangeEvent, +} from "./types"; + +const POLL_INTERVAL_MS = 50; + +/** + * 
Beware: this may include more than the keys (e.g. if there is no index) + * Largely copied from: https://github.com/graphile/graphile-engine/blob/v4/packages/lds/src/pg-logical-decoding.ts + */ +interface Keys { + keynames: Array; + keytypes: Array; + keyvalues: Array; +} + +interface Change { + // https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L957 + schema: string; + // https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L961 + table: string; +} + +// https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L941-L949 +export interface WALInsertChange extends Change { + kind: "insert"; + // https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L969 + columnnames: Array; + columntypes: Array; + columnvalues: Array; +} + +export interface WALUpdateChange extends Change { + kind: "update"; + + // https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L973 + columnnames: Array; + columntypes: Array; + columnvalues: Array; + // https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L992-L1003 + oldkeys: Keys; +} + +export interface WALDeleteChange extends Change { + kind: "delete"; + // https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L1009-L1018 + oldkeys: Keys; +} + +export type WALChange = WALInsertChange | WALUpdateChange | WALDeleteChange; + +interface Payload { + lsn: string; + data: { + change: Array; + }; +} + +const toLsnData = ({ lsn, data }: { lsn: string; data: string }): Payload => ({ + lsn, + data: JSON.parse(data), +}); + +type WalConfigOptions = { + slotId?: string; + pollInterval?: number; + assumeSchema?: string; +}; + +const createSlot = async (client: PoolClient, slotName: string) => { + try { + await client.query( + "select 
pg_catalog.pg_create_logical_replication_slot($1, $2, $3)",
+ [slotName, "wal2json", true]
+ );
+ } catch (e: unknown) {
+ invariant(
+ e instanceof Error,
+ "Expected error thrown from pg_create_logical_replication_slot to be an instance of Error, but got something else"
+ );
+
+ if ("code" in e && e.code === "58P01") {
+ throw new Error(
+ `Couldn't create replication slot, seems you don't have wal2json installed? Error: ${e.message}`
+ );
+ } else {
+ throw e;
+ }
+ }
+};
+
+const dropSlot = async (client: PoolClient, slotName: string) => {
+ try {
+ await client.query("select pg_catalog.pg_drop_replication_slot($1)", [
+ slotName,
+ ]);
+ } catch (e) {
+ // most likely reason for failure is that the slot does not exist, and we're fine!
+ }
+};
+
+let currentlyGettingChanges = false;
+const getChanges = async (
+ client: PoolClient,
+ slotName: string,
+ schema: string,
+ onlyIncludeTables: string[],
+ changeHandler: (payload: Payload[]) => void
+): Promise => {
+ // if we're taking longer than POLL_INTERVAL_MS to process changes, just skip this iteration
+ if (currentlyGettingChanges) {
+ return;
+ }
+
+ currentlyGettingChanges = true;
+ try {
+ const addTablesString = onlyIncludeTables
+ .map((table) => (table.includes(".") ? 
`${table}` : `${schema}.${table}`)) + .join(","); + + // options documented here: https://github.com/eulerto/wal2json + const { rows } = await client.query<{ lsn: string; data: string }>( + `select lsn, data from pg_catalog.pg_logical_slot_get_changes( + $1, $2, $3, + 'include-transaction', 'false', + 'add-tables', $4 + )`, + [ + slotName, + null, // up to LSN + null, // limit of changes to get + addTablesString, + ] + ); + + const changes = rows.map(toLsnData); + changeHandler(changes); + } catch (e) { + invariant( + e instanceof Error, + "Expected error thrown from pg_logical_slot_get_changes to be an instance of Error, but got something else" + ); + + if ("code" in e && e.code === "42704") { + console.warn("Replication slot went away?"); + await createSlot(client, slotName); + console.warn( + "Recreated slot; retrying getChanges (no further output implies success)" + ); + + await getChanges( + client, + slotName, + schema, + onlyIncludeTables, + changeHandler + ); + } + + throw e; + } finally { + currentlyGettingChanges = false; + } +}; + +export const getKyselyChanges = async < + Database extends MinimalKyselyDatabaseTemplate, + IncludedTables extends keyof Database & string, + PrimaryKeyMap extends + | PrimaryKeyOverrideInput + | undefined +>( + pg: Pool, + db: Kysely, + onlyIncludeTables: IncludedTables[], + primaryKeyMap: PrimaryKeyMap, + options?: WalConfigOptions +): Promise> => { + // Destructure options and create constants + const { + slotId: userSlotId, + pollInterval: userPollInterval, + assumeSchema, + } = options ?? {}; + + const slotId = userSlotId || Math.random().toString().slice(2); + const pollInterval = userPollInterval ?? POLL_INTERVAL_MS; + const slotName = `app_slot_${slotId}`; + const schema = assumeSchema ?? 
"public"; + + // Create client, subject, and actually create the slot in the DB + const client = await pg.connect(); + const subject = new Subject< + SubjectChangeEvent + >(); + await createSlot(client, slotName); + + const isUsingMultipleSchemas = onlyIncludeTables.some((table) => + table.includes(".") + ); + + // Start polling for changes + const interval = setInterval( + () => + getChanges(client, slotName, schema, onlyIncludeTables, (payloads) => { + for (const payload of payloads) { + for (const change of payload.data.change) { + if (change.kind === "insert") { + const pairs = change.columnnames.map((name, index) => [ + name, + change.columnvalues[index], + ]); + + const tableKey = ( + !isUsingMultipleSchemas + ? `${change.table}` + : `${schema}.${change.table}` + ) as IncludedTables; + + const event = { + table: tableKey, + event: change.kind, + row: Object.fromEntries(pairs), + }; + + subject.next(event); + } + + if (change.kind === "update") { + const pairs = change.columnnames.map((name, index) => [ + name, + change.columnvalues[index], + ]); + + const tableKey = ( + !isUsingMultipleSchemas + ? `${change.table}` + : `${schema}.${change.table}` + ) as IncludedTables; + + const event = { + table: tableKey, + event: change.kind, + row: Object.fromEntries(pairs), + }; + + subject.next(event); + } + + if (change.kind === "delete") { + const pairs = change.oldkeys.keynames.map((name, index) => [ + name, + change.oldkeys.keyvalues[index], + ]); + + const tableKey = ( + !isUsingMultipleSchemas + ? 
`${change.table}` + : `${schema}.${change.table}` + ) as IncludedTables; + + const event = { + table: tableKey, + event: change.kind, + identity: Object.fromEntries(pairs), + }; + + subject.next(event); + } + } + } + }), + pollInterval + ); + + /** + * Teardown function that should be called when you're done with the subject + * It will drop the slot from the DB and release the client back to the pool + * It will also complete the subject, completing any subscribers of it as well + */ + const teardown = async () => { + clearInterval(interval); + await dropSlot(client, slotName); + await client.release(); + subject.complete(); + }; + + return { subject, db, teardown, primaryKeyMap }; +}; + +export type GetObservableEventType = T extends Subject ? U : never; + +export const listWalSlots = async (pg: Pool) => { + const { rows } = await pg.query<{ + slot_name: string; + plugin_name: string; + database: string; + temporary: boolean; + active: boolean; + }>( + "select slot_name, plugin, slot_type, database, temporary, active from pg_replication_slots" + ); + + return rows; +}; diff --git a/tsconfig.build.json b/tsconfig.build.json new file mode 100644 index 0000000..48ef472 --- /dev/null +++ b/tsconfig.build.json @@ -0,0 +1,114 @@ +{ + "compilerOptions": { + /* Visit https://aka.ms/tsconfig to read more about this file */ + + /* Projects */ + // "incremental": true, /* Save .tsbuildinfo files to allow for incremental compilation of projects. */ + // "composite": true, /* Enable constraints that allow a TypeScript project to be used with project references. */ + // "tsBuildInfoFile": "./.tsbuildinfo", /* Specify the path to .tsbuildinfo incremental compilation file. */ + // "disableSourceOfProjectReferenceRedirect": true, /* Disable preferring source files instead of declaration files when referencing composite projects. */ + // "disableSolutionSearching": true, /* Opt a project out of multi-project reference checking when editing. 
*/ + // "disableReferencedProjectLoad": true, /* Reduce the number of projects loaded automatically by TypeScript. */ + + /* Language and Environment */ + "target": "es2016", /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */ + // "lib": [], /* Specify a set of bundled library declaration files that describe the target runtime environment. */ + // "jsx": "preserve", /* Specify what JSX code is generated. */ + // "experimentalDecorators": true, /* Enable experimental support for legacy experimental decorators. */ + // "emitDecoratorMetadata": true, /* Emit design-type metadata for decorated declarations in source files. */ + // "jsxFactory": "", /* Specify the JSX factory function used when targeting React JSX emit, e.g. 'React.createElement' or 'h'. */ + // "jsxFragmentFactory": "", /* Specify the JSX Fragment reference used for fragments when targeting React JSX emit e.g. 'React.Fragment' or 'Fragment'. */ + // "jsxImportSource": "", /* Specify module specifier used to import the JSX factory functions when using 'jsx: react-jsx*'. */ + // "reactNamespace": "", /* Specify the object invoked for 'createElement'. This only applies when targeting 'react' JSX emit. */ + // "noLib": true, /* Disable including any library files, including the default lib.d.ts. */ + // "useDefineForClassFields": true, /* Emit ECMAScript-standard-compliant class fields. */ + // "moduleDetection": "auto", /* Control what method is used to detect module-format JS files. */ + + /* Modules */ + "module": "commonjs", /* Specify what module code is generated. */ + // "rootDir": "./", /* Specify the root folder within your source files. */ + // "moduleResolution": "node10", /* Specify how TypeScript looks up a file from a given module specifier. */ + // "baseUrl": "./", /* Specify the base directory to resolve non-relative module names. */ + // "paths": {}, /* Specify a set of entries that re-map imports to additional lookup locations. 
*/ + // "rootDirs": [], /* Allow multiple folders to be treated as one when resolving modules. */ + // "typeRoots": [], /* Specify multiple folders that act like './node_modules/@types'. */ + // "types": [], /* Specify type package names to be included without being referenced in a source file. */ + // "allowUmdGlobalAccess": true, /* Allow accessing UMD globals from modules. */ + // "moduleSuffixes": [], /* List of file name suffixes to search when resolving a module. */ + // "allowImportingTsExtensions": true, /* Allow imports to include TypeScript file extensions. Requires '--moduleResolution bundler' and either '--noEmit' or '--emitDeclarationOnly' to be set. */ + // "resolvePackageJsonExports": true, /* Use the package.json 'exports' field when resolving package imports. */ + // "resolvePackageJsonImports": true, /* Use the package.json 'imports' field when resolving imports. */ + // "customConditions": [], /* Conditions to set in addition to the resolver-specific defaults when resolving imports. */ + // "resolveJsonModule": true, /* Enable importing .json files. */ + // "allowArbitraryExtensions": true, /* Enable importing files with any extension, provided a declaration file is present. */ + // "noResolve": true, /* Disallow 'import's, 'require's or ''s from expanding the number of files TypeScript should add to a project. */ + + /* JavaScript Support */ + // "allowJs": true, /* Allow JavaScript files to be a part of your program. Use the 'checkJS' option to get errors from these files. */ + // "checkJs": true, /* Enable error reporting in type-checked JavaScript files. */ + // "maxNodeModuleJsDepth": 1, /* Specify the maximum folder depth used for checking JavaScript files from 'node_modules'. Only applicable with 'allowJs'. */ + + /* Emit */ + "declaration": true, /* Generate .d.ts files from TypeScript and JavaScript files in your project. */ + // "declarationMap": true, /* Create sourcemaps for d.ts files. 
*/ + // "emitDeclarationOnly": true, /* Only output d.ts files and not JavaScript files. */ + // "sourceMap": true, /* Create source map files for emitted JavaScript files. */ + // "inlineSourceMap": true, /* Include sourcemap files inside the emitted JavaScript. */ + // "outFile": "./", /* Specify a file that bundles all outputs into one JavaScript file. If 'declaration' is true, also designates a file that bundles all .d.ts output. */ + "outDir": "./dist", /* Specify an output folder for all emitted files. */ + // "removeComments": true, /* Disable emitting comments. */ + // "noEmit": true, /* Disable emitting files from a compilation. */ + // "importHelpers": true, /* Allow importing helper functions from tslib once per project, instead of including them per-file. */ + // "importsNotUsedAsValues": "remove", /* Specify emit/checking behavior for imports that are only used for types. */ + // "downlevelIteration": true, /* Emit more compliant, but verbose and less performant JavaScript for iteration. */ + // "sourceRoot": "", /* Specify the root path for debuggers to find the reference source code. */ + // "mapRoot": "", /* Specify the location where debugger should locate map files instead of generated locations. */ + // "inlineSources": true, /* Include source code in the sourcemaps inside the emitted JavaScript. */ + // "emitBOM": true, /* Emit a UTF-8 Byte Order Mark (BOM) in the beginning of output files. */ + // "newLine": "crlf", /* Set the newline character for emitting files. */ + // "stripInternal": true, /* Disable emitting declarations that have '@internal' in their JSDoc comments. */ + // "noEmitHelpers": true, /* Disable generating custom helper functions like '__extends' in compiled output. */ + // "noEmitOnError": true, /* Disable emitting files if any type checking errors are reported. */ + // "preserveConstEnums": true, /* Disable erasing 'const enum' declarations in generated code. 
*/ + // "declarationDir": "./", /* Specify the output directory for generated declaration files. */ + // "preserveValueImports": true, /* Preserve unused imported values in the JavaScript output that would otherwise be removed. */ + + /* Interop Constraints */ + // "isolatedModules": true, /* Ensure that each file can be safely transpiled without relying on other imports. */ + // "verbatimModuleSyntax": true, /* Do not transform or elide any imports or exports not marked as type-only, ensuring they are written in the output file's format based on the 'module' setting. */ + // "allowSyntheticDefaultImports": true, /* Allow 'import x from y' when a module doesn't have a default export. */ + "esModuleInterop": true, /* Emit additional JavaScript to ease support for importing CommonJS modules. This enables 'allowSyntheticDefaultImports' for type compatibility. */ + // "preserveSymlinks": true, /* Disable resolving symlinks to their realpath. This correlates to the same flag in node. */ + "forceConsistentCasingInFileNames": true, /* Ensure that casing is correct in imports. */ + + /* Type Checking */ + "strict": true, /* Enable all strict type-checking options. */ + // "noImplicitAny": true, /* Enable error reporting for expressions and declarations with an implied 'any' type. */ + // "strictNullChecks": true, /* When type checking, take into account 'null' and 'undefined'. */ + // "strictFunctionTypes": true, /* When assigning functions, check to ensure parameters and the return values are subtype-compatible. */ + // "strictBindCallApply": true, /* Check that the arguments for 'bind', 'call', and 'apply' methods match the original function. */ + // "strictPropertyInitialization": true, /* Check for class properties that are declared but not set in the constructor. */ + // "noImplicitThis": true, /* Enable error reporting when 'this' is given the type 'any'. */ + // "useUnknownInCatchVariables": true, /* Default catch clause variables as 'unknown' instead of 'any'. 
*/ + // "alwaysStrict": true, /* Ensure 'use strict' is always emitted. */ + // "noUnusedLocals": true, /* Enable error reporting when local variables aren't read. */ + // "noUnusedParameters": true, /* Raise an error when a function parameter isn't read. */ + // "exactOptionalPropertyTypes": true, /* Interpret optional property types as written, rather than adding 'undefined'. */ + // "noImplicitReturns": true, /* Enable error reporting for codepaths that do not explicitly return in a function. */ + // "noFallthroughCasesInSwitch": true, /* Enable error reporting for fallthrough cases in switch statements. */ + // "noUncheckedIndexedAccess": true, /* Add 'undefined' to a type when accessed using an index. */ + // "noImplicitOverride": true, /* Ensure overriding members in derived classes are marked with an override modifier. */ + // "noPropertyAccessFromIndexSignature": true, /* Enforces using indexed accessors for keys declared using an indexed type. */ + // "allowUnusedLabels": true, /* Disable error reporting for unused labels. */ + // "allowUnreachableCode": true, /* Disable error reporting for unreachable code. */ + + /* Completeness */ + // "skipDefaultLibCheck": true, /* Skip type checking .d.ts files that are included with TypeScript. */ + "skipLibCheck": true /* Skip type checking all .d.ts files. 
*/ + }, + "exclude": [ + "node_modules", + "**/*.test.ts", + "dist" + ], +} \ No newline at end of file diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..7556e1d --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,22 @@ +{ + "compilerOptions": { + "lib": ["ESNext"], + "module": "esnext", + "target": "esnext", + "moduleResolution": "bundler", + "moduleDetection": "force", + "allowImportingTsExtensions": true, + "noEmit": true, + "composite": true, + "strict": true, + "downlevelIteration": true, + "skipLibCheck": true, + "jsx": "react-jsx", + "allowSyntheticDefaultImports": true, + "forceConsistentCasingInFileNames": true, + "allowJs": true, + "types": [ + "bun-types" // add Bun global + ] + } +}