
How to reindex data to keep your vector store in sync with the underlying data source

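Prerequisites

This guide assumes familiarity with the concepts it builds on, such as vector stores and retrieval-augmented generation (RAG).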

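Here, we will look at a basic indexing workflow using the LangChain indexing API.

The indexing API lets you load documents from any source and keep them in sync with a vector store. Specifically, it helps:

  • Avoid writing duplicated content into the vector store
  • Avoid rewriting unchanged content
  • Avoid recomputing embeddings over unchanged content

All of this should save you time and money, and improve your vector search results.

Crucially, the indexing API works even with documents that have gone through several transformation steps (for example, text chunking) relative to the original source documents.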
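To make the point about transformation steps concrete, here is a minimal sketch of why chunked documents can still be traced back to their origin: each derived chunk carries the source identifier of its parent. The `Doc` shape and the `chunkDocument` helper are hypothetical names used for illustration, and the fixed-width splitting is a stand-in for a real text splitter.

```typescript
// Hypothetical document shape, for illustration only.
interface Doc {
  pageContent: string;
  metadata: { source: string };
}

// Naive fixed-width splitter (an assumption; real splitters are smarter).
function chunkDocument(doc: Doc, chunkSize: number): Doc[] {
  const result: Doc[] = [];
  for (let start = 0; start < doc.pageContent.length; start += chunkSize) {
    result.push({
      pageContent: doc.pageContent.slice(start, start + chunkSize),
      // Copy the parent's source onto every chunk so it stays traceable.
      metadata: { source: doc.metadata.source },
    });
  }
  return result;
}

const chunks = chunkDocument(
  { pageContent: "kitty kitty kitty", metadata: { source: "kitty.txt" } },
  6
);
console.log(chunks.length); // 3
```

Because every chunk keeps the same `source` value, an indexer that groups records by source can treat all three chunks as derived from `kitty.txt`.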

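How it works

LangChain indexing uses a record manager (RecordManager) that keeps track of document writes into the vector store.

When indexing content, a hash is computed for each document, and the following information is stored in the record manager:

  • The document hash (a hash of both the page content and the metadata)
  • The write time
  • The source ID: each document should include information in its metadata that lets us determine the ultimate source of the document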
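The three stored fields can be sketched as follows. This is not LangChain's internal implementation: the `djb2` string hash is a simple non-cryptographic stand-in for whatever hash the library uses, and `Doc`, `IndexRecord`, `hashDocument`, and `toRecord` are hypothetical names chosen for this example.

```typescript
// Hypothetical shapes, for illustration only.
interface Doc {
  pageContent: string;
  metadata: Record<string, unknown>;
}

interface IndexRecord {
  hash: string; // hash of page content and metadata
  writtenAt: number; // write time
  sourceId: string; // read from the metadata
}

// djb2-style string hash: a stand-in for a real (cryptographic) hash.
function djb2(input: string): string {
  let hash = 5381;
  for (let i = 0; i < input.length; i++) {
    // hash * 33 + code unit, truncated to 32 bits.
    hash = ((hash << 5) + hash + input.charCodeAt(i)) | 0;
  }
  return (hash >>> 0).toString(16);
}

// Serialize top-level metadata keys in sorted order so key order is ignored.
function stableStringify(metadata: Record<string, unknown>): string {
  const keys = Object.keys(metadata).sort();
  return JSON.stringify(keys.map((key) => [key, metadata[key]]));
}

// Combine the content hash and the metadata hash into one document hash.
function hashDocument(doc: Doc): string {
  return djb2(djb2(doc.pageContent) + ":" + djb2(stableStringify(doc.metadata)));
}

function toRecord(doc: Doc, sourceIdKey: string, now: number): IndexRecord {
  return {
    hash: hashDocument(doc),
    writtenAt: now,
    sourceId: String(doc.metadata[sourceIdKey]),
  };
}

const original: Doc = { pageContent: "kitty", metadata: { source: "kitty.txt", page: 1 } };
const reordered: Doc = { pageContent: "kitty", metadata: { page: 1, source: "kitty.txt" } };
console.log(hashDocument(original) === hashDocument(reordered)); // true
console.log(toRecord(original, "source", Date.now()).sourceId); // kitty.txt
```

Hashing both the content and the metadata means that a change to either one produces a new record, which is what lets an indexer tell "unchanged" apart from "updated".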

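Deletion modes

When indexing documents into a vector store, some existing documents in the vector store may need to be deleted. In some situations you may want to remove any existing documents derived from the same sources as the new documents being indexed. In others you may want to delete all existing documents wholesale. The indexing API's deletion modes let you pick the behavior you want: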

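Cleanup mode | De-duplicates content | Parallelizable | Cleans up deleted source docs | Cleans up mutations of source docs and/or derived docs | Cleanup timing
None         | Yes                   | Yes            | No                            | No                                                     | -
Incremental  | Yes                   | Yes            | No                            | Yes                                                    | Continuously
Full         | Yes                   | No             | Yes                           | Yes                                                    | At end of indexing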

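None does not perform any automatic cleanup, leaving the user to clean up old content manually.

incremental and full offer the following automated cleanup:

  • If the content of a source document or its derived documents has changed, both incremental and full modes clean up (delete) previous versions of the content.
  • If a source document has been deleted (meaning it is not included in the documents currently being indexed), full cleanup mode deletes it from the vector store correctly, but incremental mode does not.

When content changes (for example, the source PDF file was revised), there is a period during indexing when both the new and old versions may be returned to the user. This happens after the new content has been written but before the old version has been deleted.

  • incremental indexing minimizes this period because it cleans up continuously as it writes.
  • full mode cleans up after all batches have been written.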
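The difference between the cleanup modes can be illustrated with a toy in-memory model. It is a sketch under simplifying assumptions, not the library's algorithm: `toyIndex`, `records`, and the logical `clock` are hypothetical, records are keyed by source plus content instead of a real hash, and cleanup runs once at the end of each call rather than per batch.

```typescript
type Cleanup = "incremental" | "full" | undefined;

interface Doc {
  pageContent: string;
  metadata: { source: string };
}

interface StoredRecord {
  source: string;
  updatedAt: number;
}

// Toy record store, keyed by a stand-in for the document hash.
const records = new Map<string, StoredRecord>();
let clock = 0; // Logical clock: every run gets a distinct timestamp.

function toyIndex(docs: Doc[], cleanup: Cleanup) {
  const startedAt = ++clock;
  const result = { numAdded: 0, numDeleted: 0, numSkipped: 0 };
  const seenKeys = new Set<string>();
  const seenSources = new Set<string>();

  docs.forEach((doc) => {
    const key = doc.metadata.source + "|" + doc.pageContent;
    if (seenKeys.has(key)) return; // De-duplicate within the batch.
    seenKeys.add(key);
    seenSources.add(doc.metadata.source);
    if (records.has(key)) result.numSkipped += 1;
    else result.numAdded += 1;
    // Touch the record so it is not considered stale in this run.
    records.set(key, { source: doc.metadata.source, updatedAt: startedAt });
  });

  if (cleanup === undefined) return result; // No automatic cleanup.

  records.forEach((record, key) => {
    const stale = record.updatedAt < startedAt;
    // incremental: only sources seen in this run; full: everything.
    const inScope = cleanup === "full" || seenSources.has(record.source);
    if (stale && inScope) {
      records.delete(key);
      result.numDeleted += 1;
    }
  });
  return result;
}

const kitty: Doc = { pageContent: "kitty", metadata: { source: "kitty.txt" } };
const doggy: Doc = { pageContent: "doggy", metadata: { source: "doggy.txt" } };
const kittyUpdated: Doc = { pageContent: "kitty updated", metadata: { source: "kitty.txt" } };

const firstRun = toyIndex([kitty, doggy], "incremental"); // adds both
const secondRun = toyIndex([kittyUpdated], "incremental"); // replaces the old kitty only
const thirdRun = toyIndex([], "incremental"); // changes nothing
const fourthRun = toyIndex([doggy], "full"); // removes everything not passed in
console.log(firstRun, secondRun, thirdRun, fourthRun);
```

In this model, incremental cleanup only ever touches sources that appear in the current run, which is why an empty incremental run deletes nothing, while a full run treats the passed documents as the complete set.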

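Requirements

  1. Do not use with a store that has been pre-populated with content independently of the indexing API, as the record manager will not know that records were inserted previously.
  2. Only works with LangChain vector stores that support: a) document addition by ID (an addDocuments method with an ids argument), and b) deletion by ID (a delete method with an ids argument).

Compatible vector stores: PGVector, Chroma, CloudflareVectorize, ElasticVectorSearch, FAISS, MariaDB, MomentoVectorIndex, Pinecone, SupabaseVectorStore, VercelPostgresVectorStore, Weaviate, Xata.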
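The two capabilities in requirement 2 can be pictured as a small contract. The interface below is a hypothetical simplification written for this example (real LangChain vector stores expose more methods, and the exact parameter shapes here are an assumption); the in-memory class only demonstrates why writing and deleting by ID matters.

```typescript
interface Doc {
  pageContent: string;
  metadata: Record<string, unknown>;
}

// Hypothetical minimal contract an indexer would rely on.
interface IndexableStore {
  addDocuments(docs: Doc[], options: { ids: string[] }): Promise<void>;
  delete(params: { ids: string[] }): Promise<void>;
}

// Toy in-memory store, for illustration only.
class InMemoryStore implements IndexableStore {
  private readonly rows = new Map<string, Doc>();

  async addDocuments(docs: Doc[], options: { ids: string[] }): Promise<void> {
    if (docs.length !== options.ids.length) {
      throw new Error("one id per document is required");
    }
    // Writing by ID makes a repeated write overwrite instead of duplicate.
    docs.forEach((doc, i) => this.rows.set(options.ids[i], doc));
  }

  async delete(params: { ids: string[] }): Promise<void> {
    params.ids.forEach((id) => this.rows.delete(id));
  }

  count(): number {
    return this.rows.size;
  }
}

async function demo(): Promise<number> {
  const store = new InMemoryStore();
  const kitty = { pageContent: "kitty", metadata: { source: "kitty.txt" } };
  const doggy = { pageContent: "doggy", metadata: { source: "doggy.txt" } };
  await store.addDocuments([kitty, doggy], { ids: ["a", "b"] });
  await store.addDocuments([kitty], { ids: ["a"] }); // same ID: overwrites
  await store.delete({ ids: ["b"] }); // targeted removal of a stale entry
  return store.count();
}

demo().then((remaining) => console.log(remaining)); // 1
```

Without caller-supplied IDs, an indexer would have no handle for overwriting or removing exactly the entries that its records mark as stale.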

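Caution

The record manager relies on a time-based mechanism to determine what content can be cleaned up (when using the full or incremental cleanup modes).

If two tasks run back-to-back, and the first task finishes before the clock time changes, then the second task may not be able to clean up content.

This is unlikely to be an issue in practice, for the following reasons:

  1. The RecordManager uses higher-resolution timestamps.
  2. The data would need to change between the first and second task runs, which becomes unlikely if the interval between the tasks is small.
  3. Indexing tasks typically take more than a few milliseconds.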
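The timing caveat can be reproduced with a small simulation. It assumes that cleanup removes records whose timestamp is strictly earlier than the start of the current run; that rule, along with the names `staleKeys` and `secondRunCleansUp`, is an assumption made for this sketch rather than a description of the RecordManager's code. Times are integers in microseconds to keep the arithmetic exact.

```typescript
interface TimedRecord {
  key: string;
  updatedAt: number; // microseconds
}

// Keys of records last written strictly before the cutoff.
function staleKeys(allRecords: TimedRecord[], cutoff: number): string[] {
  return allRecords.filter((r) => r.updatedAt < cutoff).map((r) => r.key);
}

// Simulate two runs 500 microseconds apart under a clock that only
// advances in steps of `resolutionMicros`.
function secondRunCleansUp(resolutionMicros: number): boolean {
  const tick = (t: number) => Math.floor(t / resolutionMicros) * resolutionMicros;
  const firstRunTime = tick(1000200); // run 1 writes the old version
  const secondRunTime = tick(1000700); // run 2 writes the new version
  const allRecords: TimedRecord[] = [
    { key: "old", updatedAt: firstRunTime },
    { key: "new", updatedAt: secondRunTime },
  ];
  // The old version is removed only if its timestamp precedes run 2's.
  return staleKeys(allRecords, secondRunTime).indexOf("old") !== -1;
}

console.log(secondRunCleansUp(1000)); // false: both runs share one millisecond tick
console.log(secondRunCleansUp(1)); // true: microsecond resolution tells them apart
```

With a coarse clock both runs land on the same tick, so the old record does not look older than the new run and survives; a finer clock resolution separates them.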

Quickstart

import { PostgresRecordManager } from "@langchain/community/indexes/postgres";
import { index } from "langchain/indexes";
import { PGVectorStore } from "@langchain/community/vectorstores/pgvector";
import { PoolConfig } from "pg";
import { OpenAIEmbeddings } from "@langchain/openai";
import { CharacterTextSplitter } from "@langchain/textsplitters";
import { BaseDocumentLoader } from "@langchain/core/document_loaders/base";

// First, follow set-up instructions at
// https://js.langchain.com/docs/modules/indexes/vector_stores/integrations/pgvector

const config = {
postgresConnectionOptions: {
type: "postgres",
host: "127.0.0.1",
port: 5432,
user: "myuser",
password: "ChangeMe",
database: "api",
} as PoolConfig,
tableName: "testlangchain",
columns: {
idColumnName: "id",
vectorColumnName: "vector",
contentColumnName: "content",
metadataColumnName: "metadata",
},
};

const vectorStore = await PGVectorStore.initialize(
new OpenAIEmbeddings(),
config
);

// Create a new record manager
const recordManagerConfig = {
postgresConnectionOptions: {
type: "postgres",
host: "127.0.0.1",
port: 5432,
user: "myuser",
password: "ChangeMe",
database: "api",
} as PoolConfig,
tableName: "upsertion_records",
};
const recordManager = new PostgresRecordManager(
"test_namespace",
recordManagerConfig
);

// Create the schema if it doesn't exist
await recordManager.createSchema();

// Index some documents
const doc1 = {
pageContent: "kitty",
metadata: { source: "kitty.txt" },
};

const doc2 = {
pageContent: "doggy",
metadata: { source: "doggy.txt" },
};

/**
* Hacky helper method to clear content. See the `full` mode section to understand why it works.
*/
async function clear() {
await index({
docsSource: [],
recordManager,
vectorStore,
options: {
cleanup: "full",
sourceIdKey: "source",
},
});
}

// No cleanup
await clear();
// This mode does not do automatic clean up of old versions of content; however, it still takes care of content de-duplication.

console.log(
await index({
docsSource: [doc1, doc1, doc1, doc1, doc1, doc1],
recordManager,
vectorStore,
options: {
cleanup: undefined,
sourceIdKey: "source",
},
})
);

/*
{
numAdded: 1,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/

await clear();

console.log(
await index({
docsSource: [doc1, doc2],
recordManager,
vectorStore,
options: {
cleanup: undefined,
sourceIdKey: "source",
},
})
);

/*
{
numAdded: 2,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/

// Second time around all content will be skipped

console.log(
await index({
docsSource: [doc1, doc2],
recordManager,
vectorStore,
options: {
cleanup: undefined,
sourceIdKey: "source",
},
})
);

/*
{
numAdded: 0,
numUpdated: 0,
numDeleted: 0,
numSkipped: 2,
}
*/

// Updated content will be added, but old won't be deleted

const doc1Updated = {
pageContent: "kitty updated",
metadata: { source: "kitty.txt" },
};

console.log(
await index({
docsSource: [doc1Updated, doc2],
recordManager,
vectorStore,
options: {
cleanup: undefined,
sourceIdKey: "source",
},
})
);

/*
{
numAdded: 1,
numUpdated: 0,
numDeleted: 0,
numSkipped: 1,
}
*/

/*
Resulting records in the database:
[
{
pageContent: "kitty",
metadata: { source: "kitty.txt" },
},
{
pageContent: "doggy",
metadata: { source: "doggy.txt" },
},
{
pageContent: "kitty updated",
metadata: { source: "kitty.txt" },
}
]
*/

// Incremental mode
await clear();

console.log(
await index({
docsSource: [doc1, doc2],
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);

/*
{
numAdded: 2,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/

// Indexing again should result in both documents getting skipped – also skipping the embedding operation!

console.log(
await index({
docsSource: [doc1, doc2],
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);

/*
{
numAdded: 0,
numUpdated: 0,
numDeleted: 0,
numSkipped: 2,
}
*/

// If we provide no documents with incremental indexing mode, nothing will change.
console.log(
await index({
docsSource: [],
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);

/*
{
numAdded: 0,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/

// If we mutate a document, the new version will be written and all old versions sharing the same source will be deleted.
// This only affects the documents with the same source id!

const changedDoc1 = {
pageContent: "kitty updated",
metadata: { source: "kitty.txt" },
};
console.log(
await index({
docsSource: [changedDoc1],
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);

/*
{
numAdded: 1,
numUpdated: 0,
numDeleted: 1,
numSkipped: 0,
}
*/

// Full mode
await clear();
// In full mode the user should pass the full universe of content that should be indexed into the indexing function.

// Any documents that are not passed into the indexing function and are present in the vectorStore will be deleted!

// This behavior is useful to handle deletions of source documents.
const allDocs = [doc1, doc2];
console.log(
await index({
docsSource: allDocs,
recordManager,
vectorStore,
options: {
cleanup: "full",
sourceIdKey: "source",
},
})
);

/*
{
numAdded: 2,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/

// Say someone deleted the first doc:

const doc2Only = [doc2];

// Using full mode will clean up the deleted content as well.
// This affects all documents regardless of source id!

console.log(
await index({
docsSource: doc2Only,
recordManager,
vectorStore,
options: {
cleanup: "full",
sourceIdKey: "source",
},
})
);

/*
{
numAdded: 0,
numUpdated: 0,
numDeleted: 1,
numSkipped: 1,
}
*/

await clear();

const newDoc1 = {
pageContent: "kitty kitty kitty kitty kitty",
metadata: { source: "kitty.txt" },
};

const newDoc2 = {
pageContent: "doggy doggy the doggy",
metadata: { source: "doggy.txt" },
};

const splitter = new CharacterTextSplitter({
separator: "t",
keepSeparator: true,
chunkSize: 12,
chunkOverlap: 2,
});

const newDocs = await splitter.splitDocuments([newDoc1, newDoc2]);
console.log(newDocs);
/*
[
{
pageContent: 'kitty kit',
metadata: {source: 'kitty.txt'}
},
{
pageContent: 'tty kitty ki',
metadata: {source: 'kitty.txt'}
},
{
pageContent: 'tty kitty',
metadata: {source: 'kitty.txt'},
},
{
pageContent: 'doggy doggy',
metadata: {source: 'doggy.txt'},
},
{
pageContent: 'the doggy',
metadata: {source: 'doggy.txt'},
}
]
*/

console.log(
await index({
docsSource: newDocs,
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);
/*
{
numAdded: 5,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/

const changedDoggyDocs = [
{
pageContent: "woof woof",
metadata: { source: "doggy.txt" },
},
{
pageContent: "woof woof woof",
metadata: { source: "doggy.txt" },
},
];

console.log(
await index({
docsSource: changedDoggyDocs,
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);

/*
{
numAdded: 2,
numUpdated: 0,
numDeleted: 2,
numSkipped: 0,
}
*/

// Usage with document loaders

// Create a document loader
class MyCustomDocumentLoader extends BaseDocumentLoader {
load() {
return Promise.resolve([
{
pageContent: "kitty",
metadata: { source: "kitty.txt" },
},
{
pageContent: "doggy",
metadata: { source: "doggy.txt" },
},
]);
}
}

await clear();

const loader = new MyCustomDocumentLoader();

console.log(
await index({
docsSource: loader,
recordManager,
vectorStore,
options: {
cleanup: "incremental",
sourceIdKey: "source",
},
})
);

/*
{
numAdded: 2,
numUpdated: 0,
numDeleted: 0,
numSkipped: 0,
}
*/

// Closing resources
await recordManager.end();
await vectorStore.end();


Next steps

You have now learned how to use indexing in your RAG pipelines.

Next, check out some of the other sections on retrieval.

