当前位置: 首页 > news >正文

专业的英文网站建设全网营销的概念

专业的英文网站建设,全网营销的概念,建设一个电商网站需要多少钱,网址大全名称文章目录 XORM源码浅析及实践ORMORM vs. SQLXORM软件架构 ORM 引擎 Engine——DBM*core.DB Golang#xff1a;database/sql 源码基本结构连接复用#xff0c;提高性能。增加数据库连接池数量连接管理 database/sql主要内容#xff1a;sql.DB创建数据库连接sql.Open()DB.conn… 文章目录 XORM源码浅析及实践ORMORM vs. SQLXORM软件架构 ORM 引擎 Engine——DBM*core.DB Golangdatabase/sql 源码基本结构连接复用提高性能。增加数据库连接池数量连接管理 database/sql主要内容sql.DB创建数据库连接sql.Open()DB.conn可能需要创建新的连接 连接释放清理无效连接 数据库操作数据库操作Prepare() 和Query()区别接口的实现细节 Query基本使用源码解析 ExecPrepare基本使用 RowsDB.Stmt,Statements基本使用源码 XORM源码浅析及实践 ORM 应用程序是如何实现的数据的增删改查的 ORM 是 Object Relational Mapping 的缩写译为“对象关系映射”框架。 ORM 框架是一种数据持久化技术是一种为了解决面向对象与关系型数据库中数据类型不匹配的技术它通过描述对象与数据库表之间的映射关系自动将应用程序中的对象持久化到关系型数据库的表中。在程序中的对象和关系型数据库之间建立映射关系这样就可以用面向对象的方式操作数据。 持久化层是作为业务逻辑层和数据库层的桥梁为业务层提供数据增删改查的接口接收业务层的指令与数据库建立连接获取对应的数据 ORM就是处于持久化层的一个东西ORM支撑了数据的增删改查。ORM接收业务层指令后最终还是转换成SQL语句发送给数据库api驱动器 ORM vs. SQL ORM为业务层提供了更友好的接口但其最后还是将业务层的指令转换sql语句。从这大家应该可以猜到为了实现数据的增删改查除了使用ORM我们也可以直接使用sql。 为什么不直接使用sql使用ORM有什么优势吗 ORM将数据表映射为数据模型屏蔽了底层数据库的细节和差异并提供易用的接口生成对应的sql语句。直接使用sql势必要了解sql语法不同的数据库其sql语法也存在差异相对ORM学习成本更高因为ORM采用面向对象的方式操作数据可拓展新更强 让开发者只需关注具体业务逻辑具体实现只需调用对应的ORM接口这种模式可以屏蔽底层的数据库的细节不需要我们与复杂的 SQL 语句打交道直接采用操作对象的形式操作就可以。且框架会提供一些常用的功能如字段名别名查询条件的拼接。ORM主要解决的问题是对象关系的映射。这样我们在具体的操作数据库的时候就不用再和复杂的SQL语句打交道只要像平时操作对象一样操作他就行了不用sql直接编码能够操作对象一样从数据库获取数据. XORM Go语言ORM库. 通过它可以使数据库操作非常简便。 通过xorm框架开发者可以方便的使用各种封装好的方法来代替原生的sql语句。这样就降低了我们开发者使用数据库的门槛。 软件架构 builder 包下的 builder模块拼接 SQL 语句它主要是将符合 SQL 规范的语句封装成一个个的 API其本身并不能直接操作数据库。 dialect 包下的 IDialect 接口主要处理各类 SQL数据库的特性即: 方言(不属于SQL规范的各类数据库的特性)其本身并不能直接操作数据库。 schema 包下的 schema主要处理的是 Model与数据库 Table 之间的关系利用反射将模型映射成数据库表结构。 core 包主要是将 sql.DB扩展成core.DBsql.stmt扩展成core.stmt…就是扩展了 database/sql包下的内容。其本身具备了操作数据库的能力。 integration包下的 statement模块就是将 builderdialectcoreschema 装配起来生成一个个完整的SQL语句具备了操作本框架支持的SQL数据库的能力。 Session 只是对 Statement模块的包装和扩展。Session在Statement的基础上增加了一些自定义的逻辑和功能。 Engine 则又在 Session的基础上进一步的封装。扩展了 Session的功能。 每一个 Engine 对象只能持有一个处于连接状态的 *core.DB对象。它只能保持一个数据库连接 每一个 Engine 对象能打开任意多个 *Session 对象。(它能维护任意多个会话) ORM 引擎 Engine——DBM Engine 是orm的主要结构表示数据库管理器.通常一个应用程序只需要一个引擎 // Engine is the major struct of xorm, it means a database manager. // Commonly, an application only need one engine type Engine struct {cacherMgr *caches.Manager // 缓存模块defaultContext context.Context // 默认上下文。dialect dialects.Dialect // 主要是为了传给 Session.statementdriver dialects.DriverengineGroup *EngineGrouplogger log.ContextLogger // 上下文日志模块。tagParser *tags.Parser // 主要是为了传给 Session.statementdb *core.DBdriverName string //驱动名称例如:mysql,mssql,...dataSourceName string //对应驱动的 dataSourceName 连接信息TZLocation *time.Location // The timezone of the applicationDatabaseTZ *time.Location // The timezone of the databaselogSessionID bool // 默认为false,表示:没有创建 Session对象。初始化一个 Engine 对象时不会自动创建 Session 对象 }*core.DB DB 是 *sql.DB 的扩展。 type DB struct {*sql.DBMapper mapper.IMapper //表示结构体的字段名称和表的列名称之间的名称转换cache map[reflect.Type]*cacheStructmu sync.RWMutexLogger log.IContextLoggerhooks ictx.Hooks }mapper IMapper 是实现的多态任意一个struct实现了接口中的所有方法那么则认为该struct实现了该接口。 SnakeMapper 支持struct为驼峰式命名表结构为下划线命名之间的转换这个是默认的MaperSameMapper 支持结构体名称和对应的表名称以及结构体field名称与对应的表字段名称相同的命名GonicMapper 和SnakeMapper很类似但是对于特定词支持更好比如ID会翻译成id而不是i_d。 // Mapper 表示结构体的字段名称和表的列名称之间的名称转换 type IMapper interface {Obj2Table(string) stringTable2Obj(string) string }// 1-1名称map缓存 type CacheMapper struct {oriMapper IMapperobj2tableCache map[string]stringobj2tableMutex sync.RWMutextable2objCache map[string]stringtable2objMutex sync.RWMutex}// SameMapper 实现IMapper并在结构表和数据库表之间提供相同的名称 type SameMapper struct {} func (m SameMapper) Obj2Table(o string) string {return o } func (m SameMapper) Table2Obj(t string) string {return t }// SnakeMapper 实现IMapper并在结构表和数据库表之间驼峰式名称转化userId--user_id type SnakeMapper struct {}// GonicMapper 实现IMapper。映射名称时将考虑初始性。 // id - ID, user - User and to table names: UserID - user_id, MyUID - my_uid type GonicMapper map[string]bool Golangdatabase/sql 源码 database/sql主要是提供了对数据库的读写操作的接口和数据库连接池功能需要向其注册相应的数据库驱动才能工作。go-sql-driver/mysql是mysql数据库驱动mysql客户端的真正实现负责与mysql服务端进行交互完成连接、读写操作。github.com/lib/pq常用的第三方 PostgreSQL 驱动程序 Go内置了数据库相关的库 - database/sql实现数据库操作相关的接口其中还包含一个很重要的功能 - 连接池用来实现连接的复用限制连接的数量从而提高性能避免连接数量失控导致资源消耗不可控。 如何一步步设计包含连接池的数据库组件包括模型抽象、连接复用以及如何管理连接数。 基本结构 需要一个数据库对象将数据库对象抽象为一个DB的结构体一个对象对应的是一个数据库实例所以DB必须是**单例 数据库需要连接所以可以对连接进行抽象命名为Conn这里我们不关心Conn的属性而是关心行为所以Conn类型定义成一个interface包含所需两个方法 预处理方法Prepare(接收一个sql返回一个实现Stmt接口的预处对象接着设置一下参数最后执行数据库操作)关闭连接方法Close 不同的数据库连接的实现方式会有不同对连接的方式进行抽象定义一个连接接口 - Connector用来创建连接依赖倒置原则依赖于抽象而不依赖于具体。写代码时用到具体类时不与具体类交互而与具体类的上层接口交互当初始化DB的时候再将具体实现注入到DB对象中也就是依赖注入。 // 数据库对象 type DB struct {connector Connector }type Conn interface {Prepare(query string) (Stmt, error)Close() error }// 数据库连接接口 type Connector interface {Connect(context.Context) (Conn, error) }给DB对象增加一个获取连接的方法Conn在不考虑连接池的情况下调用connector.Connect(ctx)直接获取连接 // 获取一个连接 func (db *DB) Conn(ctx context.Context) (*Conn, error) {return db.connector.Connect(ctx) }连接复用提高性能。增加数据库连接池 存储空闲连接切片来存储freeConn []*Conn并发安全的问题需要增加一个锁mu sync.Mutex来保护连接要复用就不能关闭用完需要放回连接池中所以DB需要一个将连接放回连接池的方法 - putConn没有连接池功能的时候一个Conn用完了就一定会调用Close()释放资源有连接池功能后对Conn进行改造增加一层代理 // 数据库对象 type DB struct {connector Connectormu sync.Mutex // protects following fieldsfreeConn []*Conn }数据库连接获取方法Conn就需要修改为 如果没有创建新连接如果有返回一个经连接池包装之后的连接 func (db *DB) Conn(ctx context.Context) (*Conn, error) {db.mu.Lock() // 加锁保护freeConnnumFree : len(db.freeConn)if numFree 0 {conn : db.freeConn[0]// 移除第一个连接copy(db.freeConn, db.freeConn[1:])db.freeConn db.freeConn[:numFree-1]db.mu.Unlock()return conn, nil}db.mu.Unlock()return db.connector.Connect(ctx) // 没有空闲连接重新创建 }func (db *DB) putConn(dc Conn) {db.mu.Lock()db.freeConn append(db.freeConn, dc)db.mu.Unlock() }type PConn interface {db *DBci Conn //原来的Conn作为PConn的一个属性同时实现了Conn接口的两个方法 } func (pc *PConn) Close() error {dc.db.putConn(pc) } func (pc *PConn) Prepare(query string) (Stmt, error) {return pc.ci.Prepare(query) }// 调整一下Conn的创建方法 func (db *DB) Conn(ctx context.Context) (Conn, error) {...c , err : db.connector.Connect(ctx) // 没有空闲连接重新创建if err!nil {return nil, err}return PConn{ //返回包装过后的连接ci: c,db: db,} } 数量连接管理 限创建连接可能会导致资源耗尽资源消耗曲线过陡峭 限制连接数量。将连接的数量约束在指定的范围内 保存当前连接数量最大连接数 连接请求队列。当连接数量达到最大值时连接请求需要需要放到等待队列中等待有连接释放。 没有空闲连接、连接数已达最大连接数怎么操作 chan 可以理解为一个管道或者先进先出的队列。带缓存的 channel 实际上是一个阻塞队列。队列满时写协程会阻塞队列空时读协程阻塞。 Database/sql里是采用一个map来存储注这个有点奇怪为什么不用队列在DB结构体增加一个属性connRequests类型为map[uint64]chan connRequest其中key/value key 请求唯一标识。调用nextRequestKeyLocked方法生成实际上就是一个自增的序列只是为了保持唯一value等待连接的请求。类型为chan每个请求封装为一个chan可以保证并发安全同时也可以利用其阻塞特性当chan没有值时阻塞等待chan接收的数据类型为connRequest格式当其他协程有释放连接时会将连接放到一个connRequest对象中发送给该chanconnRequest只包含两个属性conn和err用来接收返回连接或是异常。 type DB struct {...numOpen int // number of opened and pending open connectionsmaxOpen int // 0 means unlimitednextRequest uint64 // Next key to use in connRequests.connRequests map[uint64]chan connRequest }func (db *DB) nextRequestKeyLocked() uint64 {next : db.nextRequestdb.nextRequestreturn next }// 连接请求 type connRequest struct {conn *PConnerr error }调整获取连接的方法Conn(ctx context.Context) 逻辑 判断freeConn是否有空闲连接有就返回判断连接数量numOpen是否大于maxOpen如果还小于maxOpen说明还可以创建新连接创建连接后numOpen返回连接当numOpen已经是大于等于maxOpen就不能再创建新连接这是就把请求放到集合connRequests中等待连接释放。 func (db *DB) Conn(ctx context.Context) (Conn, error) {db.mu.Lock() // 加锁保护freeConnnumFree : len(db.freeConn)if numFree 0 { // 有空闲连接conn : db.freeConn[0]copy(db.freeConn, db.freeConn[1:])db.freeConn db.freeConn[:numFree-1]db.mu.Unlock()return conn, nil}// 连接数已经超出if db.maxOpen 0 db.numOpen db.maxOpen {req : make(chan connRequest, 1) // 创建一个chan接收连接reqKey : db.nextRequestKeyLocked() // 生成唯一序号db.connRequests[reqKey] req // 放到全局属性让其他方法能访问到db.mu.Unlock()select {case -ctx.Done(): //超时等// Remove the connection request and ensure no value has been sent// on it after removing.db.mu.Lock()delete(db.connRequests, reqKey) // 移除db.mu.Unlock()}case ret, ok : -req: // 收到连接释放if !ok {return nil, errDBClosed}return ret.conn, ret.err}}// 连接数没超出可以创建新连接db.numOpen // optimisticallydb.mu.Unlock()c, err : db.connector.Connect(ctx) // 重新创建if err ! nil {return nil, err}return PConn{ci: c,db: db,}, nil }接着我们还需要调整连接释放方法 - putConn 增加一个bool返回值告诉调用方连接是否释放成功如果失败客户端可以决定关闭连接如果连接数numOpen大于 maxOpen时当前连接直接丢弃返回false当len(db.connRequests)大于0时需要考虑将连接优先给db.connRequests中的请求最后才将连接放入空闲列表中。 func (pc *PConn) Close() error {ok : dc.db.putConn(pc) //释放连接失败需要把连接关闭if !ok {dc.ci.Close()} }func (db *DB) putConn(dc Conn) bool{db.mu.Lock()defer db.mu.Unlock()if db.maxOpen 0 db.numOpen db.maxOpen {return false}// 当有等待连接的请求时if c : len(db.connRequests); c 0 {var req chan connRequestvar reqKey uint64for reqKey, req range db.connRequests {break}delete(db.connRequests, reqKey) // Remove from pending requests.req - connRequest{conn: dc,err: err,}return true}// 放入空闲连接池中db.freeConn append(db.freeConn, dc)return true }database/sql https://xie.infoq.cn/article/c705a7821cb0d63f8bd381276 https://studygolang.com/articles/35389 sql 包其实是 db 的抽象实际连接查询是 db 驱动包来完成 主要内容 database 包目录结构介绍数据库连接池的定义为什么要连接池主要核心的数据结构及解释重要方法的流程梳理及源码分析初始化数据库 获取连接释放连接清理连接复用连接 目录结构 └─database└─sql│ convert.go // 数据类型转换工具│ ctxutil.go // SQL查询接口的一层wrapper主要处理context close的问题│ sql.go // 数据库驱动和SQL的通用接口和类型包括连接池、连接、事务、statement等│└─driverdriver.go // driver定义了实现数据库驱动所需的接口这些接口由sql包和具体的驱动包来实现types.go // 数据类型的别名和转换sql.DB DB是数据库句柄它包含着一个连接池里面可以有0个或者多个连接。对于多协程的并发调用它也是安全的。sql包会自动的创建和释放连接池中的连接也能保持一个空闲连接的连接池。在事务中一旦DB.Begin被调用其返回的Tx对象就是绑定到一个单独的连接上直到事务提交或者回滚这个连接才会重新被返回空闲连接池。 sql.DBsql.Opensql.close DB结构 type DB struct {waitDuration int64 // 等待新的连接所需要的总时间原子操作waitDuration放在第一个是为了避免32位系统使用64位原子操作的坑https://toutiao.io/posts/jagmqm/previewconnector driver.Connector // 由数据库驱动实现的connectornumClosed uint64 // 关闭的连接数mu sync.Mutex // protects following fieldsfreeConn []*driverConn // 连接池connRequests map[uint64]chan connRequest // 阻塞等待连接的随机队列nextRequest uint64 // 下一个连接的keynumOpen int // 正在用的在连接池里的将要打开的// 用来接收创建新连接的信号 // connectionOpener方法会读取该channel的信号而当需要创建连接的时候maybeOpenNewConnections就会往该channel发信号。 // 当调用db.Close()的时候该channel就会被关闭openerCh chan struct{} // channel用于通知建立新连接closed bool // 当前数据库是否关闭dep map[finalCloser]depSet lastPut map[*driverConn]string // debug onlymaxIdleCount int // 最大空闲连接数也就是连接池的大小0代表默认连接数2负数代表0maxOpen int // 最大打开的连接数已使用空闲的小于等于0表示不限制maxLifetime time.Duration // 一个连接在连接池中最长的生存时间maxIdleTime time.Duration // Go1.5添加一个连接在被关掉前的最大空闲时间cleanerCh chan struct{} // channel用于通知清理连接waitCount int64 // 等待的连接数如果maxIdelCount为0waitCount就是一直为0maxIdleClosed int64 // Go1.5添加释放连接时因为连接池已满而被关闭的连接数maxLifetimeClosed int64 // 连接超过生存时间后而被关闭的连接数stop func() // stop cancels the connection opener and the session resetter. } 创建数据库连接 sql.Open() sql.Open只是创建了数据库对象简称db以及开启了两个协程并没有连接数据库的操作 //来在导入go-sql-driver时该库的init函数就自动注册一个数据库驱动到sql.drivers var (driversMu sync.RWMutexdrivers make(map[string]driver.Driver) )type DB struct {....connector driver.Connector }type Connector interface {Connect(context.Context) (Conn, error)Driver() Driver }func Open(driverName, dataSourceName string) (*DB, error) {driversMu.RLock()// 根据driverName获取相应的数据库驱动对象 driveri, ok : drivers[driverName]driversMu.RUnlock()if !ok {return nil, fmt.Errorf(sql: unknown driver %q (forgotten import?), driverName)}// 如果 Driver 实现了 DriverContext则 sql.DB 将调用 OpenConnector 来获取 Connector然后调用该 Connector 的 Connect 方法来获取每个所需的连接而不是为每个连接调用 Driver 的 Open 方法。if driverCtx, ok : driveri.(driver.DriverContext); ok {connector, err : driverCtx.OpenConnector(dataSourceName)if err ! nil {return nil, err}return OpenDB(connector), nil}// 调用 sql.OpenDB返回数据库对象return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil }func OpenDB(c driver.Connector) *DB {ctx, cancel : context.WithCancel(context.Background())// 创建数据库对象db : DB{connector: c,openerCh: make(chan struct{}, connectionRequestQueueSize),lastPut: make(map[*driverConn]string),connRequests: make(map[uint64]chan connRequest),stop: cancel,}//初始化 db 后开一个协程来等待通过 openerCh channel 当做信号来创建新的数据库连接。// 开启协程负责收到请求后新建连接go db.connectionOpener(ctx)return db }// Runs in a separate goroutine, opens new connections when requested. func (db *DB) connectionOpener() {for {select {case -ctx.Done(): //超时等returncase -db.openerCh://收到打开一个新的连接信号db.openNewConnection(ctx)}} }// Open one new connection func (db *DB) openNewConnection() {ci, err : db.connector.Connect(ctx)// 创建连接的过程加了互斥锁db.mu.Lock()defer db.mu.Unlock()// 如果新连接创建失败或者已经被关闭则在返回之前必须减1if db.closed {if err nil {ci.Close()}db.numOpen--return}if err ! nil {db.numOpen--db.putConnDBLocked(nil, err)db.maybeOpenNewConnections()return}dc : driverConn{db: db,createdAt: nowFunc(),ci: ci,}// 获取到连接之后会根据 putConnDBLocked 这个方法去决定这个连接是否被复用。如果复用则返回如果无法复用就会关掉这个连接不会一直放着不管的。if db.putConnDBLocked(dc, err) {db.addDepLocked(dc, dc)} else {db.numOpen--ci.Close()} }// 如果有连接请求并且连接数还没达到上限就通知connectionOpener创建新的连接 func (db *DB) maybeOpenNewConnections() {numRequests : len(db.connRequests)if db.maxOpen 0 {numCanOpen : db.maxOpen - db.numOpenif numRequests numCanOpen {numRequests numCanOpen}}for numRequests 0 {db.numOpen // optimisticallynumRequests--if db.closed {return}db.openerCh - struct{}{}} } putConnDBLocked 如何决定传进来的连接是否可以复用。 func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {// 查看 DB 状态及是否有限制如果关闭了或者达到限制了那当然就无法复用了if db.closed {return false}if db.maxOpen 0 db.numOpen db.maxOpen {return false}// 看一下 connRequests 是否有阻塞等待的请求if c : len(db.connRequests); c 0 {var req chan connRequestvar reqKey uint64for reqKey, req range db.connRequests {break}delete(db.connRequests, reqKey) // Remove from pending requests.if err nil {dc.inUse true}req - connRequest{conn: dc,err: err,}return true// 没有的话就看下连接池是否可用可以的话就放到连接池里面} else if err nil !db.closed {if db.maxIdleConnsLocked() len(db.freeConn) {db.freeConn append(db.freeConn, dc)// 会开启清扫连接池里面空闲连接的逻辑db.startCleanerLocked()return true}db.maxIdleClosed}return false }DB.conn DB.Conn是db内部获取数据库连接的函数通过 2 个策略来获取连接cachedOrNewConn从连接池内或者新建获取连接alwaysNewConn总是通过新建来获取连接. 连接池获取连接就是这样的逻辑设计复用池阻塞等待直接获取。 const (// alwaysNewConn forces a new connection to the database.alwaysNewConn connReuseStrategy iota// 返回一个缓存连接如果可用否则等待一个可用如果已达到 MaxOpenConns或创建一个新的数据库连接cachedOrNewConn )func (db *DB) Conn(ctx context.Context) (*Conn, error) {var dc *driverConnvar err error// 尝试通过缓存或者新建(连接池里面没有可用连接)来获取连接如果获取不到最大重试次数为2次for i : 0; i maxBadConnRetries; i {dc, err db.conn(ctx, cachedOrNewConn)if err ! driver.ErrBadConn {break}}// 如果在上一步通过缓存或者新建获取不到连接则再通过新建获取一个连接if err driver.ErrBadConn {dc, err db.conn(ctx, alwaysNewConn)}if err ! nil {return nil, err}conn : Conn{db: db,dc: dc,}return conn, nil } 这个方法会分为三种情况返回连接分别是 从连接池获取复用连接连接池无可用连接或不允许使用连接池且创建的连接有数量限制的请求就会阻塞等待创建好的连接直接通过 connector 驱动去从数据库获取连接 从连接池里面直接获取 如果策略是 cachedOrNewConn 的先检查 freeConn 连接池里面有没有可以复用的连接如果有取出来判断没有超时后返回可用的连接。在取的这个逻辑L4~L6 是获取连接池第一个的连接的写法这样的做法可以减少切片的容量-1这样可以减少切片因为伸缩而产生的内存分配问题。 last : len(db.freeConn) - 1 // 如果strategy是cachedOrNewConn优先从空闲队列中获取 if strategy cachedOrNewConn last 0 {conn : db.freeConn[last]db.freeConn db.freeConn[:last]conn.inUse trueif conn.expired(lifetime) {db.maxLifetimeCloseddb.mu.Unlock()conn.Close()return nil, driver.ErrBadConn}db.mu.Unlock()// Reset the session if required.if err : conn.resetSession(ctx); errors.Is(err, driver.ErrBadConn) {conn.Close()return nil, err}return conn, nil }如果strategy不是cachedOrNewConn或者空闲队列为空则先检查最大打开连接数和正在打开连接数如果正在打开连接数已经到达了限制创建一个获取连接请求并等待。如果正在打开连接数还没有到达限制直接创建一个新连接 // 如果打开的连接数已经到达了限制创建一个获取连接请求并等待。 if db.maxOpen 0 db.numOpen db.maxOpen {req : make(chan connRequest, 1)reqKey : db.nextRequestKeyLocked()db.connRequests[reqKey] reqdb.waitCountdb.mu.Unlock()waitStart : nowFunc()// Timeout the connection request with the context.select {case -ctx.Done()://等待超时db.mu.Lock()delete(db.connRequests, reqKey)db.mu.Unlock()atomic.AddInt64(db.waitDuration, int64(time.Since(waitStart)))select {default:case ret, ok : -req://删除的过程中再检查了一下会不会刚好超时刚好 req 收到连接的情况if ok ret.conn ! nil {db.putConn(ret.conn, ret.err, false)//那不会直接 close相反的会留着用放到连接池里面使用}}return nil, ctx.Err()case ret, ok : -req:// req 获取到连接了// 如果策略是cachedOrNewConns则仅检查连接是否过期。if strategy cachedOrNewConn ret.err nil ret.conn.expired(lifetime) {db.mu.Lock()db.maxLifetimeCloseddb.mu.Unlock()ret.conn.Close()return nil, driver.ErrBadConn}if ret.conn nil {return nil, ret.err}// Reset the session if required.if err : ret.conn.resetSession(ctx); errors.Is(err, driver.ErrBadConn) {ret.conn.Close()return nil, err}return ret.conn, ret.err }//如果strategy不是cachedOrNewConn或者空闲连接队列为空需要创建一个新的连接。db.numOpen // optimisticallydb.mu.Unlock()ci, err : db.connector.Connect(ctx)if err ! nil {db.mu.Lock()db.numOpen-- // correct for earlier optimism//不是失败就失败了而是会在错误里面再去调一次获取连接的函数。这里的思维就是尽可能的减少 connRequests 里面阻塞的请求连接池设计不仅仅是要思考如何复用更多的也要考虑如何减少阻塞的请求db.maybeOpenNewConnections()db.mu.Unlock()return nil, err}db.mu.Lock()dc : driverConn{db: db,createdAt: nowFunc(),returnedAt: nowFunc(),ci: ci,inUse: true,}db.addDepLocked(dc, dc)db.mu.Unlock()return dc, nil 可能需要创建新的连接 maybeOpenNewConnections 。这个方法就是为了获取连接更多的是给阻塞的请求创建连接的一个方法。里面主要的也就是往前面讲到的 openerCh 发信号通知它要创建一个新的连接了有某个请求可能需要了。 在这边创建失败后再次调取 maybeOpenNewConnections目的还是觉得有需要连接的阻塞的请求 func (db *DB) maybeOpenNewConnections() {numRequests : len(db.connRequests)if db.maxOpen 0 {numCanOpen : db.maxOpen - db.numOpenif numRequests numCanOpen {numRequests numCanOpen}}for numRequests 0 {db.numOpen // optimisticallynumRequests--if db.closed {return}// db.openerCh 收到信号后其实调取底层的方法就是 openNewConnectiondb.openerCh - struct{}{}} }连接释放 func (c *Conn) Close() error {return c.close(nil) }closemu sync.RWMutex func (c *Conn) close(err error) error {if !atomic.CompareAndSwapInt32(c.done, 0, 1) {return ErrConnDone}// Lock around releasing the driver connection// to ensure all queries have been stopped before doing so.c.closemu.Lock()defer c.closemu.Unlock()c.dc.releaseConn(err)c.dc nilc.db nilreturn err }func (dc *driverConn) releaseConn(err error) {dc.db.putConn(dc, err, true) }// putConn adds a connection to the dbs free pool. func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {// 判断是否为坏连接driver.ErrBadConnif errors.Is(err, driver.ErrBadConn) {// Dont reuse bad connections.// Since the conn is considered bad and is being discarded, treat it// as closed. Dont decrement the open count here, finalClose will// take care of that.db.maybeOpenNewConnections()// tell the connectionOpener to open new connections.db.mu.Unlock()dc.Close()return}// 用于测试的后置钩子函数if putConnHook ! nil {putConnHook(db, dc)}added : db.putConnDBLocked(dc, nil)db.mu.Unlock()if !added {dc.Close()return} }func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {if db.closed {return false}if db.maxOpen 0 db.numOpen db.maxOpen {return false}if c : len(db.connRequests); c 0 {var req chan connRequestvar reqKey uint64for reqKey, req range db.connRequests {break}delete(db.connRequests, reqKey) // Remove from pending requests.if err nil {dc.inUse true}req - connRequest{conn: dc,err: err,}return true} else if err nil !db.closed {if db.maxIdleConnsLocked() len(db.freeConn) {db.freeConn append(db.freeConn, dc)db.startCleanerLocked()return true}db.maxIdleClosed}return false }清理无效连接 清理无效连接不是一初始化数据库就开一个协程等超时清理 放回连接池就会发起一次清理连接池空闲连接的动作重新设置连接最大生存时间的时候也会触发一次。 // startCleanerLocked starts connectionCleaner if needed. func (db *DB) startCleanerLocked() {if (db.maxLifetime 0 || db.maxIdleTime 0) db.numOpen 0 db.cleanerCh nil {db.cleanerCh make(chan struct{}, 1)go db.connectionCleaner(db.shortestIdleTimeLocked())} }func (db *DB) connectionCleaner(d time.Duration) {const minInterval time.Secondif d minInterval {d minInterval}t : time.NewTimer(d)//1秒的定时器for {select {case -t.C://定时清理case -db.cleanerCh: // 调用清理maxLifetime was changed or db was closed.}db.mu.Lock()d db.shortestIdleTimeLocked()if db.closed || db.numOpen 0 || d 0 {db.cleanerCh nildb.mu.Unlock()return} //找到超过max connection lifetime limit、idle time的连接driverConnd, closing : db.connectionCleanerRunLocked(d)db.mu.Unlock()for _, c : range closing {c.Close()}if d minInterval {d minInterval}if !t.Stop() {select {case -t.C:default:}}t.Reset(d)} }创建新连接是调用db.connector.Connect进行创建的先来回顾一下db.connector是什么是怎么来的: 在xorm.io/dialects/driver.go中有个注册函数,注册一个pq数据库驱动到sql.drivers func regDrvsNDialects() bool {providedDrvsNDialects : map[string]struct {dbType schemas.DBTypegetDriver func() DrivergetDialect func() Dialect}{mssql: {mssql, func() Driver { return odbcDriver{} }, func() Dialect { return mssql{} }},odbc: {mssql, func() Driver { return odbcDriver{} }, func() Dialect { return mssql{} }}, mysql: {mysql, func() Driver { return mysqlDriver{} }, func() Dialect { return mysql{} }},mymysql: {mysql, func() Driver { return mymysqlDriver{} }, func() Dialect { return mysql{}}},postgres: {postgres, func() Driver { return pqDriver{} }, func() Dialect { return postgres{}}},....}for driverName, v : range providedDrvsNDialects {if driver : QueryDriver(driverName); driver nil {RegisterDriver(driverName, v.getDriver())RegisterDialect(v.dbType, v.getDialect)}}return true }pq conn.go func init() {sql.Register(postgres, Driver{}) }数据库操作 https://juejin.cn/post/7270900777643622437 关于数据库操作 database/sql 包 功能提供了 SQL 类数据库的通用接口 前提 必须和数据库驱动配合使用不支持 context 取消的驱动直到查询完成后才会返回结果支持上下文参数 context 指在执行数据库查询或其他操作时可以使用上下文对象来设置超时时间、传递控制信号以及在需要时进行取消操作。 数据库操作 类型定义 Rows查询结果集 rows.Close() 函数释放与 *sql.Rows 关联的数据库资源rows.Next() 函数遍历rows.Scan() 函数将每行记录的值复制到这个例子中定义结构体变量 类型 stmt预编译语句Query 执行数据库的Query操作例如一个Select语句返回一个 *sql.Rows 对象。非查询db.Exec() 函数用于执行非查询 SQL 语句例如 INSERT、UPDATE 和 DELETE返回一个 sql.Result 对象db.Prepare()准备一个数据库query操作返回一个 sql 执行模板*Stmt用于后续query或执行。这个Stmt可以被多次执行或者并发执行,预编译提高效率、防止Sql注入 其中的?为需要输入的参数之后通过stmt.Exec()添加参数。Exec返回的Result可以获取 LastInsertId()但是并不是所有数据库都支持RowsAffected()能够获取修改数据的条数。 Prepare() 和Query()区别 rows, err : store.DB.Query(SQL, args ...) defer rows.Close()stmt, err : store.DB.Prepare(SQL) defer stmt.Close() // 将参数传递给方法而不是使用原始SQL字符串(出于安全考虑). rows, err : stmt.Query(args ...) defer rows.Close()接口的实现细节 func (db *DB) Prepare(query string) (*Stmt, error) func (db *DB) Exec(query string, args ...interface{}) (Result, error) func (db *DB) Query(query string, args ...interface{}) (*Rows, error) func (db *DB) QueryRow(query string, args ...interface{}) *Row这几个接口真正的实现者是 func (db *DB) PrepareContext(ctx context.Context, query string) (*Stmt, error) func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error)核心功能实现者 func (db *DB) prepare(ctx context.Context, query string, strategy connReuseStrategy) (*Stmt, error) func (db *DB) exec(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (Result, error) func (db *DB) query(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (*Rows, error)然后最终落实到 func (db *DB) prepareDC(ctx context.Context, dc *driverConn, release func(error), cg stmtConnGrabber, query string) (*Stmt, error) func (db *DB) execDC(ctx context.Context, dc *driverConn, release func(error), query string, args []interface{}) (res Result, err error func (db *DB) queryDC(ctx, txctx context.Context, dc *driverConn, releaseConn func(error), query string, args []interface{}) (*Rows, error)*DC函数主要是通过调用driverConn实现的函数来执行数据库操作指令当然driverConn也是继续调用Driver.Conn的实现的函数这就和具体的驱动实现有关了这里我们只关心到driverConn。事实上这里会对应调用Driver.Conn的下列函数 type ConnPrepareContext interface {// PrepareContext returns a prepared statement, bound to this connection.// context is for the preparation of the statement,// it must not store the context within the statement itself.PrepareContext(ctx context.Context, query string) (Stmt, error) }type ExecerContext interface {ExecContext(ctx context.Context, query string, args []NamedValue) (Result, error) }type QueryerContext interface {QueryContext(ctx context.Context, query string, args []NamedValue) (Rows, error) }Query 基本使用 func selectData(db *sql.DB) {// 返回查询到的所有行rows, err : db.Query(SELECT * FROM userinfo)checkerr(err)// 一行一行地循环显示// rows.Next() 查看是否还有下一行 有则返回为真否则假。为后面的rows.Scan()做准备for rows.Next() {var uid intvar username stringvar department stringvar createds string// 由刚才的rows.Next()获得当前行后 将当前行的各列填入对应参数中// 每次调用Scan方法都必须在前面调用Next方法err rows.Scan(uid, username, department, createds)checkerr(err)fmt.Printf(%d,%s,%s,%s\n, uid, username, department, createds)} } 源码解析 func (db *DB) Query(query string, args ...any) (*Rows, error) {// 调用DB.QueryContext自动传入一个空的context因此DB.Query无法通过context设置超时或提前取消。return db.QueryContext(context.Background(), query, args...) }func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) {var rows *Rowsvar err errorvar isBadConn boolfor i : 0; i maxBadConnRetries; i {rows, err db.query(ctx, query, args, cachedOrNewConn)isBadConn errors.Is(err, driver.ErrBadConn)if !isBadConn {break}}if isBadConn {return db.query(ctx, query, args, alwaysNewConn)}return rows, err }func (db *DB) query(ctx context.Context, query string, args []any, strategy connReuseStrategy) (*Rows, error) {// 调用db.conn来获取连接获取连接成功后调用驱动相应的接口进行查询操作dc, err : db.conn(ctx, strategy)if err ! nil {return nil, err}// 调用DB.queryDC时传入了db.releaseConn函数就是为查询操作结束或者发生异常情况释放连接做准备的return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args) }func (db *DB) queryDC(ctx, txctx context.Context, dc *driverConn, releaseConn func(error), query string, args []any) (*Rows, error) {// 如果驱动有实现QueryerContext或者Queryer接口则直接执行驱动的相应接口queryerCtx, ok : dc.ci.(driver.QueryerContext)var queryer driver.Queryerif !ok {queryer, ok dc.ci.(driver.Queryer)}if ok {var nvdargs []driver.NamedValuevar rowsi driver.Rowsvar err errorwithLock(dc, func() {// 来自 Stmt.Exec 和 Stmt.Query 调用者的参数转换为驱动程序值。nvdargs, err driverArgsConnLocked(dc.ci, nil, args)if err ! nil {return}rowsi, err ctxDriverQuery(ctx, queryerCtx, queryer, query, nvdargs)})if err ! driver.ErrSkip {if err ! nil {releaseConn(err)return nil, err}// Note: ownership of dc passes to the *Rows, to be freed// with releaseConn.rows : Rows{dc: dc,releaseConn: releaseConn,rowsi: rowsi,}// rows.initContextClose函数会开启一个协程阻塞监听context的Done事件消息// 若发生context超时或cancel事件调用rows.close,rows.close能调用cancel函数结束awaitDone协程rows.initContextClose(ctx, txctx)return rows, nil}}// 如果如果驱动没有实现QueryerContext或者Queryer接口又或者返回了driver.ErrSkip错误// 则采取先预编译查询语句再传参进行查询的方法var si driver.Stmtvar err errorwithLock(dc, func() {si, err ctxDriverPrepare(ctx, dc.ci, query)})if err ! nil {releaseConn(err)return nil, err}ds : driverStmt{Locker: dc, si: si}rowsi, err : rowsiFromStatement(ctx, dc.ci, ds, args...)if err ! nil {ds.Close()releaseConn(err)return nil, err}// Note: ownership of ci passes to the *Rows, to be freed// with releaseConn.rows : Rows{dc: dc,releaseConn: releaseConn,rowsi: rowsi,closeStmt: ds,}// 监听context超时或cancel时调用rows.closerows.initContextClose(ctx, txctx)return rows, nil }Exec 定义func (*Conn) ExecContext(ctx context.Context,query string,args ...any)(Result,error) 作用执行不返回任何行结果的查询。 场景通常用于执行 INSERT、UPDATE 或 DELETE 等修改数据库的操作 参数 ctx:上下文对象query: sql 查询语句args可选替换 SQL 查询中的占位符参数如 ? 或 :name import (contextdatabase/sqllog ) var (ctx context.Contextdb *sql.DB // sql.DB 是封装好的一个数据库操作对象包含了操作数据库的基本方法 ) func main() {conn, err : db.Conn(ctx)if err ! nil {log.Fatal(err)}defer conn.Close()id : 41result, err : conn.ExecContext(ctx, UPDATE balances SET balance balance 10 WHERE user_id ?;, id)if err ! nil {log.Fatal(err)}rows, err : result.RowsAffected()if err ! nil {log.Fatal(err)}if rows ! 1 {log.Fatalf(expected single row affected, got %d rows affected, rows)} } Prepare 基本使用 func main() {// 创建数据库连接db, err : sql.Open(mysql, username:passwordtcp(127.0.0.1:3306)/database_name)if err ! nil {panic(err.Error())}defer db.Close()// 创建 Prepared Statementstmt, err : db.Prepare(SELECT name, age FROM users WHERE age ?)if err ! nil {panic(err.Error())}defer stmt.Close()// 执行 Prepared Statementrows, err : stmt.Query(18)if err ! nil {panic(err.Error())}defer rows.Close()// 遍历查询结果for rows.Next() {var name stringvar age intif err : rows.Scan(name, age); err ! nil {panic(err.Error())}fmt.Printf(Name: %s, Age: %d\n, name, age)}// 检查是否出错if err : rows.Err(); err ! nil {panic(err.Error())} }Rows rows是Query接口的返回它是一个迭代器抽象可以通过rows.Next遍历查询操作返回的所有结果行。 rows依赖于它的子成员rowsi它是driver.Rows类型。rows的Next函数依赖于rowsi的Next函数来完成Rows.Scan会将数据库的字段转化为Go能识别的字段 rows, _ : db.Query() defer rows.Close() for rows.Next() {rows.Scan() } // 如果是多结果集查询 for rows.NextResultSet() {for rows.Next() {rows.Scan()} }func (rs *Rows) Next() bool {var doClose, ok boolwithLock(rs.closemu.RLocker(), func() {// 当nextLocked函数返回EOF或其他类型的错误时就会自动close调这个rows而rows.close会release调他所占用的连接doClose, ok rs.nextLocked()})if doClose {rs.Close()}return ok }func (rs *Rows) Scan(dest ...interface{}) error {if rs.isClosed() {return errors.New(sql: Rows are closed)}if rs.lastcols nil {return errors.New(sql: Scan called without calling Next)}if len(dest) ! len(rs.lastcols) {return fmt.Errorf(sql: expected %d destination arguments in Scan, not %d, len(rs.lastcols), len(dest))}for i, sv : range rs.lastcols {err : convertAssign(dest[i], sv)if err ! nil {return fmt.Errorf(sql: Scan error on column index %d: %v, i, err)}}return nil }DB.Stmt,Statements 基本使用 一般用Prepared Statements和Exec()完成INSERT, UPDATE, DELETE操作。 stmt, err : db.Prepare(INSERT INTO users(name) VALUES(?)) if err ! nil {log.Fatal(err) } res, err : stmt.Exec(sommer) if err ! nil {log.Fatal(err) } lastId, err : res.LastInsertId() if err ! nil {log.Fatal(err) } rowCnt, err : res.RowsAffected() if err ! nil {log.Fatal(err) } log.Printf(ID %d, affected %d\n, lastId, rowCnt)源码 在数据库层面Prepared Statements是和单个数据库连接绑定的。客户端发送一个有占位符的statement到服务端服务器返回一个statement ID然后客户端发送ID和参数来执行statement。 当生成一个Prepared Statement 自动在连接池中绑定到一个空闲连接Stmt对象记住绑定了哪个连接执行Stmt时尝试使用该连接。如果不可用例如连接被关闭或繁忙中会自动re-prepare绑定到另一个连接。 type Stmt struct {...css []connStmt.... }func (db *DB) Prepare(query string) (*Stmt, error) {return db.PrepareContext(context.Background(), query) }func (db *DB) PrepareContext(ctx context.Context, query string) (*Stmt, error) {var stmt *Stmtvar err errorfor i : 0; i maxBadConnRetries; i {stmt, err db.prepare(ctx, query, cachedOrNewConn)if err ! driver.ErrBadConn {break}}if err driver.ErrBadConn {return db.prepare(ctx, query, alwaysNewConn)}return stmt, err }func (db *DB) prepare(ctx context.Context, query string, strategy connReuseStrategy) (*Stmt, error) {dc, err : db.conn(ctx, strategy)if err ! nil {return nil, err}var si driver.StmtwithLock(dc, func() {si, err dc.prepareLocked(ctx, query)})if err ! nil {db.putConn(dc, err)return nil, err}stmt : Stmt{db: db,query: query,css: []connStmt{{dc, si}},lastNumClosed: atomic.LoadUint64(db.numClosed),}db.addDep(stmt, stmt)db.putConn(dc, nil)return stmt, nil }func (s *Stmt) connStmt(ctx context.Context, strategy connReuseStrategy) (dc *driverConn, releaseConn func(error), ds *driverStmt, err error) {if err s.stickyErr; err ! nil {return}s.mu.Lock()if s.closed {s.mu.Unlock()err errors.New(sql: statement is closed)return}// In a transaction or connection, we always use the connection that the// stmt was created on.if s.cg ! nil {s.mu.Unlock()dc, releaseConn, err s.cg.grabConn(ctx) // blocks, waiting for the connection.if err ! nil {return}return dc, releaseConn, s.cgds, nil}s.removeClosedStmtLocked() //这里把所有close的连接从css中删除掉主要是为了优化遍历s.css的速度s.mu.Unlock()//为什么不直接先在css里面尝试寻找没有的话再从全局的空闲连接池中找。(css中的连接无法判断是否是空闲不是有一个inUse字段的吗.从复杂度方面考虑一下)dc, err s.db.conn(ctx, strategy)//并不会优先获得执行过prepare语句的connectionif err ! nil {return nil, nil, nil, err}s.mu.Lock()for _, v : range s.css { //if v.dc dc { //检查从空闲连接池获得的是不是该statement执行prepare指令的连接s.mu.Unlock()return dc, dc.releaseConn, v.ds, nil}}s.mu.Unlock()//如果不是则需要重新在这条新的连接上prepare这个statement// No luck; we need to prepare the statement on this connectionwithLock(dc, func() {ds, err s.prepareOnConnLocked(ctx, dc)})if err ! nil {dc.releaseConn(err)return nil, nil, nil, err}return dc, dc.releaseConn, ds, nil }
http://www.pierceye.com/news/280581/

