Information Security - Threat Detection - Applying Flink Broadcast Streams (BroadcastState) and Dual-Stream Merging to Filter Security Logs
2022-07-06 15:47:00 【Empty one by one】

Threat Detection: Using Flink Dual-Stream Merging to Filter Security Logs
The problem
Threat detection covers all kinds of servers (front-end services, Java, Go, PHP, databases, and others), and dozens of log types are collected from them every day. Doing threat analysis on these logs is hard in many ways. The first step is to extract the logs that are actually threatening and filter out the harmless ones. That raises the challenge: how do we filter out the logs we trust? For example, in the logs collected from a MySQL server, we need to remove the entries produced by our own log-agent process.
The solution
Let's first look at the overall process:
- After the collection tool gathers logs on the server, it first writes them to a local log file;
- Common log shipping tools such as logstash and filebeat then send the contents of the log file to a remote log service;
- The log service writes the logs to MQ;
- Flink consumers read the logs from MQ and clean them (filtering and completion);
- Finally, the log data is written to the raw layer of the threat detection real-time data warehouse.
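To make step 4 concrete, here is a minimal plain-Java sketch of the filtering with hard-coded rules, which is exactly the approach that becomes a problem once the job is running. It does not use Flink. It assumes a hypothetical `ServerLogEvent` record carrying only `type` and `cmd` (the two fields the article's filter rule reads later) and treats the agent process name `server_log_agent` as the trusted pattern.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical log record: the article's ServerLog class is not shown, so only
// the two fields its filter rule reads (type, cmd) are modeled here.
record ServerLogEvent(String type, String cmd) {}

class TrustedLogFilter {
    // Hard-coded trusted pattern: our own log agent's process name (assumed).
    private static final List<String> TRUSTED_CMDS = List.of("server_log_agent");

    // Returns true when the event comes from a trusted process and should be dropped.
    static boolean isTrusted(ServerLogEvent e) {
        return "exec".equals(e.type())
                && e.cmd() != null
                && TRUSTED_CMDS.stream().anyMatch(e.cmd()::contains);
    }

    // Step 4 filtering: keep only events that are not trusted.
    static List<ServerLogEvent> clean(List<ServerLogEvent> batch) {
        Predicate<ServerLogEvent> keep = e -> !isTrusted(e);
        return batch.stream().filter(keep).toList();
    }
}
```

Changing `TRUSTED_CMDS` here means recompiling and redeploying, which is what the configuration stream below avoids.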
The focus of this process is step 4: the Flink consumer gets log data from the source and must filter it. But the Flink job is already running, so how can we dynamically add data filtering rules to it?
The detailed flow is as follows:

Figure: Flink dual-stream merging filters the log data, which is then written to the raw layer of the real-time data warehouse.
Our solution uses Flink's dual-stream merging to solve the problem of delivering configuration to the Flink cluster.
When the Flink cluster starts, it loads the current data filtering configuration from the database into memory. When data arrives on the main source, the filtering configuration is applied to it, which filters the data.
While the Flink cluster is running, the job cannot be updated dynamically. So when the data filtering configuration is changed on the security management platform, a Java service writes it to MQ (topic: server-log-conf). In the Flink code we add a new configuration source and merge the main source with the configuration source.
During the merge, the changed configuration is read from the configuration stream and written to memory, and the latest configuration is applied to filter the new data arriving on the main stream.
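The mechanism above can be modeled without Flink in a short sketch: rules are loaded once at startup, replaced in full whenever a configuration message arrives, and every main-stream event is checked against the latest list. The class `DynamicRuleFilter`, the one-rule-per-line message format, and the substring matching (standing in for the QLExpress expressions used in the real job) are all assumptions for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model (no Flink) of "load at startup, update from config stream, filter main stream".
class DynamicRuleFilter {
    // Replaced as a whole on every update; volatile so the new list is visible to readers.
    private volatile List<String> rules;

    // initialRules plays the role of the configuration loaded from the database at startup.
    DynamicRuleFilter(List<String> initialRules) {
        this.rules = List.copyOf(initialRules);
    }

    // Configuration stream handler. Assumption: one rule per line in the message.
    void onConfig(String message) {
        if (message == null || message.isBlank()) {
            return; // ignore empty updates, as the article's processBroadcastElement does
        }
        rules = Arrays.stream(message.split("\n"))
                .map(String::trim)
                .filter(r -> !r.isEmpty())
                .toList();
    }

    // Main stream handler: true means the log is forwarded downstream.
    // Simplification: a rule is a substring of the command, not a QLExpress expression.
    boolean onLog(String cmd) {
        return rules.stream().noneMatch(cmd::contains);
    }
}
```

Swapping in a new list instead of mutating the old one means a reader always sees either the old rule set or the new one, never a half-updated mix.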
The key code
Define the configuration stream
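import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
// Broadcast state descriptor with String keys and String values
MapStateDescriptor<String, String> CONF_MAP_DESCRIPTOR = new MapStateDescriptor<>("conf-map-descriptor",
        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
FlinkKafkaConsumer<String> confConsumer = createConfStream(env); // Create the configuration stream consumer
// Add the configuration stream to the environment and define it as a broadcast stream
BroadcastStream<String> confStream = env.addSource(confConsumer)
        .setParallelism(1)
        .name("server-log-conf")
        .broadcast(CONF_MAP_DESCRIPTOR);
Data filter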
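import com.ql.util.express.DefaultContext;
import com.ql.util.express.ExpressRunner;
import com.ql.util.express.IExpressContext;
import org.apache.log4j.Logger;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class ServerLogFilter implements Serializable {
    private static final Logger logger = Logger.getLogger(ServerLogFilter.class);
    // QLExpress engine that evaluates the filter expressions; static, so it is not serialized with the Flink function
    private static final ExpressRunner runner = new ExpressRunner();
    // Current rules; volatile so a list swapped in by setRule is visible to processElement
    private volatile List<String> rules = Collections.singletonList(
            " obj.type=='exec' && obj.cmd like '%server_log_agent%' ");

    public void setRule(String nRules) {
        // Split the rules by a separator and replace `rules`.
        // Assumption: one rule per line; the original does not say which separator it uses.
        List<String> parsed = Arrays.stream(nRules.split("\n"))
                .map(String::trim)
                .filter(r -> !r.isEmpty())
                .collect(Collectors.toList());
        if (!parsed.isEmpty()) {
            rules = parsed;
        }
    }

    // Returns true if the log matches any rule, i.e. it is trusted and should be dropped
    public boolean filter(ServerLog sl) {
        for (String rule : rules) {
            try {
                if (inRule(sl, rule)) {
                    return true;
                }
            } catch (Exception ex) {
                logger.error("hasInRules", ex);
            }
        }
        return false;
    }

    public boolean inRule(ServerLog sl, String expression) throws Exception {
        // Fresh context and error list per call, so concurrent tasks never share mutable state
        IExpressContext<String, Object> context = new DefaultContext<>();
        context.put("obj", sl);
        List<String> errors = new ArrayList<>();
        Object result = runner.execute(expression, context, errors, true, false);
        return Boolean.TRUE.equals(result);
    }
}
Merge the configuration stream into the main stream and reset the in-memory configuration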
import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;

// static, so the anonymous function does not capture (and need to serialize) the enclosing object
private static DataStream<String> merge(DataStream<String> mainStream, BroadcastStream<String> confStream) {
    // Connect the main stream with the broadcast configuration stream
    return mainStream.connect(confStream).process(new BroadcastProcessFunction<String, String, String>() {
        // Each parallel instance gets its own filter copy; every copy receives each broadcast element
        private final ServerLogFilter filter = new ServerLogFilter();

        @Override
        public void processElement(String s, ReadOnlyContext readOnlyContext, Collector<String> collector) {
            // Forward only logs that match no trusted rule
            if (!filter.filter(JSON.parseObject(s, ServerLog.class))) {
                collector.collect(s);
            }
        }

        @Override
        public void processBroadcastElement(String rules, Context context, Collector<String> collector) {
            // `rules` is the configuration message from the server-log-conf topic
            if (StringUtils.isNullOrWhitespaceOnly(rules)) {
                return;
            }
            try {
                filter.setRule(rules); // Reset the data filtering configuration
            } catch (Exception ex) {
                logger.error("merge.processBroadcastElement", ex);
            }
        }
    });
}
Knowledge supplement: introduction to BroadcastState
Broadcast state (Broadcast State) is a special type of Operator State. When we need to broadcast a low-throughput event stream, such as configuration or rules, to all downstream tasks, we can use BroadcastState. The downstream tasks receive the configuration or rules and save them as BroadcastState; the state is kept consistent across all tasks and is applied to the computation over another data stream.
Simply put: a low-throughput stream carries a set of rules, and we want to evaluate every element of another stream against those rules.
Typical scenario: dynamically updating computation rules.
Broadcast state differs from other operator state in that:
- it has a map format, which defines its storage structure;
- it is only available to specific operators that have both a broadcast stream input and a non-broadcast stream input;
- such an operator can have multiple broadcast states with different names.
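These properties can be illustrated with a small plain-Java model. It is not Flink's API, and `BroadcastStateModel` is a hypothetical name: each parallel task keeps its own map-shaped state, a broadcast element is applied on every task, and a main-stream element is handled by a single task that only reads the state. Named inner maps stand in for multiple broadcast states on one operator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of broadcast state semantics (not Flink's implementation).
class BroadcastStateModel {
    // One entry per parallel task: state name -> (key -> value).
    private final List<Map<String, Map<String, String>>> tasks = new ArrayList<>();

    BroadcastStateModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            tasks.add(new HashMap<>());
        }
    }

    // Broadcast side: the same update is applied on every task, so all copies agree.
    void processBroadcastElement(String stateName, String key, String value) {
        for (Map<String, Map<String, String>> task : tasks) {
            task.computeIfAbsent(stateName, n -> new HashMap<>()).put(key, value);
        }
    }

    // Non-broadcast side: the element lands on one task (chosen by the caller's index)
    // and only reads that task's copy of the named state.
    String processElement(int taskIndex, String stateName, String key) {
        Map<String, String> state = tasks.get(taskIndex % tasks.size()).get(stateName);
        return state == null ? null : state.get(key);
    }
}
```

In Flink itself there is no communication between tasks, so the copies stay identical only if every instance applies each broadcast element in the same deterministic way; the order in which broadcast and main-stream elements interleave can also differ from task to task.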

Knowledge supplement: BroadcastState operation flow

The usage and code of QLExpress, the expression engine used in this article, are given separately; please refer to them.