Skip to main content

How to implement and test a vector store integration

This guide walks through how to implement and test a custom vector store that you have developed.

For testing, we will rely on the langchain-tests dependency we added in the previous bootstrapping guide.

Implementation

Let's say you're building a simple integration package that provides a ParrotVectorStore vector store integration for LangChain. Here's a simple example of what your project structure might look like:

langchain-parrot-link/
├── langchain_parrot_link/
│ ├── __init__.py
│ └── vectorstores.py
├── tests/
│ ├── __init__.py
│ └── test_vectorstores.py
├── pyproject.toml
└── README.md

Following the bootstrapping guide, all of these files should already exist, except for vectorstores.py and test_vectorstores.py. We will implement these files in this guide.

First we need an implementation for our vector store. This implementation will depend on your chosen database technology. langchain-core includes a minimal in-memory vector store that we can use as a guide. You can access the code here. For convenience, we also provide it below.

vectorstores.py
langchain_parrot_link/vectorstores.py
from __future__ import annotations

import json
import uuid
from collections.abc import Iterator, Sequence
from pathlib import Path
from typing import Any, Callable, Optional

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.load import dumpd, load
from langchain_core.vectorstores import VectorStore
from langchain_core.vectorstores.utils import _cosine_similarity as cosine_similarity
from langchain_core.vectorstores.utils import maximal_marginal_relevance


class InMemoryVectorStore(VectorStore):
"""In-memory vector store implementation.

Uses a dictionary, and computes cosine similarity for search using numpy.
"""

def __init__(self, embedding: Embeddings) -> None:
"""Initialize with the given embedding function.

Args:
embedding: embedding function to use.
"""
self.store: dict[str, dict[str, Any]] = {}
self.embedding = embedding

@property
def embeddings(self) -> Embeddings:
return self.embedding

def delete(self, ids: Optional[Sequence[str]] = None, **kwargs: Any) -> None:
if ids:
for _id in ids:
self.store.pop(_id, None)

async def adelete(self, ids: Optional[Sequence[str]] = None, **kwargs: Any) -> None:
self.delete(ids)

def add_documents(
self,
documents: list[Document],
ids: Optional[list[str]] = None,
**kwargs: Any,
) -> list[str]:
"""Add documents to the store."""
texts = [doc.page_content for doc in documents]
vectors = self.embedding.embed_documents(texts)

if ids and len(ids) != len(texts):
msg = (
f"ids must be the same length as texts. "
f"Got {len(ids)} ids and {len(texts)} texts."
)
raise ValueError(msg)

id_iterator: Iterator[Optional[str]] = (
iter(ids) if ids else iter(doc.id for doc in documents)
)

ids_ = []

for doc, vector in zip(documents, vectors):
doc_id = next(id_iterator)
doc_id_ = doc_id if doc_id else str(uuid.uuid4())
ids_.append(doc_id_)
self.store[doc_id_] = {
"id": doc_id_,
"vector": vector,
"text": doc.page_content,
"metadata": doc.metadata,
}

return ids_

async def aadd_documents(
self, documents: list[Document], ids: Optional[list[str]] = None, **kwargs: Any
) -> list[str]:
"""Add documents to the store."""
texts = [doc.page_content for doc in documents]
vectors = await self.embedding.aembed_documents(texts)

if ids and len(ids) != len(texts):
msg = (
f"ids must be the same length as texts. "
f"Got {len(ids)} ids and {len(texts)} texts."
)
raise ValueError(msg)

id_iterator: Iterator[Optional[str]] = (
iter(ids) if ids else iter(doc.id for doc in documents)
)
ids_: list[str] = []

for doc, vector in zip(documents, vectors):
doc_id = next(id_iterator)
doc_id_ = doc_id if doc_id else str(uuid.uuid4())
ids_.append(doc_id_)
self.store[doc_id_] = {
"id": doc_id_,
"vector": vector,
"text": doc.page_content,
"metadata": doc.metadata,
}

return ids_

def get_by_ids(self, ids: Sequence[str], /) -> list[Document]:
"""Get documents by their ids.

Args:
ids: The ids of the documents to get.

Returns:
A list of Document objects.
"""
documents = []

for doc_id in ids:
doc = self.store.get(doc_id)
if doc:
documents.append(
Document(
id=doc["id"],
page_content=doc["text"],
metadata=doc["metadata"],
)
)
return documents

async def aget_by_ids(self, ids: Sequence[str], /) -> list[Document]:
"""Async get documents by their ids.

Args:
ids: The ids of the documents to get.

Returns:
A list of Document objects.
"""
return self.get_by_ids(ids)

def _similarity_search_with_score_by_vector(
self,
embedding: list[float],
k: int = 4,
filter: Optional[Callable[[Document], bool]] = None,
**kwargs: Any,
) -> list[tuple[Document, float, list[float]]]:
# get all docs with fixed order in list
docs = list(self.store.values())

