欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

web開(kāi)發(fā)中怎樣優(yōu)雅地實(shí)現(xiàn)并發(fā)編排任務(wù)

今天就跟大家聊聊有關(guān)web開(kāi)發(fā)中怎樣優(yōu)雅地實(shí)現(xiàn)并發(fā)編排任務(wù),可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

秀英網(wǎng)站制作公司哪家好,找成都創(chuàng)新互聯(lián)公司!從網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開(kāi)發(fā)、APP開(kāi)發(fā)、成都響應(yīng)式網(wǎng)站建設(shè)公司等網(wǎng)站項(xiàng)目制作,到程序開(kāi)發(fā),運(yùn)營(yíng)維護(hù)。成都創(chuàng)新互聯(lián)公司自2013年創(chuàng)立以來(lái)到現(xiàn)在10年的時(shí)間,我們擁有了豐富的建站經(jīng)驗(yàn)和運(yùn)維經(jīng)驗(yàn),來(lái)保證我們的工作的順利進(jìn)行。專注于網(wǎng)站建設(shè)就選成都創(chuàng)新互聯(lián)公司。

業(yè)務(wù)場(chǎng)景

在做任務(wù)開(kāi)發(fā)的時(shí)候,你們一定會(huì)碰到以下場(chǎng)景:

場(chǎng)景1:調(diào)用第三方接口的時(shí)候, 一個(gè)需求你需要調(diào)用不同的接口,做數(shù)據(jù)組裝。

場(chǎng)景2:一個(gè)應(yīng)用首頁(yè)可能依托于很多服務(wù)。那就涉及到在加載頁(yè)面時(shí)需要同時(shí)請(qǐng)求多個(gè)服務(wù)的接口。這一步往往是由后端統(tǒng)一調(diào)用組裝數(shù)據(jù)再返回給前端,也就是所謂的  BFF(Backend For Frontend) 層。

針對(duì)以上兩種場(chǎng)景,假設(shè)在沒(méi)有強(qiáng)依賴關(guān)系下,選擇串行調(diào)用,那么總耗時(shí)即:

time=s1+s2+....sn

按照當(dāng)代秒入百萬(wàn)的有為青年,這么長(zhǎng)時(shí)間早就把你祖宗十八代問(wèn)候了一遍。

為了偉大的KPI,我們往往會(huì)選擇并發(fā)地調(diào)用這些依賴接口。那么總耗時(shí)就是:

time=max(s1,s2,s3.....,sn)

當(dāng)然開(kāi)始堆業(yè)務(wù)的時(shí)候可以先串行化,等到上面的人著急的時(shí)候,亮出絕招。

這樣,年底 PPT 就可以加上濃重的一筆流水賬:為業(yè)務(wù)某個(gè)接口提高百分之XXX性能,間接產(chǎn)生XXX價(jià)值。

當(dāng)然這一切的前提是,做老板不懂技術(shù),做技術(shù)”懂”你。

言歸正傳,如果修改成并發(fā)調(diào)用,你可能會(huì)這么寫(xiě),

package main  import (     "fmt"     "sync"     "time" )  func main() {     var wg sync.WaitGroup     wg.Add(2)      var userInfo *User     var productList []Product      go func() {         defer wg.Done()         userInfo, _ = getUser()     }()      go func() {         defer wg.Done()         productList, _ = getProductList()     }()     wg.Wait()     fmt.Printf("用戶信息:%+v\n", userInfo)     fmt.Printf("商品信息:%+v\n", productList) }   /********用戶服務(wù)**********/  type User struct {     Name string     Age uint8 }  func getUser() (*User, error) {     time.Sleep(500 * time.Millisecond)     var u User     u.Name = "wuqinqiang"     u.Age = 18     return &u, nil }  /********商品服務(wù)**********/  type Product struct {     Title string     Price uint32 }  func getProductList() ([]Product, error) {     time.Sleep(400 * time.Millisecond)     var list []Product     list = append(list, Product{         Title: "SHib",         Price: 10,     })     return list, nil }

從實(shí)現(xiàn)上來(lái)說(shuō),需要多少服務(wù),會(huì)開(kāi)多少個(gè) G,利用 sync.WaitGroup 的特性,

