diff --git a/components/neon_postgres/actions/delete-rows/delete-rows.mjs b/components/neon_postgres/actions/delete-rows/delete-rows.mjs new file mode 100644 index 0000000000000..e44aa086eeeb4 --- /dev/null +++ b/components/neon_postgres/actions/delete-rows/delete-rows.mjs @@ -0,0 +1,70 @@ +import neon from "../../neon_postgres.app.mjs"; + +export default { + name: "Delete Row(s)", + key: "neon_postgres-delete-rows", + description: "Deletes a row or rows from a table. [See the documentation](https://node-postgres.com/features/queries)", + version: "0.0.1", + type: "action", + props: { + neon, + schema: { + propDefinition: [ + neon, + "schema", + ], + }, + table: { + propDefinition: [ + neon, + "table", + (c) => ({ + schema: c.schema, + }), + ], + }, + column: { + propDefinition: [ + neon, + "column", + (c) => ({ + table: c.table, + schema: c.schema, + }), + ], + label: "Lookup Column", + description: "Find row(s) by searching for a value in this column", + }, + value: { + propDefinition: [ + neon, + "value", + (c) => ({ + table: c.table, + column: c.column, + schema: c.schema, + }), + ], + }, + }, + async run({ $ }) { + const { + table, + schema, + column, + value, + } = this; + + const errorMsg = "Row not deleted due to an error. "; + + const rows = await this.neon.deleteRows( + schema, + table, + column, + value, + errorMsg, + ); + $.export("$summary", `Deleted ${rows.length} rows from ${table}`); + return rows; + }, +}; diff --git a/components/neon_postgres/actions/execute-custom-query/execute-custom-query.mjs b/components/neon_postgres/actions/execute-custom-query/execute-custom-query.mjs new file mode 100644 index 0000000000000..d5bf26ef2602d --- /dev/null +++ b/components/neon_postgres/actions/execute-custom-query/execute-custom-query.mjs @@ -0,0 +1,28 @@ +import neon from "../../neon_postgres.app.mjs"; + +export default { + key: "neon_postgres-execute-custom-query", + name: "Execute SQL Query", + description: "Execute a custom PostgreSQL query. See [our docs](https://pipedream.com/docs/databases/working-with-sql) to learn more about working with SQL in Pipedream.", + version: "0.0.1", + type: "action", + props: { + neon, + // eslint-disable-next-line pipedream/props-description + sql: { + type: "sql", + auth: { + app: "neon", + }, + label: "PostgreSQL Query", + }, + }, + async run({ $ }) { + const args = this.neon.executeQueryAdapter(this.sql); + const data = await this.neon.executeQuery(args); + $.export("$summary", `Returned ${data.length} ${data.length === 1 + ? "row" + : "rows"}`); + return data; + }, +}; diff --git a/components/neon_postgres/actions/find-row-custom-query/find-row-custom-query.mjs b/components/neon_postgres/actions/find-row-custom-query/find-row-custom-query.mjs new file mode 100644 index 0000000000000..2bf1c44a9c1aa --- /dev/null +++ b/components/neon_postgres/actions/find-row-custom-query/find-row-custom-query.mjs @@ -0,0 +1,53 @@ +import neon from "../../neon_postgres.app.mjs"; + +export default { + name: "Find Row With Custom Query", + key: "neon_postgres-find-row-custom-query", + description: "Finds a row in a table via a custom query. [See the documentation](https://node-postgres.com/features/queries)", + version: "0.0.1", + type: "action", + props: { + neon, + query: { + propDefinition: [ + neon, + "query", + ], + }, + values: { + propDefinition: [ + neon, + "values", + ], + }, + }, + async run({ $ }) { + const { + query, + values = [], + } = this; + + if (!Array.isArray(values)) { + throw new Error("No valid values provided. The values property must be an array."); + } + + if (this.values) { + const numberOfValues = query?.match(/\$/g)?.length || 0; + if (values.length !== numberOfValues) { + throw new Error("The number of values provided does not match the number of values in the query."); + } + } + + if (!query.toLowerCase().includes("select")) { + throw new Error("Need be a `SELECT` statement query. Read more about [SELECT queries here](https://www.w3schools.com/sql/sql_select.asp)"); + } + + const res = await this.neon.executeQuery({ + text: query, + values, + errorMsg: "Query not executed due to an error. ", + }); + $.export("$summary", "Successfully executed query"); + return res; + }, +}; diff --git a/components/neon_postgres/actions/find-row/find-row.mjs b/components/neon_postgres/actions/find-row/find-row.mjs new file mode 100644 index 0000000000000..02a8bd109d28d --- /dev/null +++ b/components/neon_postgres/actions/find-row/find-row.mjs @@ -0,0 +1,73 @@ +import neon from "../../neon_postgres.app.mjs"; + +export default { + name: "Find Row", + key: "neon_postgres-find-row", + description: "Finds a row in a table via a lookup column. [See the documentation](https://node-postgres.com/features/queries)", + version: "0.0.1", + type: "action", + props: { + neon, + schema: { + propDefinition: [ + neon, + "schema", + ], + }, + table: { + propDefinition: [ + neon, + "table", + (c) => ({ + schema: c.schema, + }), + ], + }, + column: { + propDefinition: [ + neon, + "column", + (c) => ({ + table: c.table, + schema: c.schema, + }), + ], + label: "Lookup Column", + description: "Find row by searching for a value in this column. Returns first row found", + }, + value: { + propDefinition: [ + neon, + "value", + (c) => ({ + table: c.table, + column: c.column, + schema: c.schema, + }), + ], + }, + }, + async run({ $ }) { + const { + schema, + table, + column, + value, + } = this; + + const errorMsg = "Row not found due to an error. "; + + const res = await this.neon.findRowByValue( + schema, + table, + column, + value, + errorMsg, + ); + const summary = res + ? "Row found" + : "Row not found"; + $.export("$summary", summary); + return res; + }, +}; diff --git a/components/neon_postgres/actions/insert-row/insert-row.mjs b/components/neon_postgres/actions/insert-row/insert-row.mjs new file mode 100644 index 0000000000000..473022e33a5e0 --- /dev/null +++ b/components/neon_postgres/actions/insert-row/insert-row.mjs @@ -0,0 +1,68 @@ +import neon from "../../neon_postgres.app.mjs"; +import { parseRowValues } from "../../common/utils.mjs"; + +export default { + name: "Insert Row", + key: "neon_postgres-insert-row", + description: "Adds a new row. [See the documentation](https://node-postgres.com/features/queries)", + version: "0.0.1", + type: "action", + props: { + neon, + schema: { + propDefinition: [ + neon, + "schema", + ], + }, + table: { + propDefinition: [ + neon, + "table", + (c) => ({ + schema: c.schema, + }), + ], + }, + rowValues: { + propDefinition: [ + neon, + "rowValues", + ], + description: "JSON representation of your table rows. Accept a single row (JSON Object) or multiple rows (JSON array). For example: `{ \"product_id\": 1, \"product_name\": \"Laptop Pro 15\", \"price\": 1200.50, \"stock_quantity\": 50, \"created_at\": \"2023-10-26T10:00:00Z\" }`", + }, + }, + async run({ $ }) { + const { + schema, + table, + rowValues, + } = this; + const results = []; + const parsedRowValues = parseRowValues(rowValues); + const parsedRowValuesArray = Array.isArray(parsedRowValues) + ? parsedRowValues + : [ + parsedRowValues, + ]; + + const errorMsg = "New row(s) not inserted due to an error. "; + + for (const row of parsedRowValuesArray) { + const columns = Object.keys(row); + const values = Object.values(row); + const res = await this.neon.insertRow( + schema, + table, + columns, + values, + errorMsg, + ); + results.push(res); + } + $.export("$summary", `Successfully inserted ${results.length} row${results.length === 1 + ? "" + : "s"}`); + return results; + }, +}; diff --git a/components/neon_postgres/actions/update-row/update-row.mjs b/components/neon_postgres/actions/update-row/update-row.mjs new file mode 100644 index 0000000000000..f08685eadaab4 --- /dev/null +++ b/components/neon_postgres/actions/update-row/update-row.mjs @@ -0,0 +1,84 @@ +import neon from "../../neon_postgres.app.mjs"; +import { parseRowValues } from "../../common/utils.mjs"; + +export default { + name: "Update Row", + key: "neon_postgres-update-row", + description: "Updates an existing row. [See the documentation](https://node-postgres.com/features/queries)", + version: "0.0.1", + type: "action", + props: { + neon, + schema: { + propDefinition: [ + neon, + "schema", + ], + }, + table: { + propDefinition: [ + neon, + "table", + (c) => ({ + schema: c.schema, + }), + ], + }, + column: { + propDefinition: [ + neon, + "column", + (c) => ({ + table: c.table, + schema: c.schema, + }), + ], + label: "Lookup Column", + description: "Find row to update by searching for a value in this column. Returns first row found", + }, + value: { + propDefinition: [ + neon, + "value", + (c) => ({ + table: c.table, + column: c.column, + schema: c.schema, + }), + ], + }, + rowValues: { + propDefinition: [ + neon, + "rowValues", + ], + description: "JSON representation of your new table row values. For example: `{ \"product_name\": \"Laptop Pro 15\", \"price\": 1200.50, \"stock_quantity\": 50 }`", + }, + }, + async run({ $ }) { + const { + schema, + table, + column, + value, + rowValues, + } = this; + + const parsedRowValues = parseRowValues(rowValues); + const errorMsg = "Row not updated due to an error. "; + + const res = await this.neon.updateRow( + schema, + table, + column, + value, + parsedRowValues, + errorMsg, + ); + const summary = res + ? "Row updated" + : "Row not found"; + $.export("$summary", summary); + return res; + }, +}; diff --git a/components/neon_postgres/actions/upsert-row/upsert-row.mjs b/components/neon_postgres/actions/upsert-row/upsert-row.mjs new file mode 100644 index 0000000000000..f6c4565e56122 --- /dev/null +++ b/components/neon_postgres/actions/upsert-row/upsert-row.mjs @@ -0,0 +1,70 @@ +import neon from "../../neon_postgres.app.mjs"; +import { parseRowValues } from "../../common/utils.mjs"; + +export default { + name: "Upsert Row", + key: "neon_postgres-upsert-row", + description: "Adds a new row or updates an existing row. [See the documentation](https://node-postgres.com/features/queries)", + version: "0.0.1", + type: "action", + props: { + neon, + schema: { + propDefinition: [ + neon, + "schema", + ], + }, + table: { + propDefinition: [ + neon, + "table", + (c) => ({ + schema: c.schema, + }), + ], + }, + conflictTarget: { + propDefinition: [ + neon, + "column", + (c) => ({ + table: c.table, + schema: c.schema, + }), + ], + label: "Conflict Target", + description: "If insert fails, update the row with the same value in this column.", + }, + rowValues: { + propDefinition: [ + neon, + "rowValues", + ], + description: "JSON representation of your table row values. For example: `{ \"product_name\": \"Laptop Pro 15\", \"price\": 1200.50, \"stock_quantity\": 50 }`", + }, + }, + async run({ $ }) { + const { + rowValues, + ...args + } = this; + + const parsedRowValues = parseRowValues(rowValues); + + const columns = Object.keys(parsedRowValues); + const values = Object.values(parsedRowValues); + + const res = await this.neon.upsertRow({ + columns, + values, + errorMsg: "Row not upserted due to an error. ", + ...args, + }); + const summary = res + ? "Row upserted" + : "Row not upserted"; + $.export("$summary", summary); + return res; + }, +}; diff --git a/components/neon_postgres/common/utils.mjs b/components/neon_postgres/common/utils.mjs new file mode 100644 index 0000000000000..de76327adc95a --- /dev/null +++ b/components/neon_postgres/common/utils.mjs @@ -0,0 +1,16 @@ +export function parseRowValues(rowValues) { + if (!rowValues) { + return undefined; + } + if (Array.isArray(rowValues)) { + return rowValues.map(parseRowValues); + } + if (typeof rowValues === "string") { + try { + return JSON.parse(rowValues); + } catch (error) { + return rowValues; + } + } + return rowValues; +} diff --git a/components/neon_postgres/neon_postgres.app.mjs b/components/neon_postgres/neon_postgres.app.mjs index c3096bc83084e..cb3126896ffd9 100644 --- a/components/neon_postgres/neon_postgres.app.mjs +++ b/components/neon_postgres/neon_postgres.app.mjs @@ -1,11 +1,56 @@ +import postgresql from "@pipedream/postgresql"; +import format from "pg-format"; + export default { type: "app", app: "neon_postgres", - propDefinitions: {}, + propDefinitions: { + ...postgresql.propDefinitions, + }, methods: { - // this.$auth contains connected account data - authKeys() { - console.log(Object.keys(this.$auth)); + ...postgresql.methods, + getClientConfiguration() { + const { + host, + port, + user, + password, + database, + } = this.$auth; + + return { + host, + port, + user, + password, + database, + ssl: this._getSslConfig(), + }; + }, + upsertRow({ + schema, table, columns, values, conflictTarget = "id", errorMsg, + } = {}) { + const placeholders = this.getPlaceholders({ + values, + }); + + const updates = columns + .filter((column) => column !== conflictTarget) + .map((column) => `${column}=EXCLUDED.${column}`); + + const query = ` + INSERT INTO ${schema}.${table} (${columns}) + VALUES (${placeholders}) + ON CONFLICT (${conflictTarget}) + DO UPDATE SET ${updates} + RETURNING * + `; + + return this.executeQuery({ + text: format(query, schema, table), + values, + errorMsg, + }); }, }, }; diff --git a/components/neon_postgres/package.json b/components/neon_postgres/package.json index 19acbd4945023..3ede6d69f017e 100644 --- a/components/neon_postgres/package.json +++ b/components/neon_postgres/package.json @@ -1,6 +1,6 @@ { "name": "@pipedream/neon_postgres", - "version": "0.0.1", + "version": "0.1.1", "description": "Pipedream Neon Postgres Components", "main": "neon_postgres.app.mjs", "keywords": [ @@ -11,5 +11,10 @@ "author": "Pipedream (https://pipedream.com/)", "publishConfig": { "access": "public" + }, + "dependencies": { + "@pipedream/platform": "^3.0.3", + "@pipedream/postgresql": "^2.2.3", + "pg-format": "^1.0.4" } -} \ No newline at end of file +} diff --git a/components/neon_postgres/sources/common/common.mjs b/components/neon_postgres/sources/common/common.mjs new file mode 100644 index 0000000000000..83e8f48ef9f84 --- /dev/null +++ b/components/neon_postgres/sources/common/common.mjs @@ -0,0 +1,19 @@ +import neon from "../../neon_postgres.app.mjs"; +import common from "@pipedream/postgresql/sources/common.mjs"; +import { DEFAULT_POLLING_SOURCE_TIMER_INTERVAL } from "@pipedream/platform"; + +export default { + ...common, + props: { + postgresql: neon, + db: "$.service.db", + timer: { + type: "$.interface.timer", + default: { + intervalSeconds: DEFAULT_POLLING_SOURCE_TIMER_INTERVAL, + }, + label: "Polling Interval", + description: "Pipedream will poll the API on this schedule", + }, + }, +}; diff --git a/components/neon_postgres/sources/new-column/new-column.mjs b/components/neon_postgres/sources/new-column/new-column.mjs new file mode 100644 index 0000000000000..7ed2dec2fb3d8 --- /dev/null +++ b/components/neon_postgres/sources/new-column/new-column.mjs @@ -0,0 +1,41 @@ +import common from "../common/common.mjs"; + +export default { + ...common, + name: "New Column", + key: "neon_postgres-new-column", + description: "Emit new event when a new column is added to a table. [See the documentation](https://node-postgres.com/features/queries)", + version: "0.0.1", + type: "source", + props: { + ...common.props, + schema: { + propDefinition: [ + common.props.postgresql, + "schema", + ], + }, + table: { + propDefinition: [ + common.props.postgresql, + "table", + (c) => ({ + schema: c.schema, + }), + ], + }, + }, + async run() { + const previousColumns = this._getPreviousValues() || []; + + const columns = await this.postgresql.getColumns(this.table, this.schema); + + const newColumns = columns.filter((column) => !previousColumns.includes(column)); + for (const column of newColumns) { + const meta = this.generateMeta(column); + this.$emit(column, meta); + } + + this._setPreviousValues(columns); + }, +}; diff --git a/components/neon_postgres/sources/new-or-updated-row/new-or-updated-row.mjs b/components/neon_postgres/sources/new-or-updated-row/new-or-updated-row.mjs new file mode 100644 index 0000000000000..9f14c54e86c3a --- /dev/null +++ b/components/neon_postgres/sources/new-or-updated-row/new-or-updated-row.mjs @@ -0,0 +1,76 @@ +import common from "../common/common.mjs"; + +export default { + ...common, + name: "New or Updated Row", + key: "neon_postgres-new-or-updated-row", + description: "Emit new event when a row is added or modified. [See the documentation](https://node-postgres.com/features/queries)", + version: "0.0.1", + type: "source", + dedupe: "unique", + props: { + ...common.props, + schema: { + propDefinition: [ + common.props.postgresql, + "schema", + ], + }, + table: { + propDefinition: [ + common.props.postgresql, + "table", + (c) => ({ + schema: c.schema, + }), + ], + }, + identifierColumn: { + label: "Identifier Column", + propDefinition: [ + common.props.postgresql, + "column", + (c) => ({ + table: c.table, + schema: c.schema, + }), + ], + description: "The column to identify an unique row, commonly it's `id` or `uuid`.", + }, + timestampColumn: { + label: "Timestamp Column", + propDefinition: [ + common.props.postgresql, + "column", + (c) => ({ + table: c.table, + schema: c.schema, + }), + ], + description: "A datetime column, such as 'date_updated' or 'last_modified' that is set to the current datetime when a row is updated.", + }, + }, + hooks: { + async deploy() { + await this.initialRows( + this.schema, + this.table, + this.timestampColumn, + this.limit, + ); + }, + }, + methods: { + ...common.methods, + generateMeta(row, column) { + return { + id: `${row[this.identifierColumn]}-${row[column]}`, + summary: "Row Added/Updated", + ts: row[column], + }; + }, + }, + async run() { + await this.newRows(this.schema, this.table, this.timestampColumn); + }, +}; diff --git a/components/neon_postgres/sources/new-row-custom-query/new-row-custom-query.mjs b/components/neon_postgres/sources/new-row-custom-query/new-row-custom-query.mjs new file mode 100644 index 0000000000000..82dca31dc1a10 --- /dev/null +++ b/components/neon_postgres/sources/new-row-custom-query/new-row-custom-query.mjs @@ -0,0 +1,89 @@ +import common from "../common/common.mjs"; + +export default { + ...common, + name: "New Row Custom Query", + key: "neon_postgres-new-row-custom-query", + description: "Emit new event when new rows are returned from a custom query that you provide. [See the documentation](https://node-postgres.com/features/queries)", + version: "0.0.1", + type: "source", + dedupe: "unique", + props: { + ...common.props, + schema: { + propDefinition: [ + common.props.postgresql, + "schema", + ], + }, + table: { + propDefinition: [ + common.props.postgresql, + "table", + (c) => ({ + schema: c.schema, + }), + ], + }, + column: { + propDefinition: [ + common.props.postgresql, + "column", + (c) => ({ + table: c.table, + schema: c.schema, + }), + ], + }, + query: { + propDefinition: [ + common.props.postgresql, + "query", + ], + description: "Specify the query to select new or updated rows since the last poll. For example, `SELECT * FROM users WHERE country = 'US'`", + }, + values: { + propDefinition: [ + common.props.postgresql, + "values", + ], + }, + }, + hooks: { + async deploy() { + if (this.values && !Array.isArray(this.values)) { + throw new Error("No valid values provided. The values property must be an array."); + } + + const numberOfValues = this.query?.match(/\$/g)?.length || 0; + if (this.values && this.values.length !== numberOfValues) { + throw new Error("The number of values provided does not match the number of values in the query."); + } + + const isColumnUnique = await this.isColumnUnique(this.schema, this.table, this.column); + if (!isColumnUnique) { + throw new Error("The column selected contains duplicate values. Column must be unique"); + } + }, + }, + methods: { + ...common.methods, + generateMeta(row, column) { + return { + id: row[column], + summary: "New Row", + ts: Date.now(), + }; + }, + }, + async run() { + const rows = await this.postgresql.executeQuery({ + text: this.query, + values: this.values, + }); + for (const row of rows) { + const meta = this.generateMeta(row, this.column); + this.$emit(row, meta); + } + }, +}; diff --git a/components/neon_postgres/sources/new-row/new-row.mjs b/components/neon_postgres/sources/new-row/new-row.mjs new file mode 100644 index 0000000000000..1a884e8915373 --- /dev/null +++ b/components/neon_postgres/sources/new-row/new-row.mjs @@ -0,0 +1,79 @@ +import common from "../common/common.mjs"; + +export default { + ...common, + name: "New Row", + key: "neon_postgres-new-row", + description: "Emit new event when a new row is added to a table. [See the documentation](https://node-postgres.com/features/queries)", + version: "0.0.1", + type: "source", + dedupe: "unique", + props: { + ...common.props, + schema: { + propDefinition: [ + common.props.postgresql, + "schema", + ], + }, + table: { + propDefinition: [ + common.props.postgresql, + "table", + (c) => ({ + schema: c.schema, + }), + ], + }, + column: { + propDefinition: [ + common.props.postgresql, + "column", + (c) => ({ + schema: c.schema, + table: c.table, + }), + ], + description: "An ID or timestamp column where new rows will always contain larger values than the previous row. Defaults to the table's primary key.", + optional: true, + }, + }, + hooks: { + /** If column prop is left blank, get the table's primary key to use + * for ordering and deduping. */ + async deploy() { + const column = this.column + ? this.column + : await this.postgresql.getPrimaryKey(this.table, this.schema); + + const isColumnUnique = await this.isColumnUnique(this.schema, this.table, column); + if (!isColumnUnique) { + throw new Error("The column selected contains duplicate values. Column must be unique"); + } + + this._setColumn(column); + + await this.initialRows(this.schema, this.table, column); + }, + }, + methods: { + ...common.methods, + _getColumn() { + return this.db.get("column"); + }, + _setColumn(column) { + this.db.set("column", column); + }, + generateMeta(row, column) { + return { + id: row[column], + summary: "New Row Added", + ts: Date.now(), + }; + }, + }, + async run() { + const column = this._getColumn(); + await this.newRows(this.schema, this.table, column); + }, +}; diff --git a/components/neon_postgres/sources/new-table/new-table.mjs b/components/neon_postgres/sources/new-table/new-table.mjs new file mode 100644 index 0000000000000..4bc61d228a4f5 --- /dev/null +++ b/components/neon_postgres/sources/new-table/new-table.mjs @@ -0,0 +1,32 @@ +import common from "../common/common.mjs"; + +export default { + ...common, + name: "New Table", + key: "neon_postgres-new-table", + description: "Emit new event when a new table is added to the database. [See the documentation](https://node-postgres.com/features/queries)", + version: "0.0.1", + type: "source", + props: { + ...common.props, + schema: { + propDefinition: [ + common.props.postgresql, + "schema", + ], + }, + }, + async run() { + const previousTables = this._getPreviousValues() || []; + + const tables = await this.postgresql.getTables(this.schema); + + const newTables = tables.filter((table) => !previousTables.includes(table)); + for (const table of newTables) { + const meta = this.generateMeta(table); + this.$emit(table, meta); + } + + this._setPreviousValues(tables); + }, +}; diff --git a/components/postgresql/actions/delete-rows/delete-rows.mjs b/components/postgresql/actions/delete-rows/delete-rows.mjs index 220d074c1f827..994e05ec9f591 100644 --- a/components/postgresql/actions/delete-rows/delete-rows.mjs +++ b/components/postgresql/actions/delete-rows/delete-rows.mjs @@ -4,7 +4,7 @@ export default { name: "Delete Row(s)", key: "postgresql-delete-rows", description: "Deletes a row or rows from a table. [See the documentation](https://node-postgres.com/features/queries)", - version: "2.0.7", + version: "2.0.8", type: "action", props: { postgresql, @@ -55,21 +55,16 @@ export default { value, } = this; - try { - const rows = await this.postgresql.deleteRows( - schema, - table, - column, - value, - ); - $.export("$summary", `Deleted ${rows.length} rows from ${table}`); - return rows; - } catch (error) { - let errorMsg = "Row not deleted due to an error. "; - errorMsg += `${error}`.includes("SSL verification failed") - ? "This could be because SSL verification failed. To resolve this, reconnect your account and set SSL Verification Mode: Skip Verification, and try again." - : `${error}`; - throw new Error(errorMsg); - } + const errorMsg = "Row not deleted due to an error. "; + + const rows = await this.postgresql.deleteRows( + schema, + table, + column, + value, + errorMsg, + ); + $.export("$summary", `Deleted ${rows.length} rows from ${table}`); + return rows; }, }; diff --git a/components/postgresql/actions/execute-custom-query/execute-custom-query.mjs b/components/postgresql/actions/execute-custom-query/execute-custom-query.mjs index 70cf00880b03e..14603357eb1ea 100644 --- a/components/postgresql/actions/execute-custom-query/execute-custom-query.mjs +++ b/components/postgresql/actions/execute-custom-query/execute-custom-query.mjs @@ -4,7 +4,7 @@ export default { name: "Execute SQL Query", key: "postgresql-execute-custom-query", description: "Execute a custom PostgreSQL query. See [our docs](https://pipedream.com/docs/databases/working-with-sql) to learn more about working with SQL in Pipedream.", - version: "3.0.3", + version: "3.0.4", type: "action", props: { postgresql, diff --git a/components/postgresql/actions/find-row-custom-query/find-row-custom-query.mjs b/components/postgresql/actions/find-row-custom-query/find-row-custom-query.mjs index 7dbd871ea5ddb..cbde0262543e3 100644 --- a/components/postgresql/actions/find-row-custom-query/find-row-custom-query.mjs +++ b/components/postgresql/actions/find-row-custom-query/find-row-custom-query.mjs @@ -4,7 +4,7 @@ export default { name: "Find Row With Custom Query", key: "postgresql-find-row-custom-query", description: "Finds a row in a table via a custom query. [See the documentation](https://node-postgres.com/features/queries)", - version: "2.0.7", + version: "2.0.8", type: "action", props: { postgresql, @@ -42,19 +42,12 @@ export default { throw new Error("Need be a `SELECT` statement query. Read more about [SELECT queries here](https://www.w3schools.com/sql/sql_select.asp)"); } - try { - const res = await this.postgresql.executeQuery({ - text: query, - values, - }); - $.export("$summary", "Successfully executed query"); - return res; - } catch (error) { - let errorMsg = "Query not executed due to an error. "; - errorMsg += `${error}`.includes("SSL verification failed") - ? "This could be because SSL verification failed. To resolve this, reconnect your account and set SSL Verification Mode: Skip Verification, and try again." - : `${error}`; - throw new Error(errorMsg); - } + const res = await this.postgresql.executeQuery({ + text: query, + values, + errorMsg: "Query not executed due to an error. ", + }); + $.export("$summary", "Successfully executed query"); + return res; }, }; diff --git a/components/postgresql/actions/find-row/find-row.mjs b/components/postgresql/actions/find-row/find-row.mjs index 82c337526738d..9a982aea0a3cf 100644 --- a/components/postgresql/actions/find-row/find-row.mjs +++ b/components/postgresql/actions/find-row/find-row.mjs @@ -4,7 +4,7 @@ export default { name: "Find Row", key: "postgresql-find-row", description: "Finds a row in a table via a lookup column. [See the documentation](https://node-postgres.com/features/queries)", - version: "2.0.7", + version: "2.0.8", type: "action", props: { postgresql, @@ -54,24 +54,20 @@ export default { column, value, } = this; - try { - const res = await this.postgresql.findRowByValue( - schema, - table, - column, - value, - ); - const summary = res - ? "Row found" - : "Row not found"; - $.export("$summary", summary); - return res; - } catch (error) { - let errorMsg = "Row not retrieved due to an error. "; - errorMsg += `${error}`.includes("SSL verification failed") - ? "This could be because SSL verification failed. To resolve this, reconnect your account and set SSL Verification Mode: Skip Verification, and try again." - : `${error}`; - throw new Error(errorMsg); - } + + const errorMsg = "Row not found due to an error. "; + + const res = await this.postgresql.findRowByValue( + schema, + table, + column, + value, + errorMsg, + ); + const summary = res + ? "Row found" + : "Row not found"; + $.export("$summary", summary); + return res; }, }; diff --git a/components/postgresql/actions/insert-row/insert-row.mjs b/components/postgresql/actions/insert-row/insert-row.mjs index 63c9671b48474..f09d8fc04a82e 100644 --- a/components/postgresql/actions/insert-row/insert-row.mjs +++ b/components/postgresql/actions/insert-row/insert-row.mjs @@ -1,10 +1,11 @@ import postgresql from "../../postgresql.app.mjs"; +import { parseRowValues } from "../../common/utils.mjs"; export default { name: "Insert Row", key: "postgresql-insert-row", description: "Adds a new row. [See the documentation](https://node-postgres.com/features/queries)", - version: "2.0.7", + version: "2.0.8", type: "action", props: { postgresql, @@ -28,6 +29,7 @@ export default { postgresql, "rowValues", ], + description: "JSON representation of your table rows. Accept a single row (JSON Object) or multiple rows (JSON array). For example: `{ \"product_id\": 1, \"product_name\": \"Laptop Pro 15\", \"price\": 1200.50, \"stock_quantity\": 50, \"created_at\": \"2023-10-26T10:00:00Z\" }`", }, }, async run({ $ }) { @@ -36,23 +38,31 @@ export default { table, rowValues, } = this; - const columns = Object.keys(rowValues); - const values = Object.values(rowValues); - try { + const results = []; + const parsedRowValues = parseRowValues(rowValues); + const parsedRowValuesArray = Array.isArray(parsedRowValues) + ? parsedRowValues + : [ + parsedRowValues, + ]; + + const errorMsg = "New row(s) not inserted due to an error. "; + + for (const row of parsedRowValuesArray) { + const columns = Object.keys(row); + const values = Object.values(row); const res = await this.postgresql.insertRow( schema, table, columns, values, + errorMsg, ); - $.export("$summary", "New row inserted"); - return res; - } catch (error) { - let errorMsg = "New row not inserted due to an error. "; - errorMsg += `${error}`.includes("SSL verification failed") - ? "This could be because SSL verification failed. To resolve this, reconnect your account and set SSL Verification Mode: Skip Verification, and try again." - : `${error}`; - throw new Error(errorMsg); + results.push(res); } + $.export("$summary", `Successfully inserted ${results.length} row${results.length === 1 + ? "" + : "s"}`); + return results; }, }; diff --git a/components/postgresql/actions/update-row/update-row.mjs b/components/postgresql/actions/update-row/update-row.mjs index 96efa08676b35..a1b75cedcdeed 100644 --- a/components/postgresql/actions/update-row/update-row.mjs +++ b/components/postgresql/actions/update-row/update-row.mjs @@ -1,10 +1,11 @@ import postgresql from "../../postgresql.app.mjs"; +import { parseRowValues } from "../../common/utils.mjs"; export default { name: "Update Row", key: "postgresql-update-row", description: "Updates an existing row. [See the documentation](https://node-postgres.com/features/queries)", - version: "2.0.7", + version: "2.0.8", type: "action", props: { postgresql, @@ -51,6 +52,7 @@ export default { postgresql, "rowValues", ], + description: "JSON representation of your new table row values. For example: `{ \"product_name\": \"Laptop Pro 15\", \"price\": 1200.50, \"stock_quantity\": 50 }`", }, }, async run({ $ }) { @@ -61,25 +63,22 @@ export default { value, rowValues, } = this; - try { - const res = await this.postgresql.updateRow( - schema, - table, - column, - value, - rowValues, - ); - const summary = res - ? "Row updated" - : "Row not found"; - $.export("$summary", summary); - return res; - } catch (error) { - let errorMsg = "Row not updated due to an error. "; - errorMsg += `${error}`.includes("SSL verification failed") - ? "This could be because SSL verification failed. To resolve this, reconnect your account and set SSL Verification Mode: Skip Verification, and try again." - : `${error}`; - throw new Error(errorMsg); - } + + const parsedRowValues = parseRowValues(rowValues); + const errorMsg = "Row not updated due to an error. "; + + const res = await this.postgresql.updateRow( + schema, + table, + column, + value, + parsedRowValues, + errorMsg, + ); + const summary = res + ? "Row updated" + : "Row not found"; + $.export("$summary", summary); + return res; }, }; diff --git a/components/postgresql/actions/upsert-row/upsert-row.mjs b/components/postgresql/actions/upsert-row/upsert-row.mjs index 73d6617de55ac..b56a747bf435e 100644 --- a/components/postgresql/actions/upsert-row/upsert-row.mjs +++ b/components/postgresql/actions/upsert-row/upsert-row.mjs @@ -1,11 +1,11 @@ import postgresql from "../../postgresql.app.mjs"; -import format from "pg-format"; +import { parseRowValues } from "../../common/utils.mjs"; export default { name: "Upsert Row", key: "postgresql-upsert-row", description: "Adds a new row or updates an existing row. [See the documentation](https://node-postgres.com/features/queries)", - version: "2.0.7", + version: "2.0.8", type: "action", props: { postgresql, @@ -41,72 +41,30 @@ export default { postgresql, "rowValues", ], - }, - }, - methods: { - /** - * Upserts a row in a table. - * @param {object} args - The parameters to the query. - * @param {string} args.schema - The name of the schema. - * @param {string} args.table - The name of the table. - * @param {Array} args.columns - The columns in which to insert values. - * @param {Array} args.values - The values to insert. - * @param {string} args.conflictTarget - The column to use as the conflict target. - * @returns {Promise} A promise that resolves with the result of the query. - * @throws {Error} Will throw an error if the query fails. - */ - upsertRow({ - schema, table, columns, values, conflictTarget = "id", - } = {}) { - const placeholders = this.postgresql.getPlaceholders({ - values, - }); - - const updates = columns - .filter((column) => column !== conflictTarget) - .map((column) => `${column}=EXCLUDED.${column}`); - - const query = ` - INSERT INTO ${schema}.${table} (${columns}) - VALUES (${placeholders}) - ON CONFLICT (${conflictTarget}) - DO UPDATE SET ${updates} - RETURNING * - `; - - return this.postgresql.executeQuery({ - text: format(query, schema, table), - values, - }); + description: "JSON representation of your table row values. For example: `{ \"product_name\": \"Laptop Pro 15\", \"price\": 1200.50, \"stock_quantity\": 50 }`", }, }, async run({ $ }) { const { - upsertRow, rowValues, ...args } = this; - const columns = Object.keys(rowValues); - const values = Object.values(rowValues); + const parsedRowValues = parseRowValues(rowValues); + + const columns = Object.keys(parsedRowValues); + const values = Object.values(parsedRowValues); - try { - const res = await upsertRow({ - columns, - values, - ...args, - }); - const summary = res - ? "Row upserted" - : "Row not upserted"; - $.export("$summary", summary); - return res; - } catch (error) { - let errorMsg = "Row not upserted due to an error. "; - errorMsg += `${error}`.includes("SSL verification failed") - ? "This could be because SSL verification failed. To resolve this, reconnect your account and set SSL Verification Mode: Skip Verification, and try again." - : `${error}`; - throw new Error(errorMsg); - } + const res = await this.postgresql.upsertRow({ + columns, + values, + errorMsg: "Row not upserted due to an error. ", + ...args, + }); + const summary = res + ? "Row upserted" + : "Row not upserted"; + $.export("$summary", summary); + return res; }, }; diff --git a/components/postgresql/common/utils.mjs b/components/postgresql/common/utils.mjs new file mode 100644 index 0000000000000..de76327adc95a --- /dev/null +++ b/components/postgresql/common/utils.mjs @@ -0,0 +1,16 @@ +export function parseRowValues(rowValues) { + if (!rowValues) { + return undefined; + } + if (Array.isArray(rowValues)) { + return rowValues.map(parseRowValues); + } + if (typeof rowValues === "string") { + try { + return JSON.parse(rowValues); + } catch (error) { + return rowValues; + } + } + return rowValues; +} diff --git a/components/postgresql/package.json b/components/postgresql/package.json index 9e46ac8ce3801..24808c1dee59a 100644 --- a/components/postgresql/package.json +++ b/components/postgresql/package.json @@ -1,6 +1,6 @@ { "name": "@pipedream/postgresql", - "version": "2.2.3", + "version": "2.2.4", "description": "Pipedream PostgreSQL Components", "main": "postgresql.app.mjs", "keywords": [ diff --git a/components/postgresql/postgresql.app.mjs b/components/postgresql/postgresql.app.mjs index 4e62729929ffd..e29f7ea2265a9 100644 --- a/components/postgresql/postgresql.app.mjs +++ b/components/postgresql/postgresql.app.mjs @@ -194,12 +194,17 @@ export default { * @returns {object[]} - The rows returned by the DB as a result of the * query. */ - async executeQuery(query) { + async executeQuery(query, errorMsg = "") { const client = await this._getClient(); try { const { rows } = await client.query(query); return rows; + } catch (error) { + errorMsg += `${error}`.includes("SSL verification failed") + ? "This could be because SSL verification failed. To resolve this, reconnect your account and set SSL Verification Mode: Skip Verification, and try again." + : `${error}`; + throw new Error(errorMsg); } finally { await this._endClient(client); } @@ -323,12 +328,13 @@ export default { * @param {string} value - A column value to search for * @returns A single database row */ - async findRowByValue(schema, table, column, value) { + async findRowByValue(schema, table, column, value, errorMsg) { const rows = await this.executeQuery({ text: format("SELECT * FROM %I.%I WHERE %I = $1", schema, table, column), values: [ value, ], + errorMsg, }); return rows[0]; }, @@ -338,12 +344,13 @@ export default { * @param {string} column - Column used to find the row(s) to delete * @param {string} value - A column value. Used to find the row(s) to delete */ - async deleteRows(schema, table, column, value) { + async deleteRows(schema, table, column, value, errorMsg) { return this.executeQuery({ text: format("DELETE FROM %I.%I WHERE %I = $1 RETURNING *", schema, table, column), values: [ value, ], + errorMsg, }); }, /** @@ -353,7 +360,7 @@ export default { * @param {array} values - Array of values corresponding to the column names provided * @returns The newly created row */ - async insertRow(schema, table, columns, values) { + async insertRow(schema, table, columns, values, errorMsg) { const placeholders = this.getPlaceholders({ values, }); @@ -364,6 +371,7 @@ export default { RETURNING * `, schema, table), values, + errorMsg, }); }, getPlaceholders({ @@ -381,7 +389,7 @@ export default { * and values representing new column values * @returns The newly updated row */ - async updateRow(schema, table, lookupColumn, lookupValue, rowValues) { + async updateRow(schema, table, lookupColumn, lookupValue, rowValues, errorMsg) { const columnsPlaceholders = this._getColumnsPlaceholders({ rowValues, fromIndex: 2, @@ -399,9 +407,35 @@ export default { lookupValue, ...Object.values(rowValues), ], + errorMsg, }); return response[0]; }, + upsertRow({ + schema, table, columns, values, conflictTarget = "id", errorMsg, + } = {}) { + const placeholders = this.getPlaceholders({ + values, + }); + + const updates = columns + .filter((column) => column !== conflictTarget) + .map((column) => `${column}=EXCLUDED.${column}`); + + const query = ` + INSERT INTO ${schema}.${table} (${columns}) + VALUES (${placeholders}) + ON CONFLICT (${conflictTarget}) + DO UPDATE SET ${updates} + RETURNING * + `; + + return this.executeQuery({ + text: format(query, schema, table), + values, + errorMsg, + }); + }, _getColumnsPlaceholders({ rowValues = {}, fromIndex = 1, diff --git a/components/postgresql/sources/new-column/new-column.mjs b/components/postgresql/sources/new-column/new-column.mjs index 2585db98b6c1b..e726300529e23 100644 --- a/components/postgresql/sources/new-column/new-column.mjs +++ b/components/postgresql/sources/new-column/new-column.mjs @@ -5,7 +5,7 @@ export default { name: "New Column", key: "postgresql-new-column", description: "Emit new event when a new column is added to a table. [See the documentation](https://node-postgres.com/features/queries)", - version: "2.0.7", + version: "2.0.8", type: "source", props: { ...common.props, diff --git a/components/postgresql/sources/new-or-updated-row/new-or-updated-row.mjs b/components/postgresql/sources/new-or-updated-row/new-or-updated-row.mjs index 980335e289df2..bc280634e1334 100644 --- a/components/postgresql/sources/new-or-updated-row/new-or-updated-row.mjs +++ b/components/postgresql/sources/new-or-updated-row/new-or-updated-row.mjs @@ -5,7 +5,7 @@ export default { name: "New or Updated Row", key: "postgresql-new-or-updated-row", description: "Emit new event when a row is added or modified. [See the documentation](https://node-postgres.com/features/queries)", - version: "2.0.7", + version: "2.0.8", type: "source", dedupe: "unique", props: { diff --git a/components/postgresql/sources/new-row-custom-query/new-row-custom-query.mjs b/components/postgresql/sources/new-row-custom-query/new-row-custom-query.mjs index 4c38d193edc1b..96437674bcadf 100644 --- a/components/postgresql/sources/new-row-custom-query/new-row-custom-query.mjs +++ b/components/postgresql/sources/new-row-custom-query/new-row-custom-query.mjs @@ -5,7 +5,7 @@ export default { name: "New Row Custom Query", key: "postgresql-new-row-custom-query", description: "Emit new event when new rows are returned from a custom query that you provide. [See the documentation](https://node-postgres.com/features/queries)", - version: "2.0.7", + version: "2.0.8", type: "source", dedupe: "unique", props: { @@ -40,6 +40,7 @@ export default { common.props.postgresql, "query", ], + description: "Specify the query to select new or updated rows since the last poll. For example, `SELECT * FROM users WHERE country = 'US'`", }, values: { propDefinition: [ @@ -48,6 +49,23 @@ export default { ], }, }, + hooks: { + async deploy() { + if (this.values && !Array.isArray(this.values)) { + throw new Error("No valid values provided. The values property must be an array."); + } + + const numberOfValues = this.query?.match(/\$/g)?.length || 0; + if (this.values && this.values.length !== numberOfValues) { + throw new Error("The number of values provided does not match the number of values in the query."); + } + + const isColumnUnique = await this.isColumnUnique(this.schema, this.table, this.column); + if (!isColumnUnique) { + throw new Error("The column selected contains duplicate values. Column must be unique"); + } + }, + }, methods: { ...common.methods, generateMeta(row, column) { @@ -59,34 +77,12 @@ export default { }, }, async run() { - const { - schema, - table, - column, - query, - values = [], - } = this; - - if (!Array.isArray(values)) { - throw new Error("No valid values provided. The values property must be an array."); - } - - const numberOfValues = query?.match(/\$/g)?.length || 0; - if (values.length !== numberOfValues) { - throw new Error("The number of values provided does not match the number of values in the query."); - } - - const isColumnUnique = await this.isColumnUnique(schema, table, column); - if (!isColumnUnique) { - throw new Error("The column selected contains duplicate values. Column must be unique"); - } - const rows = await this.postgresql.executeQuery({ - text: query, - values, + text: this.query, + values: this.values, }); for (const row of rows) { - const meta = this.generateMeta(row, column); + const meta = this.generateMeta(row, this.column); this.$emit(row, meta); } }, diff --git a/components/postgresql/sources/new-row/new-row.mjs b/components/postgresql/sources/new-row/new-row.mjs index a27c3e9ff247c..66b12e3ef92f9 100644 --- a/components/postgresql/sources/new-row/new-row.mjs +++ b/components/postgresql/sources/new-row/new-row.mjs @@ -5,7 +5,7 @@ export default { name: "New Row", key: "postgresql-new-row", description: "Emit new event when a new row is added to a table. [See the documentation](https://node-postgres.com/features/queries)", - version: "3.0.7", + version: "3.0.8", type: "source", dedupe: "unique", props: { @@ -45,6 +45,12 @@ export default { const column = this.column ? this.column : await this.postgresql.getPrimaryKey(this.table, this.schema); + + const isColumnUnique = await this.isColumnUnique(this.schema, this.table, column); + if (!isColumnUnique) { + throw new Error("The column selected contains duplicate values. Column must be unique"); + } + this._setColumn(column); await this.initialRows(this.schema, this.table, column); @@ -68,11 +74,6 @@ export default { }, async run() { const column = this._getColumn(); - const isColumnUnique = await this.isColumnUnique(this.schema, this.table, column); - if (!isColumnUnique) { - throw new Error("The column selected contains duplicate values. Column must be unique"); - } - await this.newRows(this.schema, this.table, column); }, }; diff --git a/components/postgresql/sources/new-table/new-table.mjs b/components/postgresql/sources/new-table/new-table.mjs index 3b9bdd2928cbb..97702024d4b93 100644 --- a/components/postgresql/sources/new-table/new-table.mjs +++ b/components/postgresql/sources/new-table/new-table.mjs @@ -5,7 +5,7 @@ export default { name: "New Table", key: "postgresql-new-table", description: "Emit new event when a new table is added to the database. [See the documentation](https://node-postgres.com/features/queries)", - version: "2.0.7", + version: "2.0.8", type: "source", props: { ...common.props, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 98ee964905365..705f5d53ccd26 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -8564,7 +8564,17 @@ importers: specifier: ^1.5.1 version: 1.6.6 - components/neon_postgres: {} + components/neon_postgres: + dependencies: + '@pipedream/platform': + specifier: ^3.0.3 + version: 3.0.3 + '@pipedream/postgresql': + specifier: ^2.2.3 + version: 2.2.3 + pg-format: + specifier: ^1.0.4 + version: 1.0.4 components/nerv: {} @@ -18512,6 +18522,9 @@ packages: '@pipedream/platform@3.0.3': resolution: {integrity: sha512-7elalas41lnT8i6EAFkqB7fT/+hkLGEQ1njS6A7CVguTrEswaIYk/seKmkfkRY7+O6qncgnXswYIKCBML9Co7w==} + '@pipedream/postgresql@2.2.3': + resolution: {integrity: sha512-Co9r4UKvvimEPo9T4v+EfVn/Sqqw8+X7PtvsAF7tydq7CHpkmOT9eYAiD0Kuybg5WbuWuqDaXHHmhVtsAxgpBw==} + '@pipedream/quickbooks@0.5.1': resolution: {integrity: sha512-APqan523zeCY/JVBk6PhoRE5intfoBsK8LTj9xZSOlah0E9/zcVHaGJ9vQhaVjFLuPC9YYt3BemQpsjGFcU0fQ==} @@ -35350,6 +35363,15 @@ snapshots: transitivePeerDependencies: - debug + '@pipedream/postgresql@2.2.3': + dependencies: + '@pipedream/platform': 2.0.0 + pg: 8.13.1 + pg-format: 1.0.4 + transitivePeerDependencies: + - debug + - pg-native + '@pipedream/quickbooks@0.5.1': dependencies: '@pipedream/platform': 3.0.3