// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package jsonrpc2

import (
	"context"
	"fmt"
	"io"
	"runtime"
	"sync"
	"sync/atomic"
	"time"
)

// Listener is implemented by protocols to accept new inbound connections.
type Listener interface {
	// Accept accepts an inbound connection to a server.
	// It blocks until either an inbound connection is made, or the listener is closed.
	Accept(context.Context) (io.ReadWriteCloser, error)

	// Close closes the listener.
	// Any blocked Accept or Dial operations will unblock and return errors.
	Close() error

	// Dialer returns a dialer that can be used to connect to this listener
	// locally.
	// If a listener does not implement this it will return nil.
	Dialer() Dialer
}

// Dialer is used by clients to dial a server.
type Dialer interface {
	// Dial returns a new communication byte stream to a listening server.
	Dial(ctx context.Context) (io.ReadWriteCloser, error)
}

// Server is a running server that is accepting incoming connections.
type Server struct {
	listener Listener
	binder   Binder
	async    *async

	shutdownOnce sync.Once
	closing      int32 // atomic: set to nonzero when Shutdown is called
}

// Dial uses the dialer to make a new connection, wraps the returned
// reader and writer using the framer to make a stream, and then builds
// a connection on top of that stream using the binder.
//
// The returned Connection will operate independently using the Preempter and/or
// Handler provided by the Binder, and will release its own resources when the
// connection is broken, but the caller may Close it earlier to stop accepting
// (or sending) new requests.
func Dial(ctx context.Context, dialer Dialer, binder Binder) (*Connection, error) {
	// dial a server
	rwc, err := dialer.Dial(ctx)
	if err != nil {
		return nil, err
	}
	return newConnection(ctx, rwc, binder, nil), nil
}

// NewServer starts a new server listening for incoming connections and returns
// it.
// This returns a fully running and connected server, it does not block on
// the listener.
// You can call Wait to block on the server, or Shutdown to get the sever to
// terminate gracefully.
// To notice incoming connections, use an intercepting Binder.
func NewServer(ctx context.Context, listener Listener, binder Binder) *Server {
	server := &Server{
		listener: listener,
		binder:   binder,
		async:    newAsync(),
	}
	go server.run(ctx)
	return server
}

// Wait returns only when the server has shut down.
func (s *Server) Wait() error {
	return s.async.wait()
}

// Shutdown informs the server to stop accepting new connections.
func (s *Server) Shutdown() {
	s.shutdownOnce.Do(func() {
		atomic.StoreInt32(&s.closing, 1)
		s.listener.Close()
	})
}

// run accepts incoming connections from the listener,
// If IdleTimeout is non-zero, run exits after there are no clients for this
// duration, otherwise it exits only on error.
func (s *Server) run(ctx context.Context) {
	defer s.async.done()

	var activeConns sync.WaitGroup
	for {
		rwc, err := s.listener.Accept(ctx)
		if err != nil {
			// Only Shutdown closes the listener. If we get an error after Shutdown is
			// called, assume that that was the cause and don't report the error;
			// otherwise, report the error in case it is unexpected.
			if atomic.LoadInt32(&s.closing) == 0 {
				s.async.setError(err)
			}
			// We are done generating new connections for good.
			break
		}

		// A new inbound connection.
		activeConns.Add(1)
		_ = newConnection(ctx, rwc, s.binder, activeConns.Done) // unregisters itself when done
	}
	activeConns.Wait()
}

// NewIdleListener wraps a listener with an idle timeout.
//
// When there are no active connections for at least the timeout duration,
// calls to Accept will fail with ErrIdleTimeout.
//
// A connection is considered inactive as soon as its Close method is called.
func NewIdleListener(timeout time.Duration, wrap Listener) Listener {
	l := &idleListener{
		wrapped:   wrap,
		timeout:   timeout,
		active:    make(chan int, 1),
		timedOut:  make(chan struct{}),
		idleTimer: make(chan *time.Timer, 1),
	}
	l.idleTimer <- time.AfterFunc(l.timeout, l.timerExpired)
	return l
}

type idleListener struct {
	wrapped Listener
	timeout time.Duration

	// Only one of these channels is receivable at any given time.
	active    chan int         // count of active connections; closed when Close is called if not timed out
	timedOut  chan struct{}    // closed when the idle timer expires
	idleTimer chan *time.Timer // holds the timer only when idle
}

