rust: bssl-tls: Raw Public Key credentials

Bug: 479599893

Signed-off-by: Xiangfei Ding <xfding@google.com>
Change-Id: I8e6a335d6038568ec7117f8a3b7a0e2f6a6a6964
Reviewed-on: https://boringssl-review.googlesource.com/c/boringssl/+/92227
Presubmit-BoringSSL-Verified: boringssl-scoped@luci-project-accounts.iam.gserviceaccount.com <boringssl-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Adam Langley <agl@google.com>
diff --git a/rust/bssl-tls/src/connection/credentials.rs b/rust/bssl-tls/src/connection/credentials.rs
index 08baed7..4caceec 100644
--- a/rust/bssl-tls/src/connection/credentials.rs
+++ b/rust/bssl-tls/src/connection/credentials.rs
@@ -31,6 +31,7 @@
     check_lib_error,
     config::ConfigurationError,
     connection::{
+        TlsConnection,
         TlsConnectionBuilder,
         lifecycle::{
             EstablishedTlsConnection,
@@ -38,6 +39,7 @@
         }, //
     }, //
     credentials::{
+        CertificateType,
         CertificateVerificationMode,
         SignatureAlgorithm,
         TlsCredential,
@@ -108,6 +110,28 @@
     }
 }
 
+impl<M> TlsConnectionBuilder<Client, M>
+where
+    M: HasTlsConnectionMethod,
+{
+    /// Set the list of available client certificate types.
+    pub fn with_available_client_cert_types(
+        &mut self,
+        types: &[CertificateType],
+    ) -> Result<&mut Self, Error> {
+        let (ptr, len) = slice_into_ffi_raw_parts(types);
+        check_lib_error!(unsafe {
+            // Safety:
+            // - `self.ptr()` is a valid `SSL` handle.
+            // - `ptr` is a valid pointer to an array of `i32` representing certificate types.
+            // - `len` is the number of elements in the array.
+            // - The function copies the data, so the pointer only needs to be valid for the call.
+            bssl_sys::SSL_set1_available_client_cert_types(self.ptr(), ptr as *const _, len)
+        });
+        Ok(self)
+    }
+}
+
 /// # Custom certificate verification
 impl<M> TlsConnectionInHandshake<'_, Client, M>
 where
@@ -304,3 +328,41 @@
         Ok(())
     }
 }
+
+impl<R, M> TlsConnection<R, M> {
+    /// Get the peer's [`CertificateType`].
+    pub fn get_peer_certificate_type(&self) -> Option<CertificateType> {
+        let ty = unsafe {
+            // Safety:
+            // - `self.ptr()` is a valid `SSL` handle.
+            bssl_sys::SSL_get_peer_cert_type(self.ptr())
+        };
+        ty.try_into().ok().and_then(|ty: u8| ty.try_into().ok())
+    }
+
+    /// Get the peer's raw public key as DER-encoded SubjectPublicKeyInfo.
+    pub fn get_peer_raw_public_key(&self) -> Option<Vec<u8>> {
+        let pkey = unsafe {
+            // Safety:
+            // - `self.ptr()` is a valid `SSL` handle.
+            // - `pkey` does not escape the current function frame.
+            bssl_sys::SSL_get0_peer_rpk(self.ptr())
+        };
+        if pkey.is_null() {
+            return None;
+        }
+
+        let buffer = bssl_crypto::cbb_to_buffer(64, |cbb| {
+            assert_eq!(
+                unsafe {
+                    // Safety:
+                    // - `cbb` is a valid pointer to `CBB` provided by `cbb_to_buffer`.
+                    // - `pkey` is a valid pointer to `EVP_PKEY`.
+                    bssl_sys::EVP_marshal_public_key(cbb, pkey)
+                },
+                1
+            );
+        });
+        Some(buffer.as_ref().to_vec())
+    }
+}
diff --git a/rust/bssl-tls/src/context/credentials.rs b/rust/bssl-tls/src/context/credentials.rs
index 6c27400..69d35c1 100644
--- a/rust/bssl-tls/src/context/credentials.rs
+++ b/rust/bssl-tls/src/context/credentials.rs
@@ -28,6 +28,7 @@
         SupportedMode, //
     },
     credentials::{
+        CertificateType,
         CertificateVerificationMode,
         SignatureAlgorithm,
         TlsCredential,
@@ -142,6 +143,40 @@
         });
         Ok(self)
     }