if filter is not None:
docs = [
doc
for doc in docs
if filter(Document(page_content=doc["text"], metadata=doc["metadata"]))
]

if not docs:
return []

similarity = cosine_similarity([embedding], [doc["vector"] for doc in docs])[0]

# get the indices ordered by similarity score
top_k_idx = similarity.argsort()[::-1][:k]

return [
(
Document(
id=doc_dict["id"],
page_content=doc_dict["text"],
metadata=doc_dict["metadata"],
),
float(similarity[idx].item()),
doc_dict["vector"],
)
for idx in top_k_idx
# Assign using walrus operator to avoid multiple lookups
if (doc_dict := docs[idx])
]

def similarity_search_with_score_by_vector(
self,
embedding: list[float],
k: int = 4,
filter: Optional[Callable[[Document], bool]] = None,
**kwargs: Any,
) -> list[tuple[Document, float]]:
return [
(doc, similarity)
for doc, similarity, _ in self._similarity_search_with_score_by_vector(
embedding=embedding, k=k, filter=filter, **kwargs
)
]

def similarity_search_with_score(
self,
query: str,
k: int = 4,
**kwargs: Any,
) -> list[tuple[Document, float]]:
embedding = self.embedding.embed_query(query)
docs = self.similarity_search_with_score_by_vector(
embedding,
k,
**kwargs,
)
return docs

async def asimilarity_search_with_score(
self, query: str, k: int = 4, **kwargs: Any
) -> list[tuple[Document, float]]:
embedding = await self.embedding.aembed_query(query)
docs = self.similarity_search_with_score_by_vector(
embedding,
k,
**kwargs,
)
return docs

def similarity_search_by_vector(
self,
embedding: list[float],
k: int = 4,
**kwargs: Any,
) -> list[Document]:
docs_and_scores = self.similarity_search_with_score_by_vector(
embedding,
k,
**kwargs,
)
return [doc for doc, _ in docs_and_scores]

async def asimilarity_search_by_vector(
self, embedding: list[float], k: int = 4, **kwargs: Any
) -> list[Document]:
return self.similarity_search_by_vector(embedding, k, **kwargs)

def similarity_search(
self, query: str, k: int = 4, **kwargs: Any
) -> list[Document]:
return [doc for doc, _ in self.similarity_search_with_score(query, k, **kwargs)]

async def asimilarity_search(
self, query: str, k: int = 4, **kwargs: Any
) -> list[Document]:
return [
doc
for doc, _ in await self.asimilarity_search_with_score(query, k, **kwargs)
]

