Information security / threat detection: merging two streams with Flink broadcast streams (BroadcastState) to filter security logs
2022-07-06 15:47:00 【Empty one by one】
Threat detection: using a Flink dual-stream merge to filter security logs
The problem to solve
A threat-detection scenario covers many kinds of servers (front-end services, Java, Go, PHP, databases, and so on), and dozens of kinds of logs are collected from those servers every day. Doing threat analysis on these logs raises several difficulties. The first step is to extract the genuinely threatening logs and filter out the harmless ones. And here comes the challenge: how do we filter out the logs we trust? For example, in the logs on a MySQL server, we need to remove the log lines produced by our own log-agent process.
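To make the "trusted log" idea concrete, here is a minimal plain-Java sketch (not the QLExpress rule engine used later in this article; the `type` and `cmd` field names mirror the example rule shown in the filter code below):

```java
// Minimal sketch: decide whether a log entry is "trusted" and should be dropped.
// The field names (type, cmd) are assumptions mirroring the QLExpress rule in this article.
class LogEntry {
    final String type;
    final String cmd;
    LogEntry(String type, String cmd) { this.type = type; this.cmd = cmd; }
}

public class TrustedLogCheck {
    // Drop process-execution logs produced by our own log agent.
    static boolean isTrusted(LogEntry e) {
        return "exec".equals(e.type) && e.cmd != null && e.cmd.contains("server_log_agent");
    }

    public static void main(String[] args) {
        System.out.println(isTrusted(new LogEntry("exec", "/usr/bin/server_log_agent --run"))); // our agent: drop
        System.out.println(isTrusted(new LogEntry("exec", "rm -rf /var/lib/mysql")));           // keep for analysis
    }
}
```

The real pipeline evaluates such conditions as dynamically configurable rule expressions rather than hard-coded checks, which is exactly what the rest of the article builds.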
How to solve it
Let's first look at the overall pipeline:
- The collection tool gathers logs on each server and first writes them to a local log file;
- A common log-shipping component such as logstash or filebeat then writes the contents of the log file to the remote log service;
- The log service writes the logs to MQ;
- A Flink consumer reads the logs from MQ and cleans them (filtering, completion);
- Finally, the log data lands in the raw layer of the threat-detection real-time data warehouse.
The focus of this pipeline is step 4: the Flink consumer gets the log data from its source, and that data needs to be filtered. But the Flink job is already running, so how do we dynamically add data-filtering rules to it?
The specific process is as follows:
(Figure: Flink merges the two streams, filters the log data, and writes it to the raw layer of the real-time data warehouse)
Our solution uses Flink's dual-stream merge feature to solve the problem of delivering the configuration to the Flink cluster.
When the Flink cluster starts, it loads the current data-filtering configuration from the database and initializes it into memory; once data arrives on the main source, the filtering configuration is applied to it, which accomplishes the data filtering.
While the Flink cluster is running, it cannot be updated dynamically on its own. So when we change the data-filtering configuration on the security management platform, a Java service writes it to MQ (topic: server-log-conf); in the Flink code we add a new configuration-stream source and merge it with the main source.
During the dual-stream merge, the changed configuration is read from the configuration stream and written back into memory, and the latest configuration is applied to new data arriving from the main stream for filtering.
The key code
Define the configuration stream
MapStateDescriptor<String, String> CONF_MAP_DESCRIPTOR = new MapStateDescriptor<>(
        "conf-map-descriptor",
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO);
FlinkKafkaConsumer<String> confConsumer = createConfStream(env); // create the configuration-stream consumer
BroadcastStream<String> confStream = env.addSource(confConsumer)
        .setParallelism(1)
        .name("server-log-conf")
        .broadcast(CONF_MAP_DESCRIPTOR); // add the configuration stream to the environment and declare it a broadcast stream
The data filter
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.log4j.Logger;
import com.ql.util.express.DefaultContext;
import com.ql.util.express.ExpressRunner;
import com.ql.util.express.IExpressContext;

public class ServerLogFilter {
    private static final Logger logger = Logger.getLogger(ServerLogFilter.class);
    private static ExpressRunner runner = new ExpressRunner(); // the QLExpress filter-expression engine
    private static IExpressContext<String, Object> context = new DefaultContext<String, Object>(); // rule-execution context
    private static List<String> errors = new ArrayList<String>(); // error collector
    private static List<String> rules = new ArrayList<String>() {
        {
            add(" obj.type=='exec' && obj.cmd like '%server_log_agent%' ");
        }
    };

    public void setRule(String nRules) {
        // split the rule payload on its separator and replace the current rules
        // (the separator is not specified in the original; ';' is assumed here)
        rules = new ArrayList<String>(Arrays.asList(nRules.split(";")));
    }

    public boolean filter(ServerLog sl) {
        for (String rule : rules) {
            try {
                if (inRule(sl, rule)) {
                    return true;
                }
            } catch (Exception ex) {
                logger.error("hasInRules", ex);
            }
        }
        return false;
    }

    public boolean inRule(ServerLog sl, String expression) throws Exception {
        context.put("obj", sl);
        Boolean result = (Boolean) runner.execute(expression, context, errors, true, false);
        return result;
    }
}
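The original `setRule` only hints at how the broadcast payload is split into rules. Here is one plausible parser (a sketch under the assumption that rules arrive as a single ';'-separated string; neither the separator nor the payload format is specified in the original):

```java
import java.util.ArrayList;
import java.util.List;

public class RuleParser {
    // Split a ';'-separated rule payload into individual rules,
    // trimming whitespace and dropping empty fragments so trailing
    // separators and blank entries are harmless.
    static List<String> parse(String payload) {
        List<String> out = new ArrayList<>();
        if (payload == null) return out;
        for (String part : payload.split(";")) {
            String rule = part.trim();
            if (!rule.isEmpty()) out.add(rule);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> rules = parse("obj.type=='exec' && obj.cmd like '%agent%'; obj.src=='10.0.0.1';");
        System.out.println(rules.size()); // prints 2
    }
}
```

Defensive parsing matters here because a malformed broadcast message would otherwise replace the whole rule set with garbage and silently disable filtering.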
Merge the configuration stream into the main stream, and reset the in-memory configuration
ServerLogFilter filter = new ServerLogFilter();

private DataStream<String> merge(DataStream<String> mainStream, BroadcastStream<String> confStream) {
    // connect the broadcast configuration stream to the main stream
    DataStream<String> mergedStream = mainStream.connect(confStream)
            .process(new BroadcastProcessFunction<String, String, String>() {
        @Override
        public void processElement(String s, ReadOnlyContext readOnlyContext, Collector<String> collector) {
            // emit only the log entries that the filter does not mark as trusted
            if (!filter.filter(JSON.parseObject(s, ServerLog.class))) {
                collector.collect(s);
            }
        }

        @Override
        public void processBroadcastElement(String rules, Context context, Collector<String> collector) {
            // here `rules` is the configuration payload from the broadcast stream
            if (StringUtils.isNullOrWhitespaceOnly(rules)) {
                return;
            }
            try {
                filter.setRule(rules); // reset the data-filtering configuration
            } catch (Exception ex) {
                logger.error(String.format("merge.processBroadcastElement:%s", ex.toString()));
            }
        }
    });
    return mergedStream;
}
Knowledge supplement: an introduction to BroadcastState
Broadcast state (BroadcastState) is a special type of operator state. When we need to broadcast a low-throughput event stream such as configuration or rules to all downstream tasks, we can use BroadcastState. The downstream tasks receive these configurations and rules and save them as BroadcastState; all tasks keep this state consistent, and it is applied to the computation on another data stream.
A simple way to think about it: one low-throughput stream carries a set of rules, and we want to evaluate every element of another stream against those rules.
Scenario: dynamically updating computation rules.
Broadcast state differs from other operator state in that:
- it has a map format, which defines its storage structure;
- it is only available to operators that take both a broadcast stream and a non-broadcast stream as input;
- such an operator can hold multiple broadcast states with different names.
Knowledge supplement: how BroadcastState operates
The usage of QLExpress has been shown in the code above along with its calling conventions; please refer to it as needed.
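The broadcast-state mechanic can be simulated without Flink. In this minimal plain-Java sketch, a shared map stands in for the map-format broadcast state; `onBroadcast` plays the role of `processBroadcastElement` and `onElement` of `processElement` (all names and the "key=value" payload format are illustrative assumptions, not Flink APIs):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastStateSketch {
    // Stand-in for the map-format broadcast state: rule name -> substring to match.
    final Map<String, String> ruleState = new HashMap<>();
    final List<String> emitted = new ArrayList<>();

    // Broadcast side: update the shared rule state from a "key=value" payload.
    void onBroadcast(String payload) {
        String[] kv = payload.split("=", 2);
        if (kv.length == 2) ruleState.put(kv[0], kv[1]);
    }

    // Main side: emit the element only if no rule marks it as trusted.
    void onElement(String log) {
        for (String needle : ruleState.values()) {
            if (log.contains(needle)) return; // trusted -> filtered out
        }
        emitted.add(log);
    }

    public static void main(String[] args) {
        BroadcastStateSketch op = new BroadcastStateSketch();
        op.onElement("exec server_log_agent");    // no rules yet -> emitted
        op.onBroadcast("agent=server_log_agent"); // a rule arrives on the broadcast stream
        op.onElement("exec server_log_agent");    // now filtered out
        op.onElement("exec suspicious_binary");   // still emitted
        System.out.println(op.emitted.size()); // prints 2
    }
}
```

Note how an element seen before the rule arrives passes through: in the real pipeline this is why the job also loads the initial configuration from the database at startup rather than relying on the broadcast stream alone.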