七叶笔记 » golang编程 » Golang database/sql源码分析

Golang database/sql源码分析

简介

G ORM 是Go语言开发用的比较多的一个ORM。它的功能比较全:

  • 增删改查
  • 关联(包含一个,包含多个,属于,多对多,多种包含)
  • CallBacks(创建、保存、更新、删除、查询找)之前 之后都可以有callback函数
  • 预加载
  • 事务
  • 复合主键
  • 日志

database/sql 包

但是这篇文章中并不会直接看Gorm的源码,我们会先从database/ sql 分析。原因是Gorm也是基于这个包来封装的一些功能。所以只有先了解了database/sql包才能更加好的理解Gorm源码。

database/sql 其实也是一个对于mysql驱动的上层封装。”github.com/go-sql-driver/mysql”就是一个对于mysql的驱动,database/sql 就是在这个基础上做的基本封装包含连接池的使用

使用例子

下面这个是最基本的增删改查操作

操作分下面几个步骤:

  1. 引入github.com/go-sql-driver/mysql包(包中的init方法会初始化mysql驱动的注册)
  2. 使用sql.Open 初始化一个sql.DB结构
  3. 调用Prepare Exec 执行sql语句
 package main

 import  (
    "database/sql"
    "fmt"
    _ "github.com/go-sql-driver/mysql"
    " strconv "
)

func main() {
    // 打开连接
    db,  Err  := sql.Open("mysql", " root :feg@125800@tcp(47.100.245.167:3306)/artifact? charset =utf8&loc=Asia%2FShanghai&parseTime=True")
    if err != nil {
        fmt.Println("err:",  err )
    }
    // 设置最大空闲连接数
    db.SetMaxIdleConns(1)
    // 设置最大链接数
    db.SetMaxOpenConns(1)
    query(db, 3)
}

//修改
func update(db *sql.DB, id int, user string) {
    stmt, err := db.Prepare("update user set UserName=? where Id =?")
    if err != nil {
        fmt.Println(err)
    }
    res, err := stmt.Exec(user, id)
    updateId, err := res.LastInsertId()
    fmt.Println(updateId)
}

//删除
func delete(db *sql.DB, id int) {
    stmt, err := db.Prepare("delete  from user where id = ?")
    if err != nil {
        fmt.Println(err)
    }
    res, err := stmt.Exec(1)
    updateId, err := res.LastInsertId()
    fmt.Println(updateId)
}

//查询
func query(db *sql.DB, id int) {
    rows, err := db.Query("select * from user where  id = " + strconv.Itoa(id))
    if err != nil {
        fmt.Println(err)
        return
    }

    for rows.Next() {
        var id int
        var user string
        var pwd string
        rows.Scan(&id, &user, &pwd)
        fmt.Println("id:", id, "user:", user, "pwd:", pwd)
    }
    rows.Close()
}

//插入
func insert(db *sql.DB, user, pwd string) {
    stmt, err := db.Prepare("insert into user set UserName=?,Password=?")
    if err != nil {
        fmt.Println(err)
    }
    res, err := stmt.Exec("peter", "panlei")
    id, err := res.LastInsertId()
    fmt.Println(id)
}
  

连接池

因为Gorm的连接池就是使用database/sql包中的连接池,所以这里我们需要学习一下包里的连接池的源码实现。其实所有连接池最重要的就是连接池对象、获取函数、释放函数下面来看一下database/sql中的连接池。

DB对象

 type DB  struct  {
    //数据库实现驱动
    driver driver.Driver
     dsn     string
    numClosed uint64
    // 锁
    mu           sync.Mutex // protects following fields
    // 空闲连接
    freeConn     []*driverConn
    //阻塞请求队列,等连接数达到最大限制时,后续请求将插入此队列等待可用连接
    conn request s map[uint64]chan connRequest
    // 记录下一个key用于connRequests map的key
    nextRequest  uint64 // Next key to use in conn Request s.
    numOpen      int    //  number  of opened and pending open connections

    openerCh    chan struct{}
    closed       bool 
    dep         map[finalCloser]depSet
    lastPut     map[*driverConn]string 
    // 最大空闲连接数
    maxIdle     int                    
    // 最大打开连接数
    maxOpen     int  
    // 连接最大存活时间
    maxLifetime time.Duration          
    cleanerCh   chan struct{}
}  