+
+    /// Set the list of accepted peer certificate types.
+    pub fn with_accepted_peer_cert_types(
+        &mut self,
+        types: &[CertificateType],
+    ) -> Result<&mut Self, Error> {
+        let (ptr, len) = slice_into_ffi_raw_parts(types);
+        check_lib_error!(unsafe {
+            // Safety:
+            // - `self.ptr()` is a valid `SSL_CTX` handle.
+            // - `ptr` is a valid pointer to an array of `i32` representing certificate types.
+            // - `len` is the number of elements in the array.
+            // - The function copies the data, so the pointer only needs to be valid for the call.
+            bssl_sys::SSL_CTX_set1_accepted_peer_cert_types(self.ptr(), ptr as *const _, len)
+        });
+        Ok(self)
+    }
+
+    /// Set the list of available client certificate types.
+    pub fn with_available_client_cert_types(
+        &mut self,
+        types: &[CertificateType],
+    ) -> Result<&mut Self, Error> {
+        let (ptr, len) = slice_into_ffi_raw_parts(types);
+        check_lib_error!(unsafe {
+            // Safety:
+            // - `self.ptr()` is a valid `SSL_CTX` handle.
+            // - `ptr` is a valid pointer to an array of `i32` representing certificate types.
+            // - `len` is the number of elements in the array.
+            // - The function copies the data, so the pointer only needs to be valid for the call.
+            bssl_sys::SSL_CTX_set1_available_client_cert_types(self.ptr(), ptr as *const _, len)
+        });
+        Ok(self)
+    }
 }
 
 /// # Certificate verification
diff --git a/rust/bssl-tls/src/credentials.rs b/rust/bssl-tls/src/credentials.rs
index b7610ef..aecab4d 100644
--- a/rust/bssl-tls/src/credentials.rs
+++ b/rust/bssl-tls/src/credentials.rs
@@ -76,9 +76,13 @@
 /// X.509 credential
 pub enum X509Mode {}
 
+/// Raw Public Key credential
+pub enum RawPublicKeyMode {}
+
 pub(crate) trait NeedsPrivateKey {}
 
 impl NeedsPrivateKey for X509Mode {}
+impl NeedsPrivateKey for RawPublicKeyMode {}
 
 // Safety: At this type state, the credential handle is exclusively owned.
 unsafe impl<M> Send for TlsCredentialBuilder<M> {}
@@ -129,6 +133,27 @@
     }
 }
 
+impl TlsCredentialBuilder<RawPublicKeyMode> {
+    /// Construct raw public key credential instance.
+    ///
+    /// TLS connection may use this credential to perform authentication per [RFC 7250].
+    ///
+    /// [RFC 7250]: <https://tools.ietf.org/html/rfc7250>
+    pub fn new_raw_public_key(mut key: PrivateKey) -> Self {
+        let this = Self(
+            NonNull::new(unsafe {
+                // Safety:
+                // - the `PrivateKey` type already contains both public and private key parts.
+                // - the constructor call also claims the ownership of the key.
+                bssl_sys::SSL_CREDENTIAL_new_raw_public_key(key.as_mut_ptr())
+            })
+            .expect("allocation failure"),
+            PhantomData,
+        );
+        this.set_ex_data()
+    }
+}
+
 impl<M> TlsCredentialBuilder<M>
 where
     M: NeedsPrivateKey,
