当前位置:网站首页>浪潮云溪分布式数据库 Tracing(二)—— 源码解析
浪潮云溪分布式数据库 Tracing(二)—— 源码解析
2022-07-07 22:13:00 【浪潮云溪数据库】
按照【云溪数据库Tracing(一)】介绍的使用opentracing要求,本文着重介绍云溪数据库Tracing模块中是如何实现Span,SpanContexts和Tracer的。
Part 1 - Tracing 模块调用关系
1.1 Traincg模块包含的文件列表
Tracer.go :定义了opentracing 中的trace相关接口的实现。Tracer_span.go :定义了opentracing中的span 相关操作的实现。Tags.go :定义了 opentracing中关于tags的相关接口。Shadow.go :不是opentracing中的概念,这里主要实现与zipkin的通信,用于tracing 信息推送到外部的zipkin中。
1.2 各个文件之间的调用关系
在cluster_settings.go中会创建tracer,供全局使用,其他模块中使用这个Tracer实现span的创建和其他操作,例如设定span名称、设定tag 、增加log等操作。
Part 2 - Opentracing
在云溪数据库中的实现
以下是只是列出了部分接口实现,并非全部。
2.1 Span 接口实现:
GetContext实现:API用于获取Span中的SpanContext,主要功能是先创建一个map[string]string类型的baggageCopy,将span中的mu.Baggage 读出写入baggageCopy,创建新的spanContext,并且返回。
func (s *span) Context() opentracing.SpanContext {
s.mu.Lock()
defer s.mu.Unlock()
baggageCopy := make(map[string]string, len(s.mu.Baggage))
for k, v := range s.mu.Baggage {
baggageCopy[k] = v
}
sc := &spanContext{
spanMeta: s.spanMeta,
Baggage: baggageCopy,
}
if s.shadowTr != nil {
sc.shadowTr = s.shadowTr
sc.shadowCtx = s.shadowSpan.Context()
}
if s.isRecording() {
sc.recordingGroup = s.mu.recordingGroup
sc.recordingType = s.mu.recordingType
}
return sc
}
Finished实现:API用于结束一个Span的记录和追踪。
func (s *span) Finish() {
s.FinishWithOptions(opentracing.FinishOptions{})
}
SetTag实现:用于向指定的Span添加Tag信息。
func (s *span) SetTag(key string, value interface{}) opentracing.Span {
return s.setTagInner(key, value, false /* locked */)
}
Log实现:用于向指定的Span添加Log信息。
func (s *span) LogKV(alternatingKeyValues ...interface{}) {
fields, err := otlog.InterleavedKVToFields(alternatingKeyValues...)
if err != nil {
s.LogFields(otlog.Error(err), otlog.String("function", "LogKV"))
return
}
s.LogFields(fields...)
}
SetBaggageItem实现:用于向指定的Span增加Baggage信息,主要是用于跨进程追踪使用。
func (s *span) SetBaggageItem(restrictedKey, value string) opentracing.Span {
s.mu.Lock()
defer s.mu.Unlock()
return s.setBaggageItemLocked(restrictedKey, value)
}
BaggageItem实现:用于获取指定的Baggage信息。
func (s *span) BaggageItem(restrictedKey string) string {
s.mu.Lock()
defer s.mu.Unlock()
return s.mu.Baggage[restrictedKey]
}
SetOperationName实现:用于设定Span 的名称。
func (s *span) SetOperationName(operationName string) opentracing.Span { if s.shadowTr != nil { s.shadowSpan.SetOperationName(operationName) } s.operation = operationName return s}
Tracer实现:用于获取Span属于哪个Tracer。
// Tracer is part of the opentracing.Span interface.
func (s *span) Tracer() opentracing.Tracer {
return s.tracer
}
2.2 SpanContext 接口实现:
ForeachBaggageItem实现:用于遍历spanContext中的baggage信息。
func (sc *spanContext) ForeachBaggageItem(handler func(k, v string) bool) {
for k, v := range sc.Baggage {
if !handler(k, v) {
break
}
}
}
2.3 Tracer接口实现:
Inject实现:用于向carrier中注入SpanContext信息
// Inject is part of the opentracing.Tracer interface.
func (t *Tracer) Inject(
osc opentracing.SpanContext, format interface{}, carrier interface{},
) error {
……
// We only
support the HTTPHeaders/TextMap format.
if format != opentracing.HTTPHeaders && format != opentracing.TextMap {
return opentracing.ErrUnsupportedFormat
}
mapWriter, ok := carrier.(opentracing.TextMapWriter)
if !ok {
return opentracing.ErrInvalidCarrier
}
sc, ok := osc.(*spanContext)
if !ok {
return opentracing.ErrInvalidSpanContext
}
mapWriter.Set(fieldNameTraceID, strconv.FormatUint(sc.TraceID, 16))
mapWriter.Set(fieldNameSpanID, strconv.FormatUint(sc.SpanID, 16))
for k, v := range sc.Baggage {
mapWriter.Set(prefixBaggage+k, v)
}
……
return nil
}
Extract实现:用于从carrier中抽取出SpanContext信息。
func (t *Tracer) Extract(format interface{}, carrier interface{}) (opentracing.SpanContext, error) {
// We only
support the HTTPHeaders/TextMap format.
if format != opentracing.HTTPHeaders && format != opentracing.TextMap {
return noopSpanContext{}, opentracing.ErrUnsupportedFormat
}
mapReader, ok := carrier.(opentracing.TextMapReader)
if !ok {
return noopSpanContext{}, opentracing.ErrInvalidCarrier
}
var sc spanContext
……
err :=
mapReader.ForeachKey(func(k, v string) error {
switch k = strings.ToLower(k); k {
case fieldNameTraceID:
var err error
sc.TraceID, err = strconv.ParseUint(v, 16, 64)
if err != nil {
return opentracing.ErrSpanContextCorrupted
}
case fieldNameSpanID:
var err error
sc.SpanID, err = strconv.ParseUint(v, 16, 64)
if err != nil {
return opentracing.ErrSpanContextCorrupted
}
case fieldNameShadowType:
shadowType = v
default:
if strings.HasPrefix(k, prefixBaggage) {
if sc.Baggage == nil {
sc.Baggage = make(map[string]string)
}
sc.Baggage[strings.TrimPrefix(k, prefixBaggage)] = v
} else if strings.HasPrefix(k, prefixShadow) {
if shadowCarrier == nil {
shadowCarrier = make(opentracing.TextMapCarrier)
}
// We build a
shadow textmap with the original shadow keys.
shadowCarrier.Set(strings.TrimPrefix(k, prefixShadow), v)
}
}
return nil
})
if err != nil {
return noopSpanContext{}, err
}
if sc.TraceID == 0 &&
sc.SpanID == 0 {
return noopSpanContext{}, nil
}
……
return &sc, nil
}
StartSpan接口实现:用于创建一个新的Span,可根据传入不同opts来实现不同Span的初始化。
func (t *Tracer) StartSpan(
operationName string, opts ...opentracing.StartSpanOption,
) opentracing.Span {
// Fast paths to
avoid the allocation of StartSpanOptions below when tracing
// is disabled: if we have no options
or a single SpanReference (the common
// case) with a noop context, return a
noop span now.
if len(opts) == 1 {
if o, ok := opts[0].(opentracing.SpanReference); ok {
if IsNoopContext(o.ReferencedContext) {
return &t.noopSpan
}
}
}
shadowTr := t.getShadowTracer()
……
return s
}
2.4 noop span 实现:
noop span实现:使监控代码不依赖Tracer和Span的返回值,防止程序异常退出。
type noopSpan struct {
tracer *Tracer
}
var _ opentracing.Span = &noopSpan{}
func (n *noopSpan) Context() opentracing.SpanContext { return noopSpanContext{} }
func (n *noopSpan) BaggageItem(key string) string { return "" }
func (n *noopSpan) SetTag(key string, value interface{}) opentracing.Span { return n }
func (n *noopSpan) Finish() {}
func (n *noopSpan) FinishWithOptions(opts opentracing.FinishOptions) {}
func (n *noopSpan) SetOperationName(operationName string) opentracing.Span { return n }
func (n *noopSpan) Tracer() opentracing.Tracer { return n.tracer }
func (n *noopSpan) LogFields(fields ...otlog.Field) {}
func (n *noopSpan) LogKV(keyVals ...interface{}) {}
func (n *noopSpan) LogEvent(event string) {}
func (n *noopSpan) LogEventWithPayload(event string, payload interface{}) {}
func (n *noopSpan) Log(data opentracing.LogData) {}
func (n *noopSpan) SetBaggageItem(key, val string) opentracing.Span {
if key == Snowball {
panic("attempting to set Snowball on a noop span; use the Recordable option
to StartSpan")
}
return n
}
Part3 - 云溪数据库中
Opentracing 简单使用示例
3.1 开启Tracer Recording测试
云溪数据库中 开始创建的span均是no operator span,需要手动调用StartRecording,将span转换为可record状态,才能正常对span进行操作。
func TestTracerRecording(t *testing.T) {
tr := NewTracer()
noop1 := tr.StartSpan("noop")
if _, noop := noop1.(*noopSpan); !noop {
t.Error("expected noop span")
}
noop1.LogKV("hello", "void")
noop2 := tr.StartSpan("noop2", opentracing.ChildOf(noop1.Context()))
if _, noop := noop2.(*noopSpan); !noop {
t.Error("expected noop child span")
}
noop2.Finish()
noop1.Finish()
s1 := tr.StartSpan("a", Recordable)
if _, noop := s1.(*noopSpan); noop {
t.Error("Recordable (but not recording) span should not be noop")
}
if !IsBlackHoleSpan(s1) {
t.Error("Recordable span should be black hole")
}
// Unless recording is actually started, child spans are still noop.
noop3 := tr.StartSpan("noop3", opentracing.ChildOf(s1.Context()))
if _, noop := noop3.(*noopSpan); !noop {
t.Error("expected noop child span")
}
noop3.Finish()
s1.LogKV("x", 1)
StartRecording(s1, SingleNodeRecording)
s1.LogKV("x", 2)
s2 := tr.StartSpan("b", opentracing.ChildOf(s1.Context()))
if IsBlackHoleSpan(s2) {
t.Error("recording span should not be black hole")
}
s2.LogKV("x", 3)
if err := TestingCheckRecordedSpans(GetRecording(s1), `
span a:
tags: unfinished=
x: 2
span b:
tags: unfinished=
x: 3
`); err != nil {
t.Fatal(err)
}
if err := TestingCheckRecordedSpans(GetRecording(s2), `
span b:
tags: unfinished=
x: 3
`); err != nil {
t.Fatal(err)
}
s3 := tr.StartSpan("c", opentracing.FollowsFrom(s2.Context()))
s3.LogKV("x", 4)
s3.SetTag("tag", "val")
s2.Finish()
if err := TestingCheckRecordedSpans(GetRecording(s1), `
span a:
tags: unfinished=
x: 2
span b:
x: 3
span c:
tags: tag=val unfinished=
x: 4
`); err != nil {
t.Fatal(err)
}
s3.Finish()
if err := TestingCheckRecordedSpans(GetRecording(s1), `
span a:
tags: unfinished=
x: 2
span b:
x: 3
span c:
tags: tag=val
x: 4
`); err != nil {
t.Fatal(err)
}
StopRecording(s1)
s1.LogKV("x", 100)
if err := TestingCheckRecordedSpans(GetRecording(s1), ``); err != nil {
t.Fatal(err)
}
// The child span is still recording.
s3.LogKV("x", 5)
if err := TestingCheckRecordedSpans(GetRecording(s3), `
span c:
tags: tag=val
x: 4
x: 5
`); err != nil {
t.Fatal(err)
}
s1.Finish()
}
3.2 创建childSpan 测试
测试StartChildSpan,根据已有span创建出一个新的span,为已有span的子span。
func TestStartChildSpan(t *testing.T) {
tr := NewTracer()
sp1 := tr.StartSpan("parent", Recordable)
StartRecording(sp1, SingleNodeRecording)
sp2 := StartChildSpan("child", sp1, nil /* logTags */, false /*separateRecording*/)
sp2.Finish()
sp1.Finish()
if err := TestingCheckRecordedSpans(GetRecording(sp1), `
span parent:
span child:
`); err != nil {
t.Fatal(err)
}
sp1 = tr.StartSpan("parent", Recordable)
StartRecording(sp1, SingleNodeRecording)
sp2 = StartChildSpan("child", sp1, nil /* logTags */, true /*separateRecording*/)
sp2.Finish()
sp1.Finish()
if err := TestingCheckRecordedSpans(GetRecording(sp1), `
span parent:
`); err != nil {
t.Fatal(err)
}
if err := TestingCheckRecordedSpans(GetRecording(sp2), `
span child:
`); err != nil {
t.Fatal(err)
}
sp1 = tr.StartSpan("parent", Recordable)
StartRecording(sp1, SingleNodeRecording)
sp2 = StartChildSpan(
"child", sp1, logtags.SingleTagBuffer("key", "val"), false, /*separateRecording*/
)
sp2.Finish()
sp1.Finish()
if err := TestingCheckRecordedSpans(GetRecording(sp1), `
span parent:
span child:
tags: key=val
`); err != nil {
t.Fatal(err)
}
}
3.3 跨进程追踪测试
测试跨进程追踪功能,主要是测试inject接口和 extract 接口,Inject用于向carrier中注入SpanContext信息,Extract用于从carrier中抽取出SpanContext 信息。
func TestTracerInjectExtract(t *testing.T) {
tr := NewTracer()
tr2 := NewTracer()
// Verify that noop spans become noop spans on the remote side.
noop1 := tr.StartSpan("noop")
if _, noop := noop1.(*noopSpan); !noop {
t.Fatalf("expected noop span: %+v", noop1)
}
carrier := make(opentracing.HTTPHeadersCarrier)
if err := tr.Inject(noop1.Context(), opentracing.HTTPHeaders, carrier); err != nil {
t.Fatal(err)
}
if len(carrier) != 0 {
t.Errorf("noop span has carrier: %+v", carrier)
}
wireContext, err := tr2.Extract(opentracing.HTTPHeaders, carrier)
if err != nil {
t.Fatal(err)
}
if _, noopCtx := wireContext.(noopSpanContext); !noopCtx {
t.Errorf("expected noop context: %v", wireContext)
}
noop2 := tr2.StartSpan("remote op", opentracing.FollowsFrom(wireContext))
if _, noop := noop2.(*noopSpan); !noop {
t.Fatalf("expected noop span: %+v", noop2)
}
noop1.Finish()
noop2.Finish()
// Verify that snowball tracing is propagated and triggers recording on the
// remote side.
s1 := tr.StartSpan("a", Recordable)
StartRecording(s1, SnowballRecording)
carrier = make(opentracing.HTTPHeadersCarrier)
if err := tr.Inject(s1.Context(), opentracing.HTTPHeaders, carrier); err != nil {
t.Fatal(err)
}
wireContext, err = tr2.Extract(opentracing.HTTPHeaders, carrier)
if err != nil {
t.Fatal(err)
}
s2 := tr2.StartSpan("remote op", opentracing.FollowsFrom(wireContext))
// Compare TraceIDs
trace1 := s1.Context().(*spanContext).TraceID
trace2 := s2.Context().(*spanContext).TraceID
if trace1 != trace2 {
t.Errorf("TraceID doesn't match: parent %d child %d", trace1, trace2)
}
s2.LogKV("x", 1)
s2.Finish()
// Verify that recording was started automatically.
rec := GetRecording(s2)
if err := TestingCheckRecordedSpans(rec, `
span remote op:
tags: sb=1
x: 1
`); err != nil {
t.Fatal(err)
}
if err := TestingCheckRecordedSpans(GetRecording(s1), `
span a:
tags: sb=1 unfinished=
`); err != nil {
t.Fatal(err)
}
if err := ImportRemoteSpans(s1, rec); err != nil {
t.Fatal(err)
}
s1.Finish()
if err := TestingCheckRecordedSpans(GetRecording(s1), `
span a:
tags: sb=1
span remote op:
tags: sb=1
x: 1
`); err != nil {
t.Fatal(err)
}
}
边栏推荐
- 面试题详解:用Redis实现分布式锁的血泪史
- Using Google test in QT
- [leetcode] 20. Valid brackets
- 【編程題】【Scratch二級】2019.12 飛翔的小鳥
- 关于组织2021-2022全国青少年电子信息智能创新大赛西南赛区(四川)复赛的通知
- C language learning
- 10 schemes to ensure interface data security
- 用语雀写文章了,功能真心强大!
- Flask learning record 000: error summary
- How to measure whether the product is "just needed, high frequency, pain points"
猜你喜欢
【测试面试题】页面很卡的原因分析及解决方案
Automated testing: robot framework is a practical skill that 90% of people want to know
Problems faced when connecting to sqlserver after downloading (I)
Data Lake (XV): spark and iceberg integrate write operations
一鍵免費翻譯300多頁的pdf文檔
QT and OpenGL: load 3D models using the open asset import library (assimp)
3年经验,面试测试岗20K都拿不到了吗?这么坑?
Binary sort tree [BST] - create, find, delete, output
ROS从入门到精通(九) 可视化仿真初体验之TurtleBot3
Coindesk comments on the decentralization process of the wave field: let people see the future of the Internet
随机推荐
Development of a horse tourism website (optimization of servlet)
One click free translation of more than 300 pages of PDF documents
About the difference between ch32 library function and STM32 library function
Solutions to problems in sqlserver deleting data in tables
[basis of recommendation system] sampling and construction of positive and negative samples
玩转Sonar
腾讯安全发布《BOT管理白皮书》|解读BOT攻击,探索防护之道
C language 005: common examples
One click installation with fishros in blue bridge ROS
The underlying principles and templates of new and delete
Detailed explanation of interview questions: the history of blood and tears in implementing distributed locks with redis
Basic learning of SQL Server -- creating databases and tables with code
Usage of limit and offset (Reprint)
The difference between -s and -d when downloading packages using NPM
从Starfish OS持续对SFO的通缩消耗,长远看SFO的价值
Is it safe to buy funds online?
攻防世界Web进阶区unserialize3题解
某马旅游网站开发(登录注册退出功能的实现)
【编程题】【Scratch二级】2019.09 绘制雪花图案
Smart regulation enters the market, where will meituan and other Internet service platforms go