Skip to content

fix: manage async client sessions in pinecone #1884

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# SPDX-FileCopyrightText: 2023-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0
import asyncio
from copy import copy
from typing import Any, Dict, List, Literal, Optional

Expand Down Expand Up @@ -72,7 +73,9 @@ def __init__(

self._index = None
self._async_index = None
self._async_client = None
self._dummy_vector = [-10.0] * self.dimension
self._async_init_lock = asyncio.Lock()

def _initialize_index(self):
if self._index is not None:
Expand Down Expand Up @@ -102,39 +105,39 @@ def _initialize_index(self):
self._dummy_vector = [-10.0] * self.dimension

async def _initialize_async_index(self):
    """
    Lazily create the async Pinecone client and index handle, then return the index handle.

    The first successful call creates a `PineconeAsyncio` client, creates the index named
    `self.index_name` if it is not listed yet, opens an `IndexAsyncio` handle on its host and
    reconciles `self.dimension` with the dimension reported by the index. The client and the
    handle are stored in `self._async_client` / `self._async_index` and stay open afterwards;
    later calls return the stored handle.

    If any step fails, the objects created by this call are closed again and nothing is stored
    on the instance, so a later call starts from scratch.

    :returns: The async index handle stored in `self._async_index`.
    """
    # Fast path: already initialized, no need to take the lock.
    if self._async_index is not None:
        return self._async_index

    async with self._async_init_lock:
        # Re-check under the lock: another coroutine may have finished initialization
        # while this one was waiting.
        if self._async_index is not None:
            return self._async_index

        async_client = PineconeAsyncio(api_key=self.api_key.resolve_value(), source_tag="haystack")
        async_index = None
        try:
            indexes = await async_client.list_indexes()
            if self.index_name not in indexes.names():
                logger.info(f"Index {self.index_name} does not exist. Creating a new index.")
                pinecone_spec = self._convert_dict_spec_to_pinecone_object(self.spec)
                new_index = await async_client.create_index(
                    name=self.index_name, dimension=self.dimension, spec=pinecone_spec, metric=self.metric
                )
                host = new_index["host"]
            else:
                logger.info(
                    f"Connecting to existing index {self.index_name}. `dimension`, "
                    f"`spec`, and `metric` will be ignored."
                )
                host = next((index["host"] for index in indexes if index["name"] == self.index_name), None)

            async_index = async_client.IndexAsyncio(host=host)
            index_stats = await async_index.describe_index_stats()
        except BaseException:
            # Initialization failed (or was cancelled): close what this call opened so that
            # a retry does not overwrite, and thereby leak, a still-open client.
            if async_index is not None:
                await async_index.close()
            await async_client.close()
            raise

        actual_dimension = index_stats.get("dimension")
        if actual_dimension and actual_dimension != self.dimension:
            logger.warning(
                f"Dimension of index {self.index_name} is {actual_dimension}, but {self.dimension} was specified. "
                "The specified dimension will be ignored. "
                "If you need an index with a different dimension, please create a new one."
            )
        self.dimension = actual_dimension or self.dimension
        self._dummy_vector = [-10.0] * self.dimension

        # Publish only after everything succeeded, so other coroutines never observe a
        # half-initialized index. The client is kept open on purpose; `close_async` releases it.
        self._async_client = async_client
        self._async_index = async_index
        return self._async_index

@staticmethod
def _convert_dict_spec_to_pinecone_object(spec: Dict[str, Any]):
Expand Down Expand Up @@ -532,3 +535,29 @@ def _prepare_documents_for_writing(
)

return self._convert_documents_to_pinecone_format(documents)

async def close_async(self):
    """
    Close the async index handle and the async client held by this instance.

    Call this once you're completely done with all async operations.
    Both `self._async_index` and `self._async_client` are reset to `None`, even when closing
    one of them raises. Exceptions raised while closing are not propagated; they are collected
    and reported in a single warning log entry. Calling this when nothing is open is a no-op.
    """
    errors = []

    # The index handle is closed before the client it was created from.
    for attribute, label in (("_async_index", "async index"), ("_async_client", "async client")):
        resource = getattr(self, attribute)
        # Identity check (not truthiness) so that any handle that is present gets closed.
        if resource is None:
            continue
        try:
            await resource.close()
        except Exception as e:
            # Best-effort cleanup: remember the failure and keep closing the remaining resource.
            errors.append(f"Error closing {label}: {e}")
        finally:
            setattr(self, attribute, None)

    if errors:
        logger.warning(f"Errors during async cleanup: {', '.join(errors)}")