08-MLOps与工程落地——特征存储:Hopsworks
# 特征存储:Hopsworks

特征管理、数据版本、特征监控

## 一、Hopsworks概述

### 1.1 什么是Hopsworks

```python
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle, FancyBboxPatch
import warnings

warnings.filterwarnings('ignore')

print("=" * 60)
print("Hopsworks企业级特征存储平台")
print("=" * 60)

# Hopsworks架构图
fig, ax = plt.subplots(figsize=(14, 8))
ax.axis('off')

# 组件
components = {
    '特征存储': (0.2, 0.7),
    '模型训练': (0.5, 0.7),
    '模型服务': (0.8, 0.7),
    '特征工程': (0.2, 0.4),
    '数据版本': (0.5, 0.4),
    '监控告警': (0.8, 0.4),
}

for name, (x, y) in components.items():
    circle = plt.Circle((x, y), 0.08, color='lightblue', ec='black')
    ax.add_patch(circle)
    ax.text(x, y, name, ha='center', va='center', fontsize=7)

# 连接
ax.annotate('', xy=(0.5, 0.62), xytext=(0.28, 0.7),
            arrowprops=dict(arrowstyle='->', lw=1))
ax.annotate('', xy=(0.72, 0.7), xytext=(0.58, 0.7),
            arrowprops=dict(arrowstyle='->', lw=1))

ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.set_title('Hopsworks架构', fontsize=14)

plt.tight_layout()
plt.show()

print("\nHopsworks核心功能:")
print("  - 特征存储(在线 + 离线)")
print("  - 特征版本管理")
print("  - 特征血缘追踪")
print("  - 特征监控")
print("  - 与Spark/Kafka集成")
```

## 二、Hopsworks安装

### 2.1 安装配置

```python
def hopsworks_setup():
    """Hopsworks安装"""
    print("\n" + "=" * 60)
    print("Hopsworks安装")
    print("=" * 60)

    code = """
# 1. 使用Docker Compose安装
git clone https://github.com/logicalclocks/hopsworks.git
cd hopsworks
docker-compose up -d

# 2. 访问Web UI
# http://localhost:8080
# 默认账号: admin@hopsworks.ai / admin

# 3. Python客户端安装
# pip install hsfs

# 4. 连接Hopsworks
import hsfs

connection = hsfs.connection(
    host='localhost',
    port=6181,
    project='my_project'
)

# 5. 获取特征存储
fs = connection.get_feature_store()
"""
    print(code)


hopsworks_setup()
```

## 三、特征组管理

### 3.1 创建特征组

```python
def feature_group_management():
    """特征组管理"""
    print("\n" + "=" * 60)
    print("特征组管理")
    print("=" * 60)

    code = """
import hsfs
import pandas as pd
import numpy as np
from datetime import datetime

# 1. 连接Hopsworks
connection = hsfs.connection(
    host='localhost',
    port=6181,
    project='my_project'
)
fs = connection.get_feature_store()

# 2. 创建特征组
user_features = fs.create_feature_group(
    name='user_features',
    version=1,
    description='User demographic and behavioral features',
    primary_key=['user_id'],
    event_time='event_timestamp',
    online_enabled=True,
    statistics_config={
        'enabled': True,
        'histograms': True,
        'correlations': True
    }
)

# 3. 准备数据
data = pd.DataFrame({
    'user_id': range(1, 1001),
    'age': np.random.randint(18, 70, 1000),
    'gender': np.random.choice(['M', 'F'], 1000),
    'city': np.random.choice(['北京', '上海', '广州', '深圳'], 1000),
    'user_activity_score': np.random.uniform(0, 100, 1000),
    'event_timestamp': [datetime.now() for _ in range(1000)]
})

# 4. 插入数据
user_features.insert(data)

# 5. 创建第二个特征组
item_features = fs.create_feature_group(
    name='item_features',
    version=1,
    description='Item attributes',
    primary_key=['item_id'],
    event_time='event_timestamp',
    online_enabled=True
)

item_data = pd.DataFrame({
    'item_id': range(1, 501),
    'category': np.random.choice(['electronics', 'clothing', 'books'], 500),
    'price': np.random.uniform(10, 1000, 500),
    'rating': np.random.uniform(1, 5, 500),
    'event_timestamp': [datetime.now() for _ in range(500)]
})
item_features.insert(item_data)

# 6. 创建训练数据集
train_dataset = fs.create_training_dataset(
    name='recommendation_dataset',
    version=1,
    description='Training data for recommendation model',
    data_format='parquet'
)

# 7. 查询特征组
user_fg = fs.get_feature_group('user_features', version=1)
print(f'Feature group: {user_fg.name}, version: {user_fg.version}')
print(f'Features: {user_fg.features}')
"""
    print(code)


feature_group_management()
```

