七叶笔记 » golang编程 » kafka压测工具:同步方式2000+、异步方式10000+、带源码

kafka压测工具:同步方式2000+、异步方式10000+、带源码

坦白的讲,kafka-producer-perf-test.sh这个脚本也可以做压测。但是使用这个工具有一个问题,那就是发送消息的格式无法做自定义。还有一点,作为程序员,使用现有的工具就像带着T XX一样,纵然这工具再怎么美味多姿,却感觉始终达不到内心的G点。

人活着图个啥? 不就是个爽嘛。

调侃完毕,开正题。


接下来我基于golang手把手教大家实现kafka发送压测工具。

kafka的发送分两种,一种是同步,一种是异步。

同步就是说,我先发一个,这个发完了,我再发下一个。所以同步的方式比较慢,在我的例子里,采用同步方式,大概每秒可以发送2000条消息;

异步就是说,我一直发,啥时候你发完了,通知我一下让我知道即可。所以,异步的方式比较快,在我的例子里,采用异步的方式,大概每秒可以发送10000条以上的消息。

那接下来,咱们上代码:

先看同步方式:

sync.go

package main

import (

“fmt”

github . com /Shopify/sarama”

“github.com/ json -iterator/go”

“time”

)

type Message struct {

Log string `json:”log”`

Stream string `json:”stream”`

Time string `json:”time”`

}

var sasl_enabled = true

func main() {

config := sarama.NewConfig()

// WaitForAll waits for all in-sync replicas to commit before responding.

config.Producer.RequiredAcks = sarama.WaitForAll

// NewRandomPartitioner returns a Partitioner which chooses a random partition each time.

config.Producer.Partitioner = sarama.NewRandom Partition er

config.Producer.Return.Successes = true

if sasl_enabled {

config.Net.SASL.Enable = true

config.Net.SASL.User = “admin”

config.Net.SASL.Password = “admin-secret”

config.Net.SASL.Mechanism = “PLAIN”

}

client, err := sarama.NewSyncProducer([]string{“172.21.92.170:31090”}, config)

if err != nil {

fmt.Println(“producer close, err:”, err)

return

}

defer client.Close()

myMesg := message {“error”, “this is a log”, “2020”}

var json= jsoniter.ConfigCompatibleWithStandardLibrary

myStr, err := json.MarshalToString(myMesg)

if err != nil {

fmt.Println(“marshal error”)

return

}

var n int = 0

start := time.Now()

defer func() {

cost := time.Since(start)

fmt.Println(“cost=”, cost)

}()

for n < 360000{

msg := &sarama.ProducerMessage{}

msg.Topic = “advlog”

msg.Key = sarama.StringEncoder(“miles”)

msg.Value = sarama.StringEncoder(myStr)

pid, offset, err := client.SendMessage(msg)

if err != nil {

fmt.Println(“send message failed,”, err)

return

}

fmt.Printf(“pid:%v offset:%v\n”, pid, offset)

n++

}

}

编译:

# export GOPATH=你的目录

# go build sync.go

再看异步方式:

package main

import (

“fmt”

“github.com/Shopify/sarama”

“github.com/json-iterator/go”

“log”

runtime

“sync”

“time”

)

type AMessage struct {

Log string `json:”log”`

Stream string `json:”stream”`

Time string `json:”time”`

}

var use_sasl = true

const total int = 1000000000

const interval_us time.Duration = 10

func main() {

runtime.GOMAXPROCS(runtime.NumCPU())

wg := &sync.WaitGroup{}

config := sarama.NewConfig()

config.Producer.Return.Successes = true

config.Producer.Return.Errors = true

if use_sasl {

config.Net.SASL.Enable = true

config.Net.SASL.User = “admin”

config.Net.SASL.Password = “admin-secret”

config.Net.SASL.Mechanism = “PLAIN”

}

producer, err := sarama.NewAsyncProducer([]string{“172.21.92.170:31090”}, config)

if err != nil {

fmt.Println(“producer close, err:”, err)

return

}

defer producer.AsyncClose()

go ProcessResponse(producer)

var topic string = “advlog”

var logMsg string = “abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz”

myMesg := AMessage{logMsg, “stream”, “2020”}

var json= jsoniter.ConfigCompatibleWithStandardLibrary

myStr, err := json.MarshalToString(myMesg)

if err != nil {

fmt.Println(“marshal error”)

return

}

var n int = 0

start := time.Now()

defer func() {

cost := time.Since(start)

fmt.Println(“cost=”, cost)

}()

for n < total {

producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(myStr)}

time.Sleep(interval_us * time.Microsecond)

n++

}

wg.Add(1)

fmt.Println(“running…”)

wg.Wait()

fmt.Printf(“finished…”)

}

func ProcessResponse(producer sarama.AsyncProducer) {

for {

select {

case result := <-producer.Successes():

//log.Printf(“> message: \”%s\” sent to partition %d at offset %d\n”, result.Value, result.Partition, result.Offset)

log.Printf(“> message: sent to partition %d at offset %d\n”, result.Partition, result.Offset)

case err := <-producer.Errors():

log.Println(“Failed to produce message”, err)

}

}

}

乖乖,看到代码了没?

我测试了10亿条数据。

我跑了一晚上,发送了10亿条数据。

稳着呢。


一大波儿性能测试工具和组件优化方案正在火速赶来,欢迎关注。

相关文章