cmd/go/internal: remove some users of par.Work

par.Work is used in a number of places as a parallel
work queue. This change replaces it with goroutines
and channels in a number of simpler places where it's
used.

Change-Id: I0620eda46ec7b2c0599a8b9361639af7bb73a05a
Reviewed-on: https://go-review.googlesource.com/c/go/+/248326
Run-TryBot: Michael Matloob <matloob@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Bryan C. Mills <bcmills@google.com>
diff --git a/src/cmd/go/internal/modcmd/download.go b/src/cmd/go/internal/modcmd/download.go
index 946e8ed..857362a 100644
--- a/src/cmd/go/internal/modcmd/download.go
+++ b/src/cmd/go/internal/modcmd/download.go
@@ -5,15 +5,15 @@
 package modcmd
 
 import (
+	"cmd/go/internal/modfetch"
 	"context"
 	"encoding/json"
 	"os"
+	"runtime"
 
 	"cmd/go/internal/base"
 	"cmd/go/internal/cfg"
-	"cmd/go/internal/modfetch"
 	"cmd/go/internal/modload"
-	"cmd/go/internal/par"
 	"cmd/go/internal/work"
 
 	"golang.org/x/mod/module"
@@ -102,33 +102,7 @@
 		}
 	}
 
-	var mods []*moduleJSON
-	var work par.Work
-	listU := false
-	listVersions := false
-	for _, info := range modload.ListModules(ctx, args, listU, listVersions) {
-		if info.Replace != nil {
-			info = info.Replace
-		}
-		if info.Version == "" && info.Error == nil {
-			// main module or module replaced with file path.
-			// Nothing to download.
-			continue
-		}
-		m := &moduleJSON{
-			Path:    info.Path,
-			Version: info.Version,
-		}
-		mods = append(mods, m)
-		if info.Error != nil {
-			m.Error = info.Error.Err
-			continue
-		}
-		work.Add(m)
-	}
-
-	work.Do(10, func(item interface{}) {
-		m := item.(*moduleJSON)
+	downloadModule := func(m *moduleJSON) {
 		var err error
 		m.Info, err = modfetch.InfoFile(m.Path, m.Version)
 		if err != nil {
@@ -157,7 +131,42 @@
 			m.Error = err.Error()
 			return
 		}
-	})
+	}
+
+	var mods []*moduleJSON
+	listU := false
+	listVersions := false
+	type token struct{}
+	sem := make(chan token, runtime.GOMAXPROCS(0))
+	for _, info := range modload.ListModules(ctx, args, listU, listVersions) {
+		if info.Replace != nil {
+			info = info.Replace
+		}
+		if info.Version == "" && info.Error == nil {
+			// main module or module replaced with file path.
+			// Nothing to download.
+			continue
+		}
+		m := &moduleJSON{
+			Path:    info.Path,
+			Version: info.Version,
+		}
+		mods = append(mods, m)
+		if info.Error != nil {
+			m.Error = info.Error.Err
+			continue
+		}
+		sem <- token{}
+		go func() {
+			downloadModule(m)
+			<-sem
+		}()
+	}
+
+	// Fill semaphore channel to wait for goroutines to finish.
+	for n := cap(sem); n > 0; n-- {
+		sem <- token{}
+	}
 
 	if *downloadJSON {
 		for _, m := range mods {
diff --git a/src/cmd/go/internal/modcmd/graph.go b/src/cmd/go/internal/modcmd/graph.go
index 4853503..6da12b9 100644
--- a/src/cmd/go/internal/modcmd/graph.go
+++ b/src/cmd/go/internal/modcmd/graph.go
@@ -15,7 +15,6 @@
 	"cmd/go/internal/base"
 	"cmd/go/internal/cfg"
 	"cmd/go/internal/modload"
-	"cmd/go/internal/par"
 	"cmd/go/internal/work"
 
 	"golang.org/x/mod/module"
@@ -59,23 +58,25 @@
 		return m.Path + "@" + m.Version
 	}
 
-	// Note: using par.Work only to manage work queue.
-	// No parallelism here, so no locking.
 	var out []string
 	var deps int // index in out where deps start
-	var work par.Work
-	work.Add(modload.Target)
-	work.Do(1, func(item interface{}) {
-		m := item.(module.Version)
+	seen := map[module.Version]bool{modload.Target: true}
+	queue := []module.Version{modload.Target}
+	for len(queue) > 0 {
+		var m module.Version
+		m, queue = queue[0], queue[1:]
 		list, _ := reqs.Required(m)
 		for _, r := range list {
-			work.Add(r)
+			if !seen[r] {
+				queue = append(queue, r)
+				seen[r] = true
+			}
 			out = append(out, format(m)+" "+format(r)+"\n")
 		}
 		if m == modload.Target {
 			deps = len(out)
 		}
-	})
+	}
 
 	sort.Slice(out[deps:], func(i, j int) bool {
 		return out[deps+i][0] < out[deps+j][0]
diff --git a/src/cmd/go/internal/modconv/convert.go b/src/cmd/go/internal/modconv/convert.go
index f465a9f..d5a0bc2 100644
--- a/src/cmd/go/internal/modconv/convert.go
+++ b/src/cmd/go/internal/modconv/convert.go
@@ -7,13 +7,12 @@
 import (
 	"fmt"
 	"os"
+	"runtime"
 	"sort"
 	"strings"
-	"sync"
 
 	"cmd/go/internal/base"
 	"cmd/go/internal/modfetch"
-	"cmd/go/internal/par"
 
 	"golang.org/x/mod/modfile"
 	"golang.org/x/mod/module"
@@ -42,46 +41,52 @@
 
 	// Convert requirements block, which may use raw SHA1 hashes as versions,
 	// to valid semver requirement list, respecting major versions.
-	var (
-		work    par.Work
-		mu      sync.Mutex
-		need    = make(map[string]string)
-		replace = make(map[string]*modfile.Replace)
-	)
+	versions := make([]*module.Version, len(mf.Require))
+	replace := make(map[string]*modfile.Replace)
 
 	for _, r := range mf.Replace {
 		replace[r.New.Path] = r
 		replace[r.Old.Path] = r
 	}
-	for _, r := range mf.Require {
+
+	type token struct{}
+	sem := make(chan token, runtime.GOMAXPROCS(0))
+	for i, r := range mf.Require {
 		m := r.Mod
 		if m.Path == "" {
 			continue
 		}
 		if re, ok := replace[m.Path]; ok {
-			work.Add(re.New)
-			continue
+			m = re.New
 		}
-		work.Add(r.Mod)
+		sem <- token{}
+		go func(i int, m module.Version) {
+			repo, info, err := modfetch.ImportRepoRev(m.Path, m.Version)
+			if err != nil {
+				fmt.Fprintf(os.Stderr, "go: converting %s: stat %s@%s: %v\n", base.ShortPath(file), m.Path, m.Version, err)
+				return
+			}
+
+			path := repo.ModulePath()
+			versions[i].Path = path
+			versions[i].Version = info.Version
+
+			<-sem
+		}(i, m)
+	}
+	// Fill semaphore channel to wait for all tasks to finish.
+	for n := cap(sem); n > 0; n-- {
+		sem <- token{}
 	}
 
-	work.Do(10, func(item interface{}) {
-		r := item.(module.Version)
-		repo, info, err := modfetch.ImportRepoRev(r.Path, r.Version)
-		if err != nil {
-			fmt.Fprintf(os.Stderr, "go: converting %s: stat %s@%s: %v\n", base.ShortPath(file), r.Path, r.Version, err)
-			return
-		}
-		mu.Lock()
-		path := repo.ModulePath()
+	need := map[string]string{}
+	for _, v := range versions {
 		// Don't use semver.Max here; need to preserve +incompatible suffix.
-		if v, ok := need[path]; !ok || semver.Compare(v, info.Version) < 0 {
-			need[path] = info.Version
+		if needv, ok := need[v.Path]; !ok || semver.Compare(needv, v.Version) < 0 {
+			need[v.Path] = v.Version
 		}
-		mu.Unlock()
-	})
-
-	var paths []string
+	}
+	paths := make([]string, 0, len(need))
 	for path := range need {
 		paths = append(paths, path)
 	}
diff --git a/src/cmd/go/internal/modget/get.go b/src/cmd/go/internal/modget/get.go
index 93a6bb5..d02c9a8 100644
--- a/src/cmd/go/internal/modget/get.go
+++ b/src/cmd/go/internal/modget/get.go
@@ -11,6 +11,7 @@
 	"fmt"
 	"os"
 	"path/filepath"
+	"runtime"
 	"sort"
 	"strings"
 	"sync"
@@ -21,7 +22,6 @@
 	"cmd/go/internal/load"
 	"cmd/go/internal/modload"
 	"cmd/go/internal/mvs"
-	"cmd/go/internal/par"
 	"cmd/go/internal/search"
 	"cmd/go/internal/work"
 
@@ -725,18 +725,8 @@
 // reported. A map from module paths to queries is returned, which includes
 // queries and modOnly.
 func runQueries(ctx context.Context, cache map[querySpec]*query, queries []*query, modOnly map[string]*query) map[string]*query {
-	var lookup par.Work
-	for _, q := range queries {
-		if cached := cache[q.querySpec]; cached != nil {
-			*q = *cached
-		} else {
-			cache[q.querySpec] = q
-			lookup.Add(q)
-		}
-	}
 
-	lookup.Do(10, func(item interface{}) {
-		q := item.(*query)
+	runQuery := func(q *query) {
 		if q.vers == "none" {
 			// Wait for downgrade step.
 			q.m = module.Version{Path: q.path, Version: "none"}
@@ -747,7 +737,32 @@
 			base.Errorf("go get %s: %v", q.arg, err)
 		}
 		q.m = m
-	})
+	}
+
+	type token struct{}
+	sem := make(chan token, runtime.GOMAXPROCS(0))
+	for _, q := range queries {
+		if cached := cache[q.querySpec]; cached != nil {
+			*q = *cached
+		} else {
+			sem <- token{}
+			go func(q *query) {
+				runQuery(q)
+				<-sem
+			}(q)
+		}
+	}
+
+	// Fill semaphore channel to wait for goroutines to finish.
+	for n := cap(sem); n > 0; n-- {
+		sem <- token{}
+	}
+
+	// Add to cache after concurrent section to avoid races...
+	for _, q := range queries {
+		cache[q.querySpec] = q
+	}
+
 	base.ExitIfErrors()
 
 	byPath := make(map[string]*query)
diff --git a/src/cmd/go/internal/modload/list.go b/src/cmd/go/internal/modload/list.go
index 4768516..8db4d64 100644
--- a/src/cmd/go/internal/modload/list.go
+++ b/src/cmd/go/internal/modload/list.go
@@ -9,12 +9,12 @@
 	"errors"
 	"fmt"
 	"os"
+	"runtime"
 	"strings"
 
 	"cmd/go/internal/base"
 	"cmd/go/internal/cfg"
 	"cmd/go/internal/modinfo"
-	"cmd/go/internal/par"
 	"cmd/go/internal/search"
 
 	"golang.org/x/mod/module"
@@ -22,24 +22,35 @@
 
 func ListModules(ctx context.Context, args []string, listU, listVersions bool) []*modinfo.ModulePublic {
 	mods := listModules(ctx, args, listVersions)
+
+	type token struct{}
+	sem := make(chan token, runtime.GOMAXPROCS(0))
 	if listU || listVersions {
-		var work par.Work
 		for _, m := range mods {
-			work.Add(m)
+			add := func(m *modinfo.ModulePublic) {
+				sem <- token{}
+				go func() {
+					if listU {
+						addUpdate(m)
+					}
+					if listVersions {
+						addVersions(m)
+					}
+					<-sem
+				}()
+			}
+
+			add(m)
 			if m.Replace != nil {
-				work.Add(m.Replace)
+				add(m.Replace)
 			}
 		}
-		work.Do(10, func(item interface{}) {
-			m := item.(*modinfo.ModulePublic)
-			if listU {
-				addUpdate(m)
-			}
-			if listVersions {
-				addVersions(m)
-			}
-		})
 	}
+	// Fill semaphore channel to wait for all tasks to finish.
+	for n := cap(sem); n > 0; n-- {
+		sem <- token{}
+	}
+
 	return mods
 }