Skip to content

Commit

Permalink
Explicit closing of prepared statement portals in transactions to rel…
Browse files Browse the repository at this point in the history
…ease table locks. (#393)
  • Loading branch information
isoos authored Nov 20, 2024
1 parent 96588bd commit 10fce4d
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 19 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 3.4.4

- Fix: explicit closing of prepared statement portals in transactions to release table locks. [#393](https://github.com/isoos/postgresql-dart/pull/393)

## 3.4.3

- Fix: prevent hanging state by forwarding protocol-level parsing errors into the message stream.
Expand Down
18 changes: 18 additions & 0 deletions lib/src/messages/client_messages.dart
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ class DescribeMessage extends ClientMessage {
buffer.writeUint8(_isPortal ? $P : $S);
buffer.writeEncodedString(name); // Name of prepared statement
}

@override
String toString() => 'DescribeMessage(name=$_name, portal=$_isPortal)';
}

class BindMessage extends ClientMessage {
Expand Down Expand Up @@ -248,6 +251,12 @@ class BindMessage extends ClientMessage {
buffer.writeUint16(1);
buffer.writeUint16(1);
}

@override
String toString() => 'BindMessage(${[
if (_portalName.isNotEmpty) 'portal=$_portalName',
if (_statementName.isNotEmpty) 'stmt=$_statementName',
].join(',')})';
}

class ExecuteMessage extends ClientMessage {
Expand All @@ -263,6 +272,9 @@ class ExecuteMessage extends ClientMessage {
buffer.writeEncodedString(portalName);
buffer.writeUint32(0);
}

@override
String toString() => 'ExecuteMessage(portal=$_portalName)';
}

class CloseMessage extends ClientMessage {
Expand All @@ -288,6 +300,9 @@ class CloseMessage extends ClientMessage {
..writeUint8(_isForPortal ? $P : $S);
buffer.writeEncodedString(name);
}

@override
String toString() => 'CloseMessage(name=$_name, portal=$_isForPortal)';
}

class SyncMessage extends ClientMessage {
Expand All @@ -298,6 +313,9 @@ class SyncMessage extends ClientMessage {
buffer.writeUint8(ClientMessageId.sync);
buffer.writeUint32(4);
}

@override
String toString() => 'SyncMessage';
}

class TerminateMessage extends ClientMessage {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/messages/server_messages.dart
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ class CloseCompleteMessage extends ServerMessage {
CloseCompleteMessage();

@override
String toString() => 'Bind Complete Message';
String toString() => 'Close Complete Message';
}

class ParameterDescriptionMessage extends ServerMessage {
Expand Down
33 changes: 32 additions & 1 deletion lib/src/v3/connection.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';
Expand Down Expand Up @@ -258,7 +259,6 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
),
async.StreamSinkTransformer.fromHandlers(handleData: (msg, sink) {
print('[$hash][out] $msg');
print('[out] $msg');
sink.add(msg);
}),
));
Expand Down Expand Up @@ -646,6 +646,14 @@ class _PreparedStatement extends Statement {
final String _name;
final _PgSessionBase _session;

/// Apparently, when we are in a transaction and using extended query mode,
/// one needs to close the portals to release the locks on tables.
/// This queue will collect the portal names to close and they will be closed
/// when the prepared statement is disposed or a run call completes.
///
/// See more in https://github.com/isoos/postgresql-dart/issues/390
Queue<String>? _portalsToClose;

_PreparedStatement(this._description, this._name, this._session);

@override
Expand Down Expand Up @@ -674,17 +682,33 @@ class _PreparedStatement extends Statement {
);
} finally {
await subscription.cancel();
await _closePendingPortals();
}
}

@override
Future<void> dispose() async {
// Don't send a dispose message if the connection is already closed.
if (!_session._connection._isClosing) {
await _closePendingPortals();
await _session._sendAndWaitForQuery<CloseCompleteMessage>(
CloseMessage.statement(_name));
}
}

void _addPortalToClose(String portalName) {
_portalsToClose ??= Queue();
_portalsToClose!.add(portalName);
}

Future<void> _closePendingPortals() async {
final list = _portalsToClose;
while (list != null && list.isNotEmpty) {
final portalName = list.removeFirst();
await _session._sendAndWaitForQuery<CloseCompleteMessage>(
CloseMessage.portal(portalName));
}
}
}

