当前位置:网站首页>2022-02-14 incluxdb cluster write data writetoshard parsing
2022-02-14 incluxdb cluster write data writetoshard parsing
2022-07-03 13:04:00 【a tracer】
Catalog
To the far end shard write in :
Abstract :
analysis influxdb Cluster write data writeToShard, Analysis shows that influxdb How the cluster handles data .
Source code :
top floor writeToShard
// writeToShards writes points to a shard and ensures a write consistency level has been met. If the write
// partially succeeds, ErrPartialWrite is returned.
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string,
consistency ConsistencyLevel, points []models.Point) error {
atomic.AddInt64(&w.stats.PointWriteReqLocal, int64(len(points)))
// The required number of writes to achieve the requested consistency level
required := len(shard.Owners)
switch consistency {
case ConsistencyLevelAny, ConsistencyLevelOne:
required = 1
case ConsistencyLevelQuorum:
required = required/2 + 1
}
// response channel for each shard writer go routine
type AsyncWriteResult struct {
Owner meta.ShardOwner
Err error
}
ch := make(chan *AsyncWriteResult, len(shard.Owners))
for _, owner := range shard.Owners {
go func(shardID uint64, owner meta.ShardOwner, points []models.Point) {
if w.Node.ID == owner.NodeID {
atomic.AddInt64(&w.stats.PointWriteReqLocal, int64(len(points)))
err := w.TSDBStore.WriteToShard(shardID, points)
// If we've written to shard that should exist on the current node, but the store has
// not actually created this shard, tell it to create it and retry the write
if err == tsdb.ErrShardNotFound {
err = w.TSDBStore.CreateShard(database, retentionPolicy, shardID, true)
if err != nil {
ch <- &AsyncWriteResult{owner, err}
return
}
err = w.TSDBStore.WriteToShard(shardID, points)
}
ch <- &AsyncWriteResult{owner, err}
return
}
atomic.AddInt64(&w.stats.PointWriteReqRemote, int64(len(points)))
err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points)
if err != nil && tsdb.IsRetryable(err) {
// The remote write failed so queue it via hinted handoff
atomic.AddInt64(&w.stats.WritePointReqHH, int64(len(points)))
hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points)
if hherr != nil {
ch <- &AsyncWriteResult{owner, hherr}
return
}
// If the write consistency level is ANY, then a successful hinted handoff can
// be considered a successful write so send nil to the response channel
// otherwise, let the original error propagate to the response channel
if hherr == nil && consistency == ConsistencyLevelAny {
ch <- &AsyncWriteResult{owner, nil}
return
}
}
ch <- &AsyncWriteResult{owner, err}
}(shard.ID, owner, points)
}
var wrote int
timeout := time.After(w.WriteTimeout)
var writeError error
for range shard.Owners {
select {
case <-w.closing:
return ErrWriteFailed
case <-timeout:
atomic.AddInt64(&w.stats.WriteTimeout, 1)
// return timeout error to caller
return ErrTimeout
case result := <-ch:
// If the write returned an error, continue to the next response
if result.Err != nil {
atomic.AddInt64(&w.stats.WriteErr, 1)
w.Logger.Info("write failed", zap.Uint64("shard", shard.ID), zap.Uint64("node", result.Owner.NodeID), zap.Error(result.Err))
// Keep track of the first error we see to return back to the client
if writeError == nil {
writeError = result.Err
}
continue
}
wrote++
// We wrote the required consistency level
if wrote >= required {
atomic.AddInt64(&w.stats.WriteOK, 1)
return nil
}
}
}
if wrote > 0 {
atomic.AddInt64(&w.stats.WritePartial, 1)
return ErrPartialWrite
}
if writeError != nil {
return fmt.Errorf("write failed: %v", writeError)
}
return ErrWriteFailed
}
To the far end shard write in :
// WriteShard writes time series points to a shard
func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point) error {
c, err := w.dial(ownerID)
if err != nil {
return err
}
conn, ok := c.(*pooledConn)
if !ok {
panic("wrong connection type")
}
defer func(conn net.Conn) {
conn.Close() // return to pool
}(conn)
// Determine the location of this shard and whether it still exists
db, rp, sgi := w.MetaClient.ShardOwner(shardID)
if sgi == nil {
// If we can't get the shard group for this shard, then we need to drop this request
// as it is no longer valid. This could happen if writes were queued via
// hinted handoff and we're processing the queue after a shard group was deleted.
return nil
}
// Build write request.
var request WriteShardRequest
request.SetShardID(shardID)
request.SetDatabase(db)
request.SetRetentionPolicy(rp)
request.AddPoints(points)
// Marshal into protocol buffers.
buf, err := request.MarshalBinary()
if err != nil {
return err
}
// Write request.
conn.SetWriteDeadline(time.Now().Add(w.timeout))
if err := WriteTLV(conn, writeShardRequestMessage, buf); err != nil {
conn.MarkUnusable()
return err
}
// Read the response.
conn.SetReadDeadline(time.Now().Add(w.timeout))
_, buf, err = ReadTLV(conn)
if err != nil {
conn.MarkUnusable()
return err
}
// Unmarshal response.
var response WriteShardResponse
if err := response.UnmarshalBinary(buf); err != nil {
return err
}
if response.Code() != 0 {
return fmt.Errorf("error code %d: %s", response.Code(), response.Message())
}
return nil
}Look for shard Mapping :
// MapShards maps the points contained in wp to a ShardMapping. If a point
// maps to a shard group or shard that does not currently exist, it will be
// created before returning the mapping.
func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) {
rp, err := w.MetaClient.RetentionPolicy(wp.Database, wp.RetentionPolicy)
if err != nil {
return nil, err
} else if rp == nil {
return nil, freetsdb.ErrRetentionPolicyNotFound(wp.RetentionPolicy)
}
// Holds all the shard groups and shards that are required for writes.
list := make(sgList, 0, 8)
min := time.Unix(0, models.MinNanoTime)
if rp.Duration > 0 {
min = time.Now().Add(-rp.Duration)
}
for _, p := range wp.Points {
// Either the point is outside the scope of the RP, or we already have
// a suitable shard group for the point.
if p.Time().Before(min) || list.Covers(p.Time()) {
continue
}
// No shard groups overlap with the point's time, so we will create
// a new shard group for this point.
sg, err := w.MetaClient.CreateShardGroup(wp.Database, wp.RetentionPolicy, p.Time())
if err != nil {
return nil, err
}
if sg == nil {
return nil, errors.New("nil shard group")
}
list = list.Append(*sg)
}
mapping := NewShardMapping(len(wp.Points))
for _, p := range wp.Points {
sg := list.ShardGroupAt(p.Time())
if sg == nil {
// We didn't create a shard group because the point was outside the
// scope of the RP.
mapping.Dropped = append(mapping.Dropped, p)
atomic.AddInt64(&w.stats.WriteDropped, 1)
continue
}
sh := sg.ShardFor(p.HashID())
mapping.MapPoint(&sh, p)
}
return mapping, nil
}Sequence diagram :

