jsonrpc2: JSON-RPC 2.0 implementation

See https://www.jsonrpc.org/specification.

Forked from golang.org/x/tool/internal/jsonrpc2_v2 at
commit 13cf844527f48de3074270bb538dff67117a6655.

Changed to use golang.org/x/exp/event.

This CL also contains a fork of golang.org/x/tools/internal/stack/...
at the same commit, needed for testing.

Change-Id: Ifc634c2b8e368a51fc43a7c0fab46cfa0ed5464b
Reviewed-on: https://go-review.googlesource.com/c/exp/+/326676
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
Reviewed-by: Ian Cottrell <iancottrell@google.com>
diff --git a/internal/stack/gostacks/gostacks.go b/internal/stack/gostacks/gostacks.go
new file mode 100644
index 0000000..b2fb4e4
--- /dev/null
+++ b/internal/stack/gostacks/gostacks.go
@@ -0,0 +1,23 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// The gostacks command processes stdin looking for things that look like
+// stack traces and simplifying them to make the log more readable.
+// It collates stack traces that have the same path as well as simplifying the
+// individual lines of the trace.
+// The processed log is printed to stdout.
+package main
+
+import (
+	"fmt"
+	"os"
+
+	"golang.org/x/exp/internal/stack"
+)
+
+func main() {
+	if err := stack.Process(os.Stdout, os.Stdin); err != nil {
+		fmt.Fprintln(os.Stderr, err)
+	}
+}
diff --git a/internal/stack/parse.go b/internal/stack/parse.go
new file mode 100644
index 0000000..e01da8f
--- /dev/null
+++ b/internal/stack/parse.go
@@ -0,0 +1,175 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package stack
+
+import (
+	"bufio"
+	"errors"
+	"io"
+	"regexp"
+	"strconv"
+)
+
+var (
+	reBlank     = regexp.MustCompile(`^\s*$`)
+	reGoroutine = regexp.MustCompile(`^\s*goroutine (\d+) \[([^\]]*)\]:\s*$`)
+	reCall      = regexp.MustCompile(`^\s*` +
+		`(created by )?` + //marker
+		`(([\w/.]+/)?[\w]+)\.` + //package
+		`(\(([^:.)]*)\)\.)?` + //optional type
+		`([\w\.]+)` + //function
+		`(\(.*\))?` + // args
+		`\s*$`)
+	rePos = regexp.MustCompile(`^\s*(.*):(\d+)( .*)?$`)
+
+	errBreakParse = errors.New("break parse")
+)
+
+// Scanner splits an input stream into lines in a way that is consumable by
+// the parser.
+type Scanner struct {
+	lines *bufio.Scanner
+	done  bool
+}
+
+// NewScanner creates a scanner on top of a reader.
+func NewScanner(r io.Reader) *Scanner {
+	s := &Scanner{
+		lines: bufio.NewScanner(r),
+	}
+	s.Skip() // prefill
+	return s
+}
+
+// Peek returns the next line without consuming it.
+func (s *Scanner) Peek() string {
+	if s.done {
+		return ""
+	}
+	return s.lines.Text()
+}
+
+// Skip consumes the next line without looking at it.
+// Normally used after it has already been looked at using Peek.
+func (s *Scanner) Skip() {
+	if !s.lines.Scan() {
+		s.done = true
+	}
+}
+
+// Next consumes and returns the next line.
+func (s *Scanner) Next() string {
+	line := s.Peek()
+	s.Skip()
+	return line
+}
+
+// Done returns true if the scanner has reached the end of the underlying
+// stream.
+func (s *Scanner) Done() bool {
+	return s.done
+}
+
+// Err returns true if the scanner has reached the end of the underlying
+// stream.
+func (s *Scanner) Err() error {
+	return s.lines.Err()
+}
+
+// Match returns the submatchs of the regular expression against the next line.
+// If it matched the line is also consumed.
+func (s *Scanner) Match(re *regexp.Regexp) []string {
+	if s.done {
+		return nil
+	}
+	match := re.FindStringSubmatch(s.Peek())
+	if match != nil {
+		s.Skip()
+	}
+	return match
+}
+
+// SkipBlank skips any number of pure whitespace lines.
+func (s *Scanner) SkipBlank() {
+	for !s.done {
+		line := s.Peek()
+		if len(line) != 0 && !reBlank.MatchString(line) {
+			return
+		}
+		s.Skip()
+	}
+}
+
+// Parse the current contiguous block of goroutine stack traces until the
+// scanned content no longer matches.
+func Parse(scanner *Scanner) (Dump, error) {
+	dump := Dump{}
+	for {
+		gr, ok := parseGoroutine(scanner)
+		if !ok {
+			return dump, nil
+		}
+		dump = append(dump, gr)
+	}
+}
+
+func parseGoroutine(scanner *Scanner) (Goroutine, bool) {
+	match := scanner.Match(reGoroutine)
+	if match == nil {
+		return Goroutine{}, false
+	}
+	id, _ := strconv.ParseInt(match[1], 0, 32)
+	gr := Goroutine{
+		ID:    int(id),
+		State: match[2],
+	}
+	for {
+		frame, ok := parseFrame(scanner)
+		if !ok {
+			scanner.SkipBlank()
+			return gr, true
+		}
+		if frame.Position.Filename != "" {
+			gr.Stack = append(gr.Stack, frame)
+		}
+	}
+}
+
+func parseFrame(scanner *Scanner) (Frame, bool) {
+	fun, ok := parseFunction(scanner)
+	if !ok {
+		return Frame{}, false
+	}
+	frame := Frame{
+		Function: fun,
+	}
+	frame.Position, ok = parsePosition(scanner)
+	// if ok is false, then this is a broken state.
+	// we got the func but not the file that must follow
+	// the consumed line can be recovered from the frame
+	//TODO: push back the fun raw
+	return frame, ok
+}
+
+func parseFunction(scanner *Scanner) (Function, bool) {
+	match := scanner.Match(reCall)
+	if match == nil {
+		return Function{}, false
+	}
+	return Function{
+		Package: match[2],
+		Type:    match[5],
+		Name:    match[6],
+	}, true
+}
+
+func parsePosition(scanner *Scanner) (Position, bool) {
+	match := scanner.Match(rePos)
+	if match == nil {
+		return Position{}, false
+	}
+	line, _ := strconv.ParseInt(match[2], 0, 32)
+	return Position{Filename: match[1], Line: int(line)}, true
+}
diff --git a/internal/stack/process.go b/internal/stack/process.go
new file mode 100644
index 0000000..ac19366
--- /dev/null
+++ b/internal/stack/process.go
@@ -0,0 +1,112 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package stack
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"runtime"
+	"sort"
+)
+
+// Capture get the current stack traces from the runtime.
+func Capture() Dump {
+	buf := make([]byte, 2<<20)
+	buf = buf[:runtime.Stack(buf, true)]
+	scanner := NewScanner(bytes.NewReader(buf))
+	dump, _ := Parse(scanner)
+	return dump
+}
+
+// Summarize a dump for easier consumption.
+// This collates goroutines with equivalent stacks.
+func Summarize(dump Dump) Summary {
+	s := Summary{
+		Total: len(dump),
+	}
+	for _, gr := range dump {
+		s.addGoroutine(gr)
+	}
+	return s
+}
+
+// Process and input stream to an output stream, summarizing any stacks that
+// are detected in place.
+func Process(out io.Writer, in io.Reader) error {
+	scanner := NewScanner(in)
+	for {
+		dump, err := Parse(scanner)
+		summary := Summarize(dump)
+		switch {
+		case len(dump) > 0:
+			fmt.Fprintf(out, "%+v\n\n", summary)
+		case err != nil:
+			return err
+		case scanner.Done():
+			return scanner.Err()
+		default:
+			// must have been a line that is not part of a dump
+			fmt.Fprintln(out, scanner.Next())
+		}
+	}
+}
+
+// Diff calculates the delta between two dumps.
+func Diff(before, after Dump) Delta {
+	result := Delta{}
+	processed := make(map[int]bool)
+	for _, gr := range before {
+		processed[gr.ID] = false
+	}
+	for _, gr := range after {
+		if _, found := processed[gr.ID]; found {
+			result.Shared = append(result.Shared, gr)
+		} else {
+			result.After = append(result.After, gr)
+		}
+		processed[gr.ID] = true
+	}
+	for _, gr := range before {
+		if done := processed[gr.ID]; !done {
+			result.Before = append(result.Before, gr)
+		}
+	}
+	return result
+}
+
+// TODO: do we want to allow contraction of stacks before comparison?
+func (s *Summary) addGoroutine(gr Goroutine) {
+	index := sort.Search(len(s.Calls), func(i int) bool {
+		return !s.Calls[i].Stack.less(gr.Stack)
+	})
+	if index >= len(s.Calls) || !s.Calls[index].Stack.equal(gr.Stack) {
+		// insert new stack, first increase the length
+		s.Calls = append(s.Calls, Call{})
+		// move the top part upward to make space
+		copy(s.Calls[index+1:], s.Calls[index:])
+		// insert the new call
+		s.Calls[index] = Call{
+			Stack: gr.Stack,
+		}
+	}
+	// merge the goroutine into the matched call
+	s.Calls[index].merge(gr)
+}
+
+//TODO: do we want other grouping strategies?
+func (c *Call) merge(gr Goroutine) {
+	for i := range c.Groups {
+		canditate := &c.Groups[i]
+		if canditate.State == gr.State {
+			canditate.Goroutines = append(canditate.Goroutines, gr)
+			return
+		}
+	}
+	c.Groups = append(c.Groups, Group{
+		State:      gr.State,
+		Goroutines: []Goroutine{gr},
+	})
+}
diff --git a/internal/stack/stack.go b/internal/stack/stack.go
new file mode 100644
index 0000000..479301a
--- /dev/null
+++ b/internal/stack/stack.go
@@ -0,0 +1,170 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package stack provides support for parsing standard goroutine stack traces.
+package stack
+
+import (
+	"fmt"
+	"text/tabwriter"
+)
+
+// Dump is a raw set of goroutines and their stacks.
+type Dump []Goroutine
+
+// Goroutine is a single parsed goroutine dump.
+type Goroutine struct {
+	State string // state that the goroutine is in.
+	ID    int    // id of the goroutine.
+	Stack Stack  // call frames that make up the stack
+}
+
+// Stack is a set of frames in a callstack.
+type Stack []Frame
+
+// Frame is a point in a call stack.
+type Frame struct {
+	Function Function
+	Position Position
+}
+
+// Function is the function called at a frame.
+type Function struct {
+	Package string // package name of function if known
+	Type    string // if set function is a method of this type
+	Name    string // function name of the frame
+}
+
+// Position is the file position for a frame.
+type Position struct {
+	Filename string // source filename
+	Line     int    // line number within file
+}
+
+// Summary is a set of stacks processed and collated into Calls.
+type Summary struct {
+	Total int    // the total count of goroutines in the summary
+	Calls []Call // the collated stack traces
+}
+
+// Call is set of goroutines that all share the same callstack.
+// They will be grouped by state.
+type Call struct {
+	Stack  Stack   // the shared callstack information
+	Groups []Group // the sets of goroutines with the same state
+}
+
+// Group is a set of goroutines with the same stack that are in the same state.
+type Group struct {
+	State      string      // the shared state of the goroutines
+	Goroutines []Goroutine // the set of goroutines in this group
+}
+
+// Delta represents the difference between two stack dumps.
+type Delta struct {
+	Before Dump // The goroutines that were only in the before set.
+	Shared Dump // The goroutines that were in both sets.
+	After  Dump // The goroutines that were only in the after set.
+}
+
+func (s Stack) equal(other Stack) bool {
+	if len(s) != len(other) {
+		return false
+	}
+	for i, frame := range s {
+		if !frame.equal(other[i]) {
+			return false
+		}
+	}
+	return true
+}
+
+func (s Stack) less(other Stack) bool {
+	for i, frame := range s {
+		if i >= len(other) {
+			return false
+		}
+		if frame.less(other[i]) {
+			return true
+		}
+		if !frame.equal(other[i]) {
+			return false
+		}
+	}
+	return len(s) < len(other)
+}
+
+func (f Frame) equal(other Frame) bool {
+	return f.Position.equal(other.Position)
+}
+
+func (f Frame) less(other Frame) bool {
+	return f.Position.less(other.Position)
+}
+
+func (p Position) equal(other Position) bool {
+	return p.Filename == other.Filename && p.Line == other.Line
+}
+
+func (p Position) less(other Position) bool {
+	if p.Filename < other.Filename {
+		return true
+	}
+	if p.Filename > other.Filename {
+		return false
+	}
+	return p.Line < other.Line
+}
+
+func (s Summary) Format(w fmt.State, r rune) {
+	tw := tabwriter.NewWriter(w, 0, 0, 1, ' ', 0)
+	for i, c := range s.Calls {
+		if i > 0 {
+			fmt.Fprintf(tw, "\n\n")
+			tw.Flush()
+		}
+		fmt.Fprint(tw, c)
+	}
+	tw.Flush()
+	if s.Total > 0 && w.Flag('+') {
+		fmt.Fprintf(w, "\n\n%d goroutines, %d unique", s.Total, len(s.Calls))
+	}
+}
+
+func (c Call) Format(w fmt.State, r rune) {
+	for i, g := range c.Groups {
+		if i > 0 {
+			fmt.Fprint(w, " ")
+		}
+		fmt.Fprint(w, g)
+	}
+	for _, f := range c.Stack {
+		fmt.Fprintf(w, "\n%v", f)
+	}
+}
+
+func (g Group) Format(w fmt.State, r rune) {
+	fmt.Fprintf(w, "[%v]: ", g.State)
+	for i, gr := range g.Goroutines {
+		if i > 0 {
+			fmt.Fprint(w, ", ")
+		}
+		fmt.Fprintf(w, "$%d", gr.ID)
+	}
+}
+
+func (f Frame) Format(w fmt.State, c rune) {
+	fmt.Fprintf(w, "%v:\t%v", f.Position, f.Function)
+}
+
+func (f Function) Format(w fmt.State, c rune) {
+	if f.Type != "" {
+		fmt.Fprintf(w, "(%v).", f.Type)
+	}
+	fmt.Fprintf(w, "%v", f.Name)
+}
+
+func (p Position) Format(w fmt.State, c rune) {
+	fmt.Fprintf(w, "%v:%v", p.Filename, p.Line)
+}
diff --git a/internal/stack/stack_test.go b/internal/stack/stack_test.go
new file mode 100644
index 0000000..8f22236
--- /dev/null
+++ b/internal/stack/stack_test.go
@@ -0,0 +1,193 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package stack_test
+
+import (
+	"bytes"
+	"strings"
+	"testing"
+
+	"golang.org/x/exp/internal/stack"
+)
+
+func TestProcess(t *testing.T) {
+	for _, test := range []struct{ name, input, expect string }{{
+		name:   `empty`,
+		input:  ``,
+		expect: ``,
+	}, {
+		name:  `no_frame`,
+		input: `goroutine 1 [running]:`,
+		expect: `
+[running]: $1
+
+1 goroutines, 1 unique
+`,
+	}, {
+		name: `one_frame`,
+		input: `
+goroutine 1 [running]:
+package.function(args)
+	file.go:10
+`,
+		expect: `
+[running]: $1
+file.go:10: function
+
+1 goroutines, 1 unique
+`,
+	}, {
+		name: `one_call`,
+		input: `
+goroutine 1 [running]:
+package1.functionA(args)
+	file1.go:10
+package2.functionB(args)
+	file2.go:20
+package3.functionC(args)
+	file3.go:30
+`,
+		expect: `
+[running]: $1
+file1.go:10: functionA
+file2.go:20: functionB
+file3.go:30: functionC
+
+1 goroutines, 1 unique
+`,
+	}, {
+		name: `two_call`,
+		input: `
+goroutine 1 [running]:
+package1.functionA(args)
+	file1.go:10
+goroutine 2 [running]:
+package2.functionB(args)
+	file2.go:20
+`,
+		expect: `
+[running]: $1
+file1.go:10: functionA
+
+[running]: $2
+file2.go:20: functionB
+
+2 goroutines, 2 unique
+`,
+	}, {
+		name: `merge_call`,
+		input: `
+goroutine 1 [running]:
+package1.functionA(args)
+	file1.go:10
+goroutine 2 [running]:
+package1.functionA(args)
+	file1.go:10
+`,
+		expect: `
+[running]: $1, $2
+file1.go:10: functionA
+
+2 goroutines, 1 unique
+`,
+	}, {
+		name: `alternating_call`,
+		input: `
+goroutine 1 [running]:
+package1.functionA(args)
+	file1.go:10
+goroutine 2 [running]:
+package2.functionB(args)
+	file2.go:20
+goroutine 3 [running]:
+package1.functionA(args)
+	file1.go:10
+goroutine 4 [running]:
+package2.functionB(args)
+	file2.go:20
+goroutine 5 [running]:
+package1.functionA(args)
+	file1.go:10
+goroutine 6 [running]:
+package2.functionB(args)
+	file2.go:20
+`,
+		expect: `
+[running]: $1, $3, $5
+file1.go:10: functionA
+
+[running]: $2, $4, $6
+file2.go:20: functionB
+
+6 goroutines, 2 unique
+`,
+	}, {
+		name: `sort_calls`,
+		input: `
+goroutine 1 [running]:
+package3.functionC(args)
+	file3.go:30
+goroutine 2 [running]:
+package2.functionB(args)
+	file2.go:20
+goroutine 3 [running]:
+package1.functionA(args)
+	file1.go:10
+`,
+		expect: `
+[running]: $3
+file1.go:10: functionA
+
+[running]: $2
+file2.go:20: functionB
+
+[running]: $1
+file3.go:30: functionC
+
+3 goroutines, 3 unique
+`,
+	}, {
+		name: `real_single`,
+		input: `
+panic: oops
+
+goroutine 53 [running]:
+golang.org/x/tools/internal/jsonrpc2_test.testHandler.func1(0x1240c20, 0xc000013350, 0xc0000133b0, 0x1240ca0, 0xc00002ab00, 0x3, 0x3)
+	/work/tools/internal/jsonrpc2/jsonrpc2_test.go:160 +0x74c
+golang.org/x/tools/internal/jsonrpc2.(*Conn).Run(0xc000204330, 0x1240c20, 0xc000204270, 0x1209570, 0xc000212120, 0x1242700)
+	/work/tools/internal/jsonrpc2/jsonrpc2.go:187 +0x777
+golang.org/x/tools/internal/jsonrpc2_test.run.func1(0x123ebe0, 0xc000206018, 0x123ec20, 0xc000206010, 0xc0002080a0, 0xc000204330, 0x1240c20, 0xc000204270, 0xc000212120)
+	/work/tools/internal/jsonrpc2/jsonrpc2_test.go:131 +0xe2
+created by golang.org/x/tools/internal/jsonrpc2_test.run
+	/work/tools/internal/jsonrpc2/jsonrpc2_test.go:121 +0x263
+FAIL    golang.org/x/tools/internal/jsonrpc2    0.252s
+FAIL
+`,
+		expect: `
+panic: oops
+
+[running]: $53
+/work/tools/internal/jsonrpc2/jsonrpc2_test.go:160: testHandler.func1
+/work/tools/internal/jsonrpc2/jsonrpc2.go:187:      (*Conn).Run
+/work/tools/internal/jsonrpc2/jsonrpc2_test.go:131: run.func1
+/work/tools/internal/jsonrpc2/jsonrpc2_test.go:121: run
+
+1 goroutines, 1 unique
+
+FAIL    golang.org/x/tools/internal/jsonrpc2    0.252s
+FAIL
+`,
+	}} {
+		t.Run(test.name, func(t *testing.T) {
+			buf := &bytes.Buffer{}
+			stack.Process(buf, strings.NewReader(test.input))
+			expect := strings.TrimSpace(test.expect)
+			got := strings.TrimSpace(buf.String())
+			if got != expect {
+				t.Errorf("got:\n%s\nexpect:\n%s", got, expect)
+			}
+		})
+	}
+}
diff --git a/internal/stack/stacktest/stacktest.go b/internal/stack/stacktest/stacktest.go
new file mode 100644
index 0000000..64bc6c5
--- /dev/null
+++ b/internal/stack/stacktest/stacktest.go
@@ -0,0 +1,50 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package stacktest
+
+import (
+	"testing"
+	"time"
+
+	"golang.org/x/exp/internal/stack"
+)
+
+//this is only needed to support pre 1.14 when testing.TB did not have Cleanup
+type withCleanup interface {
+	Cleanup(func())
+}
+
+// the maximum amount of time to wait for goroutines to clean themselves up.
+const maxWait = time.Second
+
+// NoLeak checks that a test (or benchmark) does not leak any goroutines.
+func NoLeak(t testing.TB) {
+	c, ok := t.(withCleanup)
+	if !ok {
+		return
+	}
+	before := stack.Capture()
+	c.Cleanup(func() {
+		var delta stack.Delta
+		start := time.Now()
+		delay := time.Millisecond
+		for {
+			after := stack.Capture()
+			delta = stack.Diff(before, after)
+			if len(delta.After) == 0 {
+				// no leaks
+				return
+			}
+			if time.Since(start) > maxWait {
+				break
+			}
+			time.Sleep(delay)
+			delay *= 2
+		}
+		// it's been long enough, and leaks are still present
+		summary := stack.Summarize(delta.After)
+		t.Errorf("goroutine leak detected:\n%+v", summary)
+	})
+}
diff --git a/jsonrpc2/conn.go b/jsonrpc2/conn.go
new file mode 100644
index 0000000..cbc1558
--- /dev/null
+++ b/jsonrpc2/conn.go
@@ -0,0 +1,474 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package jsonrpc2
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"sync/atomic"
+
+	"golang.org/x/exp/event"
+	errors "golang.org/x/xerrors"
+)
+
+// Binder builds a connection configuration.
+// This may be used in servers to generate a new configuration per connection.
+// ConnectionOptions itself implements Binder returning itself unmodified, to
+// allow for the simple cases where no per connection information is needed.
+type Binder interface {
+	// Bind is invoked when creating a new connection.
+	// The connection is not ready to use when Bind is called.
+	Bind(context.Context, *Connection) (ConnectionOptions, error)
+}
+
+// ConnectionOptions holds the options for new connections.
+type ConnectionOptions struct {
+	// Framer allows control over the message framing and encoding.
+	// If nil, HeaderFramer will be used.
+	Framer Framer
+	// Preempter allows registration of a pre-queue message handler.
+	// If nil, no messages will be preempted.
+	Preempter Preempter
+	// Handler is used as the queued message handler for inbound messages.
+	// If nil, all responses will be ErrNotHandled.
+	Handler Handler
+}
+
+// Connection manages the jsonrpc2 protocol, connecting responses back to their
+// calls.
+// Connection is bidirectional; it does not have a designated server or client
+// end.
+type Connection struct {
+	seq         int64 // must only be accessed using atomic operations
+	closer      io.Closer
+	writerBox   chan Writer
+	outgoingBox chan map[ID]chan<- *Response
+	incomingBox chan map[ID]*incoming
+	async       async
+}
+
+type AsyncCall struct {
+	id         ID
+	response   chan *Response // the channel a response will be delivered on
+	resultBox  chan asyncResult
+	endBuilder event.Builder // close the tracing span when all processing for the message is complete
+}
+
+type asyncResult struct {
+	result []byte
+	err    error
+}
+
+// incoming is used to track an incoming request as it is being handled
+type incoming struct {
+	request    *Request        // the request being processed
+	baseCtx    context.Context // a base context for the message processing
+	endBuilder event.Builder   // builder's End method called when all processing for the message is complete
+	handleCtx  context.Context // the context for handling the message, child of baseCtx
+	cancel     func()          // a function that cancels the handling context
+}
+
+// Bind returns the options unmodified.
+func (o ConnectionOptions) Bind(context.Context, *Connection) (ConnectionOptions, error) {
+	return o, nil
+}
+
+// newConnection creates a new connection and runs it.
+// This is used by the Dial and Serve functions to build the actual connection.
+func newConnection(ctx context.Context, rwc io.ReadWriteCloser, binder Binder) (*Connection, error) {
+	c := &Connection{
+		closer:      rwc,
+		writerBox:   make(chan Writer, 1),
+		outgoingBox: make(chan map[ID]chan<- *Response, 1),
+		incomingBox: make(chan map[ID]*incoming, 1),
+	}
+
+	options, err := binder.Bind(ctx, c)
+	if err != nil {
+		return nil, err
+	}
+	if options.Framer == nil {
+		options.Framer = HeaderFramer()
+	}
+	if options.Preempter == nil {
+		options.Preempter = defaultHandler{}
+	}
+	if options.Handler == nil {
+		options.Handler = defaultHandler{}
+	}
+	c.outgoingBox <- make(map[ID]chan<- *Response)
+	c.incomingBox <- make(map[ID]*incoming)
+	c.async.init()
+	// the goroutines started here will continue until the underlying stream is closed
+	reader := options.Framer.Reader(rwc)
+	readToQueue := make(chan *incoming)
+	queueToDeliver := make(chan *incoming)
+	go c.readIncoming(ctx, reader, readToQueue)
+	go c.manageQueue(ctx, options.Preempter, readToQueue, queueToDeliver)
+	go c.deliverMessages(ctx, options.Handler, queueToDeliver)
+	// releaseing the writer must be the last thing we do in case any requests
+	// are blocked waiting for the connection to be ready
+	c.writerBox <- options.Framer.Writer(rwc)
+	return c, nil
+}
+
+// Notify invokes the target method but does not wait for a response.
+// The params will be marshaled to JSON before sending over the wire, and will
+// be handed to the method invoked.
+func (c *Connection) Notify(ctx context.Context, method string, params interface{}) error {
+	notify, err := NewNotification(method, params)
+	if err != nil {
+		return errors.Errorf("marshaling notify parameters: %v", err)
+	}
+	b := event.To(ctx).With(Method.Of(method))
+	b.Clone().Metric(Started.Record(1))
+	ctx, endBuilder := b.With(RPCDirection.Of(Outbound)).Start(method)
+	err = c.write(ctx, notify)
+	event.To(ctx).With(Error.Of(err)).Metric(Finished.Record(1))
+	endBuilder.End()
+	return err
+}
+
+// Call invokes the target method and returns an object that can be used to await the response.
+// The params will be marshaled to JSON before sending over the wire, and will
+// be handed to the method invoked.
+// You do not have to wait for the response, it can just be ignored if not needed.
+// If sending the call failed, the response will be ready and have the error in it.
+func (c *Connection) Call(ctx context.Context, method string, params interface{}) *AsyncCall {
+	result := &AsyncCall{
+		id:        Int64ID(atomic.AddInt64(&c.seq, 1)),
+		resultBox: make(chan asyncResult, 1),
+	}
+	// generate a new request identifier
+	call, err := NewCall(result.id, method, params)
+	if err != nil {
+		//set the result to failed
+		result.resultBox <- asyncResult{err: errors.Errorf("marshaling call parameters: %w", err)}
+		return result
+	}
+	b := event.To(ctx).With(Method.Of(method))
+	b.Clone().Metric(Started.Record(1))
+	ctx, endBuilder := b.Clone().With(RPCDirection.Of(Outbound)).With(RPCID.Of(fmt.Sprintf("%q", result.id))).Start(method)
+	result.endBuilder = endBuilder
+	// We have to add ourselves to the pending map before we send, otherwise we
+	// are racing the response.
+	// rchan is buffered in case the response arrives without a listener.
+	result.response = make(chan *Response, 1)
+	pending := <-c.outgoingBox
+	pending[result.id] = result.response
+	c.outgoingBox <- pending
+	// now we are ready to send
+	if err := c.write(ctx, call); err != nil {
+		// sending failed, we will never get a response, so deliver a fake one
+		r, _ := NewResponse(result.id, nil, err)
+		c.incomingResponse(r)
+	}
+	return result
+}
+
+// ID used for this call.
+// This can be used to cancel the call if needed.
+func (a *AsyncCall) ID() ID { return a.id }
+
+// IsReady can be used to check if the result is already prepared.
+// This is guaranteed to return true on a result for which Await has already
+// returned, or a call that failed to send in the first place.
+func (a *AsyncCall) IsReady() bool {
+	select {
+	case r := <-a.resultBox:
+		a.resultBox <- r
+		return true
+	default:
+		return false
+	}
+}
+
+// Await the results of a Call.
+// The response will be unmarshaled from JSON into the result.
+func (a *AsyncCall) Await(ctx context.Context, result interface{}) error {
+	status := "NONE"
+	defer a.endBuilder.With(StatusCode.Of(status)).End()
+	var r asyncResult
+	select {
+	case response := <-a.response:
+		// response just arrived, prepare the result
+		switch {
+		case response.Error != nil:
+			r.err = response.Error
+			status = "ERROR"
+		default:
+			r.result = response.Result
+			status = "OK"
+		}
+	case r = <-a.resultBox:
+		// result already available
+	case <-ctx.Done():
+		status = "CANCELLED"
+		return ctx.Err()
+	}
+	// refill the box for the next caller
+	a.resultBox <- r
+	// and unpack the result
+	if r.err != nil {
+		return r.err
+	}
+	if result == nil || len(r.result) == 0 {
+		return nil
+	}
+	return json.Unmarshal(r.result, result)
+}
+
+// Respond deliverers a response to an incoming Call.
+// It is an error to not call this exactly once for any message for which a
+// handler has previously returned ErrAsyncResponse. It is also an error to
+// call this for any other message.
+func (c *Connection) Respond(id ID, result interface{}, rerr error) error {
+	pending := <-c.incomingBox
+	defer func() { c.incomingBox <- pending }()
+	entry, found := pending[id]
+	if !found {
+		return nil
+	}
+	delete(pending, id)
+	return c.respond(entry, result, rerr)
+}
+
+// Cancel is used to cancel an inbound message by ID, it does not cancel
+// outgoing messages.
+// This is only used inside a message handler that is layering a
+// cancellation protocol on top of JSON RPC 2.
+// It will not complain if the ID is not a currently active message, and it will
+// not cause any messages that have not arrived yet with that ID to be
+// cancelled.
+func (c *Connection) Cancel(id ID) {
+	pending := <-c.incomingBox
+	defer func() { c.incomingBox <- pending }()
+	if entry, found := pending[id]; found && entry.cancel != nil {
+		entry.cancel()
+		entry.cancel = nil
+	}
+}
+
+// Wait blocks until the connection is fully closed, but does not close it.
+func (c *Connection) Wait() error {
+	return c.async.wait()
+}
+
+// Close can be used to close the underlying stream, and then wait for the connection to
+// fully shut down.
+// This does not cancel in flight requests, but waits for them to gracefully complete.
+func (c *Connection) Close() error {
+	// close the underlying stream
+	if err := c.closer.Close(); err != nil && !isClosingError(err) {
+		return err
+	}
+	// and then wait for it to cause the connection to close
+	if err := c.Wait(); err != nil && !isClosingError(err) {
+		return err
+	}
+	return nil
+}
+
+// readIncoming collects inbound messages from the reader and delivers them, either responding
+// to outgoing calls or feeding requests to the queue.
+func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue chan<- *incoming) {
+	defer close(toQueue)
+	for {
+		// get the next message
+		// no lock is needed, this is the only reader
+		msg, n, err := reader.Read(ctx)
+		if err != nil {
+			// The stream failed, we cannot continue
+			c.async.setError(err)
+			return
+		}
+		switch msg := msg.(type) {
+		case *Request:
+			entry := &incoming{
+				request: msg,
+			}
+			// add a span to the context for this request
+			b := event.To(ctx).With(Method.Of(msg.Method)).With(RPCDirection.Of(Inbound))
+			if msg.IsCall() {
+				b = b.With(RPCID.Of(fmt.Sprintf("%q", msg.ID)))
+			}
+			entry.baseCtx, entry.endBuilder = b.Start(msg.Method)
+			b = event.To(entry.baseCtx).With(Method.Of(msg.Method))
+			b.Clone().Metric(Started.Record(1))
+			b.Metric(ReceivedBytes.Record(n))
+			// in theory notifications cannot be cancelled, but we build them a cancel context anyway
+			entry.handleCtx, entry.cancel = context.WithCancel(entry.baseCtx)
+			// if the request is a call, add it to the incoming map so it can be
+			// cancelled by id
+			if msg.IsCall() {
+				pending := <-c.incomingBox
+				c.incomingBox <- pending
+				pending[msg.ID] = entry
+			}
+			// send the message to the incoming queue
+			toQueue <- entry
+		case *Response:
+			// If method is not set, this should be a response, in which case we must
+			// have an id to send the response back to the caller.
+			c.incomingResponse(msg)
+		}
+	}
+}
+
+func (c *Connection) incomingResponse(msg *Response) {
+	pending := <-c.outgoingBox
+	response, ok := pending[msg.ID]
+	if ok {
+		delete(pending, msg.ID)
+	}
+	c.outgoingBox <- pending
+	if response != nil {
+		response <- msg
+	}
+}
+
+// manageQueue reads incoming requests, attempts to proccess them with the preempter, or queue them
+// up for normal handling.
+func (c *Connection) manageQueue(ctx context.Context, preempter Preempter, fromRead <-chan *incoming, toDeliver chan<- *incoming) {
+	defer close(toDeliver)
+	q := []*incoming{}
+	ok := true
+	for {
+		var nextReq *incoming
+		if len(q) == 0 {
+			// no messages in the queue
+			// if we were closing, then we are done
+			if !ok {
+				return
+			}
+			// not closing, but nothing in the queue, so just block waiting for a read
+			nextReq, ok = <-fromRead
+		} else {
+			// we have a non empty queue, so pick whichever of reading or delivering
+			// that we can make progress on
+			select {
+			case nextReq, ok = <-fromRead:
+			case toDeliver <- q[0]:
+				//TODO: this causes a lot of shuffling, should we use a growing ring buffer? compaction?
+				q = q[1:]
+			}
+		}
+		if nextReq != nil {
+			// TODO: should we allow to limit the queue size?
+			var result interface{}
+			rerr := nextReq.handleCtx.Err()
+			if rerr == nil {
+				// only preempt if not already cancelled
+				result, rerr = preempter.Preempt(nextReq.handleCtx, nextReq.request)
+			}
+			switch {
+			case rerr == ErrNotHandled:
+				// message not handled, add it to the queue for the main handler
+				q = append(q, nextReq)
+			case rerr == ErrAsyncResponse:
+				// message handled but the response will come later
+			default:
+				// anything else means the message is fully handled
+				c.reply(nextReq, result, rerr)
+			}
+		}
+	}
+}
+
+func (c *Connection) deliverMessages(ctx context.Context, handler Handler, fromQueue <-chan *incoming) {
+	defer c.async.done()
+	for entry := range fromQueue {
+		// cancel any messages in the queue that we have a pending cancel for
+		var result interface{}
+		rerr := entry.handleCtx.Err()
+		if rerr == nil {
+			// only deliver if not already cancelled
+			result, rerr = handler.Handle(entry.handleCtx, entry.request)
+		}
+		switch {
+		case rerr == ErrNotHandled:
+			// message not handled, report it back to the caller as an error
+			c.reply(entry, nil, errors.Errorf("%w: %q", ErrMethodNotFound, entry.request.Method))
+		case rerr == ErrAsyncResponse:
+			// message handled but the response will come later
+		default:
+			c.reply(entry, result, rerr)
+		}
+	}
+}
+
+// reply is used to reply to an incoming request that has just been handled
+func (c *Connection) reply(entry *incoming, result interface{}, rerr error) {
+	if entry.request.IsCall() {
+		// we have a call finishing, remove it from the incoming map
+		pending := <-c.incomingBox
+		defer func() { c.incomingBox <- pending }()
+		delete(pending, entry.request.ID)
+	}
+	if err := c.respond(entry, result, rerr); err != nil {
+		// no way to propagate this error
+		//TODO: should we do more than just log it?
+		event.To(entry.baseCtx).With(Error.Of(err)).Log("jsonrpc2 message delivery failed")
+	}
+}
+
+// respond sends a response.
+// This is the code shared between reply and SendResponse.
+func (c *Connection) respond(entry *incoming, result interface{}, rerr error) error {
+	var err error
+	if entry.request.IsCall() {
+		// send the response
+		if result == nil && rerr == nil {
+			// call with no response, send an error anyway
+			rerr = errors.Errorf("%w: %q produced no response", ErrInternal, entry.request.Method)
+		}
+		var response *Response
+		response, err = NewResponse(entry.request.ID, result, rerr)
+		if err == nil {
+			// we write the response with the base context, in case the message was cancelled
+			err = c.write(entry.baseCtx, response)
+		}
+	} else {
+		switch {
+		case rerr != nil:
+			// notification failed
+			err = errors.Errorf("%w: %q notification failed: %v", ErrInternal, entry.request.Method, rerr)
+			rerr = nil
+		case result != nil:
+			//notification produced a response, which is an error
+			err = errors.Errorf("%w: %q produced unwanted response", ErrInternal, entry.request.Method)
+		default:
+			// normal notification finish
+		}
+	}
+	var status string
+	switch {
+	case rerr != nil || err != nil:
+		status = "ERROR"
+	default:
+		status = "OK"
+	}
+	// and just to be clean, invoke and clear the cancel if needed
+	if entry.cancel != nil {
+		entry.cancel()
+		entry.cancel = nil
+	}
+	// mark the entire request processing as done
+	entry.endBuilder.With(StatusCode.Of(status)).End()
+	return err
+}
+
+// write is used by all things that write outgoing messages, including replies.
+// it makes sure that writes are atomic
+func (c *Connection) write(ctx context.Context, msg Message) error {
+	writer := <-c.writerBox
+	defer func() { c.writerBox <- writer }()
+	n, err := writer.Write(ctx, msg)
+	// TODO: get a method label in here somehow.
+	event.To(ctx).Metric(SentBytes.Record(n))
+	return err
+}
diff --git a/jsonrpc2/defs.go b/jsonrpc2/defs.go
new file mode 100644
index 0000000..f40800d
--- /dev/null
+++ b/jsonrpc2/defs.go
@@ -0,0 +1,31 @@
+// Copyright 2021 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package jsonrpc2
+
+import (
+	"golang.org/x/exp/event"
+	"golang.org/x/exp/event/keys"
+)
+
+var (
+	Method       = keys.String("method")
+	RPCID        = keys.String("id")
+	RPCDirection = keys.String("direction")
+	Error        = keys.Value("error")
+	StatusCode   = keys.String("status.code")
+)
+
+var (
+	Started       = event.NewCounter("started", "Count of started RPCs.")
+	Finished      = event.NewCounter("finished", "Count of finished RPCs (includes error).")
+	ReceivedBytes = event.NewIntDistribution("received_bytes", "Bytes received.") //, unit.Bytes)
+	SentBytes     = event.NewIntDistribution("sent_bytes", "Bytes sent.")         //, unit.Bytes)
+	Latency       = event.NewDuration("latency", "Elapsed time of an RPC.")       //, unit.Milliseconds)
+)
+
+const (
+	Inbound  = "in"
+	Outbound = "out"
+)
diff --git a/jsonrpc2/frame.go b/jsonrpc2/frame.go
new file mode 100644
index 0000000..634717c
--- /dev/null
+++ b/jsonrpc2/frame.go
@@ -0,0 +1,179 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package jsonrpc2
+
+import (
+	"bufio"
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"strconv"
+	"strings"
+
+	errors "golang.org/x/xerrors"
+)
+
+// Reader abstracts the transport mechanics from the JSON RPC protocol.
+// A Conn reads messages from the reader it was provided on construction,
+// and assumes that each call to Read fully transfers a single message,
+// or returns an error.
+// A reader is not safe for concurrent use, it is expected it will be used by
+// a single Conn in a safe manner.
+type Reader interface {
+	// Read gets the next message from the stream.
+	Read(context.Context) (Message, int64, error)
+}
+
+// Writer abstracts the transport mechanics from the JSON RPC protocol.
+// A Conn writes messages using the writer it was provided on construction,
+// and assumes that each call to Write fully transfers a single message,
+// or returns an error.
+// A writer is not safe for concurrent use, it is expected it will be used by
+// a single Conn in a safe manner.
+type Writer interface {
+	// Write sends a message to the stream.
+	Write(context.Context, Message) (int64, error)
+}
+
+// Framer wraps low level byte readers and writers into jsonrpc2 message
+// readers and writers.
+// It is responsible for the framing and encoding of messages into wire form.
+type Framer interface {
+	// Reader wraps a byte reader into a message reader.
+	Reader(rw io.Reader) Reader
+	// Writer wraps a byte writer into a message writer.
+	Writer(rw io.Writer) Writer
+}
+
+// RawFramer returns a new Framer.
+// The messages are sent with no wrapping, and rely on json decode consistency
+// to determine message boundaries.
+func RawFramer() Framer { return rawFramer{} }
+
+type rawFramer struct{}
+type rawReader struct{ in *json.Decoder }
+type rawWriter struct{ out io.Writer }
+
+func (rawFramer) Reader(rw io.Reader) Reader {
+	return &rawReader{in: json.NewDecoder(rw)}
+}
+
+func (rawFramer) Writer(rw io.Writer) Writer {
+	return &rawWriter{out: rw}
+}
+
+func (r *rawReader) Read(ctx context.Context) (Message, int64, error) {
+	select {
+	case <-ctx.Done():
+		return nil, 0, ctx.Err()
+	default:
+	}
+	var raw json.RawMessage
+	if err := r.in.Decode(&raw); err != nil {
+		return nil, 0, err
+	}
+	msg, err := DecodeMessage(raw)
+	return msg, int64(len(raw)), err
+}
+
+func (w *rawWriter) Write(ctx context.Context, msg Message) (int64, error) {
+	select {
+	case <-ctx.Done():
+		return 0, ctx.Err()
+	default:
+	}
+	data, err := EncodeMessage(msg)
+	if err != nil {
+		return 0, errors.Errorf("marshaling message: %v", err)
+	}
+	n, err := w.out.Write(data)
+	return int64(n), err
+}
+
+// HeaderFramer returns a new Framer.
+// The messages are sent with HTTP content length and MIME type headers.
+// This is the format used by LSP and others.
+func HeaderFramer() Framer { return headerFramer{} }
+
+type headerFramer struct{}
+type headerReader struct{ in *bufio.Reader }
+type headerWriter struct{ out io.Writer }
+
+func (headerFramer) Reader(rw io.Reader) Reader {
+	return &headerReader{in: bufio.NewReader(rw)}
+}
+
+func (headerFramer) Writer(rw io.Writer) Writer {
+	return &headerWriter{out: rw}
+}
+
+func (r *headerReader) Read(ctx context.Context) (Message, int64, error) {
+	select {
+	case <-ctx.Done():
+		return nil, 0, ctx.Err()
+	default:
+	}
+	var total, length int64
+	// read the header, stop on the first empty line
+	for {
+		line, err := r.in.ReadString('\n')
+		total += int64(len(line))
+		if err != nil {
+			return nil, total, errors.Errorf("failed reading header line: %w", err)
+		}
+		line = strings.TrimSpace(line)
+		// check we have a header line
+		if line == "" {
+			break
+		}
+		colon := strings.IndexRune(line, ':')
+		if colon < 0 {
+			return nil, total, errors.Errorf("invalid header line %q", line)
+		}
+		name, value := line[:colon], strings.TrimSpace(line[colon+1:])
+		switch name {
+		case "Content-Length":
+			if length, err = strconv.ParseInt(value, 10, 32); err != nil {
+				return nil, total, errors.Errorf("failed parsing Content-Length: %v", value)
+			}
+			if length <= 0 {
+				return nil, total, errors.Errorf("invalid Content-Length: %v", length)
+			}
+		default:
+			// ignoring unknown headers
+		}
+	}
+	if length == 0 {
+		return nil, total, errors.Errorf("missing Content-Length header")
+	}
+	data := make([]byte, length)
+	n, err := io.ReadFull(r.in, data)
+	total += int64(n)
+	if err != nil {
+		return nil, total, err
+	}
+	msg, err := DecodeMessage(data)
+	return msg, total, err
+}
+
+func (w *headerWriter) Write(ctx context.Context, msg Message) (int64, error) {
+	select {
+	case <-ctx.Done():
+		return 0, ctx.Err()
+	default:
+	}
+	data, err := EncodeMessage(msg)
+	if err != nil {
+		return 0, errors.Errorf("marshaling message: %v", err)
+	}
+	n, err := fmt.Fprintf(w.out, "Content-Length: %v\r\n\r\n", len(data))
+	total := int64(n)
+	if err == nil {
+		n, err = w.out.Write(data)
+		total += int64(n)
+	}
+	return total, err
+}
diff --git a/jsonrpc2/jsonrpc2.go b/jsonrpc2/jsonrpc2.go
new file mode 100644
index 0000000..4e853d5
--- /dev/null
+++ b/jsonrpc2/jsonrpc2.go
@@ -0,0 +1,99 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package jsonrpc2 is a minimal implementation of the JSON RPC 2 spec.
+// https://www.jsonrpc.org/specification
+// It is intended to be compatible with other implementations at the wire level.
+package jsonrpc2
+
+import (
+	"context"
+	"errors"
+)
+
+var (
+	// ErrIdleTimeout is returned when serving timed out waiting for new connections.
+	ErrIdleTimeout = errors.New("timed out waiting for new connections")
+	// ErrNotHandled is returned from a handler to indicate it did not handle the
+	// message.
+	ErrNotHandled = errors.New("JSON RPC not handled")
+	// ErrAsyncResponse is returned from a handler to indicate it will generate a
+	// response asynchronously.
+	ErrAsyncResponse = errors.New("JSON RPC asynchronous response")
+)
+
+// Preempter handles messages on a connection before they are queued to the main
+// handler.
+// Primarily this is used for cancel handlers or notifications for which out of
+// order processing is not an issue.
+type Preempter interface {
+	// Preempt is invoked for each incoming request before it is queued.
+	// If the request is a call, it must return a value or an error for the reply.
+	// Preempt should not block or start any new messages on the connection.
+	Preempt(ctx context.Context, req *Request) (interface{}, error)
+}
+
+// Handler handles messages on a connection.
+type Handler interface {
+	// Handle is invoked for each incoming request.
+	// If the request is a call, it must return a value or an error for the reply.
+	Handle(ctx context.Context, req *Request) (interface{}, error)
+}
+
+type defaultHandler struct{}
+
+func (defaultHandler) Preempt(context.Context, *Request) (interface{}, error) {
+	return nil, ErrNotHandled
+}
+
+func (defaultHandler) Handle(context.Context, *Request) (interface{}, error) {
+	return nil, ErrNotHandled
+}
+
+type HandlerFunc func(ctx context.Context, req *Request) (interface{}, error)
+
+func (f HandlerFunc) Handle(ctx context.Context, req *Request) (interface{}, error) {
+	return f(ctx, req)
+}
+
+// async is a small helper for things with an asynchronous result that you can
+// wait for.
+type async struct {
+	ready  chan struct{}
+	errBox chan error
+}
+
+func (a *async) init() {
+	a.ready = make(chan struct{})
+	a.errBox = make(chan error, 1)
+	a.errBox <- nil
+}
+
+func (a *async) done() {
+	close(a.ready)
+}
+
+func (a *async) isDone() bool {
+	select {
+	case <-a.ready:
+		return true
+	default:
+		return false
+	}
+}
+
+func (a *async) wait() error {
+	<-a.ready
+	err := <-a.errBox
+	a.errBox <- err
+	return err
+}
+
+func (a *async) setError(err error) {
+	storedErr := <-a.errBox
+	if storedErr == nil {
+		storedErr = err
+	}
+	a.errBox <- storedErr
+}
diff --git a/jsonrpc2/jsonrpc2_test.go b/jsonrpc2/jsonrpc2_test.go
new file mode 100644
index 0000000..e0ffe97
--- /dev/null
+++ b/jsonrpc2/jsonrpc2_test.go
@@ -0,0 +1,389 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package jsonrpc2_test
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"path"
+	"reflect"
+	"testing"
+	"time"
+
+	"golang.org/x/exp/event/adapter/eventtest"
+	"golang.org/x/exp/internal/stack/stacktest"
+	"golang.org/x/exp/jsonrpc2"
+	errors "golang.org/x/xerrors"
+)
+
+var callTests = []invoker{
+	call{"no_args", nil, true},
+	call{"one_string", "fish", "got:fish"},
+	call{"one_number", 10, "got:10"},
+	call{"join", []string{"a", "b", "c"}, "a/b/c"},
+	sequence{"notify", []invoker{
+		notify{"set", 3},
+		notify{"add", 5},
+		call{"get", nil, 8},
+	}},
+	sequence{"preempt", []invoker{
+		async{"a", "wait", "a"},
+		notify{"unblock", "a"},
+		collect{"a", true, false},
+	}},
+	sequence{"basic cancel", []invoker{
+		async{"b", "wait", "b"},
+		cancel{"b"},
+		collect{"b", nil, true},
+	}},
+	sequence{"queue", []invoker{
+		async{"a", "wait", "a"},
+		notify{"set", 1},
+		notify{"add", 2},
+		notify{"add", 3},
+		notify{"add", 4},
+		call{"peek", nil, 0}, // accumulator will not have any adds yet
+		notify{"unblock", "a"},
+		collect{"a", true, false},
+		call{"get", nil, 10}, // accumulator now has all the adds
+	}},
+	sequence{"fork", []invoker{
+		async{"a", "fork", "a"},
+		notify{"set", 1},
+		notify{"add", 2},
+		notify{"add", 3},
+		notify{"add", 4},
+		call{"get", nil, 10}, // fork will not have blocked the adds
+		notify{"unblock", "a"},
+		collect{"a", true, false},
+	}},
+}
+
+type binder struct {
+	framer  jsonrpc2.Framer
+	runTest func(*handler)
+}
+
+type handler struct {
+	conn        *jsonrpc2.Connection
+	accumulator int
+	waitersBox  chan map[string]chan struct{}
+	calls       map[string]*jsonrpc2.AsyncCall
+}
+
+type invoker interface {
+	Name() string
+	Invoke(t *testing.T, ctx context.Context, h *handler)
+}
+
+type notify struct {
+	method string
+	params interface{}
+}
+
+type call struct {
+	method string
+	params interface{}
+	expect interface{}
+}
+
+type async struct {
+	name   string
+	method string
+	params interface{}
+}
+
+type collect struct {
+	name   string
+	expect interface{}
+	fails  bool
+}
+
+type cancel struct {
+	name string
+}
+
+type sequence struct {
+	name  string
+	tests []invoker
+}
+
+type echo call
+
+type cancelParams struct{ ID int64 }
+
+func TestConnectionRaw(t *testing.T) {
+	testConnection(t, jsonrpc2.RawFramer())
+}
+
+func TestConnectionHeader(t *testing.T) {
+	testConnection(t, jsonrpc2.HeaderFramer())
+}
+
+func testConnection(t *testing.T, framer jsonrpc2.Framer) {
+	stacktest.NoLeak(t)
+	ctx := eventtest.NewContext(context.Background(), t)
+	listener, err := jsonrpc2.NetPipe(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	server, err := jsonrpc2.Serve(ctx, listener, binder{framer, nil})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		listener.Close()
+		server.Wait()
+	}()
+
+	for _, test := range callTests {
+		t.Run(test.Name(), func(t *testing.T) {
+			client, err := jsonrpc2.Dial(ctx,
+				listener.Dialer(), binder{framer, func(h *handler) {
+					defer h.conn.Close()
+					ctx := eventtest.NewContext(ctx, t)
+					test.Invoke(t, ctx, h)
+					if call, ok := test.(*call); ok {
+						// also run all simple call tests in echo mode
+						(*echo)(call).Invoke(t, ctx, h)
+					}
+				}})
+			if err != nil {
+				t.Fatal(err)
+			}
+			client.Wait()
+		})
+	}
+}
+
+func (test notify) Name() string { return test.method }
+func (test notify) Invoke(t *testing.T, ctx context.Context, h *handler) {
+	if err := h.conn.Notify(ctx, test.method, test.params); err != nil {
+		t.Fatalf("%v:Notify failed: %v", test.method, err)
+	}
+}
+
+func (test call) Name() string { return test.method }
+func (test call) Invoke(t *testing.T, ctx context.Context, h *handler) {
+	results := newResults(test.expect)
+	if err := h.conn.Call(ctx, test.method, test.params).Await(ctx, results); err != nil {
+		t.Fatalf("%v:Call failed: %v", test.method, err)
+	}
+	verifyResults(t, test.method, results, test.expect)
+}
+
+func (test echo) Invoke(t *testing.T, ctx context.Context, h *handler) {
+	results := newResults(test.expect)
+	if err := h.conn.Call(ctx, "echo", []interface{}{test.method, test.params}).Await(ctx, results); err != nil {
+		t.Fatalf("%v:Echo failed: %v", test.method, err)
+	}
+	verifyResults(t, test.method, results, test.expect)
+}
+
+func (test async) Name() string { return test.name }
+func (test async) Invoke(t *testing.T, ctx context.Context, h *handler) {
+	h.calls[test.name] = h.conn.Call(ctx, test.method, test.params)
+}
+
+func (test collect) Name() string { return test.name }
+func (test collect) Invoke(t *testing.T, ctx context.Context, h *handler) {
+	o := h.calls[test.name]
+	results := newResults(test.expect)
+	err := o.Await(ctx, results)
+	switch {
+	case test.fails && err == nil:
+		t.Fatalf("%v:Collect was supposed to fail", test.name)
+	case !test.fails && err != nil:
+		t.Fatalf("%v:Collect failed: %v", test.name, err)
+	}
+	verifyResults(t, test.name, results, test.expect)
+}
+
+func (test cancel) Name() string { return test.name }
+func (test cancel) Invoke(t *testing.T, ctx context.Context, h *handler) {
+	o := h.calls[test.name]
+	if err := h.conn.Notify(ctx, "cancel", &cancelParams{o.ID().Raw().(int64)}); err != nil {
+		t.Fatalf("%v:Collect failed: %v", test.name, err)
+	}
+}
+
+func (test sequence) Name() string { return test.name }
+func (test sequence) Invoke(t *testing.T, ctx context.Context, h *handler) {
+	for _, child := range test.tests {
+		child.Invoke(t, ctx, h)
+	}
+}
+
+// newResults makes a new empty copy of the expected type to put the results into
+func newResults(expect interface{}) interface{} {
+	switch e := expect.(type) {
+	case []interface{}:
+		var r []interface{}
+		for _, v := range e {
+			r = append(r, reflect.New(reflect.TypeOf(v)).Interface())
+		}
+		return r
+	case nil:
+		return nil
+	default:
+		return reflect.New(reflect.TypeOf(expect)).Interface()
+	}
+}
+
+// verifyResults compares the results to the expected values
+func verifyResults(t *testing.T, method string, results interface{}, expect interface{}) {
+	if expect == nil {
+		if results != nil {
+			t.Errorf("%v:Got results %+v where none expeted", method, expect)
+		}
+		return
+	}
+	val := reflect.Indirect(reflect.ValueOf(results)).Interface()
+	if !reflect.DeepEqual(val, expect) {
+		t.Errorf("%v:Results are incorrect, got %+v expect %+v", method, val, expect)
+	}
+}
+
+func (b binder) Bind(ctx context.Context, conn *jsonrpc2.Connection) (jsonrpc2.ConnectionOptions, error) {
+	h := &handler{
+		conn:       conn,
+		waitersBox: make(chan map[string]chan struct{}, 1),
+		calls:      make(map[string]*jsonrpc2.AsyncCall),
+	}
+	h.waitersBox <- make(map[string]chan struct{})
+	if b.runTest != nil {
+		go b.runTest(h)
+	}
+	return jsonrpc2.ConnectionOptions{
+		Framer:    b.framer,
+		Preempter: h,
+		Handler:   h,
+	}, nil
+}
+
+func (h *handler) waiter(name string) chan struct{} {
+	waiters := <-h.waitersBox
+	defer func() { h.waitersBox <- waiters }()
+	waiter, found := waiters[name]
+	if !found {
+		waiter = make(chan struct{})
+		waiters[name] = waiter
+	}
+	return waiter
+}
+
+func (h *handler) Preempt(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) {
+	switch req.Method {
+	case "unblock":
+		var name string
+		if err := json.Unmarshal(req.Params, &name); err != nil {
+			return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
+		}
+		close(h.waiter(name))
+		return nil, nil
+	case "peek":
+		if len(req.Params) > 0 {
+			return nil, errors.Errorf("%w: expected no params", jsonrpc2.ErrInvalidParams)
+		}
+		return h.accumulator, nil
+	case "cancel":
+		var params cancelParams
+		if err := json.Unmarshal(req.Params, &params); err != nil {
+			return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
+		}
+		h.conn.Cancel(jsonrpc2.Int64ID(params.ID))
+		return nil, nil
+	default:
+		return nil, jsonrpc2.ErrNotHandled
+	}
+}
+
+func (h *handler) Handle(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) {
+	switch req.Method {
+	case "no_args":
+		if len(req.Params) > 0 {
+			return nil, errors.Errorf("%w: expected no params", jsonrpc2.ErrInvalidParams)
+		}
+		return true, nil
+	case "one_string":
+		var v string
+		if err := json.Unmarshal(req.Params, &v); err != nil {
+			return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
+		}
+		return "got:" + v, nil
+	case "one_number":
+		var v int
+		if err := json.Unmarshal(req.Params, &v); err != nil {
+			return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
+		}
+		return fmt.Sprintf("got:%d", v), nil
+	case "set":
+		var v int
+		if err := json.Unmarshal(req.Params, &v); err != nil {
+			return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
+		}
+		h.accumulator = v
+		return nil, nil
+	case "add":
+		var v int
+		if err := json.Unmarshal(req.Params, &v); err != nil {
+			return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
+		}
+		h.accumulator += v
+		return nil, nil
+	case "get":
+		if len(req.Params) > 0 {
+			return nil, errors.Errorf("%w: expected no params", jsonrpc2.ErrInvalidParams)
+		}
+		return h.accumulator, nil
+	case "join":
+		var v []string
+		if err := json.Unmarshal(req.Params, &v); err != nil {
+			return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
+		}
+		return path.Join(v...), nil
+	case "echo":
+		var v []interface{}
+		if err := json.Unmarshal(req.Params, &v); err != nil {
+			return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
+		}
+		var result interface{}
+		err := h.conn.Call(ctx, v[0].(string), v[1]).Await(ctx, &result)
+		return result, err
+	case "wait":
+		var name string
+		if err := json.Unmarshal(req.Params, &name); err != nil {
+			return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
+		}
+		select {
+		case <-h.waiter(name):
+			return true, nil
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case <-time.After(time.Second):
+			return nil, errors.Errorf("wait for %q timed out", name)
+		}
+	case "fork":
+		var name string
+		if err := json.Unmarshal(req.Params, &name); err != nil {
+			return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
+		}
+		waitFor := h.waiter(name)
+		go func() {
+			select {
+			case <-waitFor:
+				h.conn.Respond(req.ID, true, nil)
+			case <-ctx.Done():
+				h.conn.Respond(req.ID, nil, ctx.Err())
+			case <-time.After(time.Second):
+				h.conn.Respond(req.ID, nil, errors.Errorf("wait for %q timed out", name))
+			}
+		}()
+		return nil, jsonrpc2.ErrAsyncResponse
+	default:
+		return nil, jsonrpc2.ErrNotHandled
+	}
+}
diff --git a/jsonrpc2/messages.go b/jsonrpc2/messages.go
new file mode 100644
index 0000000..652ac81
--- /dev/null
+++ b/jsonrpc2/messages.go
@@ -0,0 +1,181 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package jsonrpc2
+
+import (
+	"encoding/json"
+
+	errors "golang.org/x/xerrors"
+)
+
+// ID is a Request identifier.
+type ID struct {
+	value interface{}
+}
+
+// Message is the interface to all jsonrpc2 message types.
+// They share no common functionality, but are a closed set of concrete types
+// that are allowed to implement this interface. The message types are *Request
+// and *Response.
+type Message interface {
+	// marshal builds the wire form from the API form.
+	// It is private, which makes the set of Message implementations closed.
+	marshal(to *wireCombined)
+}
+
+// Request is a Message sent to a peer to request behavior.
+// If it has an ID it is a call, otherwise it is a notification.
+type Request struct {
+	// ID of this request, used to tie the Response back to the request.
+	// This will be nil for notifications.
+	ID ID
+	// Method is a string containing the method name to invoke.
+	Method string
+	// Params is either a struct or an array with the parameters of the method.
+	Params json.RawMessage
+}
+
+// Response is a Message used as a reply to a call Request.
+// It will have the same ID as the call it is a response to.
+type Response struct {
+	// result is the content of the response.
+	Result json.RawMessage
+	// err is set only if the call failed.
+	Error error
+	// id of the request this is a response to.
+	ID ID
+}
+
+// StringID creates a new string request identifier.
+func StringID(s string) ID { return ID{value: s} }
+
+// Int64ID creates a new integer request identifier.
+func Int64ID(i int64) ID { return ID{value: i} }
+
+// IsValid returns true if the ID is a valid identifier.
+// The default value for ID will return false.
+func (id ID) IsValid() bool { return id.value != nil }
+
+// Raw returns the underlying value of the ID.
+func (id ID) Raw() interface{} { return id.value }
+
+// NewNotification constructs a new Notification message for the supplied
+// method and parameters.
+func NewNotification(method string, params interface{}) (*Request, error) {
+	p, merr := marshalToRaw(params)
+	return &Request{Method: method, Params: p}, merr
+}
+
+// NewCall constructs a new Call message for the supplied ID, method and
+// parameters.
+func NewCall(id ID, method string, params interface{}) (*Request, error) {
+	p, merr := marshalToRaw(params)
+	return &Request{ID: id, Method: method, Params: p}, merr
+}
+
+func (msg *Request) IsCall() bool { return msg.ID.IsValid() }
+
+func (msg *Request) marshal(to *wireCombined) {
+	to.ID = msg.ID.value
+	to.Method = msg.Method
+	to.Params = msg.Params
+}
+
+// NewResponse constructs a new Response message that is a reply to the
+// supplied. If err is set result may be ignored.
+func NewResponse(id ID, result interface{}, rerr error) (*Response, error) {
+	r, merr := marshalToRaw(result)
+	return &Response{ID: id, Result: r, Error: rerr}, merr
+}
+
+func (msg *Response) marshal(to *wireCombined) {
+	to.ID = msg.ID.value
+	to.Error = toWireError(msg.Error)
+	to.Result = msg.Result
+}
+
+func toWireError(err error) *wireError {
+	if err == nil {
+		// no error, the response is complete
+		return nil
+	}
+	if err, ok := err.(*wireError); ok {
+		// already a wire error, just use it
+		return err
+	}
+	result := &wireError{Message: err.Error()}
+	var wrapped *wireError
+	if errors.As(err, &wrapped) {
+		// if we wrapped a wire error, keep the code from the wrapped error
+		// but the message from the outer error
+		result.Code = wrapped.Code
+	}
+	return result
+}
+
+func EncodeMessage(msg Message) ([]byte, error) {
+	wire := wireCombined{VersionTag: wireVersion}
+	msg.marshal(&wire)
+	data, err := json.Marshal(&wire)
+	if err != nil {
+		return data, errors.Errorf("marshaling jsonrpc message: %w", err)
+	}
+	return data, nil
+}
+
+func DecodeMessage(data []byte) (Message, error) {
+	msg := wireCombined{}
+	if err := json.Unmarshal(data, &msg); err != nil {
+		return nil, errors.Errorf("unmarshaling jsonrpc message: %w", err)
+	}
+	if msg.VersionTag != wireVersion {
+		return nil, errors.Errorf("invalid message version tag %s expected %s", msg.VersionTag, wireVersion)
+	}
+	id := ID{}
+	switch v := msg.ID.(type) {
+	case nil:
+	case float64:
+		// coerce the id type to int64 if it is float64, the spec does not allow fractional parts
+		id = Int64ID(int64(v))
+	case int64:
+		id = Int64ID(v)
+	case string:
+		id = StringID(v)
+	default:
+		return nil, errors.Errorf("invalid message id type <%T>%v", v, v)
+	}
+	if msg.Method != "" {
+		// has a method, must be a call
+		return &Request{
+			Method: msg.Method,
+			ID:     id,
+			Params: msg.Params,
+		}, nil
+	}
+	// no method, should be a response
+	if !id.IsValid() {
+		return nil, ErrInvalidRequest
+	}
+	resp := &Response{
+		ID:     id,
+		Result: msg.Result,
+	}
+	// we have to check if msg.Error is nil to avoid a typed error
+	if msg.Error != nil {
+		resp.Error = msg.Error
+	}
+	return resp, nil
+}
+
+func marshalToRaw(obj interface{}) (json.RawMessage, error) {
+	if obj == nil {
+		return nil, nil
+	}
+	data, err := json.Marshal(obj)
+	if err != nil {
+		return nil, err
+	}
+	return json.RawMessage(data), nil
+}
diff --git a/jsonrpc2/net.go b/jsonrpc2/net.go
new file mode 100644
index 0000000..c8cfaab
--- /dev/null
+++ b/jsonrpc2/net.go
@@ -0,0 +1,129 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package jsonrpc2
+
+import (
+	"context"
+	"io"
+	"net"
+	"os"
+	"time"
+)
+
+// This file contains implementations of the transport primitives that use the standard network
+// package.
+
+// NetListenOptions is the optional arguments to the NetListen function.
+type NetListenOptions struct {
+	NetListenConfig net.ListenConfig
+	NetDialer       net.Dialer
+}
+
+// NetListener returns a new Listener that listents on a socket using the net package.
+func NetListener(ctx context.Context, network, address string, options NetListenOptions) (Listener, error) {
+	ln, err := options.NetListenConfig.Listen(ctx, network, address)
+	if err != nil {
+		return nil, err
+	}
+	return &netListener{net: ln}, nil
+}
+
+// netListener is the implementation of Listener for connections made using the net package.
+type netListener struct {
+	net net.Listener
+}
+
+// Accept blocks waiting for an incoming connection to the listener.
+func (l *netListener) Accept(ctx context.Context) (io.ReadWriteCloser, error) {
+	return l.net.Accept()
+}
+
+// Close will cause the listener to stop listening. It will not close any connections that have
+// already been accepted.
+func (l *netListener) Close() error {
+	addr := l.net.Addr()
+	err := l.net.Close()
+	if addr.Network() == "unix" {
+		rerr := os.Remove(addr.String())
+		if rerr != nil && err == nil {
+			err = rerr
+		}
+	}
+	return err
+}
+
+// Dialer returns a dialer that can be used to connect to the listener.
+func (l *netListener) Dialer() Dialer {
+	return NetDialer(l.net.Addr().Network(), l.net.Addr().String(), net.Dialer{
+		Timeout: 5 * time.Second,
+	})
+}
+
+// NetDialer returns a Dialer using the supplied standard network dialer.
+func NetDialer(network, address string, nd net.Dialer) Dialer {
+	return &netDialer{
+		network: network,
+		address: address,
+		dialer:  nd,
+	}
+}
+
+type netDialer struct {
+	network string
+	address string
+	dialer  net.Dialer
+}
+
+func (n *netDialer) Dial(ctx context.Context) (io.ReadWriteCloser, error) {
+	return n.dialer.DialContext(ctx, n.network, n.address)
+}
+
+// NetPipe returns a new Listener that listens using net.Pipe.
+// It is only possibly to connect to it using the Dialier returned by the
+// Dialer method, each call to that method will generate a new pipe the other
+// side of which will be returnd from the Accept call.
+func NetPipe(ctx context.Context) (Listener, error) {
+	return &netPiper{
+		done:   make(chan struct{}),
+		dialed: make(chan io.ReadWriteCloser),
+	}, nil
+}
+
+// netPiper is the implementation of Listener build on top of net.Pipes.
+type netPiper struct {
+	done   chan struct{}
+	dialed chan io.ReadWriteCloser
+}
+
+// Accept blocks waiting for an incoming connection to the listener.
+func (l *netPiper) Accept(ctx context.Context) (io.ReadWriteCloser, error) {
+	// block until we have a listener, or are closed or cancelled
+	select {
+	case rwc := <-l.dialed:
+		return rwc, nil
+	case <-l.done:
+		return nil, io.EOF
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+}
+
+// Close will cause the listener to stop listening. It will not close any connections that have
+// already been accepted.
+func (l *netPiper) Close() error {
+	// unblock any accept calls that are pending
+	close(l.done)
+	return nil
+}
+
+func (l *netPiper) Dialer() Dialer {
+	return l
+}
+
+func (l *netPiper) Dial(ctx context.Context) (io.ReadWriteCloser, error) {
+	client, server := net.Pipe()
+	l.dialed <- server
+	return client, nil
+}
diff --git a/jsonrpc2/serve.go b/jsonrpc2/serve.go
new file mode 100644
index 0000000..f3b78f5
--- /dev/null
+++ b/jsonrpc2/serve.go
@@ -0,0 +1,283 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package jsonrpc2
+
+import (
+	"context"
+	"io"
+	"runtime"
+	"strings"
+	"sync"
+	"syscall"
+	"time"
+
+	errors "golang.org/x/xerrors"
+)
+
+// Listener is implemented by protocols to accept new inbound connections.
+type Listener interface {
+	// Accept an inbound connection to a server.
+	// It must block until an inbound connection is made, or the listener is
+	// shut down.
+	Accept(context.Context) (io.ReadWriteCloser, error)
+
+	// Close is used to ask a listener to stop accepting new connections.
+	Close() error
+
+	// Dialer returns a dialer that can be used to connect to this listener
+	// locally.
+	// If a listener does not implement this it will return a nil.
+	Dialer() Dialer
+}
+
+// Dialer is used by clients to dial a server.
+type Dialer interface {
+	// Dial returns a new communication byte stream to a listening server.
+	Dial(ctx context.Context) (io.ReadWriteCloser, error)
+}
+
+// Server is a running server that is accepting incoming connections.
+type Server struct {
+	listener Listener
+	binder   Binder
+	async    async
+}
+
+// Dial uses the dialer to make a new connection, wraps the returned
+// reader and writer using the framer to make a stream, and then builds
+// a connection on top of that stream using the binder.
+func Dial(ctx context.Context, dialer Dialer, binder Binder) (*Connection, error) {
+	// dial a server
+	rwc, err := dialer.Dial(ctx)
+	if err != nil {
+		return nil, err
+	}
+	return newConnection(ctx, rwc, binder)
+}
+
+// Serve starts a new server listening for incoming connections and returns
+// it.
+// This returns a fully running and connected server, it does not block on
+// the listener.
+// You can call Wait to block on the server, or Shutdown to get the sever to
+// terminate gracefully.
+// To notice incoming connections, use an intercepting Binder.
+func Serve(ctx context.Context, listener Listener, binder Binder) (*Server, error) {
+	server := &Server{
+		listener: listener,
+		binder:   binder,
+	}
+	server.async.init()
+	go server.run(ctx)
+	return server, nil
+}
+
+// Wait returns only when the server has shut down.
+func (s *Server) Wait() error {
+	return s.async.wait()
+}
+
+// run accepts incoming connections from the listener,
+// If IdleTimeout is non-zero, run exits after there are no clients for this
+// duration, otherwise it exits only on error.
+func (s *Server) run(ctx context.Context) {
+	defer s.async.done()
+	var activeConns []*Connection
+	for {
+		// we never close the accepted connection, we rely on the other end
+		// closing or the socket closing itself naturally
+		rwc, err := s.listener.Accept(ctx)
+		if err != nil {
+			if !isClosingError(err) {
+				s.async.setError(err)
+			}
+			// we are done generating new connections for good
+			break
+		}
+
+		// see if any connections were closed while we were waiting
+		activeConns = onlyActive(activeConns)
+
+		// a new inbound connection,
+		conn, err := newConnection(ctx, rwc, s.binder)
+		if err != nil {
+			if !isClosingError(err) {
+				s.async.setError(err)
+			}
+			continue
+		}
+		activeConns = append(activeConns, conn)
+	}
+
+	// wait for all active conns to finish
+	for _, c := range activeConns {
+		c.Wait()
+	}
+}
+
+func onlyActive(conns []*Connection) []*Connection {
+	i := 0
+	for _, c := range conns {
+		if !c.async.isDone() {
+			conns[i] = c
+			i++
+		}
+	}
+	// trim the slice down
+	return conns[:i]
+}
+
+// isClosingError reports if the error occurs normally during the process of
+// closing a network connection. It uses imperfect heuristics that err on the
+// side of false negatives, and should not be used for anything critical.
+func isClosingError(err error) bool {
+	if err == nil {
+		return false
+	}
+	// Fully unwrap the error, so the following tests work.
+	for wrapped := err; wrapped != nil; wrapped = errors.Unwrap(err) {
+		err = wrapped
+	}
+
+	// Was it based on an EOF error?
+	if err == io.EOF {
+		return true
+	}
+
+	// Was it based on a closed pipe?
+	if err == io.ErrClosedPipe {
+		return true
+	}
+
+	// Per https://github.com/golang/go/issues/4373, this error string should not
+	// change. This is not ideal, but since the worst that could happen here is
+	// some superfluous logging, it is acceptable.
+	if err.Error() == "use of closed network connection" {
+		return true
+	}
+
+	if runtime.GOOS == "plan9" {
+		// Error reading from a closed connection.
+		if err == syscall.EINVAL {
+			return true
+		}
+		// Error trying to accept a new connection from a closed listener.
+		if strings.HasSuffix(err.Error(), " listen hungup") {
+			return true
+		}
+	}
+	return false
+}
+
+// NewIdleListener wraps a listener with an idle timeout.
+// When there are no active connections for at least the timeout duration a
+// call to accept will fail with ErrIdleTimeout.
+func NewIdleListener(timeout time.Duration, wrap Listener) Listener {
+	l := &idleListener{
+		timeout:    timeout,
+		wrapped:    wrap,
+		newConns:   make(chan *idleCloser),
+		closed:     make(chan struct{}),
+		wasTimeout: make(chan struct{}),
+	}
+	go l.run()
+	return l
+}
+
+type idleListener struct {
+	wrapped    Listener
+	timeout    time.Duration
+	newConns   chan *idleCloser
+	closed     chan struct{}
+	wasTimeout chan struct{}
+	closeOnce  sync.Once
+}
+
+type idleCloser struct {
+	wrapped   io.ReadWriteCloser
+	closed    chan struct{}
+	closeOnce sync.Once
+}
+
+func (c *idleCloser) Read(p []byte) (int, error) {
+	n, err := c.wrapped.Read(p)
+	if err != nil && isClosingError(err) {
+		c.closeOnce.Do(func() { close(c.closed) })
+	}
+	return n, err
+}
+
+func (c *idleCloser) Write(p []byte) (int, error) {
+	// we do not close on write failure, we rely on the wrapped writer to do that
+	// if it is appropriate, which we will detect in the next read.
+	return c.wrapped.Write(p)
+}
+
+func (c *idleCloser) Close() error {
+	// we rely on closing the wrapped stream to signal to the next read that we
+	// are closed, rather than triggering the closed signal directly
+	return c.wrapped.Close()
+}
+
+func (l *idleListener) Accept(ctx context.Context) (io.ReadWriteCloser, error) {
+	rwc, err := l.wrapped.Accept(ctx)
+	if err != nil {
+		if isClosingError(err) {
+			// underlying listener was closed
+			l.closeOnce.Do(func() { close(l.closed) })
+			// was it closed because of the idle timeout?
+			select {
+			case <-l.wasTimeout:
+				err = ErrIdleTimeout
+			default:
+			}
+		}
+		return nil, err
+	}
+	conn := &idleCloser{
+		wrapped: rwc,
+		closed:  make(chan struct{}),
+	}
+	l.newConns <- conn
+	return conn, err
+}
+
+func (l *idleListener) Close() error {
+	defer l.closeOnce.Do(func() { close(l.closed) })
+	return l.wrapped.Close()
+}
+
+func (l *idleListener) Dialer() Dialer {
+	return l.wrapped.Dialer()
+}
+
+func (l *idleListener) run() {
+	var conns []*idleCloser
+	for {
+		var firstClosed chan struct{} // left at nil if there are no active conns
+		var timeout <-chan time.Time  // left at nil if there are  active conns
+		if len(conns) > 0 {
+			firstClosed = conns[0].closed
+		} else {
+			timeout = time.After(l.timeout)
+		}
+		select {
+		case <-l.closed:
+			// the main listener closed, no need to keep going
+			return
+		case conn := <-l.newConns:
+			// a new conn arrived, add it to the list
+			conns = append(conns, conn)
+		case <-timeout:
+			// we timed out, only happens when there are no active conns
+			// close the underlying listener, and allow the normal closing process to happen
+			close(l.wasTimeout)
+			l.wrapped.Close()
+		case <-firstClosed:
+			// a conn closed, remove it from the active list
+			conns = conns[:copy(conns, conns[1:])]
+		}
+	}
+}
diff --git a/jsonrpc2/serve_test.go b/jsonrpc2/serve_test.go
new file mode 100644
index 0000000..475a966
--- /dev/null
+++ b/jsonrpc2/serve_test.go
@@ -0,0 +1,144 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package jsonrpc2_test
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	"golang.org/x/exp/internal/stack/stacktest"
+	"golang.org/x/exp/jsonrpc2"
+)
+
+func TestIdleTimeout(t *testing.T) {
+	stacktest.NoLeak(t)
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	listener, err := jsonrpc2.NetListener(ctx, "tcp", "localhost:0", jsonrpc2.NetListenOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	listener = jsonrpc2.NewIdleListener(100*time.Millisecond, listener)
+	defer listener.Close()
+	server, err := jsonrpc2.Serve(ctx, listener, jsonrpc2.ConnectionOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	connect := func() *jsonrpc2.Connection {
+		client, err := jsonrpc2.Dial(ctx,
+			listener.Dialer(),
+			jsonrpc2.ConnectionOptions{})
+		if err != nil {
+			t.Fatal(err)
+		}
+		return client
+	}
+	// Exercise some connection/disconnection patterns, and then assert that when
+	// our timer fires, the server exits.
+	conn1 := connect()
+	conn2 := connect()
+	if err := conn1.Close(); err != nil {
+		t.Fatalf("conn1.Close failed with error: %v", err)
+	}
+	if err := conn2.Close(); err != nil {
+		t.Fatalf("conn2.Close failed with error: %v", err)
+	}
+	conn3 := connect()
+	if err := conn3.Close(); err != nil {
+		t.Fatalf("conn3.Close failed with error: %v", err)
+	}
+
+	serverError := server.Wait()
+
+	if !errors.Is(serverError, jsonrpc2.ErrIdleTimeout) {
+		t.Errorf("run() returned error %v, want %v", serverError, jsonrpc2.ErrIdleTimeout)
+	}
+}
+
+type msg struct {
+	Msg string
+}
+
+type fakeHandler struct{}
+
+func (fakeHandler) Handle(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) {
+	switch req.Method {
+	case "ping":
+		return &msg{"pong"}, nil
+	default:
+		return nil, jsonrpc2.ErrNotHandled
+	}
+}
+
+func TestServe(t *testing.T) {
+	stacktest.NoLeak(t)
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	tests := []struct {
+		name    string
+		factory func(context.Context) (jsonrpc2.Listener, error)
+	}{
+		{"tcp", func(ctx context.Context) (jsonrpc2.Listener, error) {
+			return jsonrpc2.NetListener(ctx, "tcp", "localhost:0", jsonrpc2.NetListenOptions{})
+		}},
+		{"pipe", func(ctx context.Context) (jsonrpc2.Listener, error) {
+			return jsonrpc2.NetPipe(ctx)
+		}},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			fake, err := test.factory(ctx)
+			if err != nil {
+				t.Fatal(err)
+			}
+			conn, shutdown, err := newFake(t, ctx, fake)
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer shutdown()
+			var got msg
+			if err := conn.Call(ctx, "ping", &msg{"ting"}).Await(ctx, &got); err != nil {
+				t.Fatal(err)
+			}
+			if want := "pong"; got.Msg != want {
+				t.Errorf("conn.Call(...): returned %q, want %q", got, want)
+			}
+		})
+	}
+}
+
+func newFake(t *testing.T, ctx context.Context, l jsonrpc2.Listener) (*jsonrpc2.Connection, func(), error) {
+	l = jsonrpc2.NewIdleListener(100*time.Millisecond, l)
+	server, err := jsonrpc2.Serve(ctx, l, jsonrpc2.ConnectionOptions{
+		Handler: fakeHandler{},
+	})
+	if err != nil {
+		return nil, nil, err
+	}
+
+	client, err := jsonrpc2.Dial(ctx,
+		l.Dialer(),
+		jsonrpc2.ConnectionOptions{
+			Handler: fakeHandler{},
+		})
+	if err != nil {
+		return nil, nil, err
+	}
+	return client, func() {
+		if err := l.Close(); err != nil {
+			t.Fatal(err)
+		}
+		if err := client.Close(); err != nil {
+			t.Fatal(err)
+		}
+		server.Wait()
+	}, nil
+}
diff --git a/jsonrpc2/wire.go b/jsonrpc2/wire.go
new file mode 100644
index 0000000..97b1ae8
--- /dev/null
+++ b/jsonrpc2/wire.go
@@ -0,0 +1,74 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package jsonrpc2
+
+import (
+	"encoding/json"
+)
+
+// This file contains the go forms of the wire specification.
+// see http://www.jsonrpc.org/specification for details
+
+var (
+	// ErrUnknown should be used for all non coded errors.
+	ErrUnknown = NewError(-32001, "JSON RPC unknown error")
+	// ErrParse is used when invalid JSON was received by the server.
+	ErrParse = NewError(-32700, "JSON RPC parse error")
+	// ErrInvalidRequest is used when the JSON sent is not a valid Request object.
+	ErrInvalidRequest = NewError(-32600, "JSON RPC invalid request")
+	// ErrMethodNotFound should be returned by the handler when the method does
+	// not exist / is not available.
+	ErrMethodNotFound = NewError(-32601, "JSON RPC method not found")
+	// ErrInvalidParams should be returned by the handler when method
+	// parameter(s) were invalid.
+	ErrInvalidParams = NewError(-32602, "JSON RPC invalid params")
+	// ErrInternal indicates a failure to process a call correctly
+	ErrInternal = NewError(-32603, "JSON RPC internal error")
+
+	// The following errors are not part of the json specification, but
+	// compliant extensions specific to this implimentation.
+
+	// ErrServerOverloaded is returned when a message was refused due to a
+	// server being temporarily unable to accept any new messages.
+	ErrServerOverloaded = NewError(-32000, "JSON RPC overloaded")
+)
+
+const wireVersion = "2.0"
+
+// wireCombined has all the fields of both Request and Response.
+// We can decode this and then work out which it is.
+type wireCombined struct {
+	VersionTag string          `json:"jsonrpc"`
+	ID         interface{}     `json:"id,omitempty"`
+	Method     string          `json:"method,omitempty"`
+	Params     json.RawMessage `json:"params,omitempty"`
+	Result     json.RawMessage `json:"result,omitempty"`
+	Error      *wireError      `json:"error,omitempty"`
+}
+
+// wireError represents a structured error in a Response.
+type wireError struct {
+	// Code is an error code indicating the type of failure.
+	Code int64 `json:"code"`
+	// Message is a short description of the error.
+	Message string `json:"message"`
+	// Data is optional structured data containing additional information about the error.
+	Data json.RawMessage `json:"data,omitempty"`
+}
+
+// NewError returns an error that will encode on the wire correctly.
+// The standard codes are made available from this package, this function should
+// only be used to build errors for application specific codes as allowed by the
+// specification.
+func NewError(code int64, message string) error {
+	return &wireError{
+		Code:    code,
+		Message: message,
+	}
+}
+
+func (err *wireError) Error() string {
+	return err.Message
+}
diff --git a/jsonrpc2/wire_test.go b/jsonrpc2/wire_test.go
new file mode 100644
index 0000000..905687d
--- /dev/null
+++ b/jsonrpc2/wire_test.go
@@ -0,0 +1,118 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package jsonrpc2_test
+
+import (
+	"bytes"
+	"encoding/json"
+	"reflect"
+	"testing"
+
+	"golang.org/x/exp/jsonrpc2"
+)
+
+func TestWireMessage(t *testing.T) {
+	for _, test := range []struct {
+		name    string
+		msg     jsonrpc2.Message
+		encoded []byte
+	}{{
+		name:    "notification",
+		msg:     newNotification("alive", nil),
+		encoded: []byte(`{"jsonrpc":"2.0","method":"alive"}`),
+	}, {
+		name:    "call",
+		msg:     newCall("msg1", "ping", nil),
+		encoded: []byte(`{"jsonrpc":"2.0","id":"msg1","method":"ping"}`),
+	}, {
+		name:    "response",
+		msg:     newResponse("msg2", "pong", nil),
+		encoded: []byte(`{"jsonrpc":"2.0","id":"msg2","result":"pong"}`),
+	}, {
+		name:    "numerical id",
+		msg:     newCall(1, "poke", nil),
+		encoded: []byte(`{"jsonrpc":"2.0","id":1,"method":"poke"}`),
+	}, {
+		// originally reported in #39719, this checks that result is not present if
+		// it is an error response
+		name: "computing fix edits",
+		msg:  newResponse(3, nil, jsonrpc2.NewError(0, "computing fix edits")),
+		encoded: []byte(`{
+		"jsonrpc":"2.0",
+		"id":3,
+		"error":{
+			"code":0,
+			"message":"computing fix edits"
+		}
+	}`),
+	}} {
+		b, err := jsonrpc2.EncodeMessage(test.msg)
+		if err != nil {
+			t.Fatal(err)
+		}
+		checkJSON(t, b, test.encoded)
+		msg, err := jsonrpc2.DecodeMessage(test.encoded)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if !reflect.DeepEqual(msg, test.msg) {
+			t.Errorf("decoded message does not match\nGot:\n%+#v\nWant:\n%+#v", msg, test.msg)
+		}
+	}
+}
+
+func newNotification(method string, params interface{}) jsonrpc2.Message {
+	msg, err := jsonrpc2.NewNotification(method, params)
+	if err != nil {
+		panic(err)
+	}
+	return msg
+}
+
+func newID(id interface{}) jsonrpc2.ID {
+	switch v := id.(type) {
+	case nil:
+		return jsonrpc2.ID{}
+	case string:
+		return jsonrpc2.StringID(v)
+	case int:
+		return jsonrpc2.Int64ID(int64(v))
+	case int64:
+		return jsonrpc2.Int64ID(v)
+	default:
+		panic("invalid ID type")
+	}
+}
+
+func newCall(id interface{}, method string, params interface{}) jsonrpc2.Message {
+	msg, err := jsonrpc2.NewCall(newID(id), method, params)
+	if err != nil {
+		panic(err)
+	}
+	return msg
+}
+
+func newResponse(id interface{}, result interface{}, rerr error) jsonrpc2.Message {
+	msg, err := jsonrpc2.NewResponse(newID(id), result, rerr)
+	if err != nil {
+		panic(err)
+	}
+	return msg
+}
+
+func checkJSON(t *testing.T, got, want []byte) {
+	// compare the compact form, to allow for formatting differences
+	g := &bytes.Buffer{}
+	if err := json.Compact(g, []byte(got)); err != nil {
+		t.Fatal(err)
+	}
+	w := &bytes.Buffer{}
+	if err := json.Compact(w, []byte(want)); err != nil {
+		t.Fatal(err)
+	}
+	if g.String() != w.String() {
+		t.Errorf("encoded message does not match\nGot:\n%s\nWant:\n%s", g, w)
+	}
+}