| // Copyright 2009 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| /* |
| The rpc package provides access to the public methods of an object across a |
| network or other I/O connection. A server registers an object, making it visible |
| as a service with the name of the type of the object. After registration, public |
| methods of the object will be accessible remotely. A server may register multiple |
| objects (services) of different types but it is an error to register multiple |
| objects of the same type. |
| |
| Only methods that satisfy these criteria will be made available for remote access; |
| other methods will be ignored: |
| |
| - the method name is publicly visible, that is, begins with an upper case letter. |
| - the method has two arguments, both pointers to publicly visible structs. |
| - the method has return type os.Error. |
| |
| The method's first argument represents the arguments provided by the caller; the |
| second argument represents the result parameters to be returned to the caller. |
| The method's return value, if non-nil, is passed back as a string that the client |
| sees as an os.ErrorString. |
| |
| The server may handle requests on a single connection by calling ServeConn. More |
| typically it will create a network listener and call Accept or, for an HTTP |
| listener, HandleHTTP and http.Serve. |
| |
| A client wishing to use the service establishes a connection and then invokes |
| NewClient on the connection. The convenience function Dial (DialHTTP) performs |
| both steps for a raw network connection (an HTTP connection). The resulting |
| Client object has two methods, Call and Go, that specify the service and method to |
| call, a structure containing the arguments, and a structure to receive the result |
| parameters. |
| |
| Call waits for the remote call to complete; Go launches the call asynchronously |
| and returns a channel that will signal completion. |
| |
| Package "gob" is used to transport the data. |
| |
| Here is a simple example. A server wishes to export an object of type Arith: |
| |
| package server |
| |
| type Args struct { |
| A, B int |
| } |
| |
| type Reply struct { |
| C int |
| } |
| |
| type Arith int |
| |
| func (t *Arith) Multiply(args *Args, reply *Reply) os.Error { |
| reply.C = args.A * args.B |
| return nil |
| } |
| |
| func (t *Arith) Divide(args *Args, reply *Reply) os.Error { |
| if args.B == 0 { |
| return os.ErrorString("divide by zero") |
| } |
| reply.C = args.A / args.B |
| return nil |
| } |
| |
| The server calls (for HTTP service): |
| |
| arith := new(Arith) |
| rpc.Register(arith) |
| rpc.HandleHTTP() |
| l, e := net.Listen("tcp", ":1234") |
| if e != nil { |
| log.Exit("listen error:", e) |
| } |
| go http.Serve(l, nil) |
| |
| At this point, clients can see a service "Arith" with methods "Arith.Multiply" and |
| "Arith.Divide". To invoke one, a client first dials the server: |
| |
| client, err := rpc.DialHTTP("tcp", serverAddress + ":1234") |
| if err != nil { |
| log.Exit("dialing:", err) |
| } |
| |
| Then it can make a remote call: |
| |
| // Synchronous call |
| args := &server.Args{7,8} |
| reply := new(server.Reply) |
| err = client.Call("Arith.Multiply", args, reply) |
| if err != nil { |
| log.Exit("arith error:", err) |
| } |
| fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply.C) |
| |
| or |
| |
| // Asynchronous call |
| divCall := client.Go("Arith.Divide", args, reply, nil) |
| replyCall := <-divCall.Done // will be equal to divCall |
| // check errors, print, etc. |
| |
| A server implementation will often provide a simple, type-safe wrapper for the |
| client. |
| */ |
| package rpc |
| |
| import ( |
| "gob" |
| "http" |
| "log" |
| "io" |
| "net" |
| "os" |
| "reflect" |
| "strings" |
| "sync" |
| "unicode" |
| "utf8" |
| ) |
| |
| // Precompute the reflect type for os.Error. Can't use os.Error directly |
| // because Typeof takes an empty interface value. This is annoying. |
| var unusedError *os.Error |
| var typeOfOsError = reflect.Typeof(unusedError).(*reflect.PtrType).Elem() |
| |
| type methodType struct { |
| sync.Mutex // protects counters |
| method reflect.Method |
| argType *reflect.PtrType |
| replyType *reflect.PtrType |
| numCalls uint |
| } |
| |
| type service struct { |
| name string // name of service |
| rcvr reflect.Value // receiver of methods for the service |
| typ reflect.Type // type of the receiver |
| method map[string]*methodType // registered methods |
| } |
| |
| // Request is a header written before every RPC call. It is used internally |
| // but documented here as an aid to debugging, such as when analyzing |
| // network traffic. |
| type Request struct { |
| ServiceMethod string // format: "Service.Method" |
| Seq uint64 // sequence number chosen by client |
| } |
| |
| // Response is a header written before every RPC return. It is used internally |
| // but documented here as an aid to debugging, such as when analyzing |
| // network traffic. |
| type Response struct { |
| ServiceMethod string // echoes that of the Request |
| Seq uint64 // echoes that of the request |
| Error string // error, if any. |
| } |
| |
| type serverType struct { |
| sync.Mutex // protects the serviceMap |
| serviceMap map[string]*service |
| } |
| |
| // This variable is a global whose "public" methods are really private methods |
| // called from the global functions of this package: rpc.Register, rpc.ServeConn, etc. |
| // For example, rpc.Register() calls server.add(). |
| var server = &serverType{serviceMap: make(map[string]*service)} |
| |
| // Is this a publicly visible - upper case - name? |
| func isPublic(name string) bool { |
| rune, _ := utf8.DecodeRuneInString(name) |
| return unicode.IsUpper(rune) |
| } |
| |
| func (server *serverType) register(rcvr interface{}) os.Error { |
| server.Lock() |
| defer server.Unlock() |
| if server.serviceMap == nil { |
| server.serviceMap = make(map[string]*service) |
| } |
| s := new(service) |
| s.typ = reflect.Typeof(rcvr) |
| s.rcvr = reflect.NewValue(rcvr) |
| sname := reflect.Indirect(s.rcvr).Type().Name() |
| if sname == "" { |
| log.Exit("rpc: no service name for type", s.typ.String()) |
| } |
| if !isPublic(sname) { |
| s := "rpc Register: type " + sname + " is not public" |
| log.Stderr(s) |
| return os.ErrorString(s) |
| } |
| if _, present := server.serviceMap[sname]; present { |
| return os.ErrorString("rpc: service already defined: " + sname) |
| } |
| s.name = sname |
| s.method = make(map[string]*methodType) |
| |
| // Install the methods |
| for m := 0; m < s.typ.NumMethod(); m++ { |
| method := s.typ.Method(m) |
| mtype := method.Type |
| mname := method.Name |
| if !isPublic(mname) { |
| continue |
| } |
| // Method needs three ins: receiver, *args, *reply. |
| // The args and reply must be structs until gobs are more general. |
| if mtype.NumIn() != 3 { |
| log.Stderr("method", mname, "has wrong number of ins:", mtype.NumIn()) |
| continue |
| } |
| argType, ok := mtype.In(1).(*reflect.PtrType) |
| if !ok { |
| log.Stderr(mname, "arg type not a pointer:", mtype.In(1)) |
| continue |
| } |
| if _, ok := argType.Elem().(*reflect.StructType); !ok { |
| log.Stderr(mname, "arg type not a pointer to a struct:", argType) |
| continue |
| } |
| replyType, ok := mtype.In(2).(*reflect.PtrType) |
| if !ok { |
| log.Stderr(mname, "reply type not a pointer:", mtype.In(2)) |
| continue |
| } |
| if _, ok := replyType.Elem().(*reflect.StructType); !ok { |
| log.Stderr(mname, "reply type not a pointer to a struct:", replyType) |
| continue |
| } |
| if !isPublic(argType.Elem().Name()) { |
| log.Stderr(mname, "argument type not public:", argType) |
| continue |
| } |
| if !isPublic(replyType.Elem().Name()) { |
| log.Stderr(mname, "reply type not public:", replyType) |
| continue |
| } |
| // Method needs one out: os.Error. |
| if mtype.NumOut() != 1 { |
| log.Stderr("method", mname, "has wrong number of outs:", mtype.NumOut()) |
| continue |
| } |
| if returnType := mtype.Out(0); returnType != typeOfOsError { |
| log.Stderr("method", mname, "returns", returnType.String(), "not os.Error") |
| continue |
| } |
| s.method[mname] = &methodType{method: method, argType: argType, replyType: replyType} |
| } |
| |
| if len(s.method) == 0 { |
| s := "rpc Register: type " + sname + " has no public methods of suitable type" |
| log.Stderr(s) |
| return os.ErrorString(s) |
| } |
| server.serviceMap[s.name] = s |
| return nil |
| } |
| |
| // A value sent as a placeholder for the response when the server receives an invalid request. |
| type InvalidRequest struct { |
| marker int |
| } |
| |
| var invalidRequest = InvalidRequest{1} |
| |
| func _new(t *reflect.PtrType) *reflect.PtrValue { |
| v := reflect.MakeZero(t).(*reflect.PtrValue) |
| v.PointTo(reflect.MakeZero(t.Elem())) |
| return v |
| } |
| |
| func sendResponse(sending *sync.Mutex, req *Request, reply interface{}, enc *gob.Encoder, errmsg string) { |
| resp := new(Response) |
| // Encode the response header |
| resp.ServiceMethod = req.ServiceMethod |
| if errmsg != "" { |
| resp.Error = errmsg |
| } |
| resp.Seq = req.Seq |
| sending.Lock() |
| enc.Encode(resp) |
| // Encode the reply value. |
| enc.Encode(reply) |
| sending.Unlock() |
| } |
| |
| func (s *service) call(sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, enc *gob.Encoder) { |
| mtype.Lock() |
| mtype.numCalls++ |
| mtype.Unlock() |
| function := mtype.method.Func |
| // Invoke the method, providing a new value for the reply. |
| returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv}) |
| // The return value for the method is an os.Error. |
| errInter := returnValues[0].Interface() |
| errmsg := "" |
| if errInter != nil { |
| errmsg = errInter.(os.Error).String() |
| } |
| sendResponse(sending, req, replyv.Interface(), enc, errmsg) |
| } |
| |
| func (server *serverType) input(conn io.ReadWriteCloser) { |
| dec := gob.NewDecoder(conn) |
| enc := gob.NewEncoder(conn) |
| sending := new(sync.Mutex) |
| for { |
| // Grab the request header. |
| req := new(Request) |
| err := dec.Decode(req) |
| if err != nil { |
| if err == os.EOF || err == io.ErrUnexpectedEOF { |
| log.Stderr("rpc: ", err) |
| break |
| } |
| s := "rpc: server cannot decode request: " + err.String() |
| sendResponse(sending, req, invalidRequest, enc, s) |
| continue |
| } |
| serviceMethod := strings.Split(req.ServiceMethod, ".", 0) |
| if len(serviceMethod) != 2 { |
| s := "rpc: service/method request ill:formed: " + req.ServiceMethod |
| sendResponse(sending, req, invalidRequest, enc, s) |
| continue |
| } |
| // Look up the request. |
| server.Lock() |
| service, ok := server.serviceMap[serviceMethod[0]] |
| server.Unlock() |
| if !ok { |
| s := "rpc: can't find service " + req.ServiceMethod |
| sendResponse(sending, req, invalidRequest, enc, s) |
| continue |
| } |
| mtype, ok := service.method[serviceMethod[1]] |
| if !ok { |
| s := "rpc: can't find method " + req.ServiceMethod |
| sendResponse(sending, req, invalidRequest, enc, s) |
| continue |
| } |
| // Decode the argument value. |
| argv := _new(mtype.argType) |
| replyv := _new(mtype.replyType) |
| err = dec.Decode(argv.Interface()) |
| if err != nil { |
| log.Stderr("rpc: tearing down", serviceMethod[0], "connection:", err) |
| sendResponse(sending, req, replyv.Interface(), enc, err.String()) |
| continue |
| } |
| go service.call(sending, mtype, req, argv, replyv, enc) |
| } |
| conn.Close() |
| } |
| |
| func (server *serverType) accept(lis net.Listener) { |
| for { |
| conn, err := lis.Accept() |
| if err != nil { |
| log.Exit("rpc.Serve: accept:", err.String()) // TODO(r): exit? |
| } |
| go server.input(conn) |
| } |
| } |
| |
| // Register publishes in the server the set of methods of the |
| // receiver value that satisfy the following conditions: |
| // - public method |
| // - two arguments, both pointers to public structs |
| // - one return value of type os.Error |
| // It returns an error if the receiver is not public or has no |
| // suitable methods. |
| func Register(rcvr interface{}) os.Error { return server.register(rcvr) } |
| |
| // ServeConn runs the server on a single connection. When the connection |
| // completes, service terminates. ServeConn blocks; the caller typically |
| // invokes it in a go statement. |
| func ServeConn(conn io.ReadWriteCloser) { go server.input(conn) } |
| |
| // Accept accepts connections on the listener and serves requests |
| // for each incoming connection. Accept blocks; the caller typically |
| // invokes it in a go statement. |
| func Accept(lis net.Listener) { server.accept(lis) } |
| |
| // Can connect to RPC service using HTTP CONNECT to rpcPath. |
| var rpcPath string = "/_goRPC_" |
| var debugPath string = "/debug/rpc" |
| var connected = "200 Connected to Go RPC" |
| |
| func serveHTTP(c *http.Conn, req *http.Request) { |
| if req.Method != "CONNECT" { |
| c.SetHeader("Content-Type", "text/plain; charset=utf-8") |
| c.WriteHeader(http.StatusMethodNotAllowed) |
| io.WriteString(c, "405 must CONNECT to "+rpcPath+"\n") |
| return |
| } |
| conn, _, err := c.Hijack() |
| if err != nil { |
| log.Stderr("rpc hijacking ", c.RemoteAddr, ": ", err.String()) |
| return |
| } |
| io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n") |
| server.input(conn) |
| } |
| |
| // HandleHTTP registers an HTTP handler for RPC messages. |
| // It is still necessary to invoke http.Serve(), typically in a go statement. |
| func HandleHTTP() { |
| http.Handle(rpcPath, http.HandlerFunc(serveHTTP)) |
| http.Handle(debugPath, http.HandlerFunc(debugHTTP)) |
| } |