08-MLOps与工程落地——模型注册表与模型服务
模型注册表与模型服务MLflow Model Registry、Seldon Core一、模型注册表概述1.1 什么是模型注册表importmatplotlib.pyplotaspltfrommatplotlib.patchesimportRectangle,FancyBboxPatchimportwarnings warnings.filterwarnings(ignore)print(*60)print(模型注册表集中管理模型版本)print(*60)# 模型注册表架构图fig,axplt.subplots(figsize(12,8))ax.axis(off)# 组件components{训练流水线:(0.2,0.7),模型注册表:(0.5,0.7),模型服务:(0.8,0.7),版本管理:(0.2,0.4),阶段转换:(0.5,0.4),监控告警:(0.8,0.4),}forname,(x,y)incomponents.items():circleplt.Circle((x,y),0.08,colorlightblue,ecblack)ax.add_patch(circle)ax.text(x,y,name,hacenter,vacenter,fontsize8)# 连接ax.annotate(,xy(0.4,0.7),xytext(0.28,0.7),arrowpropsdict(arrowstyle-,lw2))ax.annotate(,xy(0.72,0.7),xytext(0.58,0.7),arrowpropsdict(arrowstyle-,lw2))ax.annotate(,xy(0.5,0.62),xytext(0.5,0.48),arrowpropsdict(arrowstyle-,lw1))ax.annotate(,xy(0.8,0.62),xytext(0.8,0.48),arrowpropsdict(arrowstyle-,lw1))ax.set_xlim(0,1)ax.set_ylim(0,1)ax.set_title(模型注册表架构,fontsize14)plt.tight_layout()plt.show()print(\n 模型注册表核心功能:)print( - 模型版本管理)print( - 阶段转换Staging/Production/Archived)print( - 模型元数据存储)print( - 模型血缘追踪)print( - 部署集成)二、MLflow Model Registry2.1 模型注册defmlflow_registry():MLflow模型注册print(\n*60)print(MLflow Model Registry)print(*60)code import mlflow import mlflow.sklearn from mlflow.tracking import MlflowClient from sklearn.ensemble import RandomForestClassifier from sklearn.datasets import make_classification from sklearn.model_selection import train_test_split # 1. 设置MLflow mlflow.set_tracking_uri(http://localhost:5000) mlflow.set_experiment(model_registry_demo) # 2. 训练和注册模型 with mlflow.start_run(run_namemodel_v1) as run: # 训练模型 X, y make_classification(n_samples1000, n_features20, random_state42) X_train, X_test, y_train, y_test train_test_split(X, y, test_size0.2) model RandomForestClassifier(n_estimators100, max_depth10) model.fit(X_train, y_train) # 记录参数和指标 mlflow.log_param(n_estimators, 100) mlflow.log_param(max_depth, 10) mlflow.log_metric(accuracy, model.score(X_test, y_test)) # 注册模型 mlflow.sklearn.log_model( model, model, registered_model_nameproduction_model ) run_id run.info.run_id # 3. 使用Client管理模型 client MlflowClient() # 创建注册模型 client.create_registered_model(production_model) # 添加模型版本 model_uri fruns:/{run_id}/model client.create_model_version( nameproduction_model, sourcemodel_uri, run_idrun_id, descriptionRandom Forest model v1 ) # 4. 模型版本转换 # 转换到Staging client.transition_model_version_stage( nameproduction_model, version1, stageStaging ) # 转换到Production client.transition_model_version_stage( nameproduction_model, version1, stageProduction ) # 5. 更新模型描述 client.update_model_version( nameproduction_model, version1, descriptionImproved with feature engineering ) # 6. 添加模型标签 client.set_model_version_tag( nameproduction_model, version1, keyaccuracy, value0.95 ) # 7. 获取模型版本信息 model_version client.get_model_version(production_model, 1) print(fVersion: {model_version.version}) print(fStage: {model_version.stage}) print(fDescription: {model_version.description}) # 8. 列出所有版本 versions client.search_model_versions(nameproduction_model) for v in versions: print(fVersion {v.version}: Stage {v.stage}) # 9. 加载生产模型 model mlflow.pyfunc.load_model(models:/production_model/Production) print(code)mlflow_registry()2.2 模型版本管理defmodel_version_management():模型版本管理print(\n*60)print(模型版本管理)print(*60)code from mlflow.tracking import MlflowClient import mlflow client MlflowClient() # 1. 注册多个版本 versions [ {n_estimators: 50, max_depth: 5}, {n_estimators: 100, max_depth: 10}, {n_estimators: 150, max_depth: 15}, {n_estimators: 200, max_depth: 20} ] for i, params in enumerate(versions, 1): with mlflow.start_run(run_namefmodel_v{i}): # 训练模型 model RandomForestClassifier(**params) model.fit(X_train, y_train) # 记录参数 mlflow.log_params(params) mlflow.log_metric(accuracy, model.score(X_test, y_test)) # 注册模型 mlflow.sklearn.log_model( model, model, registered_model_nameproduction_model ) # 2. 比较模型版本 def compare_models(model_name): versions client.search_model_versions(fname{model_name}) results [] for v in versions: # 获取运行ID run_id v.run_id run mlflow.get_run(run_id) results.append({ version: v.version, stage: v.stage, accuracy: run.data.metrics.get(accuracy), params: run.data.params }) # 按准确率排序 results.sort(keylambda x: x[accuracy], reverseTrue) for r in results: print(fVersion {r[version]} ({r[stage]}): accuracy{r[accuracy]:.4f}) return results # 3. 设置模型别名 client.set_registered_model_alias(production_model, champion, 3) client.set_registered_model_alias(production_model, challenger, 4) # 使用别名加载 champion_model mlflow.pyfunc.load_model(models:/production_modelchampion) challenger_model mlflow.pyfunc.load_model(models:/production_modelchallenger) # 4. A/B测试 def ab_test(champion_model, challenger_model, test_data, ratio0.1): A/B测试 import random results {champion: 0, challenger: 0} for x in test_data: if random.random() ratio: # 使用挑战者模型 pred challenger_model.predict(x) results[challenger] 1 else: # 使用冠军模型 pred champion_model.predict(x) results[champion] 1 return results # 5. 模型回滚 def rollback_model(model_name, target_version): 回滚到指定版本 # 获取当前生产版本 current client.get_latest_versions(model_name, stages[Production])[0] # 将当前版本归档 client.transition_model_version_stage( namemodel_name, versioncurrent.version, stageArchived ) # 将目标版本提升为生产 client.transition_model_version_stage( namemodel_name, versiontarget_version, stageProduction ) print(fRolled back from version {current.version} to {target_version}) print(code)model_version_management()三、Seldon Core模型服务3.1 Seldon Core部署defseldon_core():Seldon Core部署print(\n*60)print(Seldon Core模型服务)print(*60)code # 1. Seldon Deployment定义 apiVersion: machinelearning.seldon.io/v1 kind: SeldonDeployment metadata: name: sklearn-model namespace: production spec: name: sklearn-deployment predictors: - name: default graph: name: classifier type: MODEL modelUri: gs://seldon-models/sklearn/iris implementation: SKLEARN_SERVER replicas: 2 traffic: 100 # 2. 多模型部署金丝雀 apiVersion: machinelearning.seldon.io/v1 kind: SeldonDeployment metadata: name: canary-deployment spec: predictors: - name: stable graph: name: stable-model modelUri: gs://models/stable implementation: SKLEARN_SERVER replicas: 2 traffic: 90 - name: canary graph: name: canary-model modelUri: gs://models/canary implementation: SKLEARN_SERVER replicas: 1 traffic: 10 # 3. 自定义模型服务 apiVersion: machinelearning.seldon.io/v1 kind: SeldonDeployment metadata: name: custom-model spec: predictors: - name: custom graph: name: custom-model type: MODEL implementation: CUSTOM modelUri: gs://models/custom envSecretRefName: model-secrets serviceAccountName: seldon-service-account resources: requests: memory: 2Gi cpu: 1 limits: memory: 4Gi cpu: 2 replicas: 2 # 4. 推理图Pipeline apiVersion: machinelearning.seldon.io/v1 kind: SeldonDeployment metadata: name: inference-graph spec: predictors: - name: pipeline graph: name: transformer type: TRANSFORMER children: - name: model1 type: MODEL children: - name: model2 type: MODEL endpoint: type: REST print(code)seldon_core()3.2 模型服务集成defseldon_integration():Seldon Core集成print(\n*60)print(Seldon Core集成)print(*60)code # 1. Python自定义模型服务 import numpy as np import joblib from seldon_core.user_model import SeldonComponent class CustomModel(SeldonComponent): def __init__(self, model_path): self.model joblib.load(model_path) def predict(self, X, features_namesNone, **kwargs): 预测接口 predictions self.model.predict(X) return predictions def predict_proba(self, X, features_namesNone, **kwargs): 概率预测接口 probabilities self.model.predict_proba(X) return probabilities # 2. Dockerfile FROM python:3.9-slim RUN pip install seldon-core scikit-learn joblib COPY model.pkl /model.pkl COPY model_server.py /model_server.py CMD [python, /model_server.py] # 3. 服务请求示例 import requests import json # REST请求 def predict_rest(features): url http://seldon-service.production.svc.cluster.local/v1/models/default/predict payload { data: { ndarray: [features] } } response requests.post(url, jsonpayload) return response.json() # gRPC请求 import grpc from seldon_core.proto import prediction_pb2, prediction_pb2_grpc def predict_grpc(features): channel grpc.insecure_channel(seldon-service.production:8000) stub prediction_pb2_grpc.SeldonStub(channel) request prediction_pb2.SeldonMessage( dataprediction_pb2.DefaultData( ndarrayprediction_pb2.NDArray(valuesfeatures) ) ) response stub.Predict(request) return response # 4. 性能测试 def benchmark_seldon(): import time features [[1, 2, 3, 4]] n_requests 100 start time.time() for _ in range(n_requests): predict_rest(features) elapsed time.time() - start print(fAverage latency: {elapsed/n_requests*1000:.2f}ms) print(fThroughput: {n_requests/elapsed:.2f} req/s) print(code)seldon_integration()四、端到端工作流4.1 完整MLOps流程defend_to_end_workflow():端到端MLOps工作流print(\n*60)print(端到端MLOps工作流)print(*60)code import mlflow from mlflow.tracking import MlflowClient import requests import json class MLOpsPipeline: def __init__(self, model_name, tracking_urihttp://mlflow:5000): self.model_name model_name mlflow.set_tracking_uri(tracking_uri) self.client MlflowClient() def train_and_register(self, training_func, version, params): 训练并注册模型 with mlflow.start_run(run_nameftrain_v{version}): # 训练模型 model training_func(**params) # 记录参数和指标 mlflow.log_params(params) accuracy evaluate_model(model) mlflow.log_metric(accuracy, accuracy) # 注册模型 mlflow.sklearn.log_model( model, model, registered_model_nameself.model_name ) # 获取run_id run_id mlflow.active_run().info.run_id # 添加标签 self.client.set_model_version_tag( nameself.model_name, versionversion, keyaccuracy, valuestr(accuracy) ) return run_id def promote_to_staging(self, version): 提升到Staging环境 self.client.transition_model_version_stage( nameself.model_name, versionversion, stageStaging ) # 部署到Staging self._deploy_to_environment(staging, version) def promote_to_production(self, version): 提升到Production环境 # 获取当前生产版本 current self.client.get_latest_versions(self.model_name, stages[Production]) # 验证新模型性能 if not self._validate_model(version): raise Exception(Model validation failed) # 执行金丝雀部署 self._canary_deploy(version, current[0].version if current else None) # 转换阶段 self.client.transition_model_version_stage( nameself.model_name, versionversion, stageProduction ) if current: self.client.transition_model_version_stage( nameself.model_name, versioncurrent[0].version, stageArchived ) def _deploy_to_environment(self, env, version): 部署到指定环境 model_uri fmodels:/{self.model_name}/{version} # 创建Seldon部署 deployment { apiVersion: machinelearning.seldon.io/v1, kind: SeldonDeployment, metadata: { name: f{self.model_name}-{env}, namespace: env }, spec: { predictors: [{ name: default, graph: { name: classifier, implementation: SKLEARN_SERVER, modelUri: model_uri }, replicas: 2 }] } } # 应用部署 # kubectl.apply(deployment) def _validate_model(self, version): 验证模型性能 model mlflow.pyfunc.load_model(fmodels:/{self.model_name}/{version}) # 在验证集上测试 accuracy test_model(model) return accuracy 0.85 def _canary_deploy(self, new_version, old_version): 金丝雀部署 # 创建金丝雀部署配置 pass # 使用示例 pipeline MLOpsPipeline(production_model) # 训练新版本 pipeline.train_and_register(train_func, version5, params{n_estimators: 150}) # 提升到Staging pipeline.promote_to_staging(5) # 测试验证后提升到Production pipeline.promote_to_production(5) print(code)end_to_end_workflow()五、总结组件功能工具模型注册版本管理MLflow Registry阶段管理Staging/ProductionMLflow模型服务推理部署Seldon Core金丝雀灰度发布Seldon Core监控性能监控Prometheus最佳实践使用MLflow Registry管理模型版本实施阶段转换流程使用Seldon Core进行生产部署实施金丝雀发布策略集成监控和告警