@@ -831,6 +856,21 @@
 }
 
 bssl_macros::bssl_enum! {
+    /// [IANA] designation of TLS certificate types.
+    ///
+    /// [IANA]: https://www.iana.org/assignments/tls-extensiontype-values/tls-extensiontype-values.xhtml#tls-extensiontype-values-3
+    #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
+    pub enum CertificateType: u8 {
+        /// X.509 certificate type.
+        X509 = bssl_sys::TLSEXT_cert_type_x509 as u8,
+        /// Raw Public Key certificate type per [RFC 7250].
+        ///
+        /// [RFC 7250]: https://datatracker.ietf.org/doc/html/rfc7250
+        Rpk = bssl_sys::TLSEXT_cert_type_rpk as u8,
+    }
+}
+
+bssl_macros::bssl_enum! {
     /// Certificate verification mode
     pub enum CertificateVerificationMode: i8 {
         /// Verifies the server certificate on a client but does not make errors fatal.
diff --git a/rust/bssl-tls/src/credentials/tests.rs b/rust/bssl-tls/src/credentials/tests.rs
index 9506db3..1f2c2e5 100644
--- a/rust/bssl-tls/src/credentials/tests.rs
+++ b/rust/bssl-tls/src/credentials/tests.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use bssl_x509::keys::PrivateKeyAlgorithm;
+use futures::future::try_join;
 
 use super::*;
 use crate::{
@@ -182,7 +183,7 @@
             Ok::<(), crate::errors::Error>(())
         };
 
-        futures::future::try_join(client_handshake, server_handshake).await?;
+        try_join(client_handshake, server_handshake).await?;
         Ok::<(), crate::errors::Error>(())
     };
 
@@ -239,3 +240,120 @@
 
     Ok(())
 }