analysis :
From the logical process :
- When it comes to this function , It's done key -> shard Find the mapping relationship between
- Traverse the stored in memory shard list
- We need to pay attention to shard Each of the shard Create a collaborative process to deal with
- If shard Is your own , call tsdb Write
- If it's remote shard
- Write to remote end , If the writing fails
- Write to memory hinted handoff
- Collect processing results
Architecturally :
- It can be seen that key The conversion shardID, Distributed to the entire cluster for storage
- Once the far end shard Writing failure , Use HH Save in this shard
- You need to pay attention to the skill of using this function for the co process
边栏推荐
- 【习题五】【数据库原理】
- 剑指 Offer 15. 二进制中1的个数
- 【习题七】【数据库原理】
- Analysis of the influence of voltage loop on PFC system performance
- 剑指 Offer 12. 矩阵中的路径
- [problem exploration and solution of one or more filters or listeners failing to start]
- Enable SASL authentication for memcached
- [combinatorics] permutation and combination (multiple set permutation | multiple set full permutation | multiple set incomplete permutation all elements have a repetition greater than the permutation
- Glide 4.6.1 API initial
- 高效能人士的七个习惯
猜你喜欢

The latest version of lottery blind box operation version

4. Wireless in vivo nano network: electromagnetic propagation model and key points of sensor deployment

Seven habits of highly effective people

Node. Js: use of express + MySQL

Grid connection - Analysis of low voltage ride through and island coexistence

Solve system has not been booted with SYSTEMd as init system (PID 1) Can‘t operate.
![[combinatorics] permutation and combination (the combination number of multiple sets | the repetition of all elements is greater than the combination number | the derivation of the combination number](/img/9d/6118b699c0d90810638f9b08d4f80a.jpg)
[combinatorics] permutation and combination (the combination number of multiple sets | the repetition of all elements is greater than the combination number | the derivation of the combination number

Xctf mobile--app1 problem solving

studio All flavors must now belong to a named flavor dimension. Learn more

Swift bit operation exercise
随机推荐
luoguP3694邦邦的大合唱站队
The foreground uses RSA asymmetric security to encrypt user information
Sword finger offer 12 Path in matrix
Kung Fu pays off, and learning is done
正则表达式
[exercise 6] [Database Principle]
电压环对 PFC 系统性能影响分析
Tianyi ty1208-z brush machine detailed tutorial (free to remove)
context. Getexternalfilesdir() is compared with the returned path
我的创作纪念日:五周年
[review questions of database principles]
Ali & ant self developed IDE
Idea full text search shortcut ctr+shift+f failure problem
【综合题】【数据库原理】
Sword finger offer 14- ii Cut rope II
How to get user location in wechat applet?
剑指 Offer 16. 数值的整数次方
Dix règles de travail
Sitescms v3.0.2 release, upgrade jfinal and other dependencies
How to stand out quickly when you are new to the workplace?