Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC][WIP] Establish connection to chrome background service #2921

Draft
wants to merge 2 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion drift/lib/src/remote/protocol.dart
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class DriftProtocol {
}

Message deserialize(Object message) {
if (message is! List) throw const FormatException('Cannot read message');
if (message is! List)
throw FormatException('Cannot read message ${message.runtimeType}');

final tag = message[0];
final id = message[1] as int;
Expand Down
137 changes: 117 additions & 20 deletions drift/lib/web/worker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@
/// For more details on how to use this library, see [the documentation].
///
/// [the documentation]: https://drift.simonbinder.eu/web/#using-web-workers
// ignore_for_file: public_member_api_docs

library drift.web.workers;

import 'dart:async';
import 'dart:html';
// import 'dart:html';
import 'dart:js_interop';
import 'dart:js_interop_unsafe';

import 'package:async/async.dart';
import 'package:drift/drift.dart';
Expand Down Expand Up @@ -115,18 +120,7 @@ enum DriftWorkerMode {
/// contains additional information and an example on how to use workers with
/// Dart and Drift.
void driftWorkerMain(QueryExecutor Function() openConnection) {
final self = WorkerGlobalScope.instance;
_RunningDriftWorker worker;

if (self is SharedWorkerGlobalScope) {
worker = _RunningDriftWorker(true, openConnection);
} else if (self is DedicatedWorkerGlobalScope) {
worker = _RunningDriftWorker(false, openConnection);
} else {
throw StateError('This worker is neither a shared nor a dedicated worker');
}

worker.start();
_RunningDriftWorker(true, openConnection).start();
}

/// Spawn or connect to a web worker written with [driftWorkerMain].
Expand Down Expand Up @@ -154,10 +148,16 @@ Future<DatabaseConnection> connectToDriftWorker(String workerJsUri,
worker.postMessage(webChannel.port1, [webChannel.port1]);
channel = webChannel.port2.channel();
} else {
final worker = SharedWorker(workerJsUri, 'drift database');
final port = worker.port!;

var didGetInitializationResponse = false;
final chrome =
(WorkerGlobalScope.instance as JSObject).getProperty("chrome".toJS);
final runtime = (chrome as JSObject).getProperty("runtime".toJS);
final jsPort = (runtime as JSObject)
.callMethod("connect".toJS, ChromeConnectInfo(name: "drift database"));
final port = jsPort as ChromeRuntimePort;
// final worker = SharedWorker(workerJsUri, 'drift database');
// final port = worker.port!;

var didGetInitializationResponse = true;
port.postMessage(mode.name);
channel = port.channel().transformStream(StreamTransformer.fromHandlers(
handleData: (data, sink) {
Expand Down Expand Up @@ -185,6 +185,92 @@ Future<DatabaseConnection> connectToDriftWorker(String workerJsUri,
return connectToRemoteAndInitialize(channel);
}

extension type ChromeConnectInfo._(JSObject _) implements JSObject {
external String get name;

external ChromeConnectInfo({required String name});
}

@JS()
@staticInterop
class ChromeRuntimePort {}

extension ChromeRuntimePortX on ChromeRuntimePort {
external void disconnect();
@JS('postMessage')
external void _postMessage(JSAny? message);

void postMessage(Object? message) => _postMessage(message.jsify());

external void close();

external String get name;

external ChromePortOnMessage get onMessage;
}

@JS()
@staticInterop
class ChromePortOnMessage {}

@JS()
@staticInterop
class ChromePortOnDisconnect {}

extension ChromeRuntimePortOnMessageX on ChromePortOnMessage {
@JS('addListener')
external void _addListener(JSExportedDartFunction listener);
// external void _removeListener(JSExportedDartFunction listener);

void addListener(ChromePortOnMessageListener listener) =>
_addListener(listener.toJS);
}

typedef ChromePortOnMessageListener = void Function(
JSAny? message, ChromeRuntimePort port);

const _disconnectMessage = '_disconnect';

/// Extension to transform a raw [MessagePort] from web workers into a Dart
/// [StreamChannel].
extension PortToChannel on ChromeRuntimePort {
/// Converts this port to a two-way communication channel, exposed as a
/// [StreamChannel].
///
/// This can be used to implement a remote database connection over service
/// workers.
///
/// The [explicitClose] parameter can be used to control whether a close
/// message should be sent through the channel when it is closed. This will
/// cause it to be closed on the other end as well. Note that this is not a
/// reliable way of determining channel closures though, as there is no event
/// for channels being closed due to a tab or worker being closed.
/// Both "ends" of a JS channel calling [channel] on their part must use the
/// value for [explicitClose].
StreamChannel<Object?> channel({bool explicitClose = false}) {
final controller = StreamChannelController<Object?>();
onMessage.addListener((message, port) {
if (explicitClose && message == _disconnectMessage) {
// Other end has closed the connection
controller.local.sink.close();
} else {
controller.local.sink.add(message.dartify());
}
});

controller.local.stream.listen(postMessage, onDone: () {
// Closed locally, inform the other end.
if (explicitClose) {
postMessage(_disconnectMessage);
}

close();
});

return controller.foreign;
}
}

class _RunningDriftWorker {
final bool isShared;
final QueryExecutor Function() connectionFactory;
Expand All @@ -196,10 +282,22 @@ class _RunningDriftWorker {
_RunningDriftWorker(this.isShared, this.connectionFactory);

void start() {
print("DOING DRIFT START");
if (isShared) {
const event = EventStreamProvider<MessageEvent>('connect');
event.forTarget(self).listen(_newConnection);
final chrome = (self as JSObject).getProperty("chrome".toJS);
final runtime = (chrome as JSObject).getProperty("runtime".toJS);
final onConnect = (runtime as JSObject).getProperty("onConnect".toJS);
(onConnect as JSObject).callMethod(
"addListener".toJS,
((ChromeRuntimePort port) {
print("CONNECTED");
_newConnection(port);
}).toJS,
);
// const event = EventStreamProvider<MessageEvent>('connect');
// event.forTarget(self).listen(_newConnection);
} else {
print("DRIFT START NOT SHARED");
const event = EventStreamProvider<MessageEvent>('message');
event.forTarget(self).map((e) => e.data).listen(_handleMessage);
}
Expand All @@ -226,9 +324,8 @@ class _RunningDriftWorker {
}

/// Handle a new connection, which implies that this worker is shared.
void _newConnection(MessageEvent event) {
void _newConnection(ChromeRuntimePort outgoingPort) {
assert(isShared);
final outgoingPort = event.ports.first;

// We still don't know whether this shared worker is supposed to host the
// server itself or whether this is delegated to a dedicated worker managed
Expand Down
3 changes: 2 additions & 1 deletion drift/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ dev_dependencies:
build_runner_core: ^7.0.0
build_verify: ^3.0.0
build_web_compilers: ^4.0.3
drift_dev: any
drift_dev:
path: ../drift_dev
drift_testcases:
path: ../extras/integration_tests/drift_testcases
http: ^0.13.4
Expand Down