Kubeflow Pipelines with MLFlow and Seldon Core¶
Introduction¶
In this section, we show you how to build an advanced machine learning operations (MLOps) pipeline using Kubeflow Pipelines (KFP), MLFlow, and Seldon Core, and how to get them working seamlessly together to deliver your machine learning (ML) model to production in a scalable, efficient manner.
The example uses a wine dataset: you predict a wine's quality from a chemical analysis. The higher the quality score, the better the wine tastes.
Get started¶
Set up your environment¶
Grab the code samples and execute them in order to get the most out of the example. You need a working Kubeflow on vSphere deployment with MLFlow up and running.
Going through the pipeline, each step shows you something new.
Add privilege to access MinIO for MLFlow¶
Check whether the PodDefault resources already exist in the user's namespace (admin in this example).
kubectl get poddefault -n admin
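If the PodDefaults are missing, the command reports something like the following (the namespace name depends on your setup):
No resources found in admin namespace.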
If there are no PodDefaults in the user's namespace, run the following commands to create the PodDefaults needed to access MinIO.
USER_NAMESPACE=admin
# MLFlow
cat <<EOF | kubectl create -n $USER_NAMESPACE -f -
apiVersion: kubeflow.org/v1alpha1
kind: PodDefault
metadata:
  name: mlflow-server-minio
spec:
  desc: Allow access to MLFlow
  env:
    - name: MLFLOW_S3_ENDPOINT_URL
      value: http://minio.kubeflow:9000
    - name: MLFLOW_TRACKING_URI
      value: http://mlflow-server.kubeflow.svc.cluster.local:5000
  selector:
    matchLabels:
      mlflow-server-minio: "true"
EOF
cat <<EOF | kubectl create -n $USER_NAMESPACE -f -
apiVersion: kubeflow.org/v1alpha1
kind: PodDefault
metadata:
  name: access-minio
spec:
  desc: Allow access to MinIO
  selector:
    matchLabels:
      access-minio: "true"
  env:
    - name: AWS_ACCESS_KEY_ID
      valueFrom:
        secretKeyRef:
          name: mlpipeline-minio-artifact
          key: accesskey # the MinIO access key of your environment
          optional: false
    - name: AWS_SECRET_ACCESS_KEY
      valueFrom:
        secretKeyRef:
          name: mlpipeline-minio-artifact
          key: secretkey # the MinIO secret key of your environment
          optional: false
    - name: MINIO_ENDPOINT_URL
      value: http://minio.kubeflow:9000 # update to the MinIO URL of your environment
EOF
# copy the Seldon S3 credentials secret from the kubeflow namespace into the user's namespace
kubectl get secret mlflow-server-seldon-init-container-s3-credentials --namespace=kubeflow -o yaml \
| sed "s/namespace: kubeflow/namespace: $USER_NAMESPACE/" \
| sed 's/name: mlflow-server-seldon-init-container-s3-credentials/name: seldon-init-container-secret/g' \
| kubectl apply -n $USER_NAMESPACE -f -
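At this point it is worth verifying that the PodDefaults and the copied secret exist in the user's namespace; a quick check might look like this:
kubectl get poddefault -n $USER_NAMESPACE
kubectl get secret seldon-init-container-secret -n $USER_NAMESPACE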
Download data¶
The first step is to download the source data and put it in object storage.
Whenever you need to add a step to the pipeline, first check whether it already exists in the Kubeflow Pipelines components registry.
If it does, adding the step is simple: you can load it either from a URL or from a local file.
import kfp

web_downloader_op = kfp.components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/contrib/web/Download/component.yaml')
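If you prefer the local-file route mentioned above, kfp.components.load_component_from_file does the same job; the local path below is illustrative:
web_downloader_op = kfp.components.load_component_from_file(
    'components/web/Download/component.yaml')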
Just like that, you developed your first step!
Preprocess our ML source data¶
For the preprocessing step you need a different approach. Each data preprocessing step is different, so it's unlikely you'll find what you need in the KFP components registry. During the experiment phase, preprocessing is usually done in a Jupyter notebook, so wrap that code into a Python function so that you can convert it into a component. It's important to note that the pandas import is inside the Python function, because the package needs to be imported inside the Docker container that eventually runs the step.
from kfp.components import InputPath, OutputPath

def preprocess(file_path: InputPath('CSV'), output_file: OutputPath('parquet')):
    import pandas as pd

    # normalize column names, e.g. 'fixed acidity' -> 'fixed_acidity'
    df = pd.read_csv(file_path, header=0, sep=";")
    df.columns = [c.lower().replace(' ', '_') for c in df.columns]
    df.to_parquet(output_file)
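As a quick sanity check before wrapping the function, a minimal pytest check might look like the following; the tiny two-column dataset is purely illustrative:
import pandas as pd

def test_preprocess(tmp_path):
    src, out = tmp_path / "wine.csv", tmp_path / "wine.parquet"
    pd.DataFrame({"fixed acidity": [7.4], "quality": [5]}).to_csv(src, index=False, sep=";")
    preprocess(str(src), str(out))
    # column names should be lower-cased with spaces replaced by underscores
    assert list(pd.read_parquet(out).columns) == ["fixed_acidity", "quality"]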
Now that you have a function, write tests for it to make sure it works correctly. Then wrap it into a container so that the Kubernetes platform underneath Kubeflow on vSphere knows how to invoke your code. You use the Docker image for Python 3.9 and install additional Python packages using Python's pip package installer.
preprocess_op = kfp.components.create_component_from_func(
    func=preprocess,
    base_image='python:3.9',
    packages_to_install=['pandas', 'pyarrow'])
This method allows you to quickly build a pipeline in a way that does not require additional resources like template files, and it works nicely when building the pipeline from a notebook. However, installing Python packages each time the step is executed is not ideal, especially if the pipeline is invoked frequently.
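One way to avoid the per-run installation, if you maintain your own registry, is to bake the dependencies into a custom base image and drop packages_to_install; the image name below is hypothetical:
preprocess_op = kfp.components.create_component_from_func(
    func=preprocess,
    base_image='<your-registry>/wine-preprocess-base:0.1')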
Train your ML prediction model¶
This training step is created using a function-based component too. The difference is that this step needs to make calls to MLFlow and MinIO, and these calls require setting environment variables. How to securely set those environment variables is discussed later. Additionally, you change the training code so that all of the information about the experiment is saved in MLFlow, and the ML model artifacts this step generates are stored in MinIO.
from kfp.components import InputPath

def train(file_path: InputPath('parquet')) -> str:
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.linear_model import ElasticNet
    import mlflow

    df = pd.read_parquet(file_path)
    target_column = 'quality'
    train_x, test_x, train_y, test_y = train_test_split(
        df.drop(columns=[target_column]),
        df[target_column])
    with mlflow.start_run(run_name='wine_models'):
        lr = ElasticNet(alpha=0.5, l1_ratio=0.5, random_state=42)
        lr.fit(train_x, train_y)
        # log the model to MLFlow; the artifacts land in MinIO
        result = mlflow.sklearn.log_model(lr, "model")
        # get_artifact_uri() must be called while the run is still active
        return f"{mlflow.get_artifact_uri()}/{result.artifact_path}"
The value returned from the step is the model URI – the path to the model file in MinIO. But if you need to return more than a single value, you can use a NamedTuple. For more details, take a look at Building Python function-based components.
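For illustration, a function that returns two values via a NamedTuple could be declared like this; the field names model_uri and rmse are hypothetical:
from typing import NamedTuple

def train_and_score() -> NamedTuple('TrainOutput', [('model_uri', str), ('rmse', float)]):
    from collections import namedtuple
    output = namedtuple('TrainOutput', ['model_uri', 'rmse'])
    # illustrative values only; a real step would compute these
    return output('s3://mlflow/artifacts/model', 0.62)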
training_op = kfp.components.create_component_from_func(
    func=train,
    base_image='python:3.9',
    packages_to_install=['pandas', 'pyarrow', 'scikit-learn', 'mlflow', 'boto3'])
The training container is created from the same Python 3.9 Docker image as the previous step. When creating steps from functions, it's best to use the same base image whenever possible, in order to take full advantage of the Kubernetes caching mechanism for Docker images.
Deploy your ML model¶
You deploy the inference server that hosts your ML model using a Docker container-based, microservices approach. The code for this step is not in the experiment notebook. You use Seldon Core for deployment together with MLFlow Server, so that you take full advantage of features like monitoring your deployment without building a dedicated Docker image. The model artifacts are downloaded by the Seldon Core deployment from your MinIO object storage. In this step, you use kubectl to apply your SeldonDeployment configuration. The URI containing the path to the ML model is provided to this step by the training step.
The deployment step is split into a few parts:
create the command-line application
wrap it in the Docker image and publish it
create the component configuration
create the step from the component configuration file
Create the command-line application¶
Firstly, create a command-line application that calls kubectl with a file generated from a Jinja template as a parameter:
import argparse
import subprocess

from jinja2 import Environment, FileSystemLoader

def deploy(model_uri: str):
    # render the SeldonDeployment manifest from the Jinja template
    with open("/tmp/manifest.yaml", "w") as f:
        env = Environment(loader=FileSystemLoader('./templates'),
                          trim_blocks=True, lstrip_blocks=True)
        template = env.get_template('deploy-manifest.j2')
        f.write(template.render(model_uri=model_uri))
    # apply the rendered manifest to the cluster
    result = subprocess.call(['kubectl', 'apply', '-f', '/tmp/manifest.yaml', '-n', 'admin'])
    assert result == 0

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Data processor')
    parser.add_argument('--model_uri', help='Model URI')
    args = parser.parse_args()
    deploy(args.model_uri)
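The deploy-manifest.j2 template itself is not shown in this guide. A minimal sketch of what it might contain, assuming a SeldonDeployment backed by Seldon's prebuilt MLFLOW_SERVER and the seldon-init-container-secret created earlier (the resource names match the pods shown later, but the exact manifest is an assumption):
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
  name: mlflow-wine-super-model
spec:
  name: wine-super-model
  predictors:
    - name: default
      graph:
        name: classifier
        implementation: MLFLOW_SERVER
        modelUri: {{ model_uri }}
        envSecretRefName: seldon-init-container-secret
      replicas: 1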
Build and push the Docker image¶
Next, use Docker to build and push an image to the Docker image registry, using a Dockerfile and the build script below.
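The exact Dockerfile is not reproduced here; a minimal sketch, assuming the application needs only Python 3, jinja2, and the kubectl binary (the pinned kubectl version is illustrative), could look like:
FROM python:3.9
# kubectl is required at runtime to apply the rendered manifest
RUN curl -LO https://dl.k8s.io/release/v1.25.0/bin/linux/amd64/kubectl && \
    install -m 0755 kubectl /usr/local/bin/kubectl
WORKDIR /app
RUN pip install --no-cache-dir jinja2
COPY src/ src/
COPY templates/ templates/
ENTRYPOINT ["python3", "src/deploy.py"]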
# build script
VERSION=<version>
REPO=<repository>
docker build . -t $REPO:$VERSION
docker push $REPO:$VERSION
# print the pushed image digest for use in the component configuration
docker inspect --format="{{index .RepoDigests 0}}" "$REPO:$VERSION"
Create a component configuration file¶
Thirdly, create a Kubeflow on vSphere pipeline step configuration file using the output from docker inspect. This configuration file is what allows you to share your Kubeflow pipeline step with others.
name: Deploy model
description: deploy model using seldon core
inputs:
  - { name: model_uri, type: String, description: 'model URI' }
implementation:
  container:
    image: <image-name>
    command: [
      python3,
      src/deploy.py,
      --model_uri,
      { inputValue: model_uri },
    ]
Load your component¶
Finally, load the component in a similar way to the Download data step. You use the configuration file created in the third step to specify which Docker image to use, how it is invoked, and what the input and output parameters are.
import os

deploy_op = kfp.components.load_component_from_file(
    os.path.join('components', 'deploy', 'component.yaml'))
The biggest advantage of this component creation method is extensibility: you may use any language you want to implement the command-line application. Here you use Python and Jinja (a free templating engine) to keep the code clean, and you keep the manifest in a template file, which is not possible with the function-based approach. Additionally, you do not need to install Python packages every time you execute the step, which means faster executions.
Feel free to reuse a Docker image that has already been built and pushed to the Docker image repository.
Put the MLOps pipeline together¶
You’ve defined all the components – now let’s create a pipeline from them. You need to put them in proper order, define inputs and outputs and add appropriate configuration values.
from kfp import dsl
from kubernetes.client.models import V1EnvVar, V1EnvVarSource, V1SecretKeySelector

@dsl.pipeline(
    name="e2e_wine_pipeline",
    description="WINE pipeline")
def wine_pipeline(url):
    web_downloader_task = web_downloader_op(url=url)
    preprocess_task = preprocess_op(file=web_downloader_task.outputs['data'])
    # the MinIO credentials come from the mlpipeline-minio-artifact secret
    # rather than being hard-coded as plain values
    train_task = (training_op(file=preprocess_task.outputs['output'])
        .add_env_variable(V1EnvVar(name='MLFLOW_TRACKING_URI', value='http://mlflow-server.kubeflow.svc.cluster.local:5000'))
        .add_env_variable(V1EnvVar(name='MLFLOW_S3_ENDPOINT_URL', value='http://minio.kubeflow.svc.cluster.local:9000'))
        .add_env_variable(V1EnvVar(name='AWS_ACCESS_KEY_ID', value_from=V1EnvVarSource(
            secret_key_ref=V1SecretKeySelector(name='mlpipeline-minio-artifact', key='accesskey'))))
        .add_env_variable(V1EnvVar(name='AWS_SECRET_ACCESS_KEY', value_from=V1EnvVarSource(
            secret_key_ref=V1SecretKeySelector(name='mlpipeline-minio-artifact', key='secretkey'))))
    )
    deploy_task = deploy_op(model_uri=train_task.output)
You don’t need to specify the order of the steps explicitly. When you set input-output dependencies, the steps will order themselves. Convenient, right?!
When looking at the training step, it differs from the others: it requires additional configuration. You add the sensitive data using Kubernetes secrets and the rest using plain environment variables. Kubeflow Pipelines supports multiple ways to add secrets to pipeline steps; see the Kubeflow Pipelines documentation for more information.
Now, the coding part is completed. All that's left is to see the results of your pipeline. Run pipeline.py to generate wine-pipeline.yaml in the generated folder. Then open the Kubeflow on vSphere Dashboard in your browser, create a new pipeline with your YAML file and, the moment of truth, run the pipeline.
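For reference, a minimal sketch of the compile step in pipeline.py, assuming the KFP v1 SDK and an output path of generated/wine-pipeline.yaml, might look like:
import kfp

if __name__ == '__main__':
    # compile the pipeline definition into a YAML package for the dashboard
    kfp.compiler.Compiler().compile(
        pipeline_func=wine_pipeline,
        package_path='generated/wine-pipeline.yaml')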
Check the Inference endpoint¶
To be 100% sure it works, check if the inference endpoint is responding correctly. First, go to the Kubernetes cluster and port-forward or expose the newly created service.
# check pod's status
$ kubectl get po -n admin
NAME READY STATUS RESTARTS AGE
ml-pipeline-ui-artifact-5cfb68f5b7-97kjc 2/2 Running 4 (47h ago) 2d
ml-pipeline-visualizationserver-665bb6b8fc-f5nkm 2/2 Running 4 (47h ago) 2d
e2e-wine-pipeline-nk6qh-1447540704 0/2 Completed 0 22h
e2e-wine-pipeline-nk6qh-2458232327 0/2 Completed 0 22h
e2e-wine-pipeline-nk6qh-2359496741 0/2 Completed 0 22h
e2e-wine-pipeline-nk6qh-105037618 0/2 Completed 0 22h
mlflow-wine-super-model-0-classifier-5c79775bb6-bv9dn 3/3 Running 0 22h
# check service's status
$ kubectl get svc -n admin
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
ml-pipeline-visualizationserver ClusterIP 10.152.183.97 <none> 8888/TCP 2d
ml-pipeline-ui-artifact ClusterIP 10.152.183.103 <none> 80/TCP 2d
mlflow-wine-super-model-classifier ClusterIP 10.152.183.245 <none> 9000/TCP,9500/TCP 22h
mlflow-wine-super-model ClusterIP 10.152.183.236 <none> 8000/TCP,5001/TCP 22h
# port-forward or expose the newly created service to localhost
$ kubectl port-forward service/mlflow-wine-super-model -n admin 8000:8000
Forwarding from 127.0.0.1:8000 -> 8000
Forwarding from [::1]:8000 -> 8000
Then, use curl in another terminal to see if the endpoint is responding correctly.
curl -X POST http://127.0.0.1:8000/api/v1.0/predictions -H 'Content-Type: application/json' -d '{"data":{"ndarray":[[5.6, 0.31, 0.37, 1.4, 0.074, 12.0, 96.0, 0.9954, 3.32, 0.58, 9.2]]}}'
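A healthy endpoint answers with a JSON document in Seldon's prediction format; the predicted quality value below is illustrative:
{"data":{"names":[],"ndarray":[5.57]},"meta":{}}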
Seldon Core supports batch inference out-of-the-box and its performance is much better than calling the endpoint in a loop.
Troubleshooting¶
Cannot resolve hostname for download data URL¶
Sometimes the hostname gets combined with a search domain into a name that cannot be resolved. The search domain configuration on the machine can be viewed in the file /etc/resolv.conf.
To solve this, add a . after the hostname (for example, raw.githubusercontent.com.) so that CoreDNS treats it as a fully qualified domain name (FQDN) and does not append a search domain.
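For example, with a configuration like the following (contents illustrative), raw.githubusercontent.com may be expanded to raw.githubusercontent.com.example.internal, which fails to resolve; the trailing dot suppresses that expansion:
# /etc/resolv.conf (illustrative)
search example.internal
nameserver 10.152.183.10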
ML model file is not compatible with seldonio/mlflowserver:1.14.0-dev¶
An ML model file saved with mlflow=2 is not compatible with the seldonio/mlflowserver:1.14.0-dev image. The error shows: conda_env_create.py TypeError: join() argument must be str or bytes, not dict. This issue has been fixed upstream, but the conda_env_create.py code in the seldonio/mlflowserver:1.14.0-dev Docker image has not been updated.
Choose one of the following solutions to solve this error.
Solution 1: Update conda_env_create.py in the seldonio/mlflowserver:1.14.0-dev Docker image to the latest version, and commit the result as a new Docker image to use.
Solution 2: Modify the MLmodel file so that it matches the layout mlflow=1 produces:
# MLmodel saved with mlflow=1
artifact_path: model
flavors:
  python_function:
    env: conda.yaml
    loader_module: mlflow.sklearn
    model_path: model.pkl
    predict_fn: predict
    python_version: 3.9.16
  sklearn:
    code: null
    pickled_model: model.pkl
    serialization_format: cloudpickle
    sklearn_version: 1.2.1
mlflow_version: 2.1.1
model_uuid: 9971f5db741348cda16bfb3fc4cfff18
run_id: 4a02ebc811b84e1194b452b38c2d96d8
utc_time_created: '2023-02-01 08:13:32.310337'