Inspur Yunxi Distributed Database Tracing (2): Source Code Analysis

2022-07-07 22:13:00 Inspur Yunxi Database (浪潮云溪数据库)

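Building on the OpenTracing usage requirements introduced in [Yunxi Database Tracing (1)], this article focuses on how the Tracing module of Yunxi Database implements Span, SpanContext, and Tracer.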

Part 1 - Call relationships in the Tracing module

1.1  Files in the Tracing module

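Tracer.go: implements the trace-related interfaces of OpenTracing.
Tracer_span.go: implements the span-related operations of OpenTracing.
Tags.go: defines the tag-related interfaces of OpenTracing.
Shadow.go: not an OpenTracing concept. It mainly implements communication with Zipkin, so that tracing information can be pushed to an external Zipkin instance.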

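1.2  Call relationships among the files

cluster_settings.go creates a Tracer for global use. Other modules use this Tracer to create spans and to operate on them, for example to set the span name, set tags, and add logs.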

 

Part 2 - OpenTracing implementation in Yunxi Database

Only some of the interface implementations are listed below, not all of them.

2.1  Span interface implementation

Context implementation: this API returns the SpanContext of a Span. It first creates a baggageCopy of type map[string]string, copies the entries of the span's mu.Baggage into it, then builds a new spanContext and returns it.

func (s *span) Context() opentracing.SpanContext {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Copy the baggage so the returned context does not share the span's map.
	baggageCopy := make(map[string]string, len(s.mu.Baggage))
	for k, v := range s.mu.Baggage {
		baggageCopy[k] = v
	}
	sc := &spanContext{
		spanMeta: s.spanMeta,
		Baggage:  baggageCopy,
	}
	if s.shadowTr != nil {
		sc.shadowTr = s.shadowTr
		sc.shadowCtx = s.shadowSpan.Context()
	}
	if s.isRecording() {
		sc.recordingGroup = s.mu.recordingGroup
		sc.recordingType = s.mu.recordingType
	}
	return sc
}
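The copy-under-lock step described above can be illustrated in isolation. The following is a minimal sketch using only the standard library; demoSpan, setBaggage, and snapshotBaggage are hypothetical names chosen for illustration and are not part of the Yunxi source.

```go
package main

import (
	"fmt"
	"sync"
)

// demoSpan is a hypothetical stand-in for the article's span type.
type demoSpan struct {
	mu      sync.Mutex
	baggage map[string]string
}

// setBaggage stores one baggage item while holding the lock.
func (s *demoSpan) setBaggage(k, v string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.baggage == nil {
		s.baggage = make(map[string]string)
	}
	s.baggage[k] = v
}

// snapshotBaggage returns a copy of the baggage taken under the lock, so the
// caller can keep reading it without racing against later writers.
func (s *demoSpan) snapshotBaggage() map[string]string {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make(map[string]string, len(s.baggage))
	for k, v := range s.baggage {
		out[k] = v
	}
	return out
}

func main() {
	s := &demoSpan{}
	s.setBaggage("user", "alice")
	snap := s.snapshotBaggage()
	s.setBaggage("user", "bob") // this later write is not visible in snap
	fmt.Println(snap["user"], s.snapshotBaggage()["user"])
}
```

Returning a copy rather than the internal map means a SpanContext handed to another goroutine stays stable even if the span's baggage changes afterwards.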

Finish implementation: this API ends the recording and tracing of a Span.

func (s *span) Finish() {
	s.FinishWithOptions(opentracing.FinishOptions{})
}

SetTag implementation: adds tag information to the given Span.

func (s *span) SetTag(key string, value interface{}) opentracing.Span {
	return s.setTagInner(key, value, false /* locked */)
}

LogKV implementation: adds log information to the given Span.

func (s *span) LogKV(alternatingKeyValues ...interface{}) {
	fields, err := otlog.InterleavedKVToFields(alternatingKeyValues...)
	if err != nil {
		// Record the conversion error itself as a log entry on the span.
		s.LogFields(otlog.Error(err), otlog.String("function", "LogKV"))
		return
	}
	s.LogFields(fields...)
}
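To make the conversion from alternating key/value arguments to log fields concrete, here is a small standard-library sketch. It assumes the conversion rejects an odd number of arguments and non-string keys; field and kvToFields are hypothetical names, and this is not the otlog implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// field is a hypothetical, simplified log field.
type field struct {
	Key   string
	Value interface{}
}

// kvToFields turns ("k1", v1, "k2", v2, ...) into a slice of fields.
// It returns an error for an odd argument count or a non-string key.
func kvToFields(kvs ...interface{}) ([]field, error) {
	if len(kvs)%2 != 0 {
		return nil, errors.New("odd number of key-value arguments")
	}
	fields := make([]field, 0, len(kvs)/2)
	for i := 0; i < len(kvs); i += 2 {
		key, ok := kvs[i].(string)
		if !ok {
			return nil, fmt.Errorf("key at position %d is not a string", i)
		}
		fields = append(fields, field{Key: key, Value: kvs[i+1]})
	}
	return fields, nil
}

func main() {
	fields, err := kvToFields("x", 1, "event", "start")
	fmt.Println(fields, err)

	_, err = kvToFields("x") // missing value
	fmt.Println(err)
}
```

In the LogKV method above, a failed conversion is not returned to the caller. It is logged on the span instead, so instrumentation code never has to handle an error.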

SetBaggageItem implementation: adds baggage information to the given Span. Baggage is mainly used for cross-process tracing.

func (s *span) SetBaggageItem(restrictedKey, value string) opentracing.Span {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.setBaggageItemLocked(restrictedKey, value)
}

BaggageItem implementation: returns the baggage value stored under the given key.

func (s *span) BaggageItem(restrictedKey string) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.mu.Baggage[restrictedKey]
}

SetOperationName implementation: sets the name of the Span.

func (s *span) SetOperationName(operationName string) opentracing.Span {
	if s.shadowTr != nil {
		s.shadowSpan.SetOperationName(operationName)
	}
	s.operation = operationName
	return s
}

Tracer implementation: returns the Tracer that the Span belongs to.

// Tracer is part of the opentracing.Span interface.
func (s *span) Tracer() opentracing.Tracer {
	return s.tracer
}

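2.2  SpanContext interface implementation

ForeachBaggageItem implementation: iterates over the baggage items in a spanContext. Iteration stops as soon as the handler returns false.

func (sc *spanContext) ForeachBaggageItem(handler func(k, v string) bool) {
	for k, v := range sc.Baggage {
		if !handler(k, v) {
			break
		}
	}
}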
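The handler's boolean return value acts as a "keep going" flag. The sketch below shows that calling convention with a hypothetical demoContext type. It only mirrors the loop above and does not depend on the OpenTracing library.

```go
package main

import "fmt"

// demoContext is a hypothetical stand-in for spanContext.
type demoContext struct {
	baggage map[string]string
}

// forEachBaggageItem calls handler for each item until handler returns false.
func (c demoContext) forEachBaggageItem(handler func(k, v string) bool) {
	for k, v := range c.baggage {
		if !handler(k, v) {
			break
		}
	}
}

func main() {
	c := demoContext{baggage: map[string]string{"a": "1", "b": "2", "c": "3"}}

	visited := 0
	c.forEachBaggageItem(func(k, v string) bool {
		visited++
		return false // stop after the first item
	})
	fmt.Println("visited with early stop:", visited)

	visited = 0
	c.forEachBaggageItem(func(k, v string) bool {
		visited++
		return true // keep iterating
	})
	fmt.Println("visited without early stop:", visited)
}
```

Because Go map iteration order is unspecified, a handler that stops early should not rely on which item it sees first.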

2.3  Tracer interface implementation

Inject implementation: writes SpanContext information into a carrier.

// Inject is part of the opentracing.Tracer interface.
func (t *Tracer) Inject(
	osc opentracing.SpanContext, format interface{}, carrier interface{},
) error {
	……
	// We only support the HTTPHeaders/TextMap format.
	if format != opentracing.HTTPHeaders && format != opentracing.TextMap {
		return opentracing.ErrUnsupportedFormat
	}
	mapWriter, ok := carrier.(opentracing.TextMapWriter)
	if !ok {
		return opentracing.ErrInvalidCarrier
	}
	sc, ok := osc.(*spanContext)
	if !ok {
		return opentracing.ErrInvalidSpanContext
	}
	// Trace and span IDs are written as hexadecimal strings.
	mapWriter.Set(fieldNameTraceID, strconv.FormatUint(sc.TraceID, 16))
	mapWriter.Set(fieldNameSpanID, strconv.FormatUint(sc.SpanID, 16))
	for k, v := range sc.Baggage {
		mapWriter.Set(prefixBaggage+k, v)
	}
	……
	return nil
}

Extract implementation: reads SpanContext information out of a carrier.

func (t *Tracer) Extract(format interface{}, carrier interface{}) (opentracing.SpanContext, error) {
	// We only support the HTTPHeaders/TextMap format.
	if format != opentracing.HTTPHeaders && format != opentracing.TextMap {
		return noopSpanContext{}, opentracing.ErrUnsupportedFormat
	}
	mapReader, ok := carrier.(opentracing.TextMapReader)
	if !ok {
		return noopSpanContext{}, opentracing.ErrInvalidCarrier
	}
	var sc spanContext
	……
	err := mapReader.ForeachKey(func(k, v string) error {
		switch k = strings.ToLower(k); k {
		case fieldNameTraceID:
			var err error
			sc.TraceID, err = strconv.ParseUint(v, 16, 64)
			if err != nil {
				return opentracing.ErrSpanContextCorrupted
			}
		case fieldNameSpanID:
			var err error
			sc.SpanID, err = strconv.ParseUint(v, 16, 64)
			if err != nil {
				return opentracing.ErrSpanContextCorrupted
			}
		case fieldNameShadowType:
			shadowType = v
		default:
			if strings.HasPrefix(k, prefixBaggage) {
				if sc.Baggage == nil {
					sc.Baggage = make(map[string]string)
				}
				sc.Baggage[strings.TrimPrefix(k, prefixBaggage)] = v
			} else if strings.HasPrefix(k, prefixShadow) {
				if shadowCarrier == nil {
					shadowCarrier = make(opentracing.TextMapCarrier)
				}
				// We build a shadow textmap with the original shadow keys.
				shadowCarrier.Set(strings.TrimPrefix(k, prefixShadow), v)
			}
		}
		return nil
	})
	if err != nil {
		return noopSpanContext{}, err
	}
	// No IDs in the carrier means the remote side was not tracing.
	if sc.TraceID == 0 && sc.SpanID == 0 {
		return noopSpanContext{}, nil
	}
	……
	return &sc, nil
}
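The wire format used by Inject and Extract (hexadecimal IDs plus prefixed baggage keys in a text map) can be tried out with a plain map as the carrier. This is a simplified sketch: the key names "demo-traceid", "demo-spanid", and "demo-baggage-" are made up for illustration (the real constants are not shown in the excerpts above), and shadow tracer handling is omitted.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// Hypothetical carrier keys; the actual constant values are not shown above.
const (
	keyTraceID    = "demo-traceid"
	keySpanID     = "demo-spanid"
	prefixBaggage = "demo-baggage-"
)

var errCorrupted = errors.New("span context corrupted")

// demoContext is a hypothetical, reduced span context.
type demoContext struct {
	TraceID, SpanID uint64
	Baggage         map[string]string
}

// inject writes the context into the carrier as text.
func inject(sc demoContext, carrier map[string]string) {
	carrier[keyTraceID] = strconv.FormatUint(sc.TraceID, 16)
	carrier[keySpanID] = strconv.FormatUint(sc.SpanID, 16)
	for k, v := range sc.Baggage {
		carrier[prefixBaggage+k] = v
	}
}

// extract rebuilds a context from the carrier, matching keys case-insensitively.
func extract(carrier map[string]string) (demoContext, error) {
	var sc demoContext
	for k, v := range carrier {
		var err error
		k = strings.ToLower(k)
		switch {
		case k == keyTraceID:
			sc.TraceID, err = strconv.ParseUint(v, 16, 64)
		case k == keySpanID:
			sc.SpanID, err = strconv.ParseUint(v, 16, 64)
		case strings.HasPrefix(k, prefixBaggage):
			if sc.Baggage == nil {
				sc.Baggage = make(map[string]string)
			}
			sc.Baggage[strings.TrimPrefix(k, prefixBaggage)] = v
		}
		if err != nil {
			return demoContext{}, errCorrupted
		}
	}
	return sc, nil
}

func main() {
	carrier := make(map[string]string)
	inject(demoContext{TraceID: 255, SpanID: 16, Baggage: map[string]string{"sb": "1"}}, carrier)
	fmt.Println(carrier)

	sc, err := extract(carrier)
	fmt.Println(sc, err)
}
```

Lowercasing keys on the extracting side matters for HTTP carriers, where header names may be re-capitalized in transit. As a consequence, baggage keys come back in lowercase.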

StartSpan implementation: creates a new Span. The opts passed in determine how the Span is initialized.

func (t *Tracer) StartSpan(
	operationName string, opts ...opentracing.StartSpanOption,
) opentracing.Span {
	// Fast paths to avoid the allocation of StartSpanOptions below when tracing
	// is disabled: if we have no options or a single SpanReference (the common
	// case) with a noop context, return a noop span now.
	if len(opts) == 1 {
		if o, ok := opts[0].(opentracing.SpanReference); ok {
			if IsNoopContext(o.ReferencedContext) {
				return &t.noopSpan
			}
		}
	}
	shadowTr := t.getShadowTracer()
	……
	return s
}

2.4  noop span implementation

The noop span lets instrumentation code call Tracer and Span methods without checking their return values, which prevents the program from exiting abnormally.

type noopSpan struct {
	tracer *Tracer
}

var _ opentracing.Span = &noopSpan{}

func (n *noopSpan) Context() opentracing.SpanContext                       { return noopSpanContext{} }
func (n *noopSpan) BaggageItem(key string) string                          { return "" }
func (n *noopSpan) SetTag(key string, value interface{}) opentracing.Span  { return n }
func (n *noopSpan) Finish()                                                {}
func (n *noopSpan) FinishWithOptions(opts opentracing.FinishOptions)       {}
func (n *noopSpan) SetOperationName(operationName string) opentracing.Span { return n }
func (n *noopSpan) Tracer() opentracing.Tracer                             { return n.tracer }
func (n *noopSpan) LogFields(fields ...otlog.Field)                        {}
func (n *noopSpan) LogKV(keyVals ...interface{})                           {}
func (n *noopSpan) LogEvent(event string)                                  {}
func (n *noopSpan) LogEventWithPayload(event string, payload interface{})  {}
func (n *noopSpan) Log(data opentracing.LogData)                           {}

func (n *noopSpan) SetBaggageItem(key, val string) opentracing.Span {
	if key == Snowball {
		panic("attempting to set Snowball on a noop span; use the Recordable option to StartSpan")
	}
	return n
}
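The idea behind the noop span is the null-object pattern: callers always receive a usable value, so they never need a nil check. The sketch below illustrates it with a hypothetical three-method interface (tracedSpan) rather than the full opentracing.Span interface.

```go
package main

import "fmt"

// tracedSpan is a hypothetical, reduced span interface.
type tracedSpan interface {
	SetTag(key string, value interface{}) tracedSpan
	LogKV(kvs ...interface{})
	Finish()
}

// noopDemoSpan accepts every call and does nothing.
type noopDemoSpan struct{}

func (n noopDemoSpan) SetTag(string, interface{}) tracedSpan { return n }
func (noopDemoSpan) LogKV(...interface{})                    {}
func (noopDemoSpan) Finish()                                 {}

// recordingDemoSpan actually keeps what it is given.
type recordingDemoSpan struct {
	tags     map[string]interface{}
	logs     int
	finished bool
}

func (s *recordingDemoSpan) SetTag(k string, v interface{}) tracedSpan {
	s.tags[k] = v
	return s
}
func (s *recordingDemoSpan) LogKV(...interface{}) { s.logs++ }
func (s *recordingDemoSpan) Finish()              { s.finished = true }

// startSpan returns a noop span when tracing is disabled.
func startSpan(enabled bool) tracedSpan {
	if !enabled {
		return noopDemoSpan{}
	}
	return &recordingDemoSpan{tags: make(map[string]interface{})}
}

// doWork is instrumented code; it is identical for both span kinds.
func doWork(sp tracedSpan) {
	sp.SetTag("component", "demo").LogKV("event", "work done")
	sp.Finish()
}

func main() {
	doWork(startSpan(false)) // safe, records nothing
	rec := startSpan(true)
	doWork(rec)
	fmt.Printf("%+v\n", rec)
}
```

With this shape, disabling tracing costs almost nothing at the call sites, and instrumented code contains no branches on whether tracing is on.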

 

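Part 3 - Simple OpenTracing usage examples in Yunxi Database

3.1  Tracer recording test

In Yunxi Database, spans are created as noop spans by default. To work with a span normally, create it with the Recordable option and then call StartRecording manually to switch it into the recording state.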

func TestTracerRecording(t *testing.T) {
	tr := NewTracer()

	noop1 := tr.StartSpan("noop")
	if _, noop := noop1.(*noopSpan); !noop {
		t.Error("expected noop span")
	}
	noop1.LogKV("hello", "void")

	noop2 := tr.StartSpan("noop2", opentracing.ChildOf(noop1.Context()))
	if _, noop := noop2.(*noopSpan); !noop {
		t.Error("expected noop child span")
	}
	noop2.Finish()
	noop1.Finish()

	s1 := tr.StartSpan("a", Recordable)
	if _, noop := s1.(*noopSpan); noop {
		t.Error("Recordable (but not recording) span should not be noop")
	}
	if !IsBlackHoleSpan(s1) {
		t.Error("Recordable span should be black hole")
	}

	// Unless recording is actually started, child spans are still noop.
	noop3 := tr.StartSpan("noop3", opentracing.ChildOf(s1.Context()))
	if _, noop := noop3.(*noopSpan); !noop {
		t.Error("expected noop child span")
	}
	noop3.Finish()

	s1.LogKV("x", 1)
	StartRecording(s1, SingleNodeRecording)
	s1.LogKV("x", 2)
	s2 := tr.StartSpan("b", opentracing.ChildOf(s1.Context()))
	if IsBlackHoleSpan(s2) {
		t.Error("recording span should not be black hole")
	}
	s2.LogKV("x", 3)

	if err := TestingCheckRecordedSpans(GetRecording(s1), `
		span a:
			tags: unfinished=
			x: 2
			span b:
				tags: unfinished=
				x: 3
	`); err != nil {
		t.Fatal(err)
	}

	if err := TestingCheckRecordedSpans(GetRecording(s2), `
		span b:
			tags: unfinished=
			x: 3
	`); err != nil {
		t.Fatal(err)
	}

	s3 := tr.StartSpan("c", opentracing.FollowsFrom(s2.Context()))
	s3.LogKV("x", 4)
	s3.SetTag("tag", "val")

	s2.Finish()

	if err := TestingCheckRecordedSpans(GetRecording(s1), `
		span a:
			tags: unfinished=
			x: 2
			span b:
				x: 3
				span c:
					tags: tag=val unfinished=
					x: 4
	`); err != nil {
		t.Fatal(err)
	}
	s3.Finish()
	if err := TestingCheckRecordedSpans(GetRecording(s1), `
		span a:
			tags: unfinished=
			x: 2
			span b:
				x: 3
				span c:
					tags: tag=val
					x: 4
	`); err != nil {
		t.Fatal(err)
	}
	StopRecording(s1)
	s1.LogKV("x", 100)
	if err := TestingCheckRecordedSpans(GetRecording(s1), ``); err != nil {
		t.Fatal(err)
	}

	// The child span is still recording.
	s3.LogKV("x", 5)
	if err := TestingCheckRecordedSpans(GetRecording(s3), `
		span c:
			tags: tag=val
			x: 4
			x: 5
	`); err != nil {
		t.Fatal(err)
	}
	s1.Finish()
}
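The behavior this test checks for span "a" (logs are dropped before StartRecording, kept while recording, and gone after StopRecording) can be modeled with a small state flag. The sketch below is a simplified model with hypothetical names (recSpan and its methods). It does not reproduce the black-hole or recording-group machinery of the real module.

```go
package main

import (
	"fmt"
	"sync"
)

// recSpan is a hypothetical span that only keeps logs while recording.
type recSpan struct {
	mu        sync.Mutex
	recording bool
	logs      []string
}

// logKV keeps the entry only if recording is on; otherwise it is dropped.
func (s *recSpan) logKV(key string, value interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.recording {
		return
	}
	s.logs = append(s.logs, fmt.Sprintf("%s: %v", key, value))
}

// startRecording switches the span into the recording state.
func (s *recSpan) startRecording() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.recording = true
}

// stopRecording leaves the recording state and discards what was recorded,
// mirroring the empty recording the test expects after StopRecording.
func (s *recSpan) stopRecording() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.recording = false
	s.logs = nil
}

// getRecording returns a copy of the recorded log lines.
func (s *recSpan) getRecording() []string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return append([]string(nil), s.logs...)
}

func main() {
	s := &recSpan{}
	s.logKV("x", 1) // dropped: not recording yet
	s.startRecording()
	s.logKV("x", 2) // kept
	fmt.Println(s.getRecording())
	s.stopRecording()
	s.logKV("x", 100) // dropped again
	fmt.Println(s.getRecording())
}
```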

3.2  Child span creation test

This tests StartChildSpan, which creates a new span from an existing span. The new span is a child of the existing one.

func TestStartChildSpan(t *testing.T) {
	tr := NewTracer()
	sp1 := tr.StartSpan("parent", Recordable)
	StartRecording(sp1, SingleNodeRecording)
	sp2 := StartChildSpan("child", sp1, nil /* logTags */, false /* separateRecording */)
	sp2.Finish()
	sp1.Finish()
	if err := TestingCheckRecordedSpans(GetRecording(sp1), `
		span parent:
			span child:
	`); err != nil {
		t.Fatal(err)
	}

	// With separateRecording set, the child is not part of the parent's recording.
	sp1 = tr.StartSpan("parent", Recordable)
	StartRecording(sp1, SingleNodeRecording)
	sp2 = StartChildSpan("child", sp1, nil /* logTags */, true /* separateRecording */)
	sp2.Finish()
	sp1.Finish()
	if err := TestingCheckRecordedSpans(GetRecording(sp1), `
		span parent:
	`); err != nil {
		t.Fatal(err)
	}
	if err := TestingCheckRecordedSpans(GetRecording(sp2), `
		span child:
	`); err != nil {
		t.Fatal(err)
	}

	// Log tags passed to StartChildSpan become tags on the child span.
	sp1 = tr.StartSpan("parent", Recordable)
	StartRecording(sp1, SingleNodeRecording)
	sp2 = StartChildSpan(
		"child", sp1, logtags.SingleTagBuffer("key", "val"), false, /* separateRecording */
	)
	sp2.Finish()
	sp1.Finish()
	if err := TestingCheckRecordedSpans(GetRecording(sp1), `
		span parent:
			span child:
				tags: key=val
	`); err != nil {
		t.Fatal(err)
	}
}
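The effect of the separateRecording flag in this test can be pictured with a tree of spans: a child is either attached to its parent's recording or kept on its own. The following sketch is a tree-based simplification with hypothetical names (treeSpan, startChild). The real module tracks this through recording groups, which are not modeled here.

```go
package main

import "fmt"

// treeSpan is a hypothetical span whose recording is itself plus its
// attached children.
type treeSpan struct {
	name     string
	children []*treeSpan
}

// startChild creates a child span. With separateRecording set, the child is
// not attached to the parent's recording and must be collected on its own.
func startChild(name string, parent *treeSpan, separateRecording bool) *treeSpan {
	child := &treeSpan{name: name}
	if !separateRecording {
		parent.children = append(parent.children, child)
	}
	return child
}

// recording returns the names of this span and all attached descendants.
func (s *treeSpan) recording() []string {
	out := []string{s.name}
	for _, c := range s.children {
		out = append(out, c.recording()...)
	}
	return out
}

func main() {
	parent := &treeSpan{name: "parent"}
	startChild("child", parent, false)
	fmt.Println(parent.recording())

	parent = &treeSpan{name: "parent"}
	child := startChild("child", parent, true)
	fmt.Println(parent.recording(), child.recording())
}
```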

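3.3  Cross-process tracing test

This tests cross-process tracing, mainly the Inject and Extract interfaces: Inject writes SpanContext information into a carrier, and Extract reads SpanContext information out of a carrier.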

func TestTracerInjectExtract(t *testing.T) {
	tr := NewTracer()
	tr2 := NewTracer()

	// Verify that noop spans become noop spans on the remote side.

	noop1 := tr.StartSpan("noop")
	if _, noop := noop1.(*noopSpan); !noop {
		t.Fatalf("expected noop span: %+v", noop1)
	}
	carrier := make(opentracing.HTTPHeadersCarrier)
	if err := tr.Inject(noop1.Context(), opentracing.HTTPHeaders, carrier); err != nil {
		t.Fatal(err)
	}
	if len(carrier) != 0 {
		t.Errorf("noop span has carrier: %+v", carrier)
	}

	wireContext, err := tr2.Extract(opentracing.HTTPHeaders, carrier)
	if err != nil {
		t.Fatal(err)
	}
	if _, noopCtx := wireContext.(noopSpanContext); !noopCtx {
		t.Errorf("expected noop context: %v", wireContext)
	}
	noop2 := tr2.StartSpan("remote op", opentracing.FollowsFrom(wireContext))
	if _, noop := noop2.(*noopSpan); !noop {
		t.Fatalf("expected noop span: %+v", noop2)
	}
	noop1.Finish()
	noop2.Finish()

	// Verify that snowball tracing is propagated and triggers recording on the
	// remote side.

	s1 := tr.StartSpan("a", Recordable)
	StartRecording(s1, SnowballRecording)

	carrier = make(opentracing.HTTPHeadersCarrier)
	if err := tr.Inject(s1.Context(), opentracing.HTTPHeaders, carrier); err != nil {
		t.Fatal(err)
	}

	wireContext, err = tr2.Extract(opentracing.HTTPHeaders, carrier)
	if err != nil {
		t.Fatal(err)
	}
	s2 := tr2.StartSpan("remote op", opentracing.FollowsFrom(wireContext))

	// Compare TraceIDs
	trace1 := s1.Context().(*spanContext).TraceID
	trace2 := s2.Context().(*spanContext).TraceID
	if trace1 != trace2 {
		t.Errorf("TraceID doesn't match: parent %d child %d", trace1, trace2)
	}
	s2.LogKV("x", 1)
	s2.Finish()

	// Verify that recording was started automatically.
	rec := GetRecording(s2)
	if err := TestingCheckRecordedSpans(rec, `
		span remote op:
			tags: sb=1
			x: 1
	`); err != nil {
		t.Fatal(err)
	}

	if err := TestingCheckRecordedSpans(GetRecording(s1), `
		span a:
			tags: sb=1 unfinished=
	`); err != nil {
		t.Fatal(err)
	}

	if err := ImportRemoteSpans(s1, rec); err != nil {
		t.Fatal(err)
	}
	s1.Finish()

	if err := TestingCheckRecordedSpans(GetRecording(s1), `
		span a:
			tags: sb=1
			span remote op:
				tags: sb=1
				x: 1
	`); err != nil {
		t.Fatal(err)
	}
}
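
Copyright notice
This article was written by [浪潮云溪数据库] (Inspur Yunxi Database). Please include the original link when reposting. Thank you.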
https://my.oschina.net/u/5148943/blog/5550323