欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

Golang:將日志以Json格式輸出到Kafka

在上一篇文章中我實(shí)現(xiàn)了一個(gè)支持Debug、Info、Error等多個(gè)級(jí)別的日志庫,并將日志寫到了磁盤文件中,代碼比較簡(jiǎn)單,適合練手。有興趣的可以通過這個(gè)鏈接前往:https://github.com/bosima/ylog/releases/tag/v1.0.1

成都創(chuàng)新互聯(lián)公司是一家成都網(wǎng)站制作、成都網(wǎng)站建設(shè),提供網(wǎng)頁設(shè)計(jì),網(wǎng)站設(shè)計(jì),網(wǎng)站制作,建網(wǎng)站,按需搭建網(wǎng)站,網(wǎng)站開發(fā)公司,成立與2013年是互聯(lián)行業(yè)建設(shè)者,服務(wù)者。以提升客戶品牌價(jià)值為核心業(yè)務(wù),全程參與項(xiàng)目的網(wǎng)站策劃設(shè)計(jì)制作,前端開發(fā),后臺(tái)程序制作以及后期項(xiàng)目運(yùn)營(yíng)并提出專業(yè)建議和思路。

工程實(shí)踐中,我們往往還需要對(duì)日志進(jìn)行采集,將日志歸集到一起,然后用于各種處理分析,比如生產(chǎn)環(huán)境上的錯(cuò)誤分析、異常告警等等。在日志消息系統(tǒng)領(lǐng)域,Kafka久負(fù)盛名,這篇文章就以將日志發(fā)送到Kafka來實(shí)現(xiàn)日志的采集;同時(shí)考慮到日志分析時(shí)對(duì)結(jié)構(gòu)化數(shù)據(jù)的需求,這篇文章還會(huì)提供一種輸出Json格式日志的方法。

這個(gè)升級(jí)版的日志庫還要保持向前兼容,即還能夠使用普通文本格式,以及寫日志到磁盤文件,這兩個(gè)特性和要新增的兩個(gè)功能分別屬于同類處理,因此我這里對(duì)它們進(jìn)行抽象,形成兩個(gè)接口:格式化接口、寫日志接口。

格式化接口

所謂格式化,就是日志的格式處理。這個(gè)日志庫目前要支持兩種格式:普通文本和Json。

為了在不同格式之上提供一個(gè)統(tǒng)一的抽象,ylog中定義 logEntry 來代表一條日志:

type logEntry struct {
	Ts    time.Time `json:"ts"`
	File  string    `json:"file"`
	Line  int       `json:"line"`
	Level LogLevel  `json:"level"`
	Msg   string    `json:"msg"`
}

格式化接口的能力就是將日志從logEntry格式轉(zhuǎn)化為其它某種數(shù)據(jù)格式。ylog中對(duì)它的定義是:

type LoggerFormatter interface {
	Format(*logEntry, *[]byte) error
}

第1個(gè)參數(shù)是一個(gè)logEntry實(shí)例,也就是要被格式化的日志,第2個(gè)參數(shù)是日志格式化之后要寫入的容器。

普通文本格式化器

其實(shí)現(xiàn)是這樣的:

type textFormatter struct {
}

func NewTextFormatter() *textFormatter {
	return &textFormatter{}
}

func (f *textFormatter) Format(entry *logEntry, buf *[]byte) error {
	formatTime(buf, entry.Ts)
	*buf = append(*buf, ' ')

	file := toShort(entry.File)
	*buf = append(*buf, file...)
	*buf = append(*buf, ':')
	itoa(buf, entry.Line, -1)
	*buf = append(*buf, ' ')

	*buf = append(*buf, levelNames[entry.Level]...)
	*buf = append(*buf, ' ')

	*buf = append(*buf, entry.Msg...)

	return nil
}

可以看到它的主要功能就是將logEntry中的各個(gè)字段按照某種順序平鋪開來,中間用空格分隔。

其中的很多數(shù)據(jù)處理方法參考了Golang標(biāo)準(zhǔn)日志庫中的數(shù)據(jù)格式化處理代碼,有興趣的可以去Github中詳細(xì)查看。

這里對(duì)日期時(shí)間格式化為字符串做了特別的優(yōu)化,在標(biāo)準(zhǔn)日志庫中為了將年、月、日、時(shí)、分、秒、毫秒、微秒等格式化指定長(zhǎng)度的字符串,使用了一個(gè)函數(shù):