相关文章:

  • 美食网站界面设计网页设计制作代码大全
  • 宁波网站建设托管网站正在建设维护中页面
  • 古色古香网站模板响应式布局网站
  • 网站建设制作设计开发福建网站开发文档撰写
  • 钢管公司网站建设国外平面设计欣赏网站
  • 网站建设如何销售济南专门做网站的公司
  • 2018年淘宝客网站怎么做iis网站建设中
  • 网站倒计时代码企业网站建设运营方案
  • 课程网站开发过程东莞外贸模板建站
  • asp.net 网站提速廊坊企业官网搭建
  • 网站开发全过程电商数据分析
  • 代理 指定网站 hostwordpress图片无限放大
  • 中材建设有限公司招标网站包装设计网课答案
  • python云服务器网站开发实例外贸小家电网站推广
  • 郑州做网站公司中天猫商城的商品来源
  • 织梦网站首页互联网保险平台排名
  • 免费做链接的网站做动画相册在哪个网站好
  • 做思维导图好看的网站可以做富集分析的网站
  • wordpress 媒体库 cos百度网站怎样优化排名
  • 自助建站程序html样式模板
  • 公主岭网站建设筑梦网站建设
  • 昊源建设监理有限公司网站广州住房与城乡建设部网站
  • 如何免费建立自己网站wordpress媒体优化
  • 南京企业做网站网站建设的类型有几种
  • 不需要证件做网站相城区建设网站
  • 游戏推广网站如何做的全网投放广告的渠道有哪些
  • 飞数石家庄网站建设seo 关键词优化
  • 织梦新手网站建设建筑工程公司资质
  • 网站建设开什么名目外贸网站建设关键点
  • 大学生网站设计河南省建筑工程信息网