Skip to content

Commit ed5fb49

Browse files
committed
Fix stream next error handling
1 parent a2536ff commit ed5fb49

File tree

3 files changed

+18
-6
lines changed

3 files changed

+18
-6
lines changed

ydb/_utilities.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,10 @@ def __next__(self):
149149

150150

151151
class SyncResponseIterator(object):
152-
def __init__(self, it, wrapper):
152+
def __init__(self, it, wrapper, error_converter=None):
153153
self.it = it
154154
self.wrapper = wrapper
155+
self.error_converter = error_converter
155156

156157
def cancel(self):
157158
self.it.cancel()
@@ -161,9 +162,16 @@ def __iter__(self):
161162
return self
162163

163164
def _next(self):
164-
res = self.wrapper(next(self.it))
165+
try:
166+
res = self.wrapper(next(self.it))
167+
except BaseException as e:
168+
if self.error_converter:
169+
raise self.error_converter(e) from e
170+
raise e
171+
165172
if res is not None:
166173
return res
174+
167175
return self._next()
168176

169177
def next(self):

ydb/query/session.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from .transaction import QueryTxContext
2020

2121
from .._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT, DEFAULT_LONG_STREAM_TIMEOUT
22+
from .._errors import stream_error_converter
2223

2324

2425
logger = logging.getLogger(__name__)
@@ -362,12 +363,13 @@ def execute(
362363
)
363364

364365
return base.SyncResponseContextIterator(
365-
stream_it,
366-
lambda resp: base.wrap_execute_query_response(
366+
it=stream_it,
367+
wrapper=lambda resp: base.wrap_execute_query_response(
367368
rpc_state=None,
368369
response_pb=resp,
369370
session_state=self._state,
370371
session=self,
371372
settings=self._settings,
372373
),
374+
error_converter=stream_error_converter,
373375
)

ydb/query/transaction.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from ..connection import _RpcState as RpcState
1717

1818
from . import base
19+
from .._errors import stream_error_converter
1920
from ..settings import BaseRequestSettings
2021

2122
logger = logging.getLogger(__name__)
@@ -500,14 +501,15 @@ def execute(
500501
)
501502

502503
self._prev_stream = base.SyncResponseContextIterator(
503-
stream_it,
504-
lambda resp: base.wrap_execute_query_response(
504+
it=stream_it,
505+
wrapper=lambda resp: base.wrap_execute_query_response(
505506
rpc_state=None,
506507
response_pb=resp,
507508
session_state=self._session_state,
508509
tx=self,
509510
commit_tx=commit_tx,
510511
settings=self.session._settings,
511512
),
513+
error_converter=stream_error_converter,
512514
)
513515
return self._prev_stream

0 commit comments

Comments
 (0)