Rust Async: Custom Futures with `Pin` and `Poll` for State Machines
Beyond `async/await`: The Case for Manual Future Implementation
In modern Rust, the async/await syntax provides a remarkably ergonomic way to write asynchronous code. The compiler transforms our sequential-looking async functions into complex state machines that implement the Future trait. For the vast majority of use cases, this is the ideal approach. It's safe, readable, and highly efficient.
However, for senior systems programmers, performance-critical library authors, or architects of low-level networking stacks, there are scenarios where the compiler-generated state machine isn't enough. We might need:
* An async fn captures its local variables in a single, compiler-generated struct. If we need to dynamically allocate within our async logic (e.g., Box::pin), we introduce heap allocations. A manually implemented Future can be designed to avoid those allocations entirely, offering predictable performance in hot paths.
* A manually implemented Future gives us explicit control over the state enum and its transitions. This is invaluable when building complex protocols (like a database driver or an RPC framework) where the state transitions are non-trivial and require careful resource management across polls.
* When async code must cooperate with a different concurrency model, such as a blocking OS thread or a callback-driven API, a hand-written Future is often the cleanest way to bridge the two.

This article assumes you are already proficient with Rust's async/await and have a solid grasp of ownership, borrowing, and lifetimes. We will not cover the basics. Instead, we will deconstruct the Future trait and its components (Pin, Poll, and Context) to build a robust, multi-stage asynchronous state machine from scratch.
The `Future` Trait: A Deconstruction
Let's start with the definition of the Future trait itself:
```rust
// std::future::Future
pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
```
Three components are critical for an implementor:
* self: Pin<&mut Self>: This is the most complex and crucial part. It's a pinned mutable reference to our future's state. The Pin wrapper is a guarantee that, unless Self implements Unpin, the memory location of self will not change until it is dropped. Pin's API and the contracts of its unsafe methods uphold this guarantee. We will dedicate an entire section to why this is essential for self-referential structs, which are ubiquitous in async state machines.
* cx: &mut Context<'_>: The context provides access to a Waker. The Waker is a handle that the Future must register, typically by cloning and storing it, if it cannot make progress. When the external event it's waiting for occurs (e.g., a socket becomes readable), that event source uses the Waker to notify the async runtime's executor that our Future is ready to be polled again.
* Poll: This is the return type, an enum with two variants:
* Poll::Pending: Signifies that the Future is not yet complete. Before returning Pending, the future must ensure that the current Waker from the Context (usually a clone of it) has been registered to be called when progress can be made. Failure to do so will result in the future sleeping forever.
* Poll::Ready(value): Signifies that the Future has completed, yielding its Output value.
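The Pending contract is easiest to see in a minimal sketch. YieldOnce is a hypothetical name, not a std type. It returns Pending exactly once, and before doing so it calls wake_by_ref on the current Waker. That is the simplest legal way to ask to be polled again. The NoopWake helper exists only so the future can be polled by hand; a real executor supplies its own Waker.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future that yields once before completing.
pub struct YieldOnce {
    yielded: bool,
}

impl YieldOnce {
    pub fn new() -> Self {
        YieldOnce { yielded: false }
    }
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        // Honour the contract: before returning Pending, make sure someone
        // will wake this task. Here we wake ourselves immediately, which
        // asks the executor to schedule another poll.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// A waker that does nothing; only useful for polling by hand.
pub struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}
```

If the wake_by_ref call were removed, an executor would have no reason to poll YieldOnce again, and it would stay pending forever.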
The Pinning Problem: Self-Referential Structs and Memory Stability
Why Pin? Why can't the poll method just take &mut self? The answer lies in self-referential data. Consider a future that needs to read from a buffer into a field:
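```rust
// A naive self-referential struct. Safe Rust gives us no way to make
// `read_slice` borrow from `buffer` inside the same value, and even if it
// could, moving the struct would leave `read_slice` dangling.
struct UnsafeReadFuture<'a> {
    buffer: [u8; 1024],
    // Intended to refer to the memory of `buffer` within the same struct.
    read_slice: &'a [u8],
}
```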
If we could move an UnsafeReadFuture in memory (e.g., by passing it by value or via std::mem::swap), the read_slice pointer would become invalid, pointing to the old memory location of buffer. This is a classic dangling pointer problem.
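You can observe this hazard without undefined behaviour. Store a raw pointer instead of a reference, and only compare addresses, never dereference them. SelfRef is a hypothetical type for illustration.

```rust
/// Hypothetical struct holding a raw pointer into its own buffer.
pub struct SelfRef {
    buffer: [u8; 16],
    // Points at `buffer` as it was located when `init` ran.
    ptr: *const u8,
}

impl SelfRef {
    pub fn new() -> Self {
        SelfRef { buffer: [0; 16], ptr: std::ptr::null() }
    }

    /// Record the current address of `buffer`.
    pub fn init(&mut self) {
        self.ptr = self.buffer.as_ptr();
    }

    /// True if the stored pointer still refers to our own buffer.
    /// Only compares addresses; never dereferences `ptr`.
    pub fn is_valid(&self) -> bool {
        std::ptr::eq(self.ptr, self.buffer.as_ptr())
    }
}
```

After init, is_valid returns true. Move the value into a Box, and buffer now lives on the heap while ptr still holds the old stack address, so is_valid returns false.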
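async blocks frequently create self-referential structs under the hood. A reference to a local variable may be held across an .await, for example when a child future borrows a buffer owned by the enclosing async block. Whenever that happens, the generated state machine stores both the local and the reference to it in the same struct.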
The Pin<&mut T> type is a promise about its pointee: unless T is Unpin, the T it points to will not be moved again, so its memory location stays fixed until it is dropped. This is what makes it sound to poll self-referential values, such as compiler-generated async state machines, in place. When you implement poll, you receive self: Pin<&mut Self>. If Self is not Unpin, safe code cannot simply get a &mut Self from it, because a &mut would let you move the value out (for example with std::mem::swap) and break the pinning guarantee. Instead, you must work through the Pin API or a projection helper built on top of it.
For a struct to be pinnable, it doesn't need to do anything special. But to get a &mut to a field of a pinned struct, the struct itself must either be Unpin or you must use unsafe code with extreme care. The Unpin trait is an auto trait that signals a type can be moved freely even after being pinned. Most standard library types are Unpin. However, the state machine generated by an async block is !Unpin (not Unpin), and so is any hand-written state machine that stores such a future inline. The SimpleHttpClient we build below boxes its sub-futures, so it turns out to be Unpin.
We'll see how to handle this safely when we build our state machine.
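The difference between Unpin and !Unpin types shows up directly in the Pin API. In this sketch, Counter is an ordinary struct, so it is Unpin and Pin::new and Pin::get_mut are available. Pinned is a hypothetical type made !Unpin with std::marker::PhantomPinned. It has to be pinned behind a pointer that fixes its location (Box::pin here), and safe code only gets shared access to it.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

/// Ordinary struct: all fields are Unpin, so the struct is Unpin too.
pub struct Counter {
    pub count: u32,
}

/// Hypothetical !Unpin type: PhantomPinned opts out of the auto trait.
pub struct Pinned {
    pub value: u32,
    _pin: PhantomPinned,
}

impl Pinned {
    pub fn new(value: u32) -> Self {
        Pinned { value, _pin: PhantomPinned }
    }
}

/// Compiles only because Counter: Unpin. The same call on a
/// Pin<&mut Pinned> would be rejected by the compiler.
pub fn bump(counter: Pin<&mut Counter>) {
    let inner: &mut Counter = Pin::get_mut(counter);
    inner.count += 1;
}

/// For !Unpin types, safe code can still read through Pin::get_ref.
pub fn read(pinned: Pin<&Pinned>) -> u32 {
    pinned.get_ref().value
}
```

Mutable access to a pinned !Unpin value requires the unsafe Pin::get_unchecked_mut, whose contract is exactly "do not move the value".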
Building a Simple Custom Future: `AsyncTimer`
Let's start with a foundational example: a future that completes after a specified duration. This will demonstrate the core polling and Waker mechanism.
```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::Duration;

// Shared state between the future and the timer thread
struct TimerSharedState {
    completed: bool,
    waker: Option<Waker>,
}

pub struct AsyncTimer {
    shared_state: Arc<Mutex<TimerSharedState>>,
}

impl AsyncTimer {
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(TimerSharedState {
            completed: false,
            waker: None,
        }));

        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut state = thread_shared_state.lock().unwrap();
            state.completed = true;
            if let Some(waker) = state.waker.take() {
                // The crucial step: wake the task so it is polled again.
                waker.wake();
            }
        });

        AsyncTimer { shared_state }
    }
}

impl Future for AsyncTimer {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.shared_state.lock().unwrap();
        if state.completed {
            Poll::Ready(())
        } else {
            // The future is not ready. We must store the waker so the timer
            // thread can notify the executor to poll us again. We overwrite
            // it on every poll because the waker may have changed since the
            // last poll (e.g., the task moved to another worker).
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

// Example usage with Tokio
#[tokio::main]
async fn main() {
    println!("Starting timer...");
    let timer = AsyncTimer::new(Duration::from_secs(2));
    timer.await;
    println!("Timer finished!");
}
```
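Tokio hides what waker.wake() actually triggers. Below is a minimal, hypothetical single-future executor built only on std; it is not how Tokio works internally. Its Waker unparks the thread blocked in block_on, and block_on re-polls the future each time that thread wakes up. std::pin::pin! requires Rust 1.68 or later.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical minimal executor: drives one future to completion on the
/// current thread, parking whenever the future returns Pending.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    // Pin the future on this stack frame; it is never moved again.
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            // Sleep until some wake() call unparks us. park() may also
            // return spuriously, which is harmless: we simply poll again.
            Poll::Pending => thread::park(),
        }
    }
}
```

With this in scope, block_on(AsyncTimer::new(Duration::from_millis(100))) runs the timer above without any runtime. The first poll stores the Waker and parks the thread, and the timer thread's wake() unparks it for the final poll.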
Analysis of AsyncTimer:
* Shared State: The timer thread and the Future must share both the completion flag and the stored Waker. An Arc<Mutex<TimerSharedState>> is a straightforward (though not the most performant) way to achieve this.
* Timer Thread: In the new function, we spawn a dedicated thread that simply sleeps. In a real-world scenario, this would be an event from an I/O reactor (like epoll, kqueue, or IOCP).
* poll Logic: This is the core of the implementation.
* We lock the shared state to check if the timer has completed.
* If completed is true, we are done: Poll::Ready(()).
* If completed is false, we must return Poll::Pending. But before we do, we clone the Waker from the Context and store it in our shared state. This is the contract: if you return Pending, you are responsible for arranging a future wake-up call.
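* Waking: When the sleep ends, the timer thread locks the shared state, sets completed to true, and, crucially, calls waker.wake(). This tells the executor (e.g., Tokio) that the task associated with this waker should be scheduled to run again. The executor then calls our poll method once more; this time we find that state.completed is true and return Poll::Ready(()). Because both sides take the same lock, no wake-up is lost if the thread finishes before the first poll. In that case there is no Waker to call yet, but the first poll sees completed == true and returns Ready immediately.

Advanced Implementation: A Multi-Stage TCP Connection State Machine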
Now for a more realistic and complex example. Let's implement a future that represents a simplified TCP client connection performing a request/response cycle. This state machine will progress through several stages:
* Initial: Nothing has started yet; the first poll kicks off DNS resolution.
* Resolving: Looking up the IP address for a given hostname.
* Connecting: Establishing the TCP connection to the resolved IP address.
* Writing: Sending the request data to the stream.
* Reading: Reading the response data from the stream.
* Closed: The final state.

This example requires the tokio crate for its async networking primitives; we use tokio::net::TcpStream and tokio::net::lookup_host. It also requires the pin-project-lite crate for field projection.
```rust
use std::future::Future;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};

use pin_project_lite::pin_project; // Helper for projecting pinned fields
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;

// A boxed, type-erased sub-future.
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;

// pin_project! is a declarative macro that creates projections from a
// Pin<&mut T> to its fields and generates the unsafe code required to do this
// correctly. No field is marked #[pin], so each one projects to a plain &mut.
pin_project! {
    pub struct SimpleHttpClient {
        // The current state of our machine
        state: State,
        // A "host:port" string to connect to
        address: String,
        // The data we want to send
        request_body: Vec<u8>,
    }
}

// The various states our machine can be in. Each state holds the necessary
// data for that stage of the process.
enum State {
    Initial,
    Resolving(BoxFuture<io::Result<Vec<SocketAddr>>>),
    Connecting(BoxFuture<io::Result<TcpStream>>),
    Writing(TcpStream, usize),   // The stream and how many bytes we've written
    Reading(TcpStream, Vec<u8>), // The stream and the buffer for the response
    Closed,
}

impl SimpleHttpClient {
    pub fn new(address: &str, request_body: Vec<u8>) -> Self {
        SimpleHttpClient {
            state: State::Initial,
            address: address.to_string(),
            request_body,
        }
    }
}

impl Future for SimpleHttpClient {
    type Output = io::Result<Vec<u8>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The pin_project! macro gives us a `project()` method to access fields
        let this = self.as_mut().project();
        loop {
            match this.state {
                State::Initial => {
                    // Transition to the DNS resolving state. lookup_host yields
                    // an opaque iterator, so collect the addresses into a Vec.
                    let address = this.address.clone();
                    let fut = Box::pin(async move {
                        tokio::net::lookup_host(address)
                            .await
                            .map(|addrs| addrs.collect::<Vec<SocketAddr>>())
                    });
                    *this.state = State::Resolving(fut);
                    // We don't return here; we loop to poll the new state immediately
                }
                State::Resolving(fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(addresses)) => match addresses.first() {
                        Some(&addr) => {
                            // DNS resolved: transition to the connecting state
                            *this.state = State::Connecting(Box::pin(TcpStream::connect(addr)));
                        }
                        None => {
                            *this.state = State::Closed;
                            return Poll::Ready(Err(io::Error::new(
                                io::ErrorKind::NotFound,
                                "no addresses found for host",
                            )));
                        }
                    },
                    Poll::Ready(Err(e)) => {
                        *this.state = State::Closed;
                        return Poll::Ready(Err(e));
                    }
                    Poll::Pending => return Poll::Pending,
                },
                State::Connecting(fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(stream)) => {
                        // Connection established: transition to the writing state
                        *this.state = State::Writing(stream, 0);
                    }
                    Poll::Ready(Err(e)) => {
                        *this.state = State::Closed;
                        return Poll::Ready(Err(e));
                    }
                    Poll::Pending => return Poll::Pending,
                },
                State::Writing(stream, written) => {
                    while *written < this.request_body.len() {
                        // TcpStream is Unpin, so Pin::new suffices. Reborrow with
                        // `&mut *stream` so `stream` stays usable on the next iteration.
                        let pinned_stream = Pin::new(&mut *stream);
                        match pinned_stream.poll_write(cx, &this.request_body[*written..]) {
                            Poll::Ready(Ok(0)) => {
                                let err = io::Error::new(
                                    io::ErrorKind::WriteZero,
                                    "failed to write whole buffer",
                                );
                                *this.state = State::Closed;
                                return Poll::Ready(Err(err));
                            }
                            Poll::Ready(Ok(n)) => *written += n,
                            Poll::Ready(Err(e)) => {
                                *this.state = State::Closed;
                                return Poll::Ready(Err(e));
                            }
                            Poll::Pending => return Poll::Pending,
                        }
                    }
                    // Finished writing: move the stream out of the old state
                    // and into the reading state.
                    match std::mem::replace(&mut *this.state, State::Closed) {
                        State::Writing(stream, _) => *this.state = State::Reading(stream, Vec::new()),
                        _ => unreachable!(), // We just matched State::Writing
                    }
                }
                State::Reading(stream, buffer) => {
                    let mut chunk = [0u8; 4096];
                    loop {
                        // tokio's poll_read fills a ReadBuf rather than a plain slice.
                        let mut read_buf = ReadBuf::new(&mut chunk);
                        match Pin::new(&mut *stream).poll_read(cx, &mut read_buf) {
                            Poll::Ready(Ok(())) => {
                                let filled = read_buf.filled();
                                if filled.is_empty() {
                                    // EOF: take the buffer out of the state and finish.
                                    let response = std::mem::take(buffer);
                                    *this.state = State::Closed;
                                    return Poll::Ready(Ok(response));
                                }
                                buffer.extend_from_slice(filled);
                                // Loop to try and read more immediately
                            }
                            Poll::Ready(Err(e)) => {
                                *this.state = State::Closed;
                                return Poll::Ready(Err(e));
                            }
                            Poll::Pending => return Poll::Pending,
                        }
                    }
                }
                State::Closed => {
                    // Futures must not be polled again after returning Ready
                    panic!("SimpleHttpClient polled after completion");
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let request = b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n".to_vec();
    let client = SimpleHttpClient::new("example.com:80", request);
    println!("Executing client future...");
    match client.await {
        Ok(response) => {
            println!("Received response:\n{}", String::from_utf8_lossy(&response));
        }
        Err(e) => {
            eprintln!("An error occurred: {}", e);
        }
    }
}
```
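The same pattern (a state enum, a loop around match, and std::mem::replace to move data between variants) can be exercised without a network. Then is a hypothetical combinator, not a std or Tokio API. It drives one future, passes its output to a closure, and then drives the future the closure returns. To keep field access trivial, it requires every component to be Unpin, so self.get_mut() is sound and no pinning crate is needed.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// States of the hypothetical `Then` combinator.
enum State<A, F, B> {
    First(A, F), // driving A, holding the closure that builds B
    Second(B),   // driving B
    Done,        // output already returned
}

pub struct Then<A, F, B> {
    state: State<A, F, B>,
}

pub fn then<A, F, B>(first: A, f: F) -> Then<A, F, B>
where
    A: Future,
    F: FnOnce(A::Output) -> B,
{
    Then { state: State::First(first, f) }
}

impl<A, F, B> Future for Then<A, F, B>
where
    A: Future + Unpin,
    B: Future + Unpin,
    F: FnOnce(A::Output) -> B + Unpin,
{
    type Output = B::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<B::Output> {
        // Sound because every field is Unpin, which makes Then Unpin.
        let this = self.get_mut();
        loop {
            match &mut this.state {
                State::First(a, _) => {
                    let out = match Pin::new(a).poll(cx) {
                        Poll::Ready(out) => out,
                        Poll::Pending => return Poll::Pending, // A registered the waker
                    };
                    // Move the closure out of the old variant, leaving Done behind.
                    match std::mem::replace(&mut this.state, State::Done) {
                        State::First(_, f) => this.state = State::Second(f(out)),
                        _ => unreachable!(),
                    }
                    // No return: loop so B is polled (and can register the
                    // waker) before we ever report Pending.
                }
                State::Second(b) => {
                    let out = match Pin::new(b).poll(cx) {
                        Poll::Ready(out) => out,
                        Poll::Pending => return Poll::Pending,
                    };
                    this.state = State::Done;
                    return Poll::Ready(out);
                }
                State::Done => panic!("Then polled after completion"),
            }
        }
    }
}
```

For example, then(std::future::ready(20), |x| std::future::ready(x + 22)) completes with 42 on its first poll, because the loop moves straight from the first stage into the second.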
Analysis of the State Machine:
* pin-project-lite Crate: The pin_project! macro generates the unsafe projection code once, so that poll can call the safe project() method. project() returns a struct holding a reference to each field: a Pin<&mut Field> for fields marked #[pin], and a plain &mut Field for the rest. In this example no field is marked #[pin]. Because the sub-futures are boxed, every field is Unpin, so SimpleHttpClient is itself Unpin and self.get_mut() would work just as well. The macro becomes essential once a field must be structurally pinned, such as an unboxed sub-future stored inline.
* State Enum: The State enum is the heart of the machine. Each variant holds the data required for that specific stage. For async operations like Resolving and Connecting, the state holds the boxed sub-future (Pin<Box<dyn Future<Output = ...> + Send>>).
* poll Loop: The poll method is structured as a loop. This is a common pattern. When one state transition completes (e.g., DNS resolution finishes), we immediately loop back and poll the new state instead of returning and waiting for another wake-up. This matters for correctness, not just speed. A freshly created sub-future has not registered any waker yet, so returning Pending at that point could leave the task asleep forever. We only return Pending when a sub-future (the lookup or connect future) or an I/O method (poll_write, poll_read) itself returns Pending.
* State Transitions: Transitions happen by assigning a new variant to *this.state. We use std::mem::replace or std::mem::take to move resources (like the TcpStream or the response buffer) out of the old state variant into the new one. This avoids cloning or reallocating them.
* Propagating the Context: When polling a resource (in Writing or Reading), we call the corresponding poll_* method (e.g., poll_write, poll_read) on the pinned resource, passing through the Context (cx). This propagates the waker mechanism down the chain.

Edge Cases and Production Considerations
* Cancellation Safety: What happens if our SimpleHttpClient future is dropped before it completes? Dropping it does not leak resources: the TcpStream and any boxed sub-futures are dropped along with it, which closes the connection and cleans up. Any progress is simply lost, though; a partially written request is not resumed. If we were performing an operation with side effects that needed to be rolled back, we would need to implement the Drop trait for SimpleHttpClient (or for a wrapper around it) to handle that cleanup.
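* Unpin and Stack Pinning: We used Box::pin to heap-allocate our sub-futures. This is convenient but introduces allocations. In a zero-allocation design, we would store the sub-futures directly within our state enum. That is harder for two reasons. First, the futures returned by async fns have types that cannot be named in stable Rust, so the state machine usually becomes generic over them. Second, storing a !Unpin future inline makes the enclosing type !Unpin, so the field has to be structurally pinned (for example with #[pin] in pin-project-lite). The enum also grows to the size of its largest variant. Macros like tokio::pin! and std::pin::pin! complement this by pinning a value to the stack for the duration of a scope, which lets a caller poll such a future without boxing it.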
* Error Handling: Our implementation propagates io::Error directly. A production-grade client would have a more sophisticated error type, wrapping I/O errors with contextual information (e.g., Error::DnsResolutionFailed, Error::ConnectionFailed).
* Backpressure: In the Writing state, we loop on poll_write. If the underlying socket is not ready, poll_write returns Pending and registers the waker for us. In the Reading state, we also loop on poll_read. This is a greedy approach. In a more complex application, you might want to yield control back to the executor between reads if the buffer grows too large. Calling cx.waker().wake_by_ref() and then returning Pending does this, and it prevents one task from monopolizing its worker thread.
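For the Cancellation Safety point, a Drop impl is where rollback logic lives. The sketch below wraps any future in a hypothetical OnCancel type. It runs a cleanup closure only if the wrapper is dropped before the inner future returned Ready; on normal completion the closure is disarmed. Requiring Fut: Unpin keeps the field access trivial.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Hypothetical wrapper: runs `on_cancel` if dropped before completion.
pub struct OnCancel<Fut, C: FnOnce()> {
    inner: Fut,
    // Some(..) until the inner future completes; cleared on success.
    on_cancel: Option<C>,
}

impl<Fut, C: FnOnce()> OnCancel<Fut, C> {
    pub fn new(inner: Fut, on_cancel: C) -> Self {
        OnCancel { inner, on_cancel: Some(on_cancel) }
    }
}

impl<Fut: Future + Unpin, C: FnOnce() + Unpin> Future for OnCancel<Fut, C> {
    type Output = Fut::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> {
        let this = self.get_mut();
        let out = match Pin::new(&mut this.inner).poll(cx) {
            Poll::Ready(out) => out,
            Poll::Pending => return Poll::Pending,
        };
        // Completed normally: disarm the rollback.
        this.on_cancel = None;
        Poll::Ready(out)
    }
}

impl<Fut, C: FnOnce()> Drop for OnCancel<Fut, C> {
    fn drop(&mut self) {
        // Still armed means we were dropped mid-operation: roll back.
        if let Some(rollback) = self.on_cancel.take() {
            rollback();
        }
    }
}
```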
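The zero-allocation alternative from the Unpin and Stack Pinning point looks roughly like this. Map is a hypothetical combinator, not a std type. It stores its sub-future inline as a generic parameter instead of boxing it, so it must project a pinned reference to that field. pin-project-lite's #[pin] attribute generates this projection for you. Here it is written out by hand with unsafe so the sketch depends only on std, and the safety argument is spelled out in a comment.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Hypothetical combinator: stores `fut` inline (no Box) and maps its output.
pub struct Map<Fut, F> {
    fut: Fut,     // structurally pinned: never moved once Map is pinned
    f: Option<F>, // not pinned: moved out when the output is ready
}

impl<Fut, F> Map<Fut, F> {
    pub fn new(fut: Fut, f: F) -> Self {
        Map { fut, f: Some(f) }
    }
}

impl<Fut, F, T> Future for Map<Fut, F>
where
    Fut: Future,
    F: FnOnce(Fut::Output) -> T,
{
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        // SAFETY: we never move `fut` out of a pinned Map, Map has no Drop
        // impl, and (by auto-trait rules) Map is Unpin only if Fut is, so
        // treating `fut` as pinned whenever `self` is pinned is sound.
        let this = unsafe { self.get_unchecked_mut() };
        let fut = unsafe { Pin::new_unchecked(&mut this.fut) };
        match fut.poll(cx) {
            Poll::Ready(out) => {
                let f = this.f.take().expect("Map polled after completion");
                Poll::Ready(f(out))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
```

A caller pins the combinator once, for example with std::pin::pin! (Rust 1.68+) or tokio::pin!, and no heap allocation occurs even when Fut is a !Unpin async block.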
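For the Error Handling point, one possible shape is an error enum with one variant per stage. Each variant wraps the underlying io::Error and exposes it through Error::source. ClientError and its Transfer variant are hypothetical; the other two variant names come from the examples in the text.

```rust
use std::error::Error;
use std::fmt;
use std::io;

/// Hypothetical stage-aware error type for the client state machine.
#[derive(Debug)]
pub enum ClientError {
    DnsResolutionFailed(io::Error),
    ConnectionFailed(io::Error),
    /// Writing the request or reading the response failed.
    Transfer(io::Error),
}

impl fmt::Display for ClientError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ClientError::DnsResolutionFailed(_) => write!(f, "DNS resolution failed"),
            ClientError::ConnectionFailed(_) => write!(f, "TCP connection failed"),
            ClientError::Transfer(_) => write!(f, "request/response transfer failed"),
        }
    }
}

impl Error for ClientError {
    // Expose the underlying io::Error so callers can walk the error chain.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            ClientError::DnsResolutionFailed(e)
            | ClientError::ConnectionFailed(e)
            | ClientError::Transfer(e) => Some(e),
        }
    }
}
```

Adopting this would change SimpleHttpClient's Output to Result<Vec<u8>, ClientError>. The Resolving arm, for instance, would then return Poll::Ready(Err(ClientError::DnsResolutionFailed(e))).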
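For the Backpressure point, yielding from a hand-written poll means waking yourself and returning Pending. Drain is a hypothetical future that sums a queue of numbers but processes at most BUDGET items per poll; the budget of 4 is an arbitrary value chosen for illustration. When the budget runs out, it calls cx.waker().wake_by_ref() before returning Pending, so the executor re-queues the task after other tasks have had a turn.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Hypothetical per-poll work budget.
const BUDGET: usize = 4;

/// Hypothetical future that sums a queue of numbers, a few per poll.
pub struct Drain {
    queue: VecDeque<u64>,
    total: u64,
}

impl Drain {
    pub fn new(items: impl IntoIterator<Item = u64>) -> Self {
        Drain { queue: items.into_iter().collect(), total: 0 }
    }
}

impl Future for Drain {
    type Output = u64;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u64> {
        let this = &mut *self; // Drain is Unpin, so DerefMut is available
        for _ in 0..BUDGET {
            match this.queue.pop_front() {
                Some(n) => this.total += n,
                None => return Poll::Ready(this.total),
            }
        }
        if this.queue.is_empty() {
            return Poll::Ready(this.total);
        }
        // Budget exhausted: ask to be polled again, then yield to the executor.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
```

Summing 1 through 10 therefore takes three polls: two that return Pending after four items each, and a final one that returns Ready(55).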
Conclusion: Control vs. Complexity
Manually implementing the Future trait is a powerful technique that peels back the layers of abstraction provided by async/await. It grants you the ultimate control over state transitions, memory layout, and allocation strategy, which is critical in performance-sensitive systems programming.
This power comes at the cost of significant complexity. You become responsible for correctly managing state, propagating wakers, and upholding the invariants of Pin. The compiler is no longer generating the state machine for you; you are the state machine's architect. For most application-level code, async/await remains the superior choice for its safety and readability. But for the systems programmer building the foundational libraries and protocols that power the async ecosystem, a deep, first-principles understanding of Future, Pin, and Poll is an indispensable tool.