Use DTLS 1.3 ACKs to avoid retransmitting ACKed fragments

This implements the first part of ACK processing: track which parts of
each outgoing message have been ACKed, update when receiving an ACK, and
use it to reduce retransmits. To do this, we also need to track the last
handful of records we sent, and use that to correlate ACKs with packets.

Test this by extending the new retransmit test framework to manage ACKs.
The callback gets told record numbers, along with what message segments
they cover, and may choose to ACK them. If it does, the ReadRetransmit
expectations will be automatically updated.

For now, I've made no attempt to test or handle post-handshake messages.
That has a lot of subtle assumptions around there not being multiple
concurrent transactions, so I think we'll tackle this later.

This also does not handle:

- Triggering retransmits when we receive partial ACKs.
- Implicitly ACKing flights when we receive any part of the next flight.
- Sending ACKs of any kind.

Bug: 42290594
Change-Id: I9e81a7d5c8838d4d31fe828e9fd9871631fe38ed
Reviewed-on: https://boringssl-review.googlesource.com/c/boringssl/+/72387
Reviewed-by: Nick Harper <nharper@chromium.org>
Commit-Queue: David Benjamin <davidben@google.com>
diff --git a/ssl/d1_both.cc b/ssl/d1_both.cc
index 0eb3970..16b32c1 100644
--- a/ssl/d1_both.cc
+++ b/ssl/d1_both.cc
@@ -459,7 +459,7 @@
       return ssl_open_record_success;
 
     case SSL3_RT_ACK:
-      return dtls1_process_ack(ssl, out_alert);
+      return dtls1_process_ack(ssl, out_alert, record_number, record);
 
     case SSL3_RT_HANDSHAKE:
       // Break out to main processing.
