当前位置:网站首页>Use go language to import Doris data through stream load
Use go language to import Doris data through stream load
2022-07-25 03:14:00 【Zhangjiafeng】
This article uses Go 1.17.2.
Doris version: 0.15.0 release.
Doris data import examples are available in many languages, but a Go version is hard to find. After a brief study, I wrote a simple Stream Load import example, for reference only.
The table structure used in the example :
-- Table used by the Go Stream Load example: one row per user, keyed on
-- (user_id, username), hash-distributed on user_id into 3 buckets.
CREATE TABLE IF NOT EXISTS user_info (
    user_id       LARGEINT     NOT NULL COMMENT " user id",
    username      varchar(50)  NOT NULL COMMENT " user name ",
    city          VARCHAR(20)  COMMENT " User City ",
    age           SMALLINT     COMMENT " User age ",
    sex           TINYINT      COMMENT " User's gender ",
    phone         LARGEINT     COMMENT " Telephone ",
    address       VARCHAR(500) COMMENT " Address ",
    register_time datetime     COMMENT " User registration time "
)
UNIQUE KEY(user_id, username)
DISTRIBUTED BY HASH(user_id) BUCKETS 3
PROPERTIES ("replication_num" = "3");
Here is the Go example code. It supports importing from a file and importing from in-memory data. It also provides a method for fetching the list of BE nodes, so that when importing you can randomly pick one BE node's IP and port from that list and connect to the BE directly.
// Command main is a small example that imports data into a Doris table
// through the Stream Load HTTP interface, either from a file or from an
// in-memory string, and that can list the alive BE nodes.
package main

import (
	"container/list"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"strconv"
	"strings"

	"github.com/gofrs/uuid"
)

const (
	// streamLoadURL is the Stream Load endpoint used by the example; it
	// targets table user_info in database test_2.
	streamLoadURL = "http://10.220.146.10:8030/api/test_2/user_info/_stream_load"

	// backendsURL is the endpoint queried for the list of alive BE nodes.
	backendsURL = "http://10.220.146.10:8030/api/backends?is_alive=true"
)

// StreamLoad carries the settings for a Stream Load request.
//
// NOTE(review): only userName and password are read by the functions in this
// file; url, dbName, tableName and data are declared but unused because the
// endpoint is hard-coded in streamLoadURL.
type StreamLoad struct {
	url       string
	dbName    string
	tableName string
	data      string
	userName  string
	password  string
}

// auth returns the base64 encoding of "userName:password", i.e. the
// credential part of an HTTP Basic Authorization header.
//
// The value is deliberately not printed: it is trivially reversible and would
// leak the password to stdout.
func auth(load StreamLoad) string {
	credentials := load.userName + ":" + load.password
	return base64.StdEncoding.EncodeToString([]byte(credentials))
}

// batch_load_file reads the whole file into memory and imports its content
// into Doris through Stream Load.
//
// If the file cannot be read the error is logged and nothing is sent.
func batch_load_file(load StreamLoad, file string) {
	content, err := ioutil.ReadFile(file)
	if err != nil {
		log.Println("Failed to Read the File", file, err)
		return
	}
	batch_load_data(load, string(content))
}

// batch_load_data imports in-memory, comma-separated data into Doris through
// Stream Load and prints the outcome.
//
// Transport, HTTP and decoding failures are logged and the function returns
// without panicking. When the response is decoded, the number of loaded rows
// is printed on success, and the error URL (plus the message for a failed
// load) otherwise.
func batch_load_data(load StreamLoad, data string) {
	// A *strings.Reader body lets http.NewRequest set GetBody, so the client
	// can resend the payload if the server answers with a redirect.
	request, err := http.NewRequest(http.MethodPut, streamLoadURL, strings.NewReader(data))
	if err != nil {
		log.Println("Failed to build the Stream Load request", err)
		return
	}
	request.Header.Add("Authorization", "basic "+auth(load))
	request.Header.Add("EXPECT", "100-continue")
	// Each load gets a fresh random label.
	request.Header.Add("label", uuid.Must(uuid.NewV4()).String())
	request.Header.Add("column_separator", ",")

	client := &http.Client{}
	response, err := client.Do(request)
	if err != nil {
		log.Println("Failed to send the Stream Load request", err)
		return
	}
	defer response.Body.Close()

	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		log.Println("Failed to read the Stream Load response", err)
		return
	}
	if response.StatusCode != http.StatusOK {
		log.Printf("Stream Load returned HTTP status %s: %s", response.Status, body)
		return
	}

	var result ResponseBody
	if err := json.Unmarshal(body, &result); err != nil {
		log.Printf("Failed to parse the Stream Load response %q: %v", body, err)
		return
	}

	// An HTTP 200 alone is not treated as success; the Status field of the
	// JSON body decides.
	if result.Status != "Success" {
		fmt.Printf("Error Message : %s \n", result.Message)
		fmt.Printf("Error Data : %s ", result.ErrorURL)
		return
	}
	// If some rows were filtered out, print the URL describing them.
	if result.NumberFilteredRows > 0 {
		fmt.Printf("Error Data : %s ", result.ErrorURL)
		return
	}
	fmt.Printf("Success import data : %d", result.NumberLoadedRows)
}