func itoa(buf *[]byte, i int, wid int) {
	// Assemble decimal in reverse order.
	var b [20]byte
	bp := len(b) - 1
	for i >= 10 || wid > 1 {
		wid--
		q := i / 10
		b[bp] = byte('0' + i - q*10)
		bp--
		i = q
	}
	// i < 10
	b[bp] = byte('0' + i)
	*buf = append(*buf, b[bp:]...)
}

其邏輯大概就是將數(shù)字中的每一位轉(zhuǎn)換為字符并存入byte中,注意這里初始化byte數(shù)組的時(shí)候是20位,這是int64最大的數(shù)字位數(shù)。

其實(shí)時(shí)間字符串中的每個(gè)部分位數(shù)都是固定的,比如年是4位、月日時(shí)分秒都是2位,根本不需要20位,所以這個(gè)空間可以節(jié)??;還有這里用了循環(huán),這對(duì)于CPU的分支預(yù)測(cè)可能有那么點(diǎn)影響,所以我這里分別對(duì)不同位數(shù)寫了專門的格式化方法,以2位數(shù)為例:

func itoa2(buf *[]byte, i int) {
	q := i / 10
	s := byte('0' + i - q*10)
	f := byte('0' + q)
	*buf = append(*buf, f, s)
}

Json文本格式化器

其實(shí)現(xiàn)是這樣的:

type jsonFormatter struct {
}

func NewJsonFormatter() *jsonFormatter {
	return &jsonFormatter{}
}

func (f *jsonFormatter) Format(entry *logEntry, buf *[]byte) (err error) {
	entry.File = toShortFile(entry.File)
	jsonBuf, err := json.Marshal(entry)
	*buf = append(*buf, jsonBuf...)
	return
}

代碼也很簡(jiǎn)單,使用標(biāo)準(zhǔn)庫的json序列化方法將logEntry實(shí)例轉(zhuǎn)化為Json格式的數(shù)據(jù)。

對(duì)于Json格式,后續(xù)考慮支持用戶自定義Json字段,這里暫時(shí)先簡(jiǎn)單處理。

寫日志接口

寫日志就是將日志輸出到別的目標(biāo),比如ylog要支持的輸出到磁盤文件、輸出到Kafka等。

前邊格式化接口將格式化后的數(shù)據(jù)封裝到了 []byte 中,寫日志接口就是將格式化處理的輸出 []byte 寫到某種輸出目標(biāo)中。參考Golang中各種Writer的定義,ylog中對(duì)它的定義是:

type LoggerWriter interface {
	Ensure(*logEntry) error
	Write([]byte) error
	Sync() error
	Close() error
}

這里有4個(gè)方法:

  • Ensure 確保輸出目標(biāo)已經(jīng)準(zhǔn)備好接收數(shù)據(jù),比如打開要寫入的文件、創(chuàng)建Kafka連接等等。
  • Write 向輸出目標(biāo)寫數(shù)據(jù)。
  • Sync 要求輸出目標(biāo)將緩存持久化,比如寫數(shù)據(jù)到磁盤時(shí),操作系統(tǒng)會(huì)有緩存,通過這個(gè)方法要求緩存數(shù)據(jù)寫入磁盤。
  • Close 寫日志結(jié)束,關(guān)閉輸出目標(biāo)。

寫日志到文件

這里定義一個(gè)名為fileWriter的類型,它需要實(shí)現(xiàn)LoggerWriter的接口。

先看類型的定義:

type fileWriter struct {
	file     *os.File
	lastHour int64
	Path     string
}

包含四個(gè)字段:

  • file 要輸出的文件對(duì)象。
  • lastHour 按照小時(shí)創(chuàng)建文件的需要。
  • Path 日志文件的根路徑。

再看其實(shí)現(xiàn)的接口:

func (w *fileWriter) Ensure(entry *logEntry) (err error) {
	if w.file == nil {
		f, err := w.createFile(w.Path, entry.Ts)
		if err != nil {
			return err
		}
		w.lastHour = w.getTimeHour(entry.Ts)
		w.file = f
		return nil
	}

	currentHour := w.getTimeHour(entry.Ts)
	if w.lastHour != currentHour {
		_ = w.file.Close()
		f, err := w.createFile(w.Path, entry.Ts)
		if err != nil {
			return err
		}
		w.lastHour = currentHour
		w.file = f
	}

	return
}

func (w *fileWriter) Write(buf []byte) (err error) {
	buf = append(buf, '\n')
	_, err = w.file.Write(buf)
	return
}

func (w *fileWriter) Sync() error {
	return w.file.Sync()
}

func (w *fileWriter) Close() error {
	return w.file.Close()
}

