| // Copyright 2010 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| /* |
| The netchan package implements type-safe networked channels: |
| it allows the two ends of a channel to appear on different |
| computers connected by a network. It does this by transporting |
| data sent to a channel on one machine so it can be recovered |
| by a receive of a channel of the same type on the other. |
| |
| An exporter publishes a set of channels by name. An importer |
| connects to the exporting machine and imports the channels |
| by name. After importing the channels, the two machines can |
| use the channels in the usual way. |
| |
| Networked channels are not synchronized; they always behave |
| as if they are buffered channels of at least one element. |
| */ |
| package netchan |
| |
| // BUG: can't use range clause to receive when using ImportNValues to limit the count. |
| |
| import ( |
| "log" |
| "io" |
| "net" |
| "os" |
| "reflect" |
| "strconv" |
| "sync" |
| ) |
| |
| // Export |
| |
| // expLog is a logging convenience function. The first argument must be a string. |
| func expLog(args ...interface{}) { |
| args[0] = "netchan export: " + args[0].(string) |
| log.Print(args...) |
| } |
| |
| // An Exporter allows a set of channels to be published on a single |
| // network port. A single machine may have multiple Exporters |
| // but they must use different ports. |
| type Exporter struct { |
| *clientSet |
| } |
| |
| type expClient struct { |
| *encDec |
| exp *Exporter |
| chans map[int]*netChan // channels in use by client |
| mu sync.Mutex // protects remaining fields |
| errored bool // client has been sent an error |
| seqNum int64 // sequences messages sent to client; has value of highest sent |
| ackNum int64 // highest sequence number acknowledged |
| seqLock sync.Mutex // guarantees messages are in sequence, only locked under mu |
| } |
| |
| func newClient(exp *Exporter, conn io.ReadWriter) *expClient { |
| client := new(expClient) |
| client.exp = exp |
| client.encDec = newEncDec(conn) |
| client.seqNum = 0 |
| client.ackNum = 0 |
| client.chans = make(map[int]*netChan) |
| return client |
| } |
| |
| func (client *expClient) sendError(hdr *header, err string) { |
| error := &error{err} |
| expLog("sending error to client:", error.Error) |
| client.encode(hdr, payError, error) // ignore any encode error, hope client gets it |
| client.mu.Lock() |
| client.errored = true |
| client.mu.Unlock() |
| } |
| |
| func (client *expClient) newChan(hdr *header, dir Dir, name string, size int, count int64) *netChan { |
| exp := client.exp |
| exp.mu.Lock() |
| ech, ok := exp.names[name] |
| exp.mu.Unlock() |
| if !ok { |
| client.sendError(hdr, "no such channel: "+name) |
| return nil |
| } |
| if ech.dir != dir { |
| client.sendError(hdr, "wrong direction for channel: "+name) |
| return nil |
| } |
| nch := newNetChan(name, hdr.Id, ech, client.encDec, size, count) |
| client.chans[hdr.Id] = nch |
| return nch |
| } |
| |
| func (client *expClient) getChan(hdr *header, dir Dir) *netChan { |
| nch := client.chans[hdr.Id] |
| if nch == nil { |
| return nil |
| } |
| if nch.dir != dir { |
| client.sendError(hdr, "wrong direction for channel: "+nch.name) |
| } |
| return nch |
| } |
| |
| // The function run manages sends and receives for a single client. For each |
| // (client Recv) request, this will launch a serveRecv goroutine to deliver |
| // the data for that channel, while (client Send) requests are handled as |
| // data arrives from the client. |
| func (client *expClient) run() { |
| hdr := new(header) |
| hdrValue := reflect.NewValue(hdr) |
| req := new(request) |
| reqValue := reflect.NewValue(req) |
| error := new(error) |
| for { |
| *hdr = header{} |
| if err := client.decode(hdrValue); err != nil { |
| if err != os.EOF { |
| expLog("error decoding client header:", err) |
| } |
| break |
| } |
| switch hdr.PayloadType { |
| case payRequest: |
| *req = request{} |
| if err := client.decode(reqValue); err != nil { |
| expLog("error decoding client request:", err) |
| break |
| } |
| if req.Size < 1 { |
| panic("netchan: remote requested " + strconv.Itoa(req.Size) + " values") |
| } |
| switch req.Dir { |
| case Recv: |
| // look up channel before calling serveRecv to |
| // avoid a lock around client.chans. |
| if nch := client.newChan(hdr, Send, req.Name, req.Size, req.Count); nch != nil { |
| go client.serveRecv(nch, *hdr, req.Count) |
| } |
| case Send: |
| client.newChan(hdr, Recv, req.Name, req.Size, req.Count) |
| // The actual sends will have payload type payData. |
| // TODO: manage the count? |
| default: |
| error.Error = "request: can't handle channel direction" |
| expLog(error.Error, req.Dir) |
| client.encode(hdr, payError, error) |
| } |
| case payData: |
| client.serveSend(*hdr) |
| case payClosed: |
| client.serveClosed(*hdr) |
| case payAck: |
| client.mu.Lock() |
| if client.ackNum != hdr.SeqNum-1 { |
| // Since the sequence number is incremented and the message is sent |
| // in a single instance of locking client.mu, the messages are guaranteed |
| // to be sent in order. Therefore receipt of acknowledgement N means |
| // all messages <=N have been seen by the recipient. We check anyway. |
| expLog("sequence out of order:", client.ackNum, hdr.SeqNum) |
| } |
| if client.ackNum < hdr.SeqNum { // If there has been an error, don't back up the count. |
| client.ackNum = hdr.SeqNum |
| } |
| client.mu.Unlock() |
| case payAckSend: |
| if nch := client.getChan(hdr, Send); nch != nil { |
| nch.acked() |
| } |
| default: |
| log.Fatal("netchan export: unknown payload type", hdr.PayloadType) |
| } |
| } |
| client.exp.delClient(client) |
| } |
| |
| // Send all the data on a single channel to a client asking for a Recv. |
| // The header is passed by value to avoid issues of overwriting. |
| func (client *expClient) serveRecv(nch *netChan, hdr header, count int64) { |
| for { |
| val, ok := nch.recv() |
| if !ok { |
| if err := client.encode(&hdr, payClosed, nil); err != nil { |
| expLog("error encoding server closed message:", err) |
| } |
| break |
| } |
| // We hold the lock during transmission to guarantee messages are |
| // sent in sequence number order. Also, we increment first so the |
| // value of client.SeqNum is the value of the highest used sequence |
| // number, not one beyond. |
| client.mu.Lock() |
| client.seqNum++ |
| hdr.SeqNum = client.seqNum |
| client.seqLock.Lock() // guarantee ordering of messages |
| client.mu.Unlock() |
| err := client.encode(&hdr, payData, val.Interface()) |
| client.seqLock.Unlock() |
| if err != nil { |
| expLog("error encoding client response:", err) |
| client.sendError(&hdr, err.String()) |
| break |
| } |
| // Negative count means run forever. |
| if count >= 0 { |
| if count--; count <= 0 { |
| break |
| } |
| } |
| } |
| } |
| |
| // Receive and deliver locally one item from a client asking for a Send |
| // The header is passed by value to avoid issues of overwriting. |
| func (client *expClient) serveSend(hdr header) { |
| nch := client.getChan(&hdr, Recv) |
| if nch == nil { |
| return |
| } |
| // Create a new value for each received item. |
| val := reflect.MakeZero(nch.ch.Type().(*reflect.ChanType).Elem()) |
| if err := client.decode(val); err != nil { |
| expLog("value decode:", err, "; type ", nch.ch.Type()) |
| return |
| } |
| nch.send(val) |
| } |
| |
| // Report that client has closed the channel that is sending to us. |
| // The header is passed by value to avoid issues of overwriting. |
| func (client *expClient) serveClosed(hdr header) { |
| nch := client.getChan(&hdr, Recv) |
| if nch == nil { |
| return |
| } |
| nch.close() |
| } |
| |
| func (client *expClient) unackedCount() int64 { |
| client.mu.Lock() |
| n := client.seqNum - client.ackNum |
| client.mu.Unlock() |
| return n |
| } |
| |
| func (client *expClient) seq() int64 { |
| client.mu.Lock() |
| n := client.seqNum |
| client.mu.Unlock() |
| return n |
| } |
| |
| func (client *expClient) ack() int64 { |
| client.mu.Lock() |
| n := client.seqNum |
| client.mu.Unlock() |
| return n |
| } |
| |
| // Serve waits for incoming connections on the listener |
| // and serves the Exporter's channels on each. |
| // It blocks until the listener is closed. |
| func (exp *Exporter) Serve(listener net.Listener) { |
| for { |
| conn, err := listener.Accept() |
| if err != nil { |
| expLog("listen:", err) |
| break |
| } |
| go exp.ServeConn(conn) |
| } |
| } |
| |
| // ServeConn exports the Exporter's channels on conn. |
| // It blocks until the connection is terminated. |
| func (exp *Exporter) ServeConn(conn io.ReadWriter) { |
| exp.addClient(conn).run() |
| } |
| |
| // NewExporter creates a new Exporter that exports a set of channels. |
| func NewExporter() *Exporter { |
| e := &Exporter{ |
| clientSet: &clientSet{ |
| names: make(map[string]*chanDir), |
| clients: make(map[unackedCounter]bool), |
| }, |
| } |
| return e |
| } |
| |
| // ListenAndServe exports the exporter's channels through the |
| // given network and local address defined as in net.Listen. |
| func (exp *Exporter) ListenAndServe(network, localaddr string) os.Error { |
| listener, err := net.Listen(network, localaddr) |
| if err != nil { |
| return err |
| } |
| go exp.Serve(listener) |
| return nil |
| } |
| |
| // addClient creates a new expClient and records its existence |
| func (exp *Exporter) addClient(conn io.ReadWriter) *expClient { |
| client := newClient(exp, conn) |
| exp.mu.Lock() |
| exp.clients[client] = true |
| exp.mu.Unlock() |
| return client |
| } |
| |
| // delClient forgets the client existed |
| func (exp *Exporter) delClient(client *expClient) { |
| exp.mu.Lock() |
| exp.clients[client] = false, false |
| exp.mu.Unlock() |
| } |
| |
| // Drain waits until all messages sent from this exporter/importer, including |
| // those not yet sent to any client and possibly including those sent while |
| // Drain was executing, have been received by the importer. In short, it |
| // waits until all the exporter's messages have been received by a client. |
| // If the timeout (measured in nanoseconds) is positive and Drain takes |
| // longer than that to complete, an error is returned. |
| func (exp *Exporter) Drain(timeout int64) os.Error { |
| // This wrapper function is here so the method's comment will appear in godoc. |
| return exp.clientSet.drain(timeout) |
| } |
| |
| // Sync waits until all clients of the exporter have received the messages |
| // that were sent at the time Sync was invoked. Unlike Drain, it does not |
| // wait for messages sent while it is running or messages that have not been |
| // dispatched to any client. If the timeout (measured in nanoseconds) is |
| // positive and Sync takes longer than that to complete, an error is |
| // returned. |
| func (exp *Exporter) Sync(timeout int64) os.Error { |
| // This wrapper function is here so the method's comment will appear in godoc. |
| return exp.clientSet.sync(timeout) |
| } |
| |
| func checkChan(chT interface{}, dir Dir) (*reflect.ChanValue, os.Error) { |
| chanType, ok := reflect.Typeof(chT).(*reflect.ChanType) |
| if !ok { |
| return nil, os.NewError("not a channel") |
| } |
| if dir != Send && dir != Recv { |
| return nil, os.NewError("unknown channel direction") |
| } |
| switch chanType.Dir() { |
| case reflect.BothDir: |
| case reflect.SendDir: |
| if dir != Recv { |
| return nil, os.NewError("to import/export with Send, must provide <-chan") |
| } |
| case reflect.RecvDir: |
| if dir != Send { |
| return nil, os.NewError("to import/export with Recv, must provide chan<-") |
| } |
| } |
| return reflect.NewValue(chT).(*reflect.ChanValue), nil |
| } |
| |
| // Export exports a channel of a given type and specified direction. The |
| // channel to be exported is provided in the call and may be of arbitrary |
| // channel type. |
| // Despite the literal signature, the effective signature is |
| // Export(name string, chT chan T, dir Dir) |
| func (exp *Exporter) Export(name string, chT interface{}, dir Dir) os.Error { |
| ch, err := checkChan(chT, dir) |
| if err != nil { |
| return err |
| } |
| exp.mu.Lock() |
| defer exp.mu.Unlock() |
| _, present := exp.names[name] |
| if present { |
| return os.NewError("channel name already being exported:" + name) |
| } |
| exp.names[name] = &chanDir{ch, dir} |
| return nil |
| } |
| |
| // Hangup disassociates the named channel from the Exporter and closes |
| // the channel. Messages in flight for the channel may be dropped. |
| func (exp *Exporter) Hangup(name string) os.Error { |
| exp.mu.Lock() |
| chDir, ok := exp.names[name] |
| if ok { |
| exp.names[name] = nil, false |
| } |
| // TODO drop all instances of channel from client sets |
| exp.mu.Unlock() |
| if !ok { |
| return os.NewError("netchan export: hangup: no such channel: " + name) |
| } |
| chDir.ch.Close() |
| return nil |
| } |