class _BoundStatement extends Stream<ResultRow> implements ResultStream {
Expand Down Expand Up @@ -718,6 +742,7 @@ class _PgResultStreamSubscription
final _schema = Completer<ResultSchema>();
final _done = Completer<void>();
ResultSchema? _resultSchema;
_BoundStatement? _boundStatement;

@override
PgConnectionImplementation get connection => session._connection;
Expand All @@ -729,6 +754,7 @@ class _PgResultStreamSubscription
_BoundStatement statement, this._controller, this._source)
: session = statement.statement._session,
ignoreRows = false,
_boundStatement = statement,
_trace = StackTrace.current {
_scheduleStatement(() async {
connection._pending = this;
Expand Down Expand Up @@ -885,6 +911,11 @@ class _PgResultStreamSubscription
// we'll get this more than once.
_affectedRowsSoFar += message.rowsAffected;
case ReadyForQueryMessage():
// It looks like simple query protocol statements, or statements outside of a transaction
// do not need the portal to be closed explicitly.
if (message.state == ReadyForQueryMessageState.transaction) {
_boundStatement?.statement._addPortalToClose(_portalName);
}
await _completeQuery();
case CopyBothResponseMessage():
// This message indicates a successful start for Streaming Replication.
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: postgres
description: PostgreSQL database driver. Supports statement reuse and binary protocol and connection pooling.
version: 3.4.3
version: 3.4.4
homepage: https://github.com/isoos/postgresql-dart
topics:
- sql
Expand Down
69 changes: 53 additions & 16 deletions test/v3_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -471,37 +471,74 @@ void main() {
});
});

withPostgresServer('issue 390', (server) {
test(
'issue 390 - Cannot ALTER TABLE because it is being used by active queries in this session',
() async {
final conn = await server.newConnection();
const tableToAlter = 'table_to_alter';
const otherTable = 'other_table';
withPostgresServer('issue 390 - closing the portal of prepared statements',
(server) {
const tableToAlter = 'table_to_alter';
const otherTable = 'other_table';

setUp(() async {
final conn = await server.newConnection();
await conn.execute('''
CREATE TABLE $tableToAlter (
a_id INTEGER PRIMARY KEY NOT NULL,
a_other_id INTEGER NOT NULL
);''');

await conn.execute(
'CREATE TABLE $otherTable (other_id INTEGER PRIMARY KEY NOT NULL);');
await conn.close();
});

tearDown(() async {
final conn = await server.newConnection();
await conn.execute('DROP TABLE $tableToAlter;');
await conn.execute('DROP TABLE $otherTable;');
});

test('non-transaction update', () async {
final conn = await server.newConnection();
await conn.execute('SELECT * FROM $tableToAlter;');
await conn.execute('''
ALTER TABLE $tableToAlter
ADD CONSTRAINT fk_other
FOREIGN KEY (a_other_id)
REFERENCES $otherTable(other_id);''');
});

test('transaction via statement', () async {
final conn = await server.newConnection();
await conn.execute('BEGIN');
await conn.execute('SELECT * FROM $tableToAlter;');
await conn.execute('''
ALTER TABLE $tableToAlter
ADD CONSTRAINT fk_other
FOREIGN KEY (a_other_id)
REFERENCES $otherTable(other_id);''');
await conn.execute('COMMIT');
});

test('transaction via statement - simple query protocol', () async {
final conn = await server.newConnection(queryMode: QueryMode.simple);
await conn.execute('BEGIN');
await conn.execute('SELECT * FROM $tableToAlter;');
await conn.execute('''
ALTER TABLE $tableToAlter
ADD CONSTRAINT fk_other
FOREIGN KEY (a_other_id)
REFERENCES $otherTable(other_id);''');
await conn.execute('COMMIT');
});

test('transaction via runTx', () async {
final conn = await server.newConnection();
await conn.runTx((tx) async {
// Select from the table that will be altered
await tx.execute('SELECT * FROM $tableToAlter;');

// Add a foreign key constraint
await tx.execute('''
ALTER TABLE $tableToAlter
ADD CONSTRAINT fk_other
FOREIGN KEY (a_other_id)
ADD CONSTRAINT fk_other
FOREIGN KEY (a_other_id)
REFERENCES $otherTable(other_id);''');
});

// Should not throw
}, skip: 'investigation needed');
});
});
}

Expand Down

0 comments on commit 10fce4d

Please sign in to comment.