当前位置:网站首页>浪潮云溪分布式数据库 Tracing(二)—— 源码解析
浪潮云溪分布式数据库 Tracing(二)—— 源码解析
2022-07-07 22:13:00 【浪潮云溪数据库】
按照【云溪数据库Tracing(一)】中介绍的opentracing使用要求,本文着重介绍云溪数据库Tracing模块是如何实现Span、SpanContext和Tracer的。
Part 1 - Tracing 模块调用关系
1.1 Tracing模块包含的文件列表
Tracer.go:定义了opentracing中trace相关接口的实现;Tracer_span.go:定义了opentracing中span相关操作的实现;Tags.go:定义了opentracing中关于tags的相关接口;Shadow.go:不是opentracing中的概念,这里主要实现与zipkin的通信,用于将tracing信息推送到外部的zipkin中。
1.2 各个文件之间的调用关系
在cluster_settings.go中会创建tracer,供全局使用,其他模块中使用这个Tracer实现span的创建和其他操作,例如设定span名称、设定tag 、增加log等操作。
Part 2 - Opentracing
在云溪数据库中的实现
以下只列出了部分接口实现,并非全部。
2.1 Span 接口实现:
Context实现:API用于获取Span中的SpanContext。主要流程是先创建一个map[string]string类型的baggageCopy,将span中的mu.Baggage逐项复制到baggageCopy,再用它创建新的spanContext并返回。
// Context is part of the opentracing.Span interface. It returns a newly
// allocated spanContext carrying the span's metadata and a copy of its
// baggage.
func (s *span) Context() opentracing.SpanContext {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Copy the baggage so the returned context does not share the span's map.
	snapshot := make(map[string]string, len(s.mu.Baggage))
	for key, val := range s.mu.Baggage {
		snapshot[key] = val
	}

	ctx := &spanContext{spanMeta: s.spanMeta, Baggage: snapshot}
	if s.shadowTr != nil {
		ctx.shadowTr = s.shadowTr
		ctx.shadowCtx = s.shadowSpan.Context()
	}

	// Recording information is only propagated while the span is recording.
	if !s.isRecording() {
		return ctx
	}
	ctx.recordingGroup = s.mu.recordingGroup
	ctx.recordingType = s.mu.recordingType
	return ctx
}
Finish实现:API用于结束一个Span的记录和追踪。
// Finish is part of the opentracing.Span interface. It delegates to
// FinishWithOptions with zero-valued options.
func (s *span) Finish() {
	var defaults opentracing.FinishOptions
	s.FinishWithOptions(defaults)
}
SetTag实现:用于向指定的Span添加Tag信息。
// SetTag is part of the opentracing.Span interface. It forwards to
// setTagInner with the "locked" argument set to false.
func (s *span) SetTag(key string, value interface{}) opentracing.Span {
	const locked = false
	return s.setTagInner(key, value, locked)
}
LogKV实现:用于向指定的Span添加Log信息。
// LogKV is part of the opentracing.Span interface. The alternating key/value
// arguments are converted to log fields and passed to LogFields; if the
// conversion fails, the error itself is logged together with the name of this
// function.
func (s *span) LogKV(alternatingKeyValues ...interface{}) {
	fields, err := otlog.InterleavedKVToFields(alternatingKeyValues...)
	if err == nil {
		s.LogFields(fields...)
		return
	}
	s.LogFields(otlog.Error(err), otlog.String("function", "LogKV"))
}
SetBaggageItem实现:用于向指定的Span增加Baggage信息,主要是用于跨进程追踪使用。
// SetBaggageItem is part of the opentracing.Span interface. It acquires the
// span's mutex and delegates to setBaggageItemLocked.
func (s *span) SetBaggageItem(restrictedKey, value string) opentracing.Span {
	s.mu.Lock()
	defer s.mu.Unlock()

	updated := s.setBaggageItemLocked(restrictedKey, value)
	return updated
}
BaggageItem实现:用于获取指定的Baggage信息。
// BaggageItem is part of the opentracing.Span interface. It returns the
// baggage value stored under restrictedKey, or the empty string when the key
// is absent. The lookup is performed while holding the span's mutex.
func (s *span) BaggageItem(restrictedKey string) string {
	s.mu.Lock()
	defer s.mu.Unlock()

	if item, found := s.mu.Baggage[restrictedKey]; found {
		return item
	}
	return ""
}
SetOperationName实现:用于设定Span 的名称。
func (s *span) SetOperationName(operationName string) opentracing.Span { if s.shadowTr != nil { s.shadowSpan.SetOperationName(operationName) } s.operation = operationName return s}
Tracer实现:用于获取Span属于哪个Tracer。
// Tracer is part of the opentracing.Span interface. It returns the tracer
// stored on the span.
func (s *span) Tracer() opentracing.Tracer {
	owner := s.tracer
	return owner
}
2.2 SpanContext 接口实现:
ForeachBaggageItem实现:用于遍历spanContext中的baggage信息。
// ForeachBaggageItem is part of the opentracing.SpanContext interface. It
// calls handler once per baggage entry and stops as soon as handler returns
// false. Iteration follows Go map order, so the visiting order is unspecified.
func (sc *spanContext) ForeachBaggageItem(handler func(k, v string) bool) {
	for key, val := range sc.Baggage {
		if keepGoing := handler(key, val); !keepGoing {
			return
		}
	}
}
2.3 Tracer接口实现:
Inject实现:用于向carrier中注入SpanContext信息
// Inject is part of the opentracing.Tracer interface.
func (t *Tracer) Inject(
osc opentracing.SpanContext, format interface{}, carrier interface{},
) error {
……
// We only
support the HTTPHeaders/TextMap format.
if format != opentracing.HTTPHeaders && format != opentracing.TextMap {
return opentracing.ErrUnsupportedFormat
}
mapWriter, ok := carrier.(opentracing.TextMapWriter)
if !ok {
return opentracing.ErrInvalidCarrier
}
sc, ok := osc.(*spanContext)
if !ok {
return opentracing.ErrInvalidSpanContext
}
mapWriter.Set(fieldNameTraceID, strconv.FormatUint(sc.TraceID, 16))
mapWriter.Set(fieldNameSpanID, strconv.FormatUint(sc.SpanID, 16))
for k, v := range sc.Baggage {
mapWriter.Set(prefixBaggage+k, v)
}
……
return nil
}
Extract实现:用于从carrier中抽取出SpanContext信息。
func (t *Tracer) Extract(format interface{}, carrier interface{}) (opentracing.SpanContext, error) {
// We only
support the HTTPHeaders/TextMap format.
if format != opentracing.HTTPHeaders && format != opentracing.TextMap {
return noopSpanContext{}, opentracing.ErrUnsupportedFormat
}
mapReader, ok := carrier.(opentracing.TextMapReader)
if !ok {
return noopSpanContext{}, opentracing.ErrInvalidCarrier
}
var sc spanContext
……
err :=
mapReader.ForeachKey(func(k, v string) error {
switch k = strings.ToLower(k); k {
case fieldNameTraceID:
var err error
sc.TraceID, err = strconv.ParseUint(v, 16, 64)
if err != nil {
return opentracing.ErrSpanContextCorrupted
}
case fieldNameSpanID:
var err error
sc.SpanID, err = strconv.ParseUint(v, 16, 64)
if err != nil {
return opentracing.ErrSpanContextCorrupted
}
case fieldNameShadowType:
shadowType = v
default:
if strings.HasPrefix(k, prefixBaggage) {
if sc.Baggage == nil {
sc.Baggage = make(map[string]string)
}
sc.Baggage[strings.TrimPrefix(k, prefixBaggage)] = v
} else if strings.HasPrefix(k, prefixShadow) {
if shadowCarrier == nil {
shadowCarrier = make(opentracing.TextMapCarrier)
}
// We build a
shadow textmap with the original shadow keys.
shadowCarrier.Set(strings.TrimPrefix(k, prefixShadow), v)
}
}
return nil
})
if err != nil {
return noopSpanContext{}, err
}
if sc.TraceID == 0 &&
sc.SpanID == 0 {
return noopSpanContext{}, nil
}
……
return &sc, nil
}
StartSpan接口实现:用于创建一个新的Span,可根据传入不同opts来实现不同Span的初始化。
func (t *Tracer) StartSpan(
operationName string, opts ...opentracing.StartSpanOption,
) opentracing.Span {
// Fast paths to
avoid the allocation of StartSpanOptions below when tracing
// is disabled: if we have no options
or a single SpanReference (the common
// case) with a noop context, return a
noop span now.
if len(opts) == 1 {
if o, ok := opts[0].(opentracing.SpanReference); ok {
if IsNoopContext(o.ReferencedContext) {
return &t.noopSpan
}
}
}
shadowTr := t.getShadowTracer()
……
return s
}
2.4 noop span 实现:
noop span实现:使监控代码不依赖Tracer和Span的返回值,防止程序异常退出。
// noopSpan is a do-nothing implementation of opentracing.Span. Every method
// either returns a fixed value or the receiver itself, so instrumentation code
// can call it without checking what kind of span it holds.
type noopSpan struct {
	tracer *Tracer
}

// Compile-time check that *noopSpan satisfies opentracing.Span.
var _ opentracing.Span = (*noopSpan)(nil)

// Context returns a noopSpanContext.
func (n *noopSpan) Context() opentracing.SpanContext {
	return noopSpanContext{}
}

// BaggageItem always returns the empty string.
func (n *noopSpan) BaggageItem(key string) string {
	return ""
}

// SetTag ignores its arguments and returns the receiver.
func (n *noopSpan) SetTag(key string, value interface{}) opentracing.Span {
	return n
}

// Finish does nothing.
func (n *noopSpan) Finish() {
}

// FinishWithOptions does nothing.
func (n *noopSpan) FinishWithOptions(opts opentracing.FinishOptions) {
}

// SetOperationName ignores its argument and returns the receiver.
func (n *noopSpan) SetOperationName(operationName string) opentracing.Span {
	return n
}

// Tracer returns the tracer stored on the noop span.
func (n *noopSpan) Tracer() opentracing.Tracer {
	return n.tracer
}

// LogFields does nothing.
func (n *noopSpan) LogFields(fields ...otlog.Field) {
}

// LogKV does nothing.
func (n *noopSpan) LogKV(keyVals ...interface{}) {
}

// LogEvent does nothing.
func (n *noopSpan) LogEvent(event string) {
}

// LogEventWithPayload does nothing.
func (n *noopSpan) LogEventWithPayload(event string, payload interface{}) {
}

// Log does nothing.
func (n *noopSpan) Log(data opentracing.LogData) {
}
func (n *noopSpan) SetBaggageItem(key, val string) opentracing.Span {
if key == Snowball {
panic("attempting to set Snowball on a noop span; use the Recordable option
to StartSpan")
}
return n
}
Part3 - 云溪数据库中
Opentracing 简单使用示例
3.1 开启Tracer Recording测试
云溪数据库中,初始创建的span均是noop span(空操作span),需要手动调用StartRecording,将span转换为可record状态,才能正常对span进行操作。
// TestTracerRecording walks spans through the noop, recordable and recording
// states and checks what GetRecording reports after each step.
func TestTracerRecording(t *testing.T) {
tr := NewTracer()
// A span started without any option is expected to be a noop span.
noop1 := tr.StartSpan("noop")
if _, noop := noop1.(*noopSpan); !noop {
t.Error("expected noop span")
}
noop1.LogKV("hello", "void")
// A child of a noop span is expected to be a noop span as well.
noop2 := tr.StartSpan("noop2", opentracing.ChildOf(noop1.Context()))
if _, noop := noop2.(*noopSpan); !noop {
t.Error("expected noop child span")
}
noop2.Finish()
noop1.Finish()
// A Recordable span is expected to be a real (non-noop) span that is still a
// black hole until recording is started.
s1 := tr.StartSpan("a", Recordable)
if _, noop := s1.(*noopSpan); noop {
t.Error("Recordable (but not recording) span should not be noop")
}
if !IsBlackHoleSpan(s1) {
t.Error("Recordable span should be black hole")
}
// Unless recording is actually started, child spans are still noop.
noop3 := tr.StartSpan("noop3", opentracing.ChildOf(s1.Context()))
if _, noop := noop3.(*noopSpan); !noop {
t.Error("expected noop child span")
}
noop3.Finish()
// This is logged before StartRecording; the expected recordings below contain
// "x: 2" but never "x: 1".
s1.LogKV("x", 1)
StartRecording(s1, SingleNodeRecording)
s1.LogKV("x", 2)
// A child created while the parent is recording is expected not to be a
// black hole.
s2 := tr.StartSpan("b", opentracing.ChildOf(s1.Context()))
if IsBlackHoleSpan(s2) {
t.Error("recording span should not be black hole")
}
s2.LogKV("x", 3)
// The parent's recording is expected to include the child span.
if err := TestingCheckRecordedSpans(GetRecording(s1), `
span a:
tags: unfinished=
x: 2
span b:
tags: unfinished=
x: 3
`); err != nil {
t.Fatal(err)
}
// The child's recording is expected to contain only the child span.
if err := TestingCheckRecordedSpans(GetRecording(s2), `
span b:
tags: unfinished=
x: 3
`); err != nil {
t.Fatal(err)
}
// A FollowsFrom span of s2 is expected to show up in s1's recording too.
s3 := tr.StartSpan("c", opentracing.FollowsFrom(s2.Context()))
s3.LogKV("x", 4)
s3.SetTag("tag", "val")
// After s2 finishes, span b is expected to lose its "unfinished" tag.
s2.Finish()
if err := TestingCheckRecordedSpans(GetRecording(s1), `
span a:
tags: unfinished=
x: 2
span b:
x: 3
span c:
tags: tag=val unfinished=
x: 4
`); err != nil {
t.Fatal(err)
}
// After s3 finishes, span c is expected to keep only its explicit tag.
s3.Finish()
if err := TestingCheckRecordedSpans(GetRecording(s1), `
span a:
tags: unfinished=
x: 2
span b:
x: 3
span c:
tags: tag=val
x: 4
`); err != nil {
t.Fatal(err)
}
// After StopRecording, s1's recording is expected to be empty, and the log
// entry written afterwards does not appear.
StopRecording(s1)
s1.LogKV("x", 100)
if err := TestingCheckRecordedSpans(GetRecording(s1), ``); err != nil {
t.Fatal(err)
}
// The child span is still recording.
s3.LogKV("x", 5)
if err := TestingCheckRecordedSpans(GetRecording(s3), `
span c:
tags: tag=val
x: 4
x: 5
`); err != nil {
t.Fatal(err)
}
s1.Finish()
}
3.2 创建childSpan 测试
测试StartChildSpan,根据已有span创建出一个新的span,为已有span的子span。
// TestStartChildSpan exercises StartChildSpan in three configurations: shared
// recording, separate recording, and shared recording with log tags.
func TestStartChildSpan(t *testing.T) {
tr := NewTracer()
// Case 1: separateRecording=false. The child is expected to appear in the
// parent's recording.
sp1 := tr.StartSpan("parent", Recordable)
StartRecording(sp1, SingleNodeRecording)
sp2 := StartChildSpan("child", sp1, nil /* logTags */, false /*separateRecording*/)
sp2.Finish()
sp1.Finish()
if err := TestingCheckRecordedSpans(GetRecording(sp1), `
span parent:
span child:
`); err != nil {
t.Fatal(err)
}
// Case 2: separateRecording=true. Parent and child are each expected to
// report only their own span.
sp1 = tr.StartSpan("parent", Recordable)
StartRecording(sp1, SingleNodeRecording)
sp2 = StartChildSpan("child", sp1, nil /* logTags */, true /*separateRecording*/)
sp2.Finish()
sp1.Finish()
if err := TestingCheckRecordedSpans(GetRecording(sp1), `
span parent:
`); err != nil {
t.Fatal(err)
}
if err := TestingCheckRecordedSpans(GetRecording(sp2), `
span child:
`); err != nil {
t.Fatal(err)
}
// Case 3: log tags passed to StartChildSpan are expected to show up as tags
// on the child span.
sp1 = tr.StartSpan("parent", Recordable)
StartRecording(sp1, SingleNodeRecording)
sp2 = StartChildSpan(
"child", sp1, logtags.SingleTagBuffer("key", "val"), false, /*separateRecording*/
)
sp2.Finish()
sp1.Finish()
if err := TestingCheckRecordedSpans(GetRecording(sp1), `
span parent:
span child:
tags: key=val
`); err != nil {
t.Fatal(err)
}
}
3.3 跨进程追踪测试
测试跨进程追踪功能,主要是测试inject接口和 extract 接口,Inject用于向carrier中注入SpanContext信息,Extract用于从carrier中抽取出SpanContext 信息。
// TestTracerInjectExtract checks cross-process propagation: a span context is
// injected into an HTTP-headers carrier by one tracer and extracted by a
// second tracer standing in for the remote side.
func TestTracerInjectExtract(t *testing.T) {
tr := NewTracer()
tr2 := NewTracer()
// Verify that noop spans become noop spans on the remote side.
noop1 := tr.StartSpan("noop")
if _, noop := noop1.(*noopSpan); !noop {
t.Fatalf("expected noop span: %+v", noop1)
}
carrier := make(opentracing.HTTPHeadersCarrier)
if err := tr.Inject(noop1.Context(), opentracing.HTTPHeaders, carrier); err != nil {
t.Fatal(err)
}
// Injecting a noop context is expected to leave the carrier empty.
if len(carrier) != 0 {
t.Errorf("noop span has carrier: %+v", carrier)
}
wireContext, err := tr2.Extract(opentracing.HTTPHeaders, carrier)
if err != nil {
t.Fatal(err)
}
if _, noopCtx := wireContext.(noopSpanContext); !noopCtx {
t.Errorf("expected noop context: %v", wireContext)
}
noop2 := tr2.StartSpan("remote op", opentracing.FollowsFrom(wireContext))
if _, noop := noop2.(*noopSpan); !noop {
t.Fatalf("expected noop span: %+v", noop2)
}
noop1.Finish()
noop2.Finish()
// Verify that snowball tracing is propagated and triggers recording on the
// remote side.
s1 := tr.StartSpan("a", Recordable)
StartRecording(s1, SnowballRecording)
carrier = make(opentracing.HTTPHeadersCarrier)
if err := tr.Inject(s1.Context(), opentracing.HTTPHeaders, carrier); err != nil {
t.Fatal(err)
}
wireContext, err = tr2.Extract(opentracing.HTTPHeaders, carrier)
if err != nil {
t.Fatal(err)
}
s2 := tr2.StartSpan("remote op", opentracing.FollowsFrom(wireContext))
// Compare TraceIDs
trace1 := s1.Context().(*spanContext).TraceID
trace2 := s2.Context().(*spanContext).TraceID
if trace1 != trace2 {
t.Errorf("TraceID doesn't match: parent %d child %d", trace1, trace2)
}
s2.LogKV("x", 1)
s2.Finish()
// Verify that recording was started automatically.
rec := GetRecording(s2)
if err := TestingCheckRecordedSpans(rec, `
span remote op:
tags: sb=1
x: 1
`); err != nil {
t.Fatal(err)
}
// Before the remote spans are imported, the local recording is expected to
// contain only span a.
if err := TestingCheckRecordedSpans(GetRecording(s1), `
span a:
tags: sb=1 unfinished=
`); err != nil {
t.Fatal(err)
}
// After ImportRemoteSpans, the remote span is expected to be part of s1's
// recording.
if err := ImportRemoteSpans(s1, rec); err != nil {
t.Fatal(err)
}
s1.Finish()
if err := TestingCheckRecordedSpans(GetRecording(s1), `
span a:
tags: sb=1
span remote op:
tags: sb=1
x: 1
`); err != nil {
t.Fatal(err)
}
}
边栏推荐
- At the age of 35, I made a decision to face unemployment
- LinkedBlockingQueue源码分析-新增和删除
- 95. (cesium chapter) cesium dynamic monomer-3d building (building)
- If an exception is thrown in the constructor, the best way is to prevent memory leakage?
- 动态库基本原理和使用方法,-fPIC 选项的来龙去脉
- 【编程题】【Scratch二级】2019.03 垃圾分类
- Robomaster visual tutorial (11) summary
- PostGIS learning
- [programming problem] [scratch Level 2] December 2019 flying birds
- [the most detailed in history] statistical description of overdue days in credit
猜你喜欢
Les mots ont été écrits, la fonction est vraiment puissante!
一鍵免費翻譯300多頁的pdf文檔
QT and OpenGL: load 3D models using the open asset import library (assimp)
Preliminary test of optical flow sensor: gl9306
光流传感器初步测试:GL9306
CoinDesk评波场去中心化进程:让人们看到互联网的未来
Stm32f1 and stm32cubeide programming example - rotary encoder drive
用语雀写文章了,功能真心强大!
Detailed explanation of interview questions: the history of blood and tears in implementing distributed locks with redis
QT creator add JSON based Wizard
随机推荐
Handwriting a simulated reentrantlock
The difference between get and post
如何衡量产品是否“刚需、高频、痛点”
Relevant methods of sorting arrays in JS (if you want to understand arrays, it's enough to read this article)
limit 与offset的用法(转载)
【推荐系统基础】正负样本采样和构造
去了字节跳动,才知道年薪 40w 的测试工程师有这么多?
玩转Sonar
快速上手使用本地测试工具postman
用語雀寫文章了,功能真心强大!
【leetcode】day1
SQL uses the in keyword to query multiple fields
Benchmarking Detection Transfer Learning with Vision Transformers(2021-11)
DataGuard active / standby cleanup archive settings
【编程题】【Scratch二级】2019.09 制作蝙蝠冲关游戏
【編程題】【Scratch二級】2019.12 飛翔的小鳥
paddle入门-使用LeNet在MNIST实现图像分类方法一
Use filters to count URL request time
One click installation with fishros in blue bridge ROS
Development of a horse tourism website (optimization of servlet)