Preface
The ELK trio (Elasticsearch, Logstash, Kibana) can basically cover the whole log-analysis workflow: collection, processing, statistical analysis, and visual reporting. For us, however, it is too heavy and does not match our technology stack. Our scenario is to collect the logs of the business systems running on each department's servers, so we want to affect server performance as little as possible, collect in the least invasive way, and do nothing else unnecessary. For the front-end collection step, after comparing tools such as Logstash and Flume, we chose the lightweight
Filebeat
as the log collector. Filebeat is written in Go and needs no additional runtime to be deployed; compared with Flume, which depends on the JDK, it is much lighter and uses little memory.
The collection pipeline is as follows: Filebeat collects and transforms the logs and pushes them to Kafka, and ClickHouse consumes and stores them through its Kafka engine. I like to call this the KFC combination.
Filebeat deployment
Target collection environment:
System: Windows Server 2008 R2 Enterprise
Log types: IIS logs and business-system logs
Log paths: D:/IIS/www.A.com/logs/*.txt, D:/IIS/www.B.com/logs/*.txt, D:/IIS/www.C.com/logs/*.txt
Filebeat: 7.12.1 (https://www.elastic.co/cn/downloads/beats/filebeat)
Because the machines being collected run Windows and Filebeat runs there as a Windows service, downloading the Filebeat zip archive is recommended; the MSI installer is inconvenient for debugging because it forces repeated uninstall/install cycles. After downloading, unzip the archive and edit the configuration file filebeat.yml.
A sample of the business-system log format:
2021-04-06 11:21:17,940 [39680] DEBUG Zc - time:0ms update XXX set ModifyTime=GETDATE(), [State] = 190, [FuZeRen] = 'Zhang San' where [ID] = '90aa9a69-7a33-420e-808c-624693c65aef' and [CompanyID] = '9e52867e-2035-4148-b09e-55a90b3020d5'
2021-04-06 11:21:21,612 [22128] DEBUG Service ModelBase - time:0ms (/api/XXX/XXX/XXX?InfoID=6d43b831-6169-46d2-9518-f7c9ed6fe39c&ValidateStatus=1) Update material status
2021-04-06 11:21:21,612 [22128] DEBUG Zc - time:0ms select ID from XXX where InfoRelationID='6d43b831-6169-46d2-9518-f7c9ed6fe39c'
2021-04-06 11:21:21,612 [22128] DEBUG Zc - time:0ms insert into XXXX(ValidateDate ,[ID],[ValidateState],[ValidateUser],[ValidateUserID],[ValidateUnit],[ValidateUnitID],[ValidateUnitType],[InfoRelationID]) values( GETDATE(),'c77cf4ab-71b5-46c7-b91b-2829d73aa700',1,'XXXX','0387f889-e1d4-48aa-b275-2241da1d2c9e','XXXXX Co., LTD.','2f2a94c8-c23c-4e8a-98b3-c32a9b0487f7',0,'6d43b831-6119-46d2-9518-f7c9ed6fe39c')
2021-04-06 03:25:22,237 [46840] ERROR ASP.global_asax - time:0ms Client information: Ip:116.238.55.21, 173.131.245.61 Browser: Chrome Version: 68 OS: WinNT Server error message: Page: http://www.A.com:803/dbapp_53475dbapp_e524534.php Error source: System.Web.Mvc Stack trace: at System.Web.Mvc.DefaultControllerFactory.GetControllerInstance(RequestContext requestContext, Type controllerType) at System.Web.Mvc.DefaultControllerFactory.CreateController(RequestContext requestContext, String controllerName) at System.Web.Mvc.MvcHandler.ProcessRequestInit(HttpContextBase httpContext, IController& controller, IControllerFactory& factory) at System.Web.Mvc.MvcHandler.BeginProcessRequest(HttpContextBase httpContext, AsyncCallback callback, Object state) at System.Web.HttpApplication.CallHandlerExecutionStep.System.Web.HttpApplication.IExecutionStep.Execute() at System.Web.HttpApplication.ExecuteStep(IExecutionStep step, Boolean& completedSynchronously)

Filebeat configuration:
max_procs: 2
queue:
  mem:
    events: 2048
    flush.min_events: 2048

# ============================== Filebeat inputs ===============================
filebeat.inputs:
# Management system
- type: log
  enabled: true
  encoding: GB2312
  paths:
    - D:/IIS/www.A.com/logs/*.txt
  multiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}'
  multiline.negate: true
  multiline.match: after
  fields:
    topic: 'dlbZcZGBSyslogs'
  fields_under_root: true

# Unit system
- type: log
  enabled: true
  encoding: GB2312
  paths:
    - D:/IIS/www.B.com/logs/*.txt
  ### Multiline options
  multiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}'
  multiline.negate: true
  multiline.match: after
  fields:
    topic: 'dlbZcDWSyslogs'
  fields_under_root: true

# Personal system
- type: log
  enabled: true
  encoding: GB2312
  paths:
    - D:/IIS/www.C.com/logs/*.txt
  ### Multiline options
  multiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}'
  multiline.negate: true
  multiline.match: after
  fields:
    topic: 'dlbZcMySyslogs'
  fields_under_root: true

# Debug output
#output.console:
#  pretty: true
#output.file:
#  path: "D:/bigData"
#  filename: filebeat.log

# -------------------------------- Kafka Output --------------------------------
output.kafka:
  # Boolean flag to enable or disable the output module.
  enabled: true
  hosts: ["192.168.1.10:9092"]
  # The Kafka topic used for produced events. The setting can be a format string
  # using any event field. To set the topic from document type use '%{[type]}'.
  topic: '%{[topic]}'
  # Authentication details. Password is required if username is set.
  #username: ''
  #password: ''
  # The number of concurrent load-balanced Kafka output workers.
  worker: 2
  max_message_bytes: 10000000

# ================================= Processors =================================
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
  - script:
      lang: javascript
      id: my_filter
      tag: enable
      source: >
        function process(event) {
            var str = event.Get("message");
            var sp = str.split(" ");
            var log_datetime = sp.slice(0,2).join(" ");
            var regEx = /^\d{4}-\d{2}-\d{2}/;
            if (log_datetime.match(regEx) != null) {
                event.Put("log_datetime", log_datetime);
                event.Put("log_index", sp.slice(2,3).join(" ").replace("[","").replace("]",""));
                event.Put("log_level", sp.slice(3,4).join(" "));
                if (str.match(/(?<=time:)\S*(?=ms)/) != null) {
                    var spTime = str.split("time:");
                    var spPre = spTime[0].split(" ");
                    var spNext = spTime[1].split(" ");
                    event.Put("log_class", spPre.slice(4).join(" "));
                    var log_execTime = spNext.slice(0,1).join(" ").replace("ms","");
                    regEx = /^(-|\+)?\d+(\.\d+)?$/;
                    if (regEx.test(log_execTime)) {
                        event.Put("log_execTime", log_execTime);
                    } else {
                        event.Put("log_execTime", "-1");
                    }
                    event.Put("log_message", spNext.slice(1).join(" "));
                } else {
                    event.Put("log_class", sp.slice(4,5).join(" "));
                    event.Put("log_execTime", "-1");
                    event.Put("log_message", sp.slice(6).join(" "));
                }
                return;
            }
            event.Cancel();
        }
  - drop_fields:
      fields: ["@timestamp", "message", "host", "ecs", "agent", "@metadata", "log", "input"]

Notes on the above configuration:
max_procs: the maximum number of CPUs that can be used simultaneously;
queue: internal queue settings;
filebeat.inputs: the entry point that defines the log data sources to collect;
The other fields are described below:
# Log type
- type: log
  # Enable this input
  enabled: true
  # Encoding; must be set when the logs contain Chinese
  encoding: GB2312
  # Paths
  paths:
    - D:/IIS/www.A.com/logs/*.txt
  # Pattern that marks the beginning of a new (multi-line) log entry
  multiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}'
  # Enable multi-line matching
  multiline.negate: true
  # With multi-line enabled, non-matching lines are merged into the previous entry
  multiline.match: after
  # Add a field used to identify the Kafka topic
  fields:
    topic: 'dlbZcZGBSyslogs'
  # Place the added fields at the root of the output JSON
  fields_under_root: true

Kafka Output: the Kafka output settings. The key point is topic: '%{[topic]}'. Since several data sources are collected here, each mapped to its own topic, and each input already sets a field such as topic: 'dlbZcZGBSyslogs', a placeholder is used so the topic is resolved flexibly per event.
Processors: configures the processors, i.e. how the collected log events are transformed. Processing happens per event (one merged log entry at a time), and the script processor lets you manipulate the message string with JavaScript. Filebeat ships with many other processors; see the documentation for details.
In addition, while debugging you can use the file or console output to inspect the format of the processed events and fine-tune the configuration:
output.file:
  path: "D:/bigData"
  filename: filebeat.log
The IIS logs are handled in much the same way; only the processing logic needs minor adjustment, so one example covers them all.
For other settings, refer to the official documentation: https://www.elastic.co/guide/en/beats/filebeat/current/index.html
Kafka configuration
Kafka needs no special treatment; it only receives the messages here, so just create the new topics.
// Personal system
bin/kafka-topics.sh --create --zookeeper 192.168.1.10:2181 --replication-factor 1 --partitions 3 --topic dlbZcMySyslogs
// Unit system
bin/kafka-topics.sh --create --zookeeper 192.168.1.10:2181 --replication-factor 1 --partitions 3 --topic dlbZcDWSyslogs
// Management system
bin/kafka-topics.sh --create --zookeeper 192.168.1.10:2181 --replication-factor 1 --partitions 3 --topic dlbZcZGBSyslogs
The partitions value depends on how many consumers there will be. Here three servers form the ClickHouse cluster that acts as the consumer, so the number of partitions is set to 3. In general the number of consumers should not exceed the number of partitions, because each partition can be assigned to only one consumer within a group.
ClickHouse configuration
ClickHouse runs here as a three-shard cluster; if you use a single node, just adjust the statements accordingly.
Create the Kafka engine table on each server:
CREATE TABLE kafka_dlb_ZC_My_syslogs
(
    log_datetime DateTime64,
    log_index String,
    log_level String,
    log_class String,
    log_message String,
    log_execTime Float32,
    server String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '192.168.1.10:9092',
         kafka_topic_list = 'dlbZcMySyslogs',
         kafka_group_name = 'dlbZcMySyslogs_sys',
         kafka_format = 'JSONEachRow',
         kafka_num_consumers = 1;
Create the entity table:
CREATE TABLE dlb_ZC_My_syslogs ON CLUSTER cluster_3s_1r
(
    log_datetime DateTime64,
    log_index String,
    log_level String,
    log_class String,
    log_message String,
    log_execTime Float32,
    server String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/dlb_ZC_My_syslogs', '{replica}')
ORDER BY toDate(log_datetime)
PARTITION BY toYYYYMM(log_datetime);
The entity table is created across the cluster; if you run a single node, remove ON CLUSTER cluster_3s_1r and change the table engine (a standalone sketch follows below). Since ZooKeeper and replicated tables are already enabled here, the statement only needs to be run once on any server.
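If you do go the single-node route, a minimal sketch of the standalone entity table might look like this (an illustrative variant, not from the original setup; a plain MergeTree engine replaces the replicated one):

-- Hypothetical single-node variant of the entity table
CREATE TABLE dlb_ZC_My_syslogs
(
    log_datetime DateTime64,
    log_index String,
    log_level String,
    log_class String,
    log_message String,
    log_execTime Float32,
    server String
)
ENGINE = MergeTree
ORDER BY toDate(log_datetime)
PARTITION BY toYYYYMM(log_datetime);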
Create the materialized view on each server:
CREATE MATERIALIZED VIEW viem_dlb_ZC_My_syslogs_consumer TO dlb_ZC_My_syslogs
AS SELECT * FROM kafka_dlb_ZC_My_syslogs;
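Once the materialized view is in place, a quick sanity check confirms that events are flowing from Kafka into the entity table. A simple illustrative query (not from the original post; the table name follows the example above):

-- Row count and the newest log timestamp that has landed so far
SELECT count() AS rows, max(log_datetime) AS latest
FROM dlb_ZC_My_syslogs;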
Create a distributed table (optional; skip it if you do not need one):
CREATE TABLE Dis_dlb_ZC_My_syslogs ON CLUSTER cluster_3s_1r
AS LogsDataBase.dlb_ZC_My_syslogs
ENGINE = Distributed(cluster_3s_1r, 'LogsDataBase', 'dlb_ZC_My_syslogs', rand());
The distributed table aggregates the data of every shard in the cluster; run the statement once.
Running Filebeat
As a convenience, here are bat scripts for quickly installing/running and uninstalling the Filebeat service:
Install and start the service:
// Windows Server 2008 and above
cd %~dp0
.\install-service-filebeat.ps1
pause

// Windows Server 2008 and below
cd %~dp0
PowerShell.exe -ExecutionPolicy RemoteSigned -File .\install-service-filebeat.ps1
pause

Uninstall the service:
// Windows Server 2008 and above
cd %~dp0
.\uninstall-service-filebeat.ps1
pause

// Windows Server 2008 and below
cd %~dp0
PowerShell.exe -ExecutionPolicy RemoteSigned -File .\uninstall-service-filebeat.ps1
pause
After installation, open the Services tab in Task Manager and set the filebeat service to Running.
View the distributed data:
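For example, a query against the distributed table gathers rows from every shard. A hypothetical sample, assuming the tables created above:

-- Ten most recent ERROR entries across all shards
SELECT log_datetime, log_level, log_class, log_message
FROM Dis_dlb_ZC_My_syslogs
WHERE log_level = 'ERROR'
ORDER BY log_datetime DESC
LIMIT 10;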
With that, data collection is complete.
Now that the data is in place, there are many ways to visualize it; take Grafana as an example:
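A Grafana panel backed by a ClickHouse data source could, for instance, chart error volume over time with a query along these lines (an illustrative sketch, reusing the tables above; add your panel's time-range filter according to the data-source plugin you use):

-- Errors per minute over the last day
SELECT toStartOfMinute(log_datetime) AS t, count() AS errors
FROM Dis_dlb_ZC_My_syslogs
WHERE log_level = 'ERROR'
  AND log_datetime >= now() - INTERVAL 1 DAY
GROUP BY t
ORDER BY t;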
Author: EminemJK. Source: https://www.cnblogs.com/EminemJK/