当前位置:网站首页>2022-02-14 InfluxDB cluster write path: analysis of writeToShard
2022-02-14 InfluxDB cluster write path: analysis of writeToShard
2022-07-03 13:04:00 【a tracer】
Catalog
Writing to a remote shard:
Abstract:
This article analyzes writeToShard, the function an InfluxDB cluster uses to write data, in order to show how the cluster handles incoming points.
Source code :
Top level: writeToShard
// writeToShard writes points to every owner of shard and waits until the
// requested write consistency level has been met.
//
// One goroutine is started per shard owner. An owner on the current node is
// written through w.TSDBStore (creating the shard first if the store reports
// tsdb.ErrShardNotFound); any other owner is written through w.ShardWriter,
// falling back to w.HintedHandoff when the remote write fails with a
// retryable error.
//
// It returns nil once the required number of owners have succeeded,
// ErrTimeout if w.WriteTimeout elapses first, ErrWriteFailed if the writer is
// closing or the shard has no owners, ErrPartialWrite if some but not enough
// owners succeeded, and otherwise the first owner error wrapped in a
// "write failed" message.
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string,
	consistency ConsistencyLevel, points []models.Point) error {
	// PointWriteReqLocal is intentionally NOT incremented here: whether a
	// write is local or remote is only known per owner, and each goroutine
	// below bumps the matching counter. Counting here as well would record
	// local writes twice and count remote-only shards as local.

	// Number of successful owner writes needed for the requested consistency
	// level. Levels other than Any, One and Quorum require every owner.
	required := len(shard.Owners)
	switch consistency {
	case ConsistencyLevelAny, ConsistencyLevelOne:
		required = 1
	case ConsistencyLevelQuorum:
		required = required/2 + 1
	}

	// Response channel for each shard writer goroutine.
	type AsyncWriteResult struct {
		Owner meta.ShardOwner
		Err   error
	}
	// One buffer slot per owner: every goroutine sends exactly one result, so
	// none of them blocks even if this function returns early.
	ch := make(chan *AsyncWriteResult, len(shard.Owners))

	for _, owner := range shard.Owners {
		go func(shardID uint64, owner meta.ShardOwner, points []models.Point) {
			if w.Node.ID == owner.NodeID {
				atomic.AddInt64(&w.stats.PointWriteReqLocal, int64(len(points)))

				err := w.TSDBStore.WriteToShard(shardID, points)
				// If we've written to a shard that should exist on the current
				// node, but the store has not actually created this shard, tell
				// it to create it and retry the write.
				if err == tsdb.ErrShardNotFound {
					err = w.TSDBStore.CreateShard(database, retentionPolicy, shardID, true)
					if err != nil {
						ch <- &AsyncWriteResult{owner, err}
						return
					}
					err = w.TSDBStore.WriteToShard(shardID, points)
				}
				ch <- &AsyncWriteResult{owner, err}
				return
			}

			atomic.AddInt64(&w.stats.PointWriteReqRemote, int64(len(points)))
			err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points)
			if err != nil && tsdb.IsRetryable(err) {
				// The remote write failed so queue it via hinted handoff.
				atomic.AddInt64(&w.stats.WritePointReqHH, int64(len(points)))
				hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points)
				if hherr != nil {
					ch <- &AsyncWriteResult{owner, hherr}
					return
				}

				// The hinted handoff succeeded. If the write consistency level
				// is ANY, that counts as a successful write, so send nil to the
				// response channel; otherwise let the original error propagate.
				if consistency == ConsistencyLevelAny {
					ch <- &AsyncWriteResult{owner, nil}
					return
				}
			}
			ch <- &AsyncWriteResult{owner, err}
		}(shard.ID, owner, points)
	}

	var wrote int
	// A single timer bounds the whole collection loop, not each response.
	timeout := time.After(w.WriteTimeout)
	var writeError error
	for range shard.Owners {
		select {
		case <-w.closing:
			return ErrWriteFailed
		case <-timeout:
			atomic.AddInt64(&w.stats.WriteTimeout, 1)
			// Return timeout error to caller.
			return ErrTimeout
		case result := <-ch:
			// If the write returned an error, continue to the next response.
			if result.Err != nil {
				atomic.AddInt64(&w.stats.WriteErr, 1)
				w.Logger.Info("write failed", zap.Uint64("shard", shard.ID), zap.Uint64("node", result.Owner.NodeID), zap.Error(result.Err))

				// Keep track of the first error we see to return back to the client.
				if writeError == nil {
					writeError = result.Err
				}
				continue
			}

			wrote++

			// We wrote the required consistency level.
			if wrote >= required {
				atomic.AddInt64(&w.stats.WriteOK, 1)
				return nil
			}
		}
	}

	// Every owner has reported but the consistency level was not reached.
	if wrote > 0 {
		atomic.AddInt64(&w.stats.WritePartial, 1)
		return ErrPartialWrite
	}

	if writeError != nil {
		return fmt.Errorf("write failed: %v", writeError)
	}

	return ErrWriteFailed
}
Writing to a remote shard:
// WriteShard writes time series points to a shard
func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point) error {
c, err := w.dial(ownerID)
if err != nil {
return err
}
conn, ok := c.(*pooledConn)
if !ok {
panic("wrong connection type")
}
defer func(conn net.Conn) {
conn.Close() // return to pool
}(conn)
// Determine the location of this shard and whether it still exists
db, rp, sgi := w.MetaClient.ShardOwner(shardID)
if sgi == nil {
// If we can't get the shard group for this shard, then we need to drop this request
// as it is no longer valid. This could happen if writes were queued via
// hinted handoff and we're processing the queue after a shard group was deleted.
return nil
}
// Build write request.
var request WriteShardRequest
request.SetShardID(shardID)
request.SetDatabase(db)
request.SetRetentionPolicy(rp)
request.AddPoints(points)
// Marshal into protocol buffers.
buf, err := request.MarshalBinary()
if err != nil {
return err
}
// Write request.
conn.SetWriteDeadline(time.Now().Add(w.timeout))
if err := WriteTLV(conn, writeShardRequestMessage, buf); err != nil {
conn.MarkUnusable()
return err
}
// Read the response.
conn.SetReadDeadline(time.Now().Add(w.timeout))
_, buf, err = ReadTLV(conn)
if err != nil {
conn.MarkUnusable()
return err
}
// Unmarshal response.
var response WriteShardResponse
if err := response.UnmarshalBinary(buf); err != nil {
return err
}
if response.Code() != 0 {
return fmt.Errorf("error code %d: %s", response.Code(), response.Message())
}
return nil
}Look for shard Mapping :
// MapShards maps the points contained in wp to a ShardMapping. If a point
// maps to a shard group or shard that does not currently exist, it will be
// created before returning the mapping.
func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) {
rp, err := w.MetaClient.RetentionPolicy(wp.Database, wp.RetentionPolicy)
if err != nil {
return nil, err
} else if rp == nil {
return nil, freetsdb.ErrRetentionPolicyNotFound(wp.RetentionPolicy)
}
// Holds all the shard groups and shards that are required for writes.
list := make(sgList, 0, 8)
min := time.Unix(0, models.MinNanoTime)
if rp.Duration > 0 {
min = time.Now().Add(-rp.Duration)
}
for _, p := range wp.Points {
// Either the point is outside the scope of the RP, or we already have
// a suitable shard group for the point.
if p.Time().Before(min) || list.Covers(p.Time()) {
continue
}
// No shard groups overlap with the point's time, so we will create
// a new shard group for this point.
sg, err := w.MetaClient.CreateShardGroup(wp.Database, wp.RetentionPolicy, p.Time())
if err != nil {
return nil, err
}
if sg == nil {
return nil, errors.New("nil shard group")
}
list = list.Append(*sg)
}
mapping := NewShardMapping(len(wp.Points))
for _, p := range wp.Points {
sg := list.ShardGroupAt(p.Time())
if sg == nil {
// We didn't create a shard group because the point was outside the
// scope of the RP.
mapping.Dropped = append(mapping.Dropped, p)
atomic.AddInt64(&w.stats.WriteDropped, 1)
continue
}
sh := sg.ShardFor(p.HashID())
mapping.MapPoint(&sh, p)
}
return mapping, nil
}Sequence diagram :

