当前位置:网站首页>Système de collecte des journaux

Système de collecte des journaux

2022-06-27 04:06:00 Alex CYL

1. Contexte du projet

  • Chaque système a un journal,Quand il y a un problème avec le système,Le problème doit être résolu par le journal
  • Quand il y a moins de machines système,Connectez - vous au serveur pour voir les journaux pour répondre à vos besoins
  • Quand les machines du système sont énormes,Il est presque impossible de se connecter à la machine pour voir les journaux

2. Solutions

a. Recueillir les journaux sur la machine en temps réel,Stockage unifié àSystème central
b. Et ensuite sur ces journauxCréer un index,Le journal correspondant peut être trouvé par la recherche
c. En offrant une interface convivialewebInterface,AdoptionwebLa recherche de journaux peut être effectuée

3. Problèmes rencontrés/Défis à relever

a. La quantité de journaux en temps réel est très importante,Des milliards par jour
b. Collecte quasi en temps réel des journaux,Le délai est contrôlé au niveau des minutes
c. Extensible horizontalement

4. Programmes de l'industrieELK

elk4.1 ELKIntroduction
En termes simples,ELKC'est parElasticsearch(Moteur de recherche flexible)、Logstash(Collecte des journaux)、Kibana(Voir le journal/VisualiséwebInterface)Une combinaison de trois logiciels libres,ELK- Oui.elasticUne collection complète de journaux développée par l'entreprise、Solutions d'entreprise analysées et présentées,Parmi ces trois logiciels,,Chaque logiciel est utilisé pour remplir différentes fonctions,ELKAussi appeléELKstack.

Voir le diagramme d'architecture , Regardons comment ces trois bêtes fonctionnent.

Insérer la description de l'image ici
1.  L'utilisateur envoie la demande au serveur
2.  Le serveur transmet les données du Journal à enregistrer par demande réseau à logstash
3. logstash Après filtrage et nettoyage des données ,À transmettreElasticsearch
4. Elasticsearch Responsable de l'indexation des données ,Stockage
5. Utilisateurs accédantkibanaDewebPage,En temps réel( Délai inférieur à une seconde )Voir le journal

reference:
elkConfiguration
elkDétails

4.2 elk Questions relatives au programme
a. Coûts élevés d'exploitation et d'entretien, Pour chaque nouvelle collection de journaux , Nécessite une modification manuelle de la configuration
b. Surveillance manquante , Impossible d'obtenir avec précision logstashÉtat de

5. Aveckafka Conception du système de journalisation

Insérer la description de l'image ici
Introduction à chaque composant:
a. Log Agent,Client de collecte de journaux, Utilisé pour collecter des journaux sur le serveur , Un sur chaque serveur log Agent
b. Kafka,Débit élevéDeFile d'attente distribuée,linkinDéveloppement,apacheProjets open source de haut niveau
c. ES,elasticsearch,Moteur de recherche open source,Base d'approvisionnementÀhttp restfulDewebInterface
d. Hadoop,Cadre informatique distribué, Plate - forme de traitement distribué de grandes quantités de données

5.1 kafkaScénario d'application:
1.Traitement asynchrone, Prends ça.Non critique Processus asynchrone , Améliorer le système Temps de réponseEtRobustesse
Insérer la description de l'image ici
Insérer la description de l'image ici

2.Découplage des applications,Via la file d'attente des messages
Insérer la description de l'image ici
3.Pic de débit
Comme double 11 secondes d'activité meurtrière , Une explosion soudaine du trafic , La mise en file d'attente des messages peut efficacement réduire le trafic de pointe , C'est - à - dire limiter la quantité de données à traiter en même temps
Insérer la description de l'image ici

