Diffstat (limited to 'vendor/github.com/jackc/pgx/replication.go')
| -rw-r--r-- | vendor/github.com/jackc/pgx/replication.go | 198 |
1 file changed, 114 insertions, 84 deletions
diff --git a/vendor/github.com/jackc/pgx/replication.go b/vendor/github.com/jackc/pgx/replication.go
index 7b28d6b..7dd5efe 100644
--- a/vendor/github.com/jackc/pgx/replication.go
+++ b/vendor/github.com/jackc/pgx/replication.go
@@ -1,10 +1,15 @@
 package pgx
 
 import (
-	"errors"
+	"context"
+	"encoding/binary"
 	"fmt"
-	"net"
 	"time"
+
+	"github.com/pkg/errors"
+
+	"github.com/jackc/pgx/pgio"
+	"github.com/jackc/pgx/pgproto3"
 )
 
 const (
@@ -172,17 +177,21 @@ type ReplicationConn struct {
 // message to the server, as well as carries the WAL position of the
 // client, which then updates the server's replication slot position.
 func (rc *ReplicationConn) SendStandbyStatus(k *StandbyStatus) (err error) {
-	writeBuf := newWriteBuf(rc.c, copyData)
-	writeBuf.WriteByte(standbyStatusUpdate)
-	writeBuf.WriteInt64(int64(k.WalWritePosition))
-	writeBuf.WriteInt64(int64(k.WalFlushPosition))
-	writeBuf.WriteInt64(int64(k.WalApplyPosition))
-	writeBuf.WriteInt64(int64(k.ClientTime))
-	writeBuf.WriteByte(k.ReplyRequested)
+	buf := rc.c.wbuf
+	buf = append(buf, copyData)
+	sp := len(buf)
+	buf = pgio.AppendInt32(buf, -1)
+
+	buf = append(buf, standbyStatusUpdate)
+	buf = pgio.AppendInt64(buf, int64(k.WalWritePosition))
+	buf = pgio.AppendInt64(buf, int64(k.WalFlushPosition))
+	buf = pgio.AppendInt64(buf, int64(k.WalApplyPosition))
+	buf = pgio.AppendInt64(buf, int64(k.ClientTime))
+	buf = append(buf, k.ReplyRequested)
 
-	writeBuf.closeMsg()
+	pgio.SetInt32(buf[sp:], int32(len(buf[sp:])))
 
-	_, err = rc.c.conn.Write(writeBuf.buf)
+	_, err = rc.c.conn.Write(buf)
 	if err != nil {
 		rc.c.die(err)
 	}
@@ -203,107 +212,115 @@ func (rc *ReplicationConn) CauseOfDeath() error {
 }
 
 func (rc *ReplicationConn) readReplicationMessage() (r *ReplicationMessage, err error) {
-	var t byte
-	var reader *msgReader
-	t, reader, err = rc.c.rxMsg()
+	msg, err := rc.c.rxMsg()
 	if err != nil {
 		return
 	}
 
-	switch t {
-	case noticeResponse:
-		pgError := rc.c.rxErrorResponse(reader)
+	switch msg := msg.(type) {
+	case *pgproto3.NoticeResponse:
+		pgError := rc.c.rxErrorResponse((*pgproto3.ErrorResponse)(msg))
 		if rc.c.shouldLog(LogLevelInfo) {
-			rc.c.log(LogLevelInfo, pgError.Error())
+			rc.c.log(LogLevelInfo, pgError.Error(), nil)
 		}
-	case errorResponse:
-		err = rc.c.rxErrorResponse(reader)
+	case *pgproto3.ErrorResponse:
+		err = rc.c.rxErrorResponse(msg)
 		if rc.c.shouldLog(LogLevelError) {
-			rc.c.log(LogLevelError, err.Error())
+			rc.c.log(LogLevelError, err.Error(), nil)
		}
 		return
-	case copyBothResponse:
+	case *pgproto3.CopyBothResponse:
 		// This is the tail end of the replication process start,
 		// and can be safely ignored
 		return
-	case copyData:
-		var msgType byte
-		msgType = reader.readByte()
+	case *pgproto3.CopyData:
+		msgType := msg.Data[0]
+		rp := 1
+
 		switch msgType {
 		case walData:
-			walStart := reader.readInt64()
-			serverWalEnd := reader.readInt64()
-			serverTime := reader.readInt64()
-			walData := reader.readBytes(reader.msgBytesRemaining)
-			walMessage := WalMessage{WalStart: uint64(walStart),
-				ServerWalEnd: uint64(serverWalEnd),
-				ServerTime:   uint64(serverTime),
+			walStart := binary.BigEndian.Uint64(msg.Data[rp:])
+			rp += 8
+			serverWalEnd := binary.BigEndian.Uint64(msg.Data[rp:])
+			rp += 8
+			serverTime := binary.BigEndian.Uint64(msg.Data[rp:])
+			rp += 8
+			walData := msg.Data[rp:]
+			walMessage := WalMessage{WalStart: walStart,
+				ServerWalEnd: serverWalEnd,
+				ServerTime:   serverTime,
 				WalData:      walData,
 			}
 
 			return &ReplicationMessage{WalMessage: &walMessage}, nil
 		case senderKeepalive:
-			serverWalEnd := reader.readInt64()
-			serverTime := reader.readInt64()
-			replyNow := reader.readByte()
-			h := &ServerHeartbeat{ServerWalEnd: uint64(serverWalEnd), ServerTime: uint64(serverTime), ReplyRequested: replyNow}
+			serverWalEnd := binary.BigEndian.Uint64(msg.Data[rp:])
+			rp += 8
+			serverTime := binary.BigEndian.Uint64(msg.Data[rp:])
+			rp += 8
+			replyNow := msg.Data[rp]
+			rp += 1
+			h := &ServerHeartbeat{ServerWalEnd: serverWalEnd, ServerTime: serverTime, ReplyRequested: replyNow}
 			return &ReplicationMessage{ServerHeartbeat: h}, nil
 		default:
 			if rc.c.shouldLog(LogLevelError) {
-				rc.c.log(LogLevelError, "Unexpected data playload message type %v", t)
+				rc.c.log(LogLevelError, "Unexpected data playload message type", map[string]interface{}{"type": msgType})
 			}
 		}
 	default:
 		if rc.c.shouldLog(LogLevelError) {
-			rc.c.log(LogLevelError, "Unexpected replication message type %v", t)
+			rc.c.log(LogLevelError, "Unexpected replication message type", map[string]interface{}{"type": msg})
 		}
 	}
 	return
 }
 
-// Wait for a single replication message up to timeout time.
+// Wait for a single replication message.
 //
 // Properly using this requires some knowledge of the postgres replication mechanisms,
 // as the client can receive both WAL data (the ultimate payload) and server heartbeat
 // updates. The caller also must send standby status updates in order to keep the connection
 // alive and working.
 //
-// This returns pgx.ErrNotificationTimeout when there is no replication message by the specified
-// duration.
-func (rc *ReplicationConn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationMessage, err error) {
-	var zeroTime time.Time
-
-	deadline := time.Now().Add(timeout)
-
-	// Use SetReadDeadline to implement the timeout. SetReadDeadline will
-	// cause operations to fail with a *net.OpError that has a Timeout()
-	// of true. Because the normal pgx rxMsg path considers any error to
-	// have potentially corrupted the state of the connection, it dies
-	// on any errors. So to avoid timeout errors in rxMsg we set the
-	// deadline and peek into the reader. If a timeout error occurs there
-	// we don't break the pgx connection. If the Peek returns that data
-	// is available then we turn off the read deadline before the rxMsg.
-	err = rc.c.conn.SetReadDeadline(deadline)
-	if err != nil {
-		return nil, err
+// This returns the context error when there is no replication message before
+// the context is canceled.
+func (rc *ReplicationConn) WaitForReplicationMessage(ctx context.Context) (*ReplicationMessage, error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	default:
 	}
 
-	// Wait until there is a byte available before continuing onto the normal msg reading path
-	_, err = rc.c.reader.Peek(1)
-	if err != nil {
-		rc.c.conn.SetReadDeadline(zeroTime) // we can only return one error and we already have one -- so ignore possiple error from SetReadDeadline
-		if err, ok := err.(*net.OpError); ok && err.Timeout() {
-			return nil, ErrNotificationTimeout
+	go func() {
+		select {
+		case <-ctx.Done():
+			if err := rc.c.conn.SetDeadline(time.Now()); err != nil {
+				rc.Close() // Close connection if unable to set deadline
+				return
+			}
+			rc.c.closedChan <- ctx.Err()
+		case <-rc.c.doneChan:
 		}
-		return nil, err
-	}
+	}()
 
-	err = rc.c.conn.SetReadDeadline(zeroTime)
-	if err != nil {
-		return nil, err
+	r, opErr := rc.readReplicationMessage()
+
+	var err error
+	select {
+	case err = <-rc.c.closedChan:
+		if err := rc.c.conn.SetDeadline(time.Time{}); err != nil {
+			rc.Close() // Close connection if unable to disable deadline
+			return nil, err
+		}
+
+		if opErr == nil {
+			err = nil
+		}
+	case rc.c.doneChan <- struct{}{}:
+		err = opErr
 	}
 
-	return rc.readReplicationMessage()
+	return r, err
 }
 
 func (rc *ReplicationConn) sendReplicationModeQuery(sql string) (*Rows, error) {
@@ -312,32 +329,30 @@ func (rc *ReplicationConn) sendReplicationModeQuery(sql string) (*Rows, error) {
 	rows := rc.c.getRows(sql, nil)
 
 	if err := rc.c.lock(); err != nil {
-		rows.abort(err)
+		rows.fatal(err)
 		return rows, err
 	}
 	rows.unlockConn = true
 
 	err := rc.c.sendSimpleQuery(sql)
 	if err != nil {
-		rows.abort(err)
+		rows.fatal(err)
 	}
 
-	var t byte
-	var r *msgReader
-	t, r, err = rc.c.rxMsg()
+	msg, err := rc.c.rxMsg()
 	if err != nil {
 		return nil, err
 	}
 
-	switch t {
-	case rowDescription:
-		rows.fields = rc.c.rxRowDescription(r)
+	switch msg := msg.(type) {
+	case *pgproto3.RowDescription:
+		rows.fields = rc.c.rxRowDescription(msg)
 		// We don't have c.PgTypes here because we're a replication
 		// connection. This means the field descriptions will have
-		// only Oids. Not much we can do about this.
+		// only OIDs. Not much we can do about this.
 	default:
-		if e := rc.c.processContextFreeMsg(t, r); e != nil {
-			rows.abort(e)
+		if e := rc.c.processContextFreeMsg(msg); e != nil {
+			rows.fatal(e)
 			return rows, e
 		}
 	}
@@ -354,7 +369,7 @@ func (rc *ReplicationConn) sendReplicationModeQuery(sql string) (*Rows, error) {
 //
 // NOTE: Because this is a replication mode connection, we don't have
 // type names, so the field descriptions in the result will have only
-// Oids and no DataTypeName values
+// OIDs and no DataTypeName values
 func (rc *ReplicationConn) IdentifySystem() (r *Rows, err error) {
 	return rc.sendReplicationModeQuery("IDENTIFY_SYSTEM")
 }
@@ -369,7 +384,7 @@ func (rc *ReplicationConn) IdentifySystem() (r *Rows, err error) {
 //
 // NOTE: Because this is a replication mode connection, we don't have
 // type names, so the field descriptions in the result will have only
-// Oids and no DataTypeName values
+// OIDs and no DataTypeName values
 func (rc *ReplicationConn) TimelineHistory(timeline int) (r *Rows, err error) {
 	return rc.sendReplicationModeQuery(fmt.Sprintf("TIMELINE_HISTORY %d", timeline))
 }
@@ -401,15 +416,18 @@ func (rc *ReplicationConn) StartReplication(slotName string, startLsn uint64, ti
 		return
 	}
 
+	ctx, cancelFn := context.WithTimeout(context.Background(), initialReplicationResponseTimeout)
+	defer cancelFn()
+
 	// The first replication message that comes back here will be (in a success case)
 	// a empty CopyBoth that is (apparently) sent as the confirmation that the replication has
 	// started. This call will either return nil, nil or if it returns an error
 	// that indicates the start replication command failed
 	var r *ReplicationMessage
-	r, err = rc.WaitForReplicationMessage(initialReplicationResponseTimeout)
+	r, err = rc.WaitForReplicationMessage(ctx)
 	if err != nil && r != nil {
 		if rc.c.shouldLog(LogLevelError) {
-			rc.c.log(LogLevelError, "Unxpected replication message %v", r)
+			rc.c.log(LogLevelError, "Unexpected replication message", map[string]interface{}{"msg": r, "err": err})
 		}
 	}
 
@@ -422,6 +440,18 @@ func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string)
 	return
 }
 
+// Create the replication slot, using the given name and output plugin, and return the consistent_point and snapshot_name values.
+func (rc *ReplicationConn) CreateReplicationSlotEx(slotName, outputPlugin string) (consistentPoint string, snapshotName string, err error) {
+	var dummy string
+	var rows *Rows
+	rows, err = rc.sendReplicationModeQuery(fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin))
+	defer rows.Close()
+	for rows.Next() {
+		rows.Scan(&dummy, &consistentPoint, &snapshotName, &dummy)
+	}
+	return
+}
+
 // Drop the replication slot for the given name
 func (rc *ReplicationConn) DropReplicationSlot(slotName string) (err error) {
 	_, err = rc.c.Exec(fmt.Sprintf("DROP_REPLICATION_SLOT %s", slotName))
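
For context on what this change means for callers: WaitForReplicationMessage now takes a context.Context instead of a timeout duration, and the caller remains responsible for acknowledging progress with SendStandbyStatus. The sketch below shows one way a consumer loop might look against the rewritten API; the connection settings, slot name, start position, and per-iteration deadline are illustrative assumptions, not part of this diff.

package main

import (
	"context"
	"log"
	"time"

	"github.com/jackc/pgx"
)

func main() {
	// Placeholder connection settings; any replication-enabled server works.
	conn, err := pgx.ReplicationConnect(pgx.ConnConfig{Host: "localhost", Database: "postgres"})
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Assumes the slot "example_slot" already exists; 0 and -1 are
	// placeholder start-LSN and timeline values.
	if err := conn.StartReplication("example_slot", 0, -1); err != nil {
		log.Fatal(err)
	}

	for {
		// The new API takes a context; a per-iteration deadline reproduces
		// the old poll-with-timeout pattern.
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		msg, err := conn.WaitForReplicationMessage(ctx)
		cancel()
		if err == context.DeadlineExceeded {
			continue // nothing arrived before the deadline; poll again
		}
		if err != nil {
			log.Fatal(err)
		}
		if msg == nil {
			continue // notices and the initial CopyBothResponse yield no message
		}

		if msg.WalMessage != nil {
			log.Printf("WAL at %d: %d bytes", msg.WalMessage.WalStart, len(msg.WalMessage.WalData))
			// Acknowledge the position so the server can advance the slot.
			status, err := pgx.NewStandbyStatus(msg.WalMessage.WalStart + uint64(len(msg.WalMessage.WalData)))
			if err != nil {
				log.Fatal(err)
			}
			if err := conn.SendStandbyStatus(status); err != nil {
				log.Fatal(err)
			}
		}
		if msg.ServerHeartbeat != nil && msg.ServerHeartbeat.ReplyRequested == 1 {
			// The server asked for a reply; a real consumer would send its
			// current positions here as well.
			log.Println("server requested a standby status update")
		}
	}
}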

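The new CreateReplicationSlotEx surfaces the consistent_point and snapshot_name columns that CREATE_REPLICATION_SLOT reports, which is what a caller needs to start replication at a known position or to use the exported snapshot. Below is a sketch of wiring that into StartReplication; the helper, slot name, and plugin name are hypothetical, and it assumes consistent_point comes back in the usual "X/X" text form that pgx.ParseLSN accepts.

package example

import (
	"log"

	"github.com/jackc/pgx"
)

// createSlotAndStart is an illustrative helper, not part of the library: it
// creates a logical slot, logs the exported snapshot name, and starts
// replication from the reported consistent point.
func createSlotAndStart(conn *pgx.ReplicationConn, slotName, plugin string) error {
	consistentPoint, snapshotName, err := conn.CreateReplicationSlotEx(slotName, plugin)
	if err != nil {
		return err
	}
	log.Printf("slot %s created: consistent_point=%s snapshot_name=%s",
		slotName, consistentPoint, snapshotName)

	// Convert the textual LSN into the uint64 form StartReplication expects.
	startLsn, err := pgx.ParseLSN(consistentPoint)
	if err != nil {
		return err
	}
	return conn.StartReplication(slotName, startLsn, -1)
}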