Analysis:
Logical flow:
- By the time this function is called, the key -> shard mapping has already been resolved.
- It iterates over the shard's owner list held in memory (shard.Owners).
- Note that a goroutine is started for each owner of the shard to handle that owner's write.
- If the owner is the local node, the points are written through the local TSDB store.
- If the owner is a remote node:
- The points are written to the remote node; if that write fails with a retryable error,
- they are queued via hinted handoff (HH).
- Finally, the results from all goroutines are collected.
Architecture:
- Keys are converted to shard IDs, and the data is distributed across the whole cluster for storage.
- When a write to a remote shard fails, the points are saved on this node via hinted handoff (HH).
- The way this function uses goroutines is a technique worth noting.
边栏推荐
- SSH login server sends a reminder
- A large select drop-down box, village in Chaoyang District
- 【数据库原理及应用教程(第4版|微课版)陈志泊】【第四章习题】
- 剑指 Offer 16. 数值的整数次方
- Integer case study of packaging
- Understanding of CPU buffer line
- Sword finger offer 16 Integer power of numeric value
- Grid connection - Analysis of low voltage ride through and island coexistence
- 自抗扰控制器七-二阶 LADRC-PLL 结构设计
- Sitescms v3.0.2 release, upgrade jfinal and other dependencies
猜你喜欢