@@ -562,6 +562,7 @@
 
 void dtls_clear_outgoing_messages(SSL *ssl) {
   ssl->d1->outgoing_messages.clear();
+  ssl->d1->sent_records = nullptr;
   ssl->d1->outgoing_written = 0;
   ssl->d1->outgoing_offset = 0;
   ssl->d1->outgoing_messages_complete = false;
@@ -572,15 +573,13 @@
 void dtls_clear_unused_write_epochs(SSL *ssl) {
   ssl->d1->extra_write_epochs.EraseIf(
       [ssl](const UniquePtr<DTLSWriteEpoch> &write_epoch) -> bool {
-        // Non-current epochs may be discarded once there are no outgoing
-        // messages that reference them.
+        // Non-current epochs may be discarded once there are no incomplete
+        // outgoing messages that reference them.
         //
-        // TODO(crbug.com/42290594): If |msg| has been fully ACKed, its epoch
-        // may be discarded.
         // TODO(crbug.com/42290594): Epoch 1 (0-RTT) should be retained until
         // epoch 3 (app data) is available.
         for (const auto &msg : ssl->d1->outgoing_messages) {
-          if (msg.epoch == write_epoch->epoch()) {
+          if (msg.epoch == write_epoch->epoch() && !msg.IsFullyAcked()) {
             return false;
           }
         }
@@ -641,6 +640,19 @@
   msg.data = std::move(data);
   msg.epoch = ssl->d1->write_epoch.epoch();
   msg.is_ccs = is_ccs;
+  // Zero-length messages need 1 bit to track whether the peer has received the
+  // message header. (Normally the message header is implicitly received when
+  // any fragment of the message is received at all.)
+  if (!is_ccs && !msg.acked.Init(std::max(msg.msg_len(), size_t{1}))) {
+    return false;
+  }
+
+  // This should not fail if |SSL_MAX_HANDSHAKE_FLIGHT| was sized correctly.
+  //
+  // TODO(crbug.com/42290594): This can currently fail in DTLS 1.3. The caller
+  // can configure how many tickets to send, up to kMaxTickets. Additionally, if
+  // we send 0.5-RTT tickets in 0-RTT, we may even have tickets queued up with
+  // the server flight.
   if (!ssl->d1->outgoing_messages.TryPushBack(std::move(msg))) {
     assert(false);
     OPENSSL_PUT_ERROR(SSL, ERR_R_INTERNAL_ERROR);
@@ -697,15 +709,27 @@
 //
 // If the function stopped because the next message could not be combined into
 // this record, it returns |seal_continue| and the caller should loop again.
-// Otherwise, it returns |seal_flush| and the packet is complete.
+// Otherwise, it returns |seal_flush| and the packet is complete (either because
+// there are no more messages or the packet is full).
 static seal_result_t seal_next_record(SSL *ssl, Span<uint8_t> out,
                                       size_t *out_len) {
-  assert(ssl->d1->outgoing_written < ssl->d1->outgoing_messages.size());
+  *out_len = 0;
+
+  // Skip any fully acked messages.
+  while (ssl->d1->outgoing_written < ssl->d1->outgoing_messages.size() &&
+         ssl->d1->outgoing_messages[ssl->d1->outgoing_written].IsFullyAcked()) {
+    ssl->d1->outgoing_offset = 0;
+    ssl->d1->outgoing_written++;
+  }
+
+  // There was nothing left to write.
+  if (ssl->d1->outgoing_written >= ssl->d1->outgoing_messages.size()) {
+    return seal_flush;
+  }
+
   const auto &first_msg = ssl->d1->outgoing_messages[ssl->d1->outgoing_written];
   size_t prefix_len = dtls_seal_prefix_len(ssl, first_msg.epoch);
   size_t max_in_len = dtls_seal_max_input_len(ssl, first_msg.epoch, out.size());
-  *out_len = 0;
-
   if (max_in_len == 0) {
     // There is no room for a single record.
     return seal_flush;
@@ -733,6 +757,9 @@
   Span<uint8_t> fragments = out.subspan(prefix_len, max_in_len);
   CBB cbb;
   CBB_init_fixed(&cbb, fragments.data(), fragments.size());
+  DTLSSentRecord sent_record;
+  sent_record.first_msg = ssl->d1->outgoing_written;
+  sent_record.first_msg_start = ssl->d1->outgoing_offset;
   while (ssl->d1->outgoing_written < ssl->d1->outgoing_messages.size()) {
     const auto &msg = ssl->d1->outgoing_messages[ssl->d1->outgoing_written];
     if (msg.epoch != first_msg.epoch || msg.is_ccs) {
@@ -743,72 +770,100 @@
     }
 
     // Decode |msg|'s header.
-    CBS cbs(msg.data), body;
+    CBS cbs(msg.data), body_cbs;
     struct hm_header_st hdr;
-    if (!dtls1_parse_fragment(&cbs, &hdr, &body) ||  //
-        hdr.frag_off != 0 ||                         //
-        hdr.frag_len != CBS_len(&body) ||            //
-        hdr.msg_len != CBS_len(&body) ||
-        !CBS_skip(&body, ssl->d1->outgoing_offset) ||  //
+    if (!dtls1_parse_fragment(&cbs, &hdr, &body_cbs) ||  //
+        hdr.frag_off != 0 ||                             //
+        hdr.frag_len != CBS_len(&body_cbs) ||            //
+        hdr.msg_len != CBS_len(&body_cbs) ||             //
         CBS_len(&cbs) != 0) {
       OPENSSL_PUT_ERROR(SSL, ERR_R_INTERNAL_ERROR);
       return seal_error;
     }
 
-    // Determine how much progress can be made.
-    size_t capacity = fragments.size() - CBB_len(&cbb);
-    if (capacity < DTLS1_HM_HEADER_LENGTH + 1) {
-      // We could not fit even 1 byte.
-      break;
+    // Iterate over every un-acked range in the message, if any.
+    Span<const uint8_t> body = body_cbs;
+    for (;;) {
+      auto range = msg.acked.NextUnmarkedRange(ssl->d1->outgoing_offset);
+      if (range.empty()) {
+        // Advance to the next message.
+        ssl->d1->outgoing_offset = 0;
+        ssl->d1->outgoing_written++;
+        break;
+      }
+
+      // Determine how much progress can be made (minimum one byte of progress).
+      size_t capacity = fragments.size() - CBB_len(&cbb);
+      if (capacity < DTLS1_HM_HEADER_LENGTH + 1) {
+        goto packet_full;
+      }
+      size_t todo = std::min(range.size(), capacity - DTLS1_HM_HEADER_LENGTH);
+
+      // Empty messages are special-cased in ACK tracking. We act as if they
+      // have one byte, but in reality that byte is tracking the header.
+      Span<const uint8_t> frag;
+      if (!body.empty()) {
+        frag = body.subspan(range.start, todo);
+      }
+
+      // Assemble the fragment.
+      size_t frag_start = CBB_len(&cbb);
+      CBB child;
+      if (!CBB_add_u8(&cbb, hdr.type) ||      //
+          !CBB_add_u24(&cbb, hdr.msg_len) ||  //
+          !CBB_add_u16(&cbb, hdr.seq) ||
+          !CBB_add_u24(&cbb, range.start) ||
+          !CBB_add_u24_length_prefixed(&cbb, &child) ||
+          !CBB_add_bytes(&child, frag.data(), frag.size()) ||  //
+          !CBB_flush(&cbb)) {
+        OPENSSL_PUT_ERROR(SSL, ERR_R_INTERNAL_ERROR);
+        return seal_error;
+      }
+      size_t frag_end = CBB_len(&cbb);
+
+      // TODO(davidben): It is odd that, on output, we inform the caller of
+      // retransmits and individual fragments, but on input we only inform the
+      // caller of complete messages.
+      ssl_do_msg_callback(ssl, /*is_write=*/1, SSL3_RT_HANDSHAKE,
+                          fragments.subspan(frag_start, frag_end - frag_start));
+
+      ssl->d1->outgoing_offset = range.start + todo;
+      if (todo < range.size()) {
+        // The packet was the limiting factor.
+        goto packet_full;
+      }
     }
-    size_t todo = std::min(CBS_len(&body), capacity - DTLS1_HM_HEADER_LENGTH);
-
-    // Assemble the fragment.
-    size_t frag_start = CBB_len(&cbb);
-    CBB child;
-    if (!CBB_add_u8(&cbb, hdr.type) ||      //
-        !CBB_add_u24(&cbb, hdr.msg_len) ||  //
-        !CBB_add_u16(&cbb, hdr.seq) ||
-        !CBB_add_u24(&cbb, ssl->d1->outgoing_offset) ||
-        !CBB_add_u24_length_prefixed(&cbb, &child) ||
-        !CBB_add_bytes(&child, CBS_data(&body), todo) ||  //
-        !CBB_flush(&cbb)) {
-      OPENSSL_PUT_ERROR(SSL, ERR_R_INTERNAL_ERROR);
-      return seal_error;
-    }
-    size_t frag_end = CBB_len(&cbb);
-
-    // TODO(davidben): It is odd that, on output, we inform the caller of
-    // retransmits and individual fragments, but on input we only inform the
-    // caller of complete messages.
-    ssl_do_msg_callback(ssl, /*is_write=*/1, SSL3_RT_HANDSHAKE,
-                        fragments.subspan(frag_start, frag_end - frag_start));
-
-    if (todo < CBS_len(&body)) {
-      // The packet was the limiting factor. Save the offset for the next packet
-      // and stop.
-      ssl->d1->outgoing_offset += todo;
-      break;
-    }
-
-    // There is still room. Continue to the next message.
-    ssl->d1->outgoing_offset = 0;
-    ssl->d1->outgoing_written++;
   }
 
+packet_full:
+  sent_record.last_msg = ssl->d1->outgoing_written;
+  sent_record.last_msg_end = ssl->d1->outgoing_offset;
+
   // We could not fit anything. Don't try to make a record.
   if (CBB_len(&cbb) == 0) {
     assert(!should_continue);
     return seal_flush;
   }
 
-  DTLSRecordNumber record_number;
-  if (!dtls_seal_record(ssl, &record_number, out.data(), out_len, out.size(),
-                        SSL3_RT_HANDSHAKE, CBB_data(&cbb), CBB_len(&cbb),
-                        first_msg.epoch)) {
+  if (!dtls_seal_record(ssl, &sent_record.number, out.data(), out_len,
+                        out.size(), SSL3_RT_HANDSHAKE, CBB_data(&cbb),
+                        CBB_len(&cbb), first_msg.epoch)) {
     return seal_error;
   }
 
+  // If DTLS 1.3 (or if the version is not yet known and it may be DTLS 1.3),
+  // save the record number to match against ACKs later.
+  if (ssl->s3->version == 0 || ssl_protocol_version(ssl) >= TLS1_3_VERSION) {
+    if (ssl->d1->sent_records == nullptr) {
+      ssl->d1->sent_records =
+          MakeUnique<MRUQueue<DTLSSentRecord, DTLS_MAX_ACK_BUFFER>>();
+      if (ssl->d1->sent_records == nullptr) {
+        return seal_error;
+      }
+    }
+    ssl->d1->sent_records->PushBack(sent_record);
+  }
+
   return should_continue ? seal_continue : seal_flush;
 }
 
@@ -817,8 +872,7 @@
 // appropriate.
 static bool seal_next_packet(SSL *ssl, Span<uint8_t> out, size_t *out_len) {
   size_t total = 0;
-  assert(ssl->d1->outgoing_written < ssl->d1->outgoing_messages.size());
-  while (ssl->d1->outgoing_written < ssl->d1->outgoing_messages.size()) {
+  for (;;) {
     size_t len;
     seal_result_t ret = seal_next_record(ssl, out, &len);
     switch (ret) {
@@ -837,12 +891,6 @@
     }
   }
 
-  // The MTU was too small to make any progress.
-  if (total == 0) {
-    OPENSSL_PUT_ERROR(SSL, SSL_R_MTU_TOO_SMALL);
-    return false;
-  }
-
   *out_len = total;
   return true;
 }
@@ -874,13 +922,23 @@
       return -1;
     }
 
-    int bio_ret = BIO_write(ssl->wbio.get(), packet.data(), packet_len);
-    if (bio_ret <= 0) {
-      // Retry this packet the next time around.
-      ssl->d1->outgoing_written = old_written;
-      ssl->d1->outgoing_offset = old_offset;
-      ssl->s3->rwstate = SSL_ERROR_WANT_WRITE;
-      return bio_ret;
+    if (packet_len == 0 &&
+        ssl->d1->outgoing_written < ssl->d1->outgoing_messages.size()) {
+      // We made no progress with the packet size available, but did not reach
+      // the end.
+      OPENSSL_PUT_ERROR(SSL, SSL_R_MTU_TOO_SMALL);
+      return false;
+    }
+
+    if (packet_len != 0) {
+      int bio_ret = BIO_write(ssl->wbio.get(), packet.data(), packet_len);
+      if (bio_ret <= 0) {
+        // Retry this packet the next time around.
+        ssl->d1->outgoing_written = old_written;
+        ssl->d1->outgoing_offset = old_offset;
+        ssl->s3->rwstate = SSL_ERROR_WANT_WRITE;
+        return bio_ret;
+      }
     }
   }
 
diff --git a/ssl/d1_pkt.cc b/ssl/d1_pkt.cc
index a26fc70..d11b33f 100644
--- a/ssl/d1_pkt.cc
+++ b/ssl/d1_pkt.cc
@@ -114,6 +114,8 @@
 #include <assert.h>
 #include <string.h>
 
+#include <algorithm>
+
 #include <openssl/bio.h>
 #include <openssl/bytestring.h>
 #include <openssl/mem.h>
@@ -127,19 +129,113 @@
 
 BSSL_NAMESPACE_BEGIN
 
-ssl_open_record_t dtls1_process_ack(SSL *ssl, uint8_t *out_alert) {
+ssl_open_record_t dtls1_process_ack(SSL *ssl, uint8_t *out_alert,
+                                    DTLSRecordNumber ack_record_number,
+                                    Span<const uint8_t> data) {
+  // As a DTLS-1.3-capable client, it is possible to receive an ACK before we
+  // receive ServerHello and learned the server picked DTLS 1.3. Thus, tolerate
+  // but ignore ACKs before the version is set.
+  if (ssl->s3->version == 0) {
+    return ssl_open_record_discard;
+  }
+
   // ACKs are only allowed in DTLS 1.3. Reject them if we've negotiated a
-  // version and it's not 1.3. (It's theoretically possible to receive an ACK
-  // before version negotiation, e.g. due to packet loss or a server ACKing a
-  // ClientHello prior to sending the ServerHello, so if we don't have a version
-  // we'll accept the ACK.)
-  if (ssl->s3->version != 0 && ssl_protocol_version(ssl) < TLS1_3_VERSION) {
+  // version and it's not 1.3.
+  if (ssl_protocol_version(ssl) < TLS1_3_VERSION) {
     OPENSSL_PUT_ERROR(SSL, SSL_R_UNEXPECTED_RECORD);
     *out_alert = SSL_AD_UNEXPECTED_MESSAGE;
     return ssl_open_record_error;
   }
-  // TODO(crbug.com/42290594): Implement proper support for ACKs. Currently,
-  // this just drops the ACK on the floor.
+
+  // TODO(crbug.com/42290594): If we implement DTLS 1.3 0-RTT, we may switch to
+  // DTLS 1.3, but then rewind to DTLS 1.2. In the intervening time, we may have
+  // received an ACK and updated state, and then expose the DTLS 1.2 logic to
+  // it. If we haven't received ServerHello, we probably need to continue
+  // dropping ACKs.
+
+  CBS cbs = data, record_numbers;
+  if (!CBS_get_u16_length_prefixed(&cbs, &record_numbers) ||
+      CBS_len(&cbs) != 0) {
+    OPENSSL_PUT_ERROR(SSL, SSL_R_DECODE_ERROR);
+    *out_alert = SSL_AD_DECODE_ERROR;
+    return ssl_open_record_error;
+  }
+
+  while (CBS_len(&record_numbers) != 0) {
+    uint64_t epoch, seq;
+    if (!CBS_get_u64(&record_numbers, &epoch) ||
+        !CBS_get_u64(&record_numbers, &seq)) {
+      OPENSSL_PUT_ERROR(SSL, SSL_R_DECODE_ERROR);
+      *out_alert = SSL_AD_DECODE_ERROR;
+      return ssl_open_record_error;
+    }
+
+    // During the handshake, records must be ACKed at the same or higher epoch.
+    // See https://www.rfc-editor.org/errata/eid8108. Additionally, if the
+    // record does not fit in DTLSRecordNumber, it is definitely not a record
+    // number that we sent.
+    if ((ack_record_number.epoch() < ssl_encryption_application &&
+         epoch > ack_record_number.epoch()) ||
+        epoch > UINT16_MAX || seq > DTLSRecordNumber::kMaxSequence) {
+      OPENSSL_PUT_ERROR(SSL, SSL_R_DECODE_ERROR);
+      *out_alert = SSL_AD_ILLEGAL_PARAMETER;
+      return ssl_open_record_error;
+    }
+
+    // Find the sent record that matches this ACK.
+    DTLSRecordNumber number(static_cast<uint16_t>(epoch), seq);
+    DTLSSentRecord *sent_record = nullptr;
+    if (ssl->d1->sent_records != nullptr) {
+      for (size_t i = 0; i < ssl->d1->sent_records->size(); i++) {
+        if ((*ssl->d1->sent_records)[i].number == number) {
+          sent_record = &(*ssl->d1->sent_records)[i];
+          break;
+        }
+      }
+    }
+    if (sent_record == nullptr) {
+      // We may have sent this record and forgotten it, so this is not an error.
+      continue;
+    }
+
+    // Mark each message as ACKed.
+    if (sent_record->first_msg == sent_record->last_msg) {
+      ssl->d1->outgoing_messages[sent_record->first_msg].acked.MarkRange(
+          sent_record->first_msg_start, sent_record->last_msg_end);
+    } else {
+      ssl->d1->outgoing_messages[sent_record->first_msg].acked.MarkRange(
+          sent_record->first_msg_start, SIZE_MAX);
+      for (size_t i = size_t{sent_record->first_msg} + 1;
+           i < sent_record->last_msg; i++) {
+        ssl->d1->outgoing_messages[i].acked.MarkRange(0, SIZE_MAX);
+      }
+      if (sent_record->last_msg_end != 0) {
+        ssl->d1->outgoing_messages[sent_record->last_msg].acked.MarkRange(
+            0, sent_record->last_msg_end);
+      }
+    }
+
+    // Clear the state so we don't bother re-marking the messages next time.
+    sent_record->first_msg = 0;
+    sent_record->first_msg_start = 0;
+    sent_record->last_msg = 0;
+    sent_record->last_msg_end = 0;
+  }
+
+  // If the outgoing flight is now fully ACKed, we are done retransmitting.
+  if (std::all_of(ssl->d1->outgoing_messages.begin(),
+                  ssl->d1->outgoing_messages.end(),
+                  [](const auto &msg) { return msg.IsFullyAcked(); })) {
+    dtls1_stop_timer(ssl);
+    dtls_clear_outgoing_messages(ssl);
+  } else {
+    // We may still be able to drop unused write epochs.
+    dtls_clear_unused_write_epochs(ssl);
+
+    // TODO(crbug.com/42290594): Schedule a retransmit. The peer will have
+    // waited before sending the ACK, so a partial ACK suggests packet loss.
+  }
+
   return ssl_open_record_discard;
 }
 
@@ -199,7 +295,7 @@
   }
 
   if (type == SSL3_RT_ACK) {
-    return dtls1_process_ack(ssl, out_alert);
+    return dtls1_process_ack(ssl, out_alert, record_number, record);
   }
 
   if (type != SSL3_RT_APPLICATION_DATA) {
diff --git a/ssl/internal.h b/ssl/internal.h
index 636decf..371a08c 100644
--- a/ssl/internal.h
+++ b/ssl/internal.h
@@ -1294,6 +1294,11 @@
     return DTLSRecordNumber(combined);
   }
 
+  bool operator==(DTLSRecordNumber r) const {
+    return combined() == r.combined();
+  };
+  bool operator!=(DTLSRecordNumber r) const { return !((*this) == r); }
+
   uint64_t combined() const { return combined_; }
   uint16_t epoch() const { return combined_ >> 48; }
   uint64_t sequence() const { return combined_ & kMaxSequence; }
@@ -1600,12 +1605,6 @@
 // tls_flush_pending_hs_data flushes any handshake plaintext data.
 bool tls_flush_pending_hs_data(SSL *ssl);
 
-struct DTLSOutgoingMessage {
-  Array<uint8_t> data;
-  uint16_t epoch = 0;
-  bool is_ccs = false;
-};
-
 // dtls_clear_outgoing_messages releases all buffered outgoing messages.
 void dtls_clear_outgoing_messages(SSL *ssl);
 
@@ -3361,6 +3360,7 @@
     size_t end = 0;
 
     bool empty() const { return start == end; }
+    size_t size() const { return end - start; }
     bool operator==(const Range &r) const {
       return start == r.start && end == r.end;
     }
@@ -3421,6 +3421,27 @@
   DTLSMessageBitmap reassembly;
 };
 
+struct DTLSOutgoingMessage {
+  size_t msg_len() const {
+    assert(!is_ccs);
+    assert(data.size() >= DTLS1_HM_HEADER_LENGTH);
+    return data.size() - DTLS1_HM_HEADER_LENGTH;
+  }
+
+  bool IsFullyAcked() const {
+    // ACKs only exist in DTLS 1.3, which does not send ChangeCipherSpec.
+    return !is_ccs && acked.IsComplete();
+  }
+
+  Array<uint8_t> data;
+  uint16_t epoch = 0;
+  bool is_ccs = false;
+  // acked tracks which bits of the message have been ACKed by the peer. If
+  // |msg_len| is zero, it tracks one bit for whether the header has been
+  // received.
+  DTLSMessageBitmap acked;
+};
+
 struct OPENSSL_timeval {
   uint64_t tv_sec;
   uint32_t tv_usec;
@@ -3438,6 +3459,29 @@
 // discarded.
 #define DTLS_MAX_EXTRA_WRITE_EPOCHS 2
 
+// DTLS_MAX_ACK_BUFFER is the maximum number of records worth of data we'll keep
+// track of with DTLS 1.3 ACKs. When we exceed this value, information about
+// stale records will be dropped. This will not break the connection but may
+// cause ACKs to perform worse and retransmit unnecessary information.
+#define DTLS_MAX_ACK_BUFFER 32
+
+// A DTLSSentRecord records information about a record we sent. Each record
+// covers all bytes from |first_msg_start| (inclusive) of |first_msg| to
+// |last_msg_end| (exclusive) of |last_msg|. Messages are referenced by index
+// into |outgoing_messages|. |last_msg_end| may be |outgoing_messages.size()| if
+// |last_msg_end| is zero.
+//
+// When the message is empty, |first_msg_start| and |last_msg_end| are
+// maintained as if there is a single bit in the message representing the
+// header. See |acked| in DTLSOutgoingMessage.
+struct DTLSSentRecord {
+  DTLSRecordNumber number;
+  PackedSize<SSL_MAX_HANDSHAKE_FLIGHT> first_msg = 0;
+  PackedSize<SSL_MAX_HANDSHAKE_FLIGHT> last_msg = 0;
+  uint32_t first_msg_start = 0;
+  uint32_t last_msg_end = 0;
+};
+
 struct DTLS1_STATE {
   static constexpr bool kAllowUniquePtr = true;
 
@@ -3490,6 +3534,11 @@
   InplaceVector<DTLSOutgoingMessage, SSL_MAX_HANDSHAKE_FLIGHT>
       outgoing_messages;
 
+  // sent_records is a queue of records we sent, for processing ACKs. To save
+  // memory in the steady state, the structure is stored on the heap and dropped
+  // when empty.
+  UniquePtr<MRUQueue<DTLSSentRecord, DTLS_MAX_ACK_BUFFER>> sent_records;
+
   // outgoing_written is the number of outgoing messages that have been
   // written.
   uint8_t outgoing_written = 0;
@@ -3828,7 +3877,9 @@
 // on success and false on allocation failure.
 bool ssl_hash_message(SSL_HANDSHAKE *hs, const SSLMessage &msg);
 
-ssl_open_record_t dtls1_process_ack(SSL *ssl, uint8_t *out_alert);
+ssl_open_record_t dtls1_process_ack(SSL *ssl, uint8_t *out_alert,
+                                    DTLSRecordNumber ack_record_number,
+                                    Span<const uint8_t> data);
 ssl_open_record_t dtls1_open_app_data(SSL *ssl, Span<uint8_t> *out,
                                       size_t *out_consumed, uint8_t *out_alert,
                                       Span<uint8_t> in);
diff --git a/ssl/ssl_internal_test.cc b/ssl/ssl_internal_test.cc
index 90d91bd..9ff2e1b 100644
--- a/ssl/ssl_internal_test.cc
+++ b/ssl/ssl_internal_test.cc
@@ -599,6 +599,15 @@
   expect_bitmap(bitmap2, {{0, 1}, {2, 3}, {9 - 2, 9}, {27 - 4, 27 - 2}});
   bitmap2.MarkRange(0, 50);
   expect_bitmap(bitmap2, {});
+
+  // MarkRange inputs may be "out of bounds". The bitmap has conceptually
+  // infinitely many marked bits past where it was initialized.
+  ASSERT_TRUE(bitmap.Init(10));
+  expect_bitmap(bitmap, {{0, 10}});
+  bitmap.MarkRange(5, SIZE_MAX);
+  expect_bitmap(bitmap, {{0, 5}});
+  bitmap.MarkRange(0, SIZE_MAX);
+  expect_bitmap(bitmap, {});
 }
 
 TEST(MRUQueueTest, Basic) {
diff --git a/ssl/test/runner/common.go b/ssl/test/runner/common.go
index 82fde25..16a1df2 100644
--- a/ssl/test/runner/common.go
+++ b/ssl/test/runner/common.go
@@ -1293,12 +1293,12 @@
 
 	// WriteFlightDTLS, if not nil, overrides the default behavior for writing
 	// the flight in DTLS. See DTLSController for details.
-	WriteFlightDTLS func(c *DTLSController, prev, received, next []DTLSMessage)
+	WriteFlightDTLS func(c *DTLSController, prev, received, next []DTLSMessage, records []DTLSRecordNumberInfo)
 
 	// ACKFlightDTLS, if not nil, overrides the default behavior for
 	// acknowledging the final flight (of either the handshake or a
 	// post-handshake transaction) in DTLS. See DTLSController for details.
-	ACKFlightDTLS func(c *DTLSController, prev, received []DTLSMessage)
+	ACKFlightDTLS func(c *DTLSController, prev, received []DTLSMessage, records []DTLSRecordNumberInfo)
 
 	// MockQUICTransport is the mockQUICTransport used when testing
 	// QUIC interfaces.
@@ -2023,10 +2023,6 @@
 	// the DTLS 1.3 record header.
 	DTLS13RecordHeaderSetCIDBit bool
 
-	// ACKEveryRecord sends an ACK record immediately on response to each
-	// handshake record received.
-	ACKEveryRecord bool
-
 	// EncryptSessionTicketKey, if non-nil, is the ticket key to use when
 	// encrypting tickets.
 	EncryptSessionTicketKey *[32]byte
diff --git a/ssl/test/runner/conn.go b/ssl/test/runner/conn.go
index d70061b..cef3091 100644
--- a/ssl/test/runner/conn.go
+++ b/ssl/test/runner/conn.go
@@ -121,9 +121,10 @@
 	pendingPacket    []byte // pending outgoing packet.
 	maxPacketLen     int
 
-	previousFlight []DTLSMessage
-	receivedFlight []DTLSMessage
-	nextFlight     []DTLSMessage
+	previousFlight        []DTLSMessage
+	receivedFlight        []DTLSMessage
+	receivedFlightRecords []DTLSRecordNumberInfo
+	nextFlight            []DTLSMessage
 
 	keyUpdateSeen      bool
 	keyUpdateRequested bool
@@ -829,6 +830,11 @@
 }
 
 func (c *Conn) useOutTrafficSecret(epoch uint16, version uint16, suite *cipherSuite, secret []byte) {
+	if !c.isDTLS {
+		// The TLS logic relies on flushHandshake to write out packed handshake
+		// data on key changes. The DTLS logic handles key changes directly.
+		c.flushHandshake()
+	}
 	side := serverWrite
 	if c.isClient {
 		side = clientWrite
diff --git a/ssl/test/runner/dtls.go b/ssl/test/runner/dtls.go
index d718f9a..f360e38 100644
--- a/ssl/test/runner/dtls.go
+++ b/ssl/test/runner/dtls.go
@@ -16,6 +16,7 @@
 
 import (
 	"bytes"
+	"cmp"
 	"encoding/binary"
 	"errors"
 	"fmt"
@@ -87,6 +88,36 @@
 	return bb.BytesOrPanic()
 }
 
+func comparePair[T1 cmp.Ordered, T2 cmp.Ordered](a1 T1, a2 T2, b1 T1, b2 T2) int {
+	cmp1 := cmp.Compare(a1, b1)
+	if cmp1 != 0 {
+		return cmp1
+	}
+	return cmp.Compare(a2, b2)
+}
+
+// A DTLSRecordNumberInfo contains information about a record received from the
+// shim, which we may attempt to ACK.
+type DTLSRecordNumberInfo struct {
+	// Store the Epoch as a uint64, so that tests can send ACKs for epochs that
+	// the shim would never use.
+	Epoch    uint64
+	Sequence uint64
+	// The first byte covered by this record, inclusive. We only need to store
+	// one range because we require that the shim arrange fragments in order.
+	// Any gaps will have been previously-ACKed data, so there is no harm in
+	// double-ACKing.
+	MessageStartSequence uint16
+	MessageStartOffset   int
+	// The last byte covered by this record, exclusive.
+	MessageEndSequence uint16
+	MessageEndOffset   int
+}
+
+func (r *DTLSRecordNumberInfo) HasACKInformation() bool {
+	return comparePair(r.MessageStartSequence, r.MessageStartOffset, r.MessageEndSequence, r.MessageEndOffset) < 0
+}
+
 func (c *Conn) readDTLS13RecordHeader(epoch *epochState, b []byte) (headerLen int, recordLen int, recTyp recordType, err error) {
 	// The DTLS 1.3 record header starts with the type byte containing
 	// 0b001CSLEE, where C, S, L, and EE are bits with the following
@@ -210,18 +241,6 @@
 	return recordHeaderLen, recordLen, typ, nil
 }
 
-func (c *Conn) writeACKs(seqnums []uint64) {
-	recordNumbers := new(cryptobyte.Builder)
-	epoch := binary.BigEndian.Uint16(c.in.epoch.seq[:2])
-	recordNumbers.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) {
-		for _, seq := range seqnums {
-			b.AddUint64(uint64(epoch))
-			b.AddUint64(seq)
-		}
-	})
-	c.writeRecord(recordTypeACK, recordNumbers.BytesOrPanic())
-}
-
 func (c *Conn) dtlsDoReadRecord(epoch *epochState, want recordType) (recordType, []byte, error) {
 	// Read a new packet only if the current one is empty.
 	var newPacket bool
@@ -258,7 +277,6 @@
 	b := c.rawInput.Next(recordHeaderLen + n)
 
 	// Process message.
-	seq := slices.Clone(epoch.seq[:])
 	ok, encTyp, data, alertValue := c.in.decrypt(epoch, recordHeaderLen, b)
 	if !ok {
 		// A real DTLS implementation would silently ignore bad records,
@@ -266,9 +284,6 @@
 		// test.
 		return 0, nil, c.in.setErrorLocked(c.sendAlert(alertValue))
 	}
-	if c.config.Bugs.ACKEveryRecord {
-		c.writeACKs([]uint64{binary.BigEndian.Uint64(seq)})
-	}
 
 	if typ == 0 {
 		// readDTLSRecordHeader sets typ=0 when decoding the DTLS 1.3
@@ -406,12 +421,12 @@
 
 	// Avoid re-entrancy issues by updating the state immediately. The callback
 	// may try to write records.
-	prev, received, next := c.previousFlight, c.receivedFlight, c.nextFlight
-	c.previousFlight, c.receivedFlight, c.nextFlight = next, nil, nil
+	prev, received, next, records := c.previousFlight, c.receivedFlight, c.nextFlight, c.receivedFlightRecords
+	c.previousFlight, c.receivedFlight, c.nextFlight, c.receivedFlightRecords = next, nil, nil, nil
 
-	controller := DTLSController{conn: c, received: received}
+	controller := newDTLSController(c, received)
 	if c.config.Bugs.WriteFlightDTLS != nil {
-		c.config.Bugs.WriteFlightDTLS(&controller, prev, received, next)
+		c.config.Bugs.WriteFlightDTLS(&controller, prev, received, next, records)
 	} else {
 		controller.WriteFlight(next)
 	}
@@ -444,12 +459,12 @@
 
 	// Avoid re-entrancy issues by updating the state immediately. The callback
 	// may try to write records.
-	prev, received := c.previousFlight, c.receivedFlight
-	c.previousFlight, c.receivedFlight = nil, nil
+	prev, received, records := c.previousFlight, c.receivedFlight, c.receivedFlightRecords
+	c.previousFlight, c.receivedFlight, c.receivedFlightRecords = nil, nil, nil
 
-	controller := DTLSController{conn: c, received: received}
+	controller := newDTLSController(c, received)
 	if c.config.Bugs.ACKFlightDTLS != nil {
-		c.config.Bugs.ACKFlightDTLS(&controller, prev, received)
+		c.config.Bugs.ACKFlightDTLS(&controller, prev, received, records)
 	} else {
 		// TODO(crbug.com/42290594): In DTLS 1.3, send an ACK by default.
 	}
@@ -606,6 +621,38 @@
 	return f, nil
 }
 
+func makeDTLSRecordNumberInfo(epoch *epochState, data []byte) (DTLSRecordNumberInfo, error) {
+	info := DTLSRecordNumberInfo{
+		Epoch: uint64(epoch.epoch),
+		// Remove the embedded epoch number. The sequence number has also since
+		// been incremented, so adjust it back down.
+		//
+		// TODO(crbug.com/376641666): The record abstractions should reliably
+		// return the sequence number.
+		Sequence: (binary.BigEndian.Uint64(epoch.seq[:]) & (1<<48 - 1)) - 1,
+	}
+
+	s := cryptobyte.String(data)
+	first := true
+	for !s.Empty() {
+		f, err := readDTLSFragment(&s)
+		if err != nil {
+			return DTLSRecordNumberInfo{}, err
+		}
+		// This assumes the shim sent fragments in order. This isn't checked
+		// here, but the caller will check when processing the fragments.
+		if first {
+			info.MessageStartSequence = f.Sequence
+			info.MessageStartOffset = f.Offset
+			first = false
+		}
+		info.MessageEndSequence = f.Sequence
+		info.MessageEndOffset = f.Offset + len(f.Data)
+	}
+
+	return info, nil
+}
+
 func (c *Conn) dtlsDoReadHandshake() ([]byte, error) {
 	// Assemble a full handshake message.  For test purposes, this
 	// implementation assumes fragments arrive in order. It may
@@ -621,6 +668,11 @@
 			if err := c.readRecord(recordTypeHandshake); err != nil {
 				return nil, err
 			}
+			record, err := makeDTLSRecordNumberInfo(&c.in.epoch, c.hand.Bytes())
+			if err != nil {
+				return nil, err
+			}
+			c.receivedFlightRecords = append(c.receivedFlightRecords, record)
 		}
 
 		// Read the next fragment. It must fit entirely within
@@ -707,7 +759,7 @@
 // shim's "lost" flight as usual. But, instead of responding, it calls a
 // test-provided function of the form:
 //
-//	func WriteFlight(c *DTLSController, prev, received, next []DTLSMessage)
+//	func WriteFlight(c *DTLSController, prev, received, next []DTLSMessage, records []DTLSRecordNumberInfo)
 //
 // WriteFlight will be called next as the flight for the runner to send. prev is
 // the previous flight sent by the runner, and received is the most recent
@@ -728,7 +780,7 @@
 //
 // If unspecified, the default implementation of WriteFlight is:
 //
-//	func WriteFlight(c *DTLSController, prev, received, next []DTLSMessage) {
+//	func WriteFlight(c *DTLSController, prev, received, next []DTLSMessage, records []DTLSRecordNumberInfo) {
 //		c.WriteFlight(next)
 //	}
 //
@@ -754,30 +806,55 @@
 // test that final flight retransmissions and post-handshake messages can
 // interleave with application data.
 //
-// TODO(crbug.com/42290594): ExpectRetransmit should return a sequence of record
-// numbers, which the test callback can use to send ACKs. Track outgoing ACKs in
-// the test framework, so calls to ExpectRetransmit implicitly check that the
-// shim only retransmits unACKed data. Have some way to account for the shim
-// forgetting packet numbers when its buffer is full, and the point before the
-// shim learns it's speaking DTLS 1.3.
-//
 // TODO(crbug.com/42290594): When we implement ACK-sending on the shim, add a
 // way for the test to specify which ACKs are expected, unless we can derive
 // that automatically?
 //
 // TODO(crbug.com/42290594): The default behavior for ACKFlight should be to
-// send an ACK. The callback also needs to take, as input, the list of record
-// numbers matching the initial flight.
+// send an ACK.
 type DTLSController struct {
-	conn     *Conn
-	err      error
-	received []DTLSMessage
+	conn *Conn
+	err  error
+	// retransmitNeeded contains the list of fragments which the shim must
+	// retransmit.
+	retransmitNeeded []DTLSFragment
+}
+
+func newDTLSController(conn *Conn, received []DTLSMessage) DTLSController {
+	var retransmitNeeded []DTLSFragment
+	for i := range received {
+		msg := &received[i]
+		retransmitNeeded = append(retransmitNeeded, msg.Fragment(0, len(msg.Data)))
+	}
+
+	return DTLSController{conn: conn, retransmitNeeded: retransmitNeeded}
+}
+
+func (c *DTLSController) getOutEpochOrPanic(epochValue uint16) *epochState {
+	epoch, ok := c.conn.out.getEpoch(epochValue)
+	if !ok {
+		panic(fmt.Sprintf("tls: could not find epoch %d", epochValue))
+	}
+	return epoch
+}
+
+func (c *DTLSController) getInEpochOrPanic(epochValue uint16) *epochState {
+	epoch, ok := c.conn.in.getEpoch(epochValue)
+	if !ok {
+		panic(fmt.Sprintf("tls: could not find epoch %d", epochValue))
+	}
+	return epoch
 }
 
 // Err returns whether the controller has stopped due to an error, or nil
 // otherwise. If it returns non-nil, other methods will silently do nothing.
 func (c *DTLSController) Err() error { return c.err }
 
+// OutEpoch returns the current outgoing epoch.
+func (c *DTLSController) OutEpoch() uint16 {
+	return c.conn.out.epoch.epoch
+}
+
 // AdvanceClock advances the shim's clock by duration. It is a test failure if
 // the shim sends anything before picking up the command.
 func (c *DTLSController) AdvanceClock(duration time.Duration) {
@@ -942,11 +1019,7 @@
 		}
 
 		if epoch == nil {
-			var ok bool
-			epoch, ok = c.conn.out.getEpoch(f.Epoch)
-			if !ok {
-				panic(fmt.Sprintf("tls: could not find epoch %d", f.Epoch))
-			}
+			epoch = c.getOutEpochOrPanic(f.Epoch)
 		}
 
 		if f.IsChangeCipherSpec {
@@ -988,30 +1061,96 @@
 	c.err = flush()
 }
 
-// ReadRetransmit indicates the shim is expected to retransmit its current
-// flight and consumes the retransmission.
-func (c *DTLSController) ReadRetransmit() {
+// WriteACK writes the specified record numbers in an ACK record to the shim,
+// and updates shim expectations according to the specified byte ranges. To send
+// an ACK which the shim is expected to ignore (e.g. because it should have
+// forgotten a packet number), use a DTLSRecordNumberInfo with the
+// MessageStartSequence, etc., fields all set to zero.
+func (c *DTLSController) WriteACK(epoch uint16, records []DTLSRecordNumberInfo) {
 	if c.err != nil {
 		return
 	}
 
-	c.err = c.doReadRetransmit()
+	// Send the ACK.
+	ack := cryptobyte.NewBuilder(make([]byte, 0, 2+8*len(records)))
+	ack.AddUint16LengthPrefixed(func(child *cryptobyte.Builder) {
+		for _, r := range records {
+			child.AddUint64(r.Epoch)
+			child.AddUint64(r.Sequence)
+		}
+	})
+	_, c.err = c.conn.dtlsPackRecord(c.getOutEpochOrPanic(epoch), recordTypeACK, ack.BytesOrPanic(), false)
+	if c.err != nil {
+		return
+	}
+
+	// Update the list of expectations. This is inefficient, but is fine for
+	// test code.
+	for _, r := range records {
+		if !r.HasACKInformation() {
+			continue
+		}
+		var update []DTLSFragment
+		for _, f := range c.retransmitNeeded {
+			endOffset := f.Offset + len(f.Data)
+			// Compute two, possibly empty, intersections: the fragment with
+			// [0, ackStart) and the fragment with [ackStart, infinity).
+
+			// First, the portion of the fragment that is before the ACK:
+			if comparePair(f.Sequence, f.Offset, r.MessageStartSequence, r.MessageStartOffset) < 0 {
+				// The fragment begins before the ACK.
+				if comparePair(f.Sequence, endOffset, r.MessageStartSequence, r.MessageStartOffset) <= 0 {
+					// The fragment ends before the ACK.
+					update = append(update, f)
+				} else {
+					// The ACK starts in the middle of the fragment. Retain a
+					// prefix of the fragment.
+					prefix := f
+					prefix.Data = f.Data[:r.MessageStartOffset-f.Offset]
+					update = append(update, prefix)
+				}
+			}
+
+			// Next, the portion of the fragment that is after the ACK:
+			if comparePair(r.MessageEndSequence, r.MessageEndOffset, f.Sequence, endOffset) < 0 {
+				// The fragment ends after the ACK.
+				if comparePair(r.MessageEndSequence, r.MessageEndOffset, f.Sequence, f.Offset) <= 0 {
+					// The fragment begins after the ACK.
+					update = append(update, f)
+				} else {
+					// The ACK ends in the middle of the fragment. Retain a
+					// suffix of the fragment.
+					suffix := f
+					suffix.Offset = r.MessageEndOffset
+					suffix.Data = f.Data[r.MessageEndOffset-f.Offset:]
+					update = append(update, suffix)
+				}
+			}
+		}
+		c.retransmitNeeded = update
+	}
 }
 
-func (c *DTLSController) doReadRetransmit() error {
+// ReadRetransmit indicates the shim is expected to retransmit its current
+// flight and consumes the retransmission. It returns the record numbers of the
+// retransmission, for the test to ACK if it chooses.
+func (c *DTLSController) ReadRetransmit() []DTLSRecordNumberInfo {
+	if c.err != nil {
+		return nil
+	}
+
+	var ret []DTLSRecordNumberInfo
+	ret, c.err = c.doReadRetransmit()
+	return ret
+}
+
+func (c *DTLSController) doReadRetransmit() ([]DTLSRecordNumberInfo, error) {
 	if err := c.conn.dtlsFlushPacket(); err != nil {
-		return err
+		return nil, err
 	}
 
-	// Determine what the shim should have retransmited. For now, we expect
-	// whole messages, but later some fragments will already have been ACKed in
-	// DTLS 1.3.
-	var expected []DTLSFragment
-	for i := range c.received {
-		msg := &c.received[i]
-		expected = append(expected, msg.Fragment(0, len(msg.Data)))
-	}
-
+	var records []DTLSRecordNumberInfo
+	expected := slices.Clone(c.retransmitNeeded)
 	for len(expected) > 0 {
 		// Read a record from the expected epoch. The peer should retransmit in
 		// order.
@@ -1019,10 +1158,7 @@
 		if expected[0].IsChangeCipherSpec {
 			wantTyp = recordTypeChangeCipherSpec
 		}
-		epoch, ok := c.conn.in.getEpoch(expected[0].Epoch)
-		if !ok {
-			panic(fmt.Sprintf("tls: could not find epoch %d", expected[0].Epoch))
-		}
+		epoch := c.getInEpochOrPanic(expected[0].Epoch)
 		// Retransmitted ClientHellos predate the shim learning the version.
 		// Ideally we would enforce the initial record-layer version, but
 		// post-HelloVerifyRequest ClientHellos and post-HelloRetryRequest
@@ -1031,14 +1167,14 @@
 		typ, data, err := c.conn.dtlsDoReadRecord(epoch, wantTyp)
 		c.conn.skipRecordVersionCheck = false
 		if err != nil {
-			return err
+			return nil, err
 		}
 		if typ != wantTyp {
-			return fmt.Errorf("tls: got record of type %d in retransmit, but expected %d", typ, wantTyp)
+			return nil, fmt.Errorf("tls: got record of type %d in retransmit, but expected %d", typ, wantTyp)
 		}
 		if typ == recordTypeChangeCipherSpec {
 			if len(data) != 1 || data[0] != 1 {
-				return errors.New("tls: got invalid ChangeCipherSpec")
+				return nil, errors.New("tls: got invalid ChangeCipherSpec")
 			}
 			expected = expected[1:]
 			continue
@@ -1047,24 +1183,24 @@
 		// Consume all the handshake fragments and match them to what we expect.
 		s := cryptobyte.String(data)
 		if s.Empty() {
-			return fmt.Errorf("tls: got empty record in retransmit")
+			return nil, fmt.Errorf("tls: got empty record in retransmit")
 		}
 		for !s.Empty() {
 			if len(expected) == 0 || expected[0].Epoch != epoch.epoch || expected[0].IsChangeCipherSpec {
-				return fmt.Errorf("tls: got excess data at epoch %d in retransmit", epoch.epoch)
+				return nil, fmt.Errorf("tls: got excess data at epoch %d in retransmit", epoch.epoch)
 			}
 
 			exp := &expected[0]
 			var f DTLSFragment
 			f, err = readDTLSFragment(&s)
 			if f.Type != exp.Type || f.TotalLength != exp.TotalLength || f.Sequence != exp.Sequence || f.Offset != exp.Offset {
-				return fmt.Errorf("tls: got offset %d of message %d (type %d, length %d), expected offset %d of message %d (type %d, length %d)", f.Offset, f.Sequence, f.Type, f.TotalLength, exp.Offset, exp.Sequence, exp.Type, exp.TotalLength)
+				return nil, fmt.Errorf("tls: got offset %d of message %d (type %d, length %d), expected offset %d of message %d (type %d, length %d)", f.Offset, f.Sequence, f.Type, f.TotalLength, exp.Offset, exp.Sequence, exp.Type, exp.TotalLength)
 			}
 			if len(f.Data) > len(exp.Data) {
-				return fmt.Errorf("tls: got %d bytes at offset %d of message %d but only %d bytes were missing", len(f.Data), f.Offset, f.Sequence, len(exp.Data))
+				return nil, fmt.Errorf("tls: got %d bytes at offset %d of message %d but only %d bytes were missing", len(f.Data), f.Offset, f.Sequence, len(exp.Data))
 			}
 			if !bytes.Equal(f.Data, exp.Data[:len(f.Data)]) {
-				return fmt.Errorf("tls: got %d bytes at offset %d of message %d but did not match original", len(f.Data), f.Offset, f.Sequence)
+				return nil, fmt.Errorf("tls: got %d bytes at offset %d of message %d but did not match original", len(f.Data), f.Offset, f.Sequence)
 			}
 			if len(f.Data) == len(exp.Data) {
 				expected = expected[1:]
@@ -1074,13 +1210,19 @@
 				exp.Data = exp.Data[len(f.Data):]
 				// Check that the peer could not have fit more into the record.
 				if !s.Empty() {
-					return errors.New("dtls: truncated handshake fragment was not last in the record")
+					return nil, errors.New("dtls: truncated handshake fragment was not last in the record")
 				}
 				if c.conn.lastRecordInFlight.bytesAvailable > 0 {
-					return fmt.Errorf("dtls: handshake fragment was truncated, but record could have fit %d more bytes", c.conn.lastRecordInFlight.bytesAvailable)
+					return nil, fmt.Errorf("dtls: handshake fragment was truncated, but record could have fit %d more bytes", c.conn.lastRecordInFlight.bytesAvailable)
 				}
 			}
 		}
+
+		record, err := makeDTLSRecordNumberInfo(epoch, data)
+		if err != nil {
+			return nil, err
+		}
+		records = append(records, record)
 	}
-	return nil
+	return records, nil
 }
diff --git a/ssl/test/runner/handshake_server.go b/ssl/test/runner/handshake_server.go
index 418daf7..8c10c82 100644
--- a/ssl/test/runner/handshake_server.go
+++ b/ssl/test/runner/handshake_server.go
@@ -1044,9 +1044,6 @@
 	} else {
 		c.writeRecord(recordTypeHandshake, helloBytes)
 	}
-	if err := c.flushHandshake(); err != nil {
-		return err
-	}
 
 	if !c.config.Bugs.SkipChangeCipherSpec && !sendHelloRetryRequest && !c.isDTLS {
 		c.writeRecord(recordTypeChangeCipherSpec, []byte{1})
diff --git a/ssl/test/runner/runner.go b/ssl/test/runner/runner.go
index 604bb1a..362a763 100644
--- a/ssl/test/runner/runner.go
+++ b/ssl/test/runner/runner.go
@@ -11481,134 +11481,556 @@
 }
 
 func addDTLSRetransmitTests() {
-	// Test that this is indeed the timeout schedule. Stress all
-	// four patterns of handshake.
-	//
-	// TODO(crbug.com/42290594): Add DTLS 1.3 versions of these tests.
 	for _, shortTimeout := range []bool{false, true} {
-		for _, partialProgress := range []bool{false, true} {
-			var suffix string
-			var flags []string
+		for _, vers := range allVersions(dtls) {
+			suffix := "-" + vers.name
+			flags := []string{"-async"} // Retransmit tests require async.
 			useTimeouts := timeouts
 			if shortTimeout {
-				suffix = "-Short"
-				flags = []string{"-initial-timeout-duration-ms", "250"}
+				suffix += "-Short"
+				flags = append(flags, "-initial-timeout-duration-ms", "250")
 				useTimeouts = shortTimeouts
 			}
-			if partialProgress {
-				suffix += "-PartialProgress"
-			}
 
-			writeFlight := func(c *DTLSController, prev, received, next []DTLSMessage) {
+			// In all versions, the sender will retransmit the whole flight if
+			// it times out and hears nothing.
+			writeFlightBasic := func(c *DTLSController, prev, received, next []DTLSMessage, records []DTLSRecordNumberInfo) {
 				if len(received) > 0 {
 					// Exercise every timeout but the last one (which would fail the
 					// connection).
 					for _, t := range useTimeouts[:len(useTimeouts)-1] {
 						c.AdvanceClock(t)
 						c.ReadRetransmit()
-						// Release part of the first message to the shim.
-						if partialProgress {
-							tot := len(next[0].Data)
-							c.WriteFragments([]DTLSFragment{next[0].Fragment(tot/3, 2*tot/3)})
-						}
 					}
 				}
 				// Finally release the whole flight to the shim.
 				c.WriteFlight(next)
 			}
-			ackFlight := func(c *DTLSController, prev, received []DTLSMessage) {
-				// The final flight is retransmitted on receipt of the previous
-				// flight. Test the peer is willing to retransmit it several times.
+			ackFlightBasic := func(c *DTLSController, prev, received []DTLSMessage, records []DTLSRecordNumberInfo) {
+				if vers.version >= VersionTLS13 {
+					// TODO(crbug.com/42290594): Implement retransmitting the
+					// final flight in DTLS 1.3.
+					return
+				}
+				// In DTLS 1.2, the final flight is retransmitted on receipt of
+				// the previous flight. Test the peer is willing to retransmit
+				// it several times.
 				for i := 0; i < 5; i++ {
 					c.WriteFlight(prev)
 					c.ReadRetransmit()
 				}
 			}
-
 			testCases = append(testCases, testCase{
 				protocol: dtls,
-				name:     "DTLS-Retransmit-Client-TLS12" + suffix,
+				name:     "DTLS-Retransmit-Client-Basic" + suffix,
 				config: Config{
-					MaxVersion: VersionTLS12,
+					MaxVersion: vers.version,
 					Bugs: ProtocolBugs{
-						WriteFlightDTLS: writeFlight,
-						ACKFlightDTLS:   ackFlight,
+						WriteFlightDTLS: writeFlightBasic,
+						ACKFlightDTLS:   ackFlightBasic,
 					},
 				},
 				resumeSession: true,
-				flags:         slices.Concat(flags, []string{"-async"}),
+				flags:         flags,
 			})
 			testCases = append(testCases, testCase{
 				protocol: dtls,
 				testType: serverTest,
-				name:     "DTLS-Retransmit-Server-TLS12" + suffix,
+				name:     "DTLS-Retransmit-Server-Basic" + suffix,
 				config: Config{
-					MaxVersion: VersionTLS12,
+					MaxVersion: vers.version,
 					Bugs: ProtocolBugs{
-						WriteFlightDTLS: writeFlight,
-						ACKFlightDTLS:   ackFlight,
+						WriteFlightDTLS: writeFlightBasic,
+						ACKFlightDTLS:   ackFlightBasic,
 					},
 				},
 				resumeSession: true,
-				flags:         slices.Concat(flags, []string{"-async"}),
+				flags:         flags,
 			})
+
+			// In DTLS 1.2, receiving a part of the next flight should not stop
+			// the retransmission timer.
+			//
+			// TODO(crbug.com/42290594): In DTLS 1.3, it should stop the timer
+			// because we can use ACKs to request a retransmit. Test this.
+			if vers.version <= VersionTLS12 {
+				testCases = append(testCases, testCase{
+					protocol: dtls,
+					name:     "DTLS-Retransmit-PartialProgress" + suffix,
+					config: Config{
+						MaxVersion: vers.version,
+						Bugs: ProtocolBugs{
+							WriteFlightDTLS: func(c *DTLSController, prev, received, next []DTLSMessage, records []DTLSRecordNumberInfo) {
+								// Send a portion of the first message. The rest was lost.
+								msg := next[0]
+								split := len(msg.Data) / 2
+								c.WriteFragments([]DTLSFragment{msg.Fragment(0, split)})
+								// If we time out, the shim should still retransmit. It knows
+								// we received the whole flight, but the shim should use a
+								// retransmit to request the runner try again.
+								c.AdvanceClock(useTimeouts[0])
+								c.ReadRetransmit()
+								// "Retransmit" the rest of the flight. The shim should remember
+								// the portion that was already sent.
+								rest := []DTLSFragment{msg.Fragment(split, len(msg.Data)-split)}
+								for _, m := range next[1:] {
+									rest = append(rest, m.Fragment(0, len(m.Data)))
+								}
+								c.WriteFragments(rest)
+							},
+						},
+					},
+					flags: flags,
+				})
+			}
+
+			// Test that exceeding the timeout schedule hits a read
+			// timeout.
+			testCases = append(testCases, testCase{
+				protocol: dtls,
+				name:     "DTLS-Retransmit-Timeout" + suffix,
+				config: Config{
+					MaxVersion: vers.version,
+					Bugs: ProtocolBugs{
+						WriteFlightDTLS: func(c *DTLSController, prev, received, next []DTLSMessage, records []DTLSRecordNumberInfo) {
+							for _, t := range useTimeouts[:len(useTimeouts)-1] {
+								c.AdvanceClock(t)
+								c.ReadRetransmit()
+							}
+							c.AdvanceClock(useTimeouts[len(useTimeouts)-1])
+							// The shim should give up at this point.
+						},
+					},
+				},
+				resumeSession: true,
+				flags:         flags,
+				shouldFail:    true,
+				expectedError: ":READ_TIMEOUT_EXPIRED:",
+			})
+
+			// Test that timeout handling has a fudge factor, due to API
+			// problems.
+			testCases = append(testCases, testCase{
+				protocol: dtls,
+				name:     "DTLS-Retransmit-Fudge" + suffix,
+				config: Config{
+					MaxVersion: vers.version,
+					Bugs: ProtocolBugs{
+						WriteFlightDTLS: func(c *DTLSController, prev, received, next []DTLSMessage, records []DTLSRecordNumberInfo) {
+							c.AdvanceClock(useTimeouts[0] - 10*time.Millisecond)
+							c.ReadRetransmit()
+							c.WriteFlight(next)
+						},
+					},
+				},
+				resumeSession: true,
+				flags:         flags,
+			})
+
+			// Test that the shim can retransmit at different MTUs.
+			testCases = append(testCases, testCase{
+				protocol: dtls,
+				name:     "DTLS-Retransmit-ChangeMTU" + suffix,
+				config: Config{
+					MaxVersion: vers.version,
+					// Request a client certificate, so the shim has more to send.
+					ClientAuth: RequireAnyClientCert,
+					Bugs: ProtocolBugs{
+						WriteFlightDTLS: func(c *DTLSController, prev, received, next []DTLSMessage, records []DTLSRecordNumberInfo) {
+							for i, mtu := range []int{300, 301, 302, 303, 299, 298, 297} {
+								c.SetMTU(mtu)
+								c.AdvanceClock(useTimeouts[i])
+								c.ReadRetransmit()
+							}
+							c.WriteFlight(next)
+						},
+					},
+				},
+				shimCertificate: &rsaChainCertificate,
+				flags:           flags,
+			})
+
+			// DTLS 1.3 uses explicit ACKs.
+			if vers.version >= VersionTLS13 {
+				// The two server flights (HelloRetryRequest and ServerHello..Finished)
+				// happen after the shim has learned the version, so they are more
+				// straightforward. In these tests, we trigger HelloRetryRequest,
+				// and also use ML-KEM with rsaChainCertificate and a limited MTU,
+				// to increase the number of records and exercise more complex
+				// ACK patterns.
+
+				// After ACKing everything, the shim should stop retransmitting.
+				testCases = append(testCases, testCase{
+					protocol: dtls,
+					testType: serverTest,
+					name:     "DTLS-Retransmit-Server-ACKEverything" + suffix,
+					config: Config{
+						MaxVersion:       vers.version,
+						CurvePreferences: []CurveID{CurveX25519MLKEM768},
+						DefaultCurves:    []CurveID{}, // Force HelloRetryRequest.
+						Bugs: ProtocolBugs{
+							MaxPacketLength: 512,
+							WriteFlightDTLS: func(c *DTLSController, prev, received, next []DTLSMessage, records []DTLSRecordNumberInfo) {
+								if len(received) > 0 {
+									c.WriteACK(c.OutEpoch(), records)
+									// After ACKing everything, the shim should stop the timer
+									// and wait for the next flight.
+									for _, t := range useTimeouts {
+										c.AdvanceClock(t)
+									}
+								}
+								c.WriteFlight(next)
+							},
+						},
+					},
+					shimCertificate: &rsaChainCertificate,
+					flags:           slices.Concat(flags, []string{"-mtu", "512", "-curves", strconv.Itoa(int(CurveX25519MLKEM768))}),
+				})
+
+				// ACK packets one by one, in reverse.
+				testCases = append(testCases, testCase{
+					protocol: dtls,
+					testType: serverTest,
+					name:     "DTLS-Retransmit-Server-ACKReverse" + suffix,
+					config: Config{
+						MaxVersion:       vers.version,
+						CurvePreferences: []CurveID{CurveX25519MLKEM768},
+						DefaultCurves:    []CurveID{}, // Force HelloRetryRequest.
+						Bugs: ProtocolBugs{
+							MaxPacketLength: 512,
+							WriteFlightDTLS: func(c *DTLSController, prev, received, next []DTLSMessage, records []DTLSRecordNumberInfo) {
+								if len(received) > 0 {
+									for _, t := range useTimeouts[:len(useTimeouts)-1] {
+										if len(records) > 0 {
+											c.WriteACK(c.OutEpoch(), []DTLSRecordNumberInfo{records[len(records)-1]})
+										}
+										c.AdvanceClock(t)
+										records = c.ReadRetransmit()
+									}
+								}
+								c.WriteFlight(next)
+							},
+						},
+					},
+					shimCertificate: &rsaChainCertificate,
+					flags:           slices.Concat(flags, []string{"-mtu", "512", "-curves", strconv.Itoa(int(CurveX25519MLKEM768))}),
+				})
+
+				// ACK packets one by one, forwards.
+				testCases = append(testCases, testCase{
+					protocol: dtls,
+					testType: serverTest,
+					name:     "DTLS-Retransmit-Server-ACKForwards" + suffix,
+					config: Config{
+						MaxVersion:       vers.version,
+						CurvePreferences: []CurveID{CurveX25519MLKEM768},
+						DefaultCurves:    []CurveID{}, // Force HelloRetryRequest.
+						Bugs: ProtocolBugs{
+							MaxPacketLength: 512,
+							WriteFlightDTLS: func(c *DTLSController, prev, received, next []DTLSMessage, records []DTLSRecordNumberInfo) {
+								if len(received) > 0 {
+									for _, t := range useTimeouts[:len(useTimeouts)-1] {
+										if len(records) > 0 {
+											c.WriteACK(c.OutEpoch(), []DTLSRecordNumberInfo{records[0]})
+										}
+										c.AdvanceClock(t)
+										records = c.ReadRetransmit()
+									}
+								}
+								c.WriteFlight(next)
+							},
+						},
+					},
+					shimCertificate: &rsaChainCertificate,
+					flags:           slices.Concat(flags, []string{"-mtu", "512", "-curves", strconv.Itoa(int(CurveX25519MLKEM768))}),
+				})
+
+				// ACK 1/3 the packets each time.
+				testCases = append(testCases, testCase{
+					protocol: dtls,
+					testType: serverTest,
+					name:     "DTLS-Retransmit-Server-ACKIterate" + suffix,
+					config: Config{
+						MaxVersion:       vers.version,
+						CurvePreferences: []CurveID{CurveX25519MLKEM768},
+						DefaultCurves:    []CurveID{}, // Force HelloRetryRequest.
+						Bugs: ProtocolBugs{
+							MaxPacketLength: 512,
+							WriteFlightDTLS: func(c *DTLSController, prev, received, next []DTLSMessage, records []DTLSRecordNumberInfo) {
+								if len(received) > 0 {
+									for i, t := range useTimeouts[:len(useTimeouts)-1] {
+										if len(records) > 0 {
+											ack := make([]DTLSRecordNumberInfo, 0, (len(records)+2)/3)
+											for i := 0; i < len(records); i += 3 {
+												ack = append(ack, records[i])
+											}
+											c.WriteACK(c.OutEpoch(), ack)
+										}
+										// Change the MTU every iteration, to make the fragment
+										// patterns more complex.
+										c.SetMTU(512 + i)
+										c.AdvanceClock(t)
+										records = c.ReadRetransmit()
+									}
+								}
+								c.WriteFlight(next)
+							},
+						},
+					},
+					shimCertificate: &rsaChainCertificate,
+					flags:           slices.Concat(flags, []string{"-mtu", "512", "-curves", strconv.Itoa(int(CurveX25519MLKEM768))}),
+				})
+
+				// ACKing packets that have already been ACKed is a no-op.
+				testCases = append(testCases, testCase{
+					protocol: dtls,
+					testType: serverTest,
+					name:     "DTLS-Retransmit-Server-ACKDuplicate" + suffix,
+					config: Config{
+						MaxVersion: vers.version,
+						Bugs: ProtocolBugs{
+							SendHelloRetryRequestCookie: []byte("cookie"),
+							WriteFlightDTLS: func(c *DTLSController, prev, received, next []DTLSMessage, records []DTLSRecordNumberInfo) {
+								if len(received) > 0 {
+									// Keep ACKing the same record over and over.
+									c.WriteACK(c.OutEpoch(), records[:1])
+									c.AdvanceClock(useTimeouts[0])
+									c.ReadRetransmit()
+									c.WriteACK(c.OutEpoch(), records[:1])
+									c.AdvanceClock(useTimeouts[1])
+									c.ReadRetransmit()
+								}
+								c.WriteFlight(next)
+							},
+						},
+					},
+					flags: flags,
+				})
+
+				// When ACKing ServerHello..Finished, the ServerHello might be
+				// ACKed at epoch 0 or epoch 2, depending on how far the client
+				// received. Test that epoch 0 is allowed by ACKing each packet
+				// at the epoch it was received.
+				testCases = append(testCases, testCase{
+					protocol: dtls,
+					testType: serverTest,
+					name:     "DTLS-Retransmit-Server-ACKMatchingEpoch" + suffix,
+					config: Config{
+						MaxVersion: vers.version,
+						Bugs: ProtocolBugs{
+							WriteFlightDTLS: func(c *DTLSController, prev, received, next []DTLSMessage, records []DTLSRecordNumberInfo) {
+								if len(received) > 0 {
+									for _, t := range useTimeouts[:len(useTimeouts)-1] {
+										if len(records) > 0 {
+											c.WriteACK(uint16(records[0].Epoch), []DTLSRecordNumberInfo{records[0]})
+										}
+										c.AdvanceClock(t)
+										records = c.ReadRetransmit()
+									}
+								}
+								c.WriteFlight(next)
+							},
+						},
+					},
+					flags: flags,
+				})
+
+				// However, records may not be ACKed at lower epoch than they
+				// were received.
+				testCases = append(testCases, testCase{
+					protocol: dtls,
+					testType: serverTest,
+					name:     "DTLS-Retransmit-Server-ACKBadEpoch" + suffix,
+					config: Config{
+						MaxVersion: vers.version,
+						Bugs: ProtocolBugs{
+							WriteFlightDTLS: func(c *DTLSController, prev, received, next []DTLSMessage, records []DTLSRecordNumberInfo) {
+								if len(received) == 0 {
+									// Send the ClientHello.
+									c.WriteFlight(next)
+								} else {
+									// Try to ACK ServerHello..Finished at epoch 0. The shim should reject this.
+									c.WriteACK(0, records)
+								}
+							},
+						},
+					},
+					flags:         flags,
+					shouldFail:    true,
+					expectedError: ":DECODE_ERROR:",
+				})
+
+				// The bad epoch check should notice when the epoch number
+				// would overflow 2^16.
+				testCases = append(testCases, testCase{
+					protocol: dtls,
+					testType: serverTest,
+					name:     "DTLS-Retransmit-Server-ACKEpochOverflow" + suffix,
+					config: Config{
+						MaxVersion: vers.version,
+						Bugs: ProtocolBugs{
+							WriteFlightDTLS: func(c *DTLSController, prev, received, next []DTLSMessage, records []DTLSRecordNumberInfo) {
+								if len(received) == 0 {
+									// Send the ClientHello.
+									c.WriteFlight(next)
+								} else {
+									r := records[0]
+									r.Epoch += 1 << 63
+									c.WriteACK(0, []DTLSRecordNumberInfo{r})
+								}
+							},
+						},
+					},
+					flags:         flags,
+					shouldFail:    true,
+					expectedError: ":DECODE_ERROR:",
+				})
+
+				// ACK some records from the first transmission, trigger a
+				// retransmit, but then ACK the rest of the first transmission.
+				testCases = append(testCases, testCase{
+					protocol: dtls,
+					testType: serverTest,
+					name:     "DTLS-Retransmit-Server-ACKOldRecords" + suffix,
+					config: Config{
+						MaxVersion:       vers.version,
+						CurvePreferences: []CurveID{CurveX25519MLKEM768},
+						Bugs: ProtocolBugs{
+							MaxPacketLength: 512,
+							WriteFlightDTLS: func(c *DTLSController, prev, received, next []DTLSMessage, records []DTLSRecordNumberInfo) {
+								if len(received) > 0 {
+									c.WriteACK(c.OutEpoch(), records[len(records)/2:])
+									c.AdvanceClock(useTimeouts[0])
+									c.ReadRetransmit()
+									c.WriteACK(c.OutEpoch(), records[:len(records)/2])
+									// Everything should be ACKed now. The shim should not
+									// retransmit anything.
+									c.AdvanceClock(useTimeouts[1])
+									c.AdvanceClock(useTimeouts[2])
+									c.AdvanceClock(useTimeouts[3])
+								}
+								c.WriteFlight(next)
+							},
+						},
+					},
+					flags: slices.Concat(flags, []string{"-mtu", "512", "-curves", strconv.Itoa(int(CurveX25519MLKEM768))}),
+				})
+
+				// If the shim sends too many records, it will eventually forget them.
+				testCases = append(testCases, testCase{
+					protocol: dtls,
+					testType: serverTest,
+					name:     "DTLS-Retransmit-Server-ACKForgottenRecords" + suffix,
+					config: Config{
+						MaxVersion:       vers.version,
+						CurvePreferences: []CurveID{CurveX25519MLKEM768},
+						Bugs: ProtocolBugs{
+							MaxPacketLength: 256,
+							WriteFlightDTLS: func(c *DTLSController, prev, received, next []DTLSMessage, records []DTLSRecordNumberInfo) {
+								if len(received) > 0 {
+									// Make the peer retransmit many times, with a small MTU.
+									for _, t := range useTimeouts[:len(useTimeouts)-2] {
+										c.AdvanceClock(t)
+										c.ReadRetransmit()
+									}
+									// ACK the first record the shim ever sent. It will have
+									// fallen off the queue by now, so it is expected to not
+									// impact the shim's retransmissions.
+									c.WriteACK(c.OutEpoch(), []DTLSRecordNumberInfo{{Epoch: records[0].Epoch, Sequence: records[0].Sequence}})
+									c.AdvanceClock(useTimeouts[len(useTimeouts)-2])
+									c.ReadRetransmit()
+								}
+								c.WriteFlight(next)
+							},
+						},
+					},
+					flags: slices.Concat(flags, []string{"-mtu", "256", "-curves", strconv.Itoa(int(CurveX25519MLKEM768))}),
+				})
+
+				// The shim should ignore ACKs for a previous flight, and not get its
+				// internal state confused.
+				testCases = append(testCases, testCase{
+					protocol: dtls,
+					testType: serverTest,
+					name:     "DTLS-Retransmit-Server-ACKPreviousFlight" + suffix,
+					config: Config{
+						MaxVersion:    vers.version,
+						DefaultCurves: []CurveID{}, // Force a HelloRetryRequest.
+						Bugs: ProtocolBugs{
+							WriteFlightDTLS: func(c *DTLSController, prev, received, next []DTLSMessage, records []DTLSRecordNumberInfo) {
+								if next[len(next)-1].Type == typeFinished {
+									// We are now sending client Finished, in response
+									// to the shim's ServerHello. ACK the shim's first
+									// record, which would have been part of
+									// HelloRetryRequest. This should not impact retransmit.
+									c.WriteACK(c.OutEpoch(), []DTLSRecordNumberInfo{{Epoch: 0, Sequence: 0}})
+									c.AdvanceClock(useTimeouts[0])
+									c.ReadRetransmit()
+								}
+								c.WriteFlight(next)
+							},
+						},
+					},
+					flags: flags,
+				})
+
+				// As a client, the shim must tolerate ACKs in response to its
+				// initial ClientHello, but it will not process them because the
+				// version is not yet known. The second ClientHello, in response
+				// to HelloRetryRequest, however, is ACKed.
+				//
+				// TODO(crbug.com/42290594): Test ACKs for the Finished flight.
+				testCases = append(testCases, testCase{
+					protocol: dtls,
+					name:     "DTLS-Retransmit-Client" + suffix,
+					config: Config{
+						MaxVersion: vers.version,
+						Bugs: ProtocolBugs{
+							SendHelloRetryRequestCookie: []byte("cookie"), // Send HelloRetryRequest
+							MaxPacketLength:             512,
+							WriteFlightDTLS: func(c *DTLSController, prev, received, next []DTLSMessage, records []DTLSRecordNumberInfo) {
+								if len(received) == 0 || received[0].Type != typeClientHello {
+									// Leave post-handshake flights alone.
+									c.WriteFlight(next)
+									return
+								}
+
+								// This is either HelloRetryRequest in response to ClientHello1,
+								// or ServerHello..Finished in response to ClientHello2.
+								first := records[0]
+								if len(prev) == 0 {
+									// This is HelloRetryRequest in response to ClientHello1. The client
+									// will accept the ACK, but it will ignore it. Do not expect
+									// retransmits to be impacted.
+									first.MessageStartSequence = 0
+									first.MessageStartOffset = 0
+									first.MessageEndSequence = 0
+									first.MessageEndOffset = 0
+								}
+								c.WriteACK(0, []DTLSRecordNumberInfo{first})
+								c.AdvanceClock(useTimeouts[0])
+								c.ReadRetransmit()
+								c.WriteFlight(next)
+							},
+						},
+					},
+					flags: slices.Concat(flags, []string{"-mtu", "512", "-curves", strconv.Itoa(int(CurveX25519MLKEM768))}),
+				})
+			}
 		}
 	}
 
-	// Test that exceeding the timeout schedule hits a read
-	// timeout.
-	testCases = append(testCases, testCase{
-		protocol: dtls,
-		name:     "DTLS-Retransmit-Timeout",
-		config: Config{
-			MaxVersion: VersionTLS12,
-			Bugs: ProtocolBugs{
-				WriteFlightDTLS: func(c *DTLSController, prev, received, next []DTLSMessage) {
-					for _, t := range timeouts[:len(timeouts)-1] {
-						c.AdvanceClock(t)
-						c.ReadRetransmit()
-					}
-					c.AdvanceClock(timeouts[len(timeouts)-1])
-					// The shim should give up at this point.
-				},
-			},
-		},
-		resumeSession: true,
-		flags:         []string{"-async"},
-		shouldFail:    true,
-		expectedError: ":READ_TIMEOUT_EXPIRED:",
-	})
-
-	// Test that timeout handling has a fudge factor, due to API
-	// problems.
-	testCases = append(testCases, testCase{
-		protocol: dtls,
-		name:     "DTLS-Retransmit-Fudge",
-		config: Config{
-			MaxVersion: VersionTLS12,
-			Bugs: ProtocolBugs{
-				WriteFlightDTLS: func(c *DTLSController, prev, received, next []DTLSMessage) {
-					c.AdvanceClock(timeouts[0] - 10*time.Millisecond)
-					c.ReadRetransmit()
-					c.WriteFlight(next)
-				},
-			},
-		},
-		resumeSession: true,
-		flags:         []string{"-async"},
-	})
-
 	// Test that the final Finished retransmitting isn't
 	// duplicated if the peer badly fragments everything.
 	testCases = append(testCases, testCase{
 		testType: serverTest,
 		protocol: dtls,
-		name:     "DTLS-Retransmit-Fragmented",
+		name:     "DTLS-RetransmitFinished-Fragmented",
 		config: Config{
 			MaxVersion: VersionTLS12,
 			Bugs: ProtocolBugs{
 				MaxHandshakeRecordLength: 2,
-				ACKFlightDTLS: func(c *DTLSController, prev, received []DTLSMessage) {
+				ACKFlightDTLS: func(c *DTLSController, prev, received []DTLSMessage, records []DTLSRecordNumberInfo) {
 					c.WriteFlight(prev)
 					c.ReadRetransmit()
 				},
@@ -11617,29 +12039,6 @@
 		flags: []string{"-async"},
 	})
 
-	// Test that the shim can retransmit at different MTUs.
-	testCases = append(testCases, testCase{
-		protocol: dtls,
-		name:     "DTLS-Retransmit-ChangeMTU",
-		config: Config{
-			MaxVersion: VersionTLS12,
-			// Request a client certificate, so the shim has more to send.
-			ClientAuth: RequireAnyClientCert,
-			Bugs: ProtocolBugs{
-				WriteFlightDTLS: func(c *DTLSController, prev, received, next []DTLSMessage) {
-					for i, mtu := range []int{300, 301, 302, 303, 299, 298, 297} {
-						c.SetMTU(mtu)
-						c.AdvanceClock(timeouts[i])
-						c.ReadRetransmit()
-					}
-					c.WriteFlight(next)
-				},
-			},
-		},
-		shimCertificate: &rsaChainCertificate,
-		flags:           []string{"-async"},
-	})
-
 	// If the shim sends the last Finished (server full or client resume
 	// handshakes), it must retransmit that Finished when it sees a
 	// post-handshake penultimate Finished from the runner. The above tests
@@ -11671,30 +12070,6 @@
 		},
 		resumeSession: true,
 	})
-
-	// DTLS 1.3 ACK/retransmit tests
-	testCases = append(testCases, testCase{
-		protocol: dtls,
-		name:     "DTLS13-ImmediateACKs",
-		config: Config{
-			MinVersion: VersionTLS13,
-			Bugs: ProtocolBugs{
-				ACKEveryRecord: true,
-			},
-		},
-	})
-	testCases = append(testCases, testCase{
-		protocol: dtls,
-		name:     "DTLS12-RejectACKs",
-		config: Config{
-			MaxVersion: VersionTLS12,
-			Bugs: ProtocolBugs{
-				ACKEveryRecord: true,
-			},
-		},
-		shouldFail:    true,
-		expectedError: ":UNEXPECTED_RECORD:",
-	})
 }
 
 func addDTLSReorderTests() {
diff --git a/ssl/tls13_server.cc b/ssl/tls13_server.cc
index a8b0c8b..7fda54b 100644
--- a/ssl/tls13_server.cc
+++ b/ssl/tls13_server.cc
@@ -999,6 +999,9 @@
 
     // Feed the predicted Finished into the transcript. This allows us to derive
     // the resumption secret early and send half-RTT tickets.
+    //
+    // TODO(crbug.com/42290594): Queuing up half-RTT tickets with DTLS will also
+    // make implicit ACKing more subtle.
     assert(!SSL_is_dtls(hs->ssl));
     assert(hs->expected_client_finished.size() <= 0xff);
     uint8_t header[4] = {