本節(jié)介紹 net 包,它提供構(gòu)建客戶端和服務(wù)器程序的組件,這些程序通過 TCP、UDP 或者 UNIX 套接字進(jìn)行通信。網(wǎng)絡(luò)服務(wù) net/http 包是在 net 包的基礎(chǔ)上構(gòu)建的。
專注于為中小企業(yè)提供做網(wǎng)站、網(wǎng)站設(shè)計服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)佳縣免費做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了近1000家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。
這個示例是一個時鐘服務(wù)器,它以每秒一次的頻率向客戶端發(fā)送當(dāng)前時間:
package main
import (
"io"
"log"
"net"
"time"
)
func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err) // 例如,連接終止
continue
}
handleConn(conn) // 一次處理一個連接
}
}
func handleConn(c net.Conn) {
defer c.Close()
for {
_, err := io.WriteString(c, time.Now().Format("2006/01/02 15:04:05\r\n"))
if err != nil {
return // 例如,連接斷開
}
time.Sleep(1 * time.Second)
}
}
Listen 函數(shù)創(chuàng)建一個 net.Listener 對象,它在一個網(wǎng)絡(luò)端口上監(jiān)聽進(jìn)來的連接,這里是 TCP 端口 localhost:8000。監(jiān)聽器的 Accept 方法被阻塞,知道有連接請求進(jìn)來,然后返回 net.Conn 對象來代表一個連接。
handleConn 函數(shù)處理一個完整的客戶端連接。在循環(huán)里,它將 time.Now() 獲取的當(dāng)前時間發(fā)送給客戶端。因為 net.Conn 滿足 io.Writer 接口,所以可以直接向它進(jìn)行寫入。當(dāng)寫入失敗時循環(huán)結(jié)束,很多時候是客戶端斷開連接,這是 handleConn 函數(shù)使用延遲(defer)的 Close 調(diào)用關(guān)閉自己這邊的連接,然后繼續(xù)等待下一個連接請求。
為了連接到服務(wù)器,還需要一個 socket 客戶端,這里可以先使用系統(tǒng)的 telnet 來進(jìn)行驗證:
$ telnet localhost 8000
這里可以開兩個 telnet 嘗試進(jìn)行連接,只有第一個可以連接上,而其他的連接會阻塞。當(dāng)把第一個客戶端的連接斷開后,服務(wù)端會重新返回到 main 函數(shù)的 for 循環(huán)中等待新的連接。此時之前阻塞的一個連接就能連接進(jìn)來,繼續(xù)顯示時間。服務(wù)端程序暫時先這樣,先來實現(xiàn)一個 socket 客戶端程序。
下面的客戶端使用 net.Dial 實現(xiàn)了 Go 版本的 netcat 程序,用來連接 TCP服務(wù)器:
package main
import (
"io"
"log"
"net"
"os"
)
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
mustCopy(os.Stdout, conn)
}
func mustCopy(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}
}
這個程序從網(wǎng)絡(luò)連接中讀取,然后寫到標(biāo)準(zhǔn)輸出,直到到達(dá) EOF 或者出錯。
如果打開多個客戶端,同時只有一個客戶端能正常工作。第二個客戶端必須等到第一個結(jié)束才能正常工作,這是因為服務(wù)器是順序的,一次只能處理一個客戶請求。讓服務(wù)器支持并發(fā)只需要一個很小的改變:在調(diào)用 handleConn 的地方添加一個 go關(guān)鍵字,使它在自己的 goroutine 內(nèi)執(zhí)行:
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err) // 例如,連接終止
continue
}
go handleConn(conn) // 并發(fā)處理連接
}
現(xiàn)在的版本,多個客戶端可以同時接入并正常工作了。
上面的時鐘服務(wù)器每個連接使用一個 goroutine。下面要實現(xiàn)的這個回聲服務(wù)器,每個連接使用多個 goroutine 來處理。大多數(shù)的回聲服務(wù)器僅僅將讀到的內(nèi)容寫回去,所以可以使用下面簡單的 handleConn 版本:
func handleConn(c net.Conn) {
io.Copy(c, c)
c.Close()
}
下面的這個版本可以重復(fù)3次,第一個全大寫,第二次正常,第三次全消息:
// reverb1
package main
import (
"bufio"
"fmt"
"io"
"log"
"net"
"strings"
"time"
)
func echo(c net.Conn, shout string, delay time.Duration) {
fmt.Fprintln(c, "\t", strings.ToUpper(shout))
time.Sleep(delay)
fmt.Fprintln(c, "\t", shout)
time.Sleep(delay)
fmt.Fprintln(c, "\t", strings.ToLower(shout))
}
func handleConn(c net.Conn) {
input := bufio.NewScanner(c)
for input.Scan() {
echo(c, input.Text(), 1*time.Second)
}
// 注意:忽略 input.Err() 中可能的錯誤
c.Close()
}
func handleConn0(c net.Conn) {
io.Copy(c, c)
c.Close()
}
func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err) // 例如,連接終止
continue
}
go handleConn(conn) // 并發(fā)處理連接
}
}
在上一個示例中,已經(jīng)知道需要使用 go 關(guān)鍵字調(diào)用 handleConn 函數(shù)。不過在這個例子中,重點不是處理多個客戶端的連接,所以這里不是重點。
現(xiàn)在來升級一下客戶端,使它可以在終端上向服務(wù)器輸入,還可以將服務(wù)器的回復(fù)復(fù)制到輸出,這里提供了另一個使用并發(fā)的機(jī)會:
package main
import (
"io"
"log"
"net"
"os"
)
func mustCopy(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}
}
func main() {
conn, err := net.Dial("tcp", ":8000")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
go mustCopy(os.Stdout, conn)
mustCopy(conn, os.Stdin)
}
使用上面的服務(wù)端版本,如果有多個連續(xù)的輸入,新輸入的內(nèi)容不會馬上返回,而是要等待之前輸入的內(nèi)容全部返回后才會處理之后的內(nèi)容。要想做的更好,需要更多的 goroutine。再一次,在調(diào)用 echo 時需要加入 go 關(guān)鍵字:
// reverb2
func handleConn(c net.Conn) {
input := bufio.NewScanner(c)
for input.Scan() {
go echo(c, input.Text(), 1*time.Second)
}
// 注意:忽略 input.Err() 中可能的錯誤
c.Close()
}
這個改進(jìn)的版本,回聲也是并發(fā)的,在時間上互相重合。
這就是使服務(wù)器變成并發(fā)所要做的,不僅處理來自多個客戶端的鏈接,還包括在一個連接處理中,使用多個 go 關(guān)鍵字。在這個例子里,單個客戶端連接也可以同時發(fā)起多個請求。在最初的版本里,沒有使用 go 調(diào)用 echo,所以處理單個客戶端的請求不是并發(fā)的,只有前一個處理完才會繼續(xù)處理下一個。之后改進(jìn)的版本,使用 go 調(diào)用 echo,這里對每一個請求的處理都是并發(fā)的了。
然而,在添加這些 go 關(guān)鍵字的同時,必須要仔細(xì)考慮方法 net.Conn 的并發(fā)調(diào)用是不是安全的,對大多數(shù)類型來講,這都是不安全的。
之前的客戶端在主 goroutine 中將輸入復(fù)制到服務(wù)器中,這樣的客戶端在輸入接收后立即退出,即使后臺的 goroutine 還在繼續(xù)。為了讓程序等待后臺的 goroutine 在完成后再退出,使用一個通道來同步兩個 goroutine:
func main() {
conn, err := net.Dial("tcp", ":8000")
if err != nil {
log.Fatal(err)
}
done := make(chan struct{})
go func() {
io.Copy(os.Stdout, conn) // 注意:忽略錯誤
log.Println("done")
done <- struct{}{} // 通知主 goroutine 的信號
}()
mustCopy(conn, os.Stdin)
conn.Close()
<-done // 等待后臺 goroutine 完成
}
當(dāng)用戶關(guān)閉標(biāo)準(zhǔn)輸入流(Windows系統(tǒng)使用Ctrl+Z)時,mustCopy 返回,主 goroutine 調(diào)用 conn.Close() 來關(guān)閉兩端網(wǎng)絡(luò)連接。關(guān)閉寫半邊的連接會導(dǎo)致服務(wù)器看到 EOF。關(guān)閉讀半邊的連接導(dǎo)致后臺 goroutine 調(diào)用 io.Copy 返回 “read from closed connection” 錯誤,所以這個版本里去掉了打印錯誤日志。
上面這個版本使用起來的效果和之前的版本并沒有太大的差別,幾乎看不到差別。雖然多了等待連接關(guān)閉,但是依然不會等待接收完畢所有服務(wù)器的返回。不過這步解決了等待 goroutine 運行完畢后,主 goroutine 才會結(jié)束。使用下面的 TCP 鏈接,就可以實現(xiàn)接收完畢所有信息后,goroutine 才會結(jié)束。在 net 包中,conn 接口有一個具體的類型 *net.TCPConn,它代表一個 TCP 連接:
tcpAddr, err := net.ResolveTCPAddr("tcp", ":8000")
if err != nil {
log.Fatal(err)
}
conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
log.Fatal(err)
}
TCP 鏈接由兩半邊組成,可以通過 CloseRead 和 CloseWrite 方法分別關(guān)閉。修改主 goroutine,僅僅關(guān)閉連接的寫半邊,這樣程序可以繼續(xù)執(zhí)行輸出來自 reverb1 服務(wù)器的回聲,即使標(biāo)準(zhǔn)輸入已經(jīng)關(guān)閉:
package main
import (
"io"
"log"
"net"
"os"
)
func main() {
tcpAddr, err := net.ResolveTCPAddr("tcp", ":8000")
if err != nil {
log.Fatal(err)
}
conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
log.Fatal(err)
}
done := make(chan struct{})
go func() {
io.Copy(os.Stdout, conn) // 注意:忽略錯誤
log.Println("done")
done <- struct{}{} // 通知主 goroutine 的信號
}()
mustCopy(conn, os.Stdin)
conn.CloseWrite()
<-done // 等待后臺 goroutine 完成
}
func mustCopy(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}
}
現(xiàn)在只對第一個回聲服務(wù)器版本 reverb1 有效,對于之后改進(jìn)的可以并發(fā)處理同一個客戶端多個請求的 reverb2 服務(wù)器,服務(wù)端還需要做一些修改。
在 reverb2 服務(wù)器的版本中,因為對于每一個連接,每一次回聲的請求都會生成一個新的 goroutine 進(jìn)行處理。為了知道什么時候最后一個 goroutine 結(jié)束(有時候不一定是最后啟動的那個),需要在每一個 goroutine 啟動千遞增計數(shù),在每一個 goroutine 結(jié)束時遞減計數(shù)。這需要一個特殊設(shè)計的計數(shù)器,它可以被多個 goroutine 安全地操作,然后又一個方法一直等到他變?yōu)?0。這個計數(shù)器類型是 sync.WaitGroup。下面是完整的服務(wù)器代碼:
package main
import (
"bufio"
"fmt"
"log"
"net"
"strings"
"sync"
"time"
)
var wg sync.WaitGroup // 工作 goroutine 的個數(shù)
func echo(c net.Conn, shout string, delay time.Duration) {
defer wg.Done()
fmt.Fprintln(c, "\t", strings.ToUpper(shout))
time.Sleep(delay)
fmt.Fprintln(c, "\t", shout)
time.Sleep(delay)
fmt.Fprintln(c, "\t", strings.ToLower(shout))
}
func handleConn(c net.Conn) {
input := bufio.NewScanner(c)
for input.Scan() {
wg.Add(1)
go echo(c, input.Text(), 2*time.Second)
}
// 注意:忽略 input.Err() 中可能的錯誤
wg.Wait()
c.Close()
}
func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err) // 例如,連接終止
continue
}
go handleConn(conn) // 并發(fā)處理連接
}
}
注意 Add 和 Done 方法的不對稱性。Add 遞增計數(shù)器,它必須工作在 goroutine 開始之前執(zhí)行,而不是在中間。另外,Add 有一個參數(shù),但 Done 沒有,它等價于 Add(-1)。使用 defer 來確保計數(shù)器在任何情況下都可以遞減。在不知道迭代次數(shù)的情況下,上面的代碼結(jié)構(gòu)是通用的,符合習(xí)慣的并行循環(huán)模式。
下面的版本增加了超時斷開的功能。這樣服務(wù)端和客戶端就各有兩個斷開連接的情況了,原本只有一種。
這里需要用到 select 多路復(fù)用。
服務(wù)端
原本只要被動等待客戶端斷開就可以了,這個邏輯原本原本放在主 goroutine 中。現(xiàn)在服務(wù)端超時需要主動斷開,客戶端斷開了,需要被動斷開,這2個邏輯都需要一個單獨的 goroutine,而主 goroutine 則阻塞接收這兩個情況的通道,任意一個通道有數(shù)據(jù),就斷開并退出:
package main
import (
"bufio"
"fmt"
"log"
"net"
"strings"
"sync"
"time"
)
var wg sync.WaitGroup // 工作 goroutine 的個數(shù)
func echo(c net.Conn, shout string, delay time.Duration) {
defer wg.Done()
fmt.Fprintln(c, "\t", strings.ToUpper(shout))
time.Sleep(delay)
fmt.Fprintln(c, "\t", shout)
time.Sleep(delay)
fmt.Fprintln(c, "\t", strings.ToLower(shout))
}
func handleConn(c net.Conn) {
stop1 := make(chan struct{})
stop2 := make(chan struct{})
inputSignal := make(chan struct{}) // 有任何輸入,就發(fā)送一個信號
go func() { // 接收客戶端發(fā)送回聲的goroutine
input := bufio.NewScanner(c)
for input.Scan() { // 注意:忽略 input.Err() 中可能的錯誤
inputSignal <- struct{}{}
wg.Add(1)
go echo(c, input.Text(), 1*time.Second)
}
// 退出上面的for循環(huán),表示客戶端斷開
stop1 <- struct{}{}
}()
delay := 5 * time.Second
timer := time.NewTimer(delay)
go func() { // 計算超時的goroutine
for {
select {
case <-inputSignal:
timer.Reset(delay)
case <-timer.C:
// 超時,斷開連接
stop2 <- struct{}{}
return
}
}
}()
select {
case <-stop1:
case <-stop2:
}
wg.Wait()
c.Close()
}
func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err) // 例如,連接終止
continue
}
go handleConn(conn) // 并發(fā)處理連接
}
}
客戶端
原本只需要響應(yīng)接收標(biāo)準(zhǔn)輸入的 Ctrl+Z 然后斷開寫半邊的連接,這個邏輯也需要從主 goroutine 放到一個新的 goroutine 中。另外一種斷開的連接是被動響應(yīng)服務(wù)端的斷開連接然后客戶端也退出。這里還要稍微在復(fù)雜一點,如果是服務(wù)端的超時斷開,則直接斷開。如果是客戶端的主動斷開,則還需要繼續(xù)等待服務(wù)端的斷開,然后再退出:
package main
import (
"io"
"log"
"net"
"os"
)
func main() {
tcpAddr, err := net.ResolveTCPAddr("tcp", ":8000")
if err != nil {
log.Fatal(err)
}
conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
log.Fatal(err)
}
done1 := make(chan struct{})
go func() { // 打印回聲的goroutine
io.Copy(os.Stdout, conn) // 注意:忽略錯誤
log.Println("done")
done1 <- struct{}{} // 通知主 goroutine 的信號
}()
done2 := make(chan struct{})
go func() { // 發(fā)送請求的goroutine
mustCopy(conn, os.Stdin)
conn.CloseWrite()
done2 <- struct{}{}
}()
select { // 等待后臺 goroutine 完成
case <-done1:
case <-done2: // 客戶端主動斷開后,只關(guān)閉寫半邊連接
<-done1 // 繼續(xù)等待服務(wù)端斷開,就是等待全是打印完畢
}
}
func mustCopy(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}
}
實現(xiàn)一個聊天服務(wù)器,它可以在幾個用戶之間相互廣播文本消息。
這個程序中有四種 goroutine:
主函數(shù)的工作是監(jiān)聽端口,接受連接請求。對每一個連接,它創(chuàng)建一個新的 handleConn。就像之前的并發(fā)回聲服務(wù)器中那樣:
func main() {
listener, err := net.Listen("tcp", ":8000")
if err != nil {
log.Fatal(err)
}
go broadcaster()
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err)
continue
}
go handleConn(conn)
}
}
廣播器,它的變量 clients 會記錄當(dāng)前連接的客戶集合。其記錄的內(nèi)容是每一個客戶端對外發(fā)送消息的通道:
// 廣播器
type client chan<- string // 對外發(fā)送消息的通道
var (
entering = make(chan client)
leaving = make(chan client)
messages = make(chan string) // 所有接受的客戶消息
)
func broadcaster() {
clients := make(map[client]bool) // 所有連接的客戶端集合
for {
select {
case msg := <-messages:
// 把所有接收的消息廣播給所有的客戶
// 發(fā)送消息通道
for cli := range clients {
cli <- msg
}
case cli := <-entering:
clients[cli] = true
case cli := <-leaving:
delete(clients, cli)
close(cli)
}
}
}
廣播器監(jiān)聽兩個全局的通道 entering 和 leaving。通過它們通知有客戶進(jìn)入和離開,如果從一個通道中接收到事件,它將更新 clients 集合。如果是客戶離開,還會關(guān)閉對應(yīng)客戶對外發(fā)送消息的通道。
廣播器還監(jiān)聽 messages 通道,所有的客戶都會將要廣播的消息發(fā)送到這個通道。當(dāng)收到一個消息后,就會把消息廣播給所有客戶。
handleConn 函數(shù)創(chuàng)建一個對外發(fā)送消息的新通道,然后通過 entering 通道通知廣播器新客戶進(jìn)入。接著,要讀取客戶發(fā)來的每一條消息,通過 messages 通道將每一條消息發(fā)送給廣播器,發(fā)送時再每條消息前面加上發(fā)送者的ID作為前綴。一旦客戶端將消息讀取完畢,handleConn 通過 leaving 通道通知客戶離開,然后關(guān)閉連接:
// 客戶端處理函數(shù): handleConn
func handleConn(conn net.Conn) {
ch := make(chan string) // 對外發(fā)送客戶消息的通道
go clientWriter(conn, ch)
who := conn.RemoteAddr().String()
ch <- "You are " + who // 這條單發(fā)給自己
messages <- who + " has arrived" // 這條進(jìn)行進(jìn)行廣播,但是自己還沒加到廣播列表中
entering <- ch // 然后把自己加到廣播列表中
input := bufio.NewScanner(conn)
for input.Scan() {
messages <- who + ": " + input.Text()
}
// 注意,忽略input.Err()中可能的錯誤
leaving <- ch
messages <- who + " has left"
conn.Close()
}
另外,handleConn 函數(shù)還為每一個客戶創(chuàng)建了寫入(clientWriter)goroutine,每個客戶都從自己的通道中接收消息發(fā)送給客戶端的網(wǎng)絡(luò)連接。在廣播器收到 leaving 通知并關(guān)閉這個接收消息的通道后,clientWriter 會結(jié)束通道的遍歷后運行結(jié)束:
// 客戶端處理函數(shù): clientWriter
func clientWriter(conn net.Conn, ch <-chan string) {
for msg := range ch {
// 在消息結(jié)尾使用 \r\n ,提升平臺兼容
fmt.Fprintf(conn, "%s\r\n", msg) // 注意,忽略網(wǎng)絡(luò)層面的錯誤
}
}
給客戶端發(fā)送的消息字符串需要用"\n"結(jié)尾。如果換成"\r\n"結(jié)尾,平臺的兼容性應(yīng)該會更好。至少windows上的telnet客戶端可以直接使用了。
完整的源碼就是上面的四段代碼,拼在一起就能運行了。
和之前使用回聲服務(wù)器一樣,可以用 telnet 或者也可以用之前寫的 netcat 作為客戶端來聊天。
當(dāng)有 n 個客戶 session 在連接的時候,程序并發(fā)運行著 2n+2 個相互通信的 goroutine,它不需要隱式的加鎖操作也能做到并發(fā)安全。clients map 被限制在廣播器這一個 goroutine 中,所以不會被并發(fā)的訪問。唯一被多個 goroutine 共享的變量是通道以及 net.Conn 的實例,它們也都是并發(fā)安全的。
上面的聊天服務(wù)器提供了一個很好的架構(gòu),現(xiàn)在再在其之上擴(kuò)展功能就很方便了。
在新用戶到來之后,告知該新用戶當(dāng)前在聊天室的所有的用戶列表。每個用戶加入后,系統(tǒng)都會自動生成一個用戶名(基于用戶的網(wǎng)絡(luò)連接,之后會添加設(shè)置用戶名的功能),就是要把這些存在的用戶名打印出來。
所有的用戶列表只在廣播器的 clients map 中,但是這個 map 又不包括用戶名。所以先要修改數(shù)據(jù)類型,把每個連接的數(shù)據(jù)結(jié)構(gòu)加上一個新的用戶名字段:
type client chan<- string // 對外發(fā)送消息的通道
type clientInfo struct {
name string
ch client
}
原本使用 client 作為元素的通道和 map,現(xiàn)在全部也都要換成 clientInfo 作為元素。像新用戶發(fā)送當(dāng)前用戶列表的任務(wù)也在廣播器中完成:
// 廣播器
type client chan<- string // 對外發(fā)送消息的通道
type clientInfo struct {
name string
ch client
}
var (
entering = make(chan clientInfo)
leaving = make(chan clientInfo)
messages = make(chan string) // 所有接受的客戶消息
)
func broadcaster() {
clients := make(map[clientInfo]bool) // 所有連接的客戶端集合
for {
select {
case msg := <-messages:
// 把所有接收的消息廣播給所有的客戶
// 發(fā)送消息通道
for cli := range clients {
cli.ch <- msg
}
case cli := <-entering:
// 在每一個新用戶到來的時候,通知當(dāng)前存在的用戶
var users []string
for cli := range clients {
users = append(users, cli.name)
}
if len(users) > 0 {
cli.ch <- fmt.Sprintf("Other users in room: %s", strings.Join(users, "; "))
} else {
cli.ch <- "You are the only user in this room."
}
clients[cli] = true
case cli := <-leaving:
delete(clients, cli)
close(cli.ch)
}
}
}
客戶端處理函數(shù)還需要做少量的修改,主要是因為數(shù)據(jù)結(jié)構(gòu)變了。原本給 entering 和 leaving 通道發(fā)送的是 ch。現(xiàn)在要發(fā)送封裝好 who 的結(jié)構(gòu)體。客戶端處理函數(shù)的代碼略,之后的擴(kuò)展中會貼出來:
cli := clientInfo{who, ch}
entering <- cli
如果在一段時間里,客戶端沒有任何輸入,服務(wù)器就將客戶端斷開。之前的邏輯是,客戶端處理函數(shù)會一直在阻塞在 input.Scan() 這里等待客戶端輸入。只要在另外一個 goroutine 中調(diào)用 conn.Close(),就可以讓當(dāng)前阻塞的讀操作變成非阻塞,就像 input.Scan() 輸入完成的讀操作一樣。不過這么做的話會有一點小問題,原本在主 goroutine 的結(jié)尾有一個conn.Close()
操作,現(xiàn)在在定時的 goroutine 中還需要有一個關(guān)閉的操作。如果因為定時而結(jié)束的,就會有兩次關(guān)閉操作。
這里關(guān)閉的是 socket 連接,本質(zhì)上就是文件句柄。嘗試多次關(guān)閉貌似不會有什么問題,不過要解決這個問題也不難。一種是把響應(yīng)用戶輸入的操作也放到 goroutine 中。現(xiàn)有有兩個 goroutine 在運行,主 goroutine 則只要一直阻塞,通過一個通道等待其中任何一個 goroutine 完成后發(fā)送的信號即可。這樣關(guān)閉的操作只在主 goroutine 中操作。下面的是客戶端處理函數(shù),包括上一個功能里修改的部分:
// 客戶端處理函數(shù): handleConn
func handleConn(conn net.Conn) {
ch := make(chan string) // 對外發(fā)送客戶消息的通道
go clientWriter(conn, ch)
who := conn.RemoteAddr().String()
cli := clientInfo{who, ch} // 打包好用戶名和通道
ch <- "You are " + who // 這條單發(fā)給自己
messages <- who + " has arrived" // 這條進(jìn)行進(jìn)行廣播,但是自己還沒加到廣播列表中
entering <- cli // 然后把自己加到廣播列表中
done := make(chan struct{}, 2) // 等待下面兩個 goroutine 其中一個執(zhí)行完成。使用緩沖通道防止 goroutine 泄漏
// 計算超時的goroutine
inputSignal := make(chan struct{}) // 有任何輸入,就發(fā)送一個信號
timeout := 15 * time.Second // 客戶端空閑的超時時間
go func() {
timer := time.NewTimer(timeout)
for {
select {
case <-inputSignal:
timer.Reset(timeout)
case <-timer.C:
// 超時,斷開連接
done <- struct{}{}
return
}
}
}()
go func() {
input := bufio.NewScanner(conn)
for input.Scan() {
inputSignal <- struct{}{}
if len(strings.TrimSpace(input.Text())) == 0 { // 禁止發(fā)送純空白字符
continue
}
messages <- who + ": " + input.Text()
}
// 注意,忽略input.Err()中可能的錯誤
done <- struct{}{}
}()
<-done
leaving <- cli
messages <- who + " has left"
conn.Close()
}
這里還簡單加了一個限制客戶端發(fā)送空消息的功能,在 input.Scan() 循環(huán)中。空消息不會發(fā)送廣播,但是可以重置定時器的時間。
在客戶端連接后,不立刻進(jìn)入聊天室,而是先輸入一個名字。考慮到名字不能和已有的名字重復(fù),而現(xiàn)有的名字都保存在廣播器里的 clients 這個 map 中。所以客戶端輸入的名字需要在 clients 中查找一下是否已經(jīng)有人用了。現(xiàn)在有了按名字進(jìn)行查找的需求,clients 類型更適合使用一個以名字為 key 的 map 而不是原本的集合。這個 map 的 value 就是向該客戶發(fā)送消息的通道,也就是最初這個集合的 key 的值:
clients := make(map[string]client) // 所有連接的客戶端集合
客戶端處理函數(shù)
在客戶端處理函數(shù)的開頭,需要增加注冊用戶名的過程。用戶名注冊的處理過程比較復(fù)雜,所以單獨封裝到了一個函數(shù) clientRegiste 中:
// 客戶端處理函數(shù)
func handleConn(conn net.Conn) {
who := clientRegiste(conn) // 新增這一行,注冊獲取用戶名
ch := make(chan string) // 對外發(fā)送客戶消息的通道
go clientWriter(conn, ch)
// who := conn.RemoteAddr().String() // 去掉這一行
// 之后的代碼不變
}
這里使用一個交互的方式來獲取用戶名,代替原本通過連接的信息自動生成。這個函數(shù)是串行的,只有在返回用戶名后,才會繼續(xù)執(zhí)行下去。之后的代碼和之前是一樣的。
在 clientRegiste 函數(shù)中,不停的和終端進(jìn)行交互,處理收到的消息,如果用戶名可用,繼續(xù)執(zhí)行之后的流程。如果用戶名不可用,則提示用戶繼續(xù)處理:
// 客戶端處理函數(shù) clientRegiste
// 注冊用戶名
func clientRegiste(conn net.Conn) (who string) {
ch := make(chan bool)
fmt.Fprint(conn, "input nickname: ") // 注意,忽略網(wǎng)絡(luò)層面的錯誤
input := bufio.NewScanner(conn)
for input.Scan() {
if len(strings.TrimSpace(input.Text())) == 0 { // 禁止發(fā)送純空白字符
continue
}
who = input.Text()
register <- registeInfo{who, ch}
if <-ch {
break
}
fmt.Fprintf(conn, "name %q is existed\r\ntry other name: ", who)
}
// 注意,忽略input.Err()中可能的錯誤
return who
}
這里只有最簡單的功能,還可以增加輸入超時,以及嘗試次數(shù)的限制。所以把這個函數(shù)獨立出來完成功能,更方便之后對注冊函數(shù)進(jìn)行擴(kuò)展。
函數(shù)的主要邏輯就是 input.Scan() 的循環(huán),這和 handleConn 中的循環(huán)十分相似。如果之后再加上輸入超時,這兩段的處理邏輯只有極小部分的差別,所以這部分代碼也可以單獨寫一個函數(shù)。這里避免過早的優(yōu)化,暫時就先這樣,看著也比較清晰。之后要添加超時功能的時候,再把這部分重復(fù)的代碼獨立出來。這部分優(yōu)化最后完整的代碼里會有。
廣播器
在廣播器的 select 里要加一個分支,用來處理用戶名的請求。收到請求后,判斷是否已經(jīng)存在,把結(jié)果返回給 clientRegiste。因為 clients 是只有廣播器可見的,所以這里要使用通道傳遞過來,判斷后再用通道把結(jié)果傳回去。這樣可以保證 clients 變量只在這一個 goroutine 里被使用(包括修改)。另外,每個客戶端的注冊都使用一個通道將注冊信息發(fā)送給廣播器,但是廣播器返回的內(nèi)容,需要對每個客戶端使用不同的通道。所以這里,廣播器新創(chuàng)建了專門用于注冊交互的數(shù)據(jù)結(jié)構(gòu):
type registeInfo struct {
name string
ch chan<- bool
}
var register = make(chan registeInfo) // 注冊用戶名的通道
客戶注冊的函數(shù)創(chuàng)建一個布爾型的通道,加上用戶的名字封裝到 registeInfo 結(jié)構(gòu)體中。然后廣播器判斷后,把結(jié)果通道 registeInfo 里的 ch 字段這個通道,把結(jié)果返回給對應(yīng)的客戶注冊函數(shù)。
下面是廣播器 broadcaster 的代碼,主要是 select 新增了一個分支,處理注冊用戶名:
// 廣播器
type client chan<- string // 對外發(fā)送消息的通道
type clientInfo struct {
name string
ch client
}
var (
entering = make(chan clientInfo)
leaving = make(chan clientInfo)
messages = make(chan string) // 所有接受的客戶消息
)
type registeInfo struct {
name string
ch chan<- bool
}
var register = make(chan registeInfo) // 注冊用戶名的通道
func broadcaster() {
clients := make(map[string]client) // 所有連接的客戶端集合
for {
select {
case msg := <-messages:
// 把所有接收的消息廣播給所有的客戶
// 發(fā)送消息通道
for _, cli := range clients {
cli <- msg
}
case user := <-register:
// 先判斷新用戶名是否有重復(fù)
_, ok := clients[user.name]
user.ch <- !ok
case cliSt := <-entering:
// 在每一個新用戶到來的時候,通知當(dāng)前存在的用戶
var users []string
for user := range clients {
users = append(users, user)
}
if len(users) > 0 {
cliSt.ch <- fmt.Sprintf("Other users in room: %s", strings.Join(users, "; "))
} else {
cliSt.ch <- "You are the only user in this room."
}
clients[cliSt.name] = cliSt.ch
case cliSt := <-leaving:
delete(clients, cliSt.name)
close(cliSt.ch)
}
}
}
最后還有一個問題,就是客戶端可能會卡或者延遲,但是客戶端的問題不能影響到服務(wù)器的正常運行。不過我沒法實現(xiàn)一個這樣的有延遲的客戶端,默認(rèn)操作系統(tǒng)應(yīng)該就已經(jīng)非常友好的幫我們處理掉了,把從網(wǎng)絡(luò)上接收到的數(shù)據(jù)暫存在緩沖區(qū)里(對于TCP連接還有亂序重組和超時重傳,這些我們都不需要關(guān)心了),等待程序去讀取。代碼里接收的操作應(yīng)該是直接從緩沖區(qū)讀取,這時服務(wù)的已經(jīng)發(fā)送完畢了。所以現(xiàn)在只能照著下面的思路寫了:
任何客戶程序讀取數(shù)據(jù)的時間很長最終會造成所有的客戶卡住。修改廣播器,使它滿足如果一個向客戶寫入的通道沒有準(zhǔn)備好接受它,那么跳過這條消息。還可以給每一個向客戶發(fā)送消息的通道增加緩沖,這樣大多數(shù)的消息不會丟棄;廣播器在這個通道上應(yīng)該使用非阻塞的發(fā)送方式。
客戶端處理函數(shù)中創(chuàng)建的發(fā)送消息的通道改用有緩沖區(qū)的通道:
// 客戶端處理函數(shù)
func handleConn(conn net.Conn) {
defer conn.Close() // 退出時關(guān)閉客戶端連接,現(xiàn)在有分支了,并且可能會提前退出
who, ok := clientRegiste(conn) // 注冊獲取用戶名
if !ok { // 用戶名未注冊成功
fmt.Fprintln(conn, "\r\nName registe failed...")
return
}
ch := make(chan string, 10) // 有緩沖區(qū),對外發(fā)送客戶消息的通道
go clientWriter(conn, ch)
// 省略后面的代碼
}
然后廣播器的 select 對應(yīng)的 messages 通道的分支,改成非阻塞的方式:
select {
case msg := <-messages:
// 把所有接收的消息廣播給所有的客戶
// 發(fā)送消息通道
for name, cli := range clients {
select {
case cli <- msg:
default:
fmt.Fprintf(os.Stderr, "send message failed: %s: %s\n", name, msg)
}
}
// 其他分支略過
}
下面是聊天服務(wù)器最后完整的代碼。這里的改變還包括了上一節(jié)最后提到的注冊用戶名時的輸入的超時。已經(jīng)兩次用到了輸入超時,分別在 handleConn 和 clientRegiste 中,這里也就把這部分代碼單獨寫了一個函數(shù) inputWithTimeout。完整代碼如下:
package main
import (
"bufio"
"fmt"
"log"
"net"
"os"
"strings"
"time"
)
func main() {
listener, err := net.Listen("tcp", ":8000")
if err != nil {
log.Fatal(err)
}
go broadcaster()
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err)
continue
}
go handleConn(conn)
}
}
// 廣播器
type client chan<- string // 對外發(fā)送消息的通道
type clientInfo struct {
name string
ch client
}
var (
entering = make(chan clientInfo)
leaving = make(chan clientInfo)
messages = make(chan string) // 所有接受的客戶消息
)
type registeInfo struct {
name string
ch chan<- bool
}
var register = make(chan registeInfo) // 注冊用戶名的通道
func broadcaster() {
clients := make(map[string]client) // 所有連接的客戶端集合
for {
select {
case msg := <-messages:
// 把所有接收的消息廣播給所有的客戶
// 發(fā)送消息通道
for name, cli := range clients {
select {
case cli <- msg:
default:
fmt.Fprintf(os.Stderr, "send message failed: %s: %s\n", name, msg)
}
}
case user := <-register:
// 先判斷新用戶名是否有重復(fù)
_, ok := clients[user.name]
user.ch <- !ok
case cliSt := <-entering:
// 在每一個新用戶到來的時候,通知當(dāng)前存在的用戶
var users []string
for user := range clients {
users = append(users, user)
}
if len(users) > 0 {
cliSt.ch <- fmt.Sprintf("Other users in room: %s", strings.Join(users, "; "))
} else {
cliSt.ch <- "You are the only user in this room."
}
clients[cliSt.name] = cliSt.ch
case cliSt := <-leaving:
delete(clients, cliSt.name)
close(cliSt.ch)
}
}
}
// 客戶端處理函數(shù)
func handleConn(conn net.Conn) {
defer conn.Close() // 退出時關(guān)閉客戶端連接,現(xiàn)在有分支了,并且可能會提前退出
who, ok := clientRegiste(conn) // 注冊獲取用戶名
if !ok { // 用戶名未注冊成功
fmt.Fprintln(conn, "\r\nName registe failed...")
return
}
ch := make(chan string, 10) // 有緩沖區(qū),對外發(fā)送客戶消息的通道
go clientWriter(conn, ch)
cli := clientInfo{who, ch} // 打包好用戶名和通道
ch <- "You are " + who // 這條單發(fā)給自己
messages <- who + " has arrived" // 現(xiàn)在這條廣播自己也能收到
entering <- cli
inputFunc := func(sig chan<- struct{}) {
input := bufio.NewScanner(conn)
for input.Scan() {
sig <- struct{}{} // 向 sig 發(fā)送信號,會重新開始計時
if len(strings.TrimSpace(input.Text())) == 0 { // 禁止發(fā)送純空白字符
continue
}
messages <- who + ": " + input.Text()
}
// 注意,忽略input.Err()中可能的錯誤
}
inputWithTimeout(conn, 300*time.Second, inputFunc)
leaving <- cli
messages <- who + " has left"
}
func clientWriter(conn net.Conn, ch <-chan string) {
for msg := range ch {
// windows 需要 \r 了正常顯示
fmt.Fprintln(conn, msg+"\r") // 注意,忽略網(wǎng)絡(luò)層面的錯誤
}
}
// 注冊用戶名
func clientRegiste(conn net.Conn) (who string, ok bool) {
inputFunc := func(sig chan<- struct{}) {
input := bufio.NewScanner(conn)
ch := make(chan bool)
fmt.Fprint(conn, "input nickname: ") // 注意,忽略網(wǎng)絡(luò)層面的錯誤
for input.Scan() {
if len(strings.TrimSpace(input.Text())) == 0 { // 禁止發(fā)送純空白字符
continue
}
who = input.Text()
register <- registeInfo{who, ch}
if <-ch {
ok = true
break
}
fmt.Fprintf(conn, "name %q is existed\r\ntry other name: ", who)
}
// 注意,忽略input.Err()中可能的錯誤
}
inputWithTimeout(conn, 15*time.Second, inputFunc)
return who, ok
}
// 為 input.Scan 封裝超時退出的功能
func inputWithTimeout(conn net.Conn, timeout time.Duration, input func(sig chan<- struct{})) {
done := make(chan struct{}, 2)
inputSignal := make(chan struct{})
go func() {
timer := time.NewTimer(timeout)
for {
select {
case <-inputSignal:
timer.Reset(timeout)
case <-timer.C:
// 超時,斷開連接
done <- struct{}{}
return
}
}
}()
go func() {
input(inputSignal)
done <- struct{}{}
}()
<-done
}
文章題目:Go網(wǎng)絡(luò)編程示例
網(wǎng)頁URL:http://chinadenli.net/article40/iphpeo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供靜態(tài)網(wǎng)站、網(wǎng)站內(nèi)鏈、面包屑導(dǎo)航、軟件開發(fā)、網(wǎng)站建設(shè)、營銷型網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)