summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal/transport/transport.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/transport/transport.go')
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/transport.go316
1 files changed, 278 insertions, 38 deletions
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index 2859b87..924ba4f 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -27,6 +27,7 @@ import (
"fmt"
"io"
"net"
+ "strings"
"sync"
"sync/atomic"
"time"
@@ -38,6 +39,7 @@ import (
"google.golang.org/grpc/mem"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
+ "google.golang.org/grpc/resolver"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/grpc/tap"
@@ -131,7 +133,7 @@ type recvBufferReader struct {
err error
}
-func (r *recvBufferReader) ReadMessageHeader(header []byte) (n int, err error) {
+func (r *recvBufferReader) ReadHeader(header []byte) (n int, err error) {
if r.err != nil {
return 0, r.err
}
@@ -140,9 +142,9 @@ func (r *recvBufferReader) ReadMessageHeader(header []byte) (n int, err error) {
return n, nil
}
if r.closeStream != nil {
- n, r.err = r.readMessageHeaderClient(header)
+ n, r.err = r.readHeaderClient(header)
} else {
- n, r.err = r.readMessageHeader(header)
+ n, r.err = r.readHeader(header)
}
return n, r.err
}
@@ -172,12 +174,12 @@ func (r *recvBufferReader) Read(n int) (buf mem.Buffer, err error) {
return buf, r.err
}
-func (r *recvBufferReader) readMessageHeader(header []byte) (n int, err error) {
+func (r *recvBufferReader) readHeader(header []byte) (n int, err error) {
select {
case <-r.ctxDone:
return 0, ContextErr(r.ctx.Err())
case m := <-r.recv.get():
- return r.readMessageHeaderAdditional(m, header)
+ return r.readHeaderAdditional(m, header)
}
}
@@ -190,7 +192,7 @@ func (r *recvBufferReader) read(n int) (buf mem.Buffer, err error) {
}
}
-func (r *recvBufferReader) readMessageHeaderClient(header []byte) (n int, err error) {
+func (r *recvBufferReader) readHeaderClient(header []byte) (n int, err error) {
// If the context is canceled, then closes the stream with nil metadata.
// closeStream writes its error parameter to r.recv as a recvMsg.
// r.readAdditional acts on that message and returns the necessary error.
@@ -211,9 +213,9 @@ func (r *recvBufferReader) readMessageHeaderClient(header []byte) (n int, err er
// faster.
r.closeStream(ContextErr(r.ctx.Err()))
m := <-r.recv.get()
- return r.readMessageHeaderAdditional(m, header)
+ return r.readHeaderAdditional(m, header)
case m := <-r.recv.get():
- return r.readMessageHeaderAdditional(m, header)
+ return r.readHeaderAdditional(m, header)
}
}
@@ -244,7 +246,7 @@ func (r *recvBufferReader) readClient(n int) (buf mem.Buffer, err error) {
}
}
-func (r *recvBufferReader) readMessageHeaderAdditional(m recvMsg, header []byte) (n int, err error) {
+func (r *recvBufferReader) readHeaderAdditional(m recvMsg, header []byte) (n int, err error) {
r.recv.load()
if m.err != nil {
if m.buffer != nil {
@@ -286,8 +288,14 @@ const (
// Stream represents an RPC in the transport layer.
type Stream struct {
id uint32
- ctx context.Context // the associated context of the stream
- method string // the associated RPC method of the stream
+ st ServerTransport // nil for client side Stream
+ ct ClientTransport // nil for server side Stream
+ ctx context.Context // the associated context of the stream
+ cancel context.CancelFunc // always nil for client side Stream
+ done chan struct{} // closed at the end of stream to unblock writers. On the client side.
+ doneFunc func() // invoked at the end of stream on client side.
+ ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
+ method string // the associated RPC method of the stream
recvCompress string
sendCompress string
buf *recvBuffer
@@ -295,17 +303,58 @@ type Stream struct {
fc *inFlow
wq *writeQuota
+ // Holds compressor names passed in grpc-accept-encoding metadata from the
+ // client. This is empty for the client side stream.
+ clientAdvertisedCompressors string
// Callback to state application's intentions to read data. This
// is used to adjust flow control, if needed.
requestRead func(int)
+ headerChan chan struct{} // closed to indicate the end of header metadata.
+ headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
+ // headerValid indicates whether a valid header was received. Only
+ // meaningful after headerChan is closed (always call waitOnHeader() before
+ // reading its value). Not valid on server side.
+ headerValid bool
+ headerWireLength int // Only set on server side.
+
+ // hdrMu protects header and trailer metadata on the server-side.
+ hdrMu sync.Mutex
+ // On client side, header keeps the received header metadata.
+ //
+ // On server side, header keeps the header set by SetHeader(). The complete
+ // header will merged into this after t.WriteHeader() is called.
+ header metadata.MD
+ trailer metadata.MD // the key-value map of trailer metadata.
+
+ noHeaders bool // set if the client never received headers (set only after the stream is done).
+
+ // On the server-side, headerSent is atomically set to 1 when the headers are sent out.
+ headerSent uint32
+
state streamState
+ // On client-side it is the status error received from the server.
+ // On server-side it is unused.
+ status *status.Status
+
+ bytesReceived uint32 // indicates whether any bytes have been received on this stream
+ unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
+
// contentSubtype is the content-subtype for requests.
// this must be lowercase or the behavior is undefined.
contentSubtype string
+}
- trailer metadata.MD // the key-value map of trailer metadata.
+// isHeaderSent is only valid on the server-side.
+func (s *Stream) isHeaderSent() bool {
+ return atomic.LoadUint32(&s.headerSent) == 1
+}
+
+// updateHeaderSent updates headerSent and returns true
+// if it was already set. It is valid only on server-side.
+func (s *Stream) updateHeaderSent() bool {
+ return atomic.SwapUint32(&s.headerSent, 1) == 1
}
func (s *Stream) swapState(st streamState) streamState {
@@ -320,12 +369,110 @@ func (s *Stream) getState() streamState {
return streamState(atomic.LoadUint32((*uint32)(&s.state)))
}
+func (s *Stream) waitOnHeader() {
+ if s.headerChan == nil {
+ // On the server headerChan is always nil since a stream originates
+ // only after having received headers.
+ return
+ }
+ select {
+ case <-s.ctx.Done():
+ // Close the stream to prevent headers/trailers from changing after
+ // this function returns.
+ s.ct.CloseStream(s, ContextErr(s.ctx.Err()))
+ // headerChan could possibly not be closed yet if closeStream raced
+ // with operateHeaders; wait until it is closed explicitly here.
+ <-s.headerChan
+ case <-s.headerChan:
+ }
+}
+
+// RecvCompress returns the compression algorithm applied to the inbound
+// message. It is empty string if there is no compression applied.
+func (s *Stream) RecvCompress() string {
+ s.waitOnHeader()
+ return s.recvCompress
+}
+
+// SetSendCompress sets the compression algorithm to the stream.
+func (s *Stream) SetSendCompress(name string) error {
+ if s.isHeaderSent() || s.getState() == streamDone {
+ return errors.New("transport: set send compressor called after headers sent or stream done")
+ }
+
+ s.sendCompress = name
+ return nil
+}
+
+// SendCompress returns the send compressor name.
+func (s *Stream) SendCompress() string {
+ return s.sendCompress
+}
+
+// ClientAdvertisedCompressors returns the compressor names advertised by the
+// client via grpc-accept-encoding header.
+func (s *Stream) ClientAdvertisedCompressors() []string {
+ values := strings.Split(s.clientAdvertisedCompressors, ",")
+ for i, v := range values {
+ values[i] = strings.TrimSpace(v)
+ }
+ return values
+}
+
+// Done returns a channel which is closed when it receives the final status
+// from the server.
+func (s *Stream) Done() <-chan struct{} {
+ return s.done
+}
+
+// Header returns the header metadata of the stream.
+//
+// On client side, it acquires the key-value pairs of header metadata once it is
+// available. It blocks until i) the metadata is ready or ii) there is no header
+// metadata or iii) the stream is canceled/expired.
+//
+// On server side, it returns the out header after t.WriteHeader is called. It
+// does not block and must not be called until after WriteHeader.
+func (s *Stream) Header() (metadata.MD, error) {
+ if s.headerChan == nil {
+ // On server side, return the header in stream. It will be the out
+ // header after t.WriteHeader is called.
+ return s.header.Copy(), nil
+ }
+ s.waitOnHeader()
+
+ if !s.headerValid || s.noHeaders {
+ return nil, s.status.Err()
+ }
+
+ return s.header.Copy(), nil
+}
+
+// TrailersOnly blocks until a header or trailers-only frame is received and
+// then returns true if the stream was trailers-only. If the stream ends
+// before headers are received, returns true, nil. Client-side only.
+func (s *Stream) TrailersOnly() bool {
+ s.waitOnHeader()
+ return s.noHeaders
+}
+
// Trailer returns the cached trailer metadata. Note that if it is not called
-// after the entire stream is done, it could return an empty MD.
+// after the entire stream is done, it could return an empty MD. Client
+// side only.
// It can be safely read only after stream has ended that is either read
// or write have returned io.EOF.
func (s *Stream) Trailer() metadata.MD {
- return s.trailer.Copy()
+ c := s.trailer.Copy()
+ return c
+}
+
+// ContentSubtype returns the content-subtype for a request. For example, a
+// content-subtype of "proto" will result in a content-type of
+// "application/grpc+proto". This will always be lowercase. See
+// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
+// more details.
+func (s *Stream) ContentSubtype() string {
+ return s.contentSubtype
}
// Context returns the context of the stream.
@@ -333,31 +480,81 @@ func (s *Stream) Context() context.Context {
return s.ctx
}
+// SetContext sets the context of the stream. This will be deleted once the
+// stats handler callouts all move to gRPC layer.
+func (s *Stream) SetContext(ctx context.Context) {
+ s.ctx = ctx
+}
+
// Method returns the method for the stream.
func (s *Stream) Method() string {
return s.method
}
+// Status returns the status received from the server.
+// Status can be read safely only after the stream has ended,
+// that is, after Done() is closed.
+func (s *Stream) Status() *status.Status {
+ return s.status
+}
+
+// HeaderWireLength returns the size of the headers of the stream as received
+// from the wire. Valid only on the server.
+func (s *Stream) HeaderWireLength() int {
+ return s.headerWireLength
+}
+
+// SetHeader sets the header metadata. This can be called multiple times.
+// Server side only.
+// This should not be called in parallel to other data writes.
+func (s *Stream) SetHeader(md metadata.MD) error {
+ if md.Len() == 0 {
+ return nil
+ }
+ if s.isHeaderSent() || s.getState() == streamDone {
+ return ErrIllegalHeaderWrite
+ }
+ s.hdrMu.Lock()
+ s.header = metadata.Join(s.header, md)
+ s.hdrMu.Unlock()
+ return nil
+}
+
+// SendHeader sends the given header metadata. The given metadata is
+// combined with any metadata set by previous calls to SetHeader and
+// then written to the transport stream.
+func (s *Stream) SendHeader(md metadata.MD) error {
+ return s.st.WriteHeader(s, md)
+}
+
+// SetTrailer sets the trailer metadata which will be sent with the RPC status
+// by the server. This can be called multiple times. Server side only.
+// This should not be called parallel to other data writes.
+func (s *Stream) SetTrailer(md metadata.MD) error {
+ if md.Len() == 0 {
+ return nil
+ }
+ if s.getState() == streamDone {
+ return ErrIllegalHeaderWrite
+ }
+ s.hdrMu.Lock()
+ s.trailer = metadata.Join(s.trailer, md)
+ s.hdrMu.Unlock()
+ return nil
+}
+
func (s *Stream) write(m recvMsg) {
s.buf.put(m)
}
-// ReadMessageHeader reads data into the provided header slice from the stream.
-// It first checks if there was an error during a previous read operation and
-// returns it if present. It then requests a read operation for the length of
-// the header. It continues to read from the stream until the entire header
-// slice is filled or an error occurs. If an `io.EOF` error is encountered with
-// partially read data, it is converted to `io.ErrUnexpectedEOF` to indicate an
-// unexpected end of the stream. The method returns any error encountered during
-// the read process or nil if the header was successfully read.
-func (s *Stream) ReadMessageHeader(header []byte) (err error) {
+func (s *Stream) ReadHeader(header []byte) (err error) {
// Don't request a read if there was an error earlier
if er := s.trReader.er; er != nil {
return er
}
s.requestRead(len(header))
for len(header) != 0 {
- n, err := s.trReader.ReadMessageHeader(header)
+ n, err := s.trReader.ReadHeader(header)
header = header[n:]
if len(header) == 0 {
err = nil
@@ -373,7 +570,7 @@ func (s *Stream) ReadMessageHeader(header []byte) (err error) {
}
// Read reads n bytes from the wire for this stream.
-func (s *Stream) read(n int) (data mem.BufferSlice, err error) {
+func (s *Stream) Read(n int) (data mem.BufferSlice, err error) {
// Don't request a read if there was an error earlier
if er := s.trReader.er; er != nil {
return nil, er
@@ -413,8 +610,8 @@ type transportReader struct {
er error
}
-func (t *transportReader) ReadMessageHeader(header []byte) (int, error) {
- n, err := t.reader.ReadMessageHeader(header)
+func (t *transportReader) ReadHeader(header []byte) (int, error) {
+ n, err := t.reader.ReadHeader(header)
if err != nil {
t.er = err
return 0, err
@@ -433,6 +630,17 @@ func (t *transportReader) Read(n int) (mem.Buffer, error) {
return buf, nil
}
+// BytesReceived indicates whether any bytes have been received on this stream.
+func (s *Stream) BytesReceived() bool {
+ return atomic.LoadUint32(&s.bytesReceived) == 1
+}
+
+// Unprocessed indicates whether the server did not process this stream --
+// i.e. it sent a refused stream or GOAWAY including this stream ID.
+func (s *Stream) Unprocessed() bool {
+ return atomic.LoadUint32(&s.unprocessed) == 1
+}
+
// GoString is implemented by Stream so context.String() won't
// race when printing %#v.
func (s *Stream) GoString() string {
@@ -508,9 +716,15 @@ type ConnectOptions struct {
BufferPool mem.BufferPool
}
-// WriteOptions provides additional hints and information for message
+// NewClientTransport establishes the transport with the required ConnectOptions
+// and returns it to the caller.
+func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (ClientTransport, error) {
+ return newHTTP2Client(connectCtx, ctx, addr, opts, onClose)
+}
+
+// Options provides additional hints and information for message
// transmission.
-type WriteOptions struct {
+type Options struct {
// Last indicates whether this write is the last piece for
// this stream.
Last bool
@@ -559,8 +773,18 @@ type ClientTransport interface {
// It does not block.
GracefulClose()
+ // Write sends the data for the given stream. A nil stream indicates
+ // the write is to be performed on the transport as a whole.
+ Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error
+
// NewStream creates a Stream for an RPC.
- NewStream(ctx context.Context, callHdr *CallHdr) (*ClientStream, error)
+ NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
+
+ // CloseStream clears the footprint of a stream when the stream is
+ // not needed any more. The err indicates the error incurred when
+ // CloseStream is called. Must be called when a stream is finished
+ // unless the associated transport is closing.
+ CloseStream(stream *Stream, err error)
// Error returns a channel that is closed when some I/O error
// happens. Typically the caller should have a goroutine to monitor
@@ -580,6 +804,12 @@ type ClientTransport interface {
// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr
+
+ // IncrMsgSent increments the number of message sent through this transport.
+ IncrMsgSent()
+
+ // IncrMsgRecv increments the number of message received through this transport.
+ IncrMsgRecv()
}
// ServerTransport is the common interface for all gRPC server-side transport
@@ -589,7 +819,19 @@ type ClientTransport interface {
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
// HandleStreams receives incoming streams using the given handler.
- HandleStreams(context.Context, func(*ServerStream))
+ HandleStreams(context.Context, func(*Stream))
+
+ // WriteHeader sends the header metadata for the given stream.
+ // WriteHeader may not be called on all streams.
+ WriteHeader(s *Stream, md metadata.MD) error
+
+ // Write sends the data for the given stream.
+ // Write may not be called on all streams.
+ Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error
+
+ // WriteStatus sends the status of a stream to the client. WriteStatus is
+ // the final call made on a stream and always occurs.
+ WriteStatus(s *Stream, st *status.Status) error
// Close tears down the transport. Once it is called, the transport
// should not be accessed any more. All the pending streams and their
@@ -601,14 +843,12 @@ type ServerTransport interface {
// Drain notifies the client this ServerTransport stops accepting new RPCs.
Drain(debugData string)
-}
-type internalServerTransport interface {
- ServerTransport
- writeHeader(s *ServerStream, md metadata.MD) error
- write(s *ServerStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error
- writeStatus(s *ServerStream, st *status.Status) error
- incrMsgRecv()
+ // IncrMsgSent increments the number of message sent through this transport.
+ IncrMsgSent()
+
+ // IncrMsgRecv increments the number of message received through this transport.
+ IncrMsgRecv()
}
// connectionErrorf creates an ConnectionError with the specified error description.