Test async BIO_flush and fix a corner case

We were simulating non-blocking writes, but not non-blocking flush.
Model it as consuming one byte in AsyncBio.

In doing so, fix an obscure corner case in DTLS: If flushing after ACK
blocked, the next try would write a new ACK to the transport. There's no
real harm in this (we're running over UDP), but our tests intentionally
check for exactly the right number of writes and this was easy to fix.

This completely doesn't matter as a non-blocking writes on UDP-like
sockets are not really a thing, much less buffered non-blocking writes
on them. I don't even concretely know of anyone relying on BIO_flush in
TCP-like BIOs anymore in libssl. But since we try to support it, we
should test that we get it right.

Fixed: 381906252
Change-Id: I5296fcb01ca409d3026ca1150d6bdeaccc868014
Reviewed-on: https://boringssl-review.googlesource.com/c/boringssl/+/81348
Auto-Submit: David Benjamin <davidben@google.com>
Reviewed-by: Lily Chen <chlily@google.com>
Commit-Queue: David Benjamin <davidben@google.com>
diff --git a/ssl/d1_both.cc b/ssl/d1_both.cc
index 4a02d26..c81e6f5 100644
--- a/ssl/d1_both.cc
+++ b/ssl/d1_both.cc
@@ -920,11 +920,7 @@
     }
   }
 
-  if (BIO_flush(ssl->wbio.get()) <= 0) {
-    ssl->s3->rwstate = SSL_ERROR_WANT_WRITE;
-    return -1;
-  }
-
+  ssl->d1->pending_flush = true;
   return 1;
 }
 
@@ -1015,11 +1011,7 @@
     return bio_ret;
   }
 
-  if (BIO_flush(ssl->wbio.get()) <= 0) {
-    ssl->s3->rwstate = SSL_ERROR_WANT_WRITE;
-    return -1;
-  }
-
+  ssl->d1->pending_flush = true;
   return 1;
 }
 
@@ -1062,6 +1054,14 @@
     }
   }
 
+  if (ssl->d1->pending_flush) {
+    if (BIO_flush(ssl->wbio.get()) <= 0) {
+      ssl->s3->rwstate = SSL_ERROR_WANT_WRITE;
+      return -1;
+    }
+    ssl->d1->pending_flush = false;
+  }
+
   return 1;
 }
 
diff --git a/ssl/d1_lib.cc b/ssl/d1_lib.cc
index c37e704..7e34420 100644
--- a/ssl/d1_lib.cc
+++ b/ssl/d1_lib.cc
@@ -36,6 +36,7 @@
       handshake_read_overflow(false),
       sending_flight(false),
       sending_ack(false),
+      pending_flush(false),
       queued_key_update(QueuedKeyUpdate::kNone) {}
 
 DTLS1_STATE::~DTLS1_STATE() {}
diff --git a/ssl/internal.h b/ssl/internal.h
index fe41e49..4a442e4 100644
--- a/ssl/internal.h
+++ b/ssl/internal.h
@@ -3113,6 +3113,8 @@
   // a handshake flight and ACK, respectively.
   bool sending_flight : 1;
   bool sending_ack : 1;
+  // pending_flush is whether we have a pending flush on the transport.
+  bool pending_flush : 1;
 
   // queued_key_update, if not kNone, indicates we've queued a KeyUpdate message
   // to send after the current flight is ACKed.
diff --git a/ssl/test/async_bio.cc b/ssl/test/async_bio.cc
index 6e2009b..e7e657d 100644
--- a/ssl/test/async_bio.cc
+++ b/ssl/test/async_bio.cc
@@ -102,12 +102,22 @@
 }
 
 static long AsyncCtrl(BIO *bio, int cmd, long num, void *ptr) {
+  AsyncBio *a = GetData(bio);
   BIO *next = BIO_next(bio);
   if (next == nullptr) {
     return 0;
   }
   BIO_clear_retry_flags(bio);
+  // Model an async flush as consuming one byte of write quota.
+  if (cmd == BIO_CTRL_FLUSH && a->write_quota == 0) {
+    BIO_set_retry_write(bio);
+    errno = EAGAIN;
+    return -1;
+  }
   long ret = BIO_ctrl(next, cmd, num, ptr);
+  if (cmd == BIO_CTRL_FLUSH && ret > 0) {
+    a->write_quota--;
+  }
   BIO_copy_next_retry(bio);
   return ret;
 }