# Kubernetes and Machine Learning Workloads: A Hands-On Practical Guide
Hey tech folks, today let's talk about Kubernetes and machine learning. Still training models on your laptop? That was fine in 2023. These days, containerized deployment and distributed training are the way to go, and Kubernetes, the king of container orchestration, is practically tailor-made for machine learning workloads. Today susu will walk you through running ML workloads on K8s step by step: from single-node to distributed, from training to inference.

## 1. Why Run Machine Learning Workloads on Kubernetes?

- **Resource elasticity**: scale up and down with your training needs; no more buying a beefy machine for a one-off training job
- **Environment consistency**: containers keep training and inference environments identical, avoiding the classic "works locally, crashes in production"
- **Multi-tenant isolation**: teams share one cluster with per-project isolation, maximizing resource utilization
- **Unified management**: training, inference, and monitoring all live in K8s, cutting ops costs dramatically

## 2. Basic Deployment: A Single-Node Machine Learning Workload

Let's start simple and deploy a single-node TensorFlow training job.

### 2.1 Pod Configuration for the Training Job

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: tf-training
  labels:
    app: tf-training
spec:
  containers:
  - name: tf-training
    image: tensorflow/tensorflow:2.10.0-gpu
    command: ["python3"]
    args:
    - -c
    - |
      import tensorflow as tf
      mnist = tf.keras.datasets.mnist
      (x_train, y_train), (x_test, y_test) = mnist.load_data()
      # normalize pixels to [0, 1]
      x_train, x_test = x_train / 255.0, x_test / 255.0
      model = tf.keras.models.Sequential([
          tf.keras.layers.Flatten(input_shape=(28, 28)),
          tf.keras.layers.Dense(128, activation='relu'),
          tf.keras.layers.Dropout(0.2),
          tf.keras.layers.Dense(10),
      ])
      # the last layer outputs logits, so tell the loss
      model.compile(
          optimizer='adam',
          loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
          metrics=['accuracy'])
      model.fit(x_train, y_train, epochs=5)
      test_loss, test_acc = model.evaluate(x_test, y_test, verbose=2)
      print('\nTest accuracy:', test_acc)
    resources:
      limits:
        nvidia.com/gpu: 1
    volumeMounts:
    - name: data
      mountPath: /data
  volumes:
  - name: data
    emptyDir: {}
```

### 2.2 Deploy and Watch the Training

```bash
# Launch the training Pod
kubectl apply -f tf-training.yaml

# Follow the training logs
kubectl logs -f tf-training

# Check the Pod status
kubectl get pods tf-training
```
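If you find yourself hand-editing YAML for every experiment, generating the manifest from Python can help. Here is a minimal sketch (the helper name and defaults are my own invention, not any library's API) that reproduces the shape of the Pod above as a plain dict:

```python
import json

def gpu_training_pod(name, image, script, gpus=1):
    """Build a Pod manifest (as a plain dict) for a one-off GPU training run.

    Hypothetical helper for illustration; mirrors the tf-training Pod above.
    """
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"name": name, "labels": {"app": name}},
        "spec": {
            "containers": [{
                "name": name,
                "image": image,
                "command": ["python3"],
                "args": ["-c", script],
                "resources": {"limits": {"nvidia.com/gpu": gpus}},
            }],
            "restartPolicy": "Never",
        },
    }

pod = gpu_training_pod("tf-training", "tensorflow/tensorflow:2.10.0-gpu",
                       "print('hello from the cluster')")
print(json.dumps(pod, indent=2))
```

Since kubectl accepts JSON manifests as well as YAML, `python gen.py | kubectl apply -f -` works without pulling in a YAML library.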
## 3. Going Further: Distributed Machine Learning Training

Single-node training too slow? Say no more. Let's run distributed TensorFlow training on Kubernetes.

### 3.1 Distributed Training Configuration

```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: tf-distributed-training
spec:
  completionMode: Indexed   # gives every Pod a stable index via JOB_COMPLETION_INDEX
  parallelism: 3
  completions: 3
  template:
    metadata:
      labels:
        app: tf-distributed
    spec:
      subdomain: tf-distributed-training   # paired with the headless Service below
      containers:
      - name: tf-worker
        image: tensorflow/tensorflow:2.10.0-gpu
        command: ["python3", "-c"]
        args:
        - |
          import json
          import os
          import tensorflow as tf

          # Every worker runs the same script; only the index differs
          index = int(os.environ["JOB_COMPLETION_INDEX"])
          workers = [
              "tf-distributed-training-0.tf-distributed-training:2222",
              "tf-distributed-training-1.tf-distributed-training:2222",
              "tf-distributed-training-2.tf-distributed-training:2222",
          ]
          # TF_CONFIG must be valid JSON, not a Python repr
          os.environ["TF_CONFIG"] = json.dumps({
              "cluster": {"worker": workers},
              "task": {"type": "worker", "index": index},
          })

          strategy = tf.distribute.MultiWorkerMirroredStrategy()
          mnist = tf.keras.datasets.mnist
          (x_train, y_train), (x_test, y_test) = mnist.load_data()
          x_train = x_train / 255.0
          with strategy.scope():
              model = tf.keras.models.Sequential([
                  tf.keras.layers.Flatten(input_shape=(28, 28)),
                  tf.keras.layers.Dense(128, activation='relu'),
                  tf.keras.layers.Dropout(0.2),
                  tf.keras.layers.Dense(10),
              ])
              model.compile(
                  optimizer='adam',
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])
          model.fit(x_train, y_train, epochs=5, batch_size=64)
          print(f"Worker {index} completed!")
        resources:
          limits:
            nvidia.com/gpu: 1
      restartPolicy: OnFailure
---
# Headless Service so the workers can resolve each other by hostname
apiVersion: v1
kind: Service
metadata:
  name: tf-distributed-training
spec:
  clusterIP: None
  selector:
    app: tf-distributed
  ports:
  - port: 2222
```

A few things to note: a Job lives under `batch/v1`, `completionMode: Indexed` hands each worker a stable index through the `JOB_COMPLETION_INDEX` environment variable, and the headless Service is what lets `tf-distributed-training-0.tf-distributed-training` resolve between Pods. Without all three, the workers cannot find each other.

### 3.2 Launch Distributed Training

```bash
# Launch the distributed training Job
kubectl apply -f tf-distributed-training.yaml

# List all the training Pods
kubectl get pods -l app=tf-distributed

# Follow the logs of one worker
kubectl logs -f job/tf-distributed-training
```

## 4. Model Serving: Deploying TensorFlow Serving

Model trained? Then it needs serving. Let's stand up an inference service with TensorFlow Serving on Kubernetes.

### 4.1 Prepare the Model

First, save the trained model onto a storage volume; here we use a PVC.

```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: model-pvc
spec:
  accessModes:
  - ReadWriteMany   # two serving replicas may land on different nodes
  resources:
    requests:
      storage: 10Gi
```

### 4.2 Deploy TensorFlow Serving

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: tf-serving
  labels:
    app: tf-serving
spec:
  replicas: 2
  selector:
    matchLabels:
      app: tf-serving
  template:
    metadata:
      labels:
        app: tf-serving
    spec:
      containers:
      - name: tf-serving
        image: tensorflow/serving:2.10.0
        ports:
        - containerPort: 8501
        args:
        - --model_name=mnist
        - --model_base_path=/models/mnist
        volumeMounts:
        - name: model-volume
          mountPath: /models
        resources:
          requests:
            cpu: 1
            memory: 2Gi
          limits:
            cpu: 2
            memory: 4Gi
      volumes:
      - name: model-volume
        persistentVolumeClaim:
          claimName: model-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: tf-serving-service
spec:
  selector:
    app: tf-serving
  ports:
  - port: 8501
    targetPort: 8501
  type: LoadBalancer
```

### 4.3 Test the Model Service

```bash
# Deploy the service
kubectl apply -f tf-serving.yaml

# Check the Service status
kubectl get svc tf-serving-service

# Send a test prediction (the pixel array is truncated here;
# a real request needs the full 28x28 array)
MODEL_SERVICE_IP=$(kubectl get svc tf-serving-service \
  -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
curl -X POST "http://$MODEL_SERVICE_IP:8501/v1/models/mnist:predict" \
  -d '{"instances": [[[0.0, 0.0, 0.0, ..., 0.0]]]}'
```
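The curl command above truncates the pixel array, so a small client script makes it easier to send a real 28x28 image. Here is a sketch using only the standard library; the `MODEL_SERVICE_IP` placeholder stands in for the LoadBalancer address, and the payload shape follows TF Serving's REST predict format:

```python
import json
import urllib.request

def build_predict_request(url, images):
    """Build a TF Serving REST predict request.

    images: list of 28x28 nested lists of floats in [0, 1].
    """
    body = json.dumps({"instances": images}).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"})

# One all-zero 28x28 "image" as a smoke-test payload
blank = [[0.0] * 28 for _ in range(28)]
req = build_predict_request(
    "http://MODEL_SERVICE_IP:8501/v1/models/mnist:predict",  # placeholder host
    [blank])
print(len(json.loads(req.data)["instances"][0]))  # 28 rows per image

# Uncomment once the LoadBalancer IP is known:
# with urllib.request.urlopen(req) as resp:
#     print(json.load(resp)["predictions"])
```

Building the request is separated from sending it, so you can unit-test the payload shape without a cluster in reach.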
## 5. Power Moves: Managing ML Workflows with Kubeflow

Bored of raw Kubernetes manifests? Try Kubeflow, a K8s extension built specifically for machine learning.

### 5.1 Install Kubeflow

```bash
# Set up the install directory
export KF_NAME=kubeflow
export BASE_DIR=/home/user/kubeflow
export KF_DIR=${BASE_DIR}/${KF_NAME}
mkdir -p ${KF_DIR}
cd ${KF_DIR}

# Install with kfctl
wget -O kfctl.tar.gz https://github.com/kubeflow/kfctl/releases/download/v1.2.0/kfctl_v1.2.0-0-gbc038f9_linux.tar.gz
tar -xvf kfctl.tar.gz

# Deploy Kubeflow
./kfctl apply -f https://raw.githubusercontent.com/kubeflow/manifests/v1.2-branch/kfdef/kfctl_k8s_istio.v1.2.0.yaml

# Check the deployment status
kubectl get pods -n kubeflow
```

### 5.2 Build an ML Workflow with Kubeflow Pipelines

```python
# pipeline.py
import kfp
from kfp import dsl


@dsl.pipeline(
    name="MNIST Training Pipeline",
    description="A pipeline to train an MNIST model on Kubernetes",
)
def mnist_training_pipeline():
    # Step 1: data preparation
    data_prep = dsl.ContainerOp(
        name="Data Preparation",
        image="tensorflow/tensorflow:2.10.0",
        command=["python3", "-c"],
        arguments=["""
import tensorflow as tf
import numpy as np
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
np.savez('/data/mnist.npz', x_train=x_train, y_train=y_train,
         x_test=x_test, y_test=y_test)
"""],
    )

    # Step 2: model training
    training = dsl.ContainerOp(
        name="Model Training",
        image="tensorflow/tensorflow:2.10.0-gpu",
        command=["python3", "-c"],
        arguments=["""
import numpy as np
import tensorflow as tf
data = np.load('/data/mnist.npz')
x_train, y_train = data['x_train'] / 255.0, data['y_train']
model = tf.keras.models.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(10)])
model.compile(
    optimizer='adam',
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy'])
model.fit(x_train, y_train, epochs=5)
model.save('/model/mnist_model')
"""],
    )
    training.after(data_prep)

    # Step 3: model evaluation
    evaluation = dsl.ContainerOp(
        name="Model Evaluation",
        image="tensorflow/tensorflow:2.10.0",
        command=["python3", "-c"],
        arguments=["""
import numpy as np
import tensorflow as tf
data = np.load('/data/mnist.npz')
x_test, y_test = data['x_test'] / 255.0, data['y_test']
model = tf.keras.models.load_model('/model/mnist_model')
test_loss, test_acc = model.evaluate(x_test, y_test, verbose=2)
print(f'Test accuracy: {test_acc}')
"""],
    )
    evaluation.after(training)


if __name__ == "__main__":
    kfp.compiler.Compiler().compile(mnist_training_pipeline, "mnist_pipeline.yaml")
```

One caveat: each pipeline step runs in its own Pod, so `/data` and `/model` need a shared volume mounted into every step (for example, a PVC attached with `add_pvolumes`). Without it, each step sees an empty filesystem.

## Best Practices

**Resource management**
- Set sensible resource requests and limits on training jobs
- Use node affinity to schedule training jobs onto GPU nodes
- Use Kubernetes autoscaling to adjust resources with load

**Storage strategy**
- Use a ReadWriteMany storage class for training data
- Keep model files on persistent storage so training results survive
- Consider object storage (S3, GCS) for large-scale training data

**Monitoring and logging**
- Deploy Prometheus and Grafana to watch GPU utilization and training metrics
- Use an ELK stack to collect and analyze training logs
- Set up health checks and alerting for training jobs

**Security and isolation**
- Give each ML project its own namespace
- Use RBAC to control access to cluster resources
- Encrypt sensitive data at rest and in transit

**CI/CD integration**
- Build a CI/CD pipeline for model training and deployment
- Automatically test model performance and generate evaluation reports
- Implement model versioning and rollback

## Summary

Kubernetes is not only a powerful container orchestrator; it is also an ideal runtime for machine learning workloads. After working through this guide, you should be able to:

- Deploy single-node and distributed ML training jobs on Kubernetes
- Serve model inference with TensorFlow Serving
- Manage end-to-end ML workflows with Kubeflow
- Apply best practices for resource management, storage, monitoring, and security

Remember: technology is mastered through practice. Don't just read, go deploy your own model to Kubernetes and get a feel for the cloud-native ML workflow. Questions? Drop a comment and susu will reply as soon as possible.

## susu's Closing Notes

- Network bandwidth between nodes matters a lot in distributed training; use a high-speed network
- GPUs are precious, so clean up resources as soon as training finishes
- Think about serving concurrency and tune the replica count accordingly
- Kubeflow is powerful but complex to deploy; small teams can start with plain K8s

Found this useful? Leave a like before you go. See you next time!
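P.S. One last snippet: the per-project isolation idea from the best-practices list can be sketched as a ResourceQuota per team namespace. The helper below is my own illustration (the namespace name and limits are made up), generating the manifest the same way as the earlier examples:

```python
import json

def project_quota(namespace, gpus, cpu, memory):
    """Build a ResourceQuota manifest capping one ML team's namespace.

    Hypothetical helper for illustration; values are example limits.
    """
    return {
        "apiVersion": "v1",
        "kind": "ResourceQuota",
        "metadata": {"name": f"{namespace}-quota", "namespace": namespace},
        "spec": {
            "hard": {
                # extended resources are quota'd via the requests. prefix
                "requests.nvidia.com/gpu": str(gpus),
                "requests.cpu": cpu,
                "limits.memory": memory,
            }
        },
    }

print(json.dumps(project_quota("ml-team-a", 4, "16", "64Gi"), indent=2))
```

Pipe the output to `kubectl apply -f -` and team A can no longer starve team B of GPUs.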