[dev.fuzz] internal/fuzz: add mutex to workerClient

This prevents workerClient.Close from closing fuzzIn while
workerClient.fuzz is writing to it concurrently. It also prevents
multiple callers from writing to fuzzIn concurrently, though there's
nothing that does that yet.

This should prevent most "broken pipe" errors, though they may still
be possible if worker.stop is called and it needs to kill the process
due to a timeout. In the future, we should detect and ignore those
errors, but for now, they're useful for debugging.

Also, improve documentation on workerClient and workerServer.

Change-Id: Ie2c870392d5e91674d3b1e32b2fa4f9de9ac3eb0
Reviewed-on: https://go-review.googlesource.com/c/go/+/275173
Run-TryBot: Jay Conrod <jayconrod@google.com>
TryBot-Result: Go Bot <gobot@golang.org>
Trust: Jay Conrod <jayconrod@google.com>
Reviewed-by: Katie Hockman <katie@golang.org>
diff --git a/src/internal/fuzz/worker.go b/src/internal/fuzz/worker.go
index a194a5f..148cc6d 100644
--- a/src/internal/fuzz/worker.go
+++ b/src/internal/fuzz/worker.go
@@ -13,6 +13,7 @@
 	"os"
 	"os/exec"
 	"runtime"
+	"sync"
 	"time"
 )
 
@@ -102,6 +103,9 @@
 					// TODO(jayconrod): if we get an error here, something failed between
 					// main and the call to testing.F.Fuzz. The error here won't
 					// be useful. Collect stderr, clean it up, and return that.
+					// TODO(jayconrod): we can get EPIPE if w.stop is called concurrently
+					// and it kills the worker process. Suppress this message in
+					// that case.
 					// TODO(jayconrod): what happens if testing.F.Fuzz is never called?
 					// TODO(jayconrod): time out if the test process hangs.
 					fmt.Fprintf(os.Stderr, "communicating with worker: %v\n", err)
@@ -284,7 +288,7 @@
 	if err != nil {
 		return err
 	}
-	srv := &workerServer{workerComm: comm, fn: fn}
+	srv := &workerServer{workerComm: comm, fuzzFn: fn}
 	return srv.serve()
 }
 
@@ -304,23 +308,36 @@
 	Err     string
 }
 
-// workerComm holds objects needed for the worker client and server
-// to communicate.
+// workerComm holds pipes and shared memory used for communication
+// between the coordinator process (client) and a worker process (server).
 type workerComm struct {
 	fuzzIn, fuzzOut *os.File
 	mem             *sharedMem
 }
 
-// workerServer is a minimalist RPC server, run in fuzz worker processes.
+// workerServer is a minimalist RPC server, run by fuzz worker processes.
+// It allows the coordinator process (using workerClient) to call methods in a
+// worker process. This system allows the coordinator to run multiple worker
+// processes in parallel and to collect inputs that caused crashes from shared
+// memory after a worker process terminates unexpectedly.
 type workerServer struct {
 	workerComm
-	fn func([]byte) error
+
+	// fuzzFn runs the worker's fuzz function on the given input and returns
+	// an error if it finds a crasher (the process may also exit or crash).
+	fuzzFn func([]byte) error
 }
 
-// serve deserializes and executes RPCs on a given pair of pipes.
+// serve reads serialized RPC messages on fuzzIn. When serve receives a message,
+// it calls the corresponding method, then sends the serialized result back
+// on fuzzOut.
 //
-// serve returns errors communicating over the pipes. It does not return
-// errors from methods; those are passed through response values.
+// serve handles RPC calls synchronously; it will not attempt to read a message
+// until the previous call has finished.
+//
+// serve returns errors that occurred when communicating over pipes. serve
+// does not return errors from method calls; those are passed through serialized
+// responses.
 func (ws *workerServer) serve() error {
 	enc := json.NewEncoder(ws.fuzzOut)
 	dec := json.NewDecoder(ws.fuzzIn)
@@ -358,7 +375,7 @@
 			return fuzzResponse{}
 		default:
 			b := mutate(value)
-			if err := ws.fn(b); err != nil {
+			if err := ws.fuzzFn(b); err != nil {
 				return fuzzResponse{Crasher: b, Err: err.Error()}
 			}
 			// TODO(jayconrod,katiehockman): return early if coverage is expanded
@@ -366,9 +383,13 @@
 	}
 }
 
-// workerClient is a minimalist RPC client, run in the fuzz coordinator.
+// workerClient is a minimalist RPC client. The coordinator process uses a
+// workerClient to call methods in each worker process (handled by
+// workerServer).
 type workerClient struct {
 	workerComm
+
+	mu  sync.Mutex
 	enc *json.Encoder
 	dec *json.Decoder
 }
@@ -385,6 +406,9 @@
 // closing fuzz_in. Close drains fuzz_out (avoiding a SIGPIPE in the worker),
 // and closes it after the worker process closes the other end.
 func (wc *workerClient) Close() error {
+	wc.mu.Lock()
+	defer wc.mu.Unlock()
+
 	// Close fuzzIn. This signals to the server that there are no more calls,
 	// and it should exit.
 	if err := wc.fuzzIn.Close(); err != nil {
@@ -403,6 +427,9 @@
 
 // fuzz tells the worker to call the fuzz method. See workerServer.fuzz.
 func (wc *workerClient) fuzz(value []byte, args fuzzArgs) (fuzzResponse, error) {
+	wc.mu.Lock()
+	defer wc.mu.Unlock()
+
 	wc.mem.setValue(value)
 	c := call{Fuzz: &args}
 	if err := wc.enc.Encode(c); err != nil {