[Question]: #17936

Open
1 task done
mirallm opened this issue Feb 26, 2025 · 5 comments
Labels
question Further information is requested

Comments

mirallm commented Feb 26, 2025

Question Validation

  • I have searched both the documentation and discord for an answer.

Question

I get this error when running the code on Databricks: Client.__init__() got an unexpected keyword argument 'proxies'
An error occurred while embedding the file: Client.__init__() got an unexpected keyword argument 'proxies'

Also, I would like to know how to handle rate limit errors, since I am parallelizing the creation of embeddings.

Can you help me, @dosubot?

This is my code:
`
async def create_index_nodes(self, nodes, embed_model):
    """
    Asynchronously creates an index in Qdrant based on the provided nodes,
    or loads the existing index and inserts nodes if the index already exists.

    :param nodes: List of TextNodes to index.
    :param embed_model: The model used for embedding the documents.
    """
    from llama_index.core.data_structs.data_structs import IndexDict
    try:
        if self.vector_index is None:
            # Create a new index if not already created
            self.vector_index = VectorStoreIndex(
                nodes=nodes,
                use_async=False,
                embed_model=embed_model,
                storage_context=self.storage_context,
                show_progress=True
            )
            print(self.vector_index.index_struct)
            print("Created new index.")
        else:
            # If index exists, load it and insert the nodes
            print("Loading existing index and inserting nodes.")
            await self.load_index(embed_model)
            # index_struct = IndexDict()
            index_struct = self.vector_index._build_index_from_nodes(nodes)
            # self.vector_index.insert_nodes(nodes)
            # print(index_struct)
            await self.vector_index._async_add_nodes_to_index(index_struct=index_struct, nodes=nodes, show_progress=True, min_seconds=12, max_seconds=120)
    except Exception as e:
        print(e)
        print(f"An error occurred while embedding the file: {e}")`
mirallm added the question label on Feb 26, 2025

dosubot bot commented Feb 26, 2025

The error you're encountering, Client.__init__() got an unexpected keyword argument 'proxies', suggests that the Client class you're using does not accept a proxies argument. You should check the documentation or source code of the Client class to ensure you're using the correct parameters.
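The thread does not say which Client raises this error. One frequent cause of this exact message, offered here as an assumption rather than a diagnosis, is a version mismatch: httpx removed its deprecated proxies argument in release 0.28, so an older SDK that still passes proxies to httpx.Client fails at construction. A minimal sketch for listing the installed versions on the cluster follows; the package names in the list are guesses at what this setup uses, and installed_versions is a hypothetical helper.

```python
from importlib import metadata


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    # Assumed package names; adjust to whatever the environment actually uses.
    candidates = ["httpx", "openai", "qdrant-client", "llama-index-core"]
    for name, version in installed_versions(candidates).items():
        print(f"{name}: {version or 'not installed'}")
```

If httpx turns out to be 0.28 or newer while the library that constructs the client is old, upgrading that library or pinning httpx below 0.28 are the usual options to try.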

Regarding managing rate limiting when parallelizing tasks, you can use an asyncio.Semaphore to control the number of concurrent tasks. This helps prevent overwhelming the service you're interacting with. Here's a basic example of how you might implement this:

import asyncio
from llama_index.core import VectorStoreIndex
from tenacity import retry, stop_after_attempt, wait_exponential

async def create_index_nodes(self, nodes, embed_model):
    semaphore = asyncio.Semaphore(10)  # Limit to 10 concurrent tasks

    @retry(reraise=True, stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def add_nodes_with_retry(index_struct, nodes):
        async with semaphore:
            await self.vector_index._async_add_nodes_to_index(index_struct=index_struct, nodes=nodes, show_progress=True)

    try:
        if self.vector_index is None:
            self.vector_index = VectorStoreIndex(
                nodes=nodes,
                use_async=False,
                embed_model=embed_model,
                storage_context=self.storage_context,
                show_progress=True
            )
            print(self.vector_index.index_struct)
            print("Created new index.")
        else:
            print("Loading existing index and inserting nodes.")
            await self.load_index(embed_model)
            index_struct = self.vector_index._build_index_from_nodes(nodes)
            await add_nodes_with_retry(index_struct, nodes)
    except Exception as e:
        print(e)
        print(f"An error occurred while embedding the file: {e}")

This code uses a semaphore to limit the number of concurrent tasks and the tenacity library to implement retry logic with exponential backoff, which can help manage rate limiting issues [1].
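A semaphore only limits anything when several tasks share it. In the snippet above it is created inside the function and acquired by a single call, so it would not throttle work by itself. The following is a self-contained sketch of the shared-semaphore pattern using only asyncio; run_limited and FakeEmbedder are hypothetical names, and the fake embed call stands in for a real rate-limited embedding request.

```python
import asyncio


async def run_limited(items, worker, max_concurrency=10):
    """Run worker(item) for every item, with at most max_concurrency in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(item):
        # Every task acquires the same semaphore, which is what enforces the cap.
        async with semaphore:
            return await worker(item)

    # gather returns results in the same order as the input items.
    return await asyncio.gather(*(guarded(item) for item in items))


class FakeEmbedder:
    """Hypothetical stand-in for a rate-limited embedding client."""

    def __init__(self):
        self.active = 0
        self.peak = 0

    async def embed(self, batch):
        self.active += 1
        self.peak = max(self.peak, self.active)
        await asyncio.sleep(0.01)  # simulate network latency
        self.active -= 1
        return len(batch)


if __name__ == "__main__":
    embedder = FakeEmbedder()
    batches = [["text"] * 4 for _ in range(25)]
    sizes = asyncio.run(run_limited(batches, embedder.embed, max_concurrency=5))
    print(f"embedded {sum(sizes)} texts, peak concurrency {embedder.peak}")
```

In a real pipeline the nodes would be split into batches and each batch passed through the shared limiter, so the cap applies across the whole job rather than to one call.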

To continue talking to Dosu, mention @dosu.


mirallm commented Feb 26, 2025

@dosubot Is my code correct for loading an existing index and adding nodes asynchronously? I wasn't sure about the index_struct. Also, how do I add parameters for rate limiting and retries?

dosubot bot commented Feb 26, 2025

To load an existing index and add nodes asynchronously, your approach seems mostly correct. However, ensure that the index_struct is properly initialized and used. Here's a refined version of your code with comments on the index_struct and how to handle rate limiting and retry logic:

import asyncio
from llama_index.core import VectorStoreIndex
from tenacity import retry, stop_after_attempt, wait_exponential

async def create_index_nodes(self, nodes, embed_model):
    semaphore = asyncio.Semaphore(10)  # Limit to 10 concurrent tasks

    @retry(reraise=True, stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def add_nodes_with_retry(index_struct, nodes):
        async with semaphore:
            await self.vector_index._async_add_nodes_to_index(index_struct=index_struct, nodes=nodes, show_progress=True)

    try:
        if self.vector_index is None:
            # Create a new index if not already created
            self.vector_index = VectorStoreIndex(
                nodes=nodes,
                use_async=False,
                embed_model=embed_model,
                storage_context=self.storage_context,
                show_progress=True
            )
            print(self.vector_index.index_struct)
            print("Created new index.")
        else:
            # Load existing index and insert nodes
            print("Loading existing index and inserting nodes.")
            await self.load_index(embed_model)
            # Ensure index_struct is correctly initialized
            index_struct = self.vector_index._build_index_from_nodes(nodes)
            await add_nodes_with_retry(index_struct, nodes)
    except Exception as e:
        print(e)
        print(f"An error occurred while embedding the file: {e}")

Key Points:

  • Index Struct: Ensure index_struct is initialized using _build_index_from_nodes which returns an IndexDict object. This is crucial for adding nodes correctly [1].
  • Rate Limiting and Retry: Use asyncio.Semaphore to limit concurrent tasks and tenacity for retry logic with exponential backoff. Adjust the semaphore limit and retry parameters based on your specific rate limits and requirements.

This setup should help manage rate limits and ensure nodes are added to the index correctly.



mirallm commented Feb 26, 2025

@dosubot But when I print index_struct it is actually empty, so I don't understand what it is for. Also, I don't want to use the Semaphore anymore; I will use my previous code, the one I sent at the beginning. I would like to know how to manage rate limits (min_seconds, max_seconds) with VectorStoreIndex.

dosubot bot commented Feb 26, 2025

The index_struct is expected to be an instance of the class specified by self.index_struct_cls() and is used to organize and store the nodes being indexed. If it's printing as empty, ensure that nodes are being correctly added to it using methods like add_node within the _build_index_from_nodes method [1].

Regarding rate limiting with min_seconds and max_seconds, these parameters are used to define the minimum and maximum wait time between retries in seconds. They are part of the retry strategy configuration using the tenacity library, which can apply a retry mechanism to functions to automatically retry on failure. The min_seconds defaults to 4, and max_seconds defaults to 60 [2][3][4].

To manage rate limiting without using Semaphore, you can directly apply these parameters in your retry logic using the tenacity library, as shown in the previous examples. Adjust the min_seconds and max_seconds values according to your specific rate limit requirements.
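To make the role of min_seconds and max_seconds concrete, here is a standard-library sketch of retry with clamped exponential backoff. It is written with plain asyncio instead of tenacity, and it is not LlamaIndex's implementation: backoff_seconds and call_with_retry are hypothetical helpers, and the doubling formula is an assumption about the general shape of such a strategy.

```python
import asyncio


def backoff_seconds(attempt, min_seconds=4.0, max_seconds=60.0, multiplier=1.0):
    """Wait before retry number `attempt` (1-based), clamped to [min, max]."""
    raw = multiplier * (2 ** (attempt - 1))
    return max(min_seconds, min(raw, max_seconds))


async def call_with_retry(func, *, max_attempts=5, min_seconds=4.0,
                          max_seconds=60.0, retry_on=(Exception,),
                          sleep=asyncio.sleep):
    """Await func(); on a listed exception, wait and retry up to max_attempts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await func()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            await sleep(backoff_seconds(attempt, min_seconds, max_seconds))


if __name__ == "__main__":
    # Wait schedule for the values used in the original question.
    print([backoff_seconds(n, 12, 120) for n in range(1, 9)])
```

With min_seconds=12 and max_seconds=120 the first four waits stay at the 12-second floor, then grow to 16, 32 and 64 seconds before capping at 120. In practice retry_on would be narrowed to the provider's rate limit exception so that unrelated failures are not retried.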

