From 9b00099ead876871d89cd9b37825e8180a52f5a1 Mon Sep 17 00:00:00 2001 From: Wolfgang Bumiller Date: Thu, 20 Jan 2022 15:41:00 +0100 Subject: [PATCH] drop RawWaker usage this was also leaking a refcount before, this is fixed now See-also: proxmox/proxmox-async: * d0a3e38006fe ("drop RawWaker usage") * ff132e93c6fd ("rustfmt") Signed-off-by: Wolfgang Bumiller --- src/tools/runtime.rs | 68 ++++++++++++++++---------------------------- 1 file changed, 24 insertions(+), 44 deletions(-) diff --git a/src/tools/runtime.rs b/src/tools/runtime.rs index 477d26d6..0fe9fae5 100644 --- a/src/tools/runtime.rs +++ b/src/tools/runtime.rs @@ -2,8 +2,8 @@ use std::cell::RefCell; use std::future::Future; -use std::sync::{Arc, Weak, Mutex}; -use std::task::{Context, Poll, RawWaker, Waker}; +use std::sync::{Arc, Mutex, Weak}; +use std::task::{Context, Poll, Waker}; use std::thread::{self, Thread}; use lazy_static::lazy_static; @@ -15,8 +15,7 @@ thread_local! { } fn is_in_tokio() -> bool { - tokio::runtime::Handle::try_current() - .is_ok() + tokio::runtime::Handle::try_current().is_ok() } fn is_blocking() -> bool { @@ -49,7 +48,8 @@ lazy_static! { static ref RUNTIME: Mutex> = Mutex::new(Weak::new()); } -extern { +#[link(name = "crypto")] +extern "C" { fn OPENSSL_thread_stop(); } @@ -58,16 +58,19 @@ extern { /// This makes sure that tokio's worker threads are marked for us so that we know whether we /// can/need to use `block_in_place` in our `block_on` helper. pub fn get_runtime_with_builder runtime::Builder>(get_builder: F) -> Arc { - let mut guard = RUNTIME.lock().unwrap(); - if let Some(rt) = guard.upgrade() { return rt; } + if let Some(rt) = guard.upgrade() { + return rt; + } let mut builder = get_builder(); builder.on_thread_stop(|| { // avoid openssl bug: https://github.com/openssl/openssl/issues/6214 // call OPENSSL_thread_stop to avoid race with openssl cleanup handlers - unsafe { OPENSSL_thread_stop(); } + unsafe { + OPENSSL_thread_stop(); + } }); let runtime = builder.build().expect("failed to spawn tokio runtime"); @@ -82,7 +85,6 @@ pub fn get_runtime_with_builder runtime::Builder>(get_builder: F) -> /// /// This calls get_runtime_with_builder() using the tokio default threaded scheduler pub fn get_runtime() -> Arc { - get_runtime_with_builder(|| { let mut builder = runtime::Builder::new_multi_thread(); builder.enable_all(); @@ -90,7 +92,6 @@ pub fn get_runtime() -> Arc { }) } - /// Block on a synchronous piece of code. pub fn block_in_place(fut: impl FnOnce() -> R) -> R { // don't double-exit the context (tokio doesn't like that) @@ -155,12 +156,22 @@ pub fn main(fut: F) -> F::Output { block_on(fut) } +struct ThreadWaker(Thread); + +impl std::task::Wake for ThreadWaker { + fn wake(self: Arc) { + self.0.unpark(); + } + + fn wake_by_ref(self: &Arc) { + self.0.unpark(); + } +} + fn block_on_local_future(fut: F) -> F::Output { pin_mut!(fut); - let waker = Arc::new(thread::current()); - let waker = thread_waker_clone(Arc::into_raw(waker) as *const ()); - let waker = unsafe { Waker::from_raw(waker) }; + let waker = Waker::from(Arc::new(ThreadWaker(thread::current()))); let mut context = Context::from_waker(&waker); loop { match fut.as_mut().poll(&mut context) { @@ -169,34 +180,3 @@ fn block_on_local_future(fut: F) -> F::Output { } } } - -const THREAD_WAKER_VTABLE: std::task::RawWakerVTable = std::task::RawWakerVTable::new( - thread_waker_clone, - thread_waker_wake, - thread_waker_wake_by_ref, - thread_waker_drop, -); - -fn thread_waker_clone(this: *const ()) -> RawWaker { - let this = unsafe { Arc::from_raw(this as *const Thread) }; - let cloned = Arc::clone(&this); - let _ = Arc::into_raw(this); - - RawWaker::new(Arc::into_raw(cloned) as *const (), &THREAD_WAKER_VTABLE) -} - -fn thread_waker_wake(this: *const ()) { - let this = unsafe { Arc::from_raw(this as *const Thread) }; - this.unpark(); -} - -fn thread_waker_wake_by_ref(this: *const ()) { - let this = unsafe { Arc::from_raw(this as *const Thread) }; - this.unpark(); - let _ = Arc::into_raw(this); -} - -fn thread_waker_drop(this: *const ()) { - let this = unsafe { Arc::from_raw(this as *const Thread) }; - drop(this); -}