aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/jackc/pgx/replication.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/jackc/pgx/replication.go')
-rw-r--r--vendor/github.com/jackc/pgx/replication.go198
1 files changed, 114 insertions, 84 deletions
diff --git a/vendor/github.com/jackc/pgx/replication.go b/vendor/github.com/jackc/pgx/replication.go
index 7b28d6b..7dd5efe 100644
--- a/vendor/github.com/jackc/pgx/replication.go
+++ b/vendor/github.com/jackc/pgx/replication.go
@@ -1,10 +1,15 @@
package pgx
import (
- "errors"
+ "context"
+ "encoding/binary"
"fmt"
- "net"
"time"
+
+ "github.com/pkg/errors"
+
+ "github.com/jackc/pgx/pgio"
+ "github.com/jackc/pgx/pgproto3"
)
const (
@@ -172,17 +177,21 @@ type ReplicationConn struct {
// message to the server, as well as carries the WAL position of the
// client, which then updates the server's replication slot position.
func (rc *ReplicationConn) SendStandbyStatus(k *StandbyStatus) (err error) {
- writeBuf := newWriteBuf(rc.c, copyData)
- writeBuf.WriteByte(standbyStatusUpdate)
- writeBuf.WriteInt64(int64(k.WalWritePosition))
- writeBuf.WriteInt64(int64(k.WalFlushPosition))
- writeBuf.WriteInt64(int64(k.WalApplyPosition))
- writeBuf.WriteInt64(int64(k.ClientTime))
- writeBuf.WriteByte(k.ReplyRequested)
+ buf := rc.c.wbuf
+ buf = append(buf, copyData)
+ sp := len(buf)
+ buf = pgio.AppendInt32(buf, -1)
+
+ buf = append(buf, standbyStatusUpdate)
+ buf = pgio.AppendInt64(buf, int64(k.WalWritePosition))
+ buf = pgio.AppendInt64(buf, int64(k.WalFlushPosition))
+ buf = pgio.AppendInt64(buf, int64(k.WalApplyPosition))
+ buf = pgio.AppendInt64(buf, int64(k.ClientTime))
+ buf = append(buf, k.ReplyRequested)
- writeBuf.closeMsg()
+ pgio.SetInt32(buf[sp:], int32(len(buf[sp:])))
- _, err = rc.c.conn.Write(writeBuf.buf)
+ _, err = rc.c.conn.Write(buf)
if err != nil {
rc.c.die(err)
}
@@ -203,107 +212,115 @@ func (rc *ReplicationConn) CauseOfDeath() error {
}
func (rc *ReplicationConn) readReplicationMessage() (r *ReplicationMessage, err error) {
- var t byte
- var reader *msgReader
- t, reader, err = rc.c.rxMsg()
+ msg, err := rc.c.rxMsg()
if err != nil {
return
}
- switch t {
- case noticeResponse:
- pgError := rc.c.rxErrorResponse(reader)
+ switch msg := msg.(type) {
+ case *pgproto3.NoticeResponse:
+ pgError := rc.c.rxErrorResponse((*pgproto3.ErrorResponse)(msg))
if rc.c.shouldLog(LogLevelInfo) {
- rc.c.log(LogLevelInfo, pgError.Error())
+ rc.c.log(LogLevelInfo, pgError.Error(), nil)
}
- case errorResponse:
- err = rc.c.rxErrorResponse(reader)
+ case *pgproto3.ErrorResponse:
+ err = rc.c.rxErrorResponse(msg)
if rc.c.shouldLog(LogLevelError) {
- rc.c.log(LogLevelError, err.Error())
+ rc.c.log(LogLevelError, err.Error(), nil)
}
return
- case copyBothResponse:
+ case *pgproto3.CopyBothResponse:
// This is the tail end of the replication process start,
// and can be safely ignored
return
- case copyData:
- var msgType byte
- msgType = reader.readByte()
+ case *pgproto3.CopyData:
+ msgType := msg.Data[0]
+ rp := 1
+
switch msgType {
case walData:
- walStart := reader.readInt64()
- serverWalEnd := reader.readInt64()
- serverTime := reader.readInt64()
- walData := reader.readBytes(reader.msgBytesRemaining)
- walMessage := WalMessage{WalStart: uint64(walStart),
- ServerWalEnd: uint64(serverWalEnd),
- ServerTime: uint64(serverTime),
+ walStart := binary.BigEndian.Uint64(msg.Data[rp:])
+ rp += 8
+ serverWalEnd := binary.BigEndian.Uint64(msg.Data[rp:])
+ rp += 8
+ serverTime := binary.BigEndian.Uint64(msg.Data[rp:])
+ rp += 8
+ walData := msg.Data[rp:]
+ walMessage := WalMessage{WalStart: walStart,
+ ServerWalEnd: serverWalEnd,
+ ServerTime: serverTime,
WalData: walData,
}
return &ReplicationMessage{WalMessage: &walMessage}, nil
case senderKeepalive:
- serverWalEnd := reader.readInt64()
- serverTime := reader.readInt64()
- replyNow := reader.readByte()
- h := &ServerHeartbeat{ServerWalEnd: uint64(serverWalEnd), ServerTime: uint64(serverTime), ReplyRequested: replyNow}
+ serverWalEnd := binary.BigEndian.Uint64(msg.Data[rp:])
+ rp += 8
+ serverTime := binary.BigEndian.Uint64(msg.Data[rp:])
+ rp += 8
+ replyNow := msg.Data[rp]
+ rp += 1
+ h := &ServerHeartbeat{ServerWalEnd: serverWalEnd, ServerTime: serverTime, ReplyRequested: replyNow}
return &ReplicationMessage{ServerHeartbeat: h}, nil
default:
if rc.c.shouldLog(LogLevelError) {
- rc.c.log(LogLevelError, "Unexpected data playload message type %v", t)
+ rc.c.log(LogLevelError, "Unexpected data playload message type", map[string]interface{}{"type": msgType})
}
}
default:
if rc.c.shouldLog(LogLevelError) {
- rc.c.log(LogLevelError, "Unexpected replication message type %v", t)
+ rc.c.log(LogLevelError, "Unexpected replication message type", map[string]interface{}{"type": msg})
}
}
return
}
-// Wait for a single replication message up to timeout time.
+// Wait for a single replication message.
//
// Properly using this requires some knowledge of the postgres replication mechanisms,
// as the client can receive both WAL data (the ultimate payload) and server heartbeat
// updates. The caller also must send standby status updates in order to keep the connection
// alive and working.
//
-// This returns pgx.ErrNotificationTimeout when there is no replication message by the specified
-// duration.
-func (rc *ReplicationConn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationMessage, err error) {
- var zeroTime time.Time
-
- deadline := time.Now().Add(timeout)
-
- // Use SetReadDeadline to implement the timeout. SetReadDeadline will
- // cause operations to fail with a *net.OpError that has a Timeout()
- // of true. Because the normal pgx rxMsg path considers any error to
- // have potentially corrupted the state of the connection, it dies
- // on any errors. So to avoid timeout errors in rxMsg we set the
- // deadline and peek into the reader. If a timeout error occurs there
- // we don't break the pgx connection. If the Peek returns that data
- // is available then we turn off the read deadline before the rxMsg.
- err = rc.c.conn.SetReadDeadline(deadline)
- if err != nil {
- return nil, err
+// This returns the context error when there is no replication message before
+// the context is canceled.
+func (rc *ReplicationConn) WaitForReplicationMessage(ctx context.Context) (*ReplicationMessage, error) {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ default:
}
- // Wait until there is a byte available before continuing onto the normal msg reading path
- _, err = rc.c.reader.Peek(1)
- if err != nil {
- rc.c.conn.SetReadDeadline(zeroTime) // we can only return one error and we already have one -- so ignore possiple error from SetReadDeadline
- if err, ok := err.(*net.OpError); ok && err.Timeout() {
- return nil, ErrNotificationTimeout
+ go func() {
+ select {
+ case <-ctx.Done():
+ if err := rc.c.conn.SetDeadline(time.Now()); err != nil {
+ rc.Close() // Close connection if unable to set deadline
+ return
+ }
+ rc.c.closedChan <- ctx.Err()
+ case <-rc.c.doneChan:
}
- return nil, err
- }
+ }()
- err = rc.c.conn.SetReadDeadline(zeroTime)
- if err != nil {
- return nil, err
+ r, opErr := rc.readReplicationMessage()
+
+ var err error
+ select {
+ case err = <-rc.c.closedChan:
+ if err := rc.c.conn.SetDeadline(time.Time{}); err != nil {
+ rc.Close() // Close connection if unable to disable deadline
+ return nil, err
+ }
+
+ if opErr == nil {
+ err = nil
+ }
+ case rc.c.doneChan <- struct{}{}:
+ err = opErr
}
- return rc.readReplicationMessage()
+ return r, err
}
func (rc *ReplicationConn) sendReplicationModeQuery(sql string) (*Rows, error) {
@@ -312,32 +329,30 @@ func (rc *ReplicationConn) sendReplicationModeQuery(sql string) (*Rows, error) {
rows := rc.c.getRows(sql, nil)
if err := rc.c.lock(); err != nil {
- rows.abort(err)
+ rows.fatal(err)
return rows, err
}
rows.unlockConn = true
err := rc.c.sendSimpleQuery(sql)
if err != nil {
- rows.abort(err)
+ rows.fatal(err)
}
- var t byte
- var r *msgReader
- t, r, err = rc.c.rxMsg()
+ msg, err := rc.c.rxMsg()
if err != nil {
return nil, err
}
- switch t {
- case rowDescription:
- rows.fields = rc.c.rxRowDescription(r)
+ switch msg := msg.(type) {
+ case *pgproto3.RowDescription:
+ rows.fields = rc.c.rxRowDescription(msg)
// We don't have c.PgTypes here because we're a replication
// connection. This means the field descriptions will have
- // only Oids. Not much we can do about this.
+ // only OIDs. Not much we can do about this.
default:
- if e := rc.c.processContextFreeMsg(t, r); e != nil {
- rows.abort(e)
+ if e := rc.c.processContextFreeMsg(msg); e != nil {
+ rows.fatal(e)
return rows, e
}
}
@@ -354,7 +369,7 @@ func (rc *ReplicationConn) sendReplicationModeQuery(sql string) (*Rows, error) {
//
// NOTE: Because this is a replication mode connection, we don't have
// type names, so the field descriptions in the result will have only
-// Oids and no DataTypeName values
+// OIDs and no DataTypeName values
func (rc *ReplicationConn) IdentifySystem() (r *Rows, err error) {
return rc.sendReplicationModeQuery("IDENTIFY_SYSTEM")
}
@@ -369,7 +384,7 @@ func (rc *ReplicationConn) IdentifySystem() (r *Rows, err error) {
//
// NOTE: Because this is a replication mode connection, we don't have
// type names, so the field descriptions in the result will have only
-// Oids and no DataTypeName values
+// OIDs and no DataTypeName values
func (rc *ReplicationConn) TimelineHistory(timeline int) (r *Rows, err error) {
return rc.sendReplicationModeQuery(fmt.Sprintf("TIMELINE_HISTORY %d", timeline))
}
@@ -401,15 +416,18 @@ func (rc *ReplicationConn) StartReplication(slotName string, startLsn uint64, ti
return
}
+ ctx, cancelFn := context.WithTimeout(context.Background(), initialReplicationResponseTimeout)
+ defer cancelFn()
+
// The first replication message that comes back here will be (in a success case)
// a empty CopyBoth that is (apparently) sent as the confirmation that the replication has
// started. This call will either return nil, nil or if it returns an error
// that indicates the start replication command failed
var r *ReplicationMessage
- r, err = rc.WaitForReplicationMessage(initialReplicationResponseTimeout)
+ r, err = rc.WaitForReplicationMessage(ctx)
if err != nil && r != nil {
if rc.c.shouldLog(LogLevelError) {
- rc.c.log(LogLevelError, "Unxpected replication message %v", r)
+ rc.c.log(LogLevelError, "Unexpected replication message", map[string]interface{}{"msg": r, "err": err})
}
}
@@ -422,6 +440,18 @@ func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string)
return
}
+// Create the replication slot, using the given name and output plugin, and return the consistent_point and snapshot_name values.
+func (rc *ReplicationConn) CreateReplicationSlotEx(slotName, outputPlugin string) (consistentPoint string, snapshotName string, err error) {
+ var dummy string
+ var rows *Rows
+ rows, err = rc.sendReplicationModeQuery(fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin))
+ defer rows.Close()
+ for rows.Next() {
+ rows.Scan(&dummy, &consistentPoint, &snapshotName, &dummy)
+ }
+ return
+}
+
// Drop the replication slot for the given name
func (rc *ReplicationConn) DropReplicationSlot(slotName string) (err error) {
_, err = rc.c.Exec(fmt.Sprintf("DROP_REPLICATION_SLOT %s", slotName))