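Inspur Yunxi Distributed Database Tracing (2): Source Code Analysis
2022-07-07 22:13:00 [Inspur Yunxi Database]
Following the opentracing usage requirements introduced in "Yunxi Database Tracing (1)", this article focuses on how the Tracing module of Yunxi Database implements Span, SpanContext, and Tracer.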
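Part 1 - Call Relationships in the Tracing Module
1.1 Files in the Tracing module
- Tracer.go: implements the trace-related interfaces of opentracing.
- Tracer_span.go: implements the span-related operations of opentracing.
- Tags.go: defines the tag-related interfaces of opentracing.
- Shadow.go: not an opentracing concept. It handles communication with zipkin and pushes tracing information to an external zipkin instance.
1.2 Call relationships between the files
A Tracer is created in cluster_settings.go for global use. Other modules use this Tracer to create spans and operate on them, for example to set the span name, set tags, and add logs.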

Part 2 - Opentracing Implementation in Yunxi Database
Only some of the interface implementations are listed below, not all of them.
2.1 Span interface implementation:
Context implementation: returns the SpanContext of a Span. It first creates a baggageCopy of type map[string]string, copies the contents of the span's mu.Baggage into it, then builds a new spanContext and returns it.

```go
func (s *span) Context() opentracing.SpanContext {
	s.mu.Lock()
	defer s.mu.Unlock()
	baggageCopy := make(map[string]string, len(s.mu.Baggage))
	for k, v := range s.mu.Baggage {
		baggageCopy[k] = v
	}
	sc := &spanContext{
		spanMeta: s.spanMeta,
		Baggage:  baggageCopy,
	}
	if s.shadowTr != nil {
		sc.shadowTr = s.shadowTr
		sc.shadowCtx = s.shadowSpan.Context()
	}
	if s.isRecording() {
		sc.recordingGroup = s.mu.recordingGroup
		sc.recordingType = s.mu.recordingType
	}
	return sc
}
```
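The copy step in Context can be illustrated in isolation. The following is a minimal sketch using only the standard library, not the Yunxi code itself: the type miniSpan and the method SnapshotBaggage are hypothetical names introduced here. It shows that a snapshot copied under the lock is unaffected by later writes to the span's baggage.

```go
package main

import (
	"fmt"
	"sync"
)

// miniSpan is a hypothetical stand-in for the article's span type.
// It keeps only the lock and the baggage map.
type miniSpan struct {
	mu      sync.Mutex
	baggage map[string]string
}

// SetBaggageItem stores a key/value pair while holding the lock.
func (s *miniSpan) SetBaggageItem(k, v string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.baggage == nil {
		s.baggage = make(map[string]string)
	}
	s.baggage[k] = v
}

// SnapshotBaggage returns a copy of the baggage taken under the lock,
// mirroring the baggageCopy step in Context.
func (s *miniSpan) SnapshotBaggage() map[string]string {
	s.mu.Lock()
	defer s.mu.Unlock()
	cp := make(map[string]string, len(s.baggage))
	for k, v := range s.baggage {
		cp[k] = v
	}
	return cp
}

func main() {
	s := &miniSpan{}
	s.SetBaggageItem("user", "alice")
	snap := s.SnapshotBaggage()
	s.SetBaggageItem("user", "bob") // later write, after the snapshot

	fmt.Println(snap["user"])                // alice
	fmt.Println(s.SnapshotBaggage()["user"]) // bob
}
```

Returning the map itself instead of a copy would let callers read it while another goroutine writes to it, which is presumably why the original copies before returning.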
Finish implementation: ends the recording and tracing of a Span.

```go
func (s *span) Finish() {
	s.FinishWithOptions(opentracing.FinishOptions{})
}
```
SetTag implementation: adds tag information to the given Span.

```go
func (s *span) SetTag(key string, value interface{}) opentracing.Span {
	return s.setTagInner(key, value, false /* locked */)
}
```
LogKV implementation: adds log information to the given Span.

```go
func (s *span) LogKV(alternatingKeyValues ...interface{}) {
	fields, err := otlog.InterleavedKVToFields(alternatingKeyValues...)
	if err != nil {
		s.LogFields(otlog.Error(err), otlog.String("function", "LogKV"))
		return
	}
	s.LogFields(fields...)
}
```
SetBaggageItem implementation: adds baggage information to the given Span. Baggage is mainly used for cross-process tracing.

```go
func (s *span) SetBaggageItem(restrictedKey, value string) opentracing.Span {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.setBaggageItemLocked(restrictedKey, value)
}
```
BaggageItem implementation: returns the baggage value stored under the given key.

```go
func (s *span) BaggageItem(restrictedKey string) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.mu.Baggage[restrictedKey]
}
```
SetOperationName implementation: sets the name of the Span.

```go
func (s *span) SetOperationName(operationName string) opentracing.Span {
	if s.shadowTr != nil {
		s.shadowSpan.SetOperationName(operationName)
	}
	s.operation = operationName
	return s
}
```

Tracer implementation: returns the Tracer that the Span belongs to.

```go
// Tracer is part of the opentracing.Span interface.
func (s *span) Tracer() opentracing.Tracer {
	return s.tracer
}
```
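2.2 SpanContext interface implementation:
ForeachBaggageItem implementation: iterates over the baggage information in the spanContext. Iteration stops as soon as the handler returns false.

```go
func (sc *spanContext) ForeachBaggageItem(handler func(k, v string) bool) {
	for k, v := range sc.Baggage {
		if !handler(k, v) {
			break
		}
	}
}
```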
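The role of the handler's boolean return value in ForeachBaggageItem can be seen in a small standalone sketch. The type demoSpanContext and the helpers countAll and countUntilFirst are hypothetical names used only for this illustration. The iteration logic is the same as above.

```go
package main

import "fmt"

// demoSpanContext is a hypothetical stand-in that holds only baggage.
type demoSpanContext struct {
	Baggage map[string]string
}

// ForeachBaggageItem calls handler for each item until handler returns false.
func (sc *demoSpanContext) ForeachBaggageItem(handler func(k, v string) bool) {
	for k, v := range sc.Baggage {
		if !handler(k, v) {
			break
		}
	}
}

// countAll visits every baggage item by always returning true.
func countAll(sc *demoSpanContext) int {
	n := 0
	sc.ForeachBaggageItem(func(k, v string) bool {
		n++
		return true
	})
	return n
}

// countUntilFirst stops after the first item by returning false.
func countUntilFirst(sc *demoSpanContext) int {
	n := 0
	sc.ForeachBaggageItem(func(k, v string) bool {
		n++
		return false
	})
	return n
}

func main() {
	sc := &demoSpanContext{Baggage: map[string]string{"a": "1", "b": "2", "c": "3"}}
	fmt.Println(countAll(sc))        // 3
	fmt.Println(countUntilFirst(sc)) // 1
}
```

Because Go map iteration order is not fixed, a handler that stops early should not assume which item it sees first.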
2.3 Tracer interface implementation:
Inject implementation: injects SpanContext information into a carrier.

```go
// Inject is part of the opentracing.Tracer interface.
func (t *Tracer) Inject(
	osc opentracing.SpanContext, format interface{}, carrier interface{},
) error {
	……
	// We only support the HTTPHeaders/TextMap format.
	if format != opentracing.HTTPHeaders && format != opentracing.TextMap {
		return opentracing.ErrUnsupportedFormat
	}
	mapWriter, ok := carrier.(opentracing.TextMapWriter)
	if !ok {
		return opentracing.ErrInvalidCarrier
	}
	sc, ok := osc.(*spanContext)
	if !ok {
		return opentracing.ErrInvalidSpanContext
	}
	mapWriter.Set(fieldNameTraceID, strconv.FormatUint(sc.TraceID, 16))
	mapWriter.Set(fieldNameSpanID, strconv.FormatUint(sc.SpanID, 16))
	for k, v := range sc.Baggage {
		mapWriter.Set(prefixBaggage+k, v)
	}
	……
	return nil
}
```
Extract implementation: extracts SpanContext information from a carrier.

```go
func (t *Tracer) Extract(format interface{}, carrier interface{}) (opentracing.SpanContext, error) {
	// We only support the HTTPHeaders/TextMap format.
	if format != opentracing.HTTPHeaders && format != opentracing.TextMap {
		return noopSpanContext{}, opentracing.ErrUnsupportedFormat
	}
	mapReader, ok := carrier.(opentracing.TextMapReader)
	if !ok {
		return noopSpanContext{}, opentracing.ErrInvalidCarrier
	}
	var sc spanContext
	……
	err := mapReader.ForeachKey(func(k, v string) error {
		switch k = strings.ToLower(k); k {
		case fieldNameTraceID:
			var err error
			sc.TraceID, err = strconv.ParseUint(v, 16, 64)
			if err != nil {
				return opentracing.ErrSpanContextCorrupted
			}
		case fieldNameSpanID:
			var err error
			sc.SpanID, err = strconv.ParseUint(v, 16, 64)
			if err != nil {
				return opentracing.ErrSpanContextCorrupted
			}
		case fieldNameShadowType:
			shadowType = v
		default:
			if strings.HasPrefix(k, prefixBaggage) {
				if sc.Baggage == nil {
					sc.Baggage = make(map[string]string)
				}
				sc.Baggage[strings.TrimPrefix(k, prefixBaggage)] = v
			} else if strings.HasPrefix(k, prefixShadow) {
				if shadowCarrier == nil {
					shadowCarrier = make(opentracing.TextMapCarrier)
				}
				// We build a shadow textmap with the original shadow keys.
				shadowCarrier.Set(strings.TrimPrefix(k, prefixShadow), v)
			}
		}
		return nil
	})
	if err != nil {
		return noopSpanContext{}, err
	}
	if sc.TraceID == 0 && sc.SpanID == 0 {
		return noopSpanContext{}, nil
	}
	……
	return &sc, nil
}
```
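The text-map encoding used by Inject and Extract can be sketched with the standard library alone. This is an illustration under stated assumptions, not the Yunxi implementation: the key names demo-traceid, demo-spanid and demo-baggage- are made up (the real values of fieldNameTraceID, fieldNameSpanID and prefixBaggage are not shown in the article), the carrier is a plain map, and shadow tracer handling is left out. What it keeps from the code above is the hexadecimal encoding of the IDs, the baggage prefix, the lowercasing of keys, and the rule that a context with both IDs equal to zero is treated as a noop.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// Hypothetical carrier keys; the real constant values are not shown in the article.
const (
	keyTraceID    = "demo-traceid"
	keySpanID     = "demo-spanid"
	baggagePrefix = "demo-baggage-"
)

// demoContext is a hypothetical, reduced span context.
type demoContext struct {
	TraceID uint64
	SpanID  uint64
	Baggage map[string]string
}

var errCorrupted = errors.New("span context corrupted")

// inject writes the IDs as base-16 strings and prefixes each baggage key.
func inject(sc demoContext, carrier map[string]string) {
	carrier[keyTraceID] = strconv.FormatUint(sc.TraceID, 16)
	carrier[keySpanID] = strconv.FormatUint(sc.SpanID, 16)
	for k, v := range sc.Baggage {
		carrier[baggagePrefix+k] = v
	}
}

// extract rebuilds a context from the carrier. The boolean result is false
// when no IDs were found, which corresponds to the noop context case.
func extract(carrier map[string]string) (demoContext, bool, error) {
	var sc demoContext
	for k, v := range carrier {
		k = strings.ToLower(k)
		var err error
		switch {
		case k == keyTraceID:
			sc.TraceID, err = strconv.ParseUint(v, 16, 64)
		case k == keySpanID:
			sc.SpanID, err = strconv.ParseUint(v, 16, 64)
		case strings.HasPrefix(k, baggagePrefix):
			if sc.Baggage == nil {
				sc.Baggage = make(map[string]string)
			}
			sc.Baggage[strings.TrimPrefix(k, baggagePrefix)] = v
		}
		if err != nil {
			return demoContext{}, false, errCorrupted
		}
	}
	if sc.TraceID == 0 && sc.SpanID == 0 {
		return demoContext{}, false, nil
	}
	return sc, true, nil
}

func main() {
	carrier := make(map[string]string)
	inject(demoContext{
		TraceID: 0xabc,
		SpanID:  255,
		Baggage: map[string]string{"tenant": "t1"},
	}, carrier)
	fmt.Println(carrier[keyTraceID], carrier[keySpanID]) // abc ff

	sc, ok, err := extract(carrier)
	fmt.Println(sc.TraceID, sc.SpanID, sc.Baggage["tenant"], ok, err) // 2748 255 t1 true <nil>
}
```

An empty carrier yields no context and no error, which matches the behavior exercised by the noop case of the cross-process test in section 3.3.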
StartSpan implementation: creates a new Span. Different opts can be passed in to initialize the Span in different ways.

```go
func (t *Tracer) StartSpan(
	operationName string, opts ...opentracing.StartSpanOption,
) opentracing.Span {
	// Fast paths to avoid the allocation of StartSpanOptions below when tracing
	// is disabled: if we have no options or a single SpanReference (the common
	// case) with a noop context, return a noop span now.
	if len(opts) == 1 {
		if o, ok := opts[0].(opentracing.SpanReference); ok {
			if IsNoopContext(o.ReferencedContext) {
				return &t.noopSpan
			}
		}
	}
	shadowTr := t.getShadowTracer()
	……
	return s
}
```
2.4 noop span implementation:
noop span implementation: lets instrumentation code call Tracer and Span methods without depending on their return values, which prevents the program from exiting abnormally.

```go
type noopSpan struct {
	tracer *Tracer
}

var _ opentracing.Span = &noopSpan{}

func (n *noopSpan) Context() opentracing.SpanContext                       { return noopSpanContext{} }
func (n *noopSpan) BaggageItem(key string) string                          { return "" }
func (n *noopSpan) SetTag(key string, value interface{}) opentracing.Span  { return n }
func (n *noopSpan) Finish()                                                {}
func (n *noopSpan) FinishWithOptions(opts opentracing.FinishOptions)       {}
func (n *noopSpan) SetOperationName(operationName string) opentracing.Span { return n }
func (n *noopSpan) Tracer() opentracing.Tracer                             { return n.tracer }
func (n *noopSpan) LogFields(fields ...otlog.Field)                        {}
func (n *noopSpan) LogKV(keyVals ...interface{})                           {}
func (n *noopSpan) LogEvent(event string)                                  {}
func (n *noopSpan) LogEventWithPayload(event string, payload interface{})  {}
func (n *noopSpan) Log(data opentracing.LogData)                           {}

func (n *noopSpan) SetBaggageItem(key, val string) opentracing.Span {
	if key == Snowball {
		panic("attempting to set Snowball on a noop span; use the Recordable option to StartSpan")
	}
	return n
}
```
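The benefit of a noop span for calling code can be shown with a reduced, standalone sketch. The interface demoSpan, the types recordingSpan and noopDemoSpan, and the functions startSpan and doWork are hypothetical and much smaller than the opentracing.Span interface. The point is only that instrumented code runs the same way whether or not tracing is enabled, with no nil checks.

```go
package main

import "fmt"

// demoSpan is a hypothetical, reduced span interface.
type demoSpan interface {
	LogKV(kv ...interface{})
	Finish()
}

// recordingSpan counts how many log calls it received.
type recordingSpan struct {
	logCalls int
}

func (r *recordingSpan) LogKV(kv ...interface{}) { r.logCalls++ }
func (r *recordingSpan) Finish()                 {}

// noopDemoSpan accepts every call and does nothing.
type noopDemoSpan struct{}

func (noopDemoSpan) LogKV(kv ...interface{}) {}
func (noopDemoSpan) Finish()                 {}

// startSpan returns a real span only when tracing is enabled.
// It never returns nil, so callers need no special handling.
func startSpan(enabled bool) demoSpan {
	if enabled {
		return &recordingSpan{}
	}
	return noopDemoSpan{}
}

// doWork is instrumented code that does not care which span it gets.
func doWork(sp demoSpan) int {
	defer sp.Finish()
	sp.LogKV("step", 1)
	return 42
}

func main() {
	fmt.Println(doWork(startSpan(false))) // 42

	sp := startSpan(true)
	fmt.Println(doWork(sp))                    // 42
	fmt.Println(sp.(*recordingSpan).logCalls) // 1
}
```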
Part 3 - Simple Opentracing Usage Examples in Yunxi Database
3.1 Tracer recording test
In Yunxi Database, spans are created as noop spans by default. As the test below shows, a span has to be created with the Recordable option and then switched into the recording state by calling StartRecording before operations on it, such as logging, take effect.

```go
func TestTracerRecording(t *testing.T) {
	tr := NewTracer()

	noop1 := tr.StartSpan("noop")
	if _, noop := noop1.(*noopSpan); !noop {
		t.Error("expected noop span")
	}
	noop1.LogKV("hello", "void")

	noop2 := tr.StartSpan("noop2", opentracing.ChildOf(noop1.Context()))
	if _, noop := noop2.(*noopSpan); !noop {
		t.Error("expected noop child span")
	}
	noop2.Finish()
	noop1.Finish()

	s1 := tr.StartSpan("a", Recordable)
	if _, noop := s1.(*noopSpan); noop {
		t.Error("Recordable (but not recording) span should not be noop")
	}
	if !IsBlackHoleSpan(s1) {
		t.Error("Recordable span should be black hole")
	}

	// Unless recording is actually started, child spans are still noop.
	noop3 := tr.StartSpan("noop3", opentracing.ChildOf(s1.Context()))
	if _, noop := noop3.(*noopSpan); !noop {
		t.Error("expected noop child span")
	}
	noop3.Finish()

	s1.LogKV("x", 1)
	StartRecording(s1, SingleNodeRecording)
	s1.LogKV("x", 2)
	s2 := tr.StartSpan("b", opentracing.ChildOf(s1.Context()))
	if IsBlackHoleSpan(s2) {
		t.Error("recording span should not be black hole")
	}
	s2.LogKV("x", 3)

	if err := TestingCheckRecordedSpans(GetRecording(s1), `
		span a:
			tags: unfinished=
			x: 2
		span b:
			tags: unfinished=
			x: 3
	`); err != nil {
		t.Fatal(err)
	}

	if err := TestingCheckRecordedSpans(GetRecording(s2), `
		span b:
			tags: unfinished=
			x: 3
	`); err != nil {
		t.Fatal(err)
	}

	s3 := tr.StartSpan("c", opentracing.FollowsFrom(s2.Context()))
	s3.LogKV("x", 4)
	s3.SetTag("tag", "val")

	s2.Finish()

	if err := TestingCheckRecordedSpans(GetRecording(s1), `
		span a:
			tags: unfinished=
			x: 2
		span b:
			x: 3
		span c:
			tags: tag=val unfinished=
			x: 4
	`); err != nil {
		t.Fatal(err)
	}
	s3.Finish()
	if err := TestingCheckRecordedSpans(GetRecording(s1), `
		span a:
			tags: unfinished=
			x: 2
		span b:
			x: 3
		span c:
			tags: tag=val
			x: 4
	`); err != nil {
		t.Fatal(err)
	}
	StopRecording(s1)
	s1.LogKV("x", 100)
	if err := TestingCheckRecordedSpans(GetRecording(s1), ``); err != nil {
		t.Fatal(err)
	}

	// The child span is still recording.
	s3.LogKV("x", 5)
	if err := TestingCheckRecordedSpans(GetRecording(s3), `
		span c:
			tags: tag=val
			x: 4
			x: 5
	`); err != nil {
		t.Fatal(err)
	}
	s1.Finish()
}
```
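The effect of the recording state on logging, as exercised by the test above, can be modeled in a few lines. This is a deliberately simplified sketch and not how the Yunxi span is implemented: the type recDemoSpan and its methods are hypothetical, and child spans, tags, and recording groups are left out. It only reproduces three observations from the test: a log written before StartRecording is dropped, a log written while recording is kept, and after StopRecording the recording is empty.

```go
package main

import "fmt"

// recDemoSpan is a hypothetical span that keeps logs only while recording.
type recDemoSpan struct {
	recording bool
	logs      []string
}

// LogKV appends a log line only when the span is recording.
func (s *recDemoSpan) LogKV(key string, val int) {
	if !s.recording {
		return // behaves like a black hole: the log is dropped
	}
	s.logs = append(s.logs, fmt.Sprintf("%s: %d", key, val))
}

// StartRecording switches the span into the recording state.
func (s *recDemoSpan) StartRecording() { s.recording = true }

// StopRecording leaves the recording state and discards what was recorded.
func (s *recDemoSpan) StopRecording() {
	s.recording = false
	s.logs = nil
}

// GetRecording returns a copy of the recorded log lines.
func (s *recDemoSpan) GetRecording() []string {
	return append([]string(nil), s.logs...)
}

func main() {
	s := &recDemoSpan{}
	s.LogKV("x", 1) // dropped: not recording yet
	s.StartRecording()
	s.LogKV("x", 2) // kept
	fmt.Println(s.GetRecording()) // [x: 2]

	s.StopRecording()
	s.LogKV("x", 100) // dropped again
	fmt.Println(len(s.GetRecording())) // 0
}
```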
3.2 Creating a child span
This test covers StartChildSpan, which creates a new span from an existing span. The new span is a child of the existing one.

```go
func TestStartChildSpan(t *testing.T) {
	tr := NewTracer()
	sp1 := tr.StartSpan("parent", Recordable)
	StartRecording(sp1, SingleNodeRecording)
	sp2 := StartChildSpan("child", sp1, nil /* logTags */, false /*separateRecording*/)
	sp2.Finish()
	sp1.Finish()
	if err := TestingCheckRecordedSpans(GetRecording(sp1), `
		span parent:
			span child:
	`); err != nil {
		t.Fatal(err)
	}

	sp1 = tr.StartSpan("parent", Recordable)
	StartRecording(sp1, SingleNodeRecording)
	sp2 = StartChildSpan("child", sp1, nil /* logTags */, true /*separateRecording*/)
	sp2.Finish()
	sp1.Finish()
	if err := TestingCheckRecordedSpans(GetRecording(sp1), `
		span parent:
	`); err != nil {
		t.Fatal(err)
	}
	if err := TestingCheckRecordedSpans(GetRecording(sp2), `
		span child:
	`); err != nil {
		t.Fatal(err)
	}

	sp1 = tr.StartSpan("parent", Recordable)
	StartRecording(sp1, SingleNodeRecording)
	sp2 = StartChildSpan(
		"child", sp1, logtags.SingleTagBuffer("key", "val"), false, /*separateRecording*/
	)
	sp2.Finish()
	sp1.Finish()
	if err := TestingCheckRecordedSpans(GetRecording(sp1), `
		span parent:
			span child:
				tags: key=val
	`); err != nil {
		t.Fatal(err)
	}
}
```
3.3 Cross-process tracing test
This test covers cross-process tracing, mainly the Inject and Extract interfaces. Inject writes SpanContext information into a carrier, and Extract reads SpanContext information back out of a carrier.

```go
func TestTracerInjectExtract(t *testing.T) {
	tr := NewTracer()
	tr2 := NewTracer()

	// Verify that noop spans become noop spans on the remote side.

	noop1 := tr.StartSpan("noop")
	if _, noop := noop1.(*noopSpan); !noop {
		t.Fatalf("expected noop span: %+v", noop1)
	}
	carrier := make(opentracing.HTTPHeadersCarrier)
	if err := tr.Inject(noop1.Context(), opentracing.HTTPHeaders, carrier); err != nil {
		t.Fatal(err)
	}
	if len(carrier) != 0 {
		t.Errorf("noop span has carrier: %+v", carrier)
	}

	wireContext, err := tr2.Extract(opentracing.HTTPHeaders, carrier)
	if err != nil {
		t.Fatal(err)
	}
	if _, noopCtx := wireContext.(noopSpanContext); !noopCtx {
		t.Errorf("expected noop context: %v", wireContext)
	}
	noop2 := tr2.StartSpan("remote op", opentracing.FollowsFrom(wireContext))
	if _, noop := noop2.(*noopSpan); !noop {
		t.Fatalf("expected noop span: %+v", noop2)
	}
	noop1.Finish()
	noop2.Finish()

	// Verify that snowball tracing is propagated and triggers recording on the
	// remote side.

	s1 := tr.StartSpan("a", Recordable)
	StartRecording(s1, SnowballRecording)

	carrier = make(opentracing.HTTPHeadersCarrier)
	if err := tr.Inject(s1.Context(), opentracing.HTTPHeaders, carrier); err != nil {
		t.Fatal(err)
	}

	wireContext, err = tr2.Extract(opentracing.HTTPHeaders, carrier)
	if err != nil {
		t.Fatal(err)
	}
	s2 := tr2.StartSpan("remote op", opentracing.FollowsFrom(wireContext))

	// Compare TraceIDs
	trace1 := s1.Context().(*spanContext).TraceID
	trace2 := s2.Context().(*spanContext).TraceID
	if trace1 != trace2 {
		t.Errorf("TraceID doesn't match: parent %d child %d", trace1, trace2)
	}
	s2.LogKV("x", 1)
	s2.Finish()

	// Verify that recording was started automatically.
	rec := GetRecording(s2)
	if err := TestingCheckRecordedSpans(rec, `
		span remote op:
			tags: sb=1
			x: 1
	`); err != nil {
		t.Fatal(err)
	}

	if err := TestingCheckRecordedSpans(GetRecording(s1), `
		span a:
			tags: sb=1 unfinished=
	`); err != nil {
		t.Fatal(err)
	}

	if err := ImportRemoteSpans(s1, rec); err != nil {
		t.Fatal(err)
	}
	s1.Finish()

	if err := TestingCheckRecordedSpans(GetRecording(s1), `
		span a:
			tags: sb=1
		span remote op:
			tags: sb=1
			x: 1
	`); err != nil {
		t.Fatal(err)
	}
}
```