Skip to content

Commit

Permalink
feat: JS clients can provide custom grpc transports (#6476) (#6479)
Browse files Browse the repository at this point in the history
Provides a contract for client applications to use a custom http/2
implementation. Roughly abstracted from our TypeScript grpc-web library,
with a few rough edges taken off, and no external dependencies.

Two integration tests are included, one which requires https (presently
only possible with manual testing, see #6421), and one which pretends to
contact the server but really responds to every request with a "success"
response and no payload.

No documentation required at this time, generated typescript includes
details on the new APIs.

Fixes #6404
Backport #6476
  • Loading branch information
niloc132 authored Dec 12, 2024
1 parent f98924b commit e67510f
Show file tree
Hide file tree
Showing 13 changed files with 511 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//
package io.deephaven.web.client.api;

import elemental2.core.Function;
import io.deephaven.web.client.api.grpc.GrpcTransportFactory;
import jsinterop.annotations.JsIgnore;
import jsinterop.annotations.JsNullable;
import jsinterop.annotations.JsType;
Expand All @@ -29,20 +29,23 @@ public class ConnectOptions {

/**
* Set this to true to force the use of websockets when connecting to the deephaven instance, false to force the use
* of {@code fetch}.
* of {@code fetch}. Ignored if {@link #transportFactory} is set.
* <p>
* Defaults to null, indicating that the server URL should be checked to see if we connect with fetch or websockets.
*/
@JsNullable
public Boolean useWebsockets;

// TODO (deephaven-core#6214) provide our own grpc-web library that can replace fetch
// /**
// * Optional fetch implementation to use instead of the global {@code fetch()} call, allowing callers to provide a
// * polyfill rather than add a new global.
// */
// @JsNullable
// public Function fetch;
/**
* The transport factory to use for creating gRPC streams. If specified, the JS API will ignore
* {@link #useWebsockets} and its own internal logic for determining the appropriate transport to use.
* <p>
* Defaults to null, indicating that the JS API should determine the appropriate transport to use. If
* {@code useWebsockets} is set to true, the JS API will use websockets, otherwise if the server url begins with
* https, it will use fetch, otherwise it will use websockets.
*/
@JsNullable
public GrpcTransportFactory transportFactory;

public ConnectOptions() {

Expand All @@ -65,5 +68,8 @@ public ConnectOptions(Object connectOptions) {
// if (map.has("fetch")) {
// fetch = map.getAsAny("fetch").uncheckedCast();
// }
if (map.has("transportFactory")) {
transportFactory = map.getAsAny("transportFactory").uncheckedCast();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@
import elemental2.core.JsArray;
import elemental2.core.JsSet;
import elemental2.promise.Promise;
import io.deephaven.javascript.proto.dhinternal.grpcweb.Grpc;
import io.deephaven.javascript.proto.dhinternal.grpcweb.client.RpcOptions;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.TerminationNotificationResponse;
import io.deephaven.web.client.api.event.HasEventHandling;
import io.deephaven.web.client.api.grpc.MultiplexedWebsocketTransport;
import io.deephaven.web.client.ide.IdeSession;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.*;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.ticket_pb.Ticket;
Expand Down Expand Up @@ -246,12 +244,8 @@ public void disconnected() {

public abstract void notifyServerShutdown(TerminationNotificationResponse success);

public boolean useWebsockets() {
Boolean useWebsockets = getOptions().useWebsockets;
if (useWebsockets == null) {
useWebsockets = getServerUrl().startsWith("http:");
}
return useWebsockets;
public boolean supportsClientStreaming() {
return getOptions().transportFactory.getSupportsClientStreaming();
}

public <T> T createClient(BiFunction<String, Object, T> constructor) {
Expand All @@ -261,12 +255,7 @@ public <T> T createClient(BiFunction<String, Object, T> constructor) {
public RpcOptions makeRpcOptions() {
RpcOptions options = RpcOptions.create();
options.setDebug(getOptions().debug);
if (useWebsockets()) {
// Replace with our custom websocket impl, with fallback to the built-in one
options.setTransport(o -> new MultiplexedWebsocketTransport(o, () -> {
Grpc.setDefaultTransport.onInvoke(Grpc.WebsocketTransport.onInvoke());
}));
}
options.setTransport(getOptions().transportFactory.adapt());
return options;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,7 @@ public BrowserHeaders metadata() {
}

public <ReqT, RespT> BiDiStream.Factory<ReqT, RespT> streamFactory() {
return new BiDiStream.Factory<>(info.useWebsockets(), this::metadata, config::newTicketInt);
return new BiDiStream.Factory<>(info.supportsClientStreaming(), this::metadata, config::newTicketInt);
}

public Promise<JsTable> newTable(String[] columnNames, String[] types, Object[][] data, String userTimeZone,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ public interface NextStreamMessageFactory<Req> {
void nextStreamMessage(Req nextPayload, BrowserHeaders headers, JsBiConsumer<Object, Object> callback);
}
public static class Factory<ReqT, RespT> {
private final boolean useWebsockets;
private final boolean supportsClientStreaming;
private final Supplier<BrowserHeaders> headers;
private final IntSupplier nextIntTicket;

public Factory(boolean useWebsockets, Supplier<BrowserHeaders> headers, IntSupplier nextIntTicket) {
this.useWebsockets = useWebsockets;
public Factory(boolean supportsClientStreaming, Supplier<BrowserHeaders> headers, IntSupplier nextIntTicket) {
this.supportsClientStreaming = supportsClientStreaming;
this.headers = headers;
this.nextIntTicket = nextIntTicket;
}
Expand All @@ -51,8 +51,8 @@ public BiDiStream<ReqT, RespT> create(
OpenStreamFactory<ReqT> openEmulatedStream,
NextStreamMessageFactory<ReqT> nextEmulatedStream,
ReqT emptyReq) {
if (useWebsockets) {
return websocket(bidirectionalStream.openBiDiStream(headers.get()));
if (supportsClientStreaming) {
return bidi(bidirectionalStream.openBiDiStream(headers.get()));
} else {
return new EmulatedBiDiStream<>(
openEmulatedStream,
Expand All @@ -73,7 +73,7 @@ public static <Req, Resp> BiDiStream<Req, Resp> of(
IntSupplier nextIntTicket,
boolean useWebsocket) {
if (useWebsocket) {
return websocket(bidirectionalStream.openBiDiStream(headers.get()));
return bidi(bidirectionalStream.openBiDiStream(headers.get()));
} else {
return new EmulatedBiDiStream<>(
openEmulatedStream,
Expand All @@ -84,7 +84,7 @@ public static <Req, Resp> BiDiStream<Req, Resp> of(
}
}

public static <Req, Resp> BiDiStream<Req, Resp> websocket(Object bidirectionalStream) {
public static <Req, Resp> BiDiStream<Req, Resp> bidi(Object bidirectionalStream) {
return new WebsocketBiDiStream<>(Js.cast(bidirectionalStream));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.web.client.api.grpc;

import com.vertispan.tsdefs.annotations.TsInterface;
import elemental2.core.Uint8Array;
import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders;
import io.deephaven.javascript.proto.dhinternal.grpcweb.grpc.Transport;
import jsinterop.annotations.JsIgnore;
import jsinterop.annotations.JsType;
import jsinterop.base.JsPropertyMap;

/**
* gRPC transport implementation.
*
*/
@JsType(namespace = "dh.grpc")
@TsInterface
public interface GrpcTransport {
/**
* Starts the stream, sending metadata to the server.
*
* @param metadata the headers to send the server when opening the connection
*/
void start(JsPropertyMap<HeaderValueUnion> metadata);

/**
* Sends a message to the server.
*
* @param msgBytes bytes to send to the server
*/
void sendMessage(Uint8Array msgBytes);

/**
* "Half close" the stream, signaling to the server that no more messages will be sent, but that the client is still
* open to receiving messages.
*/
void finishSend();

/**
* End the stream, both notifying the server that no more messages will be sent nor received, and preventing the
* client from receiving any more events.
*/
void cancel();

/**
* Helper to transform ts implementations to our own api.
*/
@JsIgnore
static GrpcTransport from(Transport tsTransport) {
return new GrpcTransport() {
@Override
public void start(JsPropertyMap<HeaderValueUnion> metadata) {
tsTransport.start(new BrowserHeaders(metadata));
}

@Override
public void sendMessage(Uint8Array msgBytes) {
tsTransport.sendMessage(msgBytes);
}

@Override
public void finishSend() {
tsTransport.finishSend();
}

@Override
public void cancel() {
tsTransport.cancel();
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.web.client.api.grpc;

import com.vertispan.tsdefs.annotations.TsInterface;
import elemental2.core.Uint8Array;
import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders;
import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.Transport;
import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportFactory;
import jsinterop.annotations.JsOverlay;
import jsinterop.annotations.JsProperty;
import jsinterop.annotations.JsType;
import jsinterop.base.Js;

/**
* Factory for creating gRPC transports.
*/
@TsInterface
@JsType(namespace = "dh.grpc", isNative = true)
public interface GrpcTransportFactory {
/**
* Create a new transport instance.
*
* @param options options for creating the transport
* @return a transport instance to use for gRPC communication
*/
GrpcTransport create(GrpcTransportOptions options);

/**
* Return true to signal that created transports may have {@link GrpcTransport#sendMessage(Uint8Array)} called on it
* more than once before {@link GrpcTransport#finishSend()} should be called.
*
* @return true to signal that the implementation can stream multiple messages, false otherwise indicating that
* Open/Next gRPC calls should be used
*/
@JsProperty
boolean getSupportsClientStreaming();

/**
* Adapt this factory to the transport factory used by the gRPC-web library.
*/
@JsOverlay
default TransportFactory adapt() {
return options -> {
GrpcTransport impl = create(GrpcTransportOptions.from(options));
return new Transport() {
@Override
public void cancel() {
impl.cancel();
}

@Override
public void finishSend() {
impl.finishSend();
}

@Override
public void sendMessage(Uint8Array msgBytes) {
impl.sendMessage(msgBytes);
}

@Override
public void start(BrowserHeaders metadata) {
impl.start(Js.cast(metadata.headersMap));
}
};
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.web.client.api.grpc;

import com.vertispan.tsdefs.annotations.TsInterface;
import elemental2.core.JsError;
import elemental2.core.Uint8Array;
import elemental2.dom.URL;
import io.deephaven.javascript.proto.dhinternal.browserheaders.BrowserHeaders;
import io.deephaven.javascript.proto.dhinternal.grpcweb.transports.transport.TransportOptions;
import jsinterop.annotations.JsFunction;
import jsinterop.annotations.JsIgnore;
import jsinterop.annotations.JsNullable;
import jsinterop.annotations.JsOptional;
import jsinterop.annotations.JsType;
import jsinterop.base.JsPropertyMap;

/**
* Options for creating a gRPC stream transport instance.
*/
@TsInterface
@JsType(namespace = "dh.grpc")
public class GrpcTransportOptions {
@JsFunction
@FunctionalInterface
public interface OnHeadersCallback {
void onHeaders(JsPropertyMap<HeaderValueUnion> headers, int status);
}

@JsFunction
@FunctionalInterface
public interface OnChunkCallback {
void onChunk(Uint8Array chunk);
}

@JsFunction
@FunctionalInterface
public interface OnEndCallback {
void onEnd(@JsOptional @JsNullable JsError error);
}

/**
* The gRPC method URL.
*/
public URL url;

/**
* True to enable debug logging for this stream.
*/
public boolean debug;

/**
* Callback for when headers and status are received. The headers are a map of header names to values, and the
* status is the HTTP status code. If the connection could not be made, the status should be 0.
*/
public OnHeadersCallback onHeaders;

/**
* Callback for when a chunk of data is received.
*/
public OnChunkCallback onChunk;

/**
* Callback for when the stream ends, with an error instance if it can be provided. Note that the present
* implementation does not consume errors, even if provided.
*/
public OnEndCallback onEnd;

/**
* Internal copy of options, to be used for fallback.
*/
@JsIgnore
public TransportOptions originalOptions;

/**
* Convert a {@link TransportOptions} instance to a {@link GrpcTransportOptions} instance.
*/
@JsIgnore
public static GrpcTransportOptions from(TransportOptions options) {
GrpcTransportOptions impl = new GrpcTransportOptions();
impl.url = new URL(options.getUrl());
impl.debug = options.isDebug();
impl.onHeaders = (headers, status) -> options.getOnHeaders().onInvoke(new BrowserHeaders(headers), status);
impl.onChunk = p0 -> {
// "false" because the underlying implementation doesn't rely on this anyway.
options.getOnChunk().onInvoke(p0, false);
};
impl.onEnd = options.getOnEnd()::onInvoke;
return impl;
}
}
Loading

0 comments on commit e67510f

Please sign in to comment.