實(shí)現(xiàn)并發(fā)編排任務(wù)的效果。

好像,問(wèn)題不大。

但是隨著代號(hào) 996 業(yè)務(wù)場(chǎng)景的增加,你會(huì)發(fā)現(xiàn),好多模塊都有相似的功能,只是對(duì)應(yīng)的業(yè)務(wù)場(chǎng)景不同而已。

那么我們能不能抽像出一套針對(duì)此業(yè)務(wù)場(chǎng)景的工具,而把具體業(yè)務(wù)實(shí)現(xiàn)交給業(yè)務(wù)方。

使用

本著不重復(fù)造輪子的原則,去搜了下開(kāi)源項(xiàng)目,最終看上了 go-zero 里面的一個(gè)工具 mapreduce。

可以自行 Google 這個(gè)名詞。

使用很簡(jiǎn)單。我們通過(guò)它改造一下上面的代碼:

package main  import (     "fmt"     "github.com/tal-tech/go-zero/core/mr"     "time" )  func main() {     var userInfo *User     var productList []Product     _ = mr.Finish(func() (err error) {         userInfo, err = getUser()         return err     }, func() (err error) {         productList, err = getProductList()         return err     })     fmt.Printf("用戶信息:%+v\n", userInfo)     fmt.Printf("商品信息:%+v\n", productList) } //打印 用戶信息:&{Name:wuqinqiang Age:18} 商品信息:[{Title:SHib Price:10}]

是不是舒服多了。

但是這里還需要注意一點(diǎn),假設(shè)你調(diào)用的其中一個(gè)服務(wù)錯(cuò)誤,并且你 return err 對(duì)應(yīng)的錯(cuò)誤,那么其他調(diào)用的服務(wù)會(huì)被取消。

比如我們修改 getProductList 直接響應(yīng)錯(cuò)誤。

func getProductList() ([]Product, error) {     return nil, errors.New("test error") } //打印 // 用戶信息:<nil> // 商品信息:[]

那么最終打印的時(shí)候連用戶信息都會(huì)為空,因?yàn)槌霈F(xiàn)一個(gè)服務(wù)錯(cuò)誤,用戶服務(wù)請(qǐng)求被取消了。

一般情況下,在請(qǐng)求服務(wù)錯(cuò)誤的時(shí)候我們會(huì)有保底操作,一個(gè)服務(wù)錯(cuò)誤不能影響其他請(qǐng)求的結(jié)果。

所以在使用的時(shí)候具體處理取決于業(yè)務(wù)場(chǎng)景。

源碼

既然用了,那么就追下源碼吧。

