| // Copyright 2009 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package rpc |
| |
| import ( |
| "bufio" |
| "gob" |
| "http" |
| "io" |
| "log" |
| "net" |
| "os" |
| "sync" |
| ) |
| |
// Call represents an active RPC.
// Go allocates one Call per invocation and hands it back to the caller.
type Call struct {
	ServiceMethod string      // The name of the service and method to call.
	Args          interface{} // The argument to the function (*struct).
	Reply         interface{} // The reply from the function (*struct).
	Error         os.Error    // After completion, the error status.
	Done          chan *Call  // Strobes when call is complete; the value sent is this Call.
	seq           uint64      // Sequence number assigned by send; key into Client.pending.
}
| |
// Client represents an RPC Client.
// There may be multiple outstanding Calls associated
// with a single Client.
type Client struct {
	mutex    sync.Mutex       // protects pending and seq; shutdown and closing are written while it is held
	shutdown os.Error         // non-nil if the client is shut down
	sending  sync.Mutex       // held by send while a request is written to the codec
	seq      uint64           // sequence number given to the next Call
	codec    ClientCodec      // writes requests and reads responses
	pending  map[uint64]*Call // calls registered by send and not yet answered, keyed by sequence number
	closing  bool             // set by Close before it closes the codec
}
| |
// A ClientCodec implements writing of RPC requests and
// reading of RPC responses for the client side of an RPC session.
// The client calls WriteRequest to write a request to the connection
// and calls ReadResponseHeader and ReadResponseBody in pairs
// to read responses.  The client calls Close when finished with the
// connection.
type ClientCodec interface {
	// WriteRequest writes the request header followed by the call's argument.
	WriteRequest(*Request, interface{}) os.Error
	// ReadResponseHeader fills in the header of the next response.
	ReadResponseHeader(*Response) os.Error
	// ReadResponseBody decodes the body of the response into the call's Reply value.
	ReadResponseBody(interface{}) os.Error

	// Close is invoked by Client.Close to release the connection.
	Close() os.Error
}
| |
| func (client *Client) send(c *Call) { |
| // Register this call. |
| client.mutex.Lock() |
| if client.shutdown != nil { |
| c.Error = client.shutdown |
| client.mutex.Unlock() |
| _ = c.Done <- c // do not block |
| return |
| } |
| c.seq = client.seq |
| client.seq++ |
| client.pending[c.seq] = c |
| client.mutex.Unlock() |
| |
| // Encode and send the request. |
| request := new(Request) |
| client.sending.Lock() |
| request.Seq = c.seq |
| request.ServiceMethod = c.ServiceMethod |
| if err := client.codec.WriteRequest(request, c.Args); err != nil { |
| panic("rpc: client encode error: " + err.String()) |
| } |
| client.sending.Unlock() |
| } |
| |
| func (client *Client) input() { |
| var err os.Error |
| for err == nil { |
| response := new(Response) |
| err = client.codec.ReadResponseHeader(response) |
| if err != nil { |
| if err == os.EOF && !client.closing { |
| err = io.ErrUnexpectedEOF |
| } |
| break |
| } |
| seq := response.Seq |
| client.mutex.Lock() |
| c := client.pending[seq] |
| client.pending[seq] = c, false |
| client.mutex.Unlock() |
| err = client.codec.ReadResponseBody(c.Reply) |
| // Empty strings should turn into nil os.Errors |
| if response.Error != "" { |
| c.Error = os.ErrorString(response.Error) |
| } else { |
| c.Error = nil |
| } |
| // We don't want to block here. It is the caller's responsibility to make |
| // sure the channel has enough buffer space. See comment in Go(). |
| _ = c.Done <- c // do not block |
| } |
| // Terminate pending calls. |
| client.mutex.Lock() |
| client.shutdown = err |
| for _, call := range client.pending { |
| call.Error = err |
| _ = call.Done <- call // do not block |
| } |
| client.mutex.Unlock() |
| if err != os.EOF || !client.closing { |
| log.Stderr("rpc: client protocol error:", err) |
| } |
| } |
| |
| // NewClient returns a new Client to handle requests to the |
| // set of services at the other end of the connection. |
| func NewClient(conn io.ReadWriteCloser) *Client { |
| return NewClientWithCodec(&gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(conn)}) |
| } |
| |
| // NewClientWithCodec is like NewClient but uses the specified |
| // codec to encode requests and decode responses. |
| func NewClientWithCodec(codec ClientCodec) *Client { |
| client := &Client{ |
| codec: codec, |
| pending: make(map[uint64]*Call), |
| } |
| go client.input() |
| return client |
| } |
| |
// gobClientCodec is the ClientCodec that NewClient installs. It encodes
// requests and decodes responses with gob over a single connection.
type gobClientCodec struct {
	rwc io.ReadWriteCloser // the underlying connection; closed by Close
	dec *gob.Decoder       // decodes response headers and bodies
	enc *gob.Encoder       // encodes request headers and bodies
}
| |
| func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) os.Error { |
| if err := c.enc.Encode(r); err != nil { |
| return err |
| } |
| return c.enc.Encode(body) |
| } |
| |
| func (c *gobClientCodec) ReadResponseHeader(r *Response) os.Error { |
| return c.dec.Decode(r) |
| } |
| |
| func (c *gobClientCodec) ReadResponseBody(body interface{}) os.Error { |
| return c.dec.Decode(body) |
| } |
| |
| func (c *gobClientCodec) Close() os.Error { |
| return c.rwc.Close() |
| } |
| |
| |
| // DialHTTP connects to an HTTP RPC server at the specified network address. |
| func DialHTTP(network, address string) (*Client, os.Error) { |
| var err os.Error |
| conn, err := net.Dial(network, "", address) |
| if err != nil { |
| return nil, err |
| } |
| io.WriteString(conn, "CONNECT "+rpcPath+" HTTP/1.0\n\n") |
| |
| // Require successful HTTP response |
| // before switching to RPC protocol. |
| resp, err := http.ReadResponse(bufio.NewReader(conn), "CONNECT") |
| if err == nil && resp.Status == connected { |
| return NewClient(conn), nil |
| } |
| if err == nil { |
| err = os.ErrorString("unexpected HTTP response: " + resp.Status) |
| } |
| conn.Close() |
| return nil, &net.OpError{"dial-http", network + " " + address, nil, err} |
| } |
| |
| // Dial connects to an RPC server at the specified network address. |
| func Dial(network, address string) (*Client, os.Error) { |
| conn, err := net.Dial(network, "", address) |
| if err != nil { |
| return nil, err |
| } |
| return NewClient(conn), nil |
| } |
| |
| func (client *Client) Close() os.Error { |
| if client.shutdown != nil || client.closing { |
| return os.ErrorString("rpc: already closed") |
| } |
| client.mutex.Lock() |
| client.closing = true |
| client.mutex.Unlock() |
| return client.codec.Close() |
| } |
| |
| // Go invokes the function asynchronously. It returns the Call structure representing |
| // the invocation. The done channel will signal when the call is complete by returning |
| // the same Call object. If done is nil, Go will allocate a new channel. |
| // If non-nil, done must be buffered or Go will deliberately crash. |
| func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call { |
| c := new(Call) |
| c.ServiceMethod = serviceMethod |
| c.Args = args |
| c.Reply = reply |
| if done == nil { |
| done = make(chan *Call, 10) // buffered. |
| } else { |
| // If caller passes done != nil, it must arrange that |
| // done has enough buffer for the number of simultaneous |
| // RPCs that will be using that channel. If the channel |
| // is totally unbuffered, it's best not to run at all. |
| if cap(done) == 0 { |
| log.Crash("rpc: done channel is unbuffered") |
| } |
| } |
| c.Done = done |
| if client.shutdown != nil { |
| c.Error = client.shutdown |
| _ = c.Done <- c // do not block |
| return c |
| } |
| client.send(c) |
| return c |
| } |
| |
| // Call invokes the named function, waits for it to complete, and returns its error status. |
| func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) os.Error { |
| if client.shutdown != nil { |
| return client.shutdown |
| } |
| call := <-client.Go(serviceMethod, args, reply, nil).Done |
| return call.Error |
| } |