def max_marginal_relevance_search_by_vector(
self,
embedding: list[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> list[Document]:
prefetch_hits = self._similarity_search_with_score_by_vector(
embedding=embedding,
k=fetch_k,
**kwargs,
)

try:
import numpy as np
except ImportError as e:
msg = (
"numpy must be installed to use max_marginal_relevance_search "
"pip install numpy"
)
raise ImportError(msg) from e

mmr_chosen_indices = maximal_marginal_relevance(
np.array(embedding, dtype=np.float32),
[vector for _, _, vector in prefetch_hits],
k=k,
lambda_mult=lambda_mult,
)
return [prefetch_hits[idx][0] for idx in mmr_chosen_indices]

def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> list[Document]:
embedding_vector = self.embedding.embed_query(query)
return self.max_marginal_relevance_search_by_vector(
embedding_vector,
k,
fetch_k,
lambda_mult=lambda_mult,
**kwargs,
)

async def amax_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> list[Document]:
embedding_vector = await self.embedding.aembed_query(query)
return self.max_marginal_relevance_search_by_vector(
embedding_vector,
k,
fetch_k,
lambda_mult=lambda_mult,
**kwargs,
)

@classmethod
def from_texts(
cls,
texts: list[str],
embedding: Embeddings,
metadatas: Optional[list[dict]] = None,
**kwargs: Any,
) -> InMemoryVectorStore:
store = cls(
embedding=embedding,
)
store.add_texts(texts=texts, metadatas=metadatas, **kwargs)
return store

@classmethod
async def afrom_texts(
cls,
texts: list[str],
embedding: Embeddings,
metadatas: Optional[list[dict]] = None,
**kwargs: Any,
) -> InMemoryVectorStore:
store = cls(
embedding=embedding,
)
await store.aadd_texts(texts=texts, metadatas=metadatas, **kwargs)
return store

@classmethod
def load(
cls, path: str, embedding: Embeddings, **kwargs: Any
) -> InMemoryVectorStore:
"""Load a vector store from a file.

Args:
path: The path to load the vector store from.
embedding: The embedding to use.
kwargs: Additional arguments to pass to the constructor.

Returns:
A VectorStore object.
"""
_path: Path = Path(path)
with _path.open("r") as f:
store = load(json.load(f))
vectorstore = cls(embedding=embedding, **kwargs)
vectorstore.store = store
return vectorstore

def dump(self, path: str) -> None:
"""Dump the vector store to a file.

Args:
path: The path to dump the vector store to.
"""
_path: Path = Path(path)
_path.parent.mkdir(exist_ok=True, parents=True)
with _path.open("w") as f:
json.dump(dumpd(self.store), f, indent=2)

All vector stores must inherit from the VectorStore base class. This interface consists of methods for writing, deleting and searching for documents in the vector store.

VectorStore supports a variety of synchronous and asynchronous search types (e.g., nearest-neighbor or maximum marginal relevance), as well as interfaces for adding documents to the store. See the API Reference for all supported methods. The required methods are tabulated below:

Method/PropertyDescription
add_documentsAdd documents to the vector store.
deleteDelete selected documents from vector store (by IDs)
get_by_idsGet selected documents from vector store (by IDs)
similarity_searchGet documents most similar to a query.
embeddings (property)Embeddings object for vector store.
from_textsInstantiate vector store via adding texts.

Note that InMemoryVectorStore implements some optional search types, as well as convenience methods for loading and dumping the object to a file, but this is not necessary for all implementations.

tip

The in-memory vector store is tested against the standard tests in the LangChain Github repository. You can always use this as a starting point.

Testing

To implement our test files, we will subclass test classes from the langchain_tests package. These test classes contain the tests that will be run. We will just need to configure what vector store implementation is tested.

Setup

First we need to install certain dependencies. These include:

  • pytest: For running tests
  • pytest-asyncio: For testing async functionality
  • langchain-tests: For importing standard tests
  • langchain-core: This should already be installed, but is needed to define our integration.

If you followed the previous bootstrapping guide, these should already be installed.

Add and configure standard tests

The langchain-test package implements suites of tests for testing vector store integrations. By subclassing the base classes for each standard test, you get all of the standard tests for that type.

note

The full set of tests that run can be found in the API reference. See details:

Here's how you would configure the standard tests for a typical vector store (using ParrotVectorStore as a placeholder):

# title="tests/integration_tests/test_vectorstores.py"

from typing import AsyncGenerator, Generator

import pytest
from langchain_core.vectorstores import VectorStore
from langchain_parrot_link.vectorstores import ParrotVectorStore
from langchain_tests.integration_tests.vectorstores import (
AsyncReadWriteTestSuite,
ReadWriteTestSuite,
)


class TestSync(ReadWriteTestSuite):
@pytest.fixture()
def vectorstore(self) -> Generator[VectorStore, None, None]: # type: ignore
"""Get an empty vectorstore."""
store = ParrotVectorStore(self.get_embeddings())
# note: store should be EMPTY at this point
# if you need to delete data, you may do so here
try:
yield store
finally:
# cleanup operations, or deleting data
pass


class TestAsync(AsyncReadWriteTestSuite):
@pytest.fixture()
async def vectorstore(self) -> AsyncGenerator[VectorStore, None]: # type: ignore
"""Get an empty vectorstore."""
store = ParrotVectorStore(self.get_embeddings())
# note: store should be EMPTY at this point
# if you need to delete data, you may do so here
try:
yield store
finally:
# cleanup operations, or deleting data
pass
API Reference:VectorStore

There are separate suites for testing synchronous and asynchronous methods. Configuring the tests consists of implementing pytest fixtures for setting up an empty vector store and tearing down the vector store after the test run ends.

For example, below is the ReadWriteTestSuite for the Chroma integration:

from typing import Generator

import pytest
from langchain_core.vectorstores import VectorStore
from langchain_tests.integration_tests.vectorstores import ReadWriteTestSuite

from langchain_chroma import Chroma


class TestSync(ReadWriteTestSuite):
@pytest.fixture()
def vectorstore(self) -> Generator[VectorStore, None, None]: # type: ignore
"""Get an empty vectorstore."""
store = Chroma(embedding_function=self.get_embeddings())
try:
yield store
finally:
store.delete_collection()
pass
API Reference:VectorStore

Note that before the initial yield, we instantiate the vector store with an embeddings object. This is a pre-defined "fake" embeddings model that will generate short, arbitrary vectors for documents. You can use a different embeddings object if desired.

In the finally block, we call whatever integration-specific logic is needed to bring the vector store to a clean state. This logic is executed in between each test (e.g., even if tests fail).

Run standard tests

After setting tests up, you would run them with the following command from your project root:

pytest --asyncio-mode=auto tests/integration_tests

Test suite information and troubleshooting

Each test method documents:

  1. Troubleshooting tips;
  2. (If applicable) how test can be skipped.

This information along with the full set of tests that run can be found in the API reference. See details:


Was this page helpful?