七叶笔记 » golang编程 » golang 实现一种环形队列,及周期任务

golang 实现一种环形队列,及周期任务

一、环形队列

环形队列不同语言有很多种不同的实现,不过大部分都比较复杂。

在使用golang实践生产者消费者模型时,发现了一种变相的环形队列,代码比“常规的”环形队列简单的多,解决2个问题:
1、生产者消费者间数据传递;
2、内存空间预申请,避免频繁的动态内存申请释放带来内存碎片以及性能损耗。

 package main
import (
    "fmt"
    "net"
    "time"
)
const (
    BUFF_MAX = 100
    BUFF_LEN = 1500
)
var (
    buff   [BUFF_MAX][BUFF_LEN]byte
    buffCh = make(chan int, BUFF_MAX-2) // chan 的长度要比buff少2,否则生产者会因为太快的写入,把未被消费者处理的缓冲区覆盖掉
)
func listenUdp(ip, port string) (*net.UDPConn, error) {
    addr, err := net.ResolveUDPAddr("udp4", ip+":"+"port")
    if err != nil {
        return nil, err
    }
    socket, err := net.ListenUDP("udp4", addr)
    if err != nil {
        return nil, err
    }
    return socket, nil
}
func connectUdp(ip, port string) (*net.UDPConn, error) {
    addr, err := net.ResolveUDPAddr("udp4", ip+":"+"port")
    if err != nil {
        return nil, err
    }
    socket, err := net.DialUDP("udp4", nil, addr)
    if err != nil {
        return nil, err
    }
    return socket, nil
}
func main() {
    go func() {
        sock, _ := listenUdp("localhost", "5000")
        defer sock.Close()
        index := 0
        for {
            n, _ := sock.Read(buff[index][:])
            if n > 0 {
                buffCh <- ((index << 16) | n)
                index++
                if index >= BUFF_MAX { // 模拟换行队列,从头开始复用buff
                    index = 0
                }
            }
        }
    }()
    go func() {
        sock, _ := connectUdp("localhost", "5000")
        defer sock.Close()
        for {
            data := <-buffCh
            index, n := data>>16, data&0xffff
            fmt.Println(index, n) // 现在可以处理 buff[index] 中的数据了,有效的数据长度 n
            time.Sleep(time.Second) // 模拟处理每包数据需要耗费的时间
        }
    }()
}  

代码说明:
1、第一个goroutine 生产者,第二个是消费者;
2、buffCh中传递了可以访问的数据在buff中的index和长度,实际使用中用什么chan以及什么方式传递这些内容可以根据需求变,比如传递buff[index][:];
3、特别要注意chan的长度要比buff至少少2。否则在生产者填充数据的时候,可能当前被填充的缓冲区正在被消费者使用。

借助于golang的chan的顺序性,可以非常简单的实现生产者和消费者传递数据,也避免了常规环形队列复杂的指针和互斥操作。

二、周期任务

以上面udp socket处理为例,有时需要消费者可以比较“平稳的”处理包数据,即在一个时间段内处理n包,不要太快。比如流视频场景,平稳的将报文投递到上层,可以让视频流更顺畅,而且也可以让接收端有机会对乱序、丢包做出一些处理。

一般的思路是:
1、启动一个循环定时器;
2、超时后处理n包数据。

代码示例(对上面的消费者修改):

 go func() {
        const PKG_MAX = 10
        sock, _ := connectUdp("localhost", "5000")
        defer sock.Close()
        t := time.NewTimer(time.Millisecond * 1000)
        for {
            for i := 0; i < PKG_MAX; i++ {
                data := <-buffCh
                index, n := data>>16, data&0xffff
                fmt.Println(index, n) // 现在可以处理 buff[index] 中的数据了,有效的数据长度 n
                time.Sleep(time.Millisecond * 50) // 模拟处理每包数据需要耗费的时间
            }
            <-t.C
            t.Reset(time.Millisecond * 1000)
        }
    }()  

上面代码的意图是,每秒发送10包数据,如果报文处理的快,那就等定时器超时;如果报文处理的慢,那定时器相当于失效了(每次执行到 <-t.C 时没有阻塞)。

需要注意是,定时器粒度越小越不准,波动越大。 上面的设置,如果改成每100ms发一包,会产生一些波动,而且性能也会下降不少。有兴趣的可以自己测测。

再演变的话,这个模型需要三个goroutine就可以实现一个流量稳定的rudp:

  • routineA:负责接收udp报文,将报文通过chan1传给routineB;
  • routineB:
    • 对乱序报文重排序、对丢包报文考虑重传(自己实现);
    • 将整理好的报文通过chan2传给routineC;
  • routineC:就像上面代码一样,定时定量把报文投递到应用层。

相关文章