feat(document-search): batch ingestion of documents (#185)
ludwiktrammer authored Nov 18, 2024
1 parent 95f7021 commit c453926
Showing 15 changed files with 498 additions and 40 deletions.
7 changes: 7 additions & 0 deletions docs/api_reference/document_search/execution_strategies.md
@@ -0,0 +1,7 @@
# Execution Strategies

::: ragbits.document_search.ingestion.processor_strategies.ProcessingExecutionStrategy

::: ragbits.document_search.ingestion.processor_strategies.SequentialProcessing

::: ragbits.document_search.ingestion.processor_strategies.BatchedAsyncProcessing
73 changes: 73 additions & 0 deletions docs/how-to/document_search/async_processing.md
@@ -0,0 +1,73 @@
# How to Ingest Documents Asynchronously

In Ragbits, a component called "processing execution strategy" controls how document processing is executed during ingestion. There are multiple execution strategies available in Ragbits that can be easily interchanged. You can also [create new custom execution strategies](create_custom_execution_strategy.md) to meet your specific needs.

!!! note
    Processing execution strategies are a separate concept from processors: the former manage how the processing is executed, while the latter deal with the actual processing of documents. Processors are managed by [DocumentProcessorRouter][ragbits.document_search.ingestion.document_processor.DocumentProcessorRouter].

## The Synchronous Execution Strategy

The default execution strategy in Ragbits is [`SequentialProcessing`][ragbits.document_search.ingestion.processor_strategies.SequentialProcessing]. This strategy processes documents one by one, waiting for each document to be processed before moving on to the next. Although it's the simplest and most straightforward strategy, it may be slow when processing a large number of documents.

Unless you specify a different strategy, Ragbits will use the `SequentialProcessing` strategy by default when ingesting documents:

```python
from ragbits.core.embeddings.litellm import LiteLLMEmbeddings
from ragbits.core.vector_stores.in_memory import InMemoryVectorStore
from ragbits.document_search import DocumentSearch
from ragbits.document_search.documents.document import DocumentMeta

documents = [
    DocumentMeta.create_text_document_from_literal("Example document 1"),
    DocumentMeta.create_text_document_from_literal("Example document 2"),
]

embedder = LiteLLMEmbeddings(
    model="text-embedding-3-small",
)
vector_store = InMemoryVectorStore()

document_search = DocumentSearch(
    embedder=embedder,
    vector_store=vector_store,
)

await document_search.ingest(documents)
```
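Under the hood, sequential processing boils down to awaiting each document in turn. The sketch below illustrates the pattern with a stand-in `process` coroutine (an illustration of the scheduling idea, not the Ragbits implementation):

```python
import asyncio

async def process(doc: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for real I/O-bound processing
    return doc.lower()

async def process_sequentially(docs: list[str]) -> list[str]:
    results = []
    for doc in docs:
        # Each document fully finishes before the next one starts
        results.append(await process(doc))
    return results

print(asyncio.run(process_sequentially(["Doc A", "Doc B"])))  # prints ['doc a', 'doc b']
```

Because nothing overlaps, total time grows linearly with the number of documents, which is why this strategy can be slow for large ingestions.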

## The Asynchronous Execution Strategy

If you need to process multiple documents at the same time, you can use the [`BatchedAsyncProcessing`][ragbits.document_search.ingestion.processor_strategies.BatchedAsyncProcessing] execution strategy. This strategy uses Python's built-in `asyncio` library to process documents concurrently, making it faster than the `SequentialProcessing` strategy, especially with large document volumes.

To use the `BatchedAsyncProcessing` strategy, specify it when creating the [`DocumentSearch`][ragbits.document_search.DocumentSearch] instance:

```python
from ragbits.core.embeddings.litellm import LiteLLMEmbeddings
from ragbits.core.vector_stores.in_memory import InMemoryVectorStore
from ragbits.document_search import DocumentSearch
from ragbits.document_search.documents.document import DocumentMeta
from ragbits.document_search.ingestion.processor_strategies import BatchedAsyncProcessing

documents = [
    DocumentMeta.create_text_document_from_literal("Example document 1"),
    DocumentMeta.create_text_document_from_literal("Example document 2"),
]

embedder = LiteLLMEmbeddings(
    model="text-embedding-3-small",
)
vector_store = InMemoryVectorStore()
processing_strategy = BatchedAsyncProcessing()

document_search = DocumentSearch(
    embedder=embedder,
    vector_store=vector_store,
    processing_strategy=processing_strategy,
)
```

You can also adjust the batch size for the `BatchedAsyncProcessing` strategy. The batch size controls how many documents are processed at once. By default, the batch size is 10, but you can change it by passing the `batch_size` parameter to the `BatchedAsyncProcessing` constructor:

```python
processing_strategy = BatchedAsyncProcessing(batch_size=64)
```
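Conceptually, batched processing splits the input into fixed-size chunks and awaits each chunk with `asyncio.gather`. The following is an illustrative sketch of that idea with a stand-in `process` coroutine, not the actual `BatchedAsyncProcessing` code:

```python
import asyncio

async def process(doc: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for real I/O-bound processing
    return doc.upper()

async def process_in_batches(docs: list[str], batch_size: int = 10) -> list[str]:
    results: list[str] = []
    for i in range(0, len(docs), batch_size):
        batch = docs[i : i + batch_size]
        # All documents within a batch are processed concurrently;
        # gather preserves the input order in its results.
        results.extend(await asyncio.gather(*(process(d) for d in batch)))
    return results

print(asyncio.run(process_in_batches(["a", "b", "c"], batch_size=2)))  # prints ['A', 'B', 'C']
```

A larger batch size increases concurrency (and resource usage); a smaller one behaves closer to sequential processing.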
91 changes: 91 additions & 0 deletions docs/how-to/document_search/create_custom_execution_strategy.md
@@ -0,0 +1,91 @@
# How to Create a Custom Execution Strategy

!!! note
    To learn how to use a built-in asynchronous execution strategy, see [How to Ingest Documents Asynchronously](async_processing.md).

In Ragbits, document processing during ingestion is controlled by a component known as "processing execution strategy". It doesn't deal with the actual processing of documents, but rather, it orchestrates how the processing is executed.

Ragbits provides several built-in execution strategies that can be easily interchanged. You can also create your own custom execution strategy to fulfill your specific needs. This guide will show you how to develop a custom execution strategy using a somewhat impractical example of a strategy that processes documents one by one, but with a delay between each document.

## Implementing a Custom Execution Strategy
To create a custom execution strategy, create a new class that inherits from [`ProcessingExecutionStrategy`][ragbits.document_search.ingestion.processor_strategies.ProcessingExecutionStrategy] and implements its abstract method `process_documents`. This method takes a sequence of documents and should process them asynchronously, returning the resulting elements.

While implementing the `process_documents` method, you can use the built-in `process_document` helper, which performs the actual processing of a single document.

```python
import asyncio
from collections.abc import Sequence

from ragbits.document_search.documents.document import Document, DocumentMeta
from ragbits.document_search.documents.element import Element
from ragbits.document_search.documents.sources import Source
from ragbits.document_search.ingestion.document_processor import DocumentProcessorRouter
from ragbits.document_search.ingestion.processor_strategies import ProcessingExecutionStrategy
from ragbits.document_search.ingestion.providers.base import BaseProvider


class DelayedExecutionStrategy(ProcessingExecutionStrategy):
    async def process_documents(
        self,
        documents: Sequence[DocumentMeta | Document | Source],
        processor_router: DocumentProcessorRouter,
        processor_overwrite: BaseProvider | None = None,
    ) -> list[Element]:
        elements = []
        for document in documents:
            # Wait one second before processing each document
            await asyncio.sleep(1)
            elements.extend(await self.process_document(document, processor_router, processor_overwrite))
        return elements
```

## Implementing an Advanced Custom Execution Strategy
Alternatively, instead of using the `process_document` method, you can process documents directly using the `processor_router` and `processor_overwrite` parameters. This gives you more control over the processing of documents.

```python
import asyncio
from collections.abc import Sequence

from ragbits.document_search.documents.document import Document, DocumentMeta
from ragbits.document_search.documents.element import Element
from ragbits.document_search.documents.sources import Source
from ragbits.document_search.ingestion.document_processor import DocumentProcessorRouter
from ragbits.document_search.ingestion.processor_strategies import ProcessingExecutionStrategy
from ragbits.document_search.ingestion.providers.base import BaseProvider


class DelayedExecutionStrategy(ProcessingExecutionStrategy):
    async def process_documents(
        self,
        documents: Sequence[DocumentMeta | Document | Source],
        processor_router: DocumentProcessorRouter,
        processor_overwrite: BaseProvider | None = None,
    ) -> list[Element]:
        elements = []
        for document in documents:
            # Convert the document to DocumentMeta
            document_meta = await self.to_document_meta(document)

            # Get the processor for the document
            processor = processor_overwrite or processor_router.get_provider(document_meta)

            # Wait one second before processing each document
            await asyncio.sleep(1)

            elements.extend(await processor.process(document_meta))
        return elements
```
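Other scheduling policies follow the same shape. For example, a strategy could cap how many documents are in flight at once with a semaphore instead of sleeping; the generic asyncio sketch below shows that pattern with a stand-in `process` coroutine (not the Ragbits API):

```python
import asyncio

async def process(doc: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for real I/O-bound processing
    return doc.upper()

async def process_with_limit(docs: list[str], max_concurrency: int = 5) -> list[str]:
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(doc: str) -> str:
        async with semaphore:  # at most max_concurrency documents in flight
            return await process(doc)

    # gather preserves input order regardless of completion order
    return list(await asyncio.gather(*(bounded(d) for d in docs)))

print(asyncio.run(process_with_limit(["a", "b"], max_concurrency=1)))  # prints ['A', 'B']
```

Inside a real strategy, the body of `bounded` would call `self.process_document` as in the examples above.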

## Using the Custom Execution Strategy
To use your custom execution strategy, you need to specify it when creating the [`DocumentSearch`][ragbits.document_search.DocumentSearch] instance:

```python
from ragbits.core.embeddings.litellm import LiteLLMEmbeddings
from ragbits.core.vector_stores.in_memory import InMemoryVectorStore
from ragbits.document_search import DocumentSearch
from ragbits.document_search.documents.document import DocumentMeta

documents = [
    DocumentMeta.create_text_document_from_literal("Example document 1"),
    DocumentMeta.create_text_document_from_literal("Example document 2"),
]

embedder = LiteLLMEmbeddings(
    model="text-embedding-3-small",
)
vector_store = InMemoryVectorStore()

# DelayedExecutionStrategy as defined in the previous sections
processing_strategy = DelayedExecutionStrategy()

document_search = DocumentSearch(
    embedder=embedder,
    vector_store=vector_store,
    processing_strategy=processing_strategy,
)
```
@@ -1,4 +1,4 @@
## Promptfoo Integration
# How to integrate Promptfoo with Ragbits

Ragbits' `Prompt` abstraction can be seamlessly integrated with the `promptfoo` tool. After installing `promptfoo` as
specified in the [promptfoo documentation](https://www.promptfoo.dev/docs/installation/), you can generate promptfoo
2 changes: 1 addition & 1 deletion docs/how-to/use_guardrails.md
@@ -1,4 +1,4 @@
# How-To: Use Guardrails
# How to use Guardrails

Ragbits offers an expandable guardrails system. You can use one of the available guardrails or create your own to prevent toxic language, PII leaks etc.

12 changes: 9 additions & 3 deletions mkdocs.yml
@@ -7,7 +7,12 @@ copyright: Copyright © 2024 deepsense.ai
nav:
  - rabgbits: index.md
  - How-to Guides:
    - integrations/promptfoo.md
    - how-to/optimize.md
    - how-to/use_guardrails.md
    - how-to/integrations/promptfoo.md
    - Document Search:
      - how-to/document_search/async_processing.md
      - how-to/document_search/create_custom_execution_strategy.md
  - API Reference:
    - Core:
      - api_reference/core/prompt.md
@@ -17,8 +22,9 @@ nav:
    - Document Search:
      - api_reference/document_search/index.md
      - api_reference/document_search/documents.md
      - api_reference/document_search/ingestion.md
      - Ingestion:
        - api_reference/document_search/processing.md
        - api_reference/document_search/execution_strategies.md
theme:
  name: material
  icon:
@@ -12,6 +12,11 @@
from ragbits.document_search.documents.element import Element, ImageElement
from ragbits.document_search.documents.sources import Source
from ragbits.document_search.ingestion.document_processor import DocumentProcessorRouter
from ragbits.document_search.ingestion.processor_strategies import (
    ProcessingExecutionStrategy,
    SequentialProcessing,
    get_processing_strategy,
)
from ragbits.document_search.ingestion.providers.base import BaseProvider
from ragbits.document_search.retrieval.rephrasers import get_rephraser
from ragbits.document_search.retrieval.rephrasers.base import QueryRephraser
@@ -48,6 +53,8 @@ class DocumentSearch:
    vector_store: VectorStore
    query_rephraser: QueryRephraser
    reranker: Reranker
    document_processor_router: DocumentProcessorRouter
    processing_strategy: ProcessingExecutionStrategy

    def __init__(
        self,
@@ -56,12 +63,14 @@ def __init__(
        query_rephraser: QueryRephraser | None = None,
        reranker: Reranker | None = None,
        document_processor_router: DocumentProcessorRouter | None = None,
        processing_strategy: ProcessingExecutionStrategy | None = None,
    ) -> None:
        self.embedder = embedder
        self.vector_store = vector_store
        self.query_rephraser = query_rephraser or NoopQueryRephraser()
        self.reranker = reranker or NoopReranker()
        self.document_processor_router = document_processor_router or DocumentProcessorRouter.from_config()
        self.processing_strategy = processing_strategy or SequentialProcessing()

    @classmethod
    def from_config(cls, config: dict) -> "DocumentSearch":
@@ -78,12 +87,13 @@ def from_config(cls, config: dict) -> "DocumentSearch":
        query_rephraser = get_rephraser(config.get("rephraser"))
        reranker = get_reranker(config.get("reranker"))
        vector_store = get_vector_store(config["vector_store"])
        processing_strategy = get_processing_strategy(config.get("processing_strategy"))

        providers_config_dict: dict = config.get("providers", {})
        providers_config = DocumentProcessorRouter.from_dict_to_providers_config(providers_config_dict)
        document_processor_router = DocumentProcessorRouter.from_config(providers_config)

        return cls(embedder, vector_store, query_rephraser, reranker, document_processor_router, processing_strategy)

    @traceable
    async def search(self, query: str, config: SearchConfig | None = None) -> Sequence[Element]:
@@ -114,35 +124,6 @@ async def search(self, query: str, config: SearchConfig | None = None) -> Sequence[Element]:
            options=RerankerOptions(**config.reranker_kwargs),
        )

    async def _process_document(
        self,
        document: DocumentMeta | Document | Source,
        document_processor: BaseProvider | None = None,
    ) -> list[Element]:
        """
        Process a document and return the elements.

        Args:
            document: The document to process.
            document_processor: The document processor to use. If not provided, the document processor will be
                determined based on the document metadata.

        Returns:
            The elements.
        """
        if isinstance(document, Source):
            document_meta = await DocumentMeta.from_source(document)
        elif isinstance(document, DocumentMeta):
            document_meta = document
        else:
            document_meta = document.metadata

        if document_processor is None:
            document_processor = self.document_processor_router.get_provider(document_meta)

        document_processor = self.document_processor_router.get_provider(document_meta)
        return await document_processor.process(document_meta)

    @traceable
    async def ingest(
        self,
@@ -157,10 +138,9 @@ async def ingest(
            document_processor: The document processor to use. If not provided, the document processor will be
                determined based on the document metadata.
        """
        elements = []
        # TODO: Parallelize
        for document in documents:
            elements.extend(await self._process_document(document, document_processor))
        elements = await self.processing_strategy.process_documents(
            documents, self.document_processor_router, document_processor
        )
        await self.insert_elements(elements)

    async def insert_elements(self, elements: list[Element]) -> None:
@@ -0,0 +1,33 @@
import sys

from ragbits.core.utils.config_handling import get_cls_from_config

from .base import ProcessingExecutionStrategy
from .batched import BatchedAsyncProcessing
from .sequential import SequentialProcessing

__all__ = ["BatchedAsyncProcessing", "ProcessingExecutionStrategy", "SequentialProcessing"]


def get_processing_strategy(config: dict | None = None) -> ProcessingExecutionStrategy:
    """
    Initializes and returns a ProcessingExecutionStrategy object based on the provided configuration.

    Args:
        config: A dictionary containing configuration details for the ProcessingExecutionStrategy.

    Returns:
        An instance of the specified ProcessingExecutionStrategy class, initialized with the provided config
        (if any) or default arguments.

    Raises:
        KeyError: If the provided configuration does not contain a valid "type" key.
        InvalidConfigurationError: If the provided configuration is invalid.
        NotImplementedError: If the specified ProcessingExecutionStrategy class cannot be created from
            the provided configuration.
    """
    if config is None:
        return SequentialProcessing()

    strategy_cls = get_cls_from_config(config["type"], sys.modules[__name__])
    return strategy_cls.from_config(config.get("config", {}))