Ensure 中的主要邏輯是創(chuàng)建當(dāng)前要寫入的文件對(duì)象,如果小時(shí)數(shù)變了,先把之前的關(guān)閉,再創(chuàng)建一個(gè)新的文件。

Write 把數(shù)據(jù)寫入到文件對(duì)象,這里加了一個(gè)換行符,也就是說對(duì)于文件日志,其每條日志最后都會(huì)有一個(gè)換行符,這樣比較方便閱讀。

Sync 調(diào)用文件對(duì)象的Sync方法,將日志從操作系統(tǒng)緩存刷到磁盤。

Close 關(guān)閉當(dāng)前文件對(duì)象。

寫日志到Kafka

這里定義一個(gè)名為kafkaWriter的類型,它也需要實(shí)現(xiàn)LoggerWriter的接口。

先看其結(jié)構(gòu)體定義:

type kafkaWriter struct {
	Topic     string
	Address   string
	writer    *kafka.Writer
	batchSize int
}

這里包含四個(gè)字段:

Topic 寫Kafka時(shí)需要一個(gè)主題,這里默認(rèn)當(dāng)前Logger中所有日志使用同一個(gè)主題。

Address Kafka的訪問地址。

writer 向Kafka寫數(shù)據(jù)時(shí)使用的Writer,這里集成的是:github.com/segmentio/kafka-go,支持自動(dòng)重試和重連。

batchSize Kafka寫日志的批次大小,批量寫可以提高日志的寫效率。

再看其實(shí)現(xiàn)的接口:

func (w *kafkaWriter) Ensure(curTime time.Time) (err error) {
	if w.writer == nil {
		w.writer = &kafka.Writer{
			Addr:      kafka.TCP(w.Address),
			Topic:     w.Topic,
			BatchSize: w.batchSize,
			Async:     true,
		}
	}

	return
}

func (w *kafkaWriter) Write(buf []byte) (err error) {
	// buf will be reused by ylog when this method return,
	// with aysnc write, we need copy data to a new slice
	kbuf := append([]byte(nil), buf...)
	err = w.writer.WriteMessages(context.Background(),
		kafka.Message{Value: kbuf},
	)
	return
}

func (w *kafkaWriter) Sync() error {
	return nil
}

func (w *kafkaWriter) Close() error {
	return w.writer.Close()
}

這里采用的是異步發(fā)送到Kafka的方式,WriteMessages方法不會(huì)阻塞,因?yàn)閭魅氲腷uf要被ylog重用,所以這里copy了一下。異步還會(huì)存在的一個(gè)問題就是不會(huì)返回錯(cuò)誤,可能丟失數(shù)據(jù),不過對(duì)于日志這種數(shù)據(jù),沒有那么嚴(yán)格的要求,也可以接受。

如果采用同步發(fā)送,因?yàn)榕堪l(fā)送比較有效率,這里可以攢幾條再發(fā),但日志比較稀疏時(shí),可能短時(shí)間很難攢夠,就會(huì)出現(xiàn)長(zhǎng)時(shí)間等不到日志的情況,所以還要有個(gè)超時(shí)機(jī)制,這有點(diǎn)麻煩,不過我也寫了一個(gè)版本,有興趣的可以去看看:https://github.com/bosima/ylog/blob/main/examples/kafka-writer.go

接口的組裝

有了格式化接口和寫日志接口,下一步就是將它們組裝起來,以實(shí)現(xiàn)相應(yīng)的處理能力。

首先是創(chuàng)建它們,因?yàn)槲疫@里也沒有動(dòng)態(tài)配置的需求,所以就放到創(chuàng)建Logger實(shí)例的時(shí)候了,這樣比較簡(jiǎn)單。

func NewYesLogger(opts ...Option) (logger *YesLogger) {
	logger = &YesLogger{}
	...
	logger.writer = NewFileWriter("logs")
	logger.formatter = NewTextFormatter()

	for _, opt := range opts {
		opt(logger)
	}
	...
	return
}

可以看到默認(rèn)的formatter是textFormatter,默認(rèn)的writer是fileWriter。這個(gè)函數(shù)傳入的Option其實(shí)是個(gè)函數(shù),在下邊的opt(logger)中會(huì)執(zhí)行它們,所以使用其它的Formatter或者Writer可以這樣做:

logger := ylog.NewYesLogger(
		...
		ylog.Writer(ylog.NewKafkaWriter(address, topic, writeBatchSize)),
		ylog.Formatter(ylog.NewJsonFormatter()),
)

這里 ylog.Writer 和 ylog.Formatter 就是符合Option類型的函數(shù),調(diào)用它們可以設(shè)置不同的Formatter和Writer。

