七叶笔记 » golang编程 » 分布式系统选主场景分析及实现

分布式系统选主场景分析及实现

一:需要选主的场景

1:服务有多台机器,取其中一台去执行任务。多台机器同时执行会出问题,如将数据库中状态为失败的记录取出来重新执行,如果多台机器同时执行,会导致一个失败的任务被多台机器同时执行。

2:服务有多台机器,选其中一台作为主,主负责任务的分发,大家一起消费并处理任务。还是将数据库中状态为失败的记录取出来重新执行,由于一台机器可能处理不过来,需要多台机器协同处理。这个时候主机器负责将失败的记录从数据库中查出来,写入消息队列,其他机器一同消费队列中的任务,并处理失败的记录

二:进行选主

根据上面的选主场景,我们其实可以从多台机器中随机取一台,比raft这种选主算法简单得多。我们甚至可以在配置文件中指定一台机器,只有这台机器才执行相关功能,其他机器则不执行。如果是固定的几台机器,且一台机器也能完成我们的需求,这样搞其实也可以。如果机器不固定,而且单台处理不过来时,用配置文件的方式就不适合。

可采用竞争选主的方式,谁先抢到谁就是主。

1:方案一

采用redis方案实现。如果指定的key不存在就将机器信息写入这个key,成功写入的那台机器就是主,设置过期时间,防止机器异常挂掉的情况,所有的机器都需要定时去抢redis锁。SETNX这个命令就满足我们的需求,写redis成功的就是主,写失败的就是从。

优点:

  • 1:实现简单,比配置文件的方式好一点,支持机器动态

缺点:

  • 1:需要定时去抢锁
  • 2:主可能经常变化,而且要保证主在切换的过程中业务逻辑的正确性
  • 3:有些时间片可能没有主,就是主挂掉了,而其他机器还没到抢锁的时间,这个时间片就没有主

2:方案二

采用etcd方案实现。etcd支持事务能做到不存在就写入,达到redis SETNX一样的效果,而且通过etcd的租赁机制保证在主挂掉的情况下通知所有机器,这时大家自动开始新一轮的选主,还是那句话第一个抢到的就是主。

优点:

  • 满足我们的需求,没有设计上的缺陷
  • 只有主挂掉的情况,才会重新选主,不用担心主在切换的过程中对业务逻辑的影响

缺点:

  • 实现起来相对复杂,那我就来试试吧

golang源码实现如下:

 package etcdDemo

import (
    "context"
    "fmt"
    "github.com/coreos/etcd/clientv3"
    "github.com/google/uuid"
    "time"
)

type Callback func(isMaster bool)

type SelectMaster struct {
    endPoints []string
    key       string
    cli       *clientv3.Client
    lease     *clientv3.LeaseGrantResponse
    chClose   chan int
    callback  Callback
    token     string
    isMaster  bool
}

func NewSelectMaster(endPoints []string, key string) (*SelectMaster, error) {
    sm := &SelectMaster{
        endPoints: endPoints,
        key:       key,
        chClose:   make(chan int, 0),
        token:     uuid.New().String(),
    }

    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   endPoints,
        DialTimeout: 3 * time.Second,
    })
    if err != nil {
        return sm, err
    }
    sm.cli = cli
    go sm.ioLoop()
    return sm, nil
}

func (sm *SelectMaster) ioLoop() {
    fmt.Println("SelectMaster.ioLoop start")
    ticker := time.NewTicker(time.Second * 3)
    defer ticker.Stop()
    chWatch := sm.cli.Watch(context.TODO(), sm.key)
    for {
        select {
        case <-ticker.C:
            if sm.lease == nil {
                leaseResp, err := sm.cli.Grant(context.Background(), 4)
                if err != nil {
                    fmt.Println("cli.Grant error=", err.Error())
                } else {
                    sm.lease = leaseResp
                }
            }
            if sm.lease != nil {
                _, err := sm.cli.KeepAliveOnce(context.Background(), sm.lease.ID)
                if err != nil {
                    fmt.Println("cli.KeepAliveOnce error=", err.Error())
                    break
                }
            }
        case c := <-chWatch:
            for _, e := range c.Events {
                if e == nil || e.Kv == nil {
                    continue
                }
                token := string(e.Kv.Value)
                sm.isMaster = sm.token == token
                if sm.callback == nil {
                    fmt.Println("SelectMaster.callback is nil")
                } else {
                    sm.callback(sm.isMaster)
                    fmt.Println("SelectMaster.isLoop token=", token)
                    if token == "" { //主挂了,开始竞选
                        sm.election()
                    }
                }
            }
        case <-sm.chClose:
            goto stop
        }
    }
stop:
    fmt.Println("SelectMaster.ioLoop end")
}

func (sm *SelectMaster) IsMaster() bool {
    return sm.isMaster
}

func (sm *SelectMaster) Close() {
    sm.chClose <- 1
}

func (sm *SelectMaster) Election(callback Callback) (bool, error) {
    sm.callback = callback
    return sm.election()
}

func (sm *SelectMaster) election() (bool, error) {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
    defer cancel()
    leaseResp, err := sm.cli.Grant(ctx, 10)
    if err != nil {
        return false, err
    }
    sm.lease = leaseResp
    txn := clientv3.NewKV(sm.cli).Txn(context.TODO())
    txn.If(clientv3.Compare(clientv3.CreateRevision(sm.key), "=", 0)).
        Then(clientv3.OpPut(sm.key, sm.token, clientv3.WithLease(leaseResp.ID))).Else()
    txnResp, err := txn.Commit()
    if err != nil {
        return false, err
    }
    return txnResp.Succeeded, nil
}

func testSelectMaster() *SelectMaster {
    endPoints := []string{"172.25.20.248:2379"}
    sm, err := NewSelectMaster(endPoints, "/test/lock")
    if err != nil {
        fmt.Println(err.Error())
        return nil
    }
    callback := func(isMaster bool) {
        fmt.Println(sm.token, "callback=", isMaster)
    }
    isSuccess, err := sm.Election(callback)
    if err != nil {
        fmt.Println(sm.token, "Election=", err.Error())
    } else {
        fmt.Println(sm.token, "Election=", isSuccess)
    }
    return sm
}

func TestSelectMaster() {
    var master *SelectMaster
    for i := 0; i < 3; i++ {
        sm := testSelectMaster()
        if sm.IsMaster() {
            master = sm
        }
    }
    if master != nil {
        master.Close()
    }
    time.Sleep(time.Second*10)
}  

相关文章