当前位置:网站首页>Système de collecte des journaux
Système de collecte des journaux
2022-06-27 04:06:00 【Alex CYL】
1. Contexte du projet
- Chaque système a un journal,Quand il y a un problème avec le système,Le problème doit être résolu par le journal
- Quand il y a moins de machines système,Connectez - vous au serveur pour voir les journaux pour répondre à vos besoins
- Quand les machines du système sont énormes,Il est presque impossible de se connecter à la machine pour voir les journaux
2. Solutions
a. Recueillir les journaux sur la machine en temps réel,Stockage unifié àSystème central
b. Et ensuite sur ces journauxCréer un index,Le journal correspondant peut être trouvé par la recherche
c. En offrant une interface convivialewebInterface,AdoptionwebLa recherche de journaux peut être effectuée
3. Problèmes rencontrés/Défis à relever
a. La quantité de journaux en temps réel est très importante,Des milliards par jour
b. Collecte quasi en temps réel des journaux,Le délai est contrôlé au niveau des minutes
c. Extensible horizontalement
4. Programmes de l'industrieELK
4.1 ELKIntroduction
En termes simples,ELKC'est parElasticsearch(Moteur de recherche flexible)、Logstash(Collecte des journaux)、Kibana(Voir le journal/VisualiséwebInterface)Une combinaison de trois logiciels libres,ELK- Oui.elasticUne collection complète de journaux développée par l'entreprise、Solutions d'entreprise analysées et présentées,Parmi ces trois logiciels,,Chaque logiciel est utilisé pour remplir différentes fonctions,ELKAussi appeléELKstack.
Voir le diagramme d'architecture , Regardons comment ces trois bêtes fonctionnent.

1. L'utilisateur envoie la demande au serveur
2. Le serveur transmet les données du Journal à enregistrer par demande réseau à logstash
3. logstash Après filtrage et nettoyage des données ,À transmettreElasticsearch
4. Elasticsearch Responsable de l'indexation des données ,Stockage
5. Utilisateurs accédantkibanaDewebPage,En temps réel( Délai inférieur à une seconde )Voir le journal
reference:
elkConfiguration
elkDétails
4.2 elk Questions relatives au programme
a. Coûts élevés d'exploitation et d'entretien, Pour chaque nouvelle collection de journaux , Nécessite une modification manuelle de la configuration
b. Surveillance manquante , Impossible d'obtenir avec précision logstashÉtat de
5. Aveckafka Conception du système de journalisation

Introduction à chaque composant:
a. Log Agent,Client de collecte de journaux, Utilisé pour collecter des journaux sur le serveur , Un sur chaque serveur log Agent
b. Kafka,Débit élevéDeFile d'attente distribuée,linkinDéveloppement,apacheProjets open source de haut niveau
c. ES,elasticsearch,Moteur de recherche open source,Base d'approvisionnementÀhttp restfulDewebInterface
d. Hadoop,Cadre informatique distribué, Plate - forme de traitement distribué de grandes quantités de données
5.1 kafkaScénario d'application:
1.Traitement asynchrone, Prends ça.Non critique Processus asynchrone , Améliorer le système Temps de réponseEtRobustesse

