如何重新索引数据以保持向量存储与底层数据源同步
本指南假设您熟悉以下概念:
在这里,我们将使用 LangChain 索引 API 查看一个基本的索引工作流程。
索引 API 允许您从任何来源加载文档并将其与向量存储保持同步。具体来说,它可以:
- 避免向向量存储中写入重复内容
- 避免重写未更改的内容
- 避免对未更改的内容重新计算嵌入
所有这些都可以为您节省时间和金钱,并改善您的向量搜索结果。
重要的是,即使文档相对于原始源文档经过了多个转换步骤(例如,通过文本分块),索引 API 也能正常工作。
工作原理
LangChain 索引使用一个记录管理器 (RecordManager) 来跟踪向量存储中的文档写入。
在索引内容时,会为每个文档计算哈希值,并将以下信息存储在记录管理器中:
- 文档哈希(页面内容和元数据的哈希)
- 写入时间
- 源 ID - 每个文档应在元数据中包含一些信息,以便我们确定该文档的最终来源
删除模式
在将文档索引到向量存储时,向量存储中可能存在一些需要删除的现有文档。 在某些情况下,您可能想要删除与正在索引的新文档来自相同源的任何现有文档。 在其他情况下,您可能想要完全删除所有现有文档。索引 API 的删除模式让您选择所需的行为:
| 清理模式 | 去重内容 | 可并行化 | 清理已删除的源文档 | 清理源文档和/或派生文档的变更 | 清理时间 |
|---|---|---|---|---|---|
| None | ✅ | ✅ | ❌ | ❌ | - |
| Incremental | ✅ | ✅ | ❌ | ✅ | 持续进行 |
| Full | ✅ | ❌ | ✅ | ✅ | 索引结束时 |
None 不会执行任何自动清理,允许用户手动清理旧内容。
incremental 和 full 提供以下自动清理功能:
- 如果源文档或派生文档的内容发生了变化,
incremental或full模式都会清理(删除)以前的版本。 - 如果源文档已被删除(意味着它未包含在当前正在索引的文档中),
full清理模式会正确地从向量存储中删除它,但incremental模式不会。
当内容发生变更(例如,源 PDF 文件被修改)时,在索引期间可能会有一段时间新旧版本的内容都可能返回给用户。这发生在新内容写入之后,但旧版本删除之前。
incremental索引通过持续写入时进行清理,最大限度地缩短了这段时间。full模式在所有批次写入完成后进行清理。
要求
- 不要将其与通过其他方式预先填充内容的存储一起使用,因为记录管理器不会知道之前插入的记录。
- 仅适用于支持以下功能的 LangChain
vectorstore: a). 按 ID 添加文档(带 ids 参数的addDocuments方法) b). 按 ID 删除(带 ids 参数的 delete 方法)
兼容的向量存储:PGVector、Chroma、CloudflareVectorize、
ElasticVectorSearch、FAISS、MariaDB、MomentoVectorIndex、
Pinecone、SupabaseVectorStore、VercelPostgresVectorStore、
Weaviate、Xata
注意
记录管理器依赖基于时间的机制来确定哪些内容可以清理(当使用 full 或 incremental 清理模式时)。
如果两个任务连续运行,且第一个任务在时钟时间改变之前完成,则第二个任务可能无法清理内容。
在实际环境中,这不太可能成为问题,原因如下:
RecordManager使用更高精度的时间戳。- 数据需要在第一次和第二次任务运行之间发生变化,如果两次任务之间间隔很短,则这种情况不太可能发生。
- 索引任务通常耗时超过几毫秒。
快速开始
import { PostgresRecordManager } from "@langchain/community/indexes/postgres";
import { index } from "langchain/indexes";
import { PGVectorStore } from "@langchain/community/vectorstores/pgvector";
import { PoolConfig } from "pg";
import { OpenAIEmbeddings } from "@langchain/openai";
import { CharacterTextSplitter } from "@langchain/textsplitters";
import { BaseDocumentLoader } from "@langchain/core/document_loaders/base";
// First, follow set-up instructions at
// https://js.langchain.com/docs/modules/indexes/vector_stores/integrations/pgvector
const config = {
postgresConnectionOptions: {
type: "postgres",
host: "127.0.0.1",
port: 5432,
user: "myuser",
password: "ChangeMe",
database: "api",
} as PoolConfig,
tableName: "testlangchain",
columns: {
idColumnName: "id",
vectorColumnName: "vector",
contentColumnName: "content",
metadataColumnName: "metadata",
},
};
const vectorStore = await PGVectorStore.initialize(
new OpenAIEmbeddings(),
config
);
// Create a new record manager
const recordManagerConfig = {
postgresConnectionOptions: {
type: "postgres",
host: "127.0.0.1",
port: 5432,
user: "myuser",
password: "ChangeMe",
database: "api",
} as PoolConfig,
tableName: "upsertion_records",
};
const recordManager = new PostgresRecordManager(
"test_namespace",
recordManagerConfig
);
// Create the schema if it doesn't exist
await recordManager.createSchema();
// Index some documents
const doc1 = {
pageContent: "kitty",
metadata: { source: "kitty.txt" },
};
const doc2 = {
pageContent: "doggy",
metadata: { source: "doggy.txt" },
};
/**
* Hacky helper method to clear content. See the `full` mode section to to understand why it works.
*/
async function clear() {
await index({
docsSource: [],
recordManager,
vectorStore,
options: {
cleanup: "full",
sourceIdKey: "source",
},
});
}
// No cleanup
await clear();
// This mode does not do automatic clean up of old versions of content; however, it still takes care of content de-duplication.
console.log(
await index({
docsSource: [doc1, doc1, doc1, doc1, doc1, doc1],
recordManager,
vectorStore,
options: {
cleanup: undefined,
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 1,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/
await clear();
console.log(
await index({
docsSource: [doc1, doc2],
recordManager,
vectorStore,
options: {
cleanup: undefined,
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 2,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/
// Second time around all content will be skipped
console.log(
await index({
docsSource: [doc1, doc2],
recordManager,
vectorStore,
options: {
cleanup: undefined,
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 0,
numUpdated: 0,
numDeleted: 0,
numSkipped: 2,
}
*/
// Updated content will be added, but old won't be deleted
const doc1Updated = {
pageContent: "kitty updated",
metadata: { source: "kitty.txt" },
};
console.log(
await index({
docsSource: [doc1Updated, doc2],
recordManager,
vectorStore,
options: {
cleanup: undefined,
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 1,
numUpdated: 0,
numDeleted: 0,
numSkipped: 1,
}
*/
/*
Resulting records in the database:
[
{
pageContent: "kitty",
metadata: { source: "kitty.txt" },
},
{
pageContent: "doggy",
metadata: { source: "doggy.txt" },
},
{
pageContent: "kitty updated",
metadata: { source: "kitty.txt" },
}
]
*/
// Incremental mode
await clear();
console.log(
await index({
docsSource: [doc1, doc2],
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 2,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/
// Indexing again should result in both documents getting skipped – also skipping the embedding operation!
console.log(
await index({
docsSource: [doc1, doc2],
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 0,
numUpdated: 0,
numDeleted: 0,
numSkipped: 2,
}
*/
// If we provide no documents with incremental indexing mode, nothing will change.
console.log(
await index({
docsSource: [],
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 0,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/
// If we mutate a document, the new version will be written and all old versions sharing the same source will be deleted.
// This only affects the documents with the same source id!
const changedDoc1 = {
pageContent: "kitty updated",
metadata: { source: "kitty.txt" },
};
console.log(
await index({
docsSource: [changedDoc1],
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 1,
numUpdated: 0,
numDeleted: 1,
numSkipped: 0,
}
*/
// Full mode
await clear();
// In full mode the user should pass the full universe of content that should be indexed into the indexing function.
// Any documents that are not passed into the indexing function and are present in the vectorStore will be deleted!
// This behavior is useful to handle deletions of source documents.
const allDocs = [doc1, doc2];
console.log(
await index({
docsSource: allDocs,
recordManager,
vectorStore,
options: {
cleanup: "full",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 2,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/
// Say someone deleted the first doc:
const doc2Only = [doc2];
// Using full mode will clean up the deleted content as well.
// This afffects all documents regardless of source id!
console.log(
await index({
docsSource: doc2Only,
recordManager,
vectorStore,
options: {
cleanup: "full",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 0,
numUpdated: 0,
numDeleted: 1,
numSkipped: 1,
}
*/
await clear();
const newDoc1 = {
pageContent: "kitty kitty kitty kitty kitty",
metadata: { source: "kitty.txt" },
};
const newDoc2 = {
pageContent: "doggy doggy the doggy",
metadata: { source: "doggy.txt" },
};
const splitter = new CharacterTextSplitter({
separator: "t",
keepSeparator: true,
chunkSize: 12,
chunkOverlap: 2,
});
const newDocs = await splitter.splitDocuments([newDoc1, newDoc2]);
console.log(newDocs);
/*
[
{
pageContent: 'kitty kit',
metadata: {source: 'kitty.txt'}
},
{
pageContent: 'tty kitty ki',
metadata: {source: 'kitty.txt'}
},
{
pageContent: 'tty kitty',
metadata: {source: 'kitty.txt'},
},
{
pageContent: 'doggy doggy',
metadata: {source: 'doggy.txt'},
{
pageContent: 'the doggy',
metadata: {source: 'doggy.txt'},
}
]
*/
console.log(
await index({
docsSource: newDocs,
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 5,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/
const changedDoggyDocs = [
{
pageContent: "woof woof",
metadata: { source: "doggy.txt" },
},
{
pageContent: "woof woof woof",
metadata: { source: "doggy.txt" },
},
];
console.log(
await index({
docsSource: changedDoggyDocs,
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 2,
numUpdated: 0,
numDeleted: 2,
numSkipped: 0,
}
*/
// Usage with document loaders
// Create a document loader
class MyCustomDocumentLoader extends BaseDocumentLoader {
load() {
return Promise.resolve([
{
pageContent: "kitty",
metadata: { source: "kitty.txt" },
},
{
pageContent: "doggy",
metadata: { source: "doggy.txt" },
},
]);
}
}
await clear();
const loader = new MyCustomDocumentLoader();
console.log(
await index({
docsSource: loader,
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 2,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/
// Closing resources
await recordManager.end();
await vectorStore.end();
API Reference:
- PostgresRecordManager from
@langchain/community/indexes/postgres - index from
langchain/indexes - PGVectorStore from
@langchain/community/vectorstores/pgvector - OpenAIEmbeddings from
@langchain/openai - CharacterTextSplitter from
@langchain/textsplitters - BaseDocumentLoader from
@langchain/core/document_loaders/base
下一步
现在您已经了解了如何在 RAG 流水线中使用索引。
接下来,请查看有关检索的其他部分。