總結(jié)下實現(xiàn)思路:
創(chuàng)新互聯(lián)長期為超過千家客戶提供的網(wǎng)站建設(shè)服務,團隊從業(yè)經(jīng)驗10年,關(guān)注不同地域、不同群體,并針對不同對象提供差異化的產(chǎn)品和服務;打造開放共贏平臺,與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為資源企業(yè)提供專業(yè)的成都網(wǎng)站設(shè)計、成都網(wǎng)站建設(shè),資源網(wǎng)站改版等技術(shù)服務。擁有10多年豐富建站經(jīng)驗和眾多成功案例,為您定制開發(fā)。
package pipeline import ( "encoding/binary" "fmt" "io" "math/rand" "sort" "time" ) var startTime time.Time func Init() { startTime = time.Now() } //內(nèi)部處理方法 //這里是排序:異步處理容器元素排序 func InMemSort(in <-chan int) <-chan int { out := make(chan int, 1024) go func() { a := []int{} for v := range in { a = append(a, v) } fmt.Println("Read done:", time.Since(startTime)) sort.Ints(a) fmt.Println("InMemSort done:", time.Since(startTime)) for _, v := range a { out <- v } close(out) }() return out } //兩路和并,每路通過內(nèi)部方法異步處理 //這里是排序:in1,in2元素需要排好序(經(jīng)過內(nèi)部方法InMemSort異步處理)的容器單元(channel 異步容器/隊列) func Merge(in1, in2 <-chan int) <-chan int { out := make(chan int, 1024) // go func() { // v1, ok1 := <-in1 // v2, ok2 := <-in2 // for { // if ok1 || ok2 { // if !ok2 || (ok1 && v1 <= v2) { //v2無值或v1值比v2大 // out <- v1 // v1, ok1 = <-in1 // } else { // out <- v2 // v2, ok2 = <-in2 // } // } else { // close(out) // break // } // } // }() go func() { v1, ok1 := <-in1 v2, ok2 := <-in2 for ok1 || ok2 { if !ok2 || (ok1 && v1 <= v2) { //v2無值或v1值比v2大 out <- v1 v1, ok1 = <-in1 } else { out <- v2 v2, ok2 = <-in2 } } close(out) fmt.Println("Merge done:", time.Since(startTime)) }() return out } //讀取原數(shù)據(jù) //chunkSize=-1全讀 func ReadSource(r io.Reader, chunkSize int) <-chan int { out := make(chan int, 1024) go func() { buffer := make([]byte, 8) //int長度根據(jù)操作系統(tǒng)來的,64位為int64,64位8個字節(jié) bytesRead := 0 for { //持續(xù)讀取 n, err := r.Read(buffer) //讀取一個int 8byte bytesRead += n if n > 0 { out <- int(binary.BigEndian.Uint64(buffer)) //字節(jié)數(shù)組轉(zhuǎn)int } if err != nil || (chunkSize != -1 && bytesRead >= chunkSize) { //-1全讀 break } } close(out) }() return out } //寫處理后(排序)數(shù)據(jù) func WriteSink(w io.Writer, in <-chan int) { for v := range in { buffer := make([]byte, 8) binary.BigEndian.PutUint64(buffer, uint64(v)) w.Write(buffer) } } //隨機生成數(shù)據(jù)源 func RandomSource(count int) <-chan int { out := make(chan int) go func() { for i := 0; i < count; i++ { out <- rand.Int() } close(out) }() return out } //多路兩兩歸并,每路通過內(nèi)部方法異步處理 //這里是排序:ins元素需要排好序(經(jīng)過內(nèi)部方法InMemSort異步處理)的容器單元(channel 異步容器/隊列) func MergeN(ins ...<-chan int) <-chan int { if len(ins) == 1 { return ins[0] } m := len(ins) / 2 return Merge( MergeN(ins[:m]...), MergeN(ins[m:]...)) //chennel異步并發(fā)歸并 }
文章名稱:使用Go搭建并行排序處理管道筆記
URL分享:http://chinadenli.net/article6/dsoisog.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供動態(tài)網(wǎng)站、商城網(wǎng)站、電子商務、用戶體驗、網(wǎng)站制作、品牌網(wǎng)站制作
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)