Golang log programming system
2022-07-26 03:01:00 【Upper back left love】
Log monitoring system
Nginx (log files) -> log_process (real-time read, parse, write) -> InfluxDB (storage) -> Grafana (front-end log viewer)
InfluxDB is an open-source time-series database written in Go, focused on high-performance querying and storage of time-series data. It is widely used for storing system monitoring data and real-time IoT data.
- Popular TSDBs (time-series databases) on the market today: InfluxDB, TimescaleDB, QuestDB
- InfluxDB offers a NoSQL-like experience and automatically fits data to the tag-set model;
- TimescaleDB is compatible with PostgreSQL and better suited to IoT data;
- QuestDB supports the InfluxDB line protocol and the PostgreSQL wire protocol, but its ecosystem is still relatively immature.
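To make the tag-set model concrete: a single InfluxDB point in line protocol consists of a measurement name, a comma-separated tag set, a space-separated field set, and an optional timestamp. The measurement, tag, and field names below are hypothetical, not from this project:

```
nginx_log,Path=/foo,Method=GET RequestTime=1.854,BytesSent=2133i 1562356800000000000
```

Tags are indexed and used for grouping; fields hold the actual measured values.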
Brief introduction to the project
This log system is a demo, but its approach can be used directly in a production environment: log_process reads Nginx's ./access.log in real time, parses it, and writes the results into InfluxDB.
log_process -path ./access.log -influxdsn http://127.0.0.1:8086@…
Common concurrency models
- To solve the C10K problem, asynchronous non-blocking models were adopted (Nginx, libevent, NodeJS). Problem: high complexity and large numbers of callback functions.
- Coroutines (Go, Erlang, Lua): write asynchronous code as if it were synchronous; think of them as lightweight threads.

Go's concurrency primitives:
- `go foo()` runs a function as a concurrently executing goroutine.
- `msg := <-c` receives from a channel, which is how goroutines communicate.
- `select` reads from multiple channels; if several are ready, one is chosen at random.
- Concurrency: tasks appear to run simultaneously through the scheduler; this works even on a single logical CPU and is friendly to IO-bound workloads.
- Parallelism: tasks really do run at the same time, on multiple cores.
A common concurrent pattern in Go uses three different goroutines: one to fill, one to transport, one to process, letting the program run concurrently while keeping each task simple. The same idea applies here: split log reading, parsing, and writing into small modules, run each module in its own goroutine, and exchange data through channels. Whether all those goroutines are scheduled on one CPU or spread across several is up to the runtime.
Go has its own scheduler, and each `go fun()` call is an independent unit of work. The scheduler allocates one logical processor per available physical processor, and the logical processors run these independent units.
The number of logical processors handed to the scheduler can be set explicitly: runtime.GOMAXPROCS(1)
The more physical processors a server has, the more logical processors Go gets, and the faster goroutines can run. Reference: https://blog.csdn.net/ice_fire_x/article/details/105141409
System architecture
The pseudo-code below shows the basic flow of log parsing. It has two defects: the input it reads and the output it writes are hard-coded, so it needs to be extended through interfaces.
```go
package main

import (
	"fmt"
	"strings"
	"time"
)

/**
 * The log parsing system is split into three parts: read, parse, write.
 */
type LogProcess struct {
	path        string      // path of the file to read
	influxDBDsn string      // influx data source
	rc          chan string // read module -> process module
	wc          chan string // process module -> influx
}

// Methods use a pointer receiver: the struct is large, so avoiding a copy
// is a performance optimization.
func (l *LogProcess) ReadFromFile() {
	// file reading module
	line := "message"
	l.rc <- line
}

func (l *LogProcess) Process() {
	// file parsing module
	data := <-l.rc
	l.wc <- strings.ToUpper(data)
}

func (l *LogProcess) writeToInfluxDB() {
	fmt.Println(<-l.wc)
}

func main() {
	// lp is a reference type
	lp := &LogProcess{
		path:        "./tmp/access.log",
		influxDBDsn: "username&password...",
		rc:          make(chan string),
		wc:          make(chan string),
	}

	// three goroutines run the pipeline; the wc channel passes the
	// Process output on to the influxDB writer
	go lp.ReadFromFile()
	go lp.Process()
	go lp.writeToInfluxDB()

	time.Sleep(2 * time.Second)
}
```
Optimization: constraining input and output with interfaces
```go
package main

import (
	"fmt"
	"strings"
	"time"
)

/**
 * The log parsing system is split into three parts: read, parse, write.
 */
type LogProcess struct {
	rc    chan string // read module -> process module
	wc    chan string // process module -> influx
	read  Read
	write Writer
}

func (l *LogProcess) Process() {
	// file parsing module
	data := <-l.rc
	l.wc <- strings.ToUpper(data)
}

type Writer interface {
	writer(wc chan string)
}

type WriteToInfluxDB struct {
	influxDBDsn string // influx data source
}

func (w *WriteToInfluxDB) writer(wc chan string) {
	fmt.Println(<-wc)
}

type Read interface {
	read(rc chan string)
}

type ReadFromFile struct {
	path string // path of the file to read
}

func (r *ReadFromFile) read(rc chan string) {
	// reading module
	line := "message"
	rc <- line
}

func main() {
	// dependencies are injected as parameters
	r := &ReadFromFile{
		path: "./tmp/access.log",
	}
	w := &WriteToInfluxDB{
		influxDBDsn: "username&password",
	}
	lp := &LogProcess{
		rc:    make(chan string),
		wc:    make(chan string),
		read:  r,
		write: w,
	}

	// the interfaces constrain what each module can do
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)

	time.Sleep(2 * time.Second)
}
```
Implementation of the reading module
- Read line by line from where the cursor last stopped, so there is no need to re-read the whole file each time
```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"strings"
	"time"
)

/**
 * The log parsing system is split into three parts: read, parse, write.
 */
type LogProcess struct {
	rc    chan []byte // read module -> process module
	wc    chan string // process module -> influx
	read  Read
	write Writer
}

func (l *LogProcess) Process() {
	// file parsing module
	for v := range l.rc {
		l.wc <- strings.ToUpper(string(v))
	}
}

type Writer interface {
	writer(wc chan string)
}

type WriteToInfluxDB struct {
	influxDBDsn string // influx data source
}

func (w *WriteToInfluxDB) writer(wc chan string) {
	// range is another way to drain the channel
	for x := range wc {
		fmt.Println(x)
	}
}

type Read interface {
	read(rc chan []byte)
}

type ReadFromFile struct {
	path string // path of the file to read
}

func (r *ReadFromFile) read(rc chan []byte) {
	// real-time tailing: read line by line from the end of the file
	f, err := os.Open(r.path)
	if err != nil {
		panic(fmt.Sprintf("open file error: %s", err.Error()))
	}

	// start reading from the end of the file
	f.Seek(0, io.SeekEnd)
	rd := bufio.NewReader(f)
	for {
		line, err := rd.ReadBytes('\n')
		if err == io.EOF {
			// reached the end of the file: no new log line yet, wait
			time.Sleep(500 * time.Millisecond)
			continue
		} else if err != nil {
			panic(fmt.Sprintf("ReadBytes error: %s", err.Error()))
		}
		rc <- line[:len(line)-1] // strip the trailing '\n'
	}
}

func main() {
	r := &ReadFromFile{
		path: "H:\\code\\goprogarm\\src\\access.log",
	}
	w := &WriteToInfluxDB{
		influxDBDsn: "username&password",
	}
	lp := &LogProcess{
		rc:    make(chan []byte),
		wc:    make(chan string),
		read:  r,
		write: w,
	}

	// the interfaces constrain what each module can do
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)

	time.Sleep(100 * time.Second)
}
```
Log parsing module
- Read each line of data from the read channel
- Extract the required monitoring fields with a regular expression
- Write the data to InfluxDB
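The regular expression in the listing that follows was truncated in the source (`(\s*)` is a placeholder). As an illustration only, a pattern for a fairly common Nginx log_format (combined, plus upstream and request times) could look like the one below; the exact pattern depends on your log_format directive, and the parser's length check must equal the number of capture groups plus one, because FindStringSubmatch also returns the full match as element 0:

```go
package main

import (
	"fmt"
	"regexp"
)

// Hypothetical pattern for a typical Nginx access-log line; adjust it to
// your actual log_format. Groups, in order: remote_addr, ident, user,
// time_local, scheme, request, status, body_bytes_sent, referer,
// user_agent, x_forwarded_for, upstream_response_time, request_time.
var logPattern = regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+"([^"]+)"\s+(\d{3})\s+(\d+)\s+"([^"]+)"\s+"(.*?)"\s+"([\d\.-]+)"\s+([\d\.-]+)\s+([\d\.-]+)`)

func parseLine(line string) []string {
	return logPattern.FindStringSubmatch(line)
}

func main() {
	line := `172.0.0.12 - - [04/Mar/2018:13:49:52 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-" "KeepAliveClient" "-" 1.005 1.854`
	ret := parseLine(line)
	// full match plus 13 groups: time_local, status, body_bytes_sent
	fmt.Println(len(ret), ret[4], ret[7], ret[8])
}
```

With this particular pattern the slice has 14 entries, so the corresponding length check would be `len(ret) != 14`; a pattern with a different group count needs a different check.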
```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"log"
	"os"
	"regexp"
	"strconv"
	"time"
)

/**
 * The log parsing system is split into three parts: read, parse, write.
 */
type LogProcess struct {
	rc    chan []byte   // read module -> process module
	wc    chan *Message // process module -> influx
	read  Read
	write Writer
}

// Message is the structure the parsed log data is written into
type Message struct {
	TimeLocal                    time.Time
	BytesSent                    int
	Path, Method, Scheme, Status string
	UpstreamTime, RequestTime    float64
}

func (l *LogProcess) Process() {
	// extract fields with a regular expression; the pattern below was
	// truncated in the source and must be replaced with one whose capture
	// groups satisfy the length check that follows
	r := regexp.MustCompile(`(\s*)`)
	loc, _ := time.LoadLocation("Asia/Shanghai")

	// file parsing module
	for v := range l.rc {
		ret := r.FindStringSubmatch(string(v))
		if len(ret) != 13 {
			log.Println("FindStringSubmatch fail:", string(v))
			continue
		}

		message := &Message{}
		location, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
		if err != nil {
			log.Println("ParseInLocation fail:", err.Error(), ret[4])
		}
		message.TimeLocal = location

		// convert the bytes-sent string to an int
		atoi, err := strconv.Atoi(ret[8])
		if err != nil {
			log.Println("strconv.Atoi fail:", err.Error(), ret[8])
		}
		message.BytesSent = atoi

		l.wc <- message
	}
}

type Writer interface {
	writer(wc chan *Message)
}

type WriteToInfluxDB struct {
	influxDBDsn string // influx data source
}

func (w *WriteToInfluxDB) writer(wc chan *Message) {
	// range is another way to drain the channel
	for x := range wc {
		fmt.Println(x)
	}
}

type Read interface {
	read(rc chan []byte)
}

type ReadFromFile struct {
	path string // path of the file to read
}

func (r *ReadFromFile) read(rc chan []byte) {
	// real-time tailing: read line by line from the end of the file
	f, err := os.Open(r.path)
	if err != nil {
		panic(fmt.Sprintf("open file error: %s", err.Error()))
	}

	// start reading from the end of the file
	f.Seek(0, io.SeekEnd)
	rd := bufio.NewReader(f)
	for {
		line, err := rd.ReadBytes('\n')
		if err == io.EOF {
			// reached the end of the file: no new log line yet, wait
			time.Sleep(500 * time.Millisecond)
			continue
		} else if err != nil {
			panic(fmt.Sprintf("ReadBytes error: %s", err.Error()))
		}
		rc <- line[:len(line)-1] // strip the trailing '\n'
	}
}

func main() {
	r := &ReadFromFile{
		path: "H:\\code\\goprogarm\\src\\access.log",
	}
	w := &WriteToInfluxDB{
		influxDBDsn: "username&password",
	}
	lp := &LogProcess{
		rc:    make(chan []byte),
		wc:    make(chan *Message),
		read:  r,
		write: w,
	}

	// the interfaces constrain what each module can do
	go lp.read.read(lp.rc)
	go lp.Process()
	go lp.write.writer(lp.wc)

	time.Sleep(100 * time.Second)
}
```