From 8bf7342c92bf3d20756f6ab7eb2835f33b5c1893 Mon Sep 17 00:00:00 2001 From: Wolfgang Bumiller Date: Thu, 16 May 2019 11:58:38 +0200 Subject: [PATCH] add qemu-io crate, AioContext reactor helper Signed-off-by: Wolfgang Bumiller --- qemu-io/Cargo.toml | 29 ++++ qemu-io/src/aio_context.rs | 11 ++ qemu-io/src/aio_context/standalone.rs | 188 +++++++++++++++++++++++ qemu-io/src/lib.rs | 10 ++ qemu-io/src/util.rs | 39 +++++ qemu-io/src/with_aio_context.rs | 211 ++++++++++++++++++++++++++ 6 files changed, 488 insertions(+) create mode 100644 qemu-io/Cargo.toml create mode 100644 qemu-io/src/aio_context.rs create mode 100644 qemu-io/src/aio_context/standalone.rs create mode 100644 qemu-io/src/lib.rs create mode 100644 qemu-io/src/util.rs create mode 100644 qemu-io/src/with_aio_context.rs diff --git a/qemu-io/Cargo.toml b/qemu-io/Cargo.toml new file mode 100644 index 00000000..75a00f7c --- /dev/null +++ b/qemu-io/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "qemu-io" +version = "0.1.0" +authors = [ + "Wolfgang Bumiller ", +] +edition = "2018" + +#[lib] +#crate-type = ['lib', 'cdylib'] + +[dependencies] +failure = "0.1" +mio = "0.6" + +# In this crate 'future' by default means standard-future. +# The 0.1-futures are exposed under the name 'futures_01'. + +[dependencies.futures-preview] +version = "0.3.0-alpha.15" +features = ["compat", "io-compat"] + +[dependencies.futures_01] +package = "futures" +version = "0.1" + +[features] +default = ["standalone"] +standalone = [] diff --git a/qemu-io/src/aio_context.rs b/qemu-io/src/aio_context.rs new file mode 100644 index 00000000..b2b1c10b --- /dev/null +++ b/qemu-io/src/aio_context.rs @@ -0,0 +1,11 @@ +//! Provides a handle to an AioContext. + +#[cfg(feature="standalone")] +mod standalone; +#[cfg(feature="standalone")] +pub use standalone::AioContext; + +// TODO: Add the non-standalone variant to be linked with Qemu: +// The AioContext struct should provide a high-level version of `set_fd_handler` with the same +// interface the standalone version provides out of the box (transparently turning closures into +// `extern "C" fn(opaque: *const c_void)` calls. diff --git a/qemu-io/src/aio_context/standalone.rs b/qemu-io/src/aio_context/standalone.rs new file mode 100644 index 00000000..5b425675 --- /dev/null +++ b/qemu-io/src/aio_context/standalone.rs @@ -0,0 +1,188 @@ +//! This implements the parts of qemu's AioContext interface we need for testing outside qemu. + +use std::collections::HashMap; +use std::os::unix::io::RawFd; +use std::sync::{Arc, Mutex, RwLock}; +use std::thread; + +use failure::Error; +use mio::{Events, Poll, Token}; +use mio::unix::EventedFd; + +use crate::util::{AioCb, AioHandlerState}; + +/// This is a reference to a standalone `AioContextImpl` and allows instantiating a new context +/// with a polling thread. +#[derive(Clone)] +#[repr(transparent)] +pub struct AioContext(Arc); + +impl std::ops::Deref for AioContext { + type Target = AioContextImpl; + + fn deref(&self) -> &Self::Target { + &*self.0 + } +} + +impl AioContext { + /// Create a new `AioContext` instance with an associated polling thread, which will live as + /// long as there are references to it. + pub fn new() -> Result { + Ok(Self(AioContextImpl::new()?)) + } +} + +pub struct AioContextImpl { + poll: Poll, + handlers: RwLock>, + poll_thread: Mutex>>, +} + +impl AioContextImpl { + pub fn new() -> Result, Error> { + let this = Arc::new(Self { + poll: Poll::new()?, + handlers: RwLock::new(HashMap::new()), + poll_thread: Mutex::new(None), + }); + + let this2 = Arc::clone(&this); + this.poll_thread.lock().unwrap().replace(thread::spawn(|| this2.main_loop())); + + Ok(this) + } + + /// Qemu's aio_set_fd_handler. We're skipping the `io_poll` parameter for this implementation + /// as we don't use it. + /// ``` + /// void aio_set_fd_handler(AioContext *ctx, + /// int fd, + /// bool is_external, + /// IOHandler *io_read, + /// IOHandler *io_write, + /// AioPollFn *io_poll, + /// void *opaque); + /// ``` + /// + /// Since this does not have any ways of returning errors, wrong usage will cause a panic in + /// this test implementation. + pub fn set_fd_handler( + &self, + fd: RawFd, + io_read: Option, + io_write: Option, + // skipping io_poll, + //opaque: *const (), + ) { + self.set_fd_handler_impl(fd, io_read, io_write, mio::PollOpt::level()) + } + + /// This is going to be a proposed new api for Qemu's AioContext. + pub fn set_fd_handler_edge( + &self, + fd: RawFd, + io_read: Option, + io_write: Option, + // skipping io_poll, + //opaque: *const (), + ) { + self.set_fd_handler_impl(fd, io_read, io_write, mio::PollOpt::edge()) + } + + fn set_fd_handler_impl( + &self, + fd: RawFd, + io_read: Option, + io_write: Option, + // skipping io_poll, + //opaque: *const (), + poll_opt: mio::PollOpt, + ) { + if io_read.is_none() && io_write.is_none() { + return self.remove_fd_handler(fd); + } + + let handlers = AioHandlerState { + read: io_read, + write: io_write, + }; + + let mio_ready = handlers.mio_ready(); + + let token = Token(fd as usize); + + use std::collections::hash_map::Entry; + match self.handlers.write().unwrap().entry(token) { + Entry::Vacant(entry) => { + self.poll.register(&EventedFd(&fd), token, mio_ready, poll_opt) + .expect("failed to register a new fd for polling"); + entry.insert(handlers); + } + Entry::Occupied(mut entry) => { + self.poll.reregister(&EventedFd(&fd), token, mio_ready, poll_opt) + .expect("failed to update an existing poll fd"); + entry.insert(handlers); + } + } + } + + fn remove_fd_handler(&self, fd: RawFd) { + let mut guard = self.handlers.write().unwrap(); + self.poll.deregister(&EventedFd(&fd)) + .expect("failed to remove an existing poll fd"); + guard.remove(&Token(fd as usize)); + } + + /// We don't use qemu's aio_poll, so let's make this easy: + /// + /// ``` + /// bool aio_poll(AioContext *ctx, bool blocking); + /// ``` + pub fn poll(&self) -> Result<(), Error> { + let timeout = Some(std::time::Duration::from_millis(100)); + + let mut events = Events::with_capacity(16); + + if self.poll.poll(&mut events, timeout)? == 0 { + return Ok(()); + } + + for event in events.iter() { + let token = event.token(); + let ready = event.readiness(); + // NOTE: We need to read-lock while fetching handlers, but handlers need a write-lock!!! + // because they need to be edge-triggered and therefore *update* this handler list! + // + // While we could instead do this here (or use edge triggering from mio), this would + // not properly simulate Qemu's AioContext, so we enforce this behavior here as well. + // + // This means we cannot just hold a read lock during the events.iter() iteration + // though. + let handler = self.handlers.read().unwrap().get(&token).map(|h| AioHandlerState { + // Those are Option! + read: h.read.clone(), + write: h.write.clone(), + }); + if let Some(handler) = handler { + if ready.is_readable() { + handler.read.as_ref().map(|func| func()); + } + if ready.is_writable() { + handler.write.as_ref().map(|func| func()); + } + } + } + + Ok(()) + } + + fn main_loop(mut self: Arc) { + while Arc::get_mut(&mut self).is_none() { + if let Err(err) = self.poll() { + dbg!("error AioContextImpl::poll(): {}", err); + break; + } + } + } +} diff --git a/qemu-io/src/lib.rs b/qemu-io/src/lib.rs new file mode 100644 index 00000000..073c2c43 --- /dev/null +++ b/qemu-io/src/lib.rs @@ -0,0 +1,10 @@ +// used for testing + +mod util; +mod with_aio_context; + +#[cfg(feature="standalone")] +mod aio_context; + +pub use with_aio_context::WithAioContext; +pub use aio_context::AioContext; diff --git a/qemu-io/src/util.rs b/qemu-io/src/util.rs new file mode 100644 index 00000000..6fed0164 --- /dev/null +++ b/qemu-io/src/util.rs @@ -0,0 +1,39 @@ +//! Some types used by both our internal testing AioContext implementation as well as our +//! WithAioContext wrapper. + +/// An Aio Callback. Qemu's AioContext actually uses a void function taking an opaque pointer. +/// For simplicity we stick to closures for now. +pub type AioCb = std::sync::Arc; + +/// This keeps track of our poll state (whether we wait to be notified for read or write +/// readiness.) +#[derive(Default)] +pub struct AioHandlerState { + pub read: Option, + pub write: Option, +} + +impl AioHandlerState { + /// Get an mio::Ready with readable set if `read` is `Some`, and writable + /// set if `write` is `Some`. + pub fn mio_ready(&self) -> mio::Ready { + use mio::Ready; + + let mut ready = Ready::empty(); + if self.read.is_some() { + ready |= Ready::readable(); + } + + if self.write.is_some() { + ready |= Ready::writable(); + } + + ready + } + + /// Shortcut + pub fn clear(&mut self) { + self.read = None; + self.write = None; + } +} diff --git a/qemu-io/src/with_aio_context.rs b/qemu-io/src/with_aio_context.rs new file mode 100644 index 00000000..3ed0b94b --- /dev/null +++ b/qemu-io/src/with_aio_context.rs @@ -0,0 +1,211 @@ +//! This module provides `WithAioContext`, which is a helper to connect any raw I/O file descriptor +//! (`T: AsRawFd`) with an `AioContext`. + +use std::io; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::pin::Pin; +use std::sync::{Arc, Mutex, MutexGuard}; +use std::task::{Context, Poll}; + +use mio::Ready; + +use crate::AioContext; +use crate::util::{AioCb, AioHandlerState}; + +/// This provides a basic mechanism to connect a type containing a file descriptor (i.e. it +/// implements `AsRawFd`) to an `AioContext`. +/// +/// If the underlying type implements `Read` this wrapper also provides an `AsyncRead` +/// implementation. Likewise it'll provide `AsyncWrite` for types implementing `Write`. +/// For this to function properly, the underlying type needs to return `io::Error` of kind +/// `io::ErrorKind::WouldBlock` on blocking operations which should be retried when the file +/// descriptor becomes ready. +/// +/// `WithAioContext` _owns_ the underlying object. This is because our Drop handler wants to +/// unregister the file descriptor, but systems like linux' epoll do that automatically when the fd +/// is closed, so we cannot have our file descriptor vanish before de-registering it, otherwise we +/// may be de-registering an already re-used number. +/// +/// Implements `Deref` so any methods of `T` still work on a `WithAioContext`. +pub struct WithAioContext { + aio_context: AioContext, + fd: RawFd, + handlers: Arc>, + inner: Option, +} + +impl std::ops::Deref for WithAioContext { + type Target = T; + + fn deref(&self) -> &Self::Target { + self.inner.as_ref().unwrap() + } +} + +impl std::ops::DerefMut for WithAioContext { + fn deref_mut(&mut self) -> &mut Self::Target { + self.inner.as_mut().unwrap() + } +} + +impl WithAioContext { + pub fn new(aio_context: AioContext, inner: T) -> Self { + Self { + aio_context, + fd: inner.as_raw_fd(), + handlers: Arc::new(Mutex::new(Default::default())), + inner: Some(inner), + } + } + + /// Deregister from the `AioContext` and return the inner file handle. + pub fn into_inner(mut self) -> T { + let out = self.inner.take().unwrap(); + std::mem::drop(self); + out + } + + /// Shortcut around the `unwrap()`. The `Option<>` around `inner` is only there because we have + /// a `Drop` implementation which prevents us to move-out the value in the `into_inner()` + /// method. + fn inner_mut(&mut self) -> &mut T { + self.inner.as_mut().unwrap() + } + + /// Shortcut around the `unwrap()`, immutable variant: + //fn inner(&self) -> &T { + // self.inner.as_ref().unwrap() + //} + + /// Shortcut to set_fd_handlers. For the "real" qemu interface we'll have to turn the closures + /// into raw function pointers here (they'll get an opaque pointer parameter). + fn commit_handlers( + aio_context: &AioContext, + fd: RawFd, + handlers: &mut MutexGuard, + ) { + aio_context.set_fd_handler( + fd, + handlers.read.as_ref().map(|x| (*x).clone()), + handlers.write.as_ref().map(|x| (*x).clone()), + ) + } + + /// Create a waker closure for a context for a specific ready state. When a file descriptor is + /// ready for reading or writing, we need to remove the corresponding handler from the + /// `AioContext` (make it an edge-trigger instead of a level trigger) before finally calling + /// `waker.wake_by_ref()` to queue the task for polling. + fn make_wake_fn(&self, cx: &mut Context, ready: Ready) -> AioCb { + let waker = cx.waker().clone(); + + // we don't want to be publicly clonable so clone manually here: + let aio_context = self.aio_context.clone(); + let fd = self.fd; + let handlers = Arc::clone(&self.handlers); + Arc::new(move || { + let mut guard = handlers.lock().unwrap(); + + if ready.is_readable() { + guard.read = None; + } + + if ready.is_writable() { + guard.write = None; + } + + Self::commit_handlers(&aio_context, fd, &mut guard); + waker.wake_by_ref(); + }) + } + + /// Register our file descriptor with the `AioContext` for reading or writing. + /// This only affects the directions present in the provided `ready` value, and will leave the + /// other directions unchanged. + pub fn register(&self, cx: &mut Context, ready: Ready) { + let mut guard = self.handlers.lock().unwrap(); + + if ready.is_readable() { + guard.read = Some(self.make_wake_fn(cx, ready)); + } + + if ready.is_writable() { + guard.write = Some(self.make_wake_fn(cx, ready)); + } + + Self::commit_handlers(&self.aio_context, self.fd, &mut guard) + } + + /// Helper to handle an `io::Result`, turning `Result` into `Poll>`, by + /// changing an `io::ErrorKind::WouldBlock` into `Poll::Pending` and taking care of registering + /// the file descriptor with the AioContext for the next wake-up. + /// `Ok` and errors other than the above will be passed through wrapped in `Poll::Ready`. + pub fn handle_aio_result( + &self, + cx: &mut Context, + result: io::Result, + ready: Ready, + ) -> Poll> { + match result { + Ok(res) => Poll::Ready(Ok(res)), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.register(cx, ready); + Poll::Pending + } + Err(err) => Poll::Ready(Err(err)), + } + } +} + +impl Drop for WithAioContext { + fn drop(&mut self) { + let mut guard = self.handlers.lock().unwrap(); + (*guard).clear(); + if !guard.mio_ready().is_empty() { + Self::commit_handlers(&self.aio_context, self.fd, &mut guard); + } + } +} + +impl futures::io::AsyncRead for WithAioContext +where + T: AsRawFd + io::Read + Unpin, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll> { + let res = self.inner_mut().read(buf); + self.handle_aio_result(cx, res, mio::Ready::readable()) + } +} + +impl futures::io::AsyncWrite for WithAioContext +where + T: AsRawFd + io::Write + Unpin, +{ + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll> { + let result = self.inner_mut().write(buf); + self.handle_aio_result(cx, result, mio::Ready::writable()) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let result = self.inner_mut().flush(); + self.handle_aio_result(cx, result, mio::Ready::writable()) + } + + // I'm not sure what they expect me to do here. The `close()` syscall has no async variant, so + // all I can do is `flush()` and then drop the inner stream... + // + // Using `.into_inner()` after this will cause a panic. + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let result = self.inner_mut().flush(); + let _ = futures::ready!(self.handle_aio_result(cx, result, mio::Ready::writable())); + std::mem::drop(self.inner.take()); + Poll::Ready(Ok(())) + } +}