ncnn神經網絡計算框架在香柳丁派OrangePi 3 LTS開發板中的使用介紹

有限状态机FSM
![[combinatorics] permutation and combination (the combination number of multiple sets | the repetition of all elements is greater than the combination number | the derivation of the combination number](/img/9d/6118b699c0d90810638f9b08d4f80a.jpg)
[combinatorics] permutation and combination (the combination number of multiple sets | the repetition of all elements is greater than the combination number | the derivation of the combination number
![[ArcGIS user defined script tool] vector file generates expanded rectangular face elements](/img/39/0b31290798077cb8c355fbd058e4d3.png)
[ArcGIS user defined script tool] vector file generates expanded rectangular face elements

Dojo tutorials:getting started with deferrals source code and example execution summary

Xctf mobile--rememberother problem solving

IDEA 全文搜索快捷键Ctr+Shift+F失效问题

Some thoughts on business
![[problem exploration and solution of one or more filters or listeners failing to start]](/img/82/e7730d289c4c1c4800b520c58d975a.jpg)
[problem exploration and solution of one or more filters or listeners failing to start]

Solve system has not been booted with SYSTEMd as init system (PID 1) Can‘t operate.
随机推荐
Define a list, store n integers, and calculate the length, maximum value, minimum value and average value of the list
Drop down refresh conflicts with recyclerview sliding (swiperefreshlayout conflicts with recyclerview sliding)
[exercise 5] [Database Principle]
Node.js: express + MySQL的使用
剑指 Offer 15. 二进制中1的个数
Ali & ant self developed IDE
How to get user location in wechat applet?
【習題七】【數據庫原理】
When the R language output rmarkdown is in other formats (such as PDF), an error is reported, latex failed to compile stocks Tex. solution
Sword finger offer 15 Number of 1 in binary
My creation anniversary: the fifth anniversary
[judgment question] [short answer question] [Database Principle]
The foreground uses RSA asymmetric security to encrypt user information
[data mining review questions]
【判断题】【简答题】【数据库原理】
An example of newtonjason
GaN图腾柱无桥 Boost PFC(单相)七-PFC占空比前馈
[combinatorics] permutation and combination (the combination number of multiple sets | the repetition of all elements is greater than the combination number | the derivation of the combination number
Kotlin - improved decorator mode
Sword finger offer14 the easiest way to cut rope