5.2 zookeeper(Système de stockage distribué)Scénario d'application
Dans le système de collecte des journaux ,En généralkafka Va connecter un zookeeper
1. Inscription au service&Découverte de services
Insérer la description de l'image ici
Lorsque le fournisseur de services s'agrandit ou se rétrécit , Le fournisseur de services inscrit le service au Registre ;
Le registre informe les consommateurs de services des changements apportés aux services , Le consommateur de services réalise l'optimisation automatique de l'ordonnancement des tâches basée sur les informations de changement d'enregistrement ( Demande d'appel pour l'attribution de données à un nouveau fournisseur de services ou à un fournisseur qui arrêtera le Service )

2. Centre de configuration(Configuration automatisée)
1.Inwep La plateforme a modifié le Business , Transmission des informations sur les changements à zk
2.zk Envoyer l'information sur le changement d'entreprise de service à l'application d'entreprise correspondante ,
3. L'application d'affaires correspondante tire les informations de changement vers local pour modifier la configuration d'affaires
Configuration automatisée
Insérer la description de l'image ici

3.Serrure distribuée
zookeeperC'est très cohérent.
Plusieurs clients en même temps Zookeeper Créer le même znode, Un seul a été créé avec succès

5.3 zookeeperAveckafkaInstallation:
Parce quezookeeperEtkafkaBasé surjava,Installer d'abordJDK

sudo apt-get update
sudo apt-get install openjdk-8-jdk

zookeeperInstallation:
ubuntu Installationzookeeper
Installationzookeepr

sudo apt-get install zookeeperd	

Configurationzookeeper

cat /etc/zookeeper/conf/zoo.cfg | more //Voirzoo.cfgInformations de configuration pour
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/var/lib/zookeeper
# Place the dataLogDir to a separate physical disc for better performance
# dataLogDir=/disk2/zookeeper

# the port at which the clients will connect
clientPort=2181

# specify all zookeeper servers
# The fist port is used by followers to connect to the leader
# The second one is used for leader election
server.1=zookeeper1:2888:3888

Démarragezookeeper

Démarrageserver
$ sudo /usr/share/zookeeper/bin/zkServer.sh start
Voir l'état de démarrage
$ sudo /usr/share/zookeeper/bin/zkServer.sh status
Voir les informations de démarrage
ps -aux | grep zookeeper

Liensserver

sudo /usr/share/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181

Afficher les informations du Journal

 Les informations du journal sont configurables ,Adoptionzoo.cfg,Par défaut:
/var/log/zookeeper/zookeeper.log
 Vous pouvez voir les informations du journal pour voir quelques erreurs et détails 

kafkaInstallation

ubuntu18.04En bas.KafkaInstallation et déploiement

Installationkafka:
ubuntuÇa marche.wgetTéléchargement direct, J'ai téléchargé /home/cyl/kafkaTable des matières

wget https://dlcdn.apache.org/kafka/3.1.0/kafka_2.12-3.1.0.tgz

Décompresser:

tar -zxvf kafka_2.12-3.1.0.tgz

Renommer

mv kafka_2.12-3.1.0 ./kafka

Créer un répertoire de stockage de journaux

[email protected]:~/kafka$ mkdir logs-1

Modifierkafka-serverProfil pour

[email protected]:~/kafka/kafka$ sudo vim config/server.properties 

Modifier le fichier de configuration21、31、36Et60D'accord

broker.id=1
listeners=PLAINTEXT://10.141.184:9092 # Pour un démarrage sans heurt broker 
advertised.listeners=PLAINTEXT://10.141.184:9092
log.dirs=/home/wzj/kafka/logs-1

DémarrageZookeeper
Il faut d'abord modifierconfig/zookeeper.propertiesConfiguration

[email protected]:~/kafka/kafka$ sudo ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

Utilisation recommandée:

$ sudo /usr/share/zookeeper/bin/zkServer.sh start
Liensserver
$ sudo /usr/share/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181

DémarrageKafkaServices
Référence du processus de démarrage
Utiliser kafka-server-start.sh Démarrage kafka Services

