acvptool: implement pipelining.
This causes acvptool to send requests without blocking on responses. See
the diff in ACVP.md for details of how to use this feature.
Change-Id: I922b3bd2383cb7d22a5d12ead49d2fa733ee1b97
Reviewed-on: https://boringssl-review.googlesource.com/c/boringssl/+/55345
Reviewed-by: David Benjamin <davidben@google.com>
Commit-Queue: Adam Langley <agl@google.com>
diff --git a/util/fipstools/acvp/acvptool/subprocess/subprocess.go b/util/fipstools/acvp/acvptool/subprocess/subprocess.go
index ee1cb87..9167b47 100644
--- a/util/fipstools/acvp/acvptool/subprocess/subprocess.go
+++ b/util/fipstools/acvp/acvptool/subprocess/subprocess.go
@@ -30,6 +30,9 @@
// that don't call a server.
type Transactable interface {
Transact(cmd string, expectedResults int, args ...[]byte) ([][]byte, error)
+ TransactAsync(cmd string, expectedResults int, args [][]byte, callback func([][]byte) error) // non-blocking form of Transact: the result is handed to callback instead of being returned
+ Barrier(callback func()) error // in Subprocess: runs callback after all outstanding TransactAsync callbacks have run
+ Flush() error // in Subprocess: waits for all outstanding TransactAsync callbacks to complete
}
// Subprocess is a "middle" layer that interacts with a FIPS module via running
@@ -39,6 +42,26 @@
stdin io.WriteCloser
stdout io.ReadCloser
primitives map[string]primitive
+ // supportsFlush is true if the modulewrapper indicated that it wants to receive flush commands.
+ supportsFlush bool
+ // pendingReads is a queue of expected responses. `readerRoutine` reads each response and calls the callback in the matching pendingRead.
+ pendingReads chan pendingRead
+ // readerFinished is a channel that is closed if `readerRoutine` has finished (e.g. because of a read error).
+ readerFinished chan struct{}
+ // readerError must only be read once readerFinished is closed. If non-nil then it is the error (from reading a response or from a callback) that caused `readerRoutine` to finish.
+ readerError error
+}
+
+// pendingRead represents an expected response from the modulewrapper. TransactAsync builds it with an unkeyed literal, so the order of the fields below must not change.
+type pendingRead struct {
+ // barrierCallback, if non-nil, is called as soon as this pendingRead is the next in the queue, before any read from the modulewrapper.
+ barrierCallback func()
+
+ // callback is called with the result from the modulewrapper. If this is nil then no read is performed. A non-nil error returned by callback stops `readerRoutine`.
+ callback func(result [][]byte) error
+ // cmd is the command that requested this read. It is passed to `readResult`, which quotes it in its error message.
+ cmd string
+ expectedNumResults int // number of results the response must contain; `readResult` returns an error on a mismatch
}
// New returns a new Subprocess middle layer that runs the given binary.
@@ -61,13 +84,18 @@
return NewWithIO(cmd, stdin, stdout), nil
}
+// maxPending is the maximum number of requests that can be in the pipeline.
+const maxPending = 4096
+
// NewWithIO returns a new Subprocess middle layer with the given ReadCloser and
// WriteCloser. The returned Subprocess will call Wait on the Cmd when closed.
func NewWithIO(cmd *exec.Cmd, in io.WriteCloser, out io.ReadCloser) *Subprocess {
m := &Subprocess{
- cmd: cmd,
- stdin: in,
- stdout: out,
+ cmd: cmd,
+ stdin: in,
+ stdout: out,
+ pendingReads: make(chan pendingRead, maxPending),
+ readerFinished: make(chan struct{}),
}
m.primitives = map[string]primitive{
@@ -116,6 +144,7 @@
}
m.primitives["ECDSA"] = &ecdsa{"ECDSA", map[string]bool{"P-224": true, "P-256": true, "P-384": true, "P-521": true}, m.primitives}
+ go m.readerRoutine()
return m
}
@@ -124,10 +153,57 @@
m.stdout.Close()
m.stdin.Close()
m.cmd.Wait()
+ <-m.readerFinished
}
-// Transact performs a single request--response pair with the subprocess.
-func (m *Subprocess) Transact(cmd string, expectedResults int, args ...[]byte) ([][]byte, error) {
+// flush writes a "flush" command to the modulewrapper's stdin. It does
+// nothing, and returns nil, unless the modulewrapper indicated that it wants
+// to receive flush commands (see supportsFlush). Any write error is returned.
+func (m *Subprocess) flush() error {
+	if !m.supportsFlush {
+		return nil
+	}
+
+	// The message is two little-endian uint32s followed by the command
+	// name: the value 1 (presumably the number of length-prefixed fields
+	// that follow; confirm against the framing in TransactAsync) and the
+	// length of the name.
+	const name = "flush"
+	var header [8]byte
+	binary.LittleEndian.PutUint32(header[:4], 1)
+	binary.LittleEndian.PutUint32(header[4:], uint32(len(name)))
+	msg := append(header[:], name...)
+
+	_, err := m.stdin.Write(msg)
+	return err
+}
+
+// enqueueRead appends pending to the queue that `readerRoutine` consumes.
+//
+// If the queue is full, a flush command is sent so that the modulewrapper
+// works through outstanding requests, and the call then blocks until there is
+// room. It returns m.readerError if `readerRoutine` has already finished, or
+// finishes while this call is waiting for room, and it returns the write
+// error if sending the flush command fails.
+func (m *Subprocess) enqueueRead(pending pendingRead) error {
+	// Once the reader has stopped nothing will ever service a new entry,
+	// so report why it stopped instead of queueing.
+	select {
+	case <-m.readerFinished:
+		return m.readerError
+	default:
+	}
+
+	// Fast path: there is room in the queue.
+	select {
+	case m.pendingReads <- pending:
+		return nil
+	default:
+	}
+
+	// `pendingReads` is full. Ensure that the modulewrapper will process
+	// some outstanding requests to free up space in the queue.
+	if err := m.flush(); err != nil {
+		return err
+	}
+
+	// Wait for room, but give up if the reader stops in the meantime: a
+	// bare send here would block forever because nothing would drain the
+	// queue any more.
+	select {
+	case m.pendingReads <- pending:
+		return nil
+	case <-m.readerFinished:
+		return m.readerError
+	}
+}
+
+// TransactAsync performs a single request--response pair with the subprocess.
+// The callback will run at some future point, in a separate goroutine. All
+// callbacks will, however, be run in the order that TransactAsync was called.
+// Use Flush to wait for all outstanding callbacks.
+func (m *Subprocess) TransactAsync(cmd string, expectedNumResults int, args [][]byte, callback func(result [][]byte) error) {
+ if err := m.enqueueRead(pendingRead{nil, callback, cmd, expectedNumResults}); err != nil {
+ panic(err)
+ }
+
argLength := len(cmd)
for _, arg := range args {
argLength += len(arg)
@@ -145,17 +221,90 @@
}
if _, err := m.stdin.Write(buf); err != nil {
+ panic(err)
+ }
+}
+
+// Flush tells the subprocess to complete all outstanding requests and waits
+// for all outstanding TransactAsync callbacks to complete.
+//
+// It returns an error if the flush command cannot be written, if the barrier
+// cannot be queued, or if `readerRoutine` stops before the outstanding
+// callbacks have run (in which case the error is m.readerError).
+func (m *Subprocess) Flush() error {
+	// flush is itself a no-op when the modulewrapper doesn't support
+	// flush commands, so no separate supportsFlush check is needed here.
+	if err := m.flush(); err != nil {
+		return err
+	}
+
+	// The barrier is queued behind every pending read, so it only fires
+	// once the reader has run all of the earlier callbacks.
+	done := make(chan struct{})
+	if err := m.enqueueRead(pendingRead{barrierCallback: func() {
+		close(done)
+	}}); err != nil {
+		return err
+	}
+
+	// Watch readerFinished as well: if the reader stops on an error before
+	// reaching the barrier, done is never closed and a bare receive would
+	// block forever.
+	select {
+	case <-done:
+		return nil
+	case <-m.readerFinished:
+		return m.readerError
+	}
+}
+
+// Barrier arranges for callback to run once every TransactAsync callback
+// that is currently outstanding has been run. The callback is invoked by
+// `readerRoutine`, not by the caller of Barrier. The returned error is
+// whatever enqueueRead reports when the barrier cannot be queued.
+func (m *Subprocess) Barrier(callback func()) error {
+	barrier := pendingRead{barrierCallback: callback}
+	return m.enqueueRead(barrier)
+}
+
+// Transact performs a single request--response pair with the subprocess and
+// blocks until the response has been read. It is TransactAsync followed by a
+// flush and a wait: the result is returned once the callback has run, and
+// m.readerError is returned if `readerRoutine` finishes first.
+func (m *Subprocess) Transact(cmd string, expectedNumResults int, args ...[]byte) ([][]byte, error) {
+	var response [][]byte
+	received := make(chan struct{})
+	onResult := func(r [][]byte) error {
+		response = r
+		close(received)
+		return nil
+	}
+	m.TransactAsync(cmd, expectedNumResults, args, onResult)
+
+	if err := m.flush(); err != nil {
return nil, err
}
- buf = buf[:4]
+	select {
+	case <-received:
+		return response, nil
+	case <-m.readerFinished:
+		return nil, m.readerError
+	}
+}
+
+// readerRoutine services the pendingReads queue in order. For each entry it
+// first runs the barrier callback, if any, and then, if the entry has a
+// result callback, reads one response from the modulewrapper and passes it
+// to that callback. The first error from reading or from a callback is
+// stored in readerError and ends the routine. readerFinished is closed on
+// the way out, whatever the reason for returning.
+func (m *Subprocess) readerRoutine() {
+	defer close(m.readerFinished)
+
+	for next := range m.pendingReads {
+		if barrier := next.barrierCallback; barrier != nil {
+			barrier()
+		}
+
+		// Entries without a result callback expect no response from
+		// the modulewrapper, so nothing is read for them.
+		if next.callback != nil {
+			result, err := m.readResult(next.cmd, next.expectedNumResults)
+			if err == nil {
+				err = next.callback(result)
+			}
+			if err != nil {
+				m.readerError = err
+				return
+			}
+		}
+	}
+}
+
+func (m *Subprocess) readResult(cmd string, expectedNumResults int) ([][]byte, error) {
+ buf := make([]byte, 4)
+
if _, err := io.ReadFull(m.stdout, buf); err != nil {
return nil, err
}
numResults := binary.LittleEndian.Uint32(buf)
- if int(numResults) != expectedResults {
- return nil, fmt.Errorf("expected %d results from %q but got %d", expectedResults, cmd, numResults)
+ if int(numResults) != expectedNumResults {
+ return nil, fmt.Errorf("expected %d results from %q but got %d", expectedNumResults, cmd, numResults)
}
buf = make([]byte, 4*numResults)
@@ -197,16 +346,25 @@
return nil, err
}
var config []struct {
- Algorithm string `json:"algorithm"`
+ Algorithm string `json:"algorithm"`
+ Features []string `json:"features"`
}
if err := json.Unmarshal(results[0], &config); err != nil {
return nil, errors.New("failed to parse config response from wrapper: " + err.Error())
}
for _, algo := range config {
- if _, ok := m.primitives[algo.Algorithm]; !ok {
+ if algo.Algorithm == "acvptool" {
+ for _, feature := range algo.Features {
+ switch feature {
+ case "batch":
+ m.supportsFlush = true
+ }
+ }
+ } else if _, ok := m.primitives[algo.Algorithm]; !ok {
return nil, fmt.Errorf("wrapper config advertises support for unknown algorithm %q", algo.Algorithm)
}
}
+
return results[0], nil
}