[BUG] RecordManager Does Not Properly Cleanup Vector Store's OLD VALUES #3570

Open
dkindlund opened this issue Nov 25, 2024 · 22 comments
Labels
bug Something isn't working

Comments

@dkindlund
Contributor

Describe the bug
In Flowise, I've created a new Document Store where an Airtable Document Loader feeds data into a Postgres Vector Store. We're also using Postgres to store the Record Manager records. Whenever I update an existing record in Airtable and then proceed to perform the Upsert operation, I see the new vector store record created but the OLD vector store record still EXISTS. The OLD vector store record does not get properly deleted!

To Reproduce
Steps to reproduce the behavior:

  1. Create a new Document Store
  2. Specify an Airtable Document Loader
  3. Specify a Postgres Vector Store and Record Manager (with Cleanup = FULL)
  4. Perform initial Upsert
  5. Change a single existing Airtable record
  6. Perform second Upsert
  7. Verify that BOTH the NEW and the OLD vector store records STILL exist in the vector store

Expected behavior
I would expect that Upserting would purge the OLD vector store record.

Screenshots
If applicable, add screenshots to help explain your problem.

Flow
If applicable, add exported flow in order to help replicating the problem.

Setup

  • Installation: docker
  • Flowise Version: 2.1.2
  • OS: Google Cloud Run Linux
  • Browser: chrome

Additional context
So my vector store table is named mechanisms_vec_v1 and my record manager table is named mechanisms_rm_v1. I THINK the way the deletion logic is supposed to work is a match where mechanisms_vec_v1.id = mechanisms_rm_v1.key. HOWEVER, in my case, there is NO OVERLAP IN VALUES. For some reason, when a vector store document is inserted, that record's mechanisms_vec_v1.id IS NEVER THE SAME VALUE as its corresponding mechanisms_rm_v1.key value.

Because of this, I think there's some sort of BUG between when the Airtable data gets loaded and when it gets added to the vector store -- specifically, I think the computed mechanisms_vec_v1.id value might NOT be getting set the same way as the mechanisms_rm_v1.key value.

It's exceptionally hard to figure out exactly where the bug is located because of all the layers of indirection and the use of langchain built-in libraries, but I hope these clues help, @HenryHengZJ. Thanks in advance!

@dkindlund
Contributor Author

Here's what my RecordManager looks like:

[screenshot: Record Manager node configuration]

@HenryHengZJ added the bug (Something isn't working) label on Nov 25, 2024
@HenryHengZJ
Contributor

Can you help me confirm two things:

  • Does it happen only for Airtable?
  • Does it happen only for FULL, or does it also happen with INCREMENTAL?

@dkindlund
Contributor Author

Hey @HenryHengZJ, let me upgrade to the latest version to confirm whether this bug still exists. I'll get you another update with your answers after that. Thanks!

@BenMushen

Hey @dkindlund, I'm having the exact same issue. Did you manage to get it sorted or figure out a way around it?

I've also updated to the latest version and am still having issues.

@iristhiawl95

Hi @dkindlund and @BenMushen,
I'm also having the same issue using the Postgres Vector Store, Postgres Record Manager, and File Loader.
I tested using the Pinecone Vector Store with the Postgres Record Manager, and that works fine.

Did you manage to find a way around it while using the Postgres Vector Store?

@dkindlund
Contributor Author

Can you help me confirm two things:

Hey @HenryHengZJ, apologies for the delay on this.

  • Does it happen only for Airtable?

I don't think that's the case -- as others have commented, this problem exists with other data sources (non-Airtable).

  • Does it happen only for FULL, or does it also happen with INCREMENTAL?

It doesn't matter if it's FULL or INCREMENTAL -- the same problem exists from what I see. The stale vec records still exist.

I think it's because there's no match between the vec.id and rm.key values -- but I'm not an expert here.

@dkindlund
Contributor Author

Looking at the Postgres table schemas more closely:

flowise-data=> \d mechanisms_vec_v1
                 Table "public.mechanisms_vec_v1"
   Column    |  Type  | Collation | Nullable |      Default
-------------+--------+-----------+----------+--------------------
 id          | uuid   |           | not null | uuid_generate_v4()
 pageContent | text   |           |          |
 metadata    | jsonb  |           |          |
 embedding   | vector |           |          |
Indexes:
    "mechanisms_vec_v1_pkey" PRIMARY KEY, btree (id)

flowise-data=> \d mechanisms_rm_v1
                     Table "public.mechanisms_rm_v1"
   Column   |       Type       | Collation | Nullable |      Default
------------+------------------+-----------+----------+-------------------
 uuid       | uuid             |           | not null | gen_random_uuid()
 key        | text             |           | not null |
 namespace  | text             |           | not null |
 updated_at | double precision |           | not null |
 group_id   | text             |           |          |
Indexes:
    "mechanisms_rm_v1_pkey" PRIMARY KEY, btree (uuid)
    "mechanisms_rm_v1_key_namespace_key" UNIQUE CONSTRAINT, btree (key, namespace)

I'm thinking there has to be some sort of linkage between the vec.id field and the rm.key field, but the data types aren't exactly the same....
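To make the mismatch easy to confirm, here's a quick diagnostic sketch (untested; assumes the table names above and a DATABASE_URL connection string) that counts vector store rows whose id never appears as a record manager key. If the ids were flowing through correctly, this should print 0:

import { Pool } from 'pg'

// Count vector store rows with no matching record manager key.
// rm.key is text while vec.id is uuid, hence the ::text cast.
async function countOrphanedVectors(): Promise<void> {
    const pool = new Pool({ connectionString: process.env.DATABASE_URL })
    try {
        const { rows } = await pool.query(`
            SELECT COUNT(*) AS orphaned
            FROM mechanisms_vec_v1 v
            LEFT JOIN mechanisms_rm_v1 r ON r.key = v.id::text
            WHERE r.key IS NULL
        `)
        console.log(`vector rows with no matching rm.key: ${rows[0].orphaned}`)
    } finally {
        await pool.end()
    }
}

countOrphanedVectors().catch(console.error)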

@dkindlund
Contributor Author

Okay, after reviewing the code more deeply, I think I have some theories as to what is happening here...

The code is split between the indexing subsystem and the PostgreSQL RecordManager implementation:

await recordManager.update(

async update(keys: string[], updateOptions?: UpdateOptions): Promise<void> {

Here's my raw notes so far:

When you see that a document’s uid (used as the record manager’s key) doesn’t match the vector store’s id, it indicates that somewhere in the insertion workflow the intended identifier isn’t “flowing through” as expected. Here are some theories and scenarios that might explain this discrepancy:

  1. Vector Store Ignores Supplied IDs
  • What Happens:
    The indexing code calls vectorStore.addDocuments(docsToIndex, { ids: uids }) expecting that the provided uids (generated from document hashing) will be used as the document’s id.

  • Possible Issue:
    Some vector store implementations—especially those built on PostgreSQL—have the id column defined with a default like uuid_generate_v4(). This means that even if you pass in custom ids, the database might ignore them and generate a new UUID automatically.

  • Consequence:
    The record manager’s key (set to the external uid) will differ from the vector store’s auto-generated id.

  2. Schema Definition and Default Values
  • What Happens:
    The vector store table is often defined with something like: id UUID NOT NULL DEFAULT uuid_generate_v4()

  • Possible Issue:
    If the insertion process doesn’t override this default (or if the vector store’s insertion logic doesn’t support custom ids properly), the database will always use uuid_generate_v4() to populate the id field.

  • Consequence:
    Even if your code passes in a uid, the stored id ends up being a newly generated UUID, leading to a mismatch with the record manager’s key.

  3. Bug or Misimplementation in the Vector Store Insertion Method
  • What Happens:
    The indexing code expects that the ids parameter provided to vectorStore.addDocuments will be used for the inserted document records.

  • Possible Issue:
    A bug in the implementation or a misinterpretation of the API could cause the supplied ids to be ignored. The vector store might be defaulting to its own id generation mechanism instead of using the provided uid.

  • Consequence:
    The record manager gets the externally computed uid (which becomes the rm.key), but the vector store’s document record gets an auto-generated id.

  4. Post-Insertion Transformations or Race Conditions
  • What Happens:
    In some systems, additional processing steps might be applied to the document after insertion.

  • Possible Issue:
    Although less common, if there is a transformation or a re-hashing step applied to the document before it is fully stored, the final identifier might be altered. Similarly, if multiple processes are writing concurrently, a race condition might cause an update to overwrite the intended id.

  • Consequence:
    This could lead to an inconsistency between the vector store id and the record manager key, although this is typically less likely than the issues with defaults and API behavior.

Summary

The most likely theories are:

  • Vector Store Implementation: It’s auto-generating ids (via a default function) and ignoring your provided uids.
  • Schema Defaults: The vector store table’s schema forces the id to be generated automatically, leading to a mismatch.
  • API Bug/Misconfiguration: The parameter for custom ids isn’t being honored in the vector store insertion call.

In any of these cases, the intended one-to-one mapping (i.e. vec.id should equal rm.key) breaks down, which in turn disrupts cleanup logic that relies on matching these identifiers. The remedy usually involves either configuring the vector store to accept externally supplied ids or adjusting the workflow to update the record manager after retrieving the actual inserted id from the vector store.
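To make the second of those remedies concrete, here's a rough, untested sketch (the narrow structural types below are placeholders, not actual Flowise or langchainjs signatures) of what keying the record manager off the ids the vector store actually inserted could look like:

import { Document } from '@langchain/core/documents'

// Illustrative only: let the store generate its own ids, then record those ids in the
// record manager so the later cleanup join (vec.id = rm.key) can actually match.
type MinimalVectorStore = { addDocuments: (docs: Document[]) => Promise<string[] | undefined> }
type MinimalRecordManager = { update: (keys: string[], updateOptions?: Record<string, unknown>) => Promise<void> }

async function indexUsingReturnedIds(
    docsToIndex: Document[],
    vectorStore: MinimalVectorStore,
    recordManager: MinimalRecordManager
): Promise<void> {
    // Some drivers return the inserted ids from addDocuments; others would need a follow-up query.
    const insertedIds = await vectorStore.addDocuments(docsToIndex)
    if (!insertedIds || insertedIds.length !== docsToIndex.length) {
        throw new Error('Driver did not report inserted ids; rm.key cannot be kept in sync with vec.id')
    }
    // Key the record manager on the ids that actually landed in the vector store.
    await recordManager.update(insertedIds)
}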

@dkindlund
Contributor Author

dkindlund commented Feb 21, 2025

After further analysis, I think there's some sort of subtle bug specific to the vectorStore.addDocuments() function:

await vectorStore.addDocuments(docsToIndex, { ids: uids })

The vec.id field is intended to be set by passing the computed document uid (from _HashedDocument.fromDocument) as the ids parameter in the call to vectorStore.addDocuments(). The underlying vector store driver should then insert these ids into the id column of the vec table. However, if the driver fails to override the table’s default auto-generation of the id, PostgreSQL will generate a new id instead—causing the mismatch between vec.id and rm.key.

Still investigating...

@dkindlund
Contributor Author

Okay, so because we're using the typeorm driver, the underlying code that powers the vectorStore.addDocuments call can be found here:

https://github.com/langchain-ai/langchainjs/blob/d303e909d6b73da0bda4c0b7065ff974a980e5b0/libs/langchain-community/src/vectorstores/typeorm.ts#L114-L149

In this implementation, the vector store’s table (the vec table) is defined so that its id column is automatically generated. Here’s how it works step-by-step:

  1. Entity Schema Definition: When the TypeORMVectorStore is constructed, it creates an entity schema for documents.

Notice the column definition for id:

const TypeORMDocumentEntity = new EntitySchema<TypeORMVectorStoreDocument>({
  name: fields.tableName ?? defaultDocumentTableName,
  columns: {
    id: {
      generated: "uuid",
      type: "uuid",
      primary: true,
    },
    pageContent: { type: String },
    metadata: { type: "jsonb" },
    embedding: { type: String },
  },
});

Here, the id field is marked with generated: "uuid", which tells TypeORM that this field should be auto-generated as a UUID. It’s not expecting any value to be provided for id.

  2. Database Table Creation:

In the ensureTableInDatabase method, the table is created with a SQL statement that includes:

CREATE TABLE IF NOT EXISTS ${this.tableName} (
  "id" uuid NOT NULL DEFAULT uuid_generate_v4() PRIMARY KEY,
  "pageContent" text,
  metadata jsonb,
  embedding vector
);

This means that if an insert does not explicitly provide a value for id, PostgreSQL will automatically generate one using uuid_generate_v4().

  3. Document Insertion Flow:

When documents are added, the following happens:

  • Embedding Calculation:
    The documents’ page content is embedded, and the resulting vectors are prepared.

  • Mapping to Rows:
    In the addVectors method, each document is converted to a row object:

const documentRow = {
  pageContent: documents[idx].pageContent,
  embedding: embeddingString,
  metadata: documents[idx].metadata,
};

WARNING: Notice that no id is provided in this object.

  • Saving Rows:
    The code then calls:
await documentRepository.save(chunk);

THEREFORE: Because the row object doesn’t include an id, PostgreSQL uses the default value (uuid_generate_v4()) to generate a new UUID for the id field.

In summary:
The vec.id field is set by the database automatically. The insertion code does not provide an explicit id when calling documentRepository.save(chunk), so the default behavior in the table definition takes over—using uuid_generate_v4() to generate a new UUID. This means that even if the indexing process computes a UID for each document (to be used in the Record Manager), that UID is not passed along to the vector store insertion. Instead, the vector store ends up with auto-generated ids that do not necessarily match the externally computed UIDs.

This is the crux of the problem, I believe.
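If the fix were made at the driver level, I'd imagine it looking roughly like this (untested sketch, not the actual langchainjs code): widen the insertion path to accept the supplied ids and include them in each row, so the INSERT overrides the uuid_generate_v4() default:

import { Document } from '@langchain/core/documents'

// Illustrative row-building step with an optional ids array.
// When an id is supplied it ends up in the INSERT, so vec.id === rm.key.
function buildDocumentRows(
    documents: Document[],
    embeddingStrings: string[],
    ids?: string[]
): Record<string, unknown>[] {
    return documents.map((doc, idx) => {
        const row: Record<string, unknown> = {
            pageContent: doc.pageContent,
            embedding: embeddingStrings[idx],
            metadata: doc.metadata,
        }
        const id = ids?.[idx]
        if (id) {
            row.id = id // use the externally computed uid instead of the table default
        }
        return row
    })
}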

@dkindlund
Contributor Author

Pinpointing the exact location of the code where this problem exists:

await vectorStore.addDocuments(docsToIndex, { ids: uids })

See that second parameter that is getting passed into vectorStore.addDocuments:

await vectorStore.addDocuments(docsToIndex, { ids: uids })
                                            ^^^^^^^^^^^^^

If we look at the typeorm implementation of addDocuments, we see this from:
https://github.com/langchain-ai/langchainjs/blob/d303e909d6b73da0bda4c0b7065ff974a980e5b0/libs/langchain-community/src/vectorstores/typeorm.ts#L121

async addDocuments(documents: Document[]): Promise<void> {

Notice how the typeorm implementation DOES NOT ACCEPT the second argument of { ids: uids }.

Therefore, the supplied ids by the Flowise code are getting SILENTLY DROPPED and IGNORED upon insert into the vector store.

JavaScript functions are inherently variadic, which means that if you pass more arguments than the function declares, the extra arguments are simply ignored rather than causing an error. In this case, the TypeORM implementation of addDocuments only declares a single parameter for the documents array, so when Flowise passes the second argument ({ ids: uids }), it’s silently dropped because the function doesn’t reference it. This behavior is standard in JavaScript and isn’t considered an error unless the function explicitly checks for unexpected parameters.
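Here's a tiny example of that behavior (the any cast stands in for the loosely typed boundary in the real call path, which is why the compiler doesn't complain either):

// The function only declares `documents`; anything after it is ignored.
function addDocuments(documents: string[]): void {
    console.log(`inserting ${documents.length} documents`) // the second argument is never consulted
}

// In plain JavaScript (or across an any-typed boundary in TypeScript) the extra
// argument is simply dropped at runtime -- no error, no warning.
const looselyTyped = addDocuments as any
looselyTyped(['doc1', 'doc2'], { ids: ['uid-1', 'uid-2'] }) // logs "inserting 2 documents"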

@HenryHengZJ, I'm not sure how to fix this issue, but at least I think I've pinpointed the exact problem. Does this help?

@dkindlund
Contributor Author

Further note -- this appears to be a bug specific to the typeorm implementation. Let me explain:

https://github.com/langchain-ai/langchainjs/blob/d303e909d6b73da0bda4c0b7065ff974a980e5b0/libs/langchain-community/src/vectorstores/pgvector.ts#L350

This PGVectorStore implementation does not suffer the same problem. Here’s why:

  1. Explicit ids Parameter Handling:

The addDocuments method in PGVectorStore is defined to accept an optional second argument:

async addDocuments(
  documents: Document[],
  options?: { ids?: string[] }
): Promise<void> { ... }

It then passes these ids down to addVectors.

  2. Rows Construction in addVectors:

In the addVectors method, if the ids option is provided, it appends the corresponding id (from the ids array) to each row:

if (ids) {
  values.push(ids[i]);
}

So each row will contain the document’s content, embedding, metadata, (optionally a collection id), and finally the provided id.

  3. Dynamic Query Construction in buildInsertQuery:
    The buildInsertQuery method checks whether the rows have an extra element (the id) and, if so, adds the id column to the list of columns being inserted:
if (rows.length !== 0 && columns.length === rows[0].length - 1) {
  columns.push(this.idColumnName);
}

This dynamically includes the id column (e.g. "id") in the INSERT query.

  4. Outcome:
    Because the provided ids are explicitly appended to the row values and the insert query is built to include the id column when they are present, the PGVectorStore implementation honors the supplied ids. This means that instead of relying on the database default (which would auto-generate a new uuid), the document’s id in the vector store (vec.id) is set to the value provided in the ids array.

Conclusion:
Unlike the TypeORM driver where the extra argument { ids: uids } is silently ignored, the PGVectorStore implementation is designed to accept and use custom ids. Therefore, if you supply ids via the second argument, those values will be inserted into the id field of the table, avoiding the mismatch problem.
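In other words, with a driver that behaves like the PGVector code above, the existing Flowise call already does the right thing -- roughly (sketch; assumes an already-configured store whose addDocuments accepts { ids }):

import { Document } from '@langchain/core/documents'

// A driver that honors { ids } keeps vec.id and rm.key in lockstep.
async function upsertWithMatchingIds(
    vectorStore: { addDocuments: (docs: Document[], options?: { ids?: string[] }) => Promise<void> },
    docsToIndex: Document[],
    uids: string[] // the same uids that get written to the record manager as rm.key
): Promise<void> {
    await vectorStore.addDocuments(docsToIndex, { ids: uids })
    // Cleanup can now match vec.id = rm.key and delete stale rows correctly.
}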

@dkindlund
Contributor Author

A decent workaround here might be to try to use the PGVector driver instead of the TypeORM driver -- however, I see that the PGVector driver was recently disabled for some reason... I'm not sure why...

@dkindlund
Contributor Author

dkindlund commented Feb 21, 2025

Okay, from this commit, I think I see why it was disabled:
a2a475b#diff-4a1a4686f399cfa5448e01ad9b44f53e26ff79e5438d8748ae77771f25ac4c7eR2

This comment explains why in v2.2.5:

Temporary disabled due to increasing open connections without releasing them
Use TypeORM instead

So while the workaround to use PGVector might work -- it's unfortunately disabled in v2.2.5 because of some other presumably PGVector-specific bug...

@dkindlund
Contributor Author

Hey @HenryHengZJ, I took a closer look at your PGVector implementation to try to identify the root cause behind the increasing open connections issue... here's what I've found.

Based on analyzing this version of the code:
https://github.com/FlowiseAI/Flowise/blob/flowise-ui%402.2.5/packages/components/nodes/vectorstores/Postgres/driver/PGVector.ts

The bug is in the custom override of the pool’s query method in the adaptInstance function. Here are the issues:

  1. No Proper Error Handling:
    The overridden function calls the base query function and then releases the connection:
const queryResult = await basePoolQueryFn(queryString, parameters)
instance.client.release()
instance.client = undefined
return queryResult

However, if the base query function throws an error, the release call is skipped. Without a try/finally block, the connection remains open, gradually increasing the number of open connections.

  2. Shared Client for Concurrent Queries:
    The code stores the connection in instance.client and reuses it:
if (!instance.client) {
    instance.client = await instance.pool.connect()
}

If multiple queries are fired concurrently, they might conflict over the same instance.client. This improper sharing can lead to connections not being released correctly or new connections piling up.

  3. Overriding pool.query:
    By replacing instance.pool.query with a custom function that manually manages connection acquisition and release, it bypasses the built-in connection management of the pg Pool. Any mistakes in this custom logic (as above) will result in leaked connections.

In summary, the lack of a try/finally block (or equivalent error handling) and the unsafe management of a shared client in the overridden pool.query function are the primary bugs leading to increasing open connections without proper release.

@dkindlund
Contributor Author

Below is one recommended diff that wraps the query execution in a try/finally block so that the connection is always released, even when an error occurs. In the original code, the custom override does this:

// Run base function
const queryResult = await basePoolQueryFn(queryString, parameters)

// ensure connection is released
instance.client.release()
instance.client = undefined

return queryResult

Instead, we can change it to:

-    // Run base function
-    const queryResult = await basePoolQueryFn(queryString, parameters)
-
-    // ensure connection is released
-    instance.client.release()
-    instance.client = undefined
-
-    return queryResult
+    let queryResult;
+    try {
+        queryResult = await basePoolQueryFn(queryString, parameters);
+    } finally {
+        if (instance.client) {
+            instance.client.release();
+            instance.client = undefined;
+        }
+    }
+    return queryResult;

Explanation:

  • try/finally Block:
    By wrapping the call to basePoolQueryFn in a try block and placing the release logic in finally, we guarantee that the connection is released even if the query throws an error.

  • Connection Management:
    This prevents the connection from lingering in instance.client and helps avoid an ever‐increasing number of open connections.

Note:
An even cleaner approach might be to avoid storing a shared connection in instance.client altogether and always use pool.connect() within the scope of each query. That said, the diff above directly addresses the identified bug in the existing override.
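For reference, the cleaner per-query pattern would look something like this (sketch; note that pg's built-in pool.query() already does this acquire/release dance internally, so simply not overriding it is another option):

import { Pool, QueryResult } from 'pg'

// Acquire a client per query and always release it, instead of caching a shared client
// on the instance.
async function runQuery(pool: Pool, queryString: string, parameters?: unknown[]): Promise<QueryResult> {
    const client = await pool.connect()
    try {
        return await client.query(queryString, parameters)
    } finally {
        client.release() // returned to the pool even if the query throws
    }
}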

This diff should help mitigate the problem of unreleased connections in the PGVector driver’s custom query override.

Hope this helps, @HenryHengZJ

@dkindlund
Contributor Author

Hey @BenMushen and @iristhiawl95, hopefully this explains the root cause behind the issue. I'm not sure how @HenryHengZJ wants to resolve it. Essentially, if the TypeORM implementation is eventually going to be replaced with the PGVector driver, then I could see him just fixing the PGVector driver, which would ultimately solve this issue.

Otherwise, the fix for the TypeORM path looks complicated, as it likely requires committing some upstream fixes to the langchainjs library.

So, to recap (TL;DR version):

  1. For now: While this bug exists, your best course of action in the meantime is:
  • Anytime you need to update your records, DROP the vec and rm tables MANUALLY
  • Then have Flowise rebuild the tables from scratch, every time
  • This works fine for small record counts (< 10,000 records)
  • But it will absolutely stop working for record counts larger than 10,000
  2. Short term: Assuming @HenryHengZJ fixes the other PGVector bug first, the short-term workaround would be to upgrade to a (future) version of Flowise that has the PGVector fix and then switch from the TypeORM driver to the PGVector driver -- then this original, underlying bug will be resolved.

  3. Long term: If you absolutely need to stick with the TypeORM driver, then it might take much longer to fix, as it likely requires upstream patches to langchainjs. Only @HenryHengZJ can provide an ETA as to how long that might take.

Hope that helps!

@dkindlund
Contributor Author

Another possible temp option:

  4. Downgrade to Flowise v2.1.5 and switch to using the PGVector driver. I've confirmed that the original bug is resolved, BUT you'll still have the increasing open connections issue that @HenryHengZJ already flagged.

@dkindlund
Contributor Author

Update: I got a chance to use Option #4 and I can confirm that Record Manager operations work perfectly -- both FULL and INCREMENTAL mode now work as expected.

I'm working with about 30K records and I can say that FULL cleanup mode takes about 10x longer to run than INCREMENTAL (which is sort of expected).

That said, the whole indexing algorithm feels a bit slow. The only way the system can determine which records are duplicates is by inserting ALL records into the DB table again and then performing some sort of differential SQL query to prune the duplicates. This ends up causing very large I/O overhead on the DB.

A faster way to accomplish this might be to implement a Bloom filter within the RecordManager codebase -- as records are inserted into the DB, store each record's hash in a Bloom filter. Then, during the next update, cross-check the Bloom filter before inserting new records into the DB. It might mean higher CPU utilization, but it should still be orders of magnitude less overhead than the equivalent DB calls.
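Conceptually, the per-record flow would look something like this (hypothetical filter and callbacks, just to illustrate the idea -- a fuller sketch follows a couple of comments down):

// Hypothetical check-before-insert flow using a Bloom filter in front of the DB.
type HashFilter = { test: (hash: string) => Promise<boolean>; add: (hash: string) => Promise<void> }

async function upsertIfUnseen(
    recordHash: string,
    filter: HashFilter,
    existsInDb: (hash: string) => Promise<boolean>, // the expensive DB check
    insert: (hash: string) => Promise<void>
): Promise<void> {
    if (!(await filter.test(recordHash))) {
        // A negative from the filter is definitive: skip the DB existence check entirely.
        await insert(recordHash)
        await filter.add(recordHash)
        return
    }
    // A positive may be a false positive (~1% at the default rate), so confirm with the DB.
    if (!(await existsInDb(recordHash))) {
        await insert(recordHash)
        await filter.add(recordHash)
    }
}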

@dkindlund
Contributor Author

Hey @HenryHengZJ, after analyzing the indexing.ts logic more closely, here's a summary of possible experimental improvements to this code. To be honest, this adds quite a bit of complexity, so I totally understand if you'd rather keep the logic as simple as possible -- but it hopefully provides some avenues of improvement if you decide to go in this direction.

Original Problem: The indexing system was potentially performing unnecessary database operations and duplicating work across multiple containers when dealing with large document sets.

Key Improvements:

  1. Bloom Filter Implementation

    • Original Issue: System was making expensive database calls to check document existence for every record
    • Solution: Added probabilistic data structure that can quickly determine if a document has been processed
    • Benefits:
      • Reduces database load by filtering out definite non-matches
      • Memory efficient (uses bit array instead of storing full records)
      • Configurable false positive rate (default 0.01)
      • Uses multiple hash functions (xxhash and MurmurHash3) for better distribution
  2. Redis-Based Distributed Storage

    • Original Issue: Multiple containers couldn't share processing state, leading to duplicate work
    • Solution: Added Redis-based storage with distributed locking
    • Benefits:
      • Allows multiple containers to share Bloom filter state
      • Prevents race conditions via Redlock implementation
      • Supports namespacing for different document stores
      • Automatic TTL management to prevent stale data
  3. Namespace-Specific Filters

    • Original Issue: Different document stores were potentially interfering with each other's processing
    • Solution: Implemented namespace-based separation of Bloom filters
    • Benefits:
      • Each vector store gets its own isolated filters
      • Prevents false positives across different document stores
      • Supports cleanup operations specific to each namespace
  4. Optimized Batch Processing

    • Original Issue: System was making too many individual Redis operations
    • Solution: Added batched updates to Redis with periodic synchronization
    • Benefits:
      • Reduces Redis I/O operations
      • Maintains reasonable memory usage
      • Provides balance between consistency and performance
  5. Efficient Bit Array Storage

    • Original Issue: Potential memory inefficiency when storing large sets of documents
    • Solution: Implemented compact bit array using Uint8Array
    • Benefits:
      • Minimal memory footprint
      • Efficient serialization/deserialization
      • Better performance for large document sets
  6. Robust Error Handling

    • Original Issue: System could leave locks in place if errors occurred
    • Solution: Added comprehensive error handling and cleanup
    • Benefits:
      • Proper release of distributed locks
      • Graceful fallback if Redis is unavailable
      • Consistent cleanup of temporary resources
  7. Configurable Options

    • Original Issue: One-size-fits-all approach didn't work for different use cases
    • Solution: Added configurable parameters for Bloom filter behavior
    • Benefits:
      • Adjustable false positive rates
      • Configurable TTL for cached filters
      • Optional usage of Bloom filters
      • Tunable batch sizes and lock timeouts
  8. Dynamic Bloom Filter Resizing

    • Original Issue: Fixed-size Bloom filters could become inefficient or unreliable when document counts exceed initial estimates
    • Solution: Implemented automatic resizing with reconstruction when filter reaches 75% capacity
    • Benefits:
      • Automatically adapts to growing document sets
      • Maintains consistent false positive rate
      • Prevents performance degradation from overloaded filters
    • Performance Impact:
      Baseline (No Bloom Filter):
      - Every doc requires DB check
      - 100K docs = 100K DB operations
      - Consistent but high DB load
      
      With Bloom Filter (Normal Operation):
      - 100K docs = 100K Bloom checks + (~1K DB checks from false positives)
      - ~99% reduction in DB operations
      
      During Filter Resize (occurs at 75% capacity):
      - One-time cost per resize
      - At 75K docs: ~1-3 second pause for reconstruction
      - Subsequent resizes at 150K, 300K, etc. (logarithmic growth)
      - Each resize requires one bulk DB fetch + filter reconstruction
      

Implementation Notes for Maintainers:

  1. Dependencies added:

    • ioredis for Redis operations
    • redlock for distributed locking
    • xxhash-wasm and murmurhash3js-revisited for efficient hashing
  2. Configuration requirements:

    • Redis URL must be provided when Bloom filters are enabled
    • Namespace is derived from vector store configuration
    • Memory usage scales with expected document count and false positive rate
  3. Backward compatibility:

    • All new features are opt-in via useBloomFilter option
    • Existing functionality remains unchanged when Bloom filters are disabled
    • No breaking changes to existing API
  4. Performance characteristics:

    • Redis operations are batched to reduce network overhead
    • Bloom filter parameters auto-adjust based on document count
    • Distributed locks are short-lived to prevent contention
    • Filter resize impact:
      • Infrequent: Occurs logarithmically (at 75K, 150K, 300K docs, etc.)
      • Duration: ~1-3 seconds per resize operation
      • Recovery: Immediate return to optimal performance after resize
      • Trade-off: Brief pause during resize vs. sustained high performance

And here's an approximate DIFF that illustrates these improvements to the indexing.ts file based on this location:

https://raw.githubusercontent.com/FlowiseAI/Flowise/c0a74782d8f1dbe118d2ed3aa40dd292d25d9119/packages/components/src/indexing.ts

NOTE: This code has NOT been fully tested and MAY contain errors.

// New imports
+ import Redis from 'ioredis'
+ import { default as Redlock } from 'redlock'
+ import XXH from 'xxhash-wasm'
+ import { MurmurHash3 } from 'murmurhash3js-revisited'

// Modified IndexOptions
export type IndexOptions = {
     // ... existing options ...
+    /**
+     * Enable Bloom filter optimization
+     */
+    useBloomFilter?: boolean
+    /**
+     * Redis URL for BloomFilter store
+     */
+    redisUrl?: string
+    /**
+     * Bloom filter false positive rate
+     */
+    bloomFilterFalsePositiveRate?: number
+    /**
+     * TTL for Bloom filters in Redis
+     */
+    bloomFilterTTL?: number
}

// New class definitions
+ /**
+  * Optimized Bloom Filter implementation using multiple hash functions
+  * for better distribution and false positive rate control
+  */
+ class BloomFilter {
+     private bitArray: Uint8Array
+     private hashFunctions: number
+     private size: number
+     private xxhash: any
+     private itemCount: number
+     maxItems: number // public so RedisBloomFilterStore can compare filter capacities
+     private readonly MAX_FILL_RATIO = 0.75 // Threshold before needing resize
+ 
+     constructor(expectedItems: number, falsePositiveRate = 0.01) {
+         this.size = this.calculateOptimalSize(expectedItems, falsePositiveRate)
+         this.hashFunctions = this.calculateOptimalHashFunctions(expectedItems, this.size)
+         this.bitArray = new Uint8Array(Math.ceil(this.size / 8))
+         this.itemCount = 0
+         this.maxItems = expectedItems
+         // Initialize xxhash asynchronously - will be ready before first use
+         XXH().then(xxh => { this.xxhash = xxh })
+     }
+ 
+     private calculateOptimalSize(n: number, p: number): number {
+         return Math.ceil(-(n * Math.log(p)) / (Math.log(2) ** 2))
+     }
+ 
+     private calculateOptimalHashFunctions(n: number, m: number): number {
+         return Math.max(1, Math.round((m / n) * Math.log(2)))
+     }
+ 
+     private async getHashValues(item: string): Promise<number[]> {
+         const results: number[] = []
+         // Use xxhash for first hash
+         const hash1 = this.xxhash ? this.xxhash.h64(item) : MurmurHash3.x64.hash128(item)
+         // Use MurmurHash for second hash
+         const hash2 = MurmurHash3.x64.hash128(item)
+ 
+         // Double hashing technique for generating multiple hash values
+         for (let i = 0; i < this.hashFunctions; i++) {
+             const combinedHash = (hash1 + i * hash2) % this.size
+             results.push(Math.abs(combinedHash))
+         }
+         return results
+     }
+ 
+     private isFull(): boolean {
+         return this.itemCount >= this.maxItems * this.MAX_FILL_RATIO
+     }
+
+     private async resize(): Promise<{ needsReconstruction: boolean; newSize?: number }> {
+         const newSize = this.maxItems * 2
+
+         // Recompute parameters for the larger capacity; the caller reconstructs the contents
+         this.maxItems = newSize
+         this.size = this.calculateOptimalSize(newSize, this.getFalsePositiveRate())
+         this.hashFunctions = this.calculateOptimalHashFunctions(newSize, this.size)
+         this.bitArray = new Uint8Array(Math.ceil(this.size / 8))
+        
+         // Signal that filter needs reconstruction
+         return {
+             needsReconstruction: true,
+             newSize: newSize
+         }
+     }
+
+     private getFalsePositiveRate(): number {
+         // Calculate current false positive rate based on item count and size
+         return Math.pow(1 - Math.exp(-this.hashFunctions * this.itemCount / this.size), this.hashFunctions)
+     }
+
+     async add(item: string): Promise<{ needsReconstruction: boolean; newSize?: number }> {
+         if (this.isFull()) {
+             return await this.resize()
+         }
+
+         const hashes = await this.getHashValues(item)
+         for (const hash of hashes) {
+             const byteIndex = Math.floor(hash / 8)
+             const bitIndex = hash % 8
+             this.bitArray[byteIndex] |= (1 << bitIndex)
+         }
+         this.itemCount++
+         return { needsReconstruction: false }
+     }
+ 
+     async test(item: string): Promise<boolean> {
+         const hashes = await this.getHashValues(item)
+         for (const hash of hashes) {
+             const byteIndex = Math.floor(hash / 8)
+             const bitIndex = hash % 8
+             if (!(this.bitArray[byteIndex] & (1 << bitIndex))) {
+                 return false
+             }
+         }
+         return true
+     }
+ 
+     toJSON(): { 
+         bitArray: number[],
+         hashFunctions: number, 
+         size: number,
+         itemCount: number,
+         maxItems: number 
+     } {
+         return {
+             bitArray: Array.from(this.bitArray),
+             hashFunctions: this.hashFunctions,
+             size: this.size,
+             itemCount: this.itemCount,
+             maxItems: this.maxItems
+         }
+     }
+ 
+     static fromJSON(data: { 
+         bitArray: number[], 
+         hashFunctions: number, 
+         size: number,
+         itemCount: number,
+         maxItems: number 
+     }): BloomFilter {
+         const filter = new BloomFilter(1) // Temporary initialization
+         filter.size = data.size
+         filter.hashFunctions = data.hashFunctions
+         filter.bitArray = new Uint8Array(data.bitArray)
+         filter.itemCount = data.itemCount
+         filter.maxItems = data.maxItems
+         return filter
+     }
+ }
+ 
+ /**
+  * Redis-based store for BloomFilters with distributed locking
+  */
+ class RedisBloomFilterStore {
+     private redis: Redis
+     private redlock: Redlock
+     private ttlSeconds: number
+     private lockTimeout: number
+ 
+     constructor(redisUrl: string, options: { 
+         ttlSeconds?: number,
+         lockTimeout?: number 
+     } = {}) {
+         this.redis = new Redis(redisUrl)
+         this.redlock = new Redlock(
+             [this.redis],
+             {
+                 retryJitter: 200,
+                 retryCount: 5,
+                 driftFactor: 0.01,
+             }
+         )
+         this.ttlSeconds = options.ttlSeconds ?? 3600
+         this.lockTimeout = options.lockTimeout ?? 5000
+     }
+ 
+     private getKey(namespace: string, filterType: 'existing' | 'processed'): string {
+         return `bloom:${namespace}:${filterType}`
+     }
+ 
+     private getLockKey(namespace: string, filterType: 'existing' | 'processed'): string {
+         return `lock:${this.getKey(namespace, filterType)}`
+     }
+ 
+     async get(namespace: string, filterType: 'existing' | 'processed'): Promise<BloomFilter | null> {
+         try {
+             const data = await this.redis.get(this.getKey(namespace, filterType))
+             if (!data) return null
+             return BloomFilter.fromJSON(JSON.parse(data))
+         } catch (error) {
+             console.error('Error retrieving BloomFilter:', error)
+             return null
+         }
+     }
+ 
+     async set(
+         namespace: string,
+         filterType: 'existing' | 'processed',
+         filter: BloomFilter
+     ): Promise<void> {
+         const lockKey = this.getLockKey(namespace, filterType)
+         let lock
+ 
+         try {
+             lock = await this.redlock.acquire([lockKey], this.lockTimeout)
+             // Get current filter to check if we need to merge during resize
+             const currentFilter = await this.get(namespace, filterType)
+             if (currentFilter) {
+                 // If current filter exists and new filter is larger, merge them
+                 if (filter.maxItems > currentFilter.maxItems) {
+                     // Merge operation would happen here
+                     // We'd need to add all items from the current filter to the new one
+                     // This would require keeping track of actual items or reconstructing from source
+                 }
+             }
+             await this.redis
+                 .multi()
+                 .set(
+                     this.getKey(namespace, filterType),
+                     JSON.stringify(filter.toJSON()),
+                     'EX',
+                     this.ttlSeconds
+                 )
+                 .exec()
+         } finally {
+             if (lock) await lock.release().catch(console.error)
+         }
+     }
+ 
+     async delete(namespace: string, filterType: 'existing' | 'processed'): Promise<void> {
+         await this.redis.del(this.getKey(namespace, filterType))
+     }
+ }

// Modified index function
export async function index(args: IndexArgs): Promise<IndexingResult> {
     const { docsSource, recordManager, vectorStore, options } = args
-    const { batchSize = 100, cleanup, sourceIdKey, cleanupBatchSize = 1000, forceUpdate = false, vectorStoreName } = options ?? {}
+    const { 
+        batchSize = 100, 
+        cleanup, 
+        sourceIdKey, 
+        cleanupBatchSize = 1000, 
+        forceUpdate = false, 
+        vectorStoreName,
+        useBloomFilter = false,
+        redisUrl,
+        bloomFilterFalsePositiveRate = 0.01,
+        bloomFilterTTL = 3600
+    } = options ?? {}

+    // New validation
+    if (useBloomFilter && !redisUrl) {
+        throw new Error("redisUrl is required when useBloomFilter is enabled")
+    }

+    // Initialize BloomFilter store
+    let bloomFilterStore: RedisBloomFilterStore | null = null
+    if (useBloomFilter && redisUrl) {
+        bloomFilterStore = new RedisBloomFilterStore(redisUrl, {
+            ttlSeconds: bloomFilterTTL
+        })
+    }

     // ... existing setup code ...

+    // Initialize Bloom filters
+    let existingDocsFilter: BloomFilter | null = null
+    let processedDocsFilter: BloomFilter | null = null
+
+    if (useBloomFilter && bloomFilterStore) {
+        // Try to retrieve existing filters
+        existingDocsFilter = await bloomFilterStore.get(namespace, 'existing')
+        processedDocsFilter = await bloomFilterStore.get(namespace, 'processed')
+
+        // Create new filters if they don't exist
+        if (!existingDocsFilter || !processedDocsFilter) {
+            const existingRecordCount = (await recordManager.listKeys({})).length
+            const estimatedTotalSize = Math.max(existingRecordCount, docs.length) * 1.5
+
+            existingDocsFilter = new BloomFilter(
+                estimatedTotalSize,
+                bloomFilterFalsePositiveRate
+            )
+            processedDocsFilter = new BloomFilter(
+                estimatedTotalSize,
+                bloomFilterFalsePositiveRate
+            )
+
+            // Pre-populate existing docs filter
+            for await (const keyBatch of streamingBatch(
+                await recordManager.listKeys({}),
+                { 
+                    maxMemoryMB: 512, // Default memory limit
+                    maxBatchSize: 1000,
+                    minBatchSize: 100
+                }
+            )) {
+                for (const key of keyBatch) {
+                    await existingDocsFilter.add(key)
+                }
+            }
+
+            // Store the initialized filters
+            await bloomFilterStore.set(namespace, 'existing', existingDocsFilter)
+            await bloomFilterStore.set(namespace, 'processed', processedDocsFilter)
+        }
+    }

     for (const batch of batches) {
         const hashedDocs = _deduplicateInOrder(batch.map((doc) => _HashedDocument.fromDocument(doc)))

-        const batchExists = await recordManager.exists(hashedDocs.map((doc) => doc.uid))
+        // Modified existence check
+        let batchExists: boolean[]
+        if (useBloomFilter && existingDocsFilter) {
+            // Use Bloom filter for initial check
+            batchExists = await Promise.all(
+                hashedDocs.map(doc => existingDocsFilter.test(doc.uid))
+            )
+            
+            // Verify positive matches with database
+            const probableExists = hashedDocs.filter((_, i) => batchExists[i])
+            if (probableExists.length > 0) {
+                const confirmedExists = await recordManager.exists(
+                    probableExists.map(doc => doc.uid)
+                )
+                
+                // Update batchExists with confirmed results
+                let confirmedIndex = 0
+                batchExists = batchExists.map(exists => 
+                    exists ? confirmedExists[confirmedIndex++] : false
+                )
+            }
+        } else {
+            batchExists = await recordManager.exists(hashedDocs.map((doc) => doc.uid))
+        }

         // ... existing document processing ...

+        // Update Bloom filters
+        if (useBloomFilter && processedDocsFilter) {
+            for (const doc of hashedDocs) {
+                const addResult = await processedDocsFilter.add(doc.uid)
+                if (addResult.needsReconstruction) {
+                    // Create new filter with larger size
+                    const newFilter = new BloomFilter(
+                        addResult.newSize!,
+                        bloomFilterFalsePositiveRate
+                    )
+                    
+                    // We need to reconstruct the filter from source
+                    // This might require reading all processed documents again
+                    const processedDocs = await recordManager.listKeys({
+                        after: indexStartDt
+                    })
+                    
+                    for (const processedDoc of processedDocs) {
+                        await newFilter.add(processedDoc)
+                    }
+                    
+                    // Update the filter in Redis
+                    await bloomFilterStore!.set(namespace, 'processed', newFilter)
+                    processedDocsFilter = newFilter
+                }
+            }
+        }

         if (cleanup === 'incremental') {
             // ... existing code ...
-            const uidsToDelete = await recordManager.listKeys({
+            let uidsToDelete = await recordManager.listKeys({
                 before: indexStartDt,
                 groupIds: sourceIds
             })
+            // Add Bloom filter check for cleanup
+            if (useBloomFilter && processedDocsFilter) {
+                uidsToDelete = (await Promise.all(
+                    uidsToDelete.map(async uid => ({
+                        uid,
+                        processed: await processedDocsFilter.test(uid)
+                    }))
+                )).filter(item => !item.processed)
+                    .map(item => item.uid)
+            }
         }
     }

+    // Final update of processed filter
+    if (useBloomFilter && processedDocsFilter && bloomFilterStore) {
+        await bloomFilterStore.set(namespace, 'processed', processedDocsFilter)
+    }

     if (cleanup === 'full') {
+        let continuing = true
-        let uidsToDelete = await recordManager.listKeys({
-            before: indexStartDt,
-            limit: cleanupBatchSize
-        })
-        while (uidsToDelete.length > 0) {
-            await vectorStore.delete({ ids: uidsToDelete })
-            await recordManager.deleteKeys(uidsToDelete)
-            numDeleted += uidsToDelete.length
+        while (continuing) {
+            let uidsToDelete = await recordManager.listKeys({
+                before: indexStartDt,
+                limit: cleanupBatchSize
+            })
+
+            if (uidsToDelete.length === 0) {
+                continuing = false
+                continue
+            }
+
+            // Use Bloom filter to check which docs weren't processed
+            if (useBloomFilter && processedDocsFilter) {
+                uidsToDelete = (await Promise.all(
+                    uidsToDelete.map(async uid => ({
+                        uid,
+                        processed: await processedDocsFilter.test(uid)
+                    }))
+                )).filter(item => !item.processed)
+                    .map(item => item.uid)
+            }
+
+            if (uidsToDelete.length > 0) {
+                await vectorStore.delete({ ids: uidsToDelete })
+                await recordManager.deleteKeys(uidsToDelete)
+                numDeleted += uidsToDelete.length
+            }
+
             uidsToDelete = await recordManager.listKeys({
                 before: indexStartDt,
                 limit: cleanupBatchSize
             })
         }
     }

     // ... rest of the function remains the same ...
}

@dkindlund
Contributor Author

And here's a comprehensive example of how this code could be called upstream:

const result = await index({
    docsSource,
    recordManager,
    vectorStore,
    options: {
        // Basic indexing options (original)
        cleanup: 'full',                    // or 'incremental'
        batchSize: 1000,                    // Default: 100
        sourceIdKey: 'source',              // Required for incremental cleanup
        cleanupBatchSize: 1000,             // Default: 1000
        forceUpdate: false,                 // Default: false
        vectorStoreName: 'my_store',        // Namespace for this store

        // Bloom filter options (new)
        useBloomFilter: true,               // Enable Bloom filter optimization
        redisUrl: 'redis://localhost:6379', // Required if useBloomFilter is true
        bloomFilterFalsePositiveRate: 0.01, // Default: 0.01 (1%)
        bloomFilterTTL: 3600,               // Default: 3600 (1 hour)

        // Memory management options (new)
        maxMemoryMB: 512,                   // Default: 512
        maxBatchSize: 1000,                 // Default: 1000
        minBatchSize: 100,                  // Default: 100

        // Redis lock options (new)
        lockTimeout: 5000,                  // Default: 5000 (5 seconds)
    }
});

// TypeScript type reference:
interface IndexOptions {
    // Original options
    batchSize?: number;
    cleanup?: 'full' | 'incremental';
    sourceIdKey?: string | ((doc: DocumentInterface) => string);
    cleanupBatchSize?: number;
    forceUpdate?: boolean;
    vectorStoreName?: string;

    // New Bloom filter options
    useBloomFilter?: boolean;
    redisUrl?: string;
    bloomFilterFalsePositiveRate?: number;
    bloomFilterTTL?: number;

    // New memory management options
    maxMemoryMB?: number;
    maxBatchSize?: number;
    minBatchSize?: number;

    // New Redis lock options
    lockTimeout?: number;
}

All of these options are optional except:

  1. redisUrl is required if useBloomFilter is true
  2. sourceIdKey is required if cleanup is 'incremental'

A minimal configuration with Bloom filter enabled would be:

const result = await index({
    docsSource,
    recordManager,
    vectorStore,
    options: {
        useBloomFilter: true,
        redisUrl: 'redis://localhost:6379',
        vectorStoreName: 'my_store'
    }
});

The implementation will use sensible defaults for all other options.

@dkindlund
Contributor Author

Long term: If you absolutely need to stick with the TypeORM driver, then it might take much longer to fix, as it likely requires upstream patches to langchainjs. Only @HenryHengZJ can provide an ETA as to how long that might take.

I've opened an issue in the upstream langchainjs project to track the long-term fix for this:
langchain-ai/langchainjs#7739