## 四、特征查询

### 4.1 特征查询

```python
def feature_query():
    """特征查询"""
    print("\n" + "=" * 60)
    print("特征查询")
    print("=" * 60)

    code = """
import hsfs

connection = hsfs.connection()
fs = connection.get_feature_store()

# 1. 获取特征组
user_fg = fs.get_feature_group('user_features', version=1)
item_fg = fs.get_feature_group('item_features', version=1)

# 2. 构建查询
query = user_fg.select_all().join(
    item_fg.select_all(),
    on=['user_id', 'item_id']
)

# 3. 读取数据
df = query.read()
print(f'Data shape: {df.shape}')

# 4. 带过滤条件的查询
filtered_query = user_fg.select(['age', 'gender', 'user_activity_score']).filter(
    (user_fg.age > 18) & (user_fg.user_activity_score > 50)
)
filtered_df = filtered_query.read()
print(f'Filtered data shape: {filtered_df.shape}')

# 5. 时间范围查询
time_query = user_fg.select_all().filter(
    (user_fg.event_timestamp >= '2024-01-01') &
    (user_fg.event_timestamp <= '2024-12-31')
)
time_df = time_query.read()

# 6. 特征服务(预定义特征组合)
feature_service = fs.create_feature_service(
    name='recommendation_features',
    features=[user_fg, item_fg],
    description='Features for recommendation model'
)

# 使用特征服务
service_df = feature_service.get_feature_vector(
    entity_rows=[{'user_id': 123, 'item_id': 456}]
)
print(f'Feature vector: {service_df}')
"""
    print(code)


feature_query()
```

## 五、特征版本管理

### 5.1 版本控制

```python
def feature_versioning():
    """特征版本管理"""
    print("\n" + "=" * 60)
    print("特征版本管理")
    print("=" * 60)

    code = """
import hsfs
import numpy as np
import pandas as pd

connection = hsfs.connection()
fs = connection.get_feature_store()

# 1. 创建新版本特征组
user_features_v2 = fs.create_feature_group(
    name='user_features',
    version=2,
    description='User features with new fields',
    primary_key=['user_id'],
    event_time='event_timestamp'
)

# 2. 添加新特征
new_data = pd.DataFrame({
    'user_id': range(1, 1001),
    'age': np.random.randint(18, 70, 1000),
    'gender': np.random.choice(['M', 'F'], 1000),
    'city': np.random.choice(['北京', '上海', '广州', '深圳'], 1000),
    'user_activity_score': np.random.uniform(0, 100, 1000),
    'purchase_frequency': np.random.randint(0, 50, 1000),
    'avg_order_value': np.random.uniform(50, 500, 1000),
    'event_timestamp': pd.date_range('2024-01-01', periods=1000, freq='H')
})
user_features_v2.insert(new_data)

# 3. 列出所有版本
versions = fs.get_feature_group_versions('user_features')
for v in versions:
    print(f'Version {v.version}: created at {v.created_at}')

# 4. 获取特定版本
fg_v1 = fs.get_feature_group('user_features', version=1)
fg_v2 = fs.get_feature_group('user_features', version=2)

# 5. 版本对比
v1_data = fg_v1.read(limit=10)
v2_data = fg_v2.read(limit=10)
print(f'V1 columns: {v1_data.columns.tolist()}')
print(f'V2 columns: {v2_data.columns.tolist()}')

# 6. 删除旧版本(谨慎操作)
# fg_v1.delete()
"""
    print(code)


feature_versioning()
```