获取方法

 func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
    db.mu.Lock()
    if db.closed {
        db.mu.Unlock()
        return nil, errDBClosed
    }
    // Check if the context is expired.
    select {
    default:
    case <-ctx.Done():
        db.mu.Unlock()
        return nil, ctx.Err()
    }
    lifetime := db.maxLifetime

    // 查看是否有空闲的连接 如果有则直接使用空闲连接
    numFree := len(db.freeConn)
    if strategy == cachedOrNewConn && numFree > 0 {
        // 取出数据第一个
        conn := db.freeConn[0]
        // 复制数组,去除第一个连接
        copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:numFree-1]
        conn.inUse = true
        db.mu.Unlock()
        if conn.expired(lifetime) {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        return conn,  nil 
    }

    // 判断是否超出最大连接数 
    if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        // 创建一个chan 
        req := make(chan connRequest, 1)
        // 获取下一个request 作为map 中的key
        reqKey := db.nextRequestKeyLocked()
        db.connRequests[reqKey] = req
        db.mu.Unlock()

        // Timeout the connection request with the context.
        select {
        case <-ctx.Done():
            // Remove the connection request and ensure no value has been sent
            // on it after removing.
            db.mu.Lock()
            delete(db.connRequests, reqKey)
            db.mu.Unlock()
            select {
            default:
            case ret, ok := <-req:
                if ok {
                    db.putConn(ret.conn, ret.err)
                }
            }
            return nil, ctx.Err()
        // 如果没有取消则从req chan中获取数据 阻塞主一直等待有conn数据传入
        case ret, ok := <-req:
            if !ok {
                return nil, errDBClosed
            }
            // 判断超时 
            if ret.err == nil && ret.conn.expired(lifetime) {
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            return ret.conn, ret.err
        }
    }

    db.numOpen++ // optimistically
    db.mu.Unlock()
    // 调用driver的Open方法建立连接
    ci, err := db.driver.Open(db.dsn)
    if err != nil {
        db.mu.Lock()
        db.numOpen-- // correct for earlier optimism
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        return nil, err
    }
    db.mu.Lock()
    dc := &driverConn{
        db:        db,
        createdAt: nowFunc(),
        ci:        ci,
        inUse:     true,
    }
    db.addDepLocked(dc, dc)
    db.mu.Unlock()
    return dc, nil
}  

释放连接方法

 // 释放连接
func (db *DB) putConn(dc *driverConn, err error) {
    db.mu.Lock()
    if !dc.inUse {
        if debugGetPut {
            fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
        }
        panic("sql: connection returned that was never out")
    }
    if debugGetPut {
        db.lastPut[dc] = stack()
    }
    // 设置已经在使用中
    dc.inUse = false

    for _, fn := range dc.onPut {
        fn()
    }
    dc.onPut = nil
    // 判断连接是否有错误 
    if err == driver.ErrBadConn {
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        dc.Close()
        return
    }
    if putConnHook != nil {
        putConnHook(db, dc)
    }
    // 调用方法 释放连接
    added := db.putConnDBLocked(dc, nil)
    db.mu.Unlock()
    // 判断如果没有加到了空闲列表中 dc关闭
    if !added {
        dc.Close()
    }
}

func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
    if db.closed {
        return false
    }
    if db.maxOpen > 0 && db.numOpen > db.maxOpen {
        return false
    }
    // 如果等待chan列表大于0 
    if c := len(db.connRequests); c > 0 {
        var req chan connRequest
        var reqKey uint64
        // 获取map 中chan和key
        for reqKey, req = range db.connRequests {
            break
        }
        // 从列表中删除chan 
        delete(db.connRequests, reqKey) // Remove from pending requests.
        if err == nil {
            dc.inUse = true
        }
        // 把连接传入chan中 让之前获取连接被阻塞的获取函数继续
        req <- connRequest{
            conn: dc,
            err:  err,
        }
        return true
    } else if err == nil && !db.closed && db.maxIdleConnsLocked() > len(db.freeConn) {
        // 如果没有等待列表,则把连接放到空闲列表中
        db.freeConn = append(db.freeConn, dc)
        db.startCleanerLocked()
        return true
    }
    return false
}  

连接池的实现有很多方法,在database/sql包中使用的是chan阻塞 使用map记录等待列表,等到有连接释放的时候再把连接传入等待列表中的chan 不在阻塞返回连接。

之前我们看到的Redigo是使用一个chan 来阻塞,然后释放的时候放入空闲列表,在往这一个chan中传入struct{}{},让程序继续 获取的时候再从空闲列表中获取。并且使用的是 链表 的结构来存储空闲列表。

总结

database/sql 是对于mysql驱动的封装,然而Gorm则是对于database/sql的再次封装。让我们可以更加简单的实现对于 mysql数据库 的操作。

相关文章