diff options
Diffstat (limited to 'vendor/github.com/jackc/pgx/batch.go')
| -rw-r--r-- | vendor/github.com/jackc/pgx/batch.go | 313 |
1 files changed, 0 insertions, 313 deletions
diff --git a/vendor/github.com/jackc/pgx/batch.go b/vendor/github.com/jackc/pgx/batch.go deleted file mode 100644 index 0d7f14c..0000000 --- a/vendor/github.com/jackc/pgx/batch.go +++ /dev/null @@ -1,313 +0,0 @@ -package pgx - -import ( - "context" - - "github.com/jackc/pgx/pgproto3" - "github.com/jackc/pgx/pgtype" -) - -type batchItem struct { - query string - arguments []interface{} - parameterOIDs []pgtype.OID - resultFormatCodes []int16 -} - -// Batch queries are a way of bundling multiple queries together to avoid -// unnecessary network round trips. -type Batch struct { - conn *Conn - connPool *ConnPool - items []*batchItem - resultsRead int - pendingCommandComplete bool - ctx context.Context - err error - inTx bool -} - -// BeginBatch returns a *Batch query for c. -func (c *Conn) BeginBatch() *Batch { - return &Batch{conn: c} -} - -// BeginBatch returns a *Batch query for tx. Since this *Batch is already part -// of a transaction it will not automatically be wrapped in a transaction. -func (tx *Tx) BeginBatch() *Batch { - return &Batch{conn: tx.conn, inTx: true} -} - -// Conn returns the underlying connection that b will or was performed on. -func (b *Batch) Conn() *Conn { - return b.conn -} - -// Queue queues a query to batch b. parameterOIDs are required if there are -// parameters and query is not the name of a prepared statement. -// resultFormatCodes are required if there is a result. -func (b *Batch) Queue(query string, arguments []interface{}, parameterOIDs []pgtype.OID, resultFormatCodes []int16) { - b.items = append(b.items, &batchItem{ - query: query, - arguments: arguments, - parameterOIDs: parameterOIDs, - resultFormatCodes: resultFormatCodes, - }) -} - -// Send sends all queued queries to the server at once. -// If the batch is created from a conn Object then All queries are wrapped -// in a transaction. The transaction can optionally be configured with -// txOptions. The context is in effect until the Batch is closed. -// -// Warning: Send writes all queued queries before reading any results. This can -// cause a deadlock if an excessive number of queries are queued. It is highly -// advisable to use a timeout context to protect against this possibility. -// Unfortunately, this excessive number can vary based on operating system, -// connection type (TCP or Unix domain socket), and type of query. Unix domain -// sockets seem to be much more susceptible to this issue than TCP connections. -// However, it usually is at least several thousand. -// -// The deadlock occurs when the batched queries to be sent are so large that the -// PostgreSQL server cannot receive it all at once. PostgreSQL received some of -// the queued queries and starts executing them. As PostgreSQL executes the -// queries it sends responses back. pgx will not read any of these responses -// until it has finished sending. Therefore, if all network buffers are full pgx -// will not be able to finish sending the queries and PostgreSQL will not be -// able to finish sending the responses. -// -// See https://github.com/jackc/pgx/issues/374. -func (b *Batch) Send(ctx context.Context, txOptions *TxOptions) error { - if b.err != nil { - return b.err - } - - b.ctx = ctx - - err := b.conn.waitForPreviousCancelQuery(ctx) - if err != nil { - return err - } - - if err := b.conn.ensureConnectionReadyForQuery(); err != nil { - return err - } - - buf := b.conn.wbuf - if !b.inTx { - buf = appendQuery(buf, txOptions.beginSQL()) - } - - err = b.conn.initContext(ctx) - if err != nil { - return err - } - - for _, bi := range b.items { - var psName string - var psParameterOIDs []pgtype.OID - - if ps, ok := b.conn.preparedStatements[bi.query]; ok { - psName = ps.Name - psParameterOIDs = ps.ParameterOIDs - } else { - psParameterOIDs = bi.parameterOIDs - buf = appendParse(buf, "", bi.query, psParameterOIDs) - } - - var err error - buf, err = appendBind(buf, "", psName, b.conn.ConnInfo, psParameterOIDs, bi.arguments, bi.resultFormatCodes) - if err != nil { - return err - } - - buf = appendDescribe(buf, 'P', "") - buf = appendExecute(buf, "", 0) - } - - buf = appendSync(buf) - b.conn.pendingReadyForQueryCount++ - - if !b.inTx { - buf = appendQuery(buf, "commit") - b.conn.pendingReadyForQueryCount++ - } - - n, err := b.conn.conn.Write(buf) - if err != nil { - if fatalWriteErr(n, err) { - b.conn.die(err) - } - return err - } - - for !b.inTx { - msg, err := b.conn.rxMsg() - if err != nil { - return err - } - - switch msg := msg.(type) { - case *pgproto3.ReadyForQuery: - return nil - default: - if err := b.conn.processContextFreeMsg(msg); err != nil { - return err - } - } - } - - return nil -} - -// ExecResults reads the results from the next query in the batch as if the -// query has been sent with Exec. -func (b *Batch) ExecResults() (CommandTag, error) { - if b.err != nil { - return "", b.err - } - - select { - case <-b.ctx.Done(): - b.die(b.ctx.Err()) - return "", b.ctx.Err() - default: - } - - if err := b.ensureCommandComplete(); err != nil { - b.die(err) - return "", err - } - - b.resultsRead++ - - b.pendingCommandComplete = true - - for { - msg, err := b.conn.rxMsg() - if err != nil { - return "", err - } - - switch msg := msg.(type) { - case *pgproto3.CommandComplete: - b.pendingCommandComplete = false - return CommandTag(msg.CommandTag), nil - default: - if err := b.conn.processContextFreeMsg(msg); err != nil { - return "", err - } - } - } -} - -// QueryResults reads the results from the next query in the batch as if the -// query has been sent with Query. -func (b *Batch) QueryResults() (*Rows, error) { - rows := b.conn.getRows("batch query", nil) - - if b.err != nil { - rows.fatal(b.err) - return rows, b.err - } - - select { - case <-b.ctx.Done(): - b.die(b.ctx.Err()) - rows.fatal(b.err) - return rows, b.ctx.Err() - default: - } - - if err := b.ensureCommandComplete(); err != nil { - b.die(err) - rows.fatal(err) - return rows, err - } - - b.resultsRead++ - - b.pendingCommandComplete = true - - fieldDescriptions, err := b.conn.readUntilRowDescription() - if err != nil { - b.die(err) - rows.fatal(b.err) - return rows, err - } - - rows.batch = b - rows.fields = fieldDescriptions - return rows, nil -} - -// QueryRowResults reads the results from the next query in the batch as if the -// query has been sent with QueryRow. -func (b *Batch) QueryRowResults() *Row { - rows, _ := b.QueryResults() - return (*Row)(rows) - -} - -// Close closes the batch operation. Any error that occured during a batch -// operation may have made it impossible to resyncronize the connection with the -// server. In this case the underlying connection will have been closed. -func (b *Batch) Close() (err error) { - if b.err != nil { - return b.err - } - - defer func() { - err = b.conn.termContext(err) - if b.conn != nil && b.connPool != nil { - b.connPool.Release(b.conn) - } - }() - - for i := b.resultsRead; i < len(b.items); i++ { - if _, err = b.ExecResults(); err != nil { - return err - } - } - - if err = b.conn.ensureConnectionReadyForQuery(); err != nil { - return err - } - - return nil -} - -func (b *Batch) die(err error) { - if b.err != nil { - return - } - - b.err = err - b.conn.die(err) - - if b.conn != nil && b.connPool != nil { - b.connPool.Release(b.conn) - } -} - -func (b *Batch) ensureCommandComplete() error { - for b.pendingCommandComplete { - msg, err := b.conn.rxMsg() - if err != nil { - return err - } - - switch msg := msg.(type) { - case *pgproto3.CommandComplete: - b.pendingCommandComplete = false - return nil - default: - err = b.conn.processContextFreeMsg(msg) - if err != nil { - return err - } - } - } - - return nil -} |
