acvptool: implement pipelining.

This causes acvptool to send requests without blocking on responses. See
the diff in ACVP.md for details of how to use this feature.

Change-Id: I922b3bd2383cb7d22a5d12ead49d2fa733ee1b97
Reviewed-on: https://boringssl-review.googlesource.com/c/boringssl/+/55345
Reviewed-by: David Benjamin <davidben@google.com>
Commit-Queue: Adam Langley <agl@google.com>
diff --git a/util/fipstools/acvp/acvptool/subprocess/subprocess.go b/util/fipstools/acvp/acvptool/subprocess/subprocess.go
index ee1cb87..9167b47 100644
--- a/util/fipstools/acvp/acvptool/subprocess/subprocess.go
+++ b/util/fipstools/acvp/acvptool/subprocess/subprocess.go
@@ -30,6 +30,9 @@
 // that don't call a server.
 type Transactable interface {
 	Transact(cmd string, expectedResults int, args ...[]byte) ([][]byte, error)
+	TransactAsync(cmd string, expectedResults int, args [][]byte, callback func([][]byte) error)
+	Barrier(callback func()) error
+	Flush() error
 }
 
 // Subprocess is a "middle" layer that interacts with a FIPS module via running
@@ -39,6 +42,26 @@
 	stdin      io.WriteCloser
 	stdout     io.ReadCloser
 	primitives map[string]primitive
+	// supportsFlush is true if the modulewrapper indicated that it wants to receive flush commands.
+	supportsFlush bool
+	// pendingReads is a queue of expected responses. `readerRoutine` reads each response and calls the callback in the matching pendingRead.
+	pendingReads chan pendingRead
+	// readerFinished is a channel that is closed if `readerRoutine` has finished (e.g. because of a read error).
+	readerFinished chan struct{}
+	// readerError is set iff readerFinished is closed. If non-nil then it is the error (from reading a response or from a callback) that caused `readerRoutine` to finish.
+	readerError error
+}
+
+// pendingRead represents an expected response from the modulewrapper.
+type pendingRead struct {
+	// barrierCallback is called as soon as this pendingRead is the next in the queue, before any read from the modulewrapper.
+	barrierCallback func()
+
+	// callback is called with the result from the modulewrapper. If this is nil then no read is performed.
+	callback func(result [][]byte) error
+	// cmd is the command that requested this read for logging purposes.
+	cmd                string
+	expectedNumResults int
 }
 
 // New returns a new Subprocess middle layer that runs the given binary.
@@ -61,13 +84,18 @@
 	return NewWithIO(cmd, stdin, stdout), nil
 }
 
+// maxPending is the maximum number of requests that can be in the pipeline.
+const maxPending = 4096
+
 // NewWithIO returns a new Subprocess middle layer with the given ReadCloser and
 // WriteCloser. The returned Subprocess will call Wait on the Cmd when closed.
 func NewWithIO(cmd *exec.Cmd, in io.WriteCloser, out io.ReadCloser) *Subprocess {
 	m := &Subprocess{
-		cmd:    cmd,
-		stdin:  in,
-		stdout: out,
+		cmd:            cmd,
+		stdin:          in,
+		stdout:         out,
+		pendingReads:   make(chan pendingRead, maxPending),
+		readerFinished: make(chan struct{}),
 	}
 
 	m.primitives = map[string]primitive{
@@ -116,6 +144,7 @@
 	}
 	m.primitives["ECDSA"] = &ecdsa{"ECDSA", map[string]bool{"P-224": true, "P-256": true, "P-384": true, "P-521": true}, m.primitives}
 
+	go m.readerRoutine()
 	return m
 }
 
@@ -124,10 +153,57 @@
 	m.stdout.Close()
 	m.stdin.Close()
 	m.cmd.Wait()
+	<-m.readerFinished
 }
 
