aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/jackc/pgx/conn_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/jackc/pgx/conn_pool.go')
-rw-r--r--vendor/github.com/jackc/pgx/conn_pool.go549
1 files changed, 0 insertions, 549 deletions
diff --git a/vendor/github.com/jackc/pgx/conn_pool.go b/vendor/github.com/jackc/pgx/conn_pool.go
deleted file mode 100644
index 6ca0ee0..0000000
--- a/vendor/github.com/jackc/pgx/conn_pool.go
+++ /dev/null
@@ -1,549 +0,0 @@
-package pgx
-
-import (
- "context"
- "sync"
- "time"
-
- "github.com/pkg/errors"
-
- "github.com/jackc/pgx/pgtype"
-)
-
-type ConnPoolConfig struct {
- ConnConfig
- MaxConnections int // max simultaneous connections to use, default 5, must be at least 2
- AfterConnect func(*Conn) error // function to call on every new connection
- AcquireTimeout time.Duration // max wait time when all connections are busy (0 means no timeout)
-}
-
-type ConnPool struct {
- allConnections []*Conn
- availableConnections []*Conn
- cond *sync.Cond
- config ConnConfig // config used when establishing connection
- inProgressConnects int
- maxConnections int
- resetCount int
- afterConnect func(*Conn) error
- logger Logger
- logLevel int
- closed bool
- preparedStatements map[string]*PreparedStatement
- acquireTimeout time.Duration
- connInfo *pgtype.ConnInfo
-}
-
-type ConnPoolStat struct {
- MaxConnections int // max simultaneous connections to use
- CurrentConnections int // current live connections
- AvailableConnections int // unused live connections
-}
-
-// ErrAcquireTimeout occurs when an attempt to acquire a connection times out.
-var ErrAcquireTimeout = errors.New("timeout acquiring connection from pool")
-
-// ErrClosedPool occurs on an attempt to acquire a connection from a closed pool.
-var ErrClosedPool = errors.New("cannot acquire from closed pool")
-
-// NewConnPool creates a new ConnPool. config.ConnConfig is passed through to
-// Connect directly.
-func NewConnPool(config ConnPoolConfig) (p *ConnPool, err error) {
- p = new(ConnPool)
- p.config = config.ConnConfig
- p.connInfo = minimalConnInfo
- p.maxConnections = config.MaxConnections
- if p.maxConnections == 0 {
- p.maxConnections = 5
- }
- if p.maxConnections < 1 {
- return nil, errors.New("MaxConnections must be at least 1")
- }
- p.acquireTimeout = config.AcquireTimeout
- if p.acquireTimeout < 0 {
- return nil, errors.New("AcquireTimeout must be equal to or greater than 0")
- }
-
- p.afterConnect = config.AfterConnect
-
- if config.LogLevel != 0 {
- p.logLevel = config.LogLevel
- } else {
- // Preserve pre-LogLevel behavior by defaulting to LogLevelDebug
- p.logLevel = LogLevelDebug
- }
- p.logger = config.Logger
- if p.logger == nil {
- p.logLevel = LogLevelNone
- }
-
- p.allConnections = make([]*Conn, 0, p.maxConnections)
- p.availableConnections = make([]*Conn, 0, p.maxConnections)
- p.preparedStatements = make(map[string]*PreparedStatement)
- p.cond = sync.NewCond(new(sync.Mutex))
-
- // Initially establish one connection
- var c *Conn
- c, err = p.createConnection()
- if err != nil {
- return
- }
- p.allConnections = append(p.allConnections, c)
- p.availableConnections = append(p.availableConnections, c)
- p.connInfo = c.ConnInfo.DeepCopy()
-
- return
-}
-
-// Acquire takes exclusive use of a connection until it is released.
-func (p *ConnPool) Acquire() (*Conn, error) {
- p.cond.L.Lock()
- c, err := p.acquire(nil)
- p.cond.L.Unlock()
- return c, err
-}
-
-// deadlinePassed returns true if the given deadline has passed.
-func (p *ConnPool) deadlinePassed(deadline *time.Time) bool {
- return deadline != nil && time.Now().After(*deadline)
-}
-
-// acquire performs acquision assuming pool is already locked
-func (p *ConnPool) acquire(deadline *time.Time) (*Conn, error) {
- if p.closed {
- return nil, ErrClosedPool
- }
-
- // A connection is available
- if len(p.availableConnections) > 0 {
- c := p.availableConnections[len(p.availableConnections)-1]
- c.poolResetCount = p.resetCount
- p.availableConnections = p.availableConnections[:len(p.availableConnections)-1]
- return c, nil
- }
-
- // Set initial timeout/deadline value. If the method (acquire) happens to
- // recursively call itself the deadline should retain its value.
- if deadline == nil && p.acquireTimeout > 0 {
- tmp := time.Now().Add(p.acquireTimeout)
- deadline = &tmp
- }
-
- // Make sure the deadline (if it is) has not passed yet
- if p.deadlinePassed(deadline) {
- return nil, ErrAcquireTimeout
- }
-
- // If there is a deadline then start a timeout timer
- var timer *time.Timer
- if deadline != nil {
- timer = time.AfterFunc(deadline.Sub(time.Now()), func() {
- p.cond.Broadcast()
- })
- defer timer.Stop()
- }
-
- // No connections are available, but we can create more
- if len(p.allConnections)+p.inProgressConnects < p.maxConnections {
- // Create a new connection.
- // Careful here: createConnectionUnlocked() removes the current lock,
- // creates a connection and then locks it back.
- c, err := p.createConnectionUnlocked()
- if err != nil {
- return nil, err
- }
- c.poolResetCount = p.resetCount
- p.allConnections = append(p.allConnections, c)
- return c, nil
- }
- // All connections are in use and we cannot create more
- if p.logLevel >= LogLevelWarn {
- p.logger.Log(LogLevelWarn, "waiting for available connection", nil)
- }
-
- // Wait until there is an available connection OR room to create a new connection
- for len(p.availableConnections) == 0 && len(p.allConnections)+p.inProgressConnects == p.maxConnections {
- if p.deadlinePassed(deadline) {
- return nil, ErrAcquireTimeout
- }
- p.cond.Wait()
- }
-
- // Stop the timer so that we do not spawn it on every acquire call.
- if timer != nil {
- timer.Stop()
- }
- return p.acquire(deadline)
-}
-
-// Release gives up use of a connection.
-func (p *ConnPool) Release(conn *Conn) {
- if conn.ctxInProgress {
- panic("should never release when context is in progress")
- }
-
- if conn.txStatus != 'I' {
- conn.Exec("rollback")
- }
-
- if len(conn.channels) > 0 {
- if err := conn.Unlisten("*"); err != nil {
- conn.die(err)
- }
- conn.channels = make(map[string]struct{})
- }
- conn.notifications = nil
-
- p.cond.L.Lock()
-
- if conn.poolResetCount != p.resetCount {
- conn.Close()
- p.cond.L.Unlock()
- p.cond.Signal()
- return
- }
-
- if conn.IsAlive() {
- p.availableConnections = append(p.availableConnections, conn)
- } else {
- p.removeFromAllConnections(conn)
- }
- p.cond.L.Unlock()
- p.cond.Signal()
-}
-
-// removeFromAllConnections Removes the given connection from the list.
-// It returns true if the connection was found and removed or false otherwise.
-func (p *ConnPool) removeFromAllConnections(conn *Conn) bool {
- for i, c := range p.allConnections {
- if conn == c {
- p.allConnections = append(p.allConnections[:i], p.allConnections[i+1:]...)
- return true
- }
- }
- return false
-}
-
-// Close ends the use of a connection pool. It prevents any new connections from
-// being acquired and closes available underlying connections. Any acquired
-// connections will be closed when they are released.
-func (p *ConnPool) Close() {
- p.cond.L.Lock()
- defer p.cond.L.Unlock()
-
- p.closed = true
-
- for _, c := range p.availableConnections {
- _ = c.Close()
- }
-
- // This will cause any checked out connections to be closed on release
- p.resetCount++
-}
-
-// Reset closes all open connections, but leaves the pool open. It is intended
-// for use when an error is detected that would disrupt all connections (such as
-// a network interruption or a server state change).
-//
-// It is safe to reset a pool while connections are checked out. Those
-// connections will be closed when they are returned to the pool.
-func (p *ConnPool) Reset() {
- p.cond.L.Lock()
- defer p.cond.L.Unlock()
-
- p.resetCount++
- p.allConnections = p.allConnections[0:0]
-
- for _, conn := range p.availableConnections {
- conn.Close()
- }
-
- p.availableConnections = p.availableConnections[0:0]
-}
-
-// invalidateAcquired causes all acquired connections to be closed when released.
-// The pool must already be locked.
-func (p *ConnPool) invalidateAcquired() {
- p.resetCount++
-
- for _, c := range p.availableConnections {
- c.poolResetCount = p.resetCount
- }
-
- p.allConnections = p.allConnections[:len(p.availableConnections)]
- copy(p.allConnections, p.availableConnections)
-}
-
-// Stat returns connection pool statistics
-func (p *ConnPool) Stat() (s ConnPoolStat) {
- p.cond.L.Lock()
- defer p.cond.L.Unlock()
-
- s.MaxConnections = p.maxConnections
- s.CurrentConnections = len(p.allConnections)
- s.AvailableConnections = len(p.availableConnections)
- return
-}
-
-func (p *ConnPool) createConnection() (*Conn, error) {
- c, err := connect(p.config, p.connInfo)
- if err != nil {
- return nil, err
- }
- return p.afterConnectionCreated(c)
-}
-
-// createConnectionUnlocked Removes the current lock, creates a new connection, and
-// then locks it back.
-// Here is the point: lets say our pool dialer's OpenTimeout is set to 3 seconds.
-// And we have a pool with 20 connections in it, and we try to acquire them all at
-// startup.
-// If it happens that the remote server is not accessible, then the first connection
-// in the pool blocks all the others for 3 secs, before it gets the timeout. Then
-// connection #2 holds the lock and locks everything for the next 3 secs until it
-// gets OpenTimeout err, etc. And the very last 20th connection will fail only after
-// 3 * 20 = 60 secs.
-// To avoid this we put Connect(p.config) outside of the lock (it is thread safe)
-// what would allow us to make all the 20 connection in parallel (more or less).
-func (p *ConnPool) createConnectionUnlocked() (*Conn, error) {
- p.inProgressConnects++
- p.cond.L.Unlock()
- c, err := Connect(p.config)
- p.cond.L.Lock()
- p.inProgressConnects--
-
- if err != nil {
- return nil, err
- }
- return p.afterConnectionCreated(c)
-}
-
-// afterConnectionCreated executes (if it is) afterConnect() callback and prepares
-// all the known statements for the new connection.
-func (p *ConnPool) afterConnectionCreated(c *Conn) (*Conn, error) {
- if p.afterConnect != nil {
- err := p.afterConnect(c)
- if err != nil {
- c.die(err)
- return nil, err
- }
- }
-
- for _, ps := range p.preparedStatements {
- if _, err := c.Prepare(ps.Name, ps.SQL); err != nil {
- c.die(err)
- return nil, err
- }
- }
-
- return c, nil
-}
-
-// Exec acquires a connection, delegates the call to that connection, and releases the connection
-func (p *ConnPool) Exec(sql string, arguments ...interface{}) (commandTag CommandTag, err error) {
- var c *Conn
- if c, err = p.Acquire(); err != nil {
- return
- }
- defer p.Release(c)
-
- return c.Exec(sql, arguments...)
-}
-
-func (p *ConnPool) ExecEx(ctx context.Context, sql string, options *QueryExOptions, arguments ...interface{}) (commandTag CommandTag, err error) {
- var c *Conn
- if c, err = p.Acquire(); err != nil {
- return
- }
- defer p.Release(c)
-
- return c.ExecEx(ctx, sql, options, arguments...)
-}
-
-// Query acquires a connection and delegates the call to that connection. When
-// *Rows are closed, the connection is released automatically.
-func (p *ConnPool) Query(sql string, args ...interface{}) (*Rows, error) {
- c, err := p.Acquire()
- if err != nil {
- // Because checking for errors can be deferred to the *Rows, build one with the error
- return &Rows{closed: true, err: err}, err
- }
-
- rows, err := c.Query(sql, args...)
- if err != nil {
- p.Release(c)
- return rows, err
- }
-
- rows.connPool = p
-
- return rows, nil
-}
-
-func (p *ConnPool) QueryEx(ctx context.Context, sql string, options *QueryExOptions, args ...interface{}) (*Rows, error) {
- c, err := p.Acquire()
- if err != nil {
- // Because checking for errors can be deferred to the *Rows, build one with the error
- return &Rows{closed: true, err: err}, err
- }
-
- rows, err := c.QueryEx(ctx, sql, options, args...)
- if err != nil {
- p.Release(c)
- return rows, err
- }
-
- rows.connPool = p
-
- return rows, nil
-}
-
-// QueryRow acquires a connection and delegates the call to that connection. The
-// connection is released automatically after Scan is called on the returned
-// *Row.
-func (p *ConnPool) QueryRow(sql string, args ...interface{}) *Row {
- rows, _ := p.Query(sql, args...)
- return (*Row)(rows)
-}
-
-func (p *ConnPool) QueryRowEx(ctx context.Context, sql string, options *QueryExOptions, args ...interface{}) *Row {
- rows, _ := p.QueryEx(ctx, sql, options, args...)
- return (*Row)(rows)
-}
-
-// Begin acquires a connection and begins a transaction on it. When the
-// transaction is closed the connection will be automatically released.
-func (p *ConnPool) Begin() (*Tx, error) {
- return p.BeginEx(context.Background(), nil)
-}
-
-// Prepare creates a prepared statement on a connection in the pool to test the
-// statement is valid. If it succeeds all connections accessed through the pool
-// will have the statement available.
-//
-// Prepare creates a prepared statement with name and sql. sql can contain
-// placeholders for bound parameters. These placeholders are referenced
-// positional as $1, $2, etc.
-//
-// Prepare is idempotent; i.e. it is safe to call Prepare multiple times with
-// the same name and sql arguments. This allows a code path to Prepare and
-// Query/Exec/PrepareEx without concern for if the statement has already been prepared.
-func (p *ConnPool) Prepare(name, sql string) (*PreparedStatement, error) {
- return p.PrepareEx(context.Background(), name, sql, nil)
-}
-
-// PrepareEx creates a prepared statement on a connection in the pool to test the
-// statement is valid. If it succeeds all connections accessed through the pool
-// will have the statement available.
-//
-// PrepareEx creates a prepared statement with name and sql. sql can contain placeholders
-// for bound parameters. These placeholders are referenced positional as $1, $2, etc.
-// It defers from Prepare as it allows additional options (such as parameter OIDs) to be passed via struct
-//
-// PrepareEx is idempotent; i.e. it is safe to call PrepareEx multiple times with the same
-// name and sql arguments. This allows a code path to PrepareEx and Query/Exec/Prepare without
-// concern for if the statement has already been prepared.
-func (p *ConnPool) PrepareEx(ctx context.Context, name, sql string, opts *PrepareExOptions) (*PreparedStatement, error) {
- p.cond.L.Lock()
- defer p.cond.L.Unlock()
-
- if ps, ok := p.preparedStatements[name]; ok && ps.SQL == sql {
- return ps, nil
- }
-
- c, err := p.acquire(nil)
- if err != nil {
- return nil, err
- }
-
- p.availableConnections = append(p.availableConnections, c)
-
- // Double check that the statement was not prepared by someone else
- // while we were acquiring the connection (since acquire is not fully
- // blocking now, see createConnectionUnlocked())
- if ps, ok := p.preparedStatements[name]; ok && ps.SQL == sql {
- return ps, nil
- }
-
- ps, err := c.PrepareEx(ctx, name, sql, opts)
- if err != nil {
- return nil, err
- }
-
- for _, c := range p.availableConnections {
- _, err := c.PrepareEx(ctx, name, sql, opts)
- if err != nil {
- return nil, err
- }
- }
-
- p.invalidateAcquired()
- p.preparedStatements[name] = ps
-
- return ps, err
-}
-
-// Deallocate releases a prepared statement from all connections in the pool.
-func (p *ConnPool) Deallocate(name string) (err error) {
- p.cond.L.Lock()
- defer p.cond.L.Unlock()
-
- for _, c := range p.availableConnections {
- if err := c.Deallocate(name); err != nil {
- return err
- }
- }
-
- p.invalidateAcquired()
- delete(p.preparedStatements, name)
-
- return nil
-}
-
-// BeginEx acquires a connection and starts a transaction with txOptions
-// determining the transaction mode. When the transaction is closed the
-// connection will be automatically released.
-func (p *ConnPool) BeginEx(ctx context.Context, txOptions *TxOptions) (*Tx, error) {
- for {
- c, err := p.Acquire()
- if err != nil {
- return nil, err
- }
-
- tx, err := c.BeginEx(ctx, txOptions)
- if err != nil {
- alive := c.IsAlive()
- p.Release(c)
-
- // If connection is still alive then the error is not something trying
- // again on a new connection would fix, so just return the error. But
- // if the connection is dead try to acquire a new connection and try
- // again.
- if alive || ctx.Err() != nil {
- return nil, err
- }
- continue
- }
-
- tx.connPool = p
- return tx, nil
- }
-}
-
-// CopyFrom acquires a connection, delegates the call to that connection, and releases the connection
-func (p *ConnPool) CopyFrom(tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int, error) {
- c, err := p.Acquire()
- if err != nil {
- return 0, err
- }
- defer p.Release(c)
-
- return c.CopyFrom(tableName, columnNames, rowSrc)
-}
-
-// BeginBatch acquires a connection and begins a batch on that connection. When
-// *Batch is finished, the connection is released automatically.
-func (p *ConnPool) BeginBatch() *Batch {
- c, err := p.Acquire()
- return &Batch{conn: c, connPool: p, err: err}
-}