aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/jackc/pgx/batch.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/jackc/pgx/batch.go')
-rw-r--r--vendor/github.com/jackc/pgx/batch.go313
1 files changed, 0 insertions, 313 deletions
diff --git a/vendor/github.com/jackc/pgx/batch.go b/vendor/github.com/jackc/pgx/batch.go
deleted file mode 100644
index 0d7f14c..0000000
--- a/vendor/github.com/jackc/pgx/batch.go
+++ /dev/null
@@ -1,313 +0,0 @@
-package pgx
-
-import (
- "context"
-
- "github.com/jackc/pgx/pgproto3"
- "github.com/jackc/pgx/pgtype"
-)
-
-type batchItem struct {
- query string
- arguments []interface{}
- parameterOIDs []pgtype.OID
- resultFormatCodes []int16
-}
-
-// Batch queries are a way of bundling multiple queries together to avoid
-// unnecessary network round trips.
-type Batch struct {
- conn *Conn
- connPool *ConnPool
- items []*batchItem
- resultsRead int
- pendingCommandComplete bool
- ctx context.Context
- err error
- inTx bool
-}
-
-// BeginBatch returns a *Batch query for c.
-func (c *Conn) BeginBatch() *Batch {
- return &Batch{conn: c}
-}
-
-// BeginBatch returns a *Batch query for tx. Since this *Batch is already part
-// of a transaction it will not automatically be wrapped in a transaction.
-func (tx *Tx) BeginBatch() *Batch {
- return &Batch{conn: tx.conn, inTx: true}
-}
-
-// Conn returns the underlying connection that b will or was performed on.
-func (b *Batch) Conn() *Conn {
- return b.conn
-}
-
-// Queue queues a query to batch b. parameterOIDs are required if there are
-// parameters and query is not the name of a prepared statement.
-// resultFormatCodes are required if there is a result.
-func (b *Batch) Queue(query string, arguments []interface{}, parameterOIDs []pgtype.OID, resultFormatCodes []int16) {
- b.items = append(b.items, &batchItem{
- query: query,
- arguments: arguments,
- parameterOIDs: parameterOIDs,
- resultFormatCodes: resultFormatCodes,
- })
-}
-
-// Send sends all queued queries to the server at once.
-// If the batch is created from a conn Object then All queries are wrapped
-// in a transaction. The transaction can optionally be configured with
-// txOptions. The context is in effect until the Batch is closed.
-//
-// Warning: Send writes all queued queries before reading any results. This can
-// cause a deadlock if an excessive number of queries are queued. It is highly
-// advisable to use a timeout context to protect against this possibility.
-// Unfortunately, this excessive number can vary based on operating system,
-// connection type (TCP or Unix domain socket), and type of query. Unix domain
-// sockets seem to be much more susceptible to this issue than TCP connections.
-// However, it usually is at least several thousand.
-//
-// The deadlock occurs when the batched queries to be sent are so large that the
-// PostgreSQL server cannot receive it all at once. PostgreSQL received some of
-// the queued queries and starts executing them. As PostgreSQL executes the
-// queries it sends responses back. pgx will not read any of these responses
-// until it has finished sending. Therefore, if all network buffers are full pgx
-// will not be able to finish sending the queries and PostgreSQL will not be
-// able to finish sending the responses.
-//
-// See https://github.com/jackc/pgx/issues/374.
-func (b *Batch) Send(ctx context.Context, txOptions *TxOptions) error {
- if b.err != nil {
- return b.err
- }
-
- b.ctx = ctx
-
- err := b.conn.waitForPreviousCancelQuery(ctx)
- if err != nil {
- return err
- }
-
- if err := b.conn.ensureConnectionReadyForQuery(); err != nil {
- return err
- }
-
- buf := b.conn.wbuf
- if !b.inTx {
- buf = appendQuery(buf, txOptions.beginSQL())
- }
-
- err = b.conn.initContext(ctx)
- if err != nil {
- return err
- }
-
- for _, bi := range b.items {
- var psName string
- var psParameterOIDs []pgtype.OID
-
- if ps, ok := b.conn.preparedStatements[bi.query]; ok {
- psName = ps.Name
- psParameterOIDs = ps.ParameterOIDs
- } else {
- psParameterOIDs = bi.parameterOIDs
- buf = appendParse(buf, "", bi.query, psParameterOIDs)
- }
-
- var err error
- buf, err = appendBind(buf, "", psName, b.conn.ConnInfo, psParameterOIDs, bi.arguments, bi.resultFormatCodes)
- if err != nil {
- return err
- }
-
- buf = appendDescribe(buf, 'P', "")
- buf = appendExecute(buf, "", 0)
- }
-
- buf = appendSync(buf)
- b.conn.pendingReadyForQueryCount++
-
- if !b.inTx {
- buf = appendQuery(buf, "commit")
- b.conn.pendingReadyForQueryCount++
- }
-
- n, err := b.conn.conn.Write(buf)
- if err != nil {
- if fatalWriteErr(n, err) {
- b.conn.die(err)
- }
- return err
- }
-
- for !b.inTx {
- msg, err := b.conn.rxMsg()
- if err != nil {
- return err
- }
-
- switch msg := msg.(type) {
- case *pgproto3.ReadyForQuery:
- return nil
- default:
- if err := b.conn.processContextFreeMsg(msg); err != nil {
- return err
- }
- }
- }
-
- return nil
-}
-
-// ExecResults reads the results from the next query in the batch as if the
-// query has been sent with Exec.
-func (b *Batch) ExecResults() (CommandTag, error) {
- if b.err != nil {
- return "", b.err
- }
-
- select {
- case <-b.ctx.Done():
- b.die(b.ctx.Err())
- return "", b.ctx.Err()
- default:
- }
-
- if err := b.ensureCommandComplete(); err != nil {
- b.die(err)
- return "", err
- }
-
- b.resultsRead++
-
- b.pendingCommandComplete = true
-
- for {
- msg, err := b.conn.rxMsg()
- if err != nil {
- return "", err
- }
-
- switch msg := msg.(type) {
- case *pgproto3.CommandComplete:
- b.pendingCommandComplete = false
- return CommandTag(msg.CommandTag), nil
- default:
- if err := b.conn.processContextFreeMsg(msg); err != nil {
- return "", err
- }
- }
- }
-}
-
-// QueryResults reads the results from the next query in the batch as if the
-// query has been sent with Query.
-func (b *Batch) QueryResults() (*Rows, error) {
- rows := b.conn.getRows("batch query", nil)
-
- if b.err != nil {
- rows.fatal(b.err)
- return rows, b.err
- }
-
- select {
- case <-b.ctx.Done():
- b.die(b.ctx.Err())
- rows.fatal(b.err)
- return rows, b.ctx.Err()
- default:
- }
-
- if err := b.ensureCommandComplete(); err != nil {
- b.die(err)
- rows.fatal(err)
- return rows, err
- }
-
- b.resultsRead++
-
- b.pendingCommandComplete = true
-
- fieldDescriptions, err := b.conn.readUntilRowDescription()
- if err != nil {
- b.die(err)
- rows.fatal(b.err)
- return rows, err
- }
-
- rows.batch = b
- rows.fields = fieldDescriptions
- return rows, nil
-}
-
-// QueryRowResults reads the results from the next query in the batch as if the
-// query has been sent with QueryRow.
-func (b *Batch) QueryRowResults() *Row {
- rows, _ := b.QueryResults()
- return (*Row)(rows)
-
-}
-
-// Close closes the batch operation. Any error that occured during a batch
-// operation may have made it impossible to resyncronize the connection with the
-// server. In this case the underlying connection will have been closed.
-func (b *Batch) Close() (err error) {
- if b.err != nil {
- return b.err
- }
-
- defer func() {
- err = b.conn.termContext(err)
- if b.conn != nil && b.connPool != nil {
- b.connPool.Release(b.conn)
- }
- }()
-
- for i := b.resultsRead; i < len(b.items); i++ {
- if _, err = b.ExecResults(); err != nil {
- return err
- }
- }
-
- if err = b.conn.ensureConnectionReadyForQuery(); err != nil {
- return err
- }
-
- return nil
-}
-
-func (b *Batch) die(err error) {
- if b.err != nil {
- return
- }
-
- b.err = err
- b.conn.die(err)
-
- if b.conn != nil && b.connPool != nil {
- b.connPool.Release(b.conn)
- }
-}
-
-func (b *Batch) ensureCommandComplete() error {
- for b.pendingCommandComplete {
- msg, err := b.conn.rxMsg()
- if err != nil {
- return err
- }
-
- switch msg := msg.(type) {
- case *pgproto3.CommandComplete:
- b.pendingCommandComplete = false
- return nil
- default:
- err = b.conn.processContextFreeMsg(msg)
- if err != nil {
- return err
- }
- }
- }
-
- return nil
-}