|  | // Copyright 2010 The Go Authors. All rights reserved. | 
|  | // Use of this source code is governed by a BSD-style | 
|  | // license that can be found in the LICENSE file. | 
|  |  | 
|  | package netchan | 
|  |  | 
|  | import ( | 
|  | "encoding/gob" | 
|  | "errors" | 
|  | "io" | 
|  | "reflect" | 
|  | "sync" | 
|  | "time" | 
|  | ) | 
|  |  | 
|  | // The direction of a connection from the client's perspective. | 
|  | type Dir int | 
|  |  | 
|  | const ( | 
|  | Recv Dir = iota | 
|  | Send | 
|  | ) | 
|  |  | 
|  | func (dir Dir) String() string { | 
|  | switch dir { | 
|  | case Recv: | 
|  | return "Recv" | 
|  | case Send: | 
|  | return "Send" | 
|  | } | 
|  | return "???" | 
|  | } | 
|  |  | 
|  | // Payload types | 
|  | const ( | 
|  | payRequest = iota // request structure follows | 
|  | payError          // error structure follows | 
|  | payData           // user payload follows | 
|  | payAck            // acknowledgement; no payload | 
|  | payClosed         // channel is now closed | 
|  | payAckSend        // payload has been delivered. | 
|  | ) | 
|  |  | 
|  | // A header is sent as a prefix to every transmission.  It will be followed by | 
|  | // a request structure, an error structure, or an arbitrary user payload structure. | 
|  | type header struct { | 
|  | Id          int | 
|  | PayloadType int | 
|  | SeqNum      int64 | 
|  | } | 
|  |  | 
|  | // Sent with a header once per channel from importer to exporter to report | 
|  | // that it wants to bind to a channel with the specified direction for count | 
|  | // messages, with space for size buffered values. If count is -1, it means unlimited. | 
|  | type request struct { | 
|  | Name  string | 
|  | Count int64 | 
|  | Size  int | 
|  | Dir   Dir | 
|  | } | 
|  |  | 
|  | // Sent with a header to report an error. | 
|  | type error_ struct { | 
|  | Error string | 
|  | } | 
|  |  | 
|  | // Used to unify management of acknowledgements for import and export. | 
|  | type unackedCounter interface { | 
|  | unackedCount() int64 | 
|  | ack() int64 | 
|  | seq() int64 | 
|  | } | 
|  |  | 
|  | // A channel and its direction. | 
|  | type chanDir struct { | 
|  | ch  reflect.Value | 
|  | dir Dir | 
|  | } | 
|  |  | 
|  | // clientSet contains the objects and methods needed for tracking | 
|  | // clients of an exporter and draining outstanding messages. | 
|  | type clientSet struct { | 
|  | mu      sync.Mutex // protects access to channel and client maps | 
|  | names   map[string]*chanDir | 
|  | clients map[unackedCounter]bool | 
|  | } | 
|  |  | 
|  | // Mutex-protected encoder and decoder pair. | 
|  | type encDec struct { | 
|  | decLock sync.Mutex | 
|  | dec     *gob.Decoder | 
|  | encLock sync.Mutex | 
|  | enc     *gob.Encoder | 
|  | } | 
|  |  | 
|  | func newEncDec(conn io.ReadWriter) *encDec { | 
|  | return &encDec{ | 
|  | dec: gob.NewDecoder(conn), | 
|  | enc: gob.NewEncoder(conn), | 
|  | } | 
|  | } | 
|  |  | 
|  | // Decode an item from the connection. | 
|  | func (ed *encDec) decode(value reflect.Value) error { | 
|  | ed.decLock.Lock() | 
|  | err := ed.dec.DecodeValue(value) | 
|  | if err != nil { | 
|  | // TODO: tear down connection? | 
|  | } | 
|  | ed.decLock.Unlock() | 
|  | return err | 
|  | } | 
|  |  | 
|  | // Encode a header and payload onto the connection. | 
|  | func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) error { | 
|  | ed.encLock.Lock() | 
|  | hdr.PayloadType = payloadType | 
|  | err := ed.enc.Encode(hdr) | 
|  | if err == nil { | 
|  | if payload != nil { | 
|  | err = ed.enc.Encode(payload) | 
|  | } | 
|  | } | 
|  | if err != nil { | 
|  | // TODO: tear down connection if there is an error? | 
|  | } | 
|  | ed.encLock.Unlock() | 
|  | return err | 
|  | } | 
|  |  | 
|  | // See the comment for Exporter.Drain. | 
|  | func (cs *clientSet) drain(timeout time.Duration) error { | 
|  | deadline := time.Now().Add(timeout) | 
|  | for { | 
|  | pending := false | 
|  | cs.mu.Lock() | 
|  | // Any messages waiting for a client? | 
|  | for _, chDir := range cs.names { | 
|  | if chDir.ch.Len() > 0 { | 
|  | pending = true | 
|  | } | 
|  | } | 
|  | // Any unacknowledged messages? | 
|  | for client := range cs.clients { | 
|  | n := client.unackedCount() | 
|  | if n > 0 { // Check for > rather than != just to be safe. | 
|  | pending = true | 
|  | break | 
|  | } | 
|  | } | 
|  | cs.mu.Unlock() | 
|  | if !pending { | 
|  | break | 
|  | } | 
|  | if timeout > 0 && time.Now().After(deadline) { | 
|  | return errors.New("timeout") | 
|  | } | 
|  | time.Sleep(100 * time.Millisecond) | 
|  | } | 
|  | return nil | 
|  | } | 
|  |  | 
|  | // See the comment for Exporter.Sync. | 
|  | func (cs *clientSet) sync(timeout time.Duration) error { | 
|  | deadline := time.Now().Add(timeout) | 
|  | // seq remembers the clients and their seqNum at point of entry. | 
|  | seq := make(map[unackedCounter]int64) | 
|  | cs.mu.Lock() | 
|  | for client := range cs.clients { | 
|  | seq[client] = client.seq() | 
|  | } | 
|  | cs.mu.Unlock() | 
|  | for { | 
|  | pending := false | 
|  | cs.mu.Lock() | 
|  | // Any unacknowledged messages?  Look only at clients that existed | 
|  | // when we started and are still in this client set. | 
|  | for client := range seq { | 
|  | if _, ok := cs.clients[client]; ok { | 
|  | if client.ack() < seq[client] { | 
|  | pending = true | 
|  | break | 
|  | } | 
|  | } | 
|  | } | 
|  | cs.mu.Unlock() | 
|  | if !pending { | 
|  | break | 
|  | } | 
|  | if timeout > 0 && time.Now().After(deadline) { | 
|  | return errors.New("timeout") | 
|  | } | 
|  | time.Sleep(100 * time.Millisecond) | 
|  | } | 
|  | return nil | 
|  | } | 
|  |  | 
|  | // A netChan represents a channel imported or exported | 
|  | // on a single connection. Flow is controlled by the receiving | 
|  | // side by sending payAckSend messages when values | 
|  | // are delivered into the local channel. | 
|  | type netChan struct { | 
|  | *chanDir | 
|  | name   string | 
|  | id     int | 
|  | size   int // buffer size of channel. | 
|  | closed bool | 
|  |  | 
|  | // sender-specific state | 
|  | ackCh chan bool // buffered with space for all the acks we need | 
|  | space int       // available space. | 
|  |  | 
|  | // receiver-specific state | 
|  | sendCh chan reflect.Value // buffered channel of values received from other end. | 
|  | ed     *encDec            // so that we can send acks. | 
|  | count  int64              // number of values still to receive. | 
|  | } | 
|  |  | 
|  | // Create a new netChan with the given name (only used for | 
|  | // messages), id, direction, buffer size, and count. | 
|  | // The connection to the other side is represented by ed. | 
|  | func newNetChan(name string, id int, ch *chanDir, ed *encDec, size int, count int64) *netChan { | 
|  | c := &netChan{chanDir: ch, name: name, id: id, size: size, ed: ed, count: count} | 
|  | if c.dir == Send { | 
|  | c.ackCh = make(chan bool, size) | 
|  | c.space = size | 
|  | } | 
|  | return c | 
|  | } | 
|  |  | 
|  | // Close the channel. | 
|  | func (nch *netChan) close() { | 
|  | if nch.closed { | 
|  | return | 
|  | } | 
|  | if nch.dir == Recv { | 
|  | if nch.sendCh != nil { | 
|  | // If the sender goroutine is active, close the channel to it. | 
|  | // It will close nch.ch when it can. | 
|  | close(nch.sendCh) | 
|  | } else { | 
|  | nch.ch.Close() | 
|  | } | 
|  | } else { | 
|  | nch.ch.Close() | 
|  | close(nch.ackCh) | 
|  | } | 
|  | nch.closed = true | 
|  | } | 
|  |  | 
|  | // Send message from remote side to local receiver. | 
|  | func (nch *netChan) send(val reflect.Value) { | 
|  | if nch.dir != Recv { | 
|  | panic("send on wrong direction of channel") | 
|  | } | 
|  | if nch.sendCh == nil { | 
|  | // If possible, do local send directly and ack immediately. | 
|  | if nch.ch.TrySend(val) { | 
|  | nch.sendAck() | 
|  | return | 
|  | } | 
|  | // Start sender goroutine to manage delayed delivery of values. | 
|  | nch.sendCh = make(chan reflect.Value, nch.size) | 
|  | go nch.sender() | 
|  | } | 
|  | select { | 
|  | case nch.sendCh <- val: | 
|  | // ok | 
|  | default: | 
|  | // TODO: should this be more resilient? | 
|  | panic("netchan: remote sender sent more values than allowed") | 
|  | } | 
|  | } | 
|  |  | 
|  | // sendAck sends an acknowledgment that a message has left | 
|  | // the channel's buffer. If the messages remaining to be sent | 
|  | // will fit in the channel's buffer, then we don't | 
|  | // need to send an ack. | 
|  | func (nch *netChan) sendAck() { | 
|  | if nch.count < 0 || nch.count > int64(nch.size) { | 
|  | nch.ed.encode(&header{Id: nch.id}, payAckSend, nil) | 
|  | } | 
|  | if nch.count > 0 { | 
|  | nch.count-- | 
|  | } | 
|  | } | 
|  |  | 
|  | // The sender process forwards items from the sending queue | 
|  | // to the destination channel, acknowledging each item. | 
|  | func (nch *netChan) sender() { | 
|  | if nch.dir != Recv { | 
|  | panic("sender on wrong direction of channel") | 
|  | } | 
|  | // When Exporter.Hangup is called, the underlying channel is closed, | 
|  | // and so we may get a "too many operations on closed channel" error | 
|  | // if there are outstanding messages in sendCh. | 
|  | // Make sure that this doesn't panic the whole program. | 
|  | defer func() { | 
|  | if r := recover(); r != nil { | 
|  | // TODO check that r is "too many operations", otherwise re-panic. | 
|  | } | 
|  | }() | 
|  | for v := range nch.sendCh { | 
|  | nch.ch.Send(v) | 
|  | nch.sendAck() | 
|  | } | 
|  | nch.ch.Close() | 
|  | } | 
|  |  | 
|  | // Receive value from local side for sending to remote side. | 
|  | func (nch *netChan) recv() (val reflect.Value, ok bool) { | 
|  | if nch.dir != Send { | 
|  | panic("recv on wrong direction of channel") | 
|  | } | 
|  |  | 
|  | if nch.space == 0 { | 
|  | // Wait for buffer space. | 
|  | <-nch.ackCh | 
|  | nch.space++ | 
|  | } | 
|  | nch.space-- | 
|  | return nch.ch.Recv() | 
|  | } | 
|  |  | 
|  | // acked is called when the remote side indicates that | 
|  | // a value has been delivered. | 
|  | func (nch *netChan) acked() { | 
|  | if nch.dir != Send { | 
|  | panic("recv on wrong direction of channel") | 
|  | } | 
|  | select { | 
|  | case nch.ackCh <- true: | 
|  | // ok | 
|  | default: | 
|  | // TODO: should this be more resilient? | 
|  | panic("netchan: remote receiver sent too many acks") | 
|  | } | 
|  | } |