+
+unsafe extern "C" fn accept_any_verify(
+    _ssl: *mut bssl_sys::SSL,
+    _out_alert: *mut u8,
+) -> bssl_sys::ssl_verify_result_t {
+    bssl_sys::ssl_verify_result_t_ssl_verify_ok
+}
+
+#[test]
+fn rpk_tls13_handshake() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+    use crate::credentials::{
+        CertificateType, CertificateVerificationMode, RawPublicKeyMode, TlsCredentialBuilder,
+    };
+    use crate::tests::create_mock_pipe;
+    use bssl_x509::keys::PrivateKey;
+
+    const PEM: &'_ [u8] = b"-----BEGIN ENCRYPTED PRIVATE KEY-----
+MIIJtTBfBgkqhkiG9w0BBQ0wUjAxBgkqhkiG9w0BBQwwJAQQyZj/WEBsVe3FKYbT
+c423LgICCAAwDAYIKoZIhvcNAgkFADAdBglghkgBZQMEASoEEFs8Xuche6vD6u85
+2LwsNGEEgglQOzaEdw7c/mwKrIDdQESZWEDiNjBPRYGmNZuhilm3xHTBs1WcqC2S
+BkbIlxEHOVdOfes6A0wLH0v/Pv6J/qrazTAQeg5jAfTQILip2grygHTYU/4Jhezs
+Z2mk56TQ6oayohX4/B259IYVM2V6Us62J1+cZMFZ51erDrVn6c1RP7b1tw6AzmbM
+NTMHgj54WCENqFcI3q/9JSvm36MU1OCDi9gJ76uJIQ5gyjmGrp7J8BwkicW616Jo
+u+f3JKETR9BRPTICdbvQBev6Tc/fDyueLKsQsR3cxsFqydadmmQjIFjNplVmpWI+
+3veZj3pUQtmPsXzD7cXx66ygbRNEEEeWaXwl+RJkO2IqPBVF/nCXWNsbPFvxbBqJ
+tlGi0wQcMPfuuJwdEFX9+ZBHoed4XSd/FWdOaYHN/86nqtggECfNpTISn8RPnkDm
+Xp3IHoVPx8VQPZodvdqkET6i0QGTeZol9+D3Doa+VUnv6+IF+D3TDnTmGcBuqT3e
+Tm50LtrKAexXGjdYmTMB3cu/QS963yOwvsUVXifutcS7UX9fpDAbtf2UUYln5l5H
+LqlwbTRYrIEYAO2WylJ3KSw/7aQMlxHLGygpoDPtdURsLXyNcpLml7L203EHixX8
+uK7DomFqg9/mR13rKKpnLJGy4f68UYA02RwqHRVTTg7wuTBAX5XF/vL+A5MCklsL
+igDvCGLNN3F6s5oGfxoBNznGrXKkIP5+BagAkZrq9vmX2iFsueU+kBzHjytf9h93
+QWoLi/dkDXiBPyzGPiUvi1AGgQylv2pRR0fg3D5F/U207JsMvyg9YZHe1ZbyEueG
+TQCObkSA1+H+mbWTg8gMdQRWnfGQZ0F0Jet5cFh8NcU6tM5fs/PUxNi9QaHeT5TL
+BpXaaq0d1UYGap06qp977QquBTwTufAKoTtnmxFm0CPn/lckUq2VJc0hHjZqGgSf
+IIIUpxoehCQw81d7tGRIRnneM4AsL7FHBx4WDviFvldU5HPmbj/EI8gOsbF72eMq
+NbZ7YP8VD0JZ5uQfO4wcLEWOv6TlXzE/UNXBPBQSMH2Uoj1I2WO6i6HunNe0HXN2
+2Qea9nhnWuDEcEkDds14UxxNnp2FylLhm+RWNYsixk5Dky+BMf5eUJfT+Tkks42N
+SAnKuPeRGlO5m6HGpfwQEthC8hlvxPx2fW+NHJ3VojTamdNB5nFe2y3be0KF1v17
+pHtJXQPABPKDlW3Z1WHqkI006EZVUOpop6LYxEjQ4UJ574y9xykiUEhyV+KeC1Uo
+1ZH9TiJj0PReLzDx+19nLGW9GvjQwylXX96gmC/OPbzHDPHLj5WyvZNZ14uVKC4s
+FmHthA4GJORVbkzBZxgPs8nrQvTgwkuJ1t9DPknz9q7m6HyRee0jTE/Q9nAu1Ecx
+F0rTOTu7xf6sK68+a/obPNlWW565aKaICDkaTo84wRuutF2KNAI/xaSoZibALCAJ
+foPfTHHWNFpQmKe+PQwi6hmSUklkboMxg5+eiCpD/ke3315lOvnr2oKdpBOPN3iT
+kAzrV1xnEJvXIv3O+zmexoSbQ3zmN3ExubUE1g5cia25Yop2kHVTbbjqFx7cSMSD
+zAeSoDrDEJY/pZn/+wPLdBz9rp+vh/3rDa2q7Uj94c8jSQivKCvVQaeB8JuLpMZM
+yavSF+iAk8cQw5h0ZfyxNsnSC8eI8sqQjNEixOroUDvjfCEyF6W6edm//vOnMv1a
+iDIvMVigXBP46cTgxhK1RsgfdadUmK6gDlAdwIlKlW3xUPVD7K9WXu0A2lK8OA7+
+WVuQildnQiQ8eKYiAWbKFP9/E2vKfz+sJ8w35oFAFVR6q+KQWlx7YpTv8XeeIM4y
+HcmP/I2oBikATfFpQu1i9cfkBmWsv08MPQAa435iBPiBjBov6hnqdppZES6qTc3D
+G3pVAAGLB9JZdqs3bAOZs40aJxnR7v6FNFz/fhMDSkpzJDVYgNFz0a4S5cXNo7Ju
+lyzyUL/WMTwgEV1vVqudXDA+XBjsARDPO1MIuwPejSM8RomeFdlIJ1vTm592ixNt
+Ll3v1nK0jsJ1XXlFPhdQwsvie6kth/Z5RVUl9o/uhjv92T1Lrh5T4Datr7bhKbSZ
+cLyhfFgEE08rId1Je9SsXuhrxwTjOXoBh+tCUCD1l/lcctWsRzQkgtf8tx/p/6Rl
+DJnCMvJKJgFgTMMy3dG4PapxOHJ9ouLwNwYITWs1GEYKm+SJ3odIuh6msgra0E4l
+C/XhGhdTZt/EUp3rz25x9aJ50iIGx0JeWfi2sBJaTjb2Dmo/rQ8Z4Jp4Kn9Sig1I
+WVeI8oDXEvbG1rs4o2+JD9LhYybSxYZctr420+wxMPDJh7bIal5+XBT8ZNAZRO5t
+bPE529NMkg1ZprqJLBI0l88Ex4r6d07MqX4mp6geYmMjkUC+UOfTxwMVQB/013MB
+WkgRd7nb7td5PSBF9B7ff4yU4PzPmfE46hEMleGmUYnjUm/aEUG/oUqX0aQ6rd2f
+QRgqcKMS3HiDfN9WDokvjuGry65+fb+XAcvDcDdPW4z6aOtOM0ld83CbniIb39Sg
+vwbw9eVVz1snfkPa0kpaJyGea5ZI+/B2Gpa8NlFbCeZ33wvzWIAxgsunCTz3ueQm
+CZAttGE8lGGOcgTRvpiS7Zu3M6/54rbnlkeSu3CKwYKCjsu0x+ZVHom3Ax5q3suG
+hMtVqVLQJvbNgxvHMojSoxDffbY/XPHOERbkVz2h3fttIc2zFvx2zbScqPi12X4T
+WpmnsjzOdcrr5FTJ4tEKr8KaHCETfJjF3s6Z4s783C286tjMOgNF1HSG5yqYq9Fb
+NAksrCPMAJkEhWKrEETPd0DWS0zYD+wGhQYZApfZjyas6Eg3kSgNPFIS9kBs4lKM
+mg77d/xMWaw1wjY34dCXMhPgJ+rlWBEa4G+yndu7oD8MaSGRd8/ZZF+B+yZv7jg8
+E/RWaavKuZGjHl6VLqS/sf2s9Uaea3dnODof9c/APqfDpmzTzc+PRfoEloHofo8g
+aU/TeEx333VC493Dzj00c3WobYWU/w3EPgutlGUbi/W0NkA7h/98NxntMpoRy2jr
+cPzbQnwdjykFQ1JXTcQnqzdbSlrTEXtArhIggG98rvJEZ55LXaVq/tTp4F89VR/W
+lTU7GxRvRinKa52GnUNLqxkmTTcFegGMevICfN7JUaUTDiEQGGJ6jNw=
+-----END ENCRYPTED PRIVATE KEY-----
+";
+    let priv_key = PrivateKey::from_pem(PEM, || b"Hello BoringSSL!").unwrap();
+
+    let cred = TlsCredentialBuilder::<RawPublicKeyMode>::new_raw_public_key(priv_key)
+        .build()
+        .unwrap();
+
+    let mut server_ctx = crate::context::TlsContextBuilder::new_tls();
+    server_ctx.with_credential(cred.clone())?;
+    let server_ctx = server_ctx.build();
+
+    let mut client_ctx = crate::context::TlsContextBuilder::new_tls();
+    client_ctx.with_accepted_peer_cert_types(&[CertificateType::Rpk])?;
+    let client_ctx = client_ctx.build();
+
+    let mut client_conn_builder = client_ctx.new_client_connection(None)?;
+    client_conn_builder.with_certificate_verification_mode(CertificateVerificationMode::None);
+    let mut client_conn = client_conn_builder.build();
+    unsafe {
+        // Safety:
+        // - `client_conn.ptr()` is a valid `SSL` handle.
+        // - `accept_any_verify` is a valid callback.
+        bssl_sys::SSL_set_custom_verify(
+            client_conn.ptr(),
+            bssl_sys::SSL_VERIFY_PEER as _,
+            Some(accept_any_verify),
+        );
+    }
+    let mut server_conn = server_ctx.new_server_connection(None)?.build();
+
+    let (sock_client, sock_server, mut executor) = create_mock_pipe();
+
+    client_conn.set_io(sock_client)?;
+    server_conn.set_io(sock_server)?;
+
+    executor.run(try_join(
+        Pin::new(&mut client_conn).async_write(b"hello"),
+        Pin::new(&mut server_conn).async_write(b"world"),
+    ))?;
+
+    assert_eq!(
+        client_conn.get_peer_certificate_type(),
+        Some(CertificateType::Rpk)
+    );
+
+    Ok(())
+}
diff --git a/rust/bssl-tls/src/sessions.rs b/rust/bssl-tls/src/sessions.rs
index 84cb7cc..3026f86 100644
--- a/rust/bssl-tls/src/sessions.rs
+++ b/rust/bssl-tls/src/sessions.rs
@@ -319,138 +319,24 @@
 
 #[cfg(test)]
 mod tests {
+    use core::pin::Pin;
+
+    use futures::future::try_join;
+
     use super::*;
 
-    use alloc::{collections::VecDeque, sync::Arc};
-    use core::task::Context;
-    use std::sync::Mutex;
-
+    use crate::tests::create_mock_pipe;
     use crate::{
-        connection::{Client, Server, TlsConnection},
-        context::{TlsContextBuilder, TlsMode},
-        credentials::{PskHash, TlsCredential},
-        io::{AbstractReader, AbstractSocket, AbstractSocketResult, AbstractWriter},
+        context::TlsContextBuilder,
+        credentials::{
+            PskHash,
+            TlsCredential, //
+        }, //
     };
 
     const TEST_KEY: &[u8; 32] = b"0123456789abcdef0123456789abcdef";
     const TEST_IDENTITY: &[u8] = b"test-identity";
     const TEST_CONTEXT: &[u8] = b"test-context";
-    const TEST_DATA: &[u8] = b"BoringSSL is awesome!";
-
-    struct SharedPipe {
-        buf: VecDeque<u8>,
-    }
-
-    struct ConnectedSocket {
-        read_pipe: Arc<Mutex<SharedPipe>>,
-        write_pipe: Arc<Mutex<SharedPipe>>,
-    }
-
-    impl AbstractReader for ConnectedSocket {
-        fn read(&mut self, _: Option<&mut Context<'_>>, buf: &mut [u8]) -> AbstractSocketResult {
-            let mut pipe = self.read_pipe.lock().unwrap();
-            let len = pipe.buf.len().min(buf.len());
-            if len == 0 {
-                return AbstractSocketResult::Retry;
-            }
-            for i in 0..len {
-                buf[i] = pipe.buf.pop_front().unwrap();
-            }
-            AbstractSocketResult::Ok(len)
-        }
-    }
-
-    impl AbstractWriter for ConnectedSocket {
-        fn write(&mut self, _: Option<&mut Context<'_>>, buf: &[u8]) -> AbstractSocketResult {
-            let mut pipe = self.write_pipe.lock().unwrap();
-            pipe.buf.extend(buf.iter().copied());
-            AbstractSocketResult::Ok(buf.len())
-        }
-        fn flush(&mut self, _: Option<&mut Context<'_>>) -> AbstractSocketResult {
-            AbstractSocketResult::Ok(0)
-        }
-    }
-
-    impl AbstractSocket for ConnectedSocket {}
-
-    fn create_connected_pair() -> (ConnectedSocket, ConnectedSocket) {
-        let pipe1 = Arc::new(Mutex::new(SharedPipe {
-            buf: VecDeque::new(),
-        }));
-        let pipe2 = Arc::new(Mutex::new(SharedPipe {
-            buf: VecDeque::new(),
-        }));
-        (
-            ConnectedSocket {
-                read_pipe: pipe1.clone(),
-                write_pipe: pipe2.clone(),
-            },
-            ConnectedSocket {
-                read_pipe: pipe2,
-                write_pipe: pipe1,
-            },
-        )
-    }
-
-    fn check_connection(
-        conn_client: &mut TlsConnection<Client, TlsMode>,
-        conn_server: &mut TlsConnection<Server, TlsMode>,
-    ) {
-        let mut client_established = false;
-        let mut server_established = false;
-
-        for _ in 0..100 {
-            if !client_established {
-                if let Some(mut handshake) = conn_client.in_handshake() {
-                    match handshake.do_handshake() {
-                        Ok(None) => client_established = true,
-                        Ok(Some(_)) => {}
-                        Err(e) => panic!("Client handshake failed: {:?}", e),
-                    }
-                } else {
-                    client_established = true;
-                }
-            }
-            if !server_established {
-                if let Some(mut handshake) = conn_server.in_handshake() {
-                    match handshake.do_handshake() {
-                        Ok(None) => server_established = true,
-                        Ok(Some(_)) => {}
-                        Err(e) => panic!("Server handshake failed: {:?}", e),
-                    }
-                } else {
-                    server_established = true;
-                }
-            }
-            if client_established && server_established {
-                break;
-            }
-        }
-
-        assert!(client_established);
-        assert!(server_established);
-
-        // Send application data to verify connection works
-        let test_data = TEST_DATA;
-        let mut written = 0;
-        while written < test_data.len() {
-            match conn_client.sync_write(&test_data[written..]) {
-                Ok(crate::io::IoStatus::Ok(n)) => written += n,
-                _ => panic!("Write failed"),
-            }
-        }
-
-        let mut read_buf = [0u8; TEST_DATA.len()];
-        let mut recv_buf = crate::ffi::ReceiveBuffer::new(&mut read_buf);
-        while recv_buf.remaining() > 0 {
-            match conn_server.sync_read(&mut recv_buf) {
-                Ok(crate::io::IoStatus::Ok(_)) => {}
-                _ => panic!("Read failed"),
-            }
-        }
-
-        assert_eq!(&read_buf, test_data);
-    }
 
     #[test]
     fn test_session_ops() {
@@ -485,17 +371,20 @@
         ctx_server.with_credential(cred_server).unwrap();
         let ctx_server = ctx_server.build();
 
-        let (sock_client, sock_server) = create_connected_pair();
+        let mut conn_client = ctx_client.new_client_connection(None).unwrap().build();
+        let mut conn_server = ctx_server.new_server_connection(None).unwrap().build();
 
-        let builder_client = ctx_client.new_client_connection(None).unwrap();
-        let mut conn_client = builder_client.build();
+        let (sock_client, sock_server, mut executor) = create_mock_pipe();
+
         conn_client.set_io(sock_client).unwrap();
-
-        let builder_server = ctx_server.new_server_connection(None).unwrap();
-        let mut conn_server = builder_server.build();
         conn_server.set_io(sock_server).unwrap();
 
-        check_connection(&mut conn_client, &mut conn_server);
+        executor
+            .run(try_join(
+                conn_client.in_handshake().unwrap().async_handshake(),
+                conn_server.in_handshake().unwrap().async_handshake(),
+            ))
+            .unwrap();
 
         let est_client = conn_client.established().unwrap();
         let session = est_client.get_session().unwrap();
@@ -533,35 +422,55 @@
         let mut ctx_server = TlsContextBuilder::new_tls();
         ctx_server.with_credential(cred_server).unwrap();
         let ctx_server = ctx_server.build();
+        let mut conn_client = ctx_client.new_client_connection(None).unwrap().build();
+        let mut conn_server = ctx_server.new_server_connection(None).unwrap().build();
 
-        let (sock_client, sock_server) = create_connected_pair();
+        let (sock_client, sock_server, mut executor) = create_mock_pipe();
 
-        let builder_client = ctx_client.new_client_connection(None).unwrap();
-        let mut conn_client = builder_client.build();
         conn_client.set_io(sock_client).unwrap();
-
-        let builder_server = ctx_server.new_server_connection(None).unwrap();
-        let mut conn_server = builder_server.build();
         conn_server.set_io(sock_server).unwrap();
 
-        check_connection(&mut conn_client, &mut conn_server);
+        executor
+            .run(try_join(
+                conn_client.in_handshake().unwrap().async_handshake(),
+                conn_server.in_handshake().unwrap().async_handshake(),
+            ))
+            .unwrap();
 
         let est_client = conn_client.established().unwrap();
         let session = est_client.get_session().unwrap();
+        let peer = session.get_peer_sha256().unwrap();
 
         // === SESSION RESUMPTION ===
         // Use the session for a new connection
-        let (sock_client_2, sock_server_2) = create_connected_pair();
+        let (sock_client_2, sock_server_2, mut executor) = create_mock_pipe();
 
         let mut builder_client_2 = ctx_client.new_client_connection(None).unwrap();
         builder_client_2.with_session(&session);
         let mut conn_client_2 = builder_client_2.build();
-        conn_client_2.set_io(sock_client_2).unwrap();
 
         let builder_server_2 = ctx_server.new_server_connection(None).unwrap();
         let mut conn_server_2 = builder_server_2.build();
+
+        conn_client_2.set_io(sock_client_2).unwrap();
         conn_server_2.set_io(sock_server_2).unwrap();
 
-        check_connection(&mut conn_client_2, &mut conn_server_2);
+        executor
+            .run(try_join(
+                Pin::new(&mut conn_client_2).async_write(b"hello"),
+                Pin::new(&mut conn_server_2).async_write(b"world"),
+            ))
+            .unwrap();
+        // The peer identity should be the same as before
+        assert_eq!(
+            conn_client_2
+                .established()
+                .unwrap()
+                .get_session()
+                .unwrap()
+                .get_peer_sha256()
+                .unwrap(),
+            peer
+        );
     }
 }