当前位置:网站首页>GoLang日志编程系统
GoLang日志编程系统
2022-07-26 02:57:00 【上后左爱】
日志监控系统
Nginx(日志文件) -> log_process (实时读取解析写入) -> influxdb(存储) ->grafana(前端日志展示器)
influxdb 属于GO语言编写的开源的时序型数据,着力于高性能 查询与存储时序型数据,influxdb 广泛的应用于存储系统的监控数据,IOT行业的实时数据。
- 目前市面上流行 TSDB(时序型处理数据库):influxDB, TimescaleDB, QuestDB
- influxDB 类似于NOSQL体验,自动适合标记集模型的技术的数据集;
- TimescaleDB 与 postgreSQL 兼容, 更加适合物联网数据,与PostgreSQL更好的兼容
- QuestDB: 支持InfluxDB内联协议和PostgreSQL, 但是生态问题比较大
项目简答介绍
本日志系统 DEMO,但是可以直接使用到生产环境上面,使用LOG_Process 读取Nginx ./Access.log, 使用influxDB 进行存取
log_process -path ./access.log influxdsn http://127.0.0.1:8086@[email protected]@[email protected]
常见并发模型
- 解决C10k 的问题 采用异步非阻塞的模型(Nginx, libevent, NodeJS)-- 问题 复杂度高 大量回调函数
- 协程(Go,Erlang, lua): 协线性函数一样写代码;理解根加轻量级别的线程
3. 程序并行执行 go foo() // 执行函数
4. mgs:= <- c 多个gorountine 需要进行通信
5. select 从多个channel 中读取数据 ,多个 channel 随机选择一个进行消费
6. 并发: 一个任务通过调度器让任务看起来运行 属于单核CPU(逻辑运行)对于IO密集型比较友好
7. 并行:任务真正的运行
在go 语言中 并发执行 ,使用三个不同 gorountine, 一个负责装填,一个负责运输,一个负责处理 ,让程序并发的运行起来,让任务更加的职责单一化 这种思想 也可以将 日志解析读取,写入模块进行单独小模块,每个模块让使用gorountine ,通过channel 数据交互,至于这么多gorountine 是在一个CPU调度执行还是分配到多个CPU上进行执行 ,取决于系统.
go 语言有自己的调度器, go fun() 属于一个独立的工作单元,go的调度器,根据每个可用的物理处理器分配一个逻辑处理器,通过这个逻辑处理器对 独立单元进行处理,
通过设置: runtime.GOMAXPROCS(1)//给调度器分配多小个具体的逻辑处理器
一台服务器的 物理处理器越多 ,go 获取到逻辑处理器也越多,导致器允许速度越快。 参考:https://blog.csdn.net/ice_fire_x/article/details/105141409
系统架构
日志解析的基本流程化的伪函数,如下的函数有两个缺陷,解析介入和解析后输出只能写死,所以需要进行扩展,接口方式进行扩展
package main
import (
"fmt"
"strings"
"time"
)
/** * 日志解析系统分为: 解析,读取,写入 */
type LogProcess struct {
path string // 读取文件路径
influxDBDsn string // influx data source
rc chan string // read module to process
wc chan string // process to influx
}
// 返回函数使用 指针, 结构体很大 不需要进行拷贝 性能优化
func (l *LogProcess) ReadFromFile() {
// 文件读取模块
line := "message"
l.rc <- line
}
func (l *LogProcess) Process() {
// 文件解析模块
data := <-l.rc
l.wc <- strings.ToUpper(data)
}
func (l *LogProcess) writeToInfluxDB() {
fmt.Println(<-l.wc)
}
func main() {
// lp 引用类型
lp := &LogProcess{
path: "./tmp/access.log",
influxDBDsn: "username&password...",
rc: make(chan string),
wc: make(chan string),
}
// tree goroutine run
go lp.ReadFromFile()
go lp.Process()
// 需要定义 chan 将 Process 数据 传递给 influxDB
go lp.writeToInfluxDB()
time.Sleep(2 * time.Second)
}
接口方式约束 输入和输出 进行优化
package main
import (
"fmt"
"strings"
"time"
)
/** * 日志解析系统分为: 解析,读取,写入 */
type LogProcess struct {
rc chan string // read module to process
wc chan string // process to influx
read Read
write Writer
}
func (l *LogProcess) Process() {
// 文件解析模块
data := <-l.rc
l.wc <- strings.ToUpper(data)
}
type Writer interface {
writer(wc chan string)
}
type WriteToInfluxDB struct {
influxDBDsn string // influx data source
}
func (w *WriteToInfluxDB) writer(wc chan string) {
fmt.Println(<-wc)
}
type Read interface {
read(rc chan string)
}
type ReadFromFile struct {
path string // 读取文件
}
func (r *ReadFromFile) read(rc chan string) {
// 读取模块
line := "message"
rc <- line
}
func main() {
// lp 引用类型
r := &ReadFromFile{
path: "./tmp/access.log",
}
w := &WriteToInfluxDB{
influxDBDsn: "username&password"}
lp := &LogProcess{
rc: make(chan string),
wc: make(chan string),
read: r,
write: w,
}
// 通过接口方式 约束其功能
go lp.read.read(lp.rc)
go lp.Process()
go lp.write.writer(lp.wc)
// 通过参数注入方式
time.Sleep(2 * time.Second)
}
读取模块具体实现
- 从上次读取光标后开始逐行进行读取,无需每次都全部文件读取
package main
import (
"bufio"
"fmt"
"io"
"os"
"strings"
"time"
)
/** * 日志解析系统分为: 解析,读取,写入 */
type LogProcess struct {
rc chan []byte // read module to process
wc chan string // process to influx
read Read
write Writer
}
func (l *LogProcess) Process() {
// 文件解析模块
for v := range l.rc {
l.wc <- strings.ToUpper(string(v))
}
}
type Writer interface {
writer(wc chan string)
}
type WriteToInfluxDB struct {
influxDBDsn string // influx data source
}
func (w *WriteToInfluxDB) writer(wc chan string) {
// wc 通道另外一种读取方式
for x := range wc {
fmt.Println(x)
}
}
type Read interface {
read(rc chan []byte)
}
type ReadFromFile struct {
path string // 读取文件
}
func (r *ReadFromFile) read(rc chan []byte) {
// 实时系统: 从文件末尾逐行进行读取
f, err := os.Open(r.path)
if err != nil {
panic(fmt.Sprintln("open file error:%s", err.Error()))
}
// 文件末尾最开始进行读取
f.Seek(0, 2)
rd := bufio.NewReader(f)
for {
line, err := rd.ReadBytes('\n')
if err == io.EOF {
// d读取到文件末尾, 日志还没有写入
time.Sleep(500 * time.Millisecond)
continue
} else if err != nil {
panic(fmt.Sprintln("ReadBytes error:%s", err.Error()))
}
rc <- line[:len(line)-1]
}
}
func main() {
// lp 引用类型
r := &ReadFromFile{
path: "H:\\code\\goprogarm\\src\\access.log",
}
w := &WriteToInfluxDB{
influxDBDsn: "username&password"}
lp := &LogProcess{
rc: make(chan []byte),
wc: make(chan string),
read: r,
write: w,
}
// 通过接口方式 约束其功能
go lp.read.read(lp.rc)
go lp.Process()
go lp.write.writer(lp.wc)
// 通过参数注入方式
time.Sleep(100 * time.Second)
}
日志解析模块
- 冲Read Chan 中读取每一行数据
- 正则方式提取所需要的监控数据
- 将数据写入到influxDB
package main
import (
"bufio"
"fmt"
"io"
"log"
"os"
"regexp"
"strconv"
"time"
)
/** * 日志解析系统分为: 解析,读取,写入 */
type LogProcess struct {
rc chan []byte // read module to process
wc chan *Message // process to influx
read Read
write Writer
}
//日志写入结构体
type Message struct {
TimeLocal time.Time
BytesSent int
Path, Method, Scheme, Status string
UpstreamTime, RequestTime float64
}
func (l *LogProcess) Process() {
// 通过正则表达式进行解析数据
r := regexp.MustCompile(`(\s*)`)
loc, _ := time.LoadLocation("Asia/shanghai")
// 文件解析模块
for v := range l.rc {
ret := r.FindStringSubmatch(string(v))
if len(ret) != 13 {
log.Println("FindStringSub match fail:", string(v))
continue
}
message := &Message{
}
location, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
if err != nil {
log.Println("ParseInLocation fail:", err.Error(), ret[4])
}
message.TimeLocal = location
// 字符串类型转换成int
atoi, err := strconv.Atoi(ret[8])
if err != nil {
log.Println("strconv.Atoi fail:", err.Error(), ret[4])
}
message.BytesSent = atoi
l.wc <- message
}
}
type Writer interface {
writer(wc chan *Message)
}
type WriteToInfluxDB struct {
influxDBDsn string // influx data source
}
func (w *WriteToInfluxDB) writer(wc chan *Message) {
// wc 通道另外一种读取方式
for x := range wc {
fmt.Println(x)
}
}
type Read interface {
read(rc chan []byte)
}
type ReadFromFile struct {
path string // 读取文件
}
func (r *ReadFromFile) read(rc chan []byte) {
// 实时系统: 从文件末尾逐行进行读取
f, err := os.Open(r.path)
if err != nil {
panic(fmt.Sprintf("open file error:%s\n", err.Error()))
}
// 文件末尾最开始进行读取
f.Seek(0, 2)
rd := bufio.NewReader(f)
for {
line, err := rd.ReadBytes('\n')
if err == io.EOF {
// d读取到文件末尾, 日志还没有写入
time.Sleep(500 * time.Millisecond)
continue
} else if err != nil {
panic(fmt.Sprintf("ReadBytes error:%s\n", err.Error()))
}
rc <- line[:len(line)-1]
}
}
func main() {
// lp 引用类型
r := &ReadFromFile{
path: "H:\\code\\goprogarm\\src\\access.log",
}
w := &WriteToInfluxDB{
influxDBDsn: "username&password"}
lp := &LogProcess{
rc: make(chan []byte),
wc: make(chan *Message),
read: r,
write: w,
}
// 通过接口方式 约束其功能
go lp.read.read(lp.rc)
go lp.Process()
go lp.write.writer(lp.wc)
// 通过参数注入方式
time.Sleep(100 * time.Second)
}
边栏推荐
- Jenkins' study notes are detailed
- Longest Substring Without Repeating Characters
- [SQL] CASE表达式
- What if the test / development programmer gets old? Lingering cruel facts
- 【C进阶】深入探索数据的存储(深度剖析+典例解读)
- How to speed up matrix multiplication
- 【方向盘】工具提效:Sublime Text 4的常用快捷键合集
- Shardingsphere data slicing
- JS get the time composition array of two time periods
- How to design automated test cases?
猜你喜欢

