-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit 6c8fcc7
Showing
13 changed files
with
1,906 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
# Based on https://raw.githubusercontent.com/github/gitignore/main/Node.gitignore | ||
.tool-versions | ||
|
||
# Logs | ||
|
||
logs | ||
_.log | ||
npm-debug.log_ | ||
yarn-debug.log* | ||
yarn-error.log* | ||
lerna-debug.log* | ||
.pnpm-debug.log* | ||
|
||
# Caches | ||
|
||
.cache | ||
|
||
# Diagnostic reports (https://nodejs.org/api/report.html) | ||
|
||
report.[0-9]_.[0-9]_.[0-9]_.[0-9]_.json | ||
|
||
# Runtime data | ||
|
||
pids | ||
_.pid | ||
_.seed | ||
*.pid.lock | ||
|
||
# Directory for instrumented libs generated by jscoverage/JSCover | ||
|
||
lib-cov | ||
|
||
# Coverage directory used by tools like istanbul | ||
|
||
coverage | ||
*.lcov | ||
|
||
# nyc test coverage | ||
|
||
.nyc_output | ||
|
||
# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files) | ||
|
||
.grunt | ||
|
||
# Bower dependency directory (https://bower.io/) | ||
|
||
bower_components | ||
|
||
# node-waf configuration | ||
|
||
.lock-wscript | ||
|
||
# Compiled binary addons (https://nodejs.org/api/addons.html) | ||
|
||
build/Release | ||
|
||
# Dependency directories | ||
|
||
node_modules/ | ||
jspm_packages/ | ||
|
||
# Snowpack dependency directory (https://snowpack.dev/) | ||
|
||
web_modules/ | ||
|
||
# TypeScript cache | ||
|
||
*.tsbuildinfo | ||
|
||
# Optional npm cache directory | ||
|
||
.npm | ||
|
||
# Optional eslint cache | ||
|
||
.eslintcache | ||
|
||
# Optional stylelint cache | ||
|
||
.stylelintcache | ||
|
||
# Microbundle cache | ||
|
||
.rpt2_cache/ | ||
.rts2_cache_cjs/ | ||
.rts2_cache_es/ | ||
.rts2_cache_umd/ | ||
|
||
# Optional REPL history | ||
|
||
.node_repl_history | ||
|
||
# Output of 'npm pack' | ||
|
||
*.tgz | ||
|
||
# Yarn Integrity file | ||
|
||
.yarn-integrity | ||
|
||
# dotenv environment variable files | ||
|
||
.env | ||
.env.development.local | ||
.env.test.local | ||
.env.production.local | ||
.env.local | ||
|
||
# parcel-bundler cache (https://parceljs.org/) | ||
|
||
.parcel-cache | ||
|
||
# Next.js build output | ||
|
||
.next | ||
out | ||
|
||
# Nuxt.js build / generate output | ||
|
||
.nuxt | ||
dist | ||
|
||
# Gatsby files | ||
|
||
# Comment in the public line in if your project uses Gatsby and not Next.js | ||
|
||
# https://nextjs.org/blog/next-9-1#public-directory-support | ||
|
||
# public | ||
|
||
# vuepress build output | ||
|
||
.vuepress/dist | ||
|
||
# vuepress v2.x temp and cache directory | ||
|
||
.temp | ||
|
||
# Docusaurus cache and generated files | ||
|
||
.docusaurus | ||
|
||
# Serverless directories | ||
|
||
.serverless/ | ||
|
||
# FuseBox cache | ||
|
||
.fusebox/ | ||
|
||
# DynamoDB Local files | ||
|
||
.dynamodb/ | ||
|
||
# TernJS port file | ||
|
||
.tern-port | ||
|
||
# Stores VSCode versions used for testing VSCode extensions | ||
|
||
.vscode-test | ||
|
||
# yarn v2 | ||
|
||
.yarn/cache | ||
.yarn/unplugged | ||
.yarn/build-state.yml | ||
.yarn/install-state.gz | ||
.pnp.* | ||
|
||
# IntelliJ based IDEs | ||
.idea | ||
|
||
# Finder (MacOS) folder config | ||
.DS_Store |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
{ | ||
"cSpell.words": [ | ||
"columnnames", | ||
"columntypes", | ||
"columnvalues", | ||
"keynames", | ||
"keytypes", | ||
"keyvalues", | ||
"kysely", | ||
"oldkeys" | ||
] | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,181 @@ | ||
# kysely-pg-observables | ||
|
||
`kysely-pg-observables` makes reactive programming with Kysely and PostgreSQL easy and safe. | ||
|
||
It wraps PostgreSQL's `wal2json` plugins, and let's you interact with the results using the type-information | ||
you have already defined for your Kysely database. | ||
|
||
Because it depends on `wal2json`, it only works with installations of PostgreSQL that support it, which fortunately | ||
includes RDS, Google Cloud SQL, Supabase, default Patroni images, Crunchy images, and many others. | ||
It does *not* work with Neon (or Vercel Postgres) yet. | ||
|
||
There are two main things you can do with `kysely-pg-observables`: | ||
1. Create a type-safe RxJS `Subject` from a Kysely database | ||
2. Create full observable's from Kysely queries | ||
|
||
# 1. Create a type-safe RxJS `Subject` from a Kysely database | ||
|
||
```typescript | ||
import { getKyselyChanges } from "kysely-pg-observables"; | ||
|
||
const pool = new Pool({ | ||
connectionString: process.env.DATABASE_URL | ||
}); | ||
|
||
const dialect = new PostgresDialect({ | ||
pool: testPool, | ||
}); | ||
|
||
type MyDatabaseType = { | ||
widgets: { | ||
id: number; | ||
name: string; | ||
color: string; | ||
active: boolean; | ||
}; | ||
tableWithOtherPrimaryKey: { | ||
other_id: number; | ||
name: string; | ||
color: string; | ||
}; | ||
}; | ||
|
||
const db = new Kysely<MyDatabaseType>({ | ||
dialect, | ||
}); | ||
|
||
const changes = await getKyselyChanges( | ||
testPool, | ||
db, | ||
["widgets"] // the list of tables you actually care about changes for | ||
{ | ||
otherPrimaryKey: ["otherId"], // the name of the primary key for tableWithOtherPrimaryKey | ||
} | ||
); | ||
|
||
// subject is an RxJs subject that you can subscribe to, operate on, create more observables from, etc. | ||
const subject = changes.subject | ||
|
||
const subscription = subject.subscribe({ | ||
next: (change) => { | ||
// Change is typed as: | ||
// { | ||
// table: string; | ||
// event: 'insert' | 'update' | 'delete'; | ||
// row: Database[table] (if event is insert or update) | ||
// identity: // a partial Database[table] containing only the primary keys of the deleted row | ||
// } | ||
} | ||
}) | ||
|
||
// When ready, cancel the wal slot and subscription altogether | ||
subscription.unsubscribe() | ||
|
||
// There is also changes.teardown() to fully tear everything down | ||
``` | ||
|
||
`getKyselyChanges` takes 4 parameters: | ||
- `pool`: a `pg.Pool` instance that is used to connect to the database | ||
- `db`: a `Kysely` instance that is used to get the type information for the database | ||
- `tables`: an array of table names that you want to get changes for | ||
- `primaryKeyOverrides`: an object that maps tables to their primary keys | ||
|
||
## Why do I need to provide tables? | ||
|
||
In order to make the events generated by `wal2json` type-safe, we need to limit the tables that we get changes for | ||
to only the tables that we know type information for. If we generated events for all tables, we would have to | ||
filter for them inside of the application, but we don't have a list of tables (just a type of tables). | ||
|
||
## What is this primary key override business? | ||
|
||
For deleted records, the only information available is the value of the primary key(s) of the deleted record. | ||
By default, we'll assume that any `id` column on a table is a primary key. If that is not true for any table, | ||
just include its name and an array of columns which are the primary keys of the table, and we'll use that | ||
for delete event's identities instead. | ||
|
||
# 2. Create full observable's from Kysely queries | ||
|
||
You can create full observables which re-run the query whenever the underlying data changes. | ||
|
||
```typescript | ||
import { observeQuery } from "kysely-pg-observables"; | ||
|
||
// setup otherwise similar as before | ||
const { subject, teardown } = await getKyselyChanges( | ||
testPool, | ||
db, | ||
["widgets"], | ||
undefined, | ||
{ slotId } | ||
); | ||
|
||
const observable = await observeQuery( | ||
observeQuery( | ||
subject, | ||
() => db.selectFrom('widgets').selectAll().execute(), | ||
{ | ||
widgets: { | ||
insert(row) { | ||
return true; // Assuming we want to re-run the query for every insert | ||
}, | ||
update(row, lastResult) { | ||
return lastResult.some((widget) => widget.id === row.id); // Re-run only if the updated widget was in the last result | ||
}, | ||
delete(identity, lastResult) { | ||
return lastResult.some((widget) => widget.id === identity.id); // Re-run only if the deleted widget was in the last result | ||
}, | ||
}, | ||
} | ||
) | ||
); | ||
|
||
observable.subscribe(result => { | ||
// This callback will fire every time the underlying data changes | ||
}) | ||
``` | ||
|
||
This can be used for a query of any complexity. Do whatever you want inside of your async function! | ||
Just make sure that you take into account the way the underlying data can change and how that will influence your query results. | ||
|
||
## Usage With trpc | ||
|
||
Although trpc does not use RxJS under the hood, the resulting observable is compatible with the type expected by trpc subscriptions. | ||
|
||
This means you cand do: | ||
```typescript | ||
allWidgets: procedure.subscription(() => { | ||
return observeQuery( | ||
subject, | ||
() => db.selectFrom('widgets').selectAll().execute(), | ||
{ | ||
widgets: { | ||
insert(row) { | ||
return true; // Assuming we want to re-run the query for every insert | ||
}, | ||
update(row, lastResult) { | ||
return lastResult.some((widget) => widget.id === row.id); // Re-run only if the updated widget was in the last result | ||
}, | ||
delete(identity, lastResult) { | ||
return lastResult.some((widget) => widget.id === identity.id); // Re-run only if the deleted widget was in the last result | ||
}, | ||
}, | ||
} | ||
) | ||
} | ||
}) | ||
``` | ||
|
||
Because the observable's are compatible, properly unsubscribing when the client disconnects is handled for you. | ||
|
||
## Sharing an observable between multiple trpc subscriptions clients | ||
|
||
If you want to share an observable between multiple clients, you can pass a resulting observable to the | ||
[`multicast` function](https://rxjs.dev/guide/subject#multicasted-observables) and then create multiple observables from the same source. | ||
|
||
If you share an observable between multiple clients, and do *not* multi-cast it, each event from the observable will only | ||
be fired once, and both clients will not get each event. I do not know who will get which one. | ||
|
||
# Roadmap | ||
|
||
- [ ] I'd like to support json-patch, with client-side utilities as well. | ||
|
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
console.log("Hello via Bun!"); |
Oops, something went wrong.