-// Transact performs a single request--response pair with the subprocess.
-func (m *Subprocess) Transact(cmd string, expectedResults int, args ...[]byte) ([][]byte, error) {
+// flush writes a "flush" command to the modulewrapper if it asked to receive them, and is a no-op otherwise.
+// The message is a little-endian uint32 argument count (1), the uint32 length of the name, then the name.
+func (m *Subprocess) flush() error {
+	if !m.supportsFlush {
+		return nil
+	}
+
+	const name = "flush"
+	msg := make([]byte, 8, 8+len(name))
+	binary.LittleEndian.PutUint32(msg[:4], 1)
+	binary.LittleEndian.PutUint32(msg[4:], uint32(len(name)))
+	msg = append(msg, name...)
+
+	_, err := m.stdin.Write(msg)
+	return err
+}
+
+// enqueueRead queues pending for readerRoutine, flushing first if the queue is full. It returns readerError once the reader has finished.
+func (m *Subprocess) enqueueRead(pending pendingRead) error {
+	select {
+	case <-m.readerFinished:
+		return m.readerError
+	default:
+	}
+	select {
+	case m.pendingReads <- pending:
+	default: // `pendingReads` is full: make the modulewrapper process outstanding requests to free up space.
+		if err := m.flush(); err != nil {
+			return err
+		}
+		select {
+		case m.pendingReads <- pending:
+		case <-m.readerFinished: // A finished reader never drains the queue, so don't block on it forever.
+			return m.readerError
+		}
+	}
+	return nil
+}
+
+// TransactAsync performs a single request--response pair with the subprocess.
+// The callback will run at some future point, in a separate goroutine. All
+// callbacks will, however, be run in the order that TransactAsync was called.
+// Use Flush to wait for all outstanding callbacks.
+func (m *Subprocess) TransactAsync(cmd string, expectedNumResults int, args [][]byte, callback func(result [][]byte) error) {
+	if err := m.enqueueRead(pendingRead{nil, callback, cmd, expectedNumResults}); err != nil {
+		panic(err)
+	}
+
 	argLength := len(cmd)
 	for _, arg := range args {
 		argLength += len(arg)
@@ -145,17 +221,90 @@
 	}
 
 	if _, err := m.stdin.Write(buf); err != nil {
+		panic(err)
+	}
+}
+
+// Flush tells the subprocess to complete all outstanding requests and waits for all outstanding TransactAsync
+// callbacks to complete. It returns an error if the flush cannot be written or the reader has stopped.
+func (m *Subprocess) Flush() error {
+	if err := m.flush(); err != nil { // flush is a no-op unless the modulewrapper supports it.
+		return err
+	}
+	done := make(chan struct{})
+	if err := m.enqueueRead(pendingRead{barrierCallback: func() { close(done) }}); err != nil {
+		return err
+	}
+	select {
+	case <-done:
+		return nil
+	case <-m.readerFinished: // The reader stopped before reaching the barrier; don't wait forever.
+		return m.readerError
+	}
+}
+
+// Barrier queues callback to run after all outstanding TransactAsync callbacks; it returns any enqueue error.
+func (m *Subprocess) Barrier(callback func()) error {
+	barrier := pendingRead{barrierCallback: callback}
+	return m.enqueueRead(barrier)
+}
+
+func (m *Subprocess) Transact(cmd string, expectedNumResults int, args ...[]byte) ([][]byte, error) {
+	done := make(chan struct{})
+	var result [][]byte
+	m.TransactAsync(cmd, expectedNumResults, args, func(r [][]byte) error {
+		result = r
+		close(done)
+		return nil
+	})
+
+	if err := m.flush(); err != nil {
 		return nil, err
 	}
 
-	buf = buf[:4]
+	select {
+	case <-done:
+		return result, nil
+	case <-m.readerFinished:
+		return nil, m.readerError
+	}
+}
+
+// readerRoutine services the entries of pendingReads in order. It stops at the first read or callback
+// error, recording it in readerError; readerFinished is closed whenever this function returns.
+func (m *Subprocess) readerRoutine() {
+	defer close(m.readerFinished)
+
+	for next := range m.pendingReads {
+		if barrier := next.barrierCallback; barrier != nil {
+			barrier()
+		}
+		if next.callback == nil {
+			// No result callback, so no response is read for this entry.
+			continue
+		}
+
+		result, err := m.readResult(next.cmd, next.expectedNumResults)
+		if err == nil {
+			err = next.callback(result)
+		}
+		if err != nil {
+			m.readerError = err
+			return
+		}
+	}
+}
+
+func (m *Subprocess) readResult(cmd string, expectedNumResults int) ([][]byte, error) {
+	buf := make([]byte, 4)
+
 	if _, err := io.ReadFull(m.stdout, buf); err != nil {
 		return nil, err
 	}
 
 	numResults := binary.LittleEndian.Uint32(buf)
-	if int(numResults) != expectedResults {
-		return nil, fmt.Errorf("expected %d results from %q but got %d", expectedResults, cmd, numResults)
+	if int(numResults) != expectedNumResults {
+		return nil, fmt.Errorf("expected %d results from %q but got %d", expectedNumResults, cmd, numResults)
 	}
 
 	buf = make([]byte, 4*numResults)
@@ -197,16 +346,25 @@
 		return nil, err
 	}
 	var config []struct {
-		Algorithm string `json:"algorithm"`
+		Algorithm string   `json:"algorithm"`
+		Features  []string `json:"features"`
 	}
 	if err := json.Unmarshal(results[0], &config); err != nil {
 		return nil, errors.New("failed to parse config response from wrapper: " + err.Error())
 	}
 	for _, algo := range config {
-		if _, ok := m.primitives[algo.Algorithm]; !ok {
+		if algo.Algorithm == "acvptool" {
+			for _, feature := range algo.Features {
+				switch feature {
+				case "batch":
+					m.supportsFlush = true
+				}
+			}
+		} else if _, ok := m.primitives[algo.Algorithm]; !ok {
 			return nil, fmt.Errorf("wrapper config advertises support for unknown algorithm %q", algo.Algorithm)
 		}
 	}
+
 	return results[0], nil
 }