// get_doris_be_list returns the alive BE nodes as a list of "ip:http_port"
// strings.
//
// The list is empty (never nil) when the request fails, the server does not
// answer with HTTP 200, or the response cannot be decoded; the failure is
// logged.
func get_doris_be_list() *list.List {
	bes := list.New()

	var load StreamLoad
	load.userName = "root"
	load.password = ""

	request, err := http.NewRequest(http.MethodGet, backendsURL, nil)
	if err != nil {
		log.Println("Failed to build the backends request", err)
		return bes
	}
	request.Header.Add("Authorization", "basic "+auth(load))

	client := &http.Client{}
	response, err := client.Do(request)
	if err != nil {
		log.Println("Failed to send the backends request", err)
		return bes
	}
	defer response.Body.Close()

	if response.StatusCode != http.StatusOK {
		log.Println("Backends request returned HTTP status", response.Status)
		return bes
	}

	body, err := ioutil.ReadAll(response.Body)
	if err != nil {
		log.Println("Failed to read the backends response", err)
		return bes
	}

	var backends Backend
	if err := json.Unmarshal(body, &backends); err != nil {
		log.Println("Failed to parse the backends response", err)
		return bes
	}

	for _, be := range backends.Data.Backends {
		bes.PushBack(be.IP + ":" + strconv.Itoa(be.HTTPPort))
	}
	return bes
}

// ResponseBody is the JSON message returned by a Stream Load request.
type ResponseBody struct {
	TxnID                  int    `json:"TxnId"`
	Label                  string `json:"Label"`
	Status                 string `json:"Status"`
	Message                string `json:"Message"`
	NumberTotalRows        int    `json:"NumberTotalRows"`
	NumberLoadedRows       int    `json:"NumberLoadedRows"`
	NumberFilteredRows     int    `json:"NumberFilteredRows"`
	NumberUnselectedRows   int    `json:"NumberUnselectedRows"`
	LoadBytes              int    `json:"LoadBytes"`
	LoadTimeMs             int    `json:"LoadTimeMs"`
	BeginTxnTimeMs         int    `json:"BeginTxnTimeMs"`
	StreamLoadPutTimeMs    int    `json:"StreamLoadPutTimeMs"`
	ReadDataTimeMs         int    `json:"ReadDataTimeMs"`
	WriteDataTimeMs        int    `json:"WriteDataTimeMs"`
	CommitAndPublishTimeMs int    `json:"CommitAndPublishTimeMs"`
	ErrorURL               string `json:"ErrorURL"`
}

// Backend is the JSON message returned by the BE list request.
type Backend struct {
	Msg  string `json:"msg"`
	Code int    `json:"code"`
	Data struct {
		Backends []struct {
			IP       string `json:"ip"`
			HTTPPort int    `json:"http_port"`
			IsAlive  bool   `json:"is_alive"`
		} `json:"backends"`
	} `json:"data"`
	Count int `json:"count"`
}

// main loads one sample row from memory. The commented-out lines show how to
// print the credential, list the BE nodes, and load from a file instead.
func main() {
	var load StreamLoad
	load.userName = "root"
	load.password = ""

	//auth_info := auth(load)
	//fmt.Println(auth_info)

	//backends := get_doris_be_list()
	//for e := backends.Front(); e != nil; e = e.Next() {
	//	fmt.Println(e.Value)
	//}

	data := "10001, Zhang ***, Xi'an ,30,1,133****760, Shaanxi Province **********,2021-03-12 12:34:12"
	batch_load_data(load, data)

	//batch_load_file(load, "/Users/zhangfeng/Downloads/test.csv")
}
边栏推荐
- Question D: pruning shrubs
- JS common interview questions
- Preliminary foundation JVM
- Merge sort / quick sort
- NVM installation and use
- Hyperchain hyperblockchain Shi Xingguo was interviewed by 36 krypton: the amount of customer cooperation consulting is doubling
- Download the jar package of jsqlparser and PageHelper
- A queue of two stacks
- Concurrent programming day01
- Experiment 4 CTF practice
猜你喜欢

Question D: pruning shrubs

Learning record 12

Banana pie bpi-m5 toss record (2) -- compile u-boot

Preliminary foundation JVM
![Matplotlib tutorial (I) [getting to know Matplotlib first]](/img/dc/e7dfc435900c14e3e9be07f6ad75fe.png)
Matplotlib tutorial (I) [getting to know Matplotlib first]

Define macros in makefile and pass them to source code

2022-07-19: all factors of F (I): I are added up after each factor is squared. For example, f (10) = 1 square + 2 square + 5 square + 10 square = 1 + 4 + 25 + 100 = 130.

kettle_ Configure database connection_ report errors

Jenkins plug-in development -- plug-in expansion

Riotboard development board series notes (6) -- buildreoot building system image
随机推荐
mysql_ Backup restore_ Specify table_ Backup table_ Restore table_ innobackup
File permission management
How to use two stacks to simulate the implementation of a queue
Uni app configuration
Merge sort / quick sort
[stm32f103rct6] can communication
Method of adding kernel in Jupiter notebook
Vscode configuration, eslint+prettier combined with detailed configuration steps, standardized development
Hyperchain hyperblockchain Shi Xingguo was interviewed by 36 krypton: the amount of customer cooperation consulting is doubling
Tensorflow's study notes (I)
Unified return data format
Edit mathematical formulas in markdown
Question D: pruning shrubs
Vscode copy synchronization plug-in expansion
[stm32f130rct6] idea and code of ultrasonic ranging module
[Kali's sshd service is enabled]
Chrome process architecture
[stm32f103rct6] motor PWM drive module idea and code
Learning record 10
Eslint error