然后怎么使用它們呢?

...
l.formatter.Format(entry, &buf)
l.writer.Ensure(entry)
err := l.writer.Write(buf)
...

當(dāng) logEntry 進(jìn)入消息處理環(huán)節(jié)后,首先調(diào)用formatter的Format方法格式化logEntry;然后調(diào)用了writer的Ensure方法確保writer已經(jīng)準(zhǔn)備好,最后調(diào)用writer的Write方法將格式化之后的數(shù)據(jù)輸出到對(duì)應(yīng)的目標(biāo)。

為什么不將Ensure方法放到Write中呢?這是因?yàn)槟壳皩懳谋救罩镜臅r(shí)候需要根據(jù)logEntry中的日志時(shí)間創(chuàng)建日志文件,這樣就需要給Writer傳遞兩個(gè)參數(shù),有點(diǎn)別扭,所以這里將它們分開了。

如何提高日志處理的吞吐量

Kafka的吞吐量是很高的,那么如果放到y(tǒng)log自身來說,如何提高它的吞吐量呢?

首先想到的就是Channel,可以使用有緩沖的Channel模擬一個(gè)隊(duì)列,生產(chǎn)者不停的向Channel發(fā)送數(shù)據(jù),如果Writer可以一直在緩沖被填滿之前將數(shù)據(jù)取走,那么理論上說生產(chǎn)者就是非阻塞的,相比同步輸出到某個(gè)Writer,沒有直接磁盤IO、網(wǎng)絡(luò)IO,日志處理的吞吐量必將大幅提升。

定義一個(gè)Channel,其容量默認(rèn)為當(dāng)前機(jī)器邏輯處理器的數(shù)量:

logger.pipe = make(chan *logEntry, runtime.NumCPU())

發(fā)送數(shù)據(jù)的代碼:

entry := &logEntry{
		Level: level,
		Msg:   s,
		File:  file,
		Line:  line,
		Ts:    now,
	}

	l.pipe <- entry

接收數(shù)據(jù)的代碼:

	for {
		select {
		case entry := <-l.pipe:
			// reuse the slice memory
			buf = buf[:0]
			l.formatter.Format(entry, &buf)
			l.writer.Ensure(entry.Ts)
			err := l.writer.Write(buf)
		...
		}
	}

實(shí)際效果怎么樣呢?看下Benchmark:

goos: darwin
goarch: amd64
pkg: github.com/bosima/ylog
cpu: Intel(R) Core(TM) i5-8259U CPU @ 2.30GHz
BenchmarkInfo-8   	 	       871.6 ns/op	     328 B/op	       4 allocs/op

這個(gè)結(jié)果可以和zerolog、zap等高性能日志庫一較高下了,當(dāng)然目前可以做的事情要比它們簡(jiǎn)單很多。

如果對(duì)Java有所了解的同學(xué)應(yīng)該聽說過log4j,在log4j2中引入了一個(gè)名為Disruptor的組件,它讓日志處理飛快了起來,受到很多Java開發(fā)者的追捧。Disruptor之所以這么厲害,是因?yàn)樗褂昧藷o鎖并發(fā)、環(huán)形隊(duì)列、緩存行填充等多種高級(jí)技術(shù)。

相比之下,Golang的Channel雖然也使用了環(huán)形緩沖,但是還是使用了鎖,作為隊(duì)列來說性能并不是最優(yōu)的。

Golang中有沒有類似的東西呢?最近出來的ZenQ可能是一個(gè)不錯(cuò)的選擇,不過看似還不太穩(wěn)定,過段時(shí)間再嘗試下。有興趣的可以去看看:https://github.com/alphadose/ZenQ 。


好了,以上就是本文的主要內(nèi)容。關(guān)于ylog的介紹也告一段落了,后續(xù)會(huì)在Github上持續(xù)更新,增加更多有用的功能,并不斷優(yōu)化處理性能,歡迎關(guān)注:https://github.com/bosima/ylog 。

收獲更多架構(gòu)知識(shí),請(qǐng)關(guān)注微信公眾號(hào) 螢火架構(gòu)。原創(chuàng)內(nèi)容,轉(zhuǎn)載請(qǐng)注明出處。

分享文章:Golang:將日志以Json格式輸出到Kafka
文章位置:http://chinadenli.net/article2/dsoisoc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供App開發(fā)、微信小程序、自適應(yīng)網(wǎng)站、品牌網(wǎng)站制作、服務(wù)器托管網(wǎng)站排名

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站托管運(yùn)營(yíng)