一:需要选主的场景
1:服务有多台机器,取其中一台去执行任务。多台机器同时执行会出问题,如将数据库中状态为失败的记录取出来重新执行,如果多台机器同时执行,会导致一个失败的任务被多台机器同时执行。
2:服务有多台机器,选其中一台作为主,主负责任务的分发,大家一起消费并处理任务。还是将数据库中状态为失败的记录取出来重新执行,由于一台机器可能处理不过来,需要多台机器协同处理。这个时候主机器负责将失败的记录从数据库中查出来,写入消息队列,其他机器一同消费队列中的任务,并处理失败的记录
二:进行选主
根据上面的选主场景,我们其实可以从多台机器中随机取一台,比raft这种选主算法简单得多。我们甚至可以在配置文件中指定一台机器,只有这台机器才执行相关功能,其他机器则不执行。如果是固定的几台机器,且一台机器也能完成我们的需求,这样搞其实也可以。如果机器不固定,而且单台处理不过来时,用配置文件的方式就不适合。
可采用竞争选主的方式,谁先抢到谁就是主。
1:方案一
采用redis方案实现。如果指定的key不存在就将机器信息写入这个key,成功写入的那台机器就是主,设置过期时间,防止机器异常挂掉的情况,所有的机器都需要定时去抢redis锁。SETNX这个命令就满足我们的需求,写redis成功的就是主,写失败的就是从。
优点:
- 1:实现简单,比配置文件的方式好一点,支持机器动态
缺点:
- 1:需要定时去抢锁
- 2:主可能经常变化,而且要保证主在切换的过程中业务逻辑的正确性
- 3:有些时间片可能没有主,就是主挂掉了,而其他机器还没到抢锁的时间,这个时间片就没有主
2:方案二
采用etcd方案实现。etcd支持事务能做到不存在就写入,达到redis SETNX一样的效果,而且通过etcd的租赁机制保证在主挂掉的情况下通知所有机器,这时大家自动开始新一轮的选主,还是那句话第一个抢到的就是主。
优点:
- 满足我们的需求,没有设计上的缺陷
- 只有主挂掉的情况,才会重新选主,不用担心主在切换的过程中对业务逻辑的影响
缺点:
- 实现起来相对复杂,那我就来试试吧
golang源码实现如下:
package etcdDemo
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/google/uuid"
"time"
)
type Callback func(isMaster bool)
type SelectMaster struct {
endPoints []string
key string
cli *clientv3.Client
lease *clientv3.LeaseGrantResponse
chClose chan int
callback Callback
token string
isMaster bool
}
func NewSelectMaster(endPoints []string, key string) (*SelectMaster, error) {
sm := &SelectMaster{
endPoints: endPoints,
key: key,
chClose: make(chan int, 0),
token: uuid.New().String(),
}
cli, err := clientv3.New(clientv3.Config{
Endpoints: endPoints,
DialTimeout: 3 * time.Second,
})
if err != nil {
return sm, err
}
sm.cli = cli
go sm.ioLoop()
return sm, nil
}
func (sm *SelectMaster) ioLoop() {
fmt.Println("SelectMaster.ioLoop start")
ticker := time.NewTicker(time.Second * 3)
defer ticker.Stop()
chWatch := sm.cli.Watch(context.TODO(), sm.key)
for {
select {
case <-ticker.C:
if sm.lease == nil {
leaseResp, err := sm.cli.Grant(context.Background(), 4)
if err != nil {
fmt.Println("cli.Grant error=", err.Error())
} else {
sm.lease = leaseResp
}
}
if sm.lease != nil {
_, err := sm.cli.KeepAliveOnce(context.Background(), sm.lease.ID)
if err != nil {
fmt.Println("cli.KeepAliveOnce error=", err.Error())
break
}
}
case c := <-chWatch:
for _, e := range c.Events {
if e == nil || e.Kv == nil {
continue
}
token := string(e.Kv.Value)
sm.isMaster = sm.token == token
if sm.callback == nil {
fmt.Println("SelectMaster.callback is nil")
} else {
sm.callback(sm.isMaster)
fmt.Println("SelectMaster.isLoop token=", token)
if token == "" { //主挂了,开始竞选
sm.election()
}
}
}
case <-sm.chClose:
goto stop
}
}
stop:
fmt.Println("SelectMaster.ioLoop end")
}
func (sm *SelectMaster) IsMaster() bool {
return sm.isMaster
}
func (sm *SelectMaster) Close() {
sm.chClose <- 1
}
func (sm *SelectMaster) Election(callback Callback) (bool, error) {
sm.callback = callback
return sm.election()
}
func (sm *SelectMaster) election() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
leaseResp, err := sm.cli.Grant(ctx, 10)
if err != nil {
return false, err
}
sm.lease = leaseResp
txn := clientv3.NewKV(sm.cli).Txn(context.TODO())
txn.If(clientv3.Compare(clientv3.CreateRevision(sm.key), "=", 0)).
Then(clientv3.OpPut(sm.key, sm.token, clientv3.WithLease(leaseResp.ID))).Else()
txnResp, err := txn.Commit()
if err != nil {
return false, err
}
return txnResp.Succeeded, nil
}
func testSelectMaster() *SelectMaster {
endPoints := []string{"172.25.20.248:2379"}
sm, err := NewSelectMaster(endPoints, "/test/lock")
if err != nil {
fmt.Println(err.Error())
return nil
}
callback := func(isMaster bool) {
fmt.Println(sm.token, "callback=", isMaster)
}
isSuccess, err := sm.Election(callback)
if err != nil {
fmt.Println(sm.token, "Election=", err.Error())
} else {
fmt.Println(sm.token, "Election=", isSuccess)
}
return sm
}
func TestSelectMaster() {
var master *SelectMaster
for i := 0; i < 3; i++ {
sm := testSelectMaster()
if sm.IsMaster() {
master = sm
}
}
if master != nil {
master.Close()
}
time.Sleep(time.Second*10)
}