TensorFlow: Deploy model to Vespa through ONNX

This tutorial will cover the following steps:

  1. Download labeled data containing Vespa ranking features.
  2. Create a listwise dataset based on a TensorFlow data pipeline.
  3. Train a Learning to Rank (LTR) model using the TensorFlow Ranking framework.
  4. Simplify the LTR model to be suitable for ranking in Vespa.
  5. Convert the TensorFlow model to ONNX file format.
  6. Create and deploy a Vespa application that uses the TensorFlow model.
  7. Feed data to the Vespa application.
  8. Ensure that predictions from the model deployed to Vespa match those obtained from the model directly.

Install packages

!pip3 install -Uqq pyvespa learntorank numpy==1.23.5 pandas tensorflow tensorflow_ranking onnx tf2onnx

Get the data

import pandas as pd

Download labeled data containing Vespa ranking features collected from an MS Marco passage ranking application.

df = pd.read_csv("https://data.vespa.oath.cloud/blog/ranking/train_sample.csv")
df = df[
    ["document_id", 
     "query_id", 
     "label", 
     "fieldMatch(body).queryCompleteness",
     "fieldMatch(body).significance",
     "nativeRank",
    ]
]
df.shape
(100000, 6)

For each query_id, there are 9 irrelevant document_ids with label = 0 and 1 relevant document_id with label = 1.
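One way to check this structure is to count the rows and the relevant labels per query. The sketch below uses only the standard library and a hypothetical toy list of (query_id, label) pairs instead of the downloaded file; the helper names are illustrative. With the real DataFrame, `df.groupby("query_id")["label"].agg(["size", "sum"])` should give the same counts.

```python
from collections import defaultdict


def label_counts_per_query(rows):
    """Return {query_id: (number_of_rows, number_of_relevant_rows)}."""
    counts = defaultdict(lambda: [0, 0])
    for query_id, label in rows:
        counts[query_id][0] += 1
        counts[query_id][1] += int(label == 1)
    return {qid: tuple(c) for qid, c in counts.items()}


def has_expected_structure(rows, docs_per_query=10, relevant_per_query=1):
    """True if every query has docs_per_query rows and exactly relevant_per_query positives."""
    return all(
        n == docs_per_query and r == relevant_per_query
        for n, r in label_counts_per_query(rows).values()
    )


# Hypothetical toy data: query 3 follows the pattern, query 5 has only two rows.
toy_rows = [(3, 0)] * 9 + [(3, 1)] + [(5, 0), (5, 1)]
print(label_counts_per_query(toy_rows))  # {3: (10, 1), 5: (2, 1)}
print(has_expected_structure(toy_rows))  # False
```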

df.head(10)
document_id query_id label fieldMatch(body).queryCompleteness fieldMatch(body).significance nativeRank
0 27061 3 0 0.625 0.566311 0.042421
1 257 3 0 0.625 0.582570 0.039192
2 363 3 0 0.500 0.466030 0.034418
3 22682 3 0 0.625 0.566311 0.061149
4 160 3 0 0.500 0.437808 0.035017
5 228 3 0 0.500 0.437808 0.032697
6 3901893 3 0 0.750 0.748064 0.074917
7 1142680 3 1 0.750 0.748064 0.099112
8 141 3 0 0.500 0.442879 0.038093
9 3060834 3 0 0.750 0.763933 0.075347

Create a listwise dataset

Define some parameters required to set up the listwise data pipeline.

number_documents_per_query = 10  # documents in each listwise example
feature_names = [  # Vespa ranking features used as model inputs
    "fieldMatch(body).queryCompleteness",
    "fieldMatch(body).significance",
    "nativeRank",
]
number_features = len(feature_names)
batch_size = 32

Each batch of features will have shape (batch_size, number_documents_per_query, number_features), and each batch of labels will have shape (batch_size, number_documents_per_query).

import tensorflow as tf

The code below creates a TensorFlow data pipeline (tf.data.Dataset) from our DataFrame and groups the rows by the query_id variable to form a listwise dataset, where each element holds number_documents_per_query documents of the same query. We then configure the data pipeline to shuffle the lists and group them into batches of batch_size.

shuffle_buffer_size = 10000
ds = tf.data.Dataset.from_tensor_slices(
    {
        "features": tf.cast(df[feature_names].values, tf.float32),
        "label": tf.cast(df["label"].values, tf.float32),
        "query_id": tf.cast(df["query_id"].values, tf.int64),
    }
)

key_func = lambda x: x["query_id"]
reduce_func = lambda key, dataset: dataset.batch(
    number_documents_per_query, drop_remainder=True
)
listwise_ds = ds.group_by_window(
    key_func=key_func,
    reduce_func=reduce_func,
    window_size=number_documents_per_query,
)
listwise_ds = listwise_ds.map(lambda x: (x["features"], x["label"]))
listwise_ds = listwise_ds.shuffle(buffer_size=shuffle_buffer_size).batch(
    batch_size=batch_size
)
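To make the grouping step more concrete, here is a pure-Python analogue of the pipeline above. It reflects my reading of the tf.data semantics rather than TensorFlow's implementation: group_by_window collects rows per key and hands a window to reduce_func once it holds window_size rows, and any incomplete window left at the end is removed by batch(..., drop_remainder=True). Shuffling is left out, and group_into_lists and batch are hypothetical helper names.

```python
from collections import defaultdict


def group_into_lists(rows, key, list_size):
    """Yield a list of list_size rows each time a key accumulates list_size rows.

    Rows of a key that never fill a complete list are dropped, mirroring
    group_by_window followed by batch(..., drop_remainder=True).
    """
    windows = defaultdict(list)
    for row in rows:
        window = windows[key(row)]
        window.append(row)
        if len(window) == list_size:
            yield list(window)  # emit a copy, then start a new window for this key
            window.clear()
    # Incomplete windows still in `windows` are simply discarded.


def batch(items, batch_size):
    """Group items into batches; the last batch may be smaller (no drop_remainder)."""
    items = list(items)
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


# Hypothetical rows of (query_id, feature); q3 never fills a list of size 2.
toy_rows = [("q1", 0.1), ("q2", 0.5), ("q1", 0.2), ("q3", 0.9), ("q2", 0.6)]
lists = list(group_into_lists(toy_rows, key=lambda row: row[0], list_size=2))
print(lists)  # [[('q1', 0.1), ('q1', 0.2)], [('q2', 0.5), ('q2', 0.6)]]
print(batch(lists, batch_size=1))
```

One consequence of this design is that a query with fewer than number_documents_per_query rows contributes no list at all to the training data.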

We can see the shape of the features and of the labels are as expected.

for d in listwise_ds.take(1):
    print(d[0].shape)
    print(d[1].shape)
(32, 10, 3)
(32, 10)

Create and compile model

We are going to create a linear model that takes listwise data with shape (batch_size, number_documents_per_query, number_features) as input and outputs one prediction per document, with shape (batch_size, number_documents_per_query).

input_layer = tf.keras.layers.Input(shape=(number_documents_per_query, number_features))
dense_layer = tf.keras.layers.Dense(
    1,
    use_bias=False,
    activation=None,
    name="dense"
)
output_layer = tf.keras.layers.Reshape((number_documents_per_query,))
model = tf.keras.Sequential(layers=[input_layer, dense_layer, output_layer])
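Because the Dense layer has a single unit and no bias, it should compute, for every document in every list, the dot product of the document's feature vector with one shared weight vector; Reshape then drops the trailing unit dimension. The standard-library sketch below spells out that computation with hypothetical weights (in Keras, the learned kernel should be available as dense_layer.get_weights()[0], with shape (number_features, 1)).

```python
def score_document(features, weights):
    """Score a single document: a linear model without a bias term."""
    return sum(w * x for w, x in zip(weights, features))


def score_lists(batch, weights):
    """Score a batch shaped (batch_size, docs_per_query, num_features).

    Every document is scored with the same weight vector, as with
    Dense(1, use_bias=False) followed by Reshape((docs_per_query,)).
    The result is shaped (batch_size, docs_per_query).
    """
    return [
        [score_document(doc, weights) for doc in doc_list]
        for doc_list in batch
    ]


# Hypothetical weights and a batch holding one list of two documents.
weights = [1.0, 2.0, 3.0]
toy_batch = [[[1.0, 0.0, 0.0], [0.0, 1.0, 1.0]]]
print(score_lists(toy_batch, weights))  # [[1.0, 5.0]]
```

Since each score depends only on its own document's features, the same weights can later score documents one at a time, which is what the deployment step relies on.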

In this tutorial, we want to optimize the Normalized Discounted Cumulative Gain at position 10 (NDCG@10). We then select a loss function that is a smooth approximation of the NDCG metric and create a stateless NDCG@10 metric to use when compiling the model defined above.

import tensorflow_ranking as tfr

ndcg = tfr.keras.metrics.NDCGMetric(topn=10)
def ndcg_stateless(y_true, y_pred):
    """
    Create stateless metric so that we can compute the validation metric 
    from scratch at the end of each epoch.
    """
    ndcg.reset_states()
    return ndcg(y_true, y_pred)

optimizer = tf.keras.optimizers.Adagrad(learning_rate=2)
model.compile(
    optimizer=optimizer,
    loss=tfr.keras.losses.ApproxNDCGLoss(),
    metrics=ndcg_stateless,
)
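For reference, NDCG@k for one query can be written out in a few lines. This sketch uses the exponential gain 2^label - 1 and the 1 / log2(rank + 1) discount, which, as far as I know, are also the defaults of TF-Ranking's NDCGMetric; with binary labels, as in this dataset, the gain reduces to the label itself. The Keras metric additionally averages over the lists in a batch, which is not shown here, and the function names are illustrative.

```python
import math


def dcg_at_k(labels_in_ranked_order, k):
    """DCG@k with gain 2**label - 1 and discount 1 / log2(rank + 1), ranks starting at 1."""
    return sum(
        (2 ** label - 1) / math.log2(rank + 1)
        for rank, label in enumerate(labels_in_ranked_order[:k], start=1)
    )


def ndcg_at_k(labels, scores, k=10):
    """Rank documents by descending score and divide DCG@k by the ideal DCG@k."""
    ranked = [label for _, label in sorted(zip(scores, labels), key=lambda p: -p[0])]
    ideal = dcg_at_k(sorted(labels, reverse=True), k)
    return dcg_at_k(ranked, k) / ideal if ideal > 0 else 0.0


labels = [0, 1, 0]
print(ndcg_at_k(labels, scores=[0.9, 0.5, 0.1]))  # about 0.63, relevant document at rank 2
print(ndcg_at_k(labels, scores=[0.1, 0.9, 0.2]))  # 1.0, relevant document at rank 1
```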

Use the listwise dataset to fit the model:

history = model.fit(listwise_ds, epochs=20)
Epoch 1/20
304/304 [==============================] - 8s 3ms/step - loss: -0.6522 - ndcg_stateless: 0.6874
Epoch 2/20
304/304 [==============================] - 1s 921us/step - loss: -0.6959 - ndcg_stateless: 0.7159
Epoch 3/20
304/304 [==============================] - 1s 905us/step - loss: -0.7001 - ndcg_stateless: 0.7166
Epoch 4/20
304/304 [==============================] - 1s 904us/step - loss: -0.7025 - ndcg_stateless: 0.7168
Epoch 5/20
304/304 [==============================] - 1s 901us/step - loss: -0.7043 - ndcg_stateless: 0.7165
Epoch 6/20
304/304 [==============================] - 1s 920us/step - loss: -0.7106 - ndcg_stateless: 0.7242
Epoch 7/20
304/304 [==============================] - 1s 903us/step - loss: -0.7355 - ndcg_stateless: 0.7647
Epoch 8/20
304/304 [==============================] - 1s 898us/step - loss: -0.7399 - ndcg_stateless: 0.7662
Epoch 9/20
304/304 [==============================] - 1s 923us/step - loss: -0.7430 - ndcg_stateless: 0.7679
Epoch 10/20
304/304 [==============================] - 1s 911us/step - loss: -0.7450 - ndcg_stateless: 0.7679
Epoch 11/20
304/304 [==============================] - 1s 955us/step - loss: -0.7464 - ndcg_stateless: 0.7682
Epoch 12/20
304/304 [==============================] - 1s 914us/step - loss: -0.7475 - ndcg_stateless: 0.7683
Epoch 13/20
304/304 [==============================] - 1s 919us/step - loss: -0.7485 - ndcg_stateless: 0.7689
Epoch 14/20
304/304 [==============================] - 1s 909us/step - loss: -0.7493 - ndcg_stateless: 0.7682
Epoch 15/20
304/304 [==============================] - 1s 904us/step - loss: -0.7499 - ndcg_stateless: 0.7692
Epoch 16/20
304/304 [==============================] - 1s 900us/step - loss: -0.7506 - ndcg_stateless: 0.7691
Epoch 17/20
304/304 [==============================] - 1s 893us/step - loss: -0.7513 - ndcg_stateless: 0.7699
Epoch 18/20
304/304 [==============================] - 1s 1ms/step - loss: -0.7516 - ndcg_stateless: 0.7694
Epoch 19/20
304/304 [==============================] - 1s 910us/step - loss: -0.7520 - ndcg_stateless: 0.7694
Epoch 20/20
304/304 [==============================] - 1s 830us/step - loss: -0.7524 - ndcg_stateless: 0.7686

Simplify model input/output for deployment

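After training the model by minimizing a listwise loss function, we can simplify it before deploying it to Vespa. At inference time, Vespa evaluates the model on each document individually and ranks the documents by the resulting score. Since the trained dense layer applies the same weights to every document in a list, we can reuse it on a single document's features without retraining.

Therefore, the input layer of the simplified model expects a tensor named input with shape (1, number_features).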

simpler_model = tf.keras.Sequential(
    [tf.keras.layers.Input(shape=(number_features,), batch_size=1, name="input"), 
     dense_layer
    ]
)

We are going to save the simpler_model to disk and then use the tf2onnx tool to convert the model to ONNX format.

simpler_model.save("simpler_keras_model")
WARNING:tensorflow:Compiled the loaded model, but the compiled metrics have yet to be built. `model.compile_metrics` will be empty until you train or evaluate the model.
INFO:tensorflow:Assets written to: simpler_keras_model/assets
INFO:tensorflow:Assets written to: simpler_keras_model/assets
from tf2onnx import convert

!python3 -m tf2onnx.convert --saved-model simpler_keras_model --output simpler_keras_model.onnx
<frozen runpy>:128: RuntimeWarning: 'tf2onnx.convert' found in sys.modules after import of package 'tf2onnx', but prior to execution of 'tf2onnx.convert'; this may result in unpredictable behaviour
2023-08-08 14:09:40,224 - WARNING - '--tag' not specified for saved_model. Using --tag serve
2023-08-08 14:09:40,328 - INFO - Signatures found in model: [serving_default].
2023-08-08 14:09:40,328 - WARNING - '--signature_def' not specified, using first signature: serving_default
2023-08-08 14:09:40,328 - INFO - Output names: ['dense']
2023-08-08 14:09:40,328 - WARNING - Could not search for non-variable resources. Concrete function internal representation may have changed.
WARNING:tensorflow:From /usr/local/lib/python3.11/site-packages/tf2onnx/tf_loader.py:557: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version.
Instructions for updating:
This API was designed for TensorFlow v1. See https://www.tensorflow.org/guide/migrate for instructions on how to migrate your code to TensorFlow v2.
2023-08-08 14:09:40,379 - WARNING - From /usr/local/lib/python3.11/site-packages/tf2onnx/tf_loader.py:557: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version.
Instructions for updating:
This API was designed for TensorFlow v1. See https://www.tensorflow.org/guide/migrate for instructions on how to migrate your code to TensorFlow v2.
2023-08-08 14:09:40,388 - INFO - Using tensorflow=2.13.0, onnx=1.14.0, tf2onnx=1.8.4/cd55bf
2023-08-08 14:09:40,388 - INFO - Using opset <onnx, 9>
2023-08-08 14:09:40,389 - INFO - Computed 0 values for constant folding
2023-08-08 14:09:40,395 - INFO - Optimizing ONNX model
2023-08-08 14:09:40,402 - INFO - After optimization: Identity -5 (5->0)
2023-08-08 14:09:40,403 - INFO - 
2023-08-08 14:09:40,403 - INFO - Successfully converted TensorFlow model simpler_keras_model to ONNX
2023-08-08 14:09:40,403 - INFO - Model inputs: ['input:0']
2023-08-08 14:09:40,403 - INFO - Model outputs: ['dense']
2023-08-08 14:09:40,403 - INFO - ONNX model is saved at simpler_keras_model.onnx

We can inspect the ONNX model input and output. We first load the ONNX model:

import onnx                  

m = onnx.load("simpler_keras_model.onnx")

As mentioned before, the model expects a tensor named input with shape (1, 3).

m.graph.input
[name: "input"
type {
  tensor_type {
    elem_type: 1
    shape {
      dim {
        dim_value: 1
      }
      dim {
        dim_value: 3
      }
    }
  }
}
]

The output will be a tensor named dense with shape (1,1).

m.graph.output
[name: "dense"
type {
  tensor_type {
    elem_type: 1
    shape {
      dim {
        dim_value: 1
      }
      dim {
        dim_value: 1
      }
    }
  }
}
]

Define the application package

This section uses the Vespa Python API, pyvespa, to create an application package with a ranking function that uses the TensorFlow model exported to ONNX.

The data used to train the model was derived from a Vespa application based on the MS Marco passage dataset, so we name the application msmarco and start by adding two fields: id to hold the document id and text to hold the passages from the MS Marco dataset.

Indexing configuration: we add "summary" to the indexing parameter of both fields because we want to include the id and text fields in the query results. "attribute" indicates that the id field will be stored in memory, and "index" indicates that Vespa will create a search index for the text field.

from vespa.package import ApplicationPackage, Field

app_package = ApplicationPackage(name="msmarco")

app_package.schema.add_fields(
    Field(name="id", type="string", indexing=["summary", "attribute"]),
    Field(name="text", type="string", indexing=["summary", "index"])
)

Note that at each step of the application package definition, we can inspect the content of the Vespa schema file:

print(app_package.schema.schema_to_text)
schema msmarco {
    document msmarco {
        field id type string {
            indexing: summary | attribute
        }
        field text type string {
            indexing: summary | index
        }
    }
}

Add simpler_keras_model.onnx to the schema:

  * The model_name is an id that can be used in the ranking function to identify which model to use.
  * The model_file_path is the current path of the .onnx file. When deploying the application, pyvespa will move the file to the correct location inside the Vespa application package folder.
  * The inputs maps the names of the inputs contained in the ONNX model to the names of the Vespa sources that will be used as input to the model. In this case we will create a function called vespa_input that outputs a tensor of type float with the expected shape (1, 3).
  * The outputs maps the output name in the ONNX file to the output name that will be recognized by Vespa.

from vespa.package import OnnxModel

app_package.schema.add_model(
    OnnxModel(
        model_name="ltr_tensorflow",
        model_file_path="simpler_keras_model.onnx",
        inputs={"input": "vespa_input"},
        outputs={"dense": "dense"},
    )
)

The onnx-model section now appears in the schema below. Note that the model file is expected to be under the files folder inside the final application package folder; pyvespa takes care of placing the model file there (named after model_name, as files/ltr_tensorflow.onnx) when deploying the application.

print(app_package.schema.schema_to_text)
schema msmarco {
    document msmarco {
        field id type string {
            indexing: summary | attribute
        }
        field text type string {
            indexing: summary | index
        }
    }
    onnx-model ltr_tensorflow {
        file: files/ltr_tensorflow.onnx
        input input:0: vespa_input
        output dense: dense
    }
}

Add a rank profile named tensorflow that uses the TensorFlow model to rank documents:

  * first_phase: We use the Vespa ranking feature onnx to access the ONNX model named ltr_tensorflow and use the output dense. We apply sum because Vespa requires the relevance score to be a scalar, and the output of the ONNX model in this case is a tensor of shape (1, 1).
  * vespa_input function: The ONNX model was trained with the features fieldMatch(body).queryCompleteness, fieldMatch(body).significance and nativeRank. In this application the passage field is called text, so the function computes the corresponding features fieldMatch(text).queryCompleteness, fieldMatch(text).significance and nativeRank(text), in the same order as in training, and returns them as a tensor of shape (1, 3).
  * summary_features: Summary features allow us to specify Vespa features to be included in the output of a query. In this case, we want to access the model inputs and output to check whether the Vespa model evaluation matches that of the original TensorFlow model.

from vespa.package import RankProfile, Function

app_package.schema.add_rank_profile(
    RankProfile(
        name="tensorflow", 
        first_phase="sum(onnx(ltr_tensorflow).dense)", 
        functions=[
            Function(
                name="vespa_input", 
                expression="tensor<float>(x[1],y[3]):[["
                    "fieldMatch(text).queryCompleteness, "
                    "fieldMatch(text).significance, "
                    "nativeRank(text)"
                "]]"
            )
        ],
        summary_features=[
            "onnx(ltr_tensorflow)", 
            "fieldMatch(text).queryCompleteness", 
            "fieldMatch(text).significance", 
            "nativeRank(text)"
        ]
    )
)
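The first-phase expression can be mimicked in plain Python to see why the feature order in vespa_input matters and why sum is needed. This is an illustration under assumptions, not how Vespa evaluates ONNX: it treats the exported dense layer as a (1, 3) by (3, 1) matrix product producing the (1, 1) output, and the kernel values are hypothetical.

```python
def matmul(a, b):
    """Multiply two matrices given as nested lists."""
    return [[sum(a_ik * b_kj for a_ik, b_kj in zip(row, col)) for col in zip(*b)] for row in a]


def first_phase_score(query_completeness, significance, native_rank, kernel):
    # vespa_input: tensor<float>(x[1],y[3]) holding the features in training order
    vespa_input = [[query_completeness, significance, native_rank]]
    # Model output "dense": shape (1, 1), assumed here to be vespa_input @ kernel
    dense = matmul(vespa_input, kernel)
    # sum(...) over all dimensions turns the (1, 1) tensor into the scalar relevance
    return sum(sum(row) for row in dense)


# Hypothetical (3, 1) kernel; swapping the feature order would change the score.
kernel = [[1.0], [2.0], [3.0]]
print(first_phase_score(0.5, 0.25, 1.0, kernel))  # 4.0
```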

The rank-profile called tensorflow can be seen below:

print(app_package.schema.schema_to_text)
schema msmarco {
    document msmarco {
        field id type string {
            indexing: summary | attribute
        }
        field text type string {
            indexing: summary | index
        }
    }
    onnx-model ltr_tensorflow {
        file: files/ltr_tensorflow.onnx
        input input:0: vespa_input
        output dense: dense
    }
    rank-profile tensorflow {
        function vespa_input() {
            expression {
                tensor<float>(x[1],y[3]):[[fieldMatch(text).queryCompleteness, fieldMatch(text).significance, nativeRank(text)]]
            }
        }
        first-phase {
            expression {
                sum(onnx(ltr_tensorflow).dense)
            }
        }
        summary-features {
            onnx(ltr_tensorflow)
            fieldMatch(text).queryCompleteness
            fieldMatch(text).significance
            nativeRank(text)
        }
    }
}

Now that the application package definition is complete, we can deploy the application:

from vespa.deployment import VespaDocker

vespa_docker = VespaDocker()
app = vespa_docker.deploy(application_package=app_package)
Waiting for configuration server, 0/300 seconds...
Waiting for configuration server, 5/300 seconds...
Waiting for application status, 0/300 seconds...
Waiting for application status, 5/300 seconds...
Waiting for application status, 10/300 seconds...
Waiting for application status, 15/300 seconds...
Waiting for application status, 20/300 seconds...
Waiting for application status, 25/300 seconds...
Finished deployment.

Feed the application

Once the application is running, it is time to feed msmarco passage data to it.

from learntorank.passage import PassageData

dataset = PassageData.load()

We are going to use only 10 documents because our goal here is to show that Vespa returns the correct predictions from the TensorFlow model.

data = dataset.get_corpus().head(10)
data.rename(columns={'doc_id': 'id'}, inplace=True)
data.head()
id text
0 5954248 Why GameStop is excited for Dragon Age: Inquis...
1 7290700 metaplasia definition: 1. abnormal change of o...
2 5465518 Candice Net Worth. According to the report of ...
3 3100518 Under the Base Closure Act, March AFB was down...
4 3207764 There are a number of career opportunities for...

Feed the data to the application.

result = app.feed_df(df=data, include_id=True)
Successful documents fed: 10/10.
Batch progress: 1/1.

Validate Vespa predictions

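Get a query from the small dev set to validate the predictions that Vespa computes with the TensorFlow model. Single quotes are removed from the query text because it will be embedded in a single-quoted userInput('...') string in the YQL expression.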

query_text = dataset.get_queries(type="dev").iloc[0,1]
query_text = query_text.replace("'", "")
query_text
'why say the sky is the limit'
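The YQL string used in this tutorial is built with str.format, so the literal braces of the annotation have to be doubled ({{ and }}), while {} marks where the query text goes. The hypothetical helper build_yql below wraps that construction together with the quote stripping done above; it is a convenience sketch, not part of pyvespa.

```python
def build_yql(query_text):
    """Build the YQL expression used in this tutorial for a given query text.

    Single quotes are stripped because the text is embedded in userInput('...').
    Doubled braces {{ }} are str.format's escape for literal braces.
    """
    cleaned = query_text.replace("'", "")
    return (
        "select * from sources * where "
        "({{grammar: 'any', defaultIndex: 'text'}}userInput('{}'))".format(cleaned)
    )


print(build_yql("why say the sky is the limit"))
```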

The code below shows the YQL expression that will be used to select the documents to be ranked.

"select * from sources * where ({{grammar: 'any', defaultIndex: 'text'}}userInput('{}'))".format(query_text)
"select * from sources * where ({grammar: 'any', defaultIndex: 'text'}userInput('why say the sky is the limit'))"

The function get_vespa_prediction_and_features will match documents using the YQL expression above and rank the documents with the rank-profile tensorflow that we defined in the Vespa application package.

def get_vespa_prediction_and_features(query_text):
    # Send the query with the tensorflow rank profile and extract the hits
    hits = app.query(
        body={
            "yql": "select * from sources * where ({{'grammar': 'any', 'defaultIndex': 'text'}}userInput('{}'));".format(query_text),
            "ranking": "tensorflow",
        }
    ).hits
    result = []
    # For each hit, extract the inputs to the model along with the model prediction computed by Vespa
    for hit in hits:
        summary_features = hit["fields"]["summaryfeatures"]
        result.append({
            "fieldMatch(text).queryCompleteness": summary_features["fieldMatch(text).queryCompleteness"],
            "fieldMatch(text).significance": summary_features["fieldMatch(text).significance"],
            "nativeRank(text)": summary_features["nativeRank(text)"],
            "vespa_prediction": hit["relevance"],
        })
    return pd.DataFrame.from_records(result)

Model inputs and Vespa predictions:

predictions = get_vespa_prediction_and_features(query_text=query_text)
predictions
fieldMatch(text).queryCompleteness fieldMatch(text).significance nativeRank(text) vespa_prediction
0 0.285714 0.199799 0.061853 0.360788
1 0.571429 0.415687 0.086940 -0.128510
2 0.428571 0.302071 0.065154 -0.240481
3 0.428571 0.302071 0.050600 -0.670632
4 0.428571 0.302071 0.049802 -0.694231
5 0.285714 0.199799 0.025552 -0.712175
6 0.428571 0.302071 0.045398 -0.824390

Compute predictions from the TensorFlow model simpler_model directly:

predictions["tf_prediction"] = predictions[
    ["fieldMatch(text).queryCompleteness", "fieldMatch(text).significance", "nativeRank(text)"]
].apply(lambda x: simpler_model.predict([x.tolist()])[0][0], axis=1)
1/1 [==============================] - 0s 71ms/step
1/1 [==============================] - 0s 28ms/step
1/1 [==============================] - 0s 29ms/step
1/1 [==============================] - 0s 29ms/step
1/1 [==============================] - 0s 28ms/step
1/1 [==============================] - 0s 26ms/step
1/1 [==============================] - 0s 26ms/step
predictions
fieldMatch(text).queryCompleteness fieldMatch(text).significance nativeRank(text) vespa_prediction tf_prediction
0 0.285714 0.199799 0.061853 0.360788 0.360788
1 0.571429 0.415687 0.086940 -0.128510 -0.128510
2 0.428571 0.302071 0.065154 -0.240481 -0.240481
3 0.428571 0.302071 0.050600 -0.670632 -0.670632
4 0.428571 0.302071 0.049802 -0.694231 -0.694231
5 0.285714 0.199799 0.025552 -0.712175 -0.712176
6 0.428571 0.302071 0.045398 -0.824390 -0.824390

Check that the predictions from the model deployed in Vespa are (almost) equal to the predictions obtained directly from the model.

from numpy.testing import assert_almost_equal

assert_almost_equal(predictions["vespa_prediction"].tolist(), predictions["tf_prediction"].tolist(), decimal=5)
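The numpy documentation states the element-wise criterion of assert_almost_equal as abs(desired - actual) < 1.5 * 10**(-decimal), so decimal=5 tolerates differences below 1.5e-5. The sketch below restates that criterion in plain Python (almost_equal is a hypothetical name) and applies it to the rounded values printed in the predictions table; the only mismatch there, in row 5, is 1e-6. Results on these rounded values may differ from those on the full-precision predictions.

```python
def almost_equal(actual, desired, decimal=7):
    """Restates the numpy.testing.assert_almost_equal criterion:
    abs(desired - actual) < 1.5 * 10**(-decimal) for every element."""
    if len(actual) != len(desired):
        return False
    threshold = 1.5 * 10 ** (-decimal)
    return all(abs(d - a) < threshold for a, d in zip(actual, desired))


# Rounded values copied from the predictions table above.
vespa_prediction = [0.360788, -0.128510, -0.240481, -0.670632, -0.694231, -0.712175, -0.824390]
tf_prediction = [0.360788, -0.128510, -0.240481, -0.670632, -0.694231, -0.712176, -0.824390]
print(almost_equal(vespa_prediction, tf_prediction, decimal=5))  # True
print(almost_equal(vespa_prediction, tf_prediction, decimal=7))  # False
```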

Clean environment

import shutil

shutil.rmtree("simpler_keras_model") 
vespa_docker.container.stop(timeout=600)
vespa_docker.container.remove()