2.Découplage des applications,Via la file d'attente des messages
3.Pic de débit
Comme double 11 secondes d'activité meurtrière , Une explosion soudaine du trafic , La mise en file d'attente des messages peut efficacement réduire le trafic de pointe , C'est - à - dire limiter la quantité de données à traiter en même temps 
5.2 zookeeper(Système de stockage distribué)Scénario d'application
Dans le système de collecte des journaux ,En généralkafka Va connecter un zookeeper
1. Inscription au service&Découverte de services
Lorsque le fournisseur de services s'agrandit ou se rétrécit , Le fournisseur de services inscrit le service au Registre ;
Le registre informe les consommateurs de services des changements apportés aux services , Le consommateur de services réalise l'optimisation automatique de l'ordonnancement des tâches basée sur les informations de changement d'enregistrement ( Demande d'appel pour l'attribution de données à un nouveau fournisseur de services ou à un fournisseur qui arrêtera le Service )
2. Centre de configuration(Configuration automatisée)
1.Inwep La plateforme a modifié le Business , Transmission des informations sur les changements à zk
2.zk Envoyer l'information sur le changement d'entreprise de service à l'application d'entreprise correspondante ,
3. L'application d'affaires correspondante tire les informations de changement vers local pour modifier la configuration d'affaires
Configuration automatisée
3.Serrure distribuée
zookeeperC'est très cohérent.
Plusieurs clients en même temps Zookeeper Créer le même znode, Un seul a été créé avec succès
5.3 zookeeperAveckafkaInstallation:
Parce quezookeeperEtkafkaBasé surjava,Installer d'abordJDK
sudo apt-get update
sudo apt-get install openjdk-8-jdk
zookeeperInstallation:
ubuntu Installationzookeeper
Installationzookeepr
sudo apt-get install zookeeperd
Configurationzookeeper
cat /etc/zookeeper/conf/zoo.cfg | more //Voirzoo.cfgInformations de configuration pour
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/var/lib/zookeeper
# Place the dataLogDir to a separate physical disc for better performance
# dataLogDir=/disk2/zookeeper
# the port at which the clients will connect
clientPort=2181
# specify all zookeeper servers
# The fist port is used by followers to connect to the leader
# The second one is used for leader election
server.1=zookeeper1:2888:3888
Démarragezookeeper
Démarrageserver
$ sudo /usr/share/zookeeper/bin/zkServer.sh start
Voir l'état de démarrage
$ sudo /usr/share/zookeeper/bin/zkServer.sh status
Voir les informations de démarrage
ps -aux | grep zookeeper
Liensserver
sudo /usr/share/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181
Afficher les informations du Journal
Les informations du journal sont configurables ,Adoptionzoo.cfg,Par défaut:
/var/log/zookeeper/zookeeper.log
Vous pouvez voir les informations du journal pour voir quelques erreurs et détails
kafkaInstallation
ubuntu18.04En bas.KafkaInstallation et déploiement
Installationkafka:
ubuntuÇa marche.wgetTéléchargement direct, J'ai téléchargé /home/cyl/kafkaTable des matières
wget https://dlcdn.apache.org/kafka/3.1.0/kafka_2.12-3.1.0.tgz
Décompresser:
tar -zxvf kafka_2.12-3.1.0.tgz
Renommer
mv kafka_2.12-3.1.0 ./kafka
Créer un répertoire de stockage de journaux
[email protected]:~/kafka$ mkdir logs-1
Modifierkafka-serverProfil pour
[email protected]:~/kafka/kafka$ sudo vim config/server.properties
Modifier le fichier de configuration21、31、36Et60D'accord
broker.id=1
listeners=PLAINTEXT://10.141.184:9092 # Pour un démarrage sans heurt broker
advertised.listeners=PLAINTEXT://10.141.184:9092
log.dirs=/home/wzj/kafka/logs-1
DémarrageZookeeper
Il faut d'abord modifierconfig/zookeeper.propertiesConfiguration
[email protected]:~/kafka/kafka$ sudo ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
Utilisation recommandée:
$ sudo /usr/share/zookeeper/bin/zkServer.sh start
Liensserver
$ sudo /usr/share/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181
DémarrageKafkaServices
Référence du processus de démarrage
Utiliser kafka-server-start.sh Démarrage kafka Services
[email protected]:~/kafka/kafka$ sudo ./bin/kafka-server-start.sh ./config/server.properties
Créationtopic
Utiliser kafka-topics.sh Pour créer une copie unique d'une partition topic test
[email protected]:~/kafka/kafka$ sudo ./bin/kafka-topics.sh --create --bootstrap-server 10.141.65.188:9092 --replication-factor 1 --partitions 1 --topic nginxLog
Impossible d'utiliser icilocalhost:9092,
Problèmes rencontrés
Question 1: Modification de l'instruction de version
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Erreur signalée“Exception in thread “main” joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option”
Dans une version plus récente (2.2 Et plus)De Kafka Plus besoin. ZooKeeper Chaîne de connexion,C'est - à - dire:- -zookeeper localhost:2181.Utiliser Kafka BrokerDe --bootstrap-server localhost:9092Pour remplacer- -zookeeper localhost:2181.
Deuxième question:
WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Doit être compatible avec listenersEnregistrer la cohérence,Par exemple
listeners=PLAINTEXT://192.168.156.131:9092
# Doit également être utilisé lors de l'utilisation de la commande 192.168.156.131:9092 Comme adresse de connexion ,Comme suit
./kafka-console-producer.sh --broker-list 192.168.156.131:9092 --topic userlog
Voir topic Liste
[email protected]:~/kafka/kafka$ sudo ./bin/kafka-topics.sh --list --bootstrap-server 10.141.65.188:9092
Générer un message,Créer un producteur de messages
[email protected]:~/kafka/kafka$ sudo ./bin/kafka-console-producer.sh --broker-list 10.141.65.188:9092 --topic nginxLog
Message de consommation,Créer un consommateur de messages
[email protected]:~/kafka/kafka$ sudo ./bin/kafka-console-consumer.sh --bootstrap-server 10.141.65.188:9092 --topic nginxLog --from-beginning
Dans la fenêtre des messages de production ,Saisissez le contenu, Vous pouvez l'imprimer dans la fenêtre de consommation
VoirTopicMessage
[email protected]:~/kafka/kafka$ sudo ./bin/kafka-topics.sh --describe --bootstrap-server 10.141.65.188:9092 --topic nginxLog
Topic: nginxLog TopicId: t6M81RsMRPGj2tZVXaxltw PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: nginxLog Partition: 0 Leader: 1 Replicas: 1 Isr: 1
La première ligne résume toutes les partitions,Chaque ligne supplémentaire donne des informations sur une partition. Comme nous n'avons qu'une seule partition,Donc il n'y a qu'une seule ligne.
“Leader”: Est le noeud responsable de toutes les lectures et écritures pour une partition donnée. Chaque noeud devient le leader de la Section de sélection aléatoire de la partition .
“Replicas”: Est une liste de noeuds qui copient ce journal de partition,Qu'ils soient ou non des leaders,Ou même s'ils sont actuellement actifs.
“Isr”: C'est un groupe.“Synchroniser”Copie.C'estreplications Sous - ensemble de la liste ,Actuellement vivant et dirigé vers le leader.
Supprimertopic
[email protected]:~/kafka/kafka$ sudo ./bin/kafka-topics.sh --delete --bootstrap-server 10.141.65.188:9092 --topic nginxLogtest
Commande de démarrage:
bin/kafka-server-start.sh -daemon config/server.properties
Créationtopic
./kafka-topics.sh --create --bootstrap-server spark01:9092 --replication-factor 1 --partitions 1 --topic test2
Voirtopic
./kafka-topics.sh --bootstrap-server spark01:9092 --list
AssignertopicDonnées de production moyennes
./kafka-console-producer.sh --broker-list spark01:9092 --topic test2
Par exemple:{
"id":"1","name":"xiaoming","age":"20"}
VoirtopicContenu spécifique
./kafka-console-consumer.sh --bootstrap-server spark01:9092 --topic test2 --from-beginning
Créer un groupe de consommateurs
./kafka-console-consumer.sh --bootstrap-server spark01:9092 --topic test2 --group kafkatest
Voir les groupes de consommateurs
./kafka-consumer-groups.sh --bootstrap-server spark01:9092 --list
Voir les détails du consommateur
./kafka-consumer-groups.sh --bootstrap-server spark01:9092 --describe --group kafkatest
Données sur la consommation
./kafka-console-consumer.sh --bootstrap-server spark01:9092 --topic test2 --from-beginning
6 Mise en œuvre du Code
logagent Code de mise en œuvregithub
1.kafka demo:
package kafka
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
//ConfigurationkafkaEnvironnement
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
client, err := sarama.NewSyncProducer([]string{
"10.141.65.188:9092"}, config)
if err != nil {
fmt.Println("producer close, err:", err)
return
}
defer client.Close()
for i := 0; i < 10; i++ {
msg := &sarama.ProducerMessage{
}
msg.Topic = "nginxLogTest"
msg.Value = sarama.StringEncoder("this is a good test, my message is good~~12")
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message failed,", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
}
- tailf demo
package tailf
import (
"fmt"
"time"
"github.com/hpcloud/tail"
)
func main() {
//main()
filename := "./my.log"
tails, err := tail.TailFile(filename, tail.Config{
ReOpen: true,
Follow: true,
Location: &tail.SeekInfo{
Offset: 0, Whence: 2}, // Position de lecture
MustExist: false, // Exiger que les documents existent ou soient exposés
Poll: true,
})
if err != nil {
fmt.Println("tail file err:", err)
return
}
var msg *tail.Line
var ok bool
for {
msg, ok = <-tails.Lines
if !ok {
fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename)
time.Sleep(100 * time.Millisecond)
continue
}
fmt.Println("msg:", msg)
}
}
- config demo
package main
import (
"fmt"
"github.com/astaxie/beego/config"
)
func main() {
conf, err := config.NewConfig("ini", "./logcollect.conf")
if err != nil {
fmt.Println("new config failed, err:", err)
return
}
port, err := conf.Int("server::port")
if err != nil {
fmt.Println("read server:port failed, err:", err)
return
}
fmt.Println("Port:", port)
log_level := conf.String("logs::log_level")
fmt.Println("log_level:", log_level)
log_port, err := conf.Int("logs::port")
if err != nil {
fmt.Println("read logs:port failed, err:", err)
return
}
fmt.Println("log_Port:", log_port)
log_path := conf.String("logs::log_path")
fmt.Println("log_path:", log_path)
}
- logs demo
package main
import (
"encoding/json"
"fmt"
"github.com/astaxie/beego/logs"
)
func main() {
config := make(map[string]interface{
})
config["filename"] = "./logcollect.log"
config["level"] = logs.LevelTrace
configStr, err := json.Marshal(config)
if err != nil {
fmt.Println("marshal failed, err:", err)
return
}
logs.SetLogger(logs.AdapterFile, string(configStr))
logs.Debug("this is a test, my name is %s", "stu01~~")
logs.Trace("this is a trace, my name is %s", "stu02~~")
logs.Warn("this is a warn, my name is %s", "stu03~~")
}
边栏推荐
- MATLAB | 三个趣的圆相关的数理性质可视化
- Products change the world
- 电商产品如何在知乎上进行推广和打广告?
- JMeter distributed pressure measurement
- Is the truth XX coming? Why are test / development programmers unwilling to work overtime? This is a crazy state
- iOS开发:对于动态库共享缓存(dyld)的了解
- 面试-01
- MATLAB | 基于分块图布局的三纵坐标图绘制
- 文旅灯光秀打破时空限制,展现景区夜游魅力
- Overview of Tsinghua & Huawei | semantic communication: Principles and challenges
猜你喜欢

