mio是Rust实现的一个轻量级的I/O库。其实现基本上就是对不同操作系统底层相关API的封装,抽象出统一的接口供上层使用:Linux下为epoll,Windows下为IOCP,OS X下为kqueue。



  1. 创建Poll
  2. 注册事件
  3. 事件循环等待与处理事件


pub struct Poll {
    // Platform specific IO selector
    selector: sys::Selector,    // 因操作系统而异,在Linux下是epoll

    // Custom readiness queue
    readiness_queue: ReadinessQueue,

    // Use an atomic to first check if a full lock will be required. This is a
    // fast-path check for single threaded cases avoiding the extra syscall
    lock_state: AtomicUsize,

    // Sequences concurrent calls to `Poll::poll`
    lock: Mutex<()>,

    // Wakeup the next waiter
    condvar: Condvar,

mio提供可跨平台的system selector访问,各平台对应的selector如下表,上层都可调用相同的API。不同平台使用的API开销不尽相同。由于mio是基于readiness(就绪状态)的API,与Linux epoll相似,可以看到很多API在Linux上都可以一对一映射。相比之下,Windows IOCP是基于完成(completion-based)而非基于就绪的API,所以两者间会有较多桥接。同时mio提供自身版本的TcpListener、TcpStream、UdpSocket,这些API封装了底层平台相关API,并设为非阻塞且实现Evented trait。

OS Selector
Linux epoll
OS X, iOS kqueue
Windows IOCP
FreeBSD kqueue
Android epoll



//!  mio demo 1

extern crate log;
extern crate simple_logger;
extern crate mio;

use mio::*;
use mio::tcp::{TcpListener, TcpStream};
use std::io::{Read,Write};

/// Demo 1: drives a TCP listener and a TCP client from a single `Poll` loop.
///
/// Both sockets are registered for edge-triggered readable events under
/// distinct tokens, and the loop dispatches each event on its token.
fn main() {


    // Setup some tokens to allow us to identify which event is for which socket.
    const SERVER: Token = Token(0);
    const CLIENT: Token = Token(1);

    // NOTE(review): the address literal is empty in this excerpt. The value is
    // passed to `TcpListener::bind` / `TcpStream::connect`, so it is presumably
    // parsed as a socket address; an empty string is not one, which makes
    // `parse()` fail and `unwrap()` panic at startup. Restore the intended
    // host:port (e.g. a loopback address such as "127.0.0.1:13265").
    let addr = "".parse().unwrap();

    // Setup the server socket
    let server = TcpListener::bind(&addr).unwrap();

    // Create a poll instance
    let poll = Poll::new().unwrap();

    // Start listening for incoming connections
    poll.register(&server, SERVER, Ready::readable(), PollOpt::edge()).unwrap();

    // Setup the client socket
    let sock = TcpStream::connect(&addr).unwrap();

    // Register the socket
    poll.register(&sock, CLIENT, Ready::readable(), PollOpt::edge()).unwrap();

    // Create storage for events
    let mut events = Events::with_capacity(1024);

    loop {
        // No timeout is supplied (`None`) for this wait.
        poll.poll(&mut events, None).unwrap();

        for event in events.iter() {
            match event.token() {
                SERVER => {
                    // Accept the pending connection and log the peer address.
                    // `stream` is not used afterwards, so the accepted socket
                    // is dropped when it goes out of scope.
                    let (stream,addr) = server.accept().unwrap();
                    info!("Listener accept {:?}",addr);
                CLIENT => {
                    // The client socket became readable; only a log line is
                    // emitted here.
                    // NOTE(review): an earlier comment said the loop exits at
                    // this point, but no exit is visible in this excerpt.
                    info!("client response.");
                // Only SERVER and CLIENT are registered with this `Poll`.
                _ => unreachable!(),
//! mio demo 2
extern crate log;
extern crate simple_logger;
extern crate mio;

use mio::*;
use mio::timer::{Timeout};
use mio::deprecated::{EventLoop, Handler, Sender, EventLoopBuilder};
use std::thread;
use std::time::Duration;

fn main() {

    let mut event_loop=EventLoop::new().unwrap();

    thread::spawn(move ||{

    let timeout = event_loop.timeout(Token(123), Duration::from_millis(3000)).unwrap();

    let mut handler=MioHandler::new();
    let _ = handler).unwrap();

pub enum IoMessage{

pub struct MioHandler{

impl MioHandler{
    pub fn new()->Self{

impl Handler for MioHandler {
    type Timeout = Token;
    type Message = IoMessage;

    /// Invoked when the socket represented by `token` is ready to be operated on.
    fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: Ready) {

    /// Invoked when a message has been received via the event loop's channel.
    fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Self::Message) {
        match msg {
            IoMessage::Notify=>info!("channel notify"),
                info!("shutdown eventloop.");

    /// Invoked when a timeout has completed.
    fn timeout(&mut self, event_loop: &mut EventLoop<Self>, timeout: Self::Timeout) {
        match timeout{
            Token(123)=>info!("time out."),

    /// Invoked when `EventLoop` has been interrupted by a signal interrupt.
    fn interrupted(&mut self, event_loop: &mut EventLoop<Self>) {

    /// Invoked at the end of an event loop tick.
    fn tick(&mut self, event_loop: &mut EventLoop<Self>) {


pub trait Handler: Sized {
    type Timeout;
    type Message;

    /// Invoked when the socket represented by `token` is ready to be operated
    /// on. `events` indicates the specific operations that are
    /// ready to be performed.
    /// For example, when a TCP socket is ready to be read from, `events` will
    /// have `readable` set. When the socket is ready to be written to,
    /// `events` will have `writable` set.
    /// This function will only be invoked a single time per socket per event
    /// loop tick.
    fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token, events: Ready) {

    /// Invoked when a message has been received via the event loop's channel.
    fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Self::Message) {

    /// Invoked when a timeout has completed.
    fn timeout(&mut self, event_loop: &mut EventLoop<Self>, timeout: Self::Timeout) {

    /// Invoked when `EventLoop` has been interrupted by a signal interrupt.
    fn interrupted(&mut self, event_loop: &mut EventLoop<Self>) {

    /// Invoked at the end of an event loop tick.
    fn tick(&mut self, event_loop: &mut EventLoop<Self>) {
impl<H: Handler> EventLoop<H> {

    /// Constructs a new `EventLoop` using the default configuration values.
    /// The `EventLoop` will not be running.
    pub fn new() -> io::Result<EventLoop<H>> {

    fn configured(config: Config) -> io::Result<EventLoop<H>> {
        // Create the IO poller
        let poll = Poll::new()?;

        let timer = timer::Builder::default()

        // Create cross thread notification queue
        let (tx, rx) = channel::sync_channel(config.notify_capacity);  //这里创建的是同步管道,可配置同步管道内部的buffer queue bound size.

        // Register the notification wakeup FD with the IO poller
        poll.register(&rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot())?;
        poll.register(&timer, TIMER, Ready::readable(), PollOpt::edge())?;

        Ok(EventLoop {
            run: true,
            poll: poll,
            timer: timer,
            notify_tx: tx,
            notify_rx: rx,
            config: config,
            events: Events::with_capacity(1024),

    /// Returns a sender that allows sending messages to the event loop in a
    /// thread-safe way, waking up the event loop if needed.

    /// Each [EventLoop](#) contains a lock-free queue with a pre-allocated
    /// buffer size. The size can be changed by modifying
    /// [EventLoopConfig.notify_capacity](struct.EventLoopConfig.html#method.notify_capacity).
    /// When a message is sent to the EventLoop, it is first pushed on to the
    /// queue. Then, if the EventLoop is currently running, an atomic flag is
    /// set to indicate that the next loop iteration should be started without
    /// waiting.
    /// If the loop is blocked waiting for IO events, then it is woken up. The
    /// strategy for waking up the event loop is platform dependent. For
    /// example, on a modern Linux OS, eventfd is used. On older OSes, a pipe
    /// is used.
    /// The strategy of setting an atomic flag if the event loop is not already
    /// sleeping allows avoiding an expensive wakeup operation if at all possible.
    pub fn channel(&self) -> Sender<H::Message> {

    /// Schedules a timeout after the requested time interval. When the
    /// duration has been reached,
    /// [Handler::timeout](trait.Handler.html#method.timeout) will be invoked
    /// passing in the supplied token.
    /// Returns a handle to the timeout that can be used to cancel the timeout
    /// using [#clear_timeout](#method.clear_timeout).
    pub fn timeout(&mut self, token: H::Timeout, delay: Duration) -> timer::Result<Timeout> {
        self.timer.set_timeout(delay, token)

    /// If the supplied timeout has not been triggered, cancel it such that it
    /// will not be triggered in the future.
    pub fn clear_timeout(&mut self, timeout: &Timeout) -> bool {

    /// Tells the event loop to exit after it is done handling all events in the current iteration.
    pub fn shutdown(&mut self) { = false;

    /// Indicates whether the event loop is currently running. If it's not it has either
    /// stopped or is scheduled to stop on the next tick.
    pub fn is_running(&self) -> bool {

    /// Registers an IO handle with the event loop.
    pub fn register<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()>
        where E: Evented
        self.poll.register(io, token, interest, opt)

    /// Re-Registers an IO handle with the event loop.
    pub fn reregister<E: ?Sized>(&mut self, io: &E, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()>
        where E: Evented
        self.poll.reregister(io, token, interest, opt)

    /// Keep spinning the event loop indefinitely, and notify the handler whenever
    /// any of the registered handles are ready.
    pub fn run(&mut self, handler: &mut H) -> io::Result<()> { = true;

        while {
            // Execute ticks as long as the event loop is running
            self.run_once(handler, None)?;


    /// Deregisters an IO handle with the event loop.
    /// Both kqueue and epoll will automatically clear any pending events when closing a
    /// file descriptor (socket). In that case, this method does not need to be called
    /// prior to dropping a connection from the slab.
    /// Warning: kqueue effectively builds in deregister when using edge-triggered mode with
    /// oneshot. Calling `deregister()` on the socket will cause a TcpStream error.
    pub fn deregister<E: ?Sized>(&mut self, io: &E) -> io::Result<()> where E: Evented {

    /// Spin the event loop once, with a given timeout (forever if `None`),
    /// and notify the handler if any of the registered handles become ready
    /// during that time.
    ///
    /// A polling error other than `io::ErrorKind::Interrupted` is returned to
    /// the caller.
    pub fn run_once(&mut self, handler: &mut H, timeout: Option<Duration>) -> io::Result<()> {
        trace!("event loop tick");

        // Check the registered IO handles for any new events, passing the
        // caller-supplied `timeout` straight through to the poller.
        // NOTE(review): this comment used to say each poll lasts one second;
        // nothing in this function fixes the wait at one second.
        let events = match self.io_poll(timeout) {
            Ok(e) => e,
            Err(err) => {
                // NOTE(review): the `Interrupted` branch is empty in this
                // excerpt; presumably the handler's `interrupted` hook was
                // invoked here. Confirm against the full source.
                if err.kind() == io::ErrorKind::Interrupted {
                } else {
                    return Err(err);

        // Dispatch the events reported by the poll (`events` is a count).
        self.io_process(handler, events);

    fn io_poll(&mut self, timeout: Option<Duration>) -> io::Result<usize> {
        self.poll.poll(&mut, timeout)

    // Process IO events that have been previously polled
    fn io_process(&mut self, handler: &mut H, cnt: usize) {
        let mut i = 0;

        trace!("io_process(..); cnt={}; len={}", cnt,;

        // Iterate over the notifications. Each event provides the token
        // it was registered with (which usually represents, at least, the
        // handle that the event is about) as well as information about
        // what kind of event occurred (readable, writable, signal, etc.)
        while i < cnt {
            let evt =;

            trace!("event={:?}; idx={:?}", evt, i);

            match evt.token() {
                NOTIFY => self.notify(handler),
                TIMER => self.timer_process(handler),
                _ => self.io_event(handler, evt)

            i += 1;

    fn io_event(&mut self, handler: &mut H, evt: Event) {
        handler.ready(self, evt.token(), evt.readiness());

    /// Drains pending channel messages, delivering at most
    /// `config.messages_per_tick` of them to `handler.notify`, then
    /// re-registers the channel receiver with the poller.
    fn notify(&mut self, handler: &mut H) {
        for _ in 0..self.config.messages_per_tick {
            match self.notify_rx.try_recv() {
                Ok(msg) => handler.notify(self, msg),
                // Any non-`Ok` result from `try_recv` ends the drain for this call.
                _ => break,

        // Re-register: the receiver is registered with `PollOpt::oneshot()`,
        // so its interest has to be re-armed after an event is delivered.
        // The result of the re-registration is ignored (`let _`).
        let _ = self.poll.reregister(&self.notify_rx, NOTIFY, Ready::readable(), PollOpt::edge() | PollOpt::oneshot());

    fn timer_process(&mut self, handler: &mut H) {
        while let Some(t) = self.timer.poll() {
            handler.timeout(self, t);
pub struct Selector {
    id: usize,
    epfd: RawFd,

impl Selector {
    pub fn new() -> io::Result<Selector> {
        let epfd = unsafe {
            // Emulate `epoll_create` by using `epoll_create1` if it's available
            // and otherwise falling back to `epoll_create` followed by a call to
            // set the CLOEXEC flag.
            dlsym!(fn epoll_create1(c_int) -> c_int);

            match epoll_create1.get() {
                Some(epoll_create1_fn) => {
                None => {
                    let fd = cvt(libc::epoll_create(1024))?;

        // offset by 1 to avoid choosing 0 as the id of a selector
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;

        Ok(Selector {
            id: id,
            epfd: epfd,

    pub fn id(&self) -> usize {

    /// Wait for events from the OS
    pub fn select(&self, evts: &mut Events, awakener: Token, timeout: Option<Duration>) -> io::Result<bool> {
        let timeout_ms = timeout
            .map(|to| cmp::min(millis(to), i32::MAX as u64) as i32)

        // Wait for epoll events for at most timeout_ms milliseconds
        unsafe {
            let cnt = cvt(libc::epoll_wait(self.epfd,
                                  as i32,
            let cnt = cnt as usize;

            for i in 0..cnt {
                if[i].u64 as usize == awakener.into() {
                    return Ok(true);


    /// Register event interests for the given IO handle with the OS
    pub fn register(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
        let mut info = libc::epoll_event {
            events: ioevent_to_epoll(interests, opts),
            u64: usize::from(token) as u64

        unsafe {
            cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, fd, &mut info))?;

    /// Re-register event interests for the given IO handle with the OS.
    ///
    /// Issues `EPOLL_CTL_MOD` for `fd` on this selector's epoll instance with
    /// the supplied interests, options and token.
    pub fn reregister(&self, fd: RawFd, token: Token, interests: Ready, opts: PollOpt) -> io::Result<()> {
        let mut info = libc::epoll_event {
            events: ioevent_to_epoll(interests, opts),
            // The token is carried in the event's `u64` user-data field.
            u64: usize::from(token) as u64

        unsafe {
            cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_MOD, fd, &mut info))?;

    /// Deregister event interests for the given IO handle with the OS
    pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
        // The &info argument should be ignored by the system,
        // but linux < 2.6.9 required it to be not null.
        // For compatibility, we provide a dummy EpollEvent.
        let mut info = libc::epoll_event {
            events: 0,
            u64: 0,

        unsafe {
            cvt(libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_DEL, fd, &mut info))?;