[email protected]:~/kafka/kafka$ sudo ./bin/kafka-server-start.sh ./config/server.properties 

Créationtopic
Utiliser kafka-topics.sh Pour créer une copie unique d'une partition topic test

[email protected]:~/kafka/kafka$ sudo ./bin/kafka-topics.sh --create --bootstrap-server 10.141.65.188:9092 --replication-factor 1 --partitions 1 --topic nginxLog
Impossible d'utiliser icilocalhost:9092,

Problèmes rencontrés
Question 1: Modification de l'instruction de version

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Erreur signalée“Exception in thread “main” joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option”
Dans une version plus récente (2.2 Et plus)De Kafka Plus besoin. ZooKeeper Chaîne de connexion,C'est - à - dire:- -zookeeper localhost:2181.Utiliser Kafka BrokerDe --bootstrap-server localhost:9092Pour remplacer- -zookeeper localhost:2181.

Deuxième question:

WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Doit être compatible avec listenersEnregistrer la cohérence,Par exemple

listeners=PLAINTEXT://192.168.156.131:9092
#  Doit également être utilisé lors de l'utilisation de la commande 192.168.156.131:9092 Comme adresse de connexion ,Comme suit
./kafka-console-producer.sh --broker-list 192.168.156.131:9092 --topic userlog

Voir topic Liste

[email protected]:~/kafka/kafka$ sudo ./bin/kafka-topics.sh --list --bootstrap-server 10.141.65.188:9092

Générer un message,Créer un producteur de messages

[email protected]:~/kafka/kafka$ sudo ./bin/kafka-console-producer.sh --broker-list 10.141.65.188:9092 --topic nginxLog

Message de consommation,Créer un consommateur de messages

[email protected]:~/kafka/kafka$ sudo ./bin/kafka-console-consumer.sh  --bootstrap-server 10.141.65.188:9092 --topic nginxLog --from-beginning

Dans la fenêtre des messages de production ,Saisissez le contenu, Vous pouvez l'imprimer dans la fenêtre de consommation

VoirTopicMessage

[email protected]:~/kafka/kafka$ sudo ./bin/kafka-topics.sh --describe --bootstrap-server 10.141.65.188:9092 --topic nginxLog
Topic: nginxLog	TopicId: t6M81RsMRPGj2tZVXaxltw	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: nginxLog	Partition: 0	Leader: 1	Replicas: 1	Isr: 1

La première ligne résume toutes les partitions,Chaque ligne supplémentaire donne des informations sur une partition. Comme nous n'avons qu'une seule partition,Donc il n'y a qu'une seule ligne.
“Leader”: Est le noeud responsable de toutes les lectures et écritures pour une partition donnée. Chaque noeud devient le leader de la Section de sélection aléatoire de la partition .
“Replicas”: Est une liste de noeuds qui copient ce journal de partition,Qu'ils soient ou non des leaders,Ou même s'ils sont actuellement actifs.
“Isr”: C'est un groupe.“Synchroniser”Copie.C'estreplications Sous - ensemble de la liste ,Actuellement vivant et dirigé vers le leader.

Supprimertopic

[email protected]:~/kafka/kafka$ sudo ./bin/kafka-topics.sh --delete --bootstrap-server 10.141.65.188:9092 --topic nginxLogtest

Commande de démarrage:
bin/kafka-server-start.sh -daemon config/server.properties
 
Créationtopic
./kafka-topics.sh --create --bootstrap-server spark01:9092 --replication-factor 1 --partitions 1 --topic test2
 
Voirtopic
./kafka-topics.sh --bootstrap-server spark01:9092 --list
 
AssignertopicDonnées de production moyennes
./kafka-console-producer.sh --broker-list spark01:9092 --topic test2
Par exemple:{
    "id":"1","name":"xiaoming","age":"20"}
 
VoirtopicContenu spécifique
./kafka-console-consumer.sh --bootstrap-server spark01:9092 --topic test2 --from-beginning
 
