Skip to content

Commit

Permalink
fix(trino): db session error in handle cursor (#31265)
Browse files Browse the repository at this point in the history
(cherry picked from commit 1e0c04f)
  • Loading branch information
justinpark authored and michael-s-molina committed Dec 3, 2024
1 parent 1dcccc0 commit 4d2f13e
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions superset/db_engine_specs/trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=consider-using-transaction
from __future__ import annotations

import contextlib
Expand Down Expand Up @@ -185,6 +184,8 @@ def handle_cursor(cls, cursor: Cursor, query: Query) -> None:
if tracking_url := cls.get_tracking_url(cursor):
query.tracking_url = tracking_url

db.session.commit() # pylint: disable=consider-using-transaction

# if query cancelation was requested prior to the handle_cursor call, but
# the query was still executed, trigger the actual query cancelation now
if query.extra.get(QUERY_EARLY_CANCEL_KEY):
Expand Down Expand Up @@ -213,6 +214,7 @@ def execute_with_cursor(
# Fetch the query ID beforehand, since it might fail inside the thread due to
# how the SQLAlchemy session is handled.
query_id = query.id
query_database = query.database

execute_result: dict[str, Any] = {}
execute_event = threading.Event()
Expand All @@ -234,7 +236,7 @@ def _execute(
with app.app_context():
for key, value in g_copy.__dict__.items():
setattr(g, key, value)
cls.execute(cursor, sql, query.database)
cls.execute(cursor, sql, query_database)
except Exception as ex: # pylint: disable=broad-except
results["error"] = ex
finally:
Expand All @@ -251,6 +253,8 @@ def _execute(
)
execute_thread.start()

# Wait for the thread to start before continuing
time.sleep(0.1)
# Wait for a query ID to be available before handling the cursor, as
# it's required by that method; it may never become available on error.
while not cursor.query_id and not execute_event.is_set():
Expand All @@ -272,7 +276,7 @@ def _execute(
def prepare_cancel_query(cls, query: Query) -> None:
if QUERY_CANCEL_KEY not in query.extra:
query.set_extra_json_key(QUERY_EARLY_CANCEL_KEY, True)
db.session.commit()
db.session.commit() # pylint: disable=consider-using-transaction

@classmethod
def cancel_query(cls, cursor: Cursor, query: Query, cancel_query_id: str) -> bool:
Expand Down

0 comments on commit 4d2f13e

Please sign in to comment.