Resnet152 pepper pest image recognition 1.0

快速掌握 ASP.NET 身份认证框架 Identity - 通过邮件重置密码

I found a JSON visualization tool artifact. I love it!

Ldr6028 OTG data transmission scheme for mobile devices while charging

ESP8266

Matlab | drawing of three ordinate diagram based on block diagram layout

Il manque beaucoup de fichiers et de répertoires tels que scripts pendant et après l'installation d'anaconda3

JMeter takes the result of the previous request as the parameter of the next request

Kotlin Compose compositionLocalOf 与 staticCompositionLocalOf

Building lightweight target detection based on mobilenet-yolov4
随机推荐
Games101 job 7 improvement - implementation process of micro surface material
Pat grade a 1023 have fun with numbers
PAT甲级 1024 Palindromic Number
2021:Passage Retrieval for Outside-KnowledgeVisual Question Answering通道检索的外部知识视觉问答
2021:Zero-shot Visual Question Answering using Knowledge Graphs使用知识图的零次视觉问答
windows上安装MySQL
Description of replacement with STM32 or gd32
PAT甲级 1019 General Palindromic Number
LDR6028 手机设备一边充电一边OTG传输数据方案
Installation of low code development platform nocobase
Why does C throw exceptions when accessing null fields?
PostgreSQL基础命令教程:创建新用户admin来访问PostgreSQL
PAT甲级 1020 Tree Traversals
面对AI人才培养的“产学研”鸿沟,昇腾AI如何做厚产业人才黑土地?
Pat grade a 1021 deep root
Servlet and JSP final review examination site sorting 42 questions and 42 answers
PAT甲级 1018 Public Bike Management
fplan-电源规划
苹果手机证书构体知识
Promise [II. Promise source code] [detailed code comments / complete test cases]