Information security - threat detection - Applying Flink broadcast stream (BroadcastState) dual-stream merging to filter security logs
2022-07-06 15:47:00 【Empty one by one】

Threat detection: using Flink dual-stream merging to filter security logs
The problem to solve
Threat detection covers many kinds of servers (front-end services, Java, Go, PHP, databases, and others), and dozens of kinds of logs are collected from them every day. Doing threat analysis on these logs is difficult in several ways. The first step is to extract the logs that represent a real threat and filter out the harmless ones, and that is where the challenge starts: how do we filter out the logs we trust? For example, in the logs from a MySQL server, we need to remove the entries that belong to our own log-agent process.
How to solve it
Let's first look at the overall process:
- The collection tool collects logs on the server and first writes them to a local log file;
- A common log shipper such as logstash or filebeat then writes the contents of the log file to the remote log service;
- The log service writes the logs to MQ;
- The Flink consumer consumes the logs from MQ and cleans them (filtering, completion);
- Finally, the log data is written to the raw layer of the threat detection real-time data warehouse;
The focus of this process is step 4. The Flink consumer gets the log data from the source and has to filter it, but the Flink job is already running, so how can we dynamically add data filtering rules to it?
The specific process is as follows:

Flink dual-stream merging filters the log data and writes it to the raw layer of the real-time data warehouse
Our solution uses Flink's dual-stream merging to deliver configuration to the Flink cluster.
When the Flink cluster starts, it loads the current data filtering configuration from the database and initializes it in memory. When data arrives from the main stream source, the filtering configuration is applied to it, which achieves the data filtering.
While the Flink cluster is running, the job itself cannot be updated dynamically. So when the data filtering configuration is changed on the security management platform, a Java service writes it to MQ (topic: server-log-conf). In the Flink code we add a new configuration stream source and merge the main stream source with the configuration stream source.
During the merge, the changed configuration is read from the configuration stream and updated in memory. The latest configuration is then applied to new data arriving from the main stream.
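The update path described above can be illustrated without Flink. The following is a minimal sketch, not the article's implementation: the class names `RuleHolder` and `DualStreamDemo`, the `conf:` prefix used to mark configuration events, the `;` separator, and the plain substring matching are all assumptions made for illustration. It shows the rule list being replaced as a whole when a configuration event arrives, and the main stream always reading the current list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the in-memory filtering configuration.
class RuleHolder {
    // Current rules; the whole list is swapped when a configuration event arrives.
    private final AtomicReference<List<String>> rules =
            new AtomicReference<>(Collections.<String>emptyList());

    // Configuration-stream side. Assumption: rules are substrings separated by ';'.
    void onConfig(String message) {
        if (message == null || message.trim().isEmpty()) {
            return; // ignore empty updates, keep the previous rules
        }
        List<String> parsed = new ArrayList<>();
        for (String part : message.split(";")) {
            String rule = part.trim();
            if (!rule.isEmpty()) {
                parsed.add(rule);
            }
        }
        rules.set(Collections.unmodifiableList(parsed));
    }

    // Main-stream side: true when the log line matches a trusted rule.
    boolean isTrusted(String logLine) {
        for (String rule : rules.get()) {
            if (logLine.contains(rule)) {
                return true;
            }
        }
        return false;
    }
}

class DualStreamDemo {
    // Replays interleaved events in arrival order and returns the log lines that are kept.
    // Events starting with "conf:" model the configuration stream; all others model the main stream.
    static List<String> run(List<String> events) {
        RuleHolder holder = new RuleHolder();
        List<String> kept = new ArrayList<>();
        for (String event : events) {
            if (event.startsWith("conf:")) {
                holder.onConfig(event.substring("conf:".length()));
            } else if (!holder.isTrusted(event)) {
                kept.add(event);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> kept = run(Arrays.asList(
                "exec /usr/bin/server_log_agent --flush", // arrives before the rule: kept
                "conf:server_log_agent",                  // rule update
                "exec /usr/bin/server_log_agent --flush", // now trusted: dropped
                "exec /bin/sh -c whoami"));               // not trusted: kept
        System.out.println(kept);
    }
}
```

Only elements that arrive after a configuration event see the new rules, which matches the behaviour described for the merged streams. This is also why the initial configuration is loaded at startup.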
Key code for the solution
Define the configuration stream
// Descriptor of the broadcast state (map format: String key, String value)
MapStateDescriptor<String, String> CONF_MAP_DESCRIPTOR = new MapStateDescriptor<>("conf-map-descriptor",
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO);
// Create the consumer of the configuration stream
FlinkKafkaConsumer<String> confConsumer = createConfStream(env);
// Add the configuration stream to the environment and declare it as a broadcast stream
BroadcastStream<String> confStream = env.addSource(confConsumer)
        .setParallelism(1)
        .name("server-log-conf")
        .broadcast(CONF_MAP_DESCRIPTOR);
Data filter
public class ServerLogFilter {
    private static final Logger logger = Logger.getLogger(ServerLogFilter.class);
    // QLExpress runner that evaluates the filter expressions
    private static ExpressRunner runner = new ExpressRunner();
    // Rule execution context
    private static IExpressContext<String, Object> context = new DefaultContext<String, Object>();
    // Error collector
    private static List<String> errors = new ArrayList<String>();
    // Default rule: exec events whose command contains server_log_agent
    private static List<String> rules = new ArrayList<String>() {
        {
            add(" obj.type=='exec' && obj.cmd like '%server_log_agent%' ");
        }
    };

    public void setRule(String nRules) {
        // rules = nRules.split(): split the incoming rule string by the separator and replace rules
    }

    // Returns true when the log matches any rule, i.e. it is trusted and should be filtered out
    public boolean filter(ServerLog sl) {
        for (String rule : rules) {
            try {
                if (inRule(sl, rule)) {
                    return true;
                }
            } catch (Exception ex) {
                logger.error("hasInRules", ex);
            }
        }
        return false;
    }

    // Evaluates one expression against the log, which is exposed to the expression as "obj"
    public boolean inRule(ServerLog sl, String expression) throws Exception {
        context.put("obj", sl);
        Boolean result = (Boolean) runner.execute(expression, context, errors, true, false);
        return result;
    }
}
Merge the configuration stream into the main stream and reset the configuration in memory
ServerLogFilter filter = new ServerLogFilter();

private DataStream<String> merge(DataStream<String> mainStream, BroadcastStream<String> confStream) {
    // Broadcast the configuration stream to the main stream
    DataStream<String> mergedStream = mainStream.connect(confStream).process(new BroadcastProcessFunction<String, String, String>() {
        int limits;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            limits = 0;
        }

        @Override
        public void processElement(String s, ReadOnlyContext readOnlyContext, Collector<String> collector) {
            // Main stream: emit only the logs that do not match a trusted rule
            if (!filter.filter(JSON.parseObject(s, ServerLog.class))) {
                collector.collect(s);
            }
        }

        @Override
        public void processBroadcastElement(String rules, Context context, Collector<String> collector) {
            // Configuration stream: rules carries the configuration information
            if (StringUtils.isNullOrWhitespaceOnly(rules)) {
                return;
            }
            try {
                filter.setRule(rules); // Reset the data filtering configuration
            } catch (Exception ex) {
                logger.error(String.format("merge.processBroadcastElement:%s", ex.toString()));
            }
        }
    });
    return mergedStream;
}
Knowledge supplement: introduction to BroadcastState
Broadcast state (Broadcast State) is a special type of Operator State. When a low-throughput event stream such as configuration or rules needs to be broadcast to all downstream tasks, you can use BroadcastState. The downstream tasks receive the configuration or rules and save them as BroadcastState. The state is kept consistent across all tasks and is used in the computation over another data stream.
Put simply: a low-throughput stream carries a set of rules, and we want to evaluate every element of another stream against those rules.
Scenario: dynamically updating computation rules.
Broadcast state differs from other operator state in the following ways:
- It has a map format, which defines its storage structure
- It is only available to specific operators that have both a broadcast stream and a non-broadcast stream as inputs
- Such an operator can have multiple broadcast states with different names
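These properties can be modelled in plain Java to make them concrete. The sketch below is not the Flink API. `TaskModel` and `BroadcastModel` are hypothetical names, and routing regular elements by hash code is an assumption made only to pick one task. It shows that a broadcast element reaches every parallel task, that each task stores it in a named map, and that a regular element sees the same rules whichever task it lands on.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one parallel task of an operator with a broadcast input.
class TaskModel {
    // Broadcast states in map format, looked up by name; one operator may hold several.
    private final Map<String, Map<String, String>> broadcastStates = new HashMap<>();

    // Broadcast side: store the element in the named state.
    void onBroadcast(String stateName, String key, String value) {
        broadcastStates.computeIfAbsent(stateName, n -> new HashMap<>()).put(key, value);
    }

    // Non-broadcast side: read-only lookup; returns null when the state or key is absent.
    String lookup(String stateName, String key) {
        Map<String, String> state = broadcastStates.get(stateName);
        return state == null ? null : state.get(key);
    }
}

// Hypothetical model of the operator: a fixed number of parallel tasks.
class BroadcastModel {
    private final List<TaskModel> tasks = new ArrayList<>();

    BroadcastModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            tasks.add(new TaskModel());
        }
    }

    // A broadcast element is delivered to every task, so all copies stay consistent.
    void broadcast(String stateName, String key, String value) {
        for (TaskModel task : tasks) {
            task.onBroadcast(stateName, key, value);
        }
    }

    // A regular element reaches exactly one task (chosen by hash here, as an assumption).
    TaskModel route(String element) {
        return tasks.get(Math.floorMod(element.hashCode(), tasks.size()));
    }

    public static void main(String[] args) {
        BroadcastModel operator = new BroadcastModel(3);
        operator.broadcast("rules", "trusted-process", "server_log_agent");
        for (String element : new String[] {"log-a", "log-b", "log-c"}) {
            System.out.println(element + " -> " + operator.route(element).lookup("rules", "trusted-process"));
        }
    }
}
```

In Flink itself, the corresponding access points are `getBroadcastState(descriptor)` on the `Context` passed to `processBroadcastElement` (read and write) and on the `ReadOnlyContext` passed to `processElement` (read only).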

Knowledge supplement: how BroadcastState operates

For the use of QLExpress in this article, the usage and code are given below for reference.