Skip to content

Commit b36e6bd

Browse files
authored
feat: add get_streamer helper and inject streamer info (jina-ai#5472)
1 parent dbb493c commit b36e6bd

File tree

8 files changed

+199
-63
lines changed

8 files changed

+199
-63
lines changed

docs/envs/index.md

+23-22
Original file line numberDiff line numberDiff line change
@@ -52,25 +52,26 @@ For more information about the environment variable syntax used in Jina YAML con
5252

5353
The following environment variables are used internally in Jina:
5454

55-
| Environment variable | Description |
56-
|-------------------------------|--------------------------------------------------------------------------------------------------------|
57-
| `JINA_AUTH_TOKEN` | Authentication token of Jina Cloud |
58-
| `JINA_DEFAULT_HOST` | The default host where the server is exposed |
59-
| `JINA_DEFAULT_TIMEOUT_CTRL` | The default timeout time used by Flow to check the readiness of Executors |
60-
| `JINA_DEPLOYMENT_NAME` | The name of the deployment, used by the Head Runtime in Kubernetes to connect to different deployments |
61-
| `JINA_DISABLE_UVLOOP` | If set, Jina will not use uvloop event loop for concurrent execution |
62-
| `JINA_FULL_CLI` | If set, all the CLI options will be shown in help |
63-
| `JINA_GATEWAY_IMAGE` | Used when exporting a Flow to Kubernetes or docker-compose to override the default gateway image |
64-
| `JINA_GRPC_RECV_BYTES` | Set by the grpc service to keep track of the received bytes |
65-
| `JINA_GRPC_SEND_BYTES` | Set by the grpc service to keep track of the sent bytes |
66-
| `JINA_LOG_CONFIG` | The configuration used for the logger |
67-
| `JINA_LOG_LEVEL` | The logging level used: INFO, DEBUG, WARNING |
68-
| `JINA_LOG_NO_COLOR` | If set, disables color from rich console |
69-
| `JINA_MP_START_METHOD` | Sets the multiprocessing start method used by jina |
70-
| `JINA_RANDOM_PORT_MAX` | The max port number used when selecting random ports to apply for Executors or gateway |
71-
| `JINA_RANDOM_PORT_MIN` | The min port number used when selecting random ports to apply for Executors or gateway |
72-
| `JINA_LOCKS_ROOT` | The root folder where file locks for concurrent Executor initialization |
73-
| `JINA_OPTOUT_TELEMETRY` | If set, disables telemetry |
74-
| `JINA_K8S_ACCESS_MODES` | Configures the access modes for the PersistentVolumeClaim attached to the StatefulSet, when creating a StatefulSet in Kubernetes for an Executor using volumes. Defaults to '["ReadWriteOnce"]' |
75-
| `JINA_K8S_STORAGE_CLASS_NAME` | Configures the storage class for the PersistentVolumeClaim attached to the StatefulSet, when creating a StatefulSet in Kubernetes for an Executor using volumes. Defaults to 'standard' |
76-
| `JINA_K8S_STORAGE_CAPACITY` | Configures the capacity for the PersistentVolumeClaim attached to the StatefulSet, when creating a StatefulSet in Kubernetes for an Executor using volumes. Defaults to '10G' | |
55+
| Environment variable | Description |
56+
|-------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
57+
| `JINA_AUTH_TOKEN` | Authentication token of Jina Cloud |
58+
| `JINA_DEFAULT_HOST` | The default host where the server is exposed |
59+
| `JINA_DEFAULT_TIMEOUT_CTRL` | The default timeout time used by Flow to check the readiness of Executors |
60+
| `JINA_DEPLOYMENT_NAME` | The name of the deployment, used by the Head Runtime in Kubernetes to connect to different deployments |
61+
| `JINA_DISABLE_UVLOOP` | If set, Jina will not use uvloop event loop for concurrent execution |
62+
| `JINA_FULL_CLI` | If set, all the CLI options will be shown in help |
63+
| `JINA_GATEWAY_IMAGE` | Used when exporting a Flow to Kubernetes or docker-compose to override the default gateway image |
64+
| `JINA_GRPC_RECV_BYTES` | Set by the grpc service to keep track of the received bytes |
65+
| `JINA_GRPC_SEND_BYTES` | Set by the grpc service to keep track of the sent bytes |
66+
| `JINA_LOG_CONFIG` | The configuration used for the logger |
67+
| `JINA_LOG_LEVEL` | The logging level used: INFO, DEBUG, WARNING |
68+
| `JINA_LOG_NO_COLOR` | If set, disables color from rich console |
69+
| `JINA_MP_START_METHOD` | Sets the multiprocessing start method used by jina |
70+
| `JINA_RANDOM_PORT_MAX` | The max port number used when selecting random ports to apply for Executors or gateway |
71+
| `JINA_RANDOM_PORT_MIN` | The min port number used when selecting random ports to apply for Executors or gateway |
72+
| `JINA_LOCKS_ROOT` | The root folder where file locks for concurrent Executor initialization are stored |
73+
| `JINA_OPTOUT_TELEMETRY` | If set, disables telemetry |
74+
| `JINA_K8S_ACCESS_MODES` | Configures the access modes for the PersistentVolumeClaim attached to the StatefulSet, when creating a StatefulSet in Kubernetes for an Executor using volumes. Defaults to '["ReadWriteOnce"]' |
75+
| `JINA_K8S_STORAGE_CLASS_NAME` | Configures the storage class for the PersistentVolumeClaim attached to the StatefulSet, when creating a StatefulSet in Kubernetes for an Executor using volumes. Defaults to 'standard' |
76+
| `JINA_K8S_STORAGE_CAPACITY` | Configures the capacity for the PersistentVolumeClaim attached to the StatefulSet, when creating a StatefulSet in Kubernetes for an Executor using volumes. Defaults to '10G' |
77+
| `JINA_STREAMER_ARGS` | Jina uses this variable to inject GatewayStreamer arguments into the host environment running a Gateway |

jina/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ def _warning_on_one_line(message, category, filename, lineno, *args, **kwargs):
107107
'JINA_K8S_ACCESS_MODES',
108108
'JINA_K8S_STORAGE_CLASS_NAME',
109109
'JINA_K8S_STORAGE_CAPACITY',
110+
'JINA_STREAMER_ARGS',
110111
)
111112

