| // Copyright 2010 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package textproto |
| |
| import ( |
| "sync" |
| ) |
| |
| // A Pipeline manages a pipelined in-order request/response sequence. |
| // |
| // To use a Pipeline p to manage multiple clients on a connection, |
| // each client should run: |
| // |
| // id := p.Next() // take a number |
| // |
| // p.StartRequest(id) // wait for turn to send request |
| // «send request» |
| // p.EndRequest(id) // notify Pipeline that request is sent |
| // |
| // p.StartResponse(id) // wait for turn to read response |
| // «read response» |
| // p.EndResponse(id) // notify Pipeline that response is read |
| // |
| // A pipelined server can use the same calls to ensure that |
| // responses computed in parallel are written in the correct order. |
| type Pipeline struct { |
| mu sync.Mutex |
| id uint |
| request sequencer |
| response sequencer |
| } |
| |
| // Next returns the next id for a request/response pair. |
| func (p *Pipeline) Next() uint { |
| p.mu.Lock() |
| id := p.id |
| p.id++ |
| p.mu.Unlock() |
| return id |
| } |
| |
| // StartRequest blocks until it is time to send (or, if this is a server, receive) |
| // the request with the given id. |
| func (p *Pipeline) StartRequest(id uint) { |
| p.request.Start(id) |
| } |
| |
| // EndRequest notifies p that the request with the given id has been sent |
| // (or, if this is a server, received). |
| func (p *Pipeline) EndRequest(id uint) { |
| p.request.End(id) |
| } |
| |
| // StartResponse blocks until it is time to receive (or, if this is a server, send) |
| // the request with the given id. |
| func (p *Pipeline) StartResponse(id uint) { |
| p.response.Start(id) |
| } |
| |
| // EndResponse notifies p that the response with the given id has been received |
| // (or, if this is a server, sent). |
| func (p *Pipeline) EndResponse(id uint) { |
| p.response.End(id) |
| } |
| |
// A sequencer schedules a sequence of numbered events that must
// happen in order, one after the other. The event numbering must start
// at 0 and increment without skipping. The event number wraps around
// safely as long as fewer than 2^32 events are pending at once
// (a uint is at least 32 bits wide).
//
// The zero value is ready to use; the wait map is only allocated the
// first time a Start call actually has to block.
type sequencer struct {
	mu   sync.Mutex
	id   uint                   // number of the event whose turn it is
	wait map[uint]chan struct{} // blocked Start calls, keyed by event number
}

// Start waits until it is time for the event numbered id to begin.
// That is, except for the first event, it waits until End(id-1) has
// been called.
//
// Start returns immediately when id is the current event. Otherwise it
// registers a channel under id and blocks until End closes it. Only one
// caller should wait on a given id: a second Start for the same pending
// id replaces the first caller's channel, so the first is never woken.
// Start does not detect an id whose turn has already passed.
func (s *sequencer) Start(id uint) {
	s.mu.Lock()
	if s.id == id {
		s.mu.Unlock()
		return
	}
	c := make(chan struct{})
	if s.wait == nil {
		s.wait = make(map[uint]chan struct{})
	}
	s.wait[id] = c
	// Release the lock before blocking so End can make progress.
	s.mu.Unlock()
	<-c
}

// End notifies the sequencer that the event numbered id has completed,
// allowing it to schedule the event numbered id+1. It is a run-time error
// to call End with an id that is not the number of the active event:
// End panics with "out of sync" in that case, after releasing the lock.
func (s *sequencer) End(id uint) {
	s.mu.Lock()
	if s.id != id {
		s.mu.Unlock()
		panic("out of sync")
	}
	id++
	s.id = id
	// Looking up and deleting from a nil map are both well-defined, so
	// there is no need to allocate the map here. Start only ever stores
	// non-nil channels, so a nil c means nobody is waiting for id.
	c := s.wait[id]
	delete(s.wait, id)
	s.mu.Unlock()
	// Wake the waiter outside the critical section.
	if c != nil {
		close(c)
	}
}