坦白的讲,kafka-producer-perf-test.sh这个脚本也可以做压测。但是使用这个工具有一个问题,那就是发送消息的格式无法做自定义。还有一点,作为程序员,使用现有的工具就像带着T XX一样,纵然这工具再怎么美味多姿,却感觉始终达不到内心的G点。
人活着图个啥? 不就是个爽嘛。

调侃完毕,开正题。
接下来我基于golang手把手教大家实现kafka发送压测工具。
kafka的发送分两种,一种是同步,一种是异步。
同步就是说,我先发一个,这个发完了,我再发下一个。所以同步的方式比较慢,在我的例子里,采用同步方式,大概每秒可以发送2000条消息;
异步就是说,我一直发,啥时候你发完了,通知我一下让我知道即可。所以,异步的方式比较快,在我的例子里,采用异步的方式,大概每秒可以发送10000条以上的消息。
那接下来,咱们上代码:

先看同步方式:
sync.go
package main
import (
“fmt”
” github . com /Shopify/sarama”
“github.com/ json -iterator/go”
“time”
)
type Message struct {
Log string `json:”log”`
Stream string `json:”stream”`
Time string `json:”time”`
}
var sasl_enabled = true
func main() {
config := sarama.NewConfig()
// WaitForAll waits for all in-sync replicas to commit before responding.
config.Producer.RequiredAcks = sarama.WaitForAll
// NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
config.Producer.Partitioner = sarama.NewRandom Partition er
config.Producer.Return.Successes = true
if sasl_enabled {
config.Net.SASL.Enable = true
config.Net.SASL.User = “admin”
config.Net.SASL.Password = “admin-secret”
config.Net.SASL.Mechanism = “PLAIN”
}
client, err := sarama.NewSyncProducer([]string{“172.21.92.170:31090”}, config)
if err != nil {
fmt.Println(“producer close, err:”, err)
return
}
defer client.Close()
myMesg := message {“error”, “this is a log”, “2020”}
var json= jsoniter.ConfigCompatibleWithStandardLibrary
myStr, err := json.MarshalToString(myMesg)
if err != nil {
fmt.Println(“marshal error”)
return
}
var n int = 0
start := time.Now()
defer func() {
cost := time.Since(start)
fmt.Println(“cost=”, cost)
}()
for n < 360000{
msg := &sarama.ProducerMessage{}
msg.Topic = “advlog”
msg.Key = sarama.StringEncoder(“miles”)
msg.Value = sarama.StringEncoder(myStr)
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println(“send message failed,”, err)
return
}
fmt.Printf(“pid:%v offset:%v\n”, pid, offset)
n++
}
}
编译:
# export GOPATH=你的目录
# go build sync.go
再看异步方式:
package main
import (
“fmt”
“github.com/Shopify/sarama”
“github.com/json-iterator/go”
“log”
” runtime ”
“sync”
“time”
)
type AMessage struct {
Log string `json:”log”`
Stream string `json:”stream”`
Time string `json:”time”`
}
var use_sasl = true
const total int = 1000000000
const interval_us time.Duration = 10
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
wg := &sync.WaitGroup{}
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
if use_sasl {
config.Net.SASL.Enable = true
config.Net.SASL.User = “admin”
config.Net.SASL.Password = “admin-secret”
config.Net.SASL.Mechanism = “PLAIN”
}
producer, err := sarama.NewAsyncProducer([]string{“172.21.92.170:31090”}, config)
if err != nil {
fmt.Println(“producer close, err:”, err)
return
}
defer producer.AsyncClose()
go ProcessResponse(producer)
var topic string = “advlog”
var logMsg string = “abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz”
myMesg := AMessage{logMsg, “stream”, “2020”}
var json= jsoniter.ConfigCompatibleWithStandardLibrary
myStr, err := json.MarshalToString(myMesg)
if err != nil {
fmt.Println(“marshal error”)
return
}
var n int = 0
start := time.Now()
defer func() {
cost := time.Since(start)
fmt.Println(“cost=”, cost)
}()
for n < total {
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(myStr)}
time.Sleep(interval_us * time.Microsecond)
n++
}
wg.Add(1)
fmt.Println(“running…”)
wg.Wait()
fmt.Printf(“finished…”)
}
func ProcessResponse(producer sarama.AsyncProducer) {
for {
select {
case result := <-producer.Successes():
//log.Printf(“> message: \”%s\” sent to partition %d at offset %d\n”, result.Value, result.Partition, result.Offset)
log.Printf(“> message: sent to partition %d at offset %d\n”, result.Partition, result.Offset)
case err := <-producer.Errors():
log.Println(“Failed to produce message”, err)
}
}
}
乖乖,看到代码了没?
我测试了10亿条数据。
我跑了一晚上,发送了10亿条数据。
稳着呢。
一大波儿性能测试工具和组件优化方案正在火速赶来,欢迎关注。