// Accept accepts an incoming connection.
//
// If an incoming connection is accepted concurrent to the listener being closed
// due to idleness, the new connection is immediately closed.
func (l *idleListener) Accept(ctx context.Context) (io.ReadWriteCloser, error) {
	rwc, err := l.wrapped.Accept(ctx)

	select {
	case n, ok := <-l.active:
		if err != nil {
			if ok {
				l.active <- n
			}
			return nil, err
		}
		if ok {
			l.active <- n + 1
		} else {
			// l.wrapped.Close Close has been called, but Accept returned a
			// connection. This race can occur with concurrent Accept and Close calls
			// with any net.Listener, and it is benign: since the listener was closed
			// explicitly, it can't have also timed out.
		}
		return l.newConn(rwc), nil

	case <-l.timedOut:
		if err == nil {
			// Keeping the connection open would leave the listener simultaneously
			// active and closed due to idleness, which would be contradictory and
			// confusing. Close the connection and pretend that it never happened.
			rwc.Close()
		} else {
			// In theory the timeout could have raced with an unrelated error return
			// from Accept. However, ErrIdleTimeout is arguably still valid (since we
			// would have closed due to the timeout independent of the error), and the
			// harm from returning a spurious ErrIdleTimeout is negligible anyway.
		}
		return nil, ErrIdleTimeout

	case timer := <-l.idleTimer:
		if err != nil {
			// The idle timer doesn't run until it receives itself from the idleTimer
			// channel, so it can't have called l.wrapped.Close yet and thus err can't
			// be ErrIdleTimeout. Leave the idle timer as it was and return whatever
			// error we got.
			l.idleTimer <- timer
			return nil, err
		}

		if !timer.Stop() {
			// Failed to stop the timer — the timer goroutine is in the process of
			// firing. Send the timer back to the timer goroutine so that it can
			// safely close the timedOut channel, and then wait for the listener to
			// actually be closed before we return ErrIdleTimeout.
			l.idleTimer <- timer
			rwc.Close()
			<-l.timedOut
			return nil, ErrIdleTimeout
		}

		l.active <- 1
		return l.newConn(rwc), nil
	}
}

func (l *idleListener) Close() error {
	select {
	case _, ok := <-l.active:
		if ok {
			close(l.active)
		}

	case <-l.timedOut:
		// Already closed by the timer; take care not to double-close if the caller
		// only explicitly invokes this Close method once, since the io.Closer
		// interface explicitly leaves doubled Close calls undefined.
		return ErrIdleTimeout

	case timer := <-l.idleTimer:
		if !timer.Stop() {
			// Couldn't stop the timer. It shouldn't take long to run, so just wait
			// (so that the Listener is guaranteed to be closed before we return)
			// and pretend that this call happened afterward.
			// That way we won't leak any timers or goroutines when Close returns.
			l.idleTimer <- timer
			<-l.timedOut
			return ErrIdleTimeout
		}
		close(l.active)
	}

	return l.wrapped.Close()
}

func (l *idleListener) Dialer() Dialer {
	return l.wrapped.Dialer()
}

func (l *idleListener) timerExpired() {
	select {
	case n, ok := <-l.active:
		if ok {
			panic(fmt.Sprintf("jsonrpc2: idleListener idle timer fired with %d connections still active", n))
		} else {
			panic("jsonrpc2: Close finished with idle timer still running")
		}

	case <-l.timedOut:
		panic("jsonrpc2: idleListener idle timer fired more than once")

	case <-l.idleTimer:
		// The timer for this very call!
	}

	// Close the Listener with all channels still blocked to ensure that this call
	// to l.wrapped.Close doesn't race with the one in l.Close.
	defer close(l.timedOut)
	l.wrapped.Close()
}

func (l *idleListener) connClosed() {
	select {
	case n, ok := <-l.active:
		if !ok {
			// l is already closed, so it can't close due to idleness,
			// and we don't need to track the number of active connections any more.
			return
		}
		n--
		if n == 0 {
			l.idleTimer <- time.AfterFunc(l.timeout, l.timerExpired)
		} else {
			l.active <- n
		}

	case <-l.timedOut:
		panic("jsonrpc2: idleListener idle timer fired before last active connection was closed")

	case <-l.idleTimer:
		panic("jsonrpc2: idleListener idle timer active before last active connection was closed")
	}
}

type idleListenerConn struct {
	wrapped   io.ReadWriteCloser
	l         *idleListener
	closeOnce sync.Once
}

func (l *idleListener) newConn(rwc io.ReadWriteCloser) *idleListenerConn {
	c := &idleListenerConn{
		wrapped: rwc,
		l:       l,
	}

	// A caller that forgets to call Close may disrupt the idleListener's
	// accounting, even though the file descriptor for the underlying connection
	// may eventually be garbage-collected anyway.
	//
	// Set a (best-effort) finalizer to verify that a Close call always occurs.
	// (We will clear the finalizer explicitly in Close.)
	runtime.SetFinalizer(c, func(c *idleListenerConn) {
		panic("jsonrpc2: IdleListener connection became unreachable without a call to Close")
	})

	return c
}

func (c *idleListenerConn) Read(p []byte) (int, error)  { return c.wrapped.Read(p) }
func (c *idleListenerConn) Write(p []byte) (int, error) { return c.wrapped.Write(p) }

func (c *idleListenerConn) Close() error {
	defer c.closeOnce.Do(func() {
		c.l.connClosed()
		runtime.SetFinalizer(c, nil)
	})
	return c.wrapped.Close()
}