Annotation development management third-party beans

Project management: lean management method

图像识别(六)| 激活函数

Arthas view the source code of the loaded class (JAD)

重装Win7系统如何进行?
![[steering wheel] use the 60 + shortcut keys of idea to share with you, in order to improve efficiency (reconstruction)](/img/b4/62a4c06743fdedacdffd9b156a760f.png)
[steering wheel] use the 60 + shortcut keys of idea to share with you, in order to improve efficiency (reconstruction)

Simply use MySQL index
![[steering wheel] how to transfer the start command and idea: VM parameters, command line parameters, system parameters, environment variable parameters, main method parameters](/img/97/159d7df5e2d11b129c400d61e3fde6.png)
[steering wheel] how to transfer the start command and idea: VM parameters, command line parameters, system parameters, environment variable parameters, main method parameters
![[SQL] CASE表达式](/img/05/1bbb0b5099443f7ce5f5511703477e.png)
[SQL] CASE表达式

Application of shift distance and hypothesis
随机推荐
Autojs cloud control source code + display
[introduction to C language] zzulioj 1006-1010
(9) Attribute introspection
Literature speed reading | in the face of danger, anxious people run faster?
Binary search 33. search rotation sort array
文件操作(一)——文件简介与文件的打开方式和关闭
基础知识-网络与服务器
MySQL tutorial: MySQL database learning classic (from getting started to mastering)
富文本转化为普通文本
Have you ever seen this kind of dynamic programming -- the stock problem of state machine dynamic programming (Part 1)
Wechat official account mutual aid, open white groups, and small white newspaper groups to keep warm
Turn on the LED
关于mysql的问题,希望个位能帮一下忙
Safety margin of mass consumption
Self-supervised learning method to solve the inverse problem of Fokker-Planck Equation
ShardingSphere数据分片
Cycle and branch (I)
朋友刚学完自动化测试就拿25Koffer,我功能测试何时才能到头?
如何有效的去防止别人穿该网站首页快照
AMD64 (x86_64) architecture ABI document: medium