Federated model training¶
Introduction¶
Federated Learning is a machine learning technique in which models are trained across multiple distributed edge devices or servers, each holding its own local data samples, without exchanging the raw data itself.
FATE (Federated AI Technology Enabler) is the world’s pioneering open-source framework for industrial-grade federated learning. It empowers businesses and institutions to collaborate on data while prioritizing the safety and privacy of the data involved. The FATE project leverages cutting-edge technologies such as Multi-Party Computation (MPC) and Homomorphic Encryption (HE) to build a robust and secure computing protocol, enabling a wide range of secure machine learning tasks, including logistic regression, tree-based algorithms, deep learning, and transfer learning, among others.
KubeFATE is a solution that allows running FATE in containerized environments. It offers the capability to deploy FATE clusters with just one click, while also providing features to monitor the status of running FATE clusters, view logs, and perform version upgrades.
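When KubeFATE is used on its own (outside the operator-driven flow shown in this tutorial), the same capabilities are exposed through its command-line tool. The commands below are only an illustrative sketch; exact subcommands can vary between KubeFATE releases, so consult kubefate --help for your version.
# list the FATE clusters managed by KubeFATE and inspect one of them
kubefate cluster ls
kubefate cluster describe <cluster-uuid>
# upgrade or reconfigure a cluster by applying an updated cluster.yaml
kubefate cluster update -f cluster.yaml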
FATE-Job is a task management tool specifically designed for FATE. It facilitates the submission and querying of FATE tasks through the use of the Kubernetes API.
FATE-Operator simplifies the deployment of FATE, KubeFATE, and FATE-Job into Kubernetes clusters. It is integrated into Kubeflow on vSphere, so it can be used directly from that platform.
This tutorial provides a step-by-step guide to using FATE-Operator on a Kubeflow on vSphere cluster:
Cluster Deployment: Setting up the FATE cluster.
Federated Learning with FATE: Initial federated training on the deployed FATE cluster.
Prerequisites¶
Kubeflow on vSphere v1.6.1
An additional FATE cluster deployed with party ID 10000, or both collaborating parties set to 9999 (refer to the comments in the code examples).
Cluster Deployment¶
Deploy the FATE-Operator¶
Get source code
git clone https://github.com/kubeflow/fate-operator.git
cd fate-operator
Deploy the FATE-Operator CRDs
kustomize build config/crd | kubectl apply -f -
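You can confirm that the CRDs (Kubefate, FateCluster, and FateJob, all in the app.kubefate.net API group) were registered:
kubectl get crd | grep app.kubefate.net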
Deploy the FATE-Operator controller-manager
kustomize build config/default | kubectl apply -f -
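Before continuing, check that the operator's controller-manager pod is running. The namespace and pod names depend on the kustomize configuration, so a broad query is used here:
kubectl get pods -A | grep fate-operator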
Deploy the FATE cluster¶
1. Install KubeFATE¶
Configure RBAC (Role-Based Access Control) permissions for KubeFATE. The RoleBindings below target the fate-9999 and fate-10000 namespaces, so those namespaces are created here as well:
cat << EOF | kubectl apply -f -
apiVersion: v1
kind: Namespace
metadata:
  name: kube-fate
  labels:
    name: kube-fate
---
# namespaces for the two collaborating parties' FATE clusters
apiVersion: v1
kind: Namespace
metadata:
  name: fate-9999
---
apiVersion: v1
kind: Namespace
metadata:
  name: fate-10000
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: kubefate-admin
  namespace: kube-fate
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: kubefate
  namespace: fate-9999
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: cluster-admin
subjects:
- kind: ServiceAccount
  name: kubefate-admin
  namespace: kube-fate
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: kubefate
  namespace: fate-10000
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: cluster-admin
subjects:
- kind: ServiceAccount
  name: kubefate-admin
  namespace: kube-fate
EOF
Create the Secret holding the KubeFATE and MariaDB credentials:
cat << EOF | kubectl apply -f -
apiVersion: v1
kind: Secret
metadata:
  name: kubefate-secret
  namespace: kube-fate
type: Opaque
stringData:
  kubefateUsername: admin
  kubefatePassword: admin
  mariadbUsername: kubefate
  mariadbPassword: kubefate
EOF
Deploy KubeFATE; the example below deploys KubeFATE v1.4.0:
cat << EOF | kubectl apply -f -
apiVersion: app.kubefate.net/v1beta1
kind: Kubefate
metadata:
  name: kubefate-sample
  namespace: kube-fate
spec:
  # kubefate image tag
  image: federatedai/kubefate:v1.4.0
  # ingress host
  ingressDomain: kubefate.net
  # serviceAccountName
  serviceAccountName: kubefate-admin
  # kubefate config
  volumeSource:
    hostPath:
      path: /home/kubefate/mysql/db
      type: DirectoryOrCreate
  config:
    - name: MYSQL_USER
      valueFrom:
        secretKeyRef:
          name: kubefate-secret
          key: mariadbUsername
    - name: MYSQL_PASSWORD
      valueFrom:
        secretKeyRef:
          name: kubefate-secret
          key: mariadbPassword
    - name: FATECLOUD_DB_USERNAME
      valueFrom:
        secretKeyRef:
          name: kubefate-secret
          key: mariadbUsername
    - name: FATECLOUD_DB_PASSWORD
      valueFrom:
        secretKeyRef:
          name: kubefate-secret
          key: mariadbPassword
    - name: FATECLOUD_REPO_NAME
      value: "kubefate"
    - name: FATECLOUD_REPO_URL
      value: "https://federatedai.github.io/KubeFATE"
    - name: FATECLOUD_USER_USERNAME
      valueFrom:
        secretKeyRef:
          name: kubefate-secret
          key: kubefateUsername
    - name: FATECLOUD_USER_PASSWORD
      valueFrom:
        secretKeyRef:
          name: kubefate-secret
          key: kubefatePassword
    - name: FATECLOUD_LOG_LEVEL
      value: "debug"
    - name: FATECLOUD_LOG_NOCOLOR
      value: "true"
EOF
Check the KubeFATE status:
kubectl get kubefate -n kube-fate
NAME              INGRESSDOMAIN   STATUS
kubefate-sample   kubefate.net    Running
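The status can take a few minutes to reach Running while images are pulled. You can watch it, and the resources behind it, with standard kubectl commands:
# watch the Kubefate resource until STATUS becomes Running
kubectl get kubefate -n kube-fate -w
# inspect the KubeFATE workloads and ingress
kubectl get pods,svc,ingress -n kube-fate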
2. Install FATE¶
To establish the FATE cluster, we use FATE v1.5.1. Uncomment and adjust the parameters in the YAML below (for example the exchange address and the partyList entries under rollsite) to connect this cluster with other FATE clusters; these party-to-party connections are what enable federated learning across distributed nodes.
cat << EOF | kubectl apply -f -
apiVersion: app.kubefate.net/v1beta1
kind: FateCluster
metadata:
  name: fatecluster-sample
  namespace: fate-9999
spec:
  kubefate:
    name: kubefate-sample
    namespace: kube-fate
  clusterSpec:
    name: fate-9999
    namespace: fate-9999
    chartName: fate
    chartVersion: v1.5.1
    partyId: 9999
    registry: ""
    imageTag: ""
    pullPolicy: ""
    imagePullSecrets:
    - name: myregistrykey
    persistence: false
    istio:
      enabled: false
    modules:
      - rollsite
      - clustermanager
      - nodemanager
      - mysql
      - python
      - fateboard
      - client
    backend: eggroll
    host:
      fateboard: 9999.fateboard.kubefate.net
      client: 9999.notebook.kubefate.net
      # sparkUI: 9999.spark.kubefate.net
      # rabbitmqUI: 9999.rabbitmq.kubefate.net
    rollsite:
      type: NodePort
      nodePort: 30091
      exchange:
        ip: 192.168.0.1
        port: 30000
      partyList:
      - partyId: 10000
        partyIp: 192.168.0.1
        partyPort: 30101
      nodeSelector: {}
    # lbrollsite:
    #   type: NodePort
    #   nodePort: 30091
    #   size: "2M"
    #   exchangeList:
    #   - id: 9991
    #     ip: 192.168.0.1
    #     port: 30910
    #   nodeSelector:
    nodemanager:
      count: 3
      sessionProcessorsPerNode: 4
      # storageClass: "nodemanagers"
      # accessMode: ReadWriteOnce
      # size: 2Gi
      list:
      - name: nodemanager
        nodeSelector: {}
        sessionProcessorsPerNode: 2
        subPath: "nodemanager"
        existingClaim: ""
        storageClass: "nodemanager"
        accessMode: ReadWriteOnce
        size: 1Gi
    python:
      type: NodePort
      httpNodePort: 30097
      grpcNodePort: 30092
      nodeSelector: {}
      enabledNN: false
      # spark:
      #   master: spark://spark-master:7077
      #   home:
      #   cores_per_node: 20
      #   nodes: 2
      # hdfs:
      #   name_node: hdfs://namenode:9000
      #   path_prefix:
      # rabbitmq:
      #   host: rabbitmq
      #   mng_port: 15672
      #   port: 5672
      #   user: fate
      #   password: fate
      #   # default conf/rabbitmq_route_table.yaml
      #   route_table:
      # nginx:
      #   host: nginx
      #   http_port: 9300
      #   grpc_port: 9310
    mysql:
      nodeSelector: {}
      ip: mysql
      port: 3306
      database: eggroll_meta
      user: fate
      password: fate_dev
      subPath: ""
      existingClaim: ""
      storageClass: "mysql"
      accessMode: ReadWriteOnce
      size: 1Gi
    # externalMysqlIp: mysql
    # externalMysqlPort: 3306
    # externalMysqlDatabase: eggroll_meta
    # externalMysqlUser: fate
    # externalMysqlPassword: fate_dev
    servingIp: 192.168.9.1
    servingPort: 30209
    # spark:
    #   master:
    #     Image: "federatedai/spark-master"
    #     ImageTag: "1.5.0-release"
    #     replicas: 1
    #     cpu: "100m"
    #     memory: "512Mi"
    #     nodeSelector:
    #     type: ClusterIP
    #   worker:
    #     Image: "federatedai/spark-worker"
    #     ImageTag: "1.5.0-release"
    #     replicas: 2
    #     cpu: "1000m"
    #     memory: "512Mi"
    #     nodeSelector:
    #     type: ClusterIP
    # hdfs:
    #   namenode:
    #     nodeSelector:
    #     type: ClusterIP
    #   datanode:
    #     nodeSelector:
    #     type: ClusterIP
    # nginx:
    #   nodeSelector:
    #   type: ClusterIP
    #   httpNodePort: 30093
    #   grpcNodePort: 30098
    #   route_table:
    #     10000:
    #       proxy:
    #       - host: 192.168.0.1
    #         http_port: 30103
    #         grpc_port: 30108
    #       fateflow:
    #       - host: 192.168.0.1
    #         http_port: 30107
    #         grpc_port: 30102
    # rabbitmq:
    #   nodeSelector:
    #   type: ClusterIP
    #   nodePort: 30094
    #   default_user: fate
    #   default_pass: fate
    #   user: fate
    #   password: fate
    #   route_table:
    #     10000:
    #       host: 192.168.0.1
    #       port: 30104
EOF
Check the FATE cluster status:
kubectl get fatecluster -n fate-9999
NAME                 PARTYID   STATUS
fatecluster-sample   9999      Running
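Bringing up a party pulls several container images, so this step can take a while. You can follow progress at the pod level; the pods for the modules listed in the clusterSpec should eventually reach Running:
kubectl get pods -n fate-9999 -w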
Federated Learning with FATE¶
There are two ways to run FATE training tasks: submit a FateJob resource (FATE-Job), or use fateclient with the FATE pipeline API. Both approaches provide a convenient way to run and manage FATE training tasks.
fate-job¶
To start a FATE training task, apply the sample FateJob manifest below. The specifics of the task, such as the pipeline and module configuration, can be customized in the “pipeline” and “modulesConf” sections of the “app_v1beta1_fatejob.yaml” file.
kubectl apply -f https://raw.githubusercontent.com/kubeflow/fate-operator/master/config/samples/app_v1beta1_fatejob.yaml
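For reference, a FateJob manifest pairs a reference to the FATE cluster it should run on with the job definition. The skeleton below is only a hedged sketch: the pipeline and modulesConf payloads are left as placeholders, and the fateClusterRef field name and namespace are assumptions based on the upstream sample rather than part of this tutorial.
cat << EOF | kubectl apply -f -
apiVersion: app.kubefate.net/v1beta1
kind: FateJob
metadata:
  name: fatejob-sample
  namespace: fate-9999
spec:
  # name of the FateCluster to run the job on (assumed field name, per the upstream sample)
  fateClusterRef: fatecluster-sample
  jobConf:
    # FATE DSL describing the pipeline components, as a JSON string (placeholder)
    pipeline: |
      {...}
    # runtime configuration for the modules, as a JSON string (placeholder)
    modulesConf: |
      {...}
EOF
Once submitted, the job can be queried through the Kubernetes API:
kubectl get fatejob -A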
fateclient¶
During the model experimentation phase, fateclient offers a user-friendly way to define and submit FATE tasks directly from a notebook.
The client module runs a Jupyter Notebook with fateclient preinstalled. To obtain the Notebook URL, list the ingresses of the party namespace:
kubectl get ingress -n fate-9999
NAMESPACE   NAME        CLASS    HOSTS                         ADDRESS   PORTS   AGE
fate-9999   fateboard   <none>   9999.fateboard.kubefate.net             80      13m
fate-9999   notebook    <none>   9999.notebook.kubefate.net              80      13m
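These hostnames are ingress virtual hosts under the kubefate.net ingressDomain, so they must resolve to the address of your ingress controller. If DNS is not set up for them, one common workaround is a hosts-file entry; the IP below is only a placeholder for your ingress controller or node address:
# replace 10.0.0.100 (placeholder) with the address of your ingress controller
echo "10.0.0.100 9999.notebook.kubefate.net 9999.fateboard.kubefate.net" | sudo tee -a /etc/hosts
The Jupyter Notebook is then reachable at http://9999.notebook.kubefate.net and FATEBoard at http://9999.fateboard.kubefate.net.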
In the notebook, initialize the pipeline so that it connects to fateflow:
!pipeline init --ip fateflow --port 9380
Before training, every participant must upload its data to FATE. Run the following steps from the notebook:
The guest party uploads its data. The example below uses the bundled sample file “breast_homo_guest.csv”; replace it with your own dataset as needed.
import os
from pipeline.backend.pipeline import PipeLine
from pipeline.utils.tools import load_job_config
guest = 9999
data_base = "/data/projects/fate/"
# partition for data storage
partition = 4
# table name and namespace, used in FATE job configuration
dense_data = {"name": "breast_homo_guest", "namespace": f"experiment"}
pipeline_upload = PipeLine().set_initiator(role="guest", party_id=guest).set_roles(guest=guest)
# add upload data info
# path to csv file(s) to be uploaded
pipeline_upload.add_upload_data(file=os.path.join(data_base, "examples/data/breast_homo_guest.csv"),
                                table_name=dense_data["name"],     # table name
                                namespace=dense_data["namespace"], # namespace
                                head=1, partition=partition,       # data info
                                id_delimiter=",")
# upload both data
pipeline_upload.upload(drop=1)
The host party uploads its data. The example below uses the bundled sample file “breast_homo_host.csv”; replace it with your own dataset as needed.
import os
from pipeline.backend.pipeline import PipeLine
from pipeline.utils.tools import load_job_config
host = 10000 # Please change to 9999 if only one party is deployed
data_base = "/data/projects/fate/"
# partition for data storage
partition = 4
# table name and namespace, used in FATE job configuration
dense_data = {"name": "breast_homo_host", "namespace": f"experiment"}
pipeline_upload = PipeLine().set_initiator(role="host", party_id=host).set_roles(host=host)
# add upload data info
# path to csv file(s) to be uploaded
pipeline_upload.add_upload_data(file=os.path.join(data_base, "examples/data/breast_homo_host.csv"),
                                table_name=dense_data["name"],     # table name
                                namespace=dense_data["namespace"], # namespace
                                head=1, partition=partition,       # data info
                                id_delimiter=",")
# upload both data
pipeline_upload.upload(drop=1)
Use the FATE pipeline to create a federated training task for HomoLR, FATE’s horizontal (homogeneous) federated logistic regression algorithm.
import argparse
import json
from pipeline.backend.pipeline import PipeLine
from pipeline.component import DataTransform
from pipeline.component import Evaluation
from pipeline.component import HomoLR
from pipeline.component import Reader
from pipeline.component import FeatureScale
from pipeline.interface import Data
from pipeline.utils.tools import load_job_config
# obtain config
guest = 9999
host = 10000 # Please change to 9999 if only one party is deployed
arbiter = 9999
guest_train_data = {"name": "breast_homo_guest", "namespace": f"experiment"}
host_train_data = {"name": "breast_homo_host", "namespace": f"experiment"}
# initialize pipeline
pipeline = PipeLine()
# set job initiator
pipeline.set_initiator(role='guest', party_id=guest)
# set participants information
pipeline.set_roles(guest=guest, host=host, arbiter=arbiter)
# define Reader components to read in data
reader_0 = Reader(name="reader_0")
# configure Reader for guest
reader_0.get_party_instance(role='guest', party_id=guest).component_param(table=guest_train_data)
# configure Reader for host
reader_0.get_party_instance(role='host', party_id=host).component_param(table=host_train_data)
# define DataTransform components
data_transform_0 = DataTransform(
    name="data_transform_0",
    with_label=True,
    output_format="dense")  # start component numbering at 0
scale_0 = FeatureScale(name='scale_0')
param = {
    "penalty": "L2",
    "optimizer": "sgd",
    "tol": 1e-05,
    "alpha": 0.01,
    "max_iter": 30,
    "early_stop": "diff",
    "batch_size": -1,
    "learning_rate": 0.15,
    "decay": 1,
    "decay_sqrt": True,
    "init_param": {
        "init_method": "zeros"
    },
    "cv_param": {
        "n_splits": 4,
        "shuffle": True,
        "random_seed": 33,
        "need_cv": False
    }
}
homo_lr_0 = HomoLR(name='homo_lr_0', **param)
# add components to pipeline, in order of task execution
pipeline.add_component(reader_0)
pipeline.add_component(data_transform_0, data=Data(data=reader_0.output.data))
# set data input sources of intersection components
pipeline.add_component(scale_0, data=Data(data=data_transform_0.output.data))
pipeline.add_component(homo_lr_0, data=Data(train_data=scale_0.output.data))
evaluation_0 = Evaluation(name="evaluation_0", eval_type="binary")
evaluation_0.get_party_instance(role='host', party_id=host).component_param(need_run=False)
pipeline.add_component(evaluation_0, data=Data(data=homo_lr_0.output.data))
# compile pipeline once finished adding modules, this step will form conf and dsl files for running job
pipeline.compile()
# fit model
pipeline.fit()
deploy_components = [data_transform_0, scale_0, homo_lr_0]
pipeline.deploy_component(components=deploy_components)
predict_pipeline = PipeLine()
# add data reader onto predict pipeline
predict_pipeline.add_component(reader_0)
# add selected components from train pipeline onto predict pipeline
# specify data source
predict_pipeline.add_component(
    pipeline, data=Data(
        predict_input={
            pipeline.data_transform_0.input.data: reader_0.output.data}))
predict_pipeline.compile()
predict_pipeline.predict()
dsl_json = predict_pipeline.get_predict_dsl()
conf_json = predict_pipeline.get_predict_conf()
# import json
json.dump(dsl_json, open('./homo-lr-normal-predict-dsl.json', 'w'), indent=4)
json.dump(conf_json, open('./homo-lr-normal-predict-conf.json', 'w'), indent=4)
# query component summary
print(json.dumps(pipeline.get_component("homo_lr_0").get_summary(), indent=4, ensure_ascii=False))
print(json.dumps(pipeline.get_component("evaluation_0").get_summary(), indent=4, ensure_ascii=False))
Upon successful completion of the task, you can examine the results of the federated training, for example the model and evaluation summaries printed above, or the job details on FATEBoard at 9999.fateboard.kubefate.net.