Skip to content

Commit 58b61d0

Browse files
JoanFMjina-bot
andauthored
feat: enable HTTP and Composite protocols for Deployment (jina-ai#5764)
Signed-off-by: Joan Fontanals Martinez <[email protected]> Co-authored-by: Jina Dev Bot <[email protected]>
1 parent 8e09470 commit 58b61d0

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+1729
-1297
lines changed

.github/workflows/cd.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ jobs:
359359
run: |
360360
export LINKERD2_VERSION=stable-2.11.4
361361
curl --proto '=https' --tlsv1.2 -sSfL https://run.linkerd.io/install | sh
362-
pytest -v -s --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml ./tests/k8s/test_k8s_deployment.py ./tests/k8s/test_graceful_request_handling.py
362+
pytest -v -s --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml ./tests/k8s/test_k8s_deployment.py ./tests/k8s/test_k8s_graceful_request_handling.py
363363
timeout-minutes: 30
364364
- name: Check codecov file
365365
id: check_files

.github/workflows/ci.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ jobs:
232232
run: |
233233
export LINKERD2_VERSION=stable-2.11.4
234234
curl --proto '=https' --tlsv1.2 -sSfL https://run.linkerd.io/install | sh
235-
pytest -v -s --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml ./tests/k8s/test_k8s_deployment.py ./tests/k8s/test_graceful_request_handling.py
235+
pytest -v -s --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml ./tests/k8s/test_k8s_deployment.py ./tests/k8s/test_k8s_graceful_request_handling.py
236236
timeout-minutes: 45
237237
- name: Check codecov file
238238
id: check_files

docs/concepts/flow/deployment-args.md

+8-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
| `reload` | If set, the Executor will restart while serving if YAML configuration source or Executor modules are changed. If YAML configuration is changed, the whole deployment is reloaded and new processes will be restarted. If only Python modules of the Executor have changed, they will be reloaded to the interpreter without restarting process. | `boolean` | `False` |
3737
| `install_requirements` | If set, try to install `requirements.txt` from the local Executor if exists in the Executor folder. If using Hub, install `requirements.txt` in the Hub Executor bundle to local. | `boolean` | `False` |
3838
| `port` | The port for input data to bind to, default is a random port between [49152, 65535]. In the case of an external Executor (`--external` or `external=True`) this can be a list of ports. Then, every resulting address will be considered as one replica of the Executor. | `number` | `random in [49152, 65535]` |
39+
| `protocol` | Communication protocol of the server exposed by the Executor. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET']. | `array` | `[<ProtocolType.GRPC: 0>]` |
3940
| `monitoring` | If set, spawn an http server with a prometheus endpoint to expose metrics | `boolean` | `False` |
4041
| `port_monitoring` | The port on which the prometheus server is exposed, default is a random port between [49152, 65535] | `number` | `random in [49152, 65535]` |
4142
| `retries` | Number of retries per gRPC call. If <0 it defaults to max(3, num_replicas) | `number` | `-1` |
@@ -57,4 +58,10 @@
5758
| `when` | The condition that the documents need to fulfill before reaching the Executor.The condition can be defined in the form of a `DocArray query condition <https://docarray.jina.ai/fundamentals/documentarray/find/#query-by-conditions>` | `object` | `None` |
5859
| `external` | The Deployment will be considered an external Deployment that has been started independently from the Flow.This Deployment will not be context managed by the Flow. | `boolean` | `False` |
5960
| `grpc_metadata` | The metadata to be passed to the gRPC request. | `object` | `None` |
60-
| `tls` | If set, connect to deployment using tls encryption | `boolean` | `False` |
61+
| `tls` | If set, connect to deployment using tls encryption | `boolean` | `False` |
62+
| `title` | The title of this HTTP server. It will be used in automatics docs such as Swagger UI. | `string` | `None` |
63+
| `description` | The description of this HTTP server. It will be used in automatics docs such as Swagger UI. | `string` | `None` |
64+
| `cors` | If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access. | `boolean` | `False` |
65+
| `uvicorn_kwargs` | Dictionary of kwargs arguments that will be passed to Uvicorn server when starting the server<br><br>More details can be found in Uvicorn docs: https://www.uvicorn.org/settings/ | `object` | `None` |
66+
| `ssl_certfile` | the path to the certificate file | `string` | `None` |
67+
| `ssl_keyfile` | the path to the key file | `string` | `None` |

docs/concepts/flow/executor-args.md

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
| `reload` | If set, the Executor will restart while serving if YAML configuration source or Executor modules are changed. If YAML configuration is changed, the whole deployment is reloaded and new processes will be restarted. If only Python modules of the Executor have changed, they will be reloaded to the interpreter without restarting process. | `boolean` | `False` |
3737
| `install_requirements` | If set, try to install `requirements.txt` from the local Executor if exists in the Executor folder. If using Hub, install `requirements.txt` in the Hub Executor bundle to local. | `boolean` | `False` |
3838
| `port` | The port for input data to bind to, default is a random port between [49152, 65535]. In the case of an external Executor (`--external` or `external=True`) this can be a list of ports. Then, every resulting address will be considered as one replica of the Executor. | `number` | `random in [49152, 65535]` |
39+
| `protocol` | Communication protocol of the server exposed by the Executor. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET']. | `array` | `[<ProtocolType.GRPC: 0>]` |
3940
| `monitoring` | If set, spawn an http server with a prometheus endpoint to expose metrics | `boolean` | `False` |
4041
| `port_monitoring` | The port on which the prometheus server is exposed, default is a random port between [49152, 65535] | `number` | `random in [49152, 65535]` |
4142
| `retries` | Number of retries per gRPC call. If <0 it defaults to max(3, num_replicas) | `number` | `-1` |

docs/concepts/flow/gateway-args.md

+4-4
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,13 @@
1212
| `title` | The title of this HTTP server. It will be used in automatics docs such as Swagger UI. | `string` | `None` |
1313
| `description` | The description of this HTTP server. It will be used in automatics docs such as Swagger UI. | `string` | `None` |
1414
| `cors` | If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access. | `boolean` | `False` |
15-
| `no_debug_endpoints` | If set, `/status` `/post` endpoints are removed from HTTP interface. | `boolean` | `False` |
16-
| `no_crud_endpoints` | If set, `/index`, `/search`, `/update`, `/delete` endpoints are removed from HTTP interface.<br><br> Any executor that has `@requests(on=...)` bound with those values will receive data requests. | `boolean` | `False` |
17-
| `expose_endpoints` | A JSON string that represents a map from executor endpoints (`@requests(on=...)`) to HTTP endpoints. | `string` | `None` |
1815
| `uvicorn_kwargs` | Dictionary of kwargs arguments that will be passed to Uvicorn server when starting the server<br><br>More details can be found in Uvicorn docs: https://www.uvicorn.org/settings/ | `object` | `None` |
1916
| `ssl_certfile` | the path to the certificate file | `string` | `None` |
2017
| `ssl_keyfile` | the path to the key file | `string` | `None` |
18+
| `no_debug_endpoints` | If set, `/status` `/post` endpoints are removed from HTTP interface. | `boolean` | `False` |
19+
| `no_crud_endpoints` | If set, `/index`, `/search`, `/update`, `/delete` endpoints are removed from HTTP interface.<br><br> Any executor that has `@requests(on=...)` bound with those values will receive data requests. | `boolean` | `False` |
20+
| `expose_endpoints` | A JSON string that represents a map from executor endpoints (`@requests(on=...)`) to HTTP endpoints. | `string` | `None` |
2121
| `expose_graphql_endpoint` | If set, /graphql endpoint is added to HTTP interface. | `boolean` | `False` |
22-
| `protocol` | Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET']. | `array` | `[<GatewayProtocolType.GRPC: 0>]` |
2322
| `host` | The host address of the runtime, by default it is 0.0.0.0. | `string` | `0.0.0.0` |
2423
| `proxy` | If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy | `boolean` | `False` |
2524
| `uses` | The config of the gateway, it could be one of the followings:<br> * the string literal of an Gateway class name<br> * a Gateway YAML file (.yml, .yaml, .jaml)<br> * a docker image (must start with `docker://`)<br> * the string literal of a YAML config (must start with `!` or `jtype: `)<br> * the string literal of a JSON config<br><br> When use it under Python, one can use the following values additionally:<br> - a Python dict that represents the config<br> - a text file stream has `.read()` interface | `string` | `None` |
@@ -42,6 +41,7 @@
4241
| `floating` | If set, the current Pod/Deployment can not be further chained, and the next `.add()` will chain after the last Pod/Deployment not this current one. | `boolean` | `False` |
4342
| `reload` | If set, the Gateway will restart while serving if YAML configuration source is changed. | `boolean` | `False` |
4443
| `port` | The port for input data to bind the gateway server to, by default, random ports between range [49152, 65535] will be assigned. The port argument can be either 1 single value in case only 1 protocol is used or multiple values when many protocols are used. | `number` | `random in [49152, 65535]` |
44+
| `protocol` | Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET']. | `array` | `[<ProtocolType.GRPC: 0>]` |
4545
| `monitoring` | If set, spawn an http server with a prometheus endpoint to expose metrics | `boolean` | `False` |
4646
| `port_monitoring` | The port on which the prometheus server is exposed, default is a random port between [49152, 65535] | `number` | `random in [49152, 65535]` |
4747
| `retries` | Number of retries per gRPC call. If <0 it defaults to max(3, num_replicas) | `number` | `-1` |

jina/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def _warning_on_one_line(message, category, filename, lineno, *args, **kwargs):
6565

6666
# do not change this line manually
6767
# this is managed by proto/build-proto.sh and updated on every execution
68-
__proto_version__ = '0.1.13'
68+
__proto_version__ = '0.1.17'
6969

7070
try:
7171
__docarray_version__ = _docarray.__version__

jina/checker.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import argparse
22

3-
from jina.enums import GatewayProtocolType
3+
from jina.enums import ProtocolType
44
from jina.helper import parse_host_scheme
55
from jina.logging.predefined import default_logger
66

jina/clients/__init__.py

+6-8
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
__all__ = ['Client']
88

9-
from jina.enums import GatewayProtocolType
9+
from jina.enums import ProtocolType
1010

1111
if TYPE_CHECKING: # pragma: no cover
1212
from jina.clients.grpc import AsyncGRPCClient, GRPCClient
@@ -132,15 +132,13 @@ def Client(
132132
): # we need to parse the kwargs as soon as possible otherwise to get the gateway type
133133
args = parse_client(kwargs)
134134

135-
protocol = (
136-
args.protocol if args else kwargs.get('protocol', GatewayProtocolType.GRPC)
137-
)
135+
protocol = args.protocol if args else kwargs.get('protocol', ProtocolType.GRPC)
138136
if isinstance(protocol, str):
139-
protocol = GatewayProtocolType.from_string(protocol)
137+
protocol = ProtocolType.from_string(protocol)
140138

141139
is_async = (args and args.asyncio) or kwargs.get('asyncio', False)
142140

143-
if protocol == GatewayProtocolType.GRPC:
141+
if protocol == ProtocolType.GRPC:
144142
if is_async:
145143
from jina.clients.grpc import AsyncGRPCClient
146144

@@ -149,7 +147,7 @@ def Client(
149147
from jina.clients.grpc import GRPCClient
150148

151149
return GRPCClient(args, **kwargs)
152-
elif protocol == GatewayProtocolType.WEBSOCKET:
150+
elif protocol == ProtocolType.WEBSOCKET:
153151
if is_async:
154152
from jina.clients.websocket import AsyncWebSocketClient
155153

@@ -158,7 +156,7 @@ def Client(
158156
from jina.clients.websocket import WebSocketClient
159157

160158
return WebSocketClient(args, **kwargs)
161-
elif protocol == GatewayProtocolType.HTTP:
159+
elif protocol == ProtocolType.HTTP:
162160
if is_async:
163161
from jina.clients.http import AsyncHTTPClient
164162

jina/clients/base/http.py

+56-17
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,34 @@
1818
class HTTPBaseClient(BaseClient):
1919
"""A MixIn for HTTP Client."""
2020

21+
def __init__(self, *args, **kwargs):
22+
super().__init__(*args, **kwargs)
23+
self._endpoints = []
24+
25+
async def _get_endpoints_from_openapi(self):
26+
def extract_paths_by_method(spec):
27+
paths_by_method = {}
28+
for path, methods in spec['paths'].items():
29+
for method, details in methods.items():
30+
if method not in paths_by_method:
31+
paths_by_method[method] = []
32+
paths_by_method[method].append(path.strip('/'))
33+
34+
return paths_by_method
35+
import aiohttp
36+
import json
37+
38+
proto = 'https' if self.args.tls else 'http'
39+
target_url = f'{proto}://{self.args.host}:{self.args.port}/openapi.json'
40+
try:
41+
async with aiohttp.ClientSession() as session:
42+
async with session.get(target_url) as response:
43+
content = await response.read()
44+
openapi_response = json.loads(content.decode())
45+
self._endpoints = extract_paths_by_method(openapi_response).get('post', [])
46+
except:
47+
pass
48+
2149
async def _is_flow_ready(self, **kwargs) -> bool:
2250
"""Sends a dry run to the Flow to validate if the Flow is ready to receive requests
2351
@@ -57,18 +85,18 @@ async def _is_flow_ready(self, **kwargs) -> bool:
5785
return False
5886

5987
async def _get_results(
60-
self,
61-
inputs: 'InputType',
62-
on_done: 'CallbackFnType',
63-
on_error: Optional['CallbackFnType'] = None,
64-
on_always: Optional['CallbackFnType'] = None,
65-
max_attempts: int = 1,
66-
initial_backoff: float = 0.5,
67-
max_backoff: float = 0.1,
68-
backoff_multiplier: float = 1.5,
69-
results_in_order: bool = False,
70-
prefetch: Optional[int] = None,
71-
**kwargs,
88+
self,
89+
inputs: 'InputType',
90+
on_done: 'CallbackFnType',
91+
on_error: Optional['CallbackFnType'] = None,
92+
on_always: Optional['CallbackFnType'] = None,
93+
max_attempts: int = 1,
94+
initial_backoff: float = 0.5,
95+
max_backoff: float = 0.1,
96+
backoff_multiplier: float = 1.5,
97+
results_in_order: bool = False,
98+
prefetch: Optional[int] = None,
99+
**kwargs,
72100
):
73101
"""
74102
:param inputs: the callable
@@ -89,15 +117,26 @@ async def _get_results(
89117

90118
self.inputs = inputs
91119
request_iterator = self._get_requests(**kwargs)
120+
on = kwargs.get('on', '/post')
121+
if len(self._endpoints) == 0:
122+
await self._get_endpoints_from_openapi()
92123

93124
async with AsyncExitStack() as stack:
94125
cm1 = ProgressBar(
95-
total_length=self._inputs_length, disable=not (self.show_progress)
126+
total_length=self._inputs_length, disable=not self.show_progress
96127
)
97128
p_bar = stack.enter_context(cm1)
98-
99129
proto = 'https' if self.args.tls else 'http'
100-
url = f'{proto}://{self.args.host}:{self.args.port}/post'
130+
endpoint = on.strip('/')
131+
has_default_endpoint = 'default' in self._endpoints
132+
133+
if endpoint != '' and endpoint in self._endpoints:
134+
url = f'{proto}://{self.args.host}:{self.args.port}/{on.strip("/")}'
135+
elif has_default_endpoint:
136+
url = f'{proto}://{self.args.host}:{self.args.port}/default'
137+
else:
138+
url = f'{proto}://{self.args.host}:{self.args.port}/post'
139+
101140
iolet = await stack.enter_async_context(
102141
HTTPClientlet(
103142
url=url,
@@ -112,7 +151,7 @@ async def _get_results(
112151
)
113152

114153
def _request_handler(
115-
request: 'Request',
154+
request: 'Request',
116155
) -> 'Tuple[asyncio.Future, Optional[asyncio.Future]]':
117156
"""
118157
For HTTP Client, for each request in the iterator, we `send_message` using
@@ -135,7 +174,7 @@ def _result_handler(result):
135174
**streamer_args,
136175
)
137176
async for response in streamer.stream(
138-
request_iterator=request_iterator, results_in_order=results_in_order
177+
request_iterator=request_iterator, results_in_order=results_in_order
139178
):
140179
r_status = response.status
141180

0 commit comments

Comments
 (0)