Créer un groupe de consommateurs
./kafka-console-consumer.sh --bootstrap-server spark01:9092 --topic test2 --group kafkatest
 
 Voir les groupes de consommateurs 
./kafka-consumer-groups.sh --bootstrap-server spark01:9092 --list
 
 Voir les détails du consommateur 
./kafka-consumer-groups.sh --bootstrap-server spark01:9092 --describe  --group kafkatest
 
Données sur la consommation
./kafka-console-consumer.sh --bootstrap-server spark01:9092 --topic test2 --from-beginning

6 Mise en œuvre du Code

logagent Code de mise en œuvregithub
1.kafka demo:

package kafka

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
    

	//ConfigurationkafkaEnvironnement
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Return.Successes = true

	client, err := sarama.NewSyncProducer([]string{
    "10.141.65.188:9092"}, config)
	if err != nil {
    
		fmt.Println("producer close, err:", err)
		return
	}
	defer client.Close()

	for i := 0; i < 10; i++ {
    
		msg := &sarama.ProducerMessage{
    }
		msg.Topic = "nginxLogTest"
		msg.Value = sarama.StringEncoder("this is a good test, my message is good~~12")

		pid, offset, err := client.SendMessage(msg)
		if err != nil {
    
			fmt.Println("send message failed,", err)
			return
		}

		fmt.Printf("pid:%v offset:%v\n", pid, offset)
	}
}
  1. tailf demo
package tailf

import (
	"fmt"
	"time"

	"github.com/hpcloud/tail"
)

func main() {
     //main()
	filename := "./my.log"
	tails, err := tail.TailFile(filename, tail.Config{
    
		ReOpen:    true,
		Follow:    true,
		Location:  &tail.SeekInfo{
    Offset: 0, Whence: 2}, // Position de lecture 
		MustExist: false,                                // Exiger que les documents existent ou soient exposés 
		Poll:      true,
	})
	if err != nil {
    
		fmt.Println("tail file err:", err)
		return
	}
	var msg *tail.Line
	var ok bool
	for {
    
		msg, ok = <-tails.Lines
		if !ok {
    
			fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename)
			time.Sleep(100 * time.Millisecond)
			continue
		}
		fmt.Println("msg:", msg)
	}
}
  1. config demo
package main

import (
	"fmt"

	"github.com/astaxie/beego/config"
)

func main() {
    
	conf, err := config.NewConfig("ini", "./logcollect.conf")
	if err != nil {
    
		fmt.Println("new config failed, err:", err)
		return
	}

	port, err := conf.Int("server::port")
	if err != nil {
    
		fmt.Println("read server:port failed, err:", err)
		return
	}
	fmt.Println("Port:", port)

	log_level := conf.String("logs::log_level")
	fmt.Println("log_level:", log_level)

	log_port, err := conf.Int("logs::port")
	if err != nil {
    
		fmt.Println("read logs:port failed, err:", err)
		return
	}
	fmt.Println("log_Port:", log_port)

	log_path := conf.String("logs::log_path")
	fmt.Println("log_path:", log_path)
}
  1. logs demo
package main

import (
	"encoding/json"
	"fmt"

	"github.com/astaxie/beego/logs"
)

func main() {
    
	config := make(map[string]interface{
    })
	config["filename"] = "./logcollect.log"
	config["level"] = logs.LevelTrace

	configStr, err := json.Marshal(config)
	if err != nil {
    
		fmt.Println("marshal failed, err:", err)
		return
	}

	logs.SetLogger(logs.AdapterFile, string(configStr))

	logs.Debug("this is a test, my name is %s", "stu01~~")
	logs.Trace("this is a trace, my name is %s", "stu02~~")
	logs.Warn("this is a warn, my name is %s", "stu03~~")
}
原网站

版权声明
本文为[Alex CYL]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/178/202206270358509082.html