Standalone ColBERT with Vespa for end-to-end retrieval and ranking¶
This notebook illustrates using the ColBERT package to produce token vectors, instead of using the native Vespa colbert embedder.
This guide illustrates how to feed and query using a single passage representation:
- Compress token vectors using binarization compatible with the Vespa unpack_bits function used in ranking. The binarization of token-level vectors is implemented using numpy.
- Use the Vespa hex feed format for binary vectors.
- Query examples.
As a bonus, this also demonstrates how to use ColBERT end-to-end with Vespa for both retrieval and ranking. The retrieval step searches the binary token-level representations using hamming distance. This uses 32 nearestNeighbor operators in the same query, each finding 100 nearest hits in hamming space. Then the results are re-ranked using the full-blown MaxSim calculation.
See Announcing the Vespa ColBERT embedder for details on ColBERT and the binary quantization used to compress ColBERT's token-level vectors.
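To make the retrieval step described above concrete, here is a minimal NumPy sketch of hamming distance between two bit-packed token vectors. It is an illustration only and does not show how Vespa computes the metric internally; the helper name hamming_distance and the random test vectors are assumptions made for this example.

```python
import numpy as np


def hamming_distance(a: np.ndarray, b: np.ndarray) -> int:
    """Count differing bits between two bit-packed int8 vectors of equal length."""
    # XOR leaves a 1 bit wherever the two vectors disagree.
    diff = np.bitwise_xor(a.view(np.uint8), b.view(np.uint8))
    # Unpack every byte into 8 bits and count the ones.
    return int(np.unpackbits(diff).sum())


rng = np.random.default_rng(0)
# Two hypothetical 128-dimensional float token vectors.
x = rng.standard_normal(128)
y = x.copy()
y[:5] = -y[:5]  # flip the sign of the first five dimensions

# Binarize on the sign and pack 128 bits into 16 int8 values.
x_bits = np.packbits(np.where(x > 0, 1, 0)).astype(np.int8)
y_bits = np.packbits(np.where(y > 0, 1, 0)).astype(np.int8)

print(x_bits.shape)                      # (16,)
print(hamming_distance(x_bits, x_bits))  # 0
print(hamming_distance(x_bits, y_bits))  # 5
```

Flipping the sign of five dimensions changes exactly five bits, so the distance between the two packed vectors is 5.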
!pip3 install -U pyvespa colbert-ai numpy torch "transformers<=4.49.0"
Load a checkpoint with colbert and obtain document and query embeddings
from colbert.modeling.checkpoint import Checkpoint
from colbert.infra import ColBERTConfig
ckpt = Checkpoint(
"colbert-ir/colbertv2.0", colbert_config=ColBERTConfig(root="experiments")
)
passage = [
"Alan Mathison Turing was an English mathematician, computer scientist, logician, cryptanalyst, philosopher and theoretical biologist."
]
vectors = ckpt.docFromText(passage)[0]
vectors.shape
torch.Size([27, 128])
In this case, we got 27 token-level embeddings, each with 128 float dimensions. This includes the CLS token and the special tokens used to differentiate the query encoding from the document encoding.
query_vectors = ckpt.queryFromText(["Who was Alan Turing?"])[0]
query_vectors.shape
torch.Size([32, 128])
Routines for binarization and output in Vespa tensor format that can be used in queries and in JSON feed.
import numpy as np
import torch
from binascii import hexlify
from typing import Dict, List
def binarize_token_vectors_hex(vectors: torch.Tensor) -> Dict[int, str]:
    """Binarize token vectors and return one hex string per token index."""
    # Keep only the sign of each dimension and pack 8 bits into one int8 value.
    binarized_token_vectors = np.packbits(np.where(vectors > 0, 1, 0), axis=1).astype(
        np.int8
    )
    vespa_token_feed = dict()
    for index in range(0, len(binarized_token_vectors)):
        vespa_token_feed[index] = str(
            hexlify(binarized_token_vectors[index].tobytes()), "utf-8"
        )
    return vespa_token_feed
def float_query_token_vectors(vectors: torch.Tensor) -> Dict[int, List[float]]:
    """Return the float token vectors as lists keyed by token index."""
    vespa_token_feed = dict()
    for index in range(0, len(vectors)):
        vespa_token_feed[index] = vectors[index].tolist()
    return vespa_token_feed
import json
print(json.dumps(binarize_token_vectors_hex(vectors)))
print(json.dumps(float_query_token_vectors(query_vectors)))
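The binarization routine above can be traced by hand on a tiny input. The sketch below uses a made-up 8-dimensional vector (real ColBERT vectors have 128 dimensions, giving 16 bytes and 32 hex characters per token) and follows the same steps: threshold on the sign, pack the bits, reinterpret as int8, and hex-encode.

```python
import numpy as np
from binascii import hexlify

# A hypothetical 8-dimensional token vector, used only for illustration.
v = np.array([[0.5, -0.1, 0.3, -0.2, -0.7, 0.9, 0.1, -0.4]])

bits = np.where(v > 0, 1, 0)        # [[1, 0, 1, 0, 0, 1, 1, 0]]
packed = np.packbits(bits, axis=1)  # uint8 [[166]], i.e. 0b10100110
as_int8 = packed.astype(np.int8)    # int8 [[-90]], the same byte read as signed
hex_str = str(hexlify(as_int8[0].tobytes()), "utf-8")

print(bits.tolist(), packed.tolist(), as_int8.tolist(), hex_str)

# Unpacking the byte recovers the original bits.
restored = np.unpackbits(as_int8.view(np.uint8), axis=1)
print(restored.tolist())
```

The negative int8 value is expected: it is the same bit pattern as the unsigned byte, which is why the hex string is identical either way.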
Defining the Vespa application¶
PyVespa helps us build the Vespa application package. A Vespa application package consists of configuration files, schemas, models, and code (plugins).
First, we define a Vespa schema with the fields we want to store and their type.
We use HNSW with hamming distance for retrieval. The colbert field stores one vector of 16 int8 values (128 bits) per token.
from vespa.package import Schema, Document, Field
colbert_schema = Schema(
    name="doc",
    document=Document(
        fields=[
            Field(name="id", type="string", indexing=["summary"]),
            Field(name="passage", type="string", indexing=["index", "summary"]),
            Field(
                name="colbert",
                type="tensor<int8>(token{}, v[16])",
                indexing=["attribute", "summary", "index"],
                attribute=["distance-metric:hamming"],
            ),
        ]
    ),
)
from vespa.package import ApplicationPackage
vespa_app_name = "colbert"
vespa_application_package = ApplicationPackage(
name=vespa_app_name, schema=[colbert_schema]
)
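The v[16] dimension in the schema follows from packing 128 bits into int8 values. As a rough back-of-the-envelope sketch, the arithmetic below compares the raw vector payload of the example passage in binary and float form. It assumes 4-byte floats and ignores any per-token label or index overhead in Vespa, so treat the numbers as payload sizes only.

```python
# Sizes for the example passage from this notebook: 27 tokens, 128 dimensions.
tokens, dims = 27, 128

bytes_per_token_binary = dims // 8  # 128 bits packed into 16 int8 values -> v[16]
bytes_per_token_float = dims * 4    # 128 values at 4 bytes each (assumes float32)

binary_total = tokens * bytes_per_token_binary
float_total = tokens * bytes_per_token_float

print(bytes_per_token_binary)       # 16
print(binary_total, float_total)    # 432 13824
print(float_total // binary_total)  # 32
```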
We need to define all the query input tensors. We input up to 32 query token vectors in binary form; these are used for retrieval.
query_binary_input_tensors = []
for index in range(0, 32):
    query_binary_input_tensors.append(
        ("query(binary_vector_{})".format(index), "tensor<int8>(v[16])")
    )
Note that we use only MaxSim in first-phase ranking, applied to all the hits retrieved by the query.
from vespa.package import RankProfile, Function, FirstPhaseRanking
colbert = RankProfile(
    name="default",
    inputs=[
        ("query(qt)", "tensor<float>(querytoken{}, v[128])"),
        *query_binary_input_tensors,
    ],
    functions=[
        Function(
            name="max_sim",
            expression="""
                sum(
                    reduce(
                        sum(
                            query(qt) * unpack_bits(attribute(colbert)), v
                        ),
                        max, token
                    ),
                    querytoken
                )
            """,
        )
    ],
    first_phase=FirstPhaseRanking(expression="max_sim"),
)
colbert_schema.add_rank_profile(colbert)
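The max_sim expression above can be mirrored in NumPy to see what it computes. The following is a sketch under the assumption that unpack_bits expands each int8 into eight 0/1 values in the same bit order as numpy.unpackbits; the function name and the toy 8-dimensional data are made up for this example and are not part of the application.

```python
import numpy as np


def max_sim(query: np.ndarray, doc_packed: np.ndarray) -> float:
    """MaxSim between float query token vectors and bit-packed document token vectors.

    query:      shape (query_tokens, dims), float
    doc_packed: shape (doc_tokens, dims // 8), int8
    """
    # Expand each int8 into 8 values of 0/1, mirroring the unpack_bits step.
    doc_bits = np.unpackbits(doc_packed.view(np.uint8), axis=1).astype(np.float32)
    # Dot product of every query token with every document token (the sum over v).
    scores = query @ doc_bits.T
    # Max over document tokens, then sum over query tokens.
    return float(scores.max(axis=1).sum())


# Toy data with 8 dimensions: two document tokens and two query tokens.
toy_doc_bits = np.array([[1, 0, 0, 0, 0, 0, 0, 0], [0, 1, 0, 0, 0, 0, 0, 0]])
doc_packed = np.packbits(toy_doc_bits, axis=1).astype(np.int8)
query = np.array(
    [[1.0, 2.0, 0, 0, 0, 0, 0, 0], [3.0, -1.0, 0, 0, 0, 0, 0, 0]], dtype=np.float32
)

print(max_sim(query, doc_packed))  # 5.0
```

The first query token scores 1 and 2 against the two document tokens (max 2), the second scores 3 and -1 (max 3), so the total is 5.0.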
Deploy the application to Vespa Cloud¶
With the configured application, we can deploy it to Vespa Cloud. It is also possible to deploy the app using Docker; see the Hybrid Search - Quickstart guide for an example of deploying it to a local Docker container.
Install the Vespa CLI.
!pip3 install vespacli
To deploy the application to Vespa Cloud we need to create a tenant in the Vespa Cloud:
Create a tenant at console.vespa-cloud.com (unless you already have one). This step requires a Google or GitHub account, and will start your free trial. Make note of the tenant name; it is used in the next steps.
Configure Vespa Cloud data-plane security¶
Create a Vespa Cloud data-plane mTLS certificate/key pair. The mutual certificate pair is used to talk to your Vespa Cloud endpoints. See Vespa Cloud Security Guide for details.
We save the paths to the credentials for later data-plane access without using pyvespa APIs.
import os
os.environ["TENANT_NAME"] = "vespa-team" # Replace with your tenant name
vespa_cli_command = (
f'vespa config set application {os.environ["TENANT_NAME"]}.{vespa_app_name}'
)
!vespa config set target cloud
!{vespa_cli_command}
!vespa auth cert -N
Validate that we have the expected data-plane credential files:
from os.path import exists
from pathlib import Path
cert_path = (
Path.home()
/ ".vespa"
/ f"{os.environ['TENANT_NAME']}.{vespa_app_name}.default/data-plane-public-cert.pem"
)
key_path = (
Path.home()
/ ".vespa"
/ f"{os.environ['TENANT_NAME']}.{vespa_app_name}.default/data-plane-private-key.pem"
)
if not exists(cert_path) or not exists(key_path):
    print(
        "ERROR: set the correct paths to security credentials. Correct paths above and rerun until you do not see this error"
    )
Note that the subsequent Vespa Cloud deploy call below will add data-plane-public-cert.pem
to the application before deploying it to Vespa Cloud, so that
you have access to both the private key and the public certificate. At the same time, Vespa Cloud only knows the public certificate.
Configure Vespa Cloud control-plane security¶
Authenticate to generate a tenant-level control-plane API key for deploying applications to Vespa Cloud, and save the path to it.
The generated tenant API key must be added in the Vespa Console before attempting to deploy the application.
To use this key in Vespa Cloud click 'Add custom key' at
https://console.vespa-cloud.com/tenant/TENANT_NAME/account/keys
and paste the entire public key including the BEGIN and END lines.
!vespa auth api-key
from pathlib import Path
api_key_path = Path.home() / ".vespa" / f"{os.environ['TENANT_NAME']}.api-key.pem"
Deploy to Vespa Cloud¶
Now that we have data-plane and control-plane credentials ready, we can deploy our application to Vespa Cloud!
PyVespa supports deploying apps to the development zone.
Note: Deployments to dev and perf expire after 7 days of inactivity, i.e., 7 days after running deploy. This applies to all plans, not only the Free Trial. Use the Vespa Console to extend the expiry period, or redeploy the application to add 7 more days.
from vespa.deployment import VespaCloud
def read_secret():
    """Read the API key from the environment variable. This is
    only used for CI/CD purposes."""
    t = os.getenv("VESPA_TEAM_API_KEY")
    if t:
        return t.replace(r"\n", "\n")
    else:
        return t
vespa_cloud = VespaCloud(
tenant=os.environ["TENANT_NAME"],
application=vespa_app_name,
key_content=read_secret() if read_secret() else None,
key_location=api_key_path,
application_package=vespa_application_package,
)
Now deploy the app to Vespa Cloud dev zone.
The first deployment typically takes 2 minutes until the endpoint is up.
from vespa.application import Vespa
app: Vespa = vespa_cloud.deploy()
from vespa.io import VespaResponse
vespa_feed_format = {
"id": "1",
"passage": passage[0],
"colbert": binarize_token_vectors_hex(vectors),
}
with app.syncio() as sync:
    response: VespaResponse = sync.feed_data_point(
        data_id=1, fields=vespa_feed_format, schema="doc"
    )
Querying¶
Now we create all the query token vectors in binary form and use 32 nearestNeighbor query operators that are combined with OR. These hits are then exposed to ranking where the final MaxSim is performed using the unpacked binary representations.
query_vectors = ckpt.queryFromText(["Who was Alan Turing?"])[0]
binary_query_input_tensors = binarize_token_vectors_hex(query_vectors)
binary_query_vectors = dict()
nn_operators = list()
for index in range(0, 32):
    name = "input.query(binary_vector_{})".format(index)
    nn_argument = "binary_vector_{}".format(index)
    value = binary_query_input_tensors[index]
    binary_query_vectors[name] = value
    nn_operators.append("({targetHits:100}nearestNeighbor(colbert, %s))" % nn_argument)
nn_operators = " OR ".join(nn_operators)
'({targetHits:100}nearestNeighbor(colbert, binary_vector_0)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_1)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_2)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_3)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_4)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_5)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_6)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_7)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_8)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_9)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_10)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_11)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_12)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_13)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_14)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_15)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_16)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_17)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_18)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_19)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_20)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_21)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_22)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_23)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_24)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_25)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_26)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_27)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_28)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_29)) OR ({targetHits:100}nearestNeighbor(colbert, binary_vector_30)) OR 
({targetHits:100}nearestNeighbor(colbert, binary_vector_31))'
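The loop above can also be wrapped in a small helper that returns both the OR-combined clause and the matching query inputs. This is one possible sketch, not part of pyvespa: the function name build_nn_query and its parameters are hypothetical, and the two hex strings are placeholders rather than real binarized query vectors.

```python
from typing import Dict, Tuple


def build_nn_query(
    binary_vectors: Dict[int, str], target_hits: int = 100, field: str = "colbert"
) -> Tuple[str, Dict[str, str]]:
    """Return the OR-combined nearestNeighbor clause and the matching query inputs."""
    operators = []
    inputs = {}
    for index, hex_value in binary_vectors.items():
        name = "binary_vector_{}".format(index)
        # Each operator refers to a query tensor that is passed as input.query(<name>).
        inputs["input.query({})".format(name)] = hex_value
        operators.append(
            "({targetHits:%d}nearestNeighbor(%s, %s))" % (target_hits, field, name)
        )
    return " OR ".join(operators), inputs


# Two made-up hex strings stand in for real binarized query token vectors.
where_clause, query_inputs = build_nn_query({0: "00" * 16, 1: "ff" * 16})
print(where_clause)
print(query_inputs)
```

Every index passed to such a helper must match a query(binary_vector_N) input declared in the rank profile, which in this notebook covers indexes 0 through 31.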
from vespa.io import VespaQueryResponse
import json
response: VespaQueryResponse = app.query(
yql="select * from doc where {}".format(nn_operators),
ranking="default",
body={
"presentation.format.tensors": "short-value",
"input.query(qt)": float_query_token_vectors(query_vectors),
**binary_query_vectors,
},
)
assert response.is_successful()
print(json.dumps(response.hits[0], indent=2))
{ "id": "id:doc:doc::1", "relevance": 100.57648777961731, "source": "colbert_content", "fields": { "sddocname": "doc", "documentid": "id:doc:doc::1", "id": "1", "passage": "Alan Mathison Turing was an English mathematician, computer scientist, logician, cryptanalyst, philosopher and theoretical biologist.", "colbert": { "0": [ 3, 120, 69, 0, 37, -60, -58, -95, -120, 32, -127, 67, -36, 68, -106, -12 ], "1": [ -106, 40, -119, -128, 96, -60, -58, 33, 48, 96, -127, 67, -100, 96, -106, -12 ], "2": [ -28, -84, 73, -18, 113, -60, -51, 40, -96, 121, 4, 24, -99, 68, -47, -60 ], "3": [ -13, 40, 75, -124, 65, 64, -32, -53, 12, 64, 125, 4, 24, -64, -69, 101 ], "4": [ 33, -54, 113, 24, 77, -36, -44, 3, -32, -72, 40, 41, -38, 102, 53, -35 ], "5": [ 3, -22, 73, -95, 73, -51, 85, -128, -121, 25, 17, 68, 90, 64, -113, -28 ], "6": [ -109, -72, -114, 0, 97, -58, -57, -95, 40, -96, -112, 67, -97, -85, -42, -12 ], "7": [ -112, 56, -114, 0, 97, -58, -57, -83, 40, -96, -127, 67, -97, 43, -42, -12 ], "8": [ 22, -71, 65, 96, 0, -60, 108, 37, 16, 106, -55, 115, -117, -56, -28, -12 ], "9": [ -106, -72, 94, 30, 32, -60, -60, -19, 24, -56, -47, -63, -40, -53, -103, -11 ], "10": [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ], "11": [ -126, 121, 3, -103, 32, 70, 103, -23, 88, -55, -61, 71, -101, -106, -8, -68 ], "12": [ 18, 24, -106, 30, 36, -42, -60, 104, 57, -120, -128, -61, -67, -53, -100, -11 ], "13": [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ], "14": [ 22, 49, -38, 17, 36, -42, -25, 65, 25, -56, -45, -59, -102, -2, -65, 125 ], "15": [ -105, 25, -50, 16, 0, -42, -28, 45, 48, -56, -112, -55, -3, -87, -112, -11 ], "16": [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ], "17": [ 55, 43, -62, 33, -91, 68, 99, 32, 72, 10, -41, 70, -117, -78, -73, -11 ], "18": [ 3, 53, -117, 20, 36, -42, 79, 33, 9, -120, -41, 69, -36, -69, -111, 117 ], "19": [ 23, 16, -42, 20, 44, -42, -26, 33, 57, -120, -112, -63, -3, -24, -108, -11 ], "20": [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ], "21": [ 
-110, 53, -106, 28, 32, -42, -58, 77, 61, -56, -42, -15, -68, -5, -110, -11 ], "22": [ -109, 56, -114, 0, 96, -42, -58, -83, 40, -96, -128, -61, -99, -21, -44, -12 ], "23": [ 18, 57, -50, 30, 36, 86, -60, 69, 9, -120, -48, -63, -75, -22, -98, -11 ], "24": [ 30, -71, -106, 26, 32, -42, -50, 104, 56, 64, -48, -61, -4, -8, -104, -12 ], "25": [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ], "26": [ 7, 56, 70, 0, 36, -58, -42, 33, -104, 34, -127, 67, -99, 96, -105, -12 ] } } }
Another example, where we brute-force rank all documents using "where true", without a retrieval step using nearestNeighbor or other filters.
from vespa.io import VespaQueryResponse
import json
response: VespaQueryResponse = app.query(
yql="select * from doc where true",
ranking="default",
body={
"presentation.format.tensors": "short-value",
"input.query(qt)": float_query_token_vectors(query_vectors),
},
)
assert response.is_successful()
print(json.dumps(response.hits[0], indent=2))
vespa_cloud.delete()