| // Copyright 2020 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package par |
| |
| import "fmt" |
| |
| // Queue manages a set of work items to be executed in parallel. The number of |
| // active work items is limited, and excess items are queued sequentially. |
| type Queue struct { |
| maxActive int |
| st chan queueState |
| } |
| |
| type queueState struct { |
| active int // number of goroutines processing work; always nonzero when len(backlog) > 0 |
| backlog []func() |
| idle chan struct{} // if non-nil, closed when active becomes 0 |
| } |
| |
| // NewQueue returns a Queue that executes up to maxActive items in parallel. |
| // |
| // maxActive must be positive. |
| func NewQueue(maxActive int) *Queue { |
| if maxActive < 1 { |
| panic(fmt.Sprintf("par.NewQueue called with nonpositive limit (%d)", maxActive)) |
| } |
| |
| q := &Queue{ |
| maxActive: maxActive, |
| st: make(chan queueState, 1), |
| } |
| q.st <- queueState{} |
| return q |
| } |
| |
| // Add adds f as a work item in the queue. |
| // |
| // Add returns immediately, but the queue will be marked as non-idle until after |
| // f (and any subsequently-added work) has completed. |
| func (q *Queue) Add(f func()) { |
| st := <-q.st |
| if st.active == q.maxActive { |
| st.backlog = append(st.backlog, f) |
| q.st <- st |
| return |
| } |
| if st.active == 0 { |
| // Mark q as non-idle. |
| st.idle = nil |
| } |
| st.active++ |
| q.st <- st |
| |
| go func() { |
| for { |
| f() |
| |
| st := <-q.st |
| if len(st.backlog) == 0 { |
| if st.active--; st.active == 0 && st.idle != nil { |
| close(st.idle) |
| } |
| q.st <- st |
| return |
| } |
| f, st.backlog = st.backlog[0], st.backlog[1:] |
| q.st <- st |
| } |
| }() |
| } |
| |
| // Idle returns a channel that will be closed when q has no (active or enqueued) |
| // work outstanding. |
| func (q *Queue) Idle() <-chan struct{} { |
| st := <-q.st |
| defer func() { q.st <- st }() |
| |
| if st.idle == nil { |
| st.idle = make(chan struct{}) |
| if st.active == 0 { |
| close(st.idle) |
| } |
| } |
| |
| return st.idle |
| } |