Introduction
This walk-through shows how to deploy a model from MLflow to KServe with Kubeflow Pipelines v2.
The code is available in the Cake shared drive at “shared/kubeflow-pipeline-examples/kfp-v2-api-sample/mlflow-to-kserve/mlflow_to_kserve.py”.
Read on for a detailed discussion of the code and an overview of running it as a pipeline.
Let's look at our first component: mlflow_download_artifacts.
This function downloads artifacts from MLflow. It takes the model name and either a stage or an alias as inputs, and outputs the model.
Within the function, we set the MLflow tracking URI and construct the model URI from the model name and the stage or alias. We then look up the run ID for that model and download the run's artifacts to the output model's path.
from kfp import dsl
from kfp.dsl import Model, Output


@dsl.component(base_image="python:3.8", packages_to_install=["mlflow==2.9.2"])
def mlflow_download_artifacts(model_name: str, stage: str, alias: str, model: Output[Model]):
    """Get the run ID for a model stage or alias from MLflow and download its artifacts.

    Args:
        model_name (str): The name of the model.
        stage (str): The stage of the model to get the run ID for (e.g. "Staging" or "Production").
        alias (str): The alias of the model to get the run ID for. Note that MLflow does not allow
            stages and aliases to be used simultaneously.
        model (Output[Model]): The output model. (This is a KFP "Output", so you don't have to specify
            it when calling this method; it's created and provided by the KFP API.)
    """
    # Exactly one of stage or alias must be provided.
    if stage and alias:
        raise ValueError("Stages and aliases cannot be used simultaneously")
    if not (stage or alias):
        raise ValueError("Either a stage or an alias must be provided")

    # Imported inside the function because KFP runs the component body in its own container.
    import mlflow

    mlflow.set_tracking_uri("http://mlflow-server.mlflow.svc.cluster.local")

    if stage:
        model_uri = f"models:/{model_name}/{stage}"
    else:
        model_uri = f"models:/{model_name}@{alias}"

    # Resolve the model URI to its run ID, then download that run's artifacts.
    run_id = mlflow.models.get_model_info(model_uri).run_id
    mlflow.artifacts.download_artifacts(run_id=run_id, dst_path=model.path)
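The stage-or-alias logic in this component can be tried on its own, without an MLflow server or a KFP cluster. The sketch below pulls that logic out into a standalone function. The name build_model_uri and the example model name and alias are hypothetical and not part of the sample code. Here we assume that passing neither a stage nor an alias should be treated as an error.

```python
from typing import Optional


def build_model_uri(model_name: str, stage: Optional[str] = None, alias: Optional[str] = None) -> str:
    """Hypothetical helper: build an MLflow model URI from a stage or an alias."""
    # A stage and an alias cannot be combined in one model URI.
    if stage and alias:
        raise ValueError("Stages and aliases cannot be used simultaneously")
    if stage:
        # Stage form: models:/<name>/<stage>
        return f"models:/{model_name}/{stage}"
    if alias:
        # Alias form: models:/<name>@<alias>
        return f"models:/{model_name}@{alias}"
    # Assumption: at least one of the two must be given.
    raise ValueError("Either a stage or an alias must be provided")


print(build_model_uri("my-model", stage="Production"))  # models:/my-model/Production
print(build_model_uri("my-model", alias="champion"))    # models:/my-model@champion
```

Keeping this check in a plain function makes it easy to unit test before the component is compiled into a pipeline.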