欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

使用Go搭建并行排序處理管道筆記

一、并行管道搭建:

總結(jié)下實現(xiàn)思路:

創(chuàng)新互聯(lián)長期為超過千家客戶提供的網(wǎng)站建設(shè)服務,團隊從業(yè)經(jīng)驗10年,關(guān)注不同地域、不同群體,并針對不同對象提供差異化的產(chǎn)品和服務;打造開放共贏平臺,與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為資源企業(yè)提供專業(yè)的成都網(wǎng)站設(shè)計、成都網(wǎng)站建設(shè),資源網(wǎng)站改版等技術(shù)服務。擁有10多年豐富建站經(jīng)驗和眾多成功案例,為您定制開發(fā)。

  1. 歸并排序:進行集合元素排序(節(jié)點),并兩兩節(jié)點歸并排序;每個節(jié)點元素要求有序的(排序),當然終點最小節(jié)點元數(shù)個數(shù)為1必是有序的;
  2. 節(jié)點:任務處理單元,歸并排序節(jié)點是處理輸出有序集合任務的單元;文件過大單臺機排不了需要多臺機集群;
  3. 根據(jù)粒度,單機版:單任務版每個節(jié)點可以是排序方法,并發(fā)版每個節(jié)點可以是一個線程/協(xié)程去處理(異步排序),集群版節(jié)點是一個主機;
  4. 單機版,不管并發(fā)還是非并發(fā),節(jié)點采用的是內(nèi)存共享數(shù)據(jù);集群版節(jié)點則需要網(wǎng)絡連接請求應答來共享數(shù)據(jù);
  5. go語言異步數(shù)據(jù)傳輸通道通過channel實現(xiàn)的;
  6. 每個節(jié)點將處理的數(shù)據(jù)異步發(fā)送到各自channel中,等待一個主節(jié)點獲取歸并,集群版多了網(wǎng)絡的數(shù)據(jù)傳輸(在并發(fā)版加上網(wǎng)絡的接口)。

二、代碼實現(xiàn):
  1. 本地節(jié)點 nodes.go:
    package pipeline
    
    import (
    	"encoding/binary"
    	"fmt"
    	"io"
    	"math/rand"
    	"sort"
    	"time"
    )
    
    var startTime time.Time
    
    func Init() {
    	startTime = time.Now()
    }
    
    //內(nèi)部處理方法
    //這里是排序:異步處理容器元素排序
    func InMemSort(in <-chan int) <-chan int {
    	out := make(chan int, 1024)
    	go func() {
    		a := []int{}
    		for v := range in {
    			a = append(a, v)
    		}
    		fmt.Println("Read done:", time.Since(startTime))
    
    		sort.Ints(a)
    		fmt.Println("InMemSort done:", time.Since(startTime))
    
    		for _, v := range a {
    			out <- v
    		}
    		close(out)
    	}()
    	return out
    }
    
    //兩路和并,每路通過內(nèi)部方法異步處理
    //這里是排序:in1,in2元素需要排好序(經(jīng)過內(nèi)部方法InMemSort異步處理)的容器單元(channel 異步容器/隊列)
    func Merge(in1, in2 <-chan int) <-chan int {
    	out := make(chan int, 1024)
    	// go func() {
    	// 	v1, ok1 := <-in1
    	// 	v2, ok2 := <-in2
    	// 	for {
    	// 		if ok1 || ok2 {
    	// 			if !ok2 || (ok1 && v1 <= v2) { //v2無值或v1值比v2大
    	// 				out <- v1
    	// 				v1, ok1 = <-in1
    	// 			} else {
    	// 				out <- v2
    	// 				v2, ok2 = <-in2
    	// 			}
    	// 		} else {
    	// 			close(out)
    	// 			break
    	// 		}
    	// 	}
    	// }()
    	go func() {
    		v1, ok1 := <-in1
    		v2, ok2 := <-in2
    		for ok1 || ok2 {
    			if !ok2 || (ok1 && v1 <= v2) { //v2無值或v1值比v2大
    				out <- v1
    				v1, ok1 = <-in1
    			} else {
    				out <- v2
    				v2, ok2 = <-in2
    			}
    		}
    		close(out)
    
    		fmt.Println("Merge done:", time.Since(startTime))
    	}()
    	return out
    }
    
    //讀取原數(shù)據(jù)
    //chunkSize=-1全讀
    func ReadSource(r io.Reader, chunkSize int) <-chan int {
    	out := make(chan int, 1024)
    	go func() {
    		buffer := make([]byte, 8) //int長度根據(jù)操作系統(tǒng)來的,64位為int64,64位8個字節(jié)
    		bytesRead := 0
    		for { //持續(xù)讀取
    			n, err := r.Read(buffer) //讀取一個int 8byte
    			bytesRead += n
    			if n > 0 {
    				out <- int(binary.BigEndian.Uint64(buffer)) //字節(jié)數(shù)組轉(zhuǎn)int
    			}
    			if err != nil || (chunkSize != -1 && bytesRead >= chunkSize) { //-1全讀
    				break
    			}
    		}
    		close(out)
    	}()
    	return out
    }
    
    //寫處理后(排序)數(shù)據(jù)
    func WriteSink(w io.Writer, in <-chan int) {
    	for v := range in {
    		buffer := make([]byte, 8)
    		binary.BigEndian.PutUint64(buffer, uint64(v))
    		w.Write(buffer)
    	}
    }
    
    //隨機生成數(shù)據(jù)源
    func RandomSource(count int) <-chan int {
    	out := make(chan int)
    	go func() {
    		for i := 0; i < count; i++ {
    			out <- rand.Int()
    		}
    		close(out)
    	}()
    	return out
    }
    
    //多路兩兩歸并,每路通過內(nèi)部方法異步處理
    //這里是排序:ins元素需要排好序(經(jīng)過內(nèi)部方法InMemSort異步處理)的容器單元(channel 異步容器/隊列)
    func MergeN(ins ...<-chan int) <-chan int {
    	if len(ins) == 1 {
    		return ins[0]
    	}
    	m := len(ins) / 2
    	return Merge(
    		MergeN(ins[:m]...),
    		MergeN(ins[m:]...)) //chennel異步并發(fā)歸并
    }
    

    文章名稱:使用Go搭建并行排序處理管道筆記
    URL分享:http://chinadenli.net/article6/dsoisog.html

    成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供動態(tài)網(wǎng)站、商城網(wǎng)站、電子商務、用戶體驗網(wǎng)站制作、品牌網(wǎng)站制作

    廣告

    聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站網(wǎng)頁設(shè)計