112113
__default_host__ = _os.environ.get(

jina/serve/gateway.py

+12
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,18 @@ def __init__(
111111
aio_tracing_client_interceptors=self.runtime_args.aio_tracing_client_interceptors,
112112
tracing_client_interceptor=self.runtime_args.tracing_client_interceptor,
113113
)
114+
GatewayStreamer._set_env_streamer_args(
115+
graph_representation=graph_description,
116+
executor_addresses=deployments_addresses,
117+
graph_conditions=graph_conditions,
118+
deployments_metadata=deployments_metadata,
119+
deployments_no_reduce=deployments_no_reduce,
120+
timeout_send=self.runtime_args.timeout_send,
121+
retries=self.runtime_args.retries,
122+
compression=self.runtime_args.compression,
123+
runtime_name=self.runtime_args.runtime_name,
124+
prefetch=self.runtime_args.prefetch,
125+
)
114126

115127
def _add_runtime_args(self, _runtime_args: Optional[Dict]):
116128
from jina.parsers import set_gateway_runtime_args_parser

jina/serve/streamer.py

+59-33
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
import json
2+
import os
13
from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union
24

35
from docarray import DocumentArray
6+
47
from jina.logging.logger import JinaLogger
58
from jina.serve.networking import GrpcConnectionPool
69
from jina.serve.runtimes.gateway.graph.topology_graph import TopologyGraph
@@ -24,22 +27,22 @@ class GatewayStreamer:
2427
"""
2528

2629
def __init__(
27-
self,
28-
graph_representation: Dict,
29-
executor_addresses: Dict[str, Union[str, List[str]]],
30-
graph_conditions: Dict = {},
31-
deployments_metadata: Dict[str, Dict[str, str]] = {},
32-
deployments_no_reduce: List[str] = [],
33-
timeout_send: Optional[float] = None,
34-
retries: int = 0,
35-
compression: Optional[str] = None,
36-
runtime_name: str = 'custom gateway',
37-
prefetch: int = 0,
38-
logger: Optional['JinaLogger'] = None,
39-
metrics_registry: Optional['CollectorRegistry'] = None,
40-
meter: Optional['Meter'] = None,
41-
aio_tracing_client_interceptors: Optional[Sequence['ClientInterceptor']] = None,
42-
tracing_client_interceptor: Optional['OpenTelemetryClientInterceptor'] = None,
30+
self,
31+
graph_representation: Dict,
32+
executor_addresses: Dict[str, Union[str, List[str]]],
33+
graph_conditions: Dict = {},
34+
deployments_metadata: Dict[str, Dict[str, str]] = {},
35+
deployments_no_reduce: List[str] = [],
36+
timeout_send: Optional[float] = None,
37+
retries: int = 0,
38+
compression: Optional[str] = None,
39+
runtime_name: str = 'custom gateway',
40+
prefetch: int = 0,
41+
logger: Optional['JinaLogger'] = None,
42+
metrics_registry: Optional['CollectorRegistry'] = None,
43+
meter: Optional['Meter'] = None,
44+
aio_tracing_client_interceptors: Optional[Sequence['ClientInterceptor']] = None,
45+
tracing_client_interceptor: Optional['OpenTelemetryClientInterceptor'] = None,
4346
):
4447
"""
4548
:param graph_representation: A dictionary describing the topology of the Deployments. 2 special nodes are expected, the name `start-gateway` and `end-gateway` to
@@ -97,14 +100,14 @@ def __init__(
97100
self._streamer.Call = self._streamer.stream
98101

99102
def _create_connection_pool(
100-
self,
101-
deployments_addresses,
102-
compression,
103-
metrics_registry,
104-
meter,
105-
logger,
106-
aio_tracing_client_interceptors,
107-
tracing_client_interceptor,
103+
self,
104+
deployments_addresses,
105+
compression,
106+
metrics_registry,
107+
meter,
108+
logger,
109+
aio_tracing_client_interceptors,
110+
tracing_client_interceptor,
108111
):
109112
# add the connections needed
110113
connection_pool = GrpcConnectionPool(
@@ -135,14 +138,14 @@ def stream(self, *args, **kwargs):
135138
return self._streamer.stream(*args, **kwargs)
136139

137140
async def stream_docs(
138-
self,
139-
docs: DocumentArray,
140-
request_size: int = 100,
141-
return_results: bool = False,
142-
exec_endpoint: Optional[str] = None,
143-
target_executor: Optional[str] = None,
144-
parameters: Optional[Dict] = None,
145-
results_in_order: bool = False
141+
self,
142+
docs: DocumentArray,
143+
request_size: int = 100,
144+
return_results: bool = False,
145+
exec_endpoint: Optional[str] = None,
146+
target_executor: Optional[str] = None,
147+
parameters: Optional[Dict] = None,
148+
results_in_order: bool = False,
146149
):
147150
"""
148151
stream documents and stream responses back.
@@ -170,7 +173,9 @@ def _req_generator():
170173
req.parameters = parameters
171174
yield req
172175

173-
async for resp in self.stream(request_iterator=_req_generator(), results_in_order=results_in_order):
176+
async for resp in self.stream(
177+
request_iterator=_req_generator(), results_in_order=results_in_order
178+
):
174179
if return_results:
175180
yield resp
176181
else:
@@ -184,3 +189,24 @@ async def close(self):
184189
await self._connection_pool.close()
185190

186191
Call = stream
192+
193+
@staticmethod
194+
def get_streamer():
195+
"""
196+
Return a streamer object based on the current environment context.
197+
The streamer object is constructed using runtime arguments stored in the `JINA_STREAMER_ARGS` environment variable.
198+
If this method is used outside a Jina context (process not controlled/orchestrated by jina), this method will
199+
raise an error.
200+
The streamer object does not have tracing/instrumentation capabilities.
201+
202+
:return: Returns an instance of `GatewayStreamer`
203+
"""
204+
if 'JINA_STREAMER_ARGS' in os.environ:
205+
args_dict = json.loads(os.environ['JINA_STREAMER_ARGS'])
206+
return GatewayStreamer(**args_dict)
207+
else:
208+
raise OSError('JINA_STREAMER_ARGS environment variable is not set')
209+
210+
@staticmethod
211+
def _set_env_streamer_args(**kwargs):
212+
os.environ['JINA_STREAMER_ARGS'] = json.dumps(kwargs)

tests/unit/orchestrate/flow/flow-construct/test_flow_custom_gateway.py

+20-6
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
_validate_dummy_custom_gateway_response,
1111
)
1212
from tests.unit.yaml.dummy_gateway import DummyGateway
13+
from tests.unit.yaml.dummy_gateway_get_streamer import DummyGatewayGetStreamer
1314

1415
cur_dir = os.path.dirname(os.path.abspath(__file__))
1516
_dummy_gateway_yaml_path = os.path.join(
@@ -28,7 +29,12 @@
2829
@pytest.mark.parametrize(
2930
'uses,uses_with,expected',
3031
[
31-
('DummyGateway', {}, {'arg1': None, 'arg2': None, 'arg3': 'default-arg3'}),
32+
(DummyGateway, {}, {'arg1': None, 'arg2': None, 'arg3': 'default-arg3'}),
33+
(
34+
DummyGatewayGetStreamer,
35+
{},
36+
{'arg1': None, 'arg2': None, 'arg3': 'default-arg3'},
37+
),
3238
(
3339
_dummy_gateway_yaml_path,
3440
{},
@@ -40,7 +46,12 @@
4046
{'arg1': 'hello', 'arg2': 'world', 'arg3': 'default-arg3'},
4147
),
4248
(
43-
'DummyGateway',
49+
DummyGateway,
50+
{'arg1': 'arg1', 'arg2': 'arg2', 'arg3': 'arg3'},
51+
{'arg1': 'arg1', 'arg2': 'arg2', 'arg3': 'arg3'},
52+
),
53+
(
54+
DummyGatewayGetStreamer,
4455
{'arg1': 'arg1', 'arg2': 'arg2', 'arg3': 'arg3'},
4556
{'arg1': 'arg1', 'arg2': 'arg2', 'arg3': 'arg3'},
4657
),
@@ -55,7 +66,12 @@
5566
{'arg1': 'arg1', 'arg2': 'arg2', 'arg3': 'arg3'},
5667
),
5768
(
58-
'DummyGateway',
69+
DummyGateway,
70+
{'arg1': 'arg1'},
71+
{'arg1': 'arg1', 'arg2': None, 'arg3': 'default-arg3'},
72+
),
73+
(
74+
DummyGatewayGetStreamer,
5975
{'arg1': 'arg1'},
6076
{'arg1': 'arg1', 'arg2': None, 'arg3': 'default-arg3'},
6177
),
@@ -74,9 +90,7 @@
7490
def test_flow_custom_gateway_no_executor(uses, uses_with, expected):
7591

7692
flow = (
77-
Flow()
78-
.config_gateway(uses=uses, uses_with=uses_with)
79-
.add(uses='ProcessExecutor')
93+
Flow().config_gateway(uses=uses, uses_with=uses_with).add(uses=ProcessExecutor)
8094
)
8195
with flow:
8296
_validate_dummy_custom_gateway_response(flow.port, expected)

tests/unit/serve/gateway/test_gateway.py

+16
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
_validate_dummy_custom_gateway_response,
1515
)
1616
from tests.unit.yaml.dummy_gateway import DummyGateway
17+
from tests.unit.yaml.dummy_gateway_get_streamer import DummyGatewayGetStreamer
1718

1819
cur_dir = os.path.dirname(os.path.abspath(__file__))
1920
_dummy_gateway_yaml_path = os.path.join(cur_dir, '../../yaml/test-custom-gateway.yml')
@@ -84,6 +85,11 @@ def _start_worker_runtime(uses):
8485
'uses,uses_with,expected',
8586
[
8687
('DummyGateway', {}, {'arg1': None, 'arg2': None, 'arg3': 'default-arg3'}),
88+
(
89+
'DummyGatewayGetStreamer',
90+
{},
91+
{'arg1': None, 'arg2': None, 'arg3': 'default-arg3'},
92+
),
8793
(
8894
_dummy_gateway_yaml_path,
8995
{},
@@ -99,6 +105,11 @@ def _start_worker_runtime(uses):
99105
{'arg1': 'arg1', 'arg2': 'arg2', 'arg3': 'arg3'},
100106
{'arg1': 'arg1', 'arg2': 'arg2', 'arg3': 'arg3'},
101107
),
108+
(
109+
'DummyGatewayGetStreamer',
110+
{'arg1': 'arg1', 'arg2': 'arg2', 'arg3': 'arg3'},
111+
{'arg1': 'arg1', 'arg2': 'arg2', 'arg3': 'arg3'},
112+
),
102113
(
103114
_dummy_gateway_yaml_path,
104115
{'arg1': 'arg1', 'arg2': 'arg2', 'arg3': 'arg3'},
@@ -114,6 +125,11 @@ def _start_worker_runtime(uses):
114125
{'arg1': 'arg1'},
115126
{'arg1': 'arg1', 'arg2': None, 'arg3': 'default-arg3'},
116127
),
128+
(
129+
'DummyGatewayGetStreamer',
130+
{'arg1': 'arg1'},
131+
{'arg1': 'arg1', 'arg2': None, 'arg3': 'default-arg3'},
132+
),
117133
(
118134
_dummy_gateway_yaml_path,
119135
{'arg1': 'arg1'},

tests/unit/yaml/dummy_gateway.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@
44
from pydantic import BaseModel
55
from uvicorn import Config, Server
66

7-
from jina import Gateway, __default_host__
8-
from jina.clients.request import request_generator
7+
from jina import Gateway
98

109

1110
class DummyResponseModel(BaseModel):

0 commit comments

Comments
 (0)