## 六、特征监控

### 6.1 监控配置

```python
def feature_monitoring():
    """特征监控"""
    print("\n" + "=" * 60)
    print("特征监控")
    print("=" * 60)

    code = """
import hsfs
import pandas as pd

connection = hsfs.connection()
fs = connection.get_feature_store()

# 1. 获取特征组
user_fg = fs.get_feature_group('user_features', version=1)

# 2. 查看统计信息
statistics = user_fg.statistics
print(f'Statistics: {statistics}')

# 3. 计算特征统计
user_fg.compute_statistics()

# 4. 获取特征分布
histogram = user_fg.get_histogram(feature_name='age')
print(f'Age distribution: {histogram}')

# 5. 特征相关性
correlations = user_fg.get_correlation_matrix()
print(f'Correlation matrix: {correlations}')

# 6. 数据质量监控
from hsfs.monitoring import Rule, Alert

# 定义监控规则
rule = Rule(
    name='age_range_check',
    feature_name='age',
    condition='between',
    threshold={'min': 18, 'max': 100},
    severity='WARNING'
)

# 添加告警
alert = Alert(
    name='age_out_of_range',
    rule=rule,
    action='EMAIL',
    recipients=['data-team@example.com']
)

# 应用到特征组
user_fg.add_alert(alert)

# 7. 查看监控历史
alerts = user_fg.get_alerts()
for alert in alerts:
    print(f'Alert: {alert.name}, status: {alert.status}, triggered at: {alert.triggered_at}')
"""
    print(code)


feature_monitoring()
```

## 七、与MLflow集成

### 7.1 MLflow集成

```python
def mlflow_integration():
    """MLflow集成"""
    print("\n" + "=" * 60)
    print("MLflow集成")
    print("=" * 60)

    code = """
import hsfs
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# 1. 连接Hopsworks
connection = hsfs.connection()
fs = connection.get_feature_store()

# 2. 获取特征
user_fg = fs.get_feature_group('user_features', version=1)
item_fg = fs.get_feature_group('item_features', version=1)

# 3. 构建训练数据
query = user_fg.select_all().join(item_fg.select_all())
training_df = query.read()

# 4. MLflow训练
with mlflow.start_run() as run:
    # 记录特征存储信息
    mlflow.log_param('feature_store', 'hopsworks')
    mlflow.log_param('feature_group_version', 1)

    # 训练模型
    X = training_df.drop(['user_id', 'item_id', 'label'], axis=1)
    y = training_df['label']

    model = RandomForestClassifier(n_estimators=100)
    model.fit(X, y)

    # 记录指标
    accuracy = model.score(X, y)
    mlflow.log_metric('accuracy', accuracy)

    # 保存模型
    mlflow.sklearn.log_model(model, 'model')

    # 记录特征组到MLflow
    mlflow.log_dict(
        {'feature_group': user_fg.name, 'version': user_fg.version},
        'feature_info.json'
    )

# 5. 关联模型和特征组
model_uri = f'runs:/{run.info.run_id}/model'
feature_group_ref = fs.create_feature_group_reference(
    name='user_features',
    version=1,
    model_uri=model_uri
)
"""
    print(code)


mlflow_integration()
```

## 八、总结

| 特性 | Feast | Hopsworks |
| --- | --- | --- |
| 开源 | ✅ | ✅ |
| 在线存储 | Redis/DynamoDB | MySQL Cluster |
| 离线存储 | BigQuery/Snowflake | Hive/Delta Lake |
| 版本管理 | 有限 | 完整 |
| 监控 | 基础 | 完善 |
| 血缘追踪 | 基础 | 完善 |
| 企业功能 | 有限 | 丰富 |

### Hopsworks vs Feast选择

- Hopsworks: 企业级、完整功能、监控完善
- Feast: 轻量级、开源、易于集成