Golang log programming system
2022-07-26 03:01:00 【Upper back left love】
Log monitoring system
Nginx (log files) -> log_process (reads in real time, parses, writes) -> InfluxDB (storage) -> Grafana (front-end log viewer)
InfluxDB is an open-source time-series database written in Go. It focuses on high-performance querying and storage of time-series data, and is widely used for system-monitoring metrics and real-time IoT data.
- Currently popular TSDBs (time-series databases): InfluxDB, TimescaleDB, QuestDB
- InfluxDB offers a NoSQL-like experience, with a tag-set data model that the data automatically fits into;
- TimescaleDB is PostgreSQL-compatible and better suited to IoT data, with the best PostgreSQL interoperability of the three
- QuestDB supports the InfluxDB line protocol and the PostgreSQL wire protocol, but its ecosystem is still comparatively immature
Brief introduction to the project
This log system is a DEMO, but it can be used directly in production: log_process reads Nginx's ./access.log in real time and writes the parsed data to InfluxDB.
log_process -path ./access.log influxdsn http://127.0.0.1:8086@[email protected][email protected]@s
Common concurrency models
- Solving the C10K problem: the asynchronous non-blocking model (Nginx, libevent, NodeJS); its drawback is high complexity and large numbers of callback functions
- Coroutines (Go, Erlang, Lua): write asynchronous code as if it were synchronous; think of them as lightweight threads
- go foo() executes the function in a new goroutine, so the program runs concurrently
- msg := <-c receives on a channel, used when multiple goroutines need to communicate
- select reads from multiple channels; when several are ready, one is chosen at random for consumption
- Concurrency: tasks only appear to run simultaneously because the scheduler interleaves them; this works even on a single logical CPU and is friendly to IO-bound workloads
- Parallelism: tasks really do run simultaneously on multiple cores
In Go, concurrent execution means using several goroutines: in the classic example one goroutine fills, one transports, and one processes, which lets the program run concurrently while keeping each task simple. The same idea applies here: split log reading, parsing, and writing into small modules, give each module its own goroutine, and exchange data between them through channels. Whether all those goroutines are scheduled on a single CPU or spread across several is up to the runtime.
Go has its own scheduler, and each go fun() call is an independent unit of work. The scheduler allocates one logical processor for each available physical processor, and runs those independent units on the logical processors.
By calling runtime.GOMAXPROCS(1) you tell the scheduler how many logical processors to use.
The more physical processors a server has, the more logical processors Go gets, and the faster goroutines can run. Reference: https://blog.csdn.net/ice_fire_x/article/details/105141409
System architecture
The basic flow of log parsing is sketched in the pseudo-function below. It has two defects: the input source and the output target after parsing are both hard-coded, so we will need to extend it, using interfaces.
package main

import (
	"fmt"
	"strings"
	"time"
)

// The log parsing system is divided into: reading, parsing, writing.
type LogProcess struct {
	path        string      // path of the file to read
	influxDBDsn string      // influx data source
	rc          chan string // read module -> process module
	wc          chan string // process module -> influx writer
}

// Pointer receivers are used so the (potentially large) struct
// is not copied on every call: a performance optimization.
func (l *LogProcess) ReadFromFile() {
	// file reading module
	line := "message"
	l.rc <- line
}

func (l *LogProcess) Process() {
	// file parsing module
	data := <-l.rc
	l.wc <- strings.ToUpper(data)
}

func (l *LogProcess) writeToInfluxDB() {
	fmt.Println(<-l.wc)
}

func main() {
	// lp is a pointer (reference) type
	lp := &LogProcess{
		path:        "./tmp/access.log",
		influxDBDsn: "username&password...",
		rc:          make(chan string),
		wc:          make(chan string),
	}
	// three goroutines run the pipeline;
	// the channels carry Process's output on to influxDB
	go lp.ReadFromFile()
	go lp.Process()
	go lp.writeToInfluxDB()
	time.Sleep(2 * time.Second)
}
Optimization: use interfaces to constrain the input and output
package main

import (
	"fmt"
	"strings"
	"time"
)

// The log parsing system is divided into: reading, parsing, writing.
type LogProcess struct {
	rc    chan string // read module -> process module
	wc    chan string // process module -> influx writer
	read  Read
	write Writer
}

func (l *LogProcess) Process() {
	// file parsing module
	data := <-l.rc
	l.wc <- strings.ToUpper(data)
}

type Writer interface {
	writer(wc chan string)
}

type WriteToInfluxDB struct {
	influxDBDsn string // influx data source
}

func (w *WriteToInfluxDB) writer(wc chan string) {
	fmt.Println(<-wc)
}

type Read interface {
	read(rc chan string)
}

type ReadFromFile struct {
	path string // file to read
}

func (r *ReadFromFile) read(rc chan string) {
	// reading module
	line := "message"
	rc <- line
}

func main() {
	// the concrete reader and writer are injected as parameters
	r := &ReadFromFile{
		path: "./tmp/access.log",
	}
	w := &WriteToInfluxDB{
		influxDBDsn: "username&password",
	}
	lp := &LogProcess{
		rc:    make(chan string),
		wc:    make(chan string),
		read:  r,
		write: w,
	}
	// the interfaces constrain what each module can do
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)
	time.Sleep(2 * time.Second)
}
Concrete implementation of the reading module
- Read line by line from where the cursor stopped last time, so the whole file does not have to be re-read on every pass
package main

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"strings"
	"time"
)

// The log parsing system is divided into: reading, parsing, writing.
type LogProcess struct {
	rc    chan []byte // read module -> process module
	wc    chan string // process module -> influx writer
	read  Read
	write Writer
}

func (l *LogProcess) Process() {
	// file parsing module
	for v := range l.rc {
		l.wc <- strings.ToUpper(string(v))
	}
}

type Writer interface {
	writer(wc chan string)
}

type WriteToInfluxDB struct {
	influxDBDsn string // influx data source
}

func (w *WriteToInfluxDB) writer(wc chan string) {
	// range is another way to drain the channel
	for x := range wc {
		fmt.Println(x)
	}
}

type Read interface {
	read(rc chan []byte)
}

type ReadFromFile struct {
	path string // file to read
}

func (r *ReadFromFile) read(rc chan []byte) {
	// real-time tailing: read line by line starting from the end of the file
	f, err := os.Open(r.path)
	if err != nil {
		panic(fmt.Sprintf("open file error:%s", err.Error()))
	}
	// start reading from the end of the file
	f.Seek(0, io.SeekEnd)
	rd := bufio.NewReader(f)
	for {
		line, err := rd.ReadBytes('\n')
		if err == io.EOF {
			// reached the end of the file; the next log line
			// has not been written yet, so wait and retry
			time.Sleep(500 * time.Millisecond)
			continue
		} else if err != nil {
			panic(fmt.Sprintf("ReadBytes error:%s", err.Error()))
		}
		// strip the trailing '\n'
		rc <- line[:len(line)-1]
	}
}

func main() {
	r := &ReadFromFile{
		path: "H:\\code\\goprogarm\\src\\access.log",
	}
	w := &WriteToInfluxDB{
		influxDBDsn: "username&password",
	}
	lp := &LogProcess{
		rc:    make(chan []byte),
		wc:    make(chan string),
		read:  r,
		write: w,
	}
	// the interfaces constrain what each module can do
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)
	time.Sleep(100 * time.Second)
}
Log parsing module
- Read each line of data from the read channel
- Extract the monitored fields with a regular expression
- Write the data to influxDB
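Before the full program, here is a hedged sketch of the regex step on its own. The pattern below is a simplified stand-in (the program's real pattern must cover the whole nginx log_format with 13 capture groups), and the sample line is invented for illustration; it extracts just the remote address, the bracketed timestamp, and the status code:

```go
package main

import (
	"fmt"
	"regexp"
)

// Simplified pattern for illustration only: remote address, the
// [timestamp], and the 3-digit status after the quoted request line.
var lineRe = regexp.MustCompile(`^([\d.]+).*\[([^\]]+)\]\s+"[A-Z]+ [^"]*" (\d{3})`)

// parse pulls the three captured fields out of one log line.
func parse(line string) (addr, ts, status string, ok bool) {
	m := lineRe.FindStringSubmatch(line)
	if m == nil {
		return "", "", "", false
	}
	return m[1], m[2], m[3], true
}

func main() {
	sample := `127.0.0.1 - - [04/Mar/2018:13:49:52 +0000] "GET /foo?query=t HTTP/1.0" 200 2133`
	addr, ts, status, ok := parse(sample)
	fmt.Println(ok, addr, ts, status)
}
```

`FindStringSubmatch` returns the whole match at index 0 followed by one entry per capture group, which is why the full program checks `len(ret) != 13` for a 12-group pattern plus the full match.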
package main

import (
	"bufio"
	"fmt"
	"io"
	"log"
	"os"
	"regexp"
	"strconv"
	"time"
)

// The log parsing system is divided into: reading, parsing, writing.
type LogProcess struct {
	rc    chan []byte   // read module -> process module
	wc    chan *Message // process module -> influx writer
	read  Read
	write Writer
}

// Message is the structure written to influxDB.
type Message struct {
	TimeLocal                    time.Time
	BytesSent                    int
	Path, Method, Scheme, Status string
	UpstreamTime, RequestTime    float64
}

func (l *LogProcess) Process() {
	// parse the fields out of each line with a regular expression.
	// `(\s*)` is a placeholder here: the real pattern must match the
	// nginx log_format and produce 13 submatches (full match + 12 groups)
	r := regexp.MustCompile(`(\s*)`)
	loc, _ := time.LoadLocation("Asia/Shanghai")
	// file parsing module
	for v := range l.rc {
		ret := r.FindStringSubmatch(string(v))
		if len(ret) != 13 {
			log.Println("FindStringSubmatch fail:", string(v))
			continue
		}
		message := &Message{}
		location, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
		if err != nil {
			log.Println("ParseInLocation fail:", err.Error(), ret[4])
		}
		message.TimeLocal = location
		// convert the string to an int
		atoi, err := strconv.Atoi(ret[8])
		if err != nil {
			log.Println("strconv.Atoi fail:", err.Error(), ret[8])
		}
		message.BytesSent = atoi
		l.wc <- message
	}
}

type Writer interface {
	writer(wc chan *Message)
}

type WriteToInfluxDB struct {
	influxDBDsn string // influx data source
}

func (w *WriteToInfluxDB) writer(wc chan *Message) {
	// range is another way to drain the channel
	for x := range wc {
		fmt.Println(x)
	}
}

type Read interface {
	read(rc chan []byte)
}

type ReadFromFile struct {
	path string // file to read
}

func (r *ReadFromFile) read(rc chan []byte) {
	// real-time tailing: read line by line starting from the end of the file
	f, err := os.Open(r.path)
	if err != nil {
		panic(fmt.Sprintf("open file error:%s\n", err.Error()))
	}
	// start reading from the end of the file
	f.Seek(0, io.SeekEnd)
	rd := bufio.NewReader(f)
	for {
		line, err := rd.ReadBytes('\n')
		if err == io.EOF {
			// reached the end of the file; the next log line
			// has not been written yet, so wait and retry
			time.Sleep(500 * time.Millisecond)
			continue
		} else if err != nil {
			panic(fmt.Sprintf("ReadBytes error:%s\n", err.Error()))
		}
		// strip the trailing '\n'
		rc <- line[:len(line)-1]
	}
}

func main() {
	r := &ReadFromFile{
		path: "H:\\code\\goprogarm\\src\\access.log",
	}
	w := &WriteToInfluxDB{
		influxDBDsn: "username&password",
	}
	lp := &LogProcess{
		rc:    make(chan []byte),
		wc:    make(chan *Message),
		read:  r,
		write: w,
	}
	// the interfaces constrain what each module can do
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)
	time.Sleep(100 * time.Second)
}