func Finish(fns ...func() error) error {     if len(fns) == 0 {         return nil     }      return MapReduceVoid(func(source chan<- interface{}) {         for _, fn := range fns {             source <- fn         }     }, func(item interface{}, writer Writer, cancel func(error)) {         fn := item.(func() error)         if err := fn(); err != nil {             cancel(err)         }     }, func(pipe <-chan interface{}, cancel func(error)) {         drain(pipe)     }, WithWorkers(len(fns))) }
func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {     _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {         reducer(input, cancel)         drain(input)         // We need to write a placeholder to let MapReduce to continue on reducer done,         // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.         writer.Write(lang.Placeholder)     }, opts...)     return err }

對(duì)于 MapReduceVoid函數(shù),主要查看三個(gè)閉包參數(shù)。

  • 第一個(gè) GenerateFunc 用于生產(chǎn)數(shù)據(jù)。

  • MapperFunc 讀取生產(chǎn)出的數(shù)據(jù),進(jìn)行處理。

  • VoidReducerFunc 這里表示不對(duì) mapper 后的數(shù)據(jù)做聚合返回。所以這個(gè)閉包在此操作幾乎0作用。

func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) {     source := buildSource(generate)      return MapReduceWithSource(source, mapper, reducer, opts...) }  func buildSource(generate GenerateFunc) chan interface{} {     source := make(chan interface{})// 創(chuàng)建無(wú)緩沖通道     threading.GoSafe(func() {         defer close(source)         generate(source) //開(kāi)始生產(chǎn)數(shù)據(jù)     })      return source //返回?zé)o緩沖通道 }

buildSource函數(shù)中,返回一個(gè)無(wú)緩沖的通道。并開(kāi)啟一個(gè) G 運(yùn)行  generate(source),往無(wú)緩沖通道塞數(shù)據(jù)。這個(gè)generate(source) 不就是一開(kāi)始 Finish 傳遞的第一個(gè)閉包參數(shù)。

return MapReduceVoid(func(source chan<- interface{}) {     // 就這個(gè)         for _, fn := range fns {             source <- fn         }     })

然后查看 MapReduceWithSource 函數(shù),

func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,     opts ...Option) (interface{}, error) {     options := buildOptions(opts...)     //任務(wù)執(zhí)行結(jié)束通知信號(hào)     output := make(chan interface{})     //將mapper處理完的數(shù)據(jù)寫(xiě)入collector     collector := make(chan interface{}, options.workers)     // 取消操作信號(hào)     done := syncx.NewDoneChan()     writer := newGuardedWriter(output, done.Done())     var closeOnce sync.Once     var retErr errorx.AtomicError     finish := func() {         closeOnce.Do(func() {             done.Close()             close(output)         })     }     cancel := once(func(err error) {         if err != nil {             retErr.Set(err)         } else {             retErr.Set(ErrCancelWithNil)         }          drain(source)         finish()     })      go func() {         defer func() {             if r := recover(); r != nil {                 cancel(fmt.Errorf("%v", r))             } else {                 finish()             }         }()         reducer(collector, writer, cancel)         drain(collector)     }()     // 真正從生成器通道取數(shù)據(jù)執(zhí)行Mapper     go executeMappers(func(item interface{}, w Writer) {         mapper(item, w, cancel)     }, source, collector, done.Done(), options.workers)      value, ok := <-output     if err := retErr.Load(); err != nil {         return nil, err     } else if ok {         return value, nil     } else {         return nil, ErrReduceNoOutput     } }

這段代碼挺長(zhǎng)的,我們說(shuō)下核心的點(diǎn)。這里使用一個(gè)G 調(diào)用 executeMappers 方法。

go executeMappers(func(item interface{}, w Writer) {         mapper(item, w, cancel)     }, source, collector, done.Done(), options.workers)
func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{},     done <-chan lang.PlaceholderType, workers int) {     var wg sync.WaitGroup     defer func() {         // 等待所有任務(wù)全部執(zhí)行完畢         wg.Wait()         // 關(guān)閉通道         close(collector)     }()    //根據(jù)指定數(shù)量創(chuàng)建 worker池     pool := make(chan lang.PlaceholderType, workers)      writer := newGuardedWriter(collector, done)     for {         select {         case <-done:             return         case pool <- lang.Placeholder:             // 從buildSource() 返回的無(wú)緩沖通道取數(shù)據(jù)             item, ok := <-input              // 當(dāng)通道關(guān)閉,結(jié)束             if !ok {                 <-pool                 return             }              wg.Add(1)             // better to safely run caller defined method             threading.GoSafe(func() {                 defer func() {                     wg.Done()                     <-pool                 }()                 //真正運(yùn)行閉包函數(shù)的地方                // func(item interface{}, w Writer) {                // mapper(item, w, cancel)                // }                 mapper(item, writer)             })         }     } }

具體的邏輯已備注,代碼很容易懂。

一旦 executeMappers 函數(shù)返回,關(guān)閉 collector 通道,那么執(zhí)行 reducer 不再阻塞。

go func() {         defer func() {             if r := recover(); r != nil {                 cancel(fmt.Errorf("%v", r))             } else {                 finish()             }         }()         reducer(collector, writer, cancel)         //這里         drain(collector)     }()

這里的 reducer(collector, writer, cancel) 其實(shí)就是從 MapReduceVoid 傳遞的第三個(gè)閉包函數(shù)。

func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {     _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {         reducer(input, cancel)         //這里         drain(input)         // We need to write a placeholder to let MapReduce to continue on reducer done,         // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.         writer.Write(lang.Placeholder)     }, opts...)     return err }

然后這個(gè)閉包函數(shù)又執(zhí)行了 reducer(input, cancel),這里的 reducer 就是我們一開(kāi)始解釋過(guò)的 VoidReducerFunc,從  Finish() 而來(lái)。

web開(kāi)發(fā)中怎樣優(yōu)雅地實(shí)現(xiàn)并發(fā)編排任務(wù)

等等,看到上面三個(gè)地方的 drain(input)了嗎?

// drain drains the channel. func drain(channel <-chan interface{}) {     // drain the channel     for range channel {     } }

其實(shí)就是一個(gè)排空 channel 的操作,但是三個(gè)地方都對(duì)同一個(gè) channel做同樣的操作,也是讓我費(fèi)解。

還有更重要的一點(diǎn)。

go func() {         defer func() {             if r := recover(); r != nil {                 cancel(fmt.Errorf("%v", r))             } else {                 finish()             }         }()         reducer(collector, writer, cancel)         drain(collector)     }()

上面的代碼,假如執(zhí)行 reducer,writer 寫(xiě)入引發(fā) panic,那么drain(collector) 將沒(méi)有機(jī)會(huì)執(zhí)行。

不過(guò)作者已經(jīng)修復(fù)了這個(gè)問(wèn)題,直接把 drain(collector) 放入到 defer。

web開(kāi)發(fā)中怎樣優(yōu)雅地實(shí)現(xiàn)并發(fā)編排任務(wù)

具體 issues[1]。

到這里,關(guān)于 Finish 的源碼也就結(jié)束了。感興趣的可以看看其他源碼。

很喜歡 go-zero 里的一些工具,但是工具往往并不獨(dú)立,依賴于其他文件包,導(dǎo)致明明只想使用其中一個(gè)工具卻需要安裝整個(gè)包。

所以最終的結(jié)果就是扒源碼,創(chuàng)建無(wú)依賴庫(kù)工具集,遵循 MIT 即可。

看完上述內(nèi)容,你們對(duì)web開(kāi)發(fā)中怎樣優(yōu)雅地實(shí)現(xiàn)并發(fā)編排任務(wù)有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。

分享題目:web開(kāi)發(fā)中怎樣優(yōu)雅地實(shí)現(xiàn)并發(fā)編排任務(wù)
分享鏈接:http://chinadenli.net/article26/ihjicg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供營(yíng)銷型網(wǎng)站建設(shè)、定制開(kāi)發(fā)響應(yīng)式網(wǎng)站、企業(yè)建站、外貿(mào)建站、Google

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

營(yíng)銷型網(wǎng)站建設(shè)
成人免费视频免费观看| 国产成人人人97超碰熟女| 精品一区二区三区乱码中文| 日本人妻精品有码字幕| 亚洲国产综合久久天堂| 福利在线午夜绝顶三级| 欧美人妻一区二区三区| 天海翼精品久久中文字幕| 国产亚洲视频香蕉一区| 麻豆视频传媒入口在线看| 国产精品白丝一区二区| 国产午夜福利一区二区| 97人妻精品一区二区三区男同 | 国产男女激情在线视频| 久久午夜福利精品日韩| 欧洲自拍偷拍一区二区| 欧美一本在线免费观看| 东北女人的逼操的舒服吗| 日韩精品小视频在线观看| 国产午夜福利一区二区| 日韩一区二区三区免费av| 国产精品日韩欧美第一页| 亚洲伦理中文字幕在线观看 | 亚洲视频一级二级三级| 欧美美女视频在线免费看| 成人日韩在线播放视频| 亚洲中文字幕在线视频频道| 亚洲精品欧美精品日韩精品| 白白操白白在线免费观看| 老外那个很粗大做起来很爽| 亚洲男人的天堂就去爱| 成人精品网一区二区三区| 亚洲精品av少妇在线观看| 国产传媒免费观看视频| 亚洲男人天堂成人在线视频| 国产日产欧美精品大秀| 欧美一区二区三区不卡高清视| 国产精品国产亚洲看不卡| 亚洲内射人妻一区二区| 日韩亚洲精品国产第二页| 久久婷婷综合色拍亚洲|