Flink + sklearn: Deploying Machine Learning Models on Flink with JPMML
Posted 2022-08-02 15:27:00 by Hongyao
Preface
After using Flink 1.12 for a while I am genuinely impressed: for real-time computing it feels much more comfortable than Spark's Structured Streaming. Its ecosystem is not as complete as Spark's, though, especially for machine learning. Since 1.8, FlinkML has been essentially unusable (maybe I just could not find the documentation, but the latest version seems to ship only the framework, with no ready-made models).
In the end I found JPMML as a solution. Simply put, PMML is an intermediate format for machine learning models, and the file contents are described in XML. We can use the officially provided sklearn2pmml library to save a model trained with sklearn in Python as a .pmml file, and then read that file in Java with JPMML to make predictions.
The PMML format
Predictive Model Markup Language (PMML) is a standard language that uses XML to describe and store data mining models. Building on XML's hierarchical data representation, it makes data mining models portable between applications.
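Because a PMML file is plain XML, any XML parser can inspect it. As a small illustration, the Python sketch below reads the DataDictionary (the declared input and target fields) from a hand-written, heavily trimmed fragment. The fragment and the helper list_data_fields are made up for this example (the field names are borrowed from the Stay Alert model used later); a real exported file also contains the model body, e.g. every tree of a random forest.

```python
import xml.etree.ElementTree as ET

# Hand-written, heavily trimmed PMML fragment for illustration only;
# a real exported model also contains the model itself (e.g. the trees).
PMML_SNIPPET = """<PMML xmlns="http://www.dmg.org/PMML-4_4" version="4.4">
  <Header description="toy example"/>
  <DataDictionary numberOfFields="3">
    <DataField name="V11" optype="continuous" dataType="double"/>
    <DataField name="P1" optype="continuous" dataType="double"/>
    <DataField name="IsAlert" optype="categorical" dataType="integer">
      <Value value="0"/>
      <Value value="1"/>
    </DataField>
  </DataDictionary>
</PMML>"""

# PMML elements live in a namespace, so the search path needs a prefix mapping
NS = {"pmml": "http://www.dmg.org/PMML-4_4"}


def list_data_fields(pmml_text):
    """Return (name, optype, dataType) for every DataField in the DataDictionary."""
    root = ET.fromstring(pmml_text)
    return [
        (f.get("name"), f.get("optype"), f.get("dataType"))
        for f in root.findall("pmml:DataDictionary/pmml:DataField", NS)
    ]


for field in list_data_fields(PMML_SNIPPET):
    print(field)
```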
Steps for using JPMML
Training the model: jpmml-sklearn
Project repository
The repository contains official examples, and they are quite comprehensive.
Installing the Python libraries
The author lists the required packages; installing them with pip is all it takes.
In my setup sklearn was already installed through conda, while the other two seem to be available only via pip. I set the Tsinghua mirror here purely for download speed.
Installing sklearn2pmml seems to pull in sklearn-pandas automatically; it is probably a dependency.
- Python 2.7, 3.4 or newer.
- scikit-learn 0.16.0 or newer.
- sklearn-pandas 0.0.10 or newer.
- sklearn2pmml 0.14.0 or newer.
pip install sklearn-pandas -i https://pypi.tuna.tsinghua.edu.cn/simple/
pip install sklearn2pmml -i https://pypi.tuna.tsinghua.edu.cn/simple/
Generating a PMML model in three steps
Step 1: create the model
The model cannot be put together arbitrarily.
It has to be built with the workflow (pipeline) provided by sklearn2pmml.
The pipeline takes a list of 2-tuples of the form (name, model object).
The names in these tuples are not arbitrary: each one corresponds to a transformer with a specific role, e.g. "selector" for feature selection, "mapper" for feature preprocessing, "pca" for PCA, "classifier" for a classifier and "regressor" for a regressor.
If you name things carelessly, the code may still run in Python, but generating the PMML file may fail.
A complaint: there are many names you can use, but the author's sample code showing how to set up these tuples is scattered across different corners of the GitHub project (mostly the READMEs), which makes it tedious to find (you either search very carefully or ask the author in an issue), and there is no unified documentation. (The author apparently considers those instructions detailed enough, since you can jump between the various links to find your answer.)
Usage is mostly documented in the project READMEs; below is just the demo I built.
from sklearn_pandas import DataFrameMapper
from sklearn2pmml.pipeline import PMMLPipeline
from sklearn2pmml.decoration import ContinuousDomain
from sklearn2pmml import sklearn2pmml, SelectorProxy
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import VarianceThreshold

# X: pandas DataFrame with the training features, y: the labels
# Preprocessing: decorate all columns as continuous features, then standardize them
mapper = DataFrameMapper([
    (X.columns.to_list(), [ContinuousDomain(with_data = False), StandardScaler()]),
])
params = {"n_estimators": 88, "random_state": 420}
classifier = RandomForestClassifier(**params)
pipeline = PMMLPipeline([
    ("mapper", mapper),
    ("selector", SelectorProxy(VarianceThreshold())),
    ("classifier", classifier),
])
Feature preprocessing is specified with a mapper.
DataFrameMapper takes a list of 2-tuples: the first element is the column name(s), which may be several columns, and the second is the processing to apply. The demo uses standard scaling; individual columns could also be one-hot encoded, for example.
ContinuousDomain is one of the library's feature decorators; this one decorates continuous features.
- The main purpose of a decorator is to handle invalid values, missing values and outliers.
- There are also decorators for ordinal, categorical and temporal features; see the official documentation for details.
- One pitfall: by default the continuous decorator learns from the training data and analyzes outliers, and at prediction time it treats any value it considers an outlier as invalid, so the model may refuse a feature value and prediction fails with an error. Setting with_data = False avoids this problem.
  - with_data controls whether the decorator analyzes the training data (outlier analysis).
- The author presumably designed it this way because a model should not be expected to accept values outside the range it was trained on, so you are forced to deal with outliers yourself.
To use a selector, wrap the objects from sklearn.feature_selection in SelectorProxy.
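To make the with_data pitfall concrete, here is a toy model of the behaviour described above. ToyContinuousDomain is a hypothetical class written only for this explanation, not sklearn2pmml's implementation; it assumes the learned valid range is simply the minimum and maximum seen during training.

```python
class ToyContinuousDomain:
    """Hypothetical, simplified continuous-feature domain decorator.

    with_data=True:  remember the [min, max] seen in training and reject
                     anything outside it at prediction time.
    with_data=False: learn nothing, so any numeric value is accepted.
    """

    def __init__(self, with_data=True):
        self.with_data = with_data
        self.interval = None  # (low, high) learned from the training data

    def fit(self, values):
        if self.with_data:
            self.interval = (min(values), max(values))
        return self

    def prepare(self, value):
        if value is None:
            raise ValueError("missing value")
        if self.interval is not None:
            low, high = self.interval
            if not low <= value <= high:
                raise ValueError(f"invalid value {value!r}: outside [{low}, {high}]")
        return value


train = [10.0, 20.0, 30.0]
strict = ToyContinuousDomain(with_data=True).fit(train)
lenient = ToyContinuousDomain(with_data=False).fit(train)

print(lenient.prepare(33.7824))  # accepted: no range was learned
try:
    strict.prepare(33.7824)      # outside the learned [10.0, 30.0]
except ValueError as err:
    print("rejected:", err)
```

The strict decorator rejects an unseen but perfectly plausible reading, which is the kind of prediction-time failure that with_data = False sidesteps.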
Step 2: train the model
Training involves two main operations: fit for training and verify for validation.
- If you put an already trained model into the pipeline instead of fitting it through the pipeline, the pipeline still works, but PMMLPipeline's active_fields attribute is never populated, so verify fails. The library's automatic per-feature analysis mentioned above is also unavailable.
  - In short, you must build the model with the library's PMMLPipeline and train it through that pipeline.
- verify validates the model. This step is very important; note the following two points:
  - It is not only validation: once the model is deployed in Java, the evaluator replays the verification data you provided, which also warms the model up and improves prediction speed at runtime.
  - **Do not use too much verification data; around 15 training records is enough.** Before I understood this step, I naively passed the entire training set to verify, and the model then took forever to load in Java, because it was warming up on the whole training set.
pipeline.fit(X,y)
pipeline.verify(X.sample(15))
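A rough mental model of why the size of the verification sample matters: the verification records are stored in the model file together with their expected outputs, and loading the model re-evaluates every one of them. The toy below uses plain Python stand-ins (toy_model, build_verification and verify_on_load are hypothetical names, not sklearn2pmml or JPMML API) to show that the load-time work grows linearly with the number of stored records.

```python
def toy_model(row):
    """Stand-in for a trained classifier: 'alert' when the illustrative feature V11 is high."""
    return 1 if row["V11"] > 20.0 else 0


def build_verification(model, rows):
    """Record each input row together with the model's prediction as the expected output."""
    return [(dict(row), model(row)) for row in rows]


def verify_on_load(model, verification):
    """Re-evaluate every stored record and compare; return how many evaluations ran."""
    for row, expected in verification:
        actual = model(row)
        if actual != expected:
            raise AssertionError(f"verification failed for {row}: {actual} != {expected}")
    return len(verification)


training_rows = [{"V11": float(i % 40)} for i in range(10_000)]
small = build_verification(toy_model, training_rows[:15])
full = build_verification(toy_model, training_rows)

print(verify_on_load(toy_model, small))  # 15 evaluations at load time
print(verify_on_load(toy_model, full))   # 10000 evaluations at load time
```

With a real random forest each evaluation walks every tree, so the gap between 15 records and a whole training set is far larger than in this toy.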
Step 3: save the model
from sklearn2pmml import sklearn2pmml
sklearn2pmml(pipeline, "StayAlertRFC.pmml", with_repr = True)
Regression task demo code
Demo code for a regression task on GitHub
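Deploying the model: jpmml-evaluator
Maven dependencies
A Maven project is recommended here; these are the PMML-related dependencies (the version property goes in the <properties> section of pom.xml, the dependencies in <dependencies>).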
<properties>
    <pmml.version>1.5.15</pmml.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.jpmml</groupId>
        <artifactId>pmml-evaluator</artifactId>
        <version>${pmml.version}</version>
    </dependency>
    <dependency>
        <groupId>org.jpmml</groupId>
        <artifactId>pmml-evaluator-extension</artifactId>
        <version>${pmml.version}</version>
    </dependency>
</dependencies>
Loading the model
Put the generated model under the project's resources directory, which makes packaging easier later.

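package xyz.hyhy.stayalert.flink.utils;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.jpmml.evaluator.Evaluator;
import org.jpmml.evaluator.InputField;
import org.jpmml.evaluator.LoadingModelEvaluatorBuilder;
import org.xml.sax.SAXException;

import javax.xml.bind.JAXBException;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;

@Slf4j
public class PMMLUtils {
    public static void main(String[] args) throws IOException, JAXBException, SAXException {
        Evaluator evaluator = loadEvaluator("/StayAlertRFC.pmml");
        // Print the input (x1, x2, .., xn) fields declared by the model
        List<? extends InputField> inputFields = evaluator.getInputFields();
        for (InputField inputField : inputFields) {
            log.info("input field: {}", inputField.getName().getValue());
        }
        // fastjson parses decimal literals as BigDecimal by default,
        // so convert through Number instead of casting straight to Double
        Map<String, Object> obj2 = JSONObject.parseObject("{\"V11\":33.7824}");
        double d = ((Number) obj2.get("V11")).doubleValue();
        inputFields.get(3).prepare(d);
    }

    /**
     * Loads a PMML model from the classpath.
     *
     * @param pmmlFileName resource path of the .pmml file, e.g. "/StayAlertRFC.pmml"
     * @return a self-checked (warmed-up) evaluator
     * @throws JAXBException
     * @throws SAXException
     * @throws IOException
     */
    public static Evaluator loadEvaluator(String pmmlFileName) throws JAXBException, SAXException, IOException {
        // getResourceAsStream also works when the file is packed inside the jar
        try (InputStream is = PMMLUtils.class.getResourceAsStream(pmmlFileName)) {
            if (is == null) {
                throw new IOException("PMML resource not found: " + pmmlFileName);
            }
            Evaluator evaluator = new LoadingModelEvaluatorBuilder()
                    .load(is)
                    .build();
            evaluator.verify(); // self-check, which also warms up the model
            log.info("StayAlertClassification evaluator self-check and warm-up finished");
            return evaluator;
        }
    }
}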
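Here LoadingModelEvaluatorBuilder loads the model. Note that load() accepts either a File or an InputStream; be sure to obtain the file with PMMLUtils.class.getResourceAsStream(pmmlFileName) and pass that in. Going through getResource may work inside IDEA, but it fails once the application is packaged and deployed to Linux, because a resource packed inside a jar is not a regular file on disk and can only be read as a stream.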
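Loading the file: Evaluator evaluator = loadEvaluator("/StayAlertRFC.pmml");
- Note the leading slash / in the path: Class.getResourceAsStream resolves a name that starts with / from the classpath root, and otherwise resolves it relative to the class's own package.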
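The slash rule is easy to get wrong, so here it is spelled out as a small Python function. resolve_resource_name is a hypothetical helper that mirrors the name-resolution rule documented for java.lang.Class (ignoring Java 9+ module rules); it only computes the name that would be handed to the class loader.

```python
def resolve_resource_name(class_name, name):
    """Mirror how java.lang.Class turns a resource name into an absolute one.

    class_name: fully qualified Java class name, e.g. "a.b.C"
    name: the argument passed to getResource / getResourceAsStream
    """
    if name.startswith("/"):
        # Absolute name: everything after the slash, looked up from the classpath root
        return name[1:]
    package, _, _ = class_name.rpartition(".")
    if not package:
        # Class in the default package: the name is used unchanged
        return name
    # Relative name: prefixed with the class's package, dots turned into slashes
    return package.replace(".", "/") + "/" + name


owner = "xyz.hyhy.stayalert.flink.utils.PMMLUtils"
print(resolve_resource_name(owner, "/StayAlertRFC.pmml"))
print(resolve_resource_name(owner, "StayAlertRFC.pmml"))
```

Without the slash, the lookup targets xyz/hyhy/stayalert/flink/utils/StayAlertRFC.pmml, which does not exist when the model sits at the root of the resources directory.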
Making predictions
package xyz.hyhy.stayalert.flink.prediction;

import org.dmg.pmml.FieldName;
import org.jpmml.evaluator.Evaluator;
import org.jpmml.evaluator.EvaluatorUtil;
import org.jpmml.evaluator.FieldValue;
import org.jpmml.evaluator.InputField;
import org.xml.sax.SAXException;
import xyz.hyhy.stayalert.flink.utils.PMMLUtils;

import javax.xml.bind.JAXBException;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StayAlertPredictor {
    private final Evaluator evaluator;
    private final List<? extends InputField> inputFields;

    public StayAlertPredictor() throws IOException, JAXBException, SAXException {
        evaluator = PMMLUtils.loadEvaluator("/LightStayAlertRFC.pmml");
        inputFields = evaluator.getInputFields();
    }

    public Boolean predict(Map<String, ?> inputRecord) {
        if (inputRecord == null) {
            throw new NullPointerException("The predictor does not accept a null record");
        }
        Map<FieldName, FieldValue> arguments = new LinkedHashMap<>();
        // Map each field of the source record onto the PMML input field with the same name
        for (InputField inputField : inputFields) {
            FieldName inputName = inputField.getName();
            Object rawValue = inputRecord.get(inputName.getValue());
            if (rawValue == null) {
                throw new IllegalArgumentException("Missing input field: " + inputName.getValue());
            }
            Double doubleValue = Double.parseDouble(rawValue.toString());
            // Convert the user-supplied value into a value the PMML model understands
            FieldValue inputValue = inputField.prepare(doubleValue);
            arguments.put(inputName, inputValue);
        }
        // Evaluate the model on the prepared features
        Map<FieldName, ?> results = evaluator.evaluate(arguments);
        // Decouple the results from the jpmml-evaluator runtime
        Map<String, ?> resultRecord = EvaluatorUtil.decodeAll(results);
        // Extract the predicted class of the "IsAlert" target and return it
        Integer isAlert = (Integer) resultRecord.get("IsAlert");
        return isAlert == 1;
    }
}
Using it in Flink
package xyz.hyhy.stayalert.flink.task;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.Collector;
import org.xml.sax.SAXException;
import xyz.hyhy.stayalert.flink.pojo.UserDataPOJO;
import xyz.hyhy.stayalert.flink.prediction.StayAlertPredictor;

import javax.xml.bind.JAXBException;
import java.io.IOException;

public class StayAlertPredictTask {
    // Created once per JVM when the class is initialized, then shared by every record
    private static StayAlertPredictor predictor;

    static {
        try {
            predictor = new StayAlertPredictor();
        } catch (IOException | JAXBException | SAXException e) {
            // Fail fast: swallowing the error would leave predictor null
            // and make every record fail with a NullPointerException
            throw new ExceptionInInitializerError(e);
        }
    }

    private StayAlertPredictTask() {
    }

    public static SingleOutputStreamOperator<UserDataPOJO> predict(DataStream<UserDataPOJO> ds) {
        return ds.flatMap(new FlatMapFunction<UserDataPOJO, UserDataPOJO>() {
            @Override
            public void flatMap(UserDataPOJO userDataPOJO,
                                Collector<UserDataPOJO> collector) throws Exception {
                try {
                    // Predict whether the driver is alert
                    boolean isAlert = predictor.predict(userDataPOJO.getDeviceFeature());
                    userDataPOJO.setIsAlert(isAlert);
                    collector.collect(userDataPOJO);
                    userDataPOJO.setIsAlert(null);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
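This wraps the stream computation in a method that builds it, and holds the prediction model in a static field of the class so it can be used directly inside the operator. Because static fields are not serialized with the function, each TaskManager JVM loads the model once, when the class is first initialized there.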
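The static-field approach boils down to "load once per process, reuse for every record". The Python sketch below shows that pattern in isolation; PredictorHolder, load_predictor and flat_map are hypothetical names, and the lambda merely stands in for the real PMML evaluator.

```python
import threading


class PredictorHolder:
    """Process-wide holder that builds an expensive predictor once and reuses it."""

    _lock = threading.Lock()
    _predictor = None
    load_count = 0  # only here to make the demonstration observable

    @classmethod
    def get(cls, loader):
        if cls._predictor is None:
            with cls._lock:
                # Re-check inside the lock so concurrent first calls load only once
                if cls._predictor is None:
                    cls._predictor = loader()
                    cls.load_count += 1
        return cls._predictor


def load_predictor():
    """Hypothetical stand-in for loading the PMML model (the slow part)."""
    return lambda features: features["V11"] > 20.0


def flat_map(record, out):
    """Per-record function: fetch the shared predictor, annotate the record, emit it."""
    predictor = PredictorHolder.get(load_predictor)
    out.append({**record, "isAlert": predictor(record)})


collected = []
for value in (5.0, 25.0, 33.7824):
    flat_map({"V11": value}, collected)
print(PredictorHolder.load_count, [r["isAlert"] for r in collected])
```

In Flink itself, a common alternative is to extend RichFlatMapFunction and load the model in its open() method, which Flink calls once per parallel instance before any records are processed; that keeps loading inside the operator's lifecycle rather than in a class initializer.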
Summary
Personal impressions
Having gone through all this, here are a few personal thoughts (nothing important, mostly things I wondered about while using it):
- In Java, the loaded model predicts one record at a time, which fits Flink's stream processing very well.
- The generated .pmml model is large: the random forest with 88 estimators came to more than 200 MB, but once packaged and compressed into the jar it was only about 13 MB.
- Overall, PMML feels even nicer than the Spark ML I used before: training happens in Python, and more models are supported. Spark ML, for example, did not seem to support random forests when I used it.
- Also, integrating the data preprocessing steps into the sklearn2pmml workflow is flexible, and there is quite a lot you can do with it.
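The size drop in the second point is largely because XML markup is highly redundant. The sketch below builds a synthetic, PMML-like document with 88 toy trees (fake_tree_xml and its node layout are made up for illustration, not what sklearn2pmml emits) and compresses it with zlib, which uses the same DEFLATE algorithm as jar/zip files. The toy trees are identical, so real models will compress less; the exact ratio always depends on the content.

```python
import zlib


def fake_tree_xml(n_trees, nodes_per_tree):
    """Build PMML-like XML with many similar Node elements (illustrative only)."""
    parts = ['<PMML xmlns="http://www.dmg.org/PMML-4_4" version="4.4">'
             '<MiningModel functionName="classification">'
             '<Segmentation multipleModelMethod="majorityVote">']
    for t in range(n_trees):
        parts.append(f'<Segment id="{t + 1}"><True/><TreeModel functionName="classification">')
        for n in range(nodes_per_tree):
            parts.append(f'<Node score="{n % 2}"><SimplePredicate field="V{n % 11 + 1}" '
                         f'operator="lessOrEqual" value="{n * 0.5}"/></Node>')
        parts.append('</TreeModel></Segment>')
    parts.append('</Segmentation></MiningModel></PMML>')
    return "".join(parts).encode("utf-8")


raw = fake_tree_xml(n_trees=88, nodes_per_tree=200)
packed = zlib.compress(raw, 9)  # DEFLATE, the same algorithm jar files use
print(len(raw), len(packed), round(len(raw) / len(packed), 1))
```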
Other notes
This project was originally a Flink course assignment; this post mainly covers the PMML-related code. If you are interested, the complete code can be downloaded here.