use std::{
    io,
    pin::Pin,
    sync::{
        atomic::{AtomicBool, Ordering},
        mpsc::{self, SyncSender},
        Arc,
    },
    task::{Context, Poll},
    thread,
    time::Duration,
};

use futures_core::stream::Stream;

use crate::event::{
    filter::EventFilter, lock_internal_event_reader, poll_internal, read_internal, sys::Waker,
    Event, InternalEvent,
};
/// A stream of `Result<Event>`.
///
/// **This type is not available by default. You have to use the `event-stream` feature flag
/// to make it available.**
///
/// It implements the [`Stream`](futures_core::stream::Stream) trait and lets you
/// receive [`Event`]s asynchronously with the [`async-std`](https://crates.io/crates/async-std)
/// or [`tokio`](https://crates.io/crates/tokio) crates.
///
/// Check the [examples](https://github.com/crossterm-rs/crossterm/tree/master/examples) folder to see how to use
/// it (`event-stream-*`).
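///
/// A minimal sketch of reading events from the stream. It assumes the `futures` crate
/// is available for `StreamExt::next` and that an executor (for example `tokio` or
/// `async-std`) drives the future; `print_events` is a hypothetical helper name used
/// only for illustration.
///
/// ```no_run
/// use crossterm::event::EventStream;
/// use futures::StreamExt; // assumption: `futures` is a dependency of the caller
///
/// // Hypothetical helper: prints every event until the stream yields an error.
/// async fn print_events() {
///     let mut reader = EventStream::new();
///
///     // Each item produced by the stream is an `io::Result<Event>`.
///     while let Some(result) = reader.next().await {
///         match result {
///             Ok(event) => println!("Event: {:?}\r", event),
///             Err(e) => {
///                 eprintln!("Error: {}\r", e);
///                 break;
///             }
///         }
///     }
/// }
/// ```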
#[derive(Debug)]
pub struct EventStream {
    poll_internal_waker: Waker,
    stream_wake_task_executed: Arc<AtomicBool>,
    stream_wake_task_should_shutdown: Arc<AtomicBool>,
    task_sender: SyncSender<Task>,
}
impl Default for EventStream {
    fn default() -> Self {
        let (task_sender, receiver) = mpsc::sync_channel::<Task>(1);

        // Background thread: for each received task, block until an event is
        // available or shutdown is requested, then wake the stream's task.
        // The thread exits once the sender is dropped and `recv` fails.
        thread::spawn(move || {
            while let Ok(task) = receiver.recv() {
                loop {
                    if let Ok(true) = poll_internal(None, &EventFilter) {
                        break;
                    }

                    if task.stream_wake_task_should_shutdown.load(Ordering::SeqCst) {
                        break;
                    }
                }
                // Allow the next `poll_next` call to schedule a new task.
                task.stream_wake_task_executed
                    .store(false, Ordering::SeqCst);
                task.stream_waker.wake();
            }
        });

        EventStream {
            poll_internal_waker: lock_internal_event_reader().waker(),
            stream_wake_task_executed: Arc::new(AtomicBool::new(false)),
            stream_wake_task_should_shutdown: Arc::new(AtomicBool::new(false)),
            task_sender,
        }
    }
}
impl EventStream {
    /// Constructs a new instance of `EventStream`.
    pub fn new() -> EventStream {
        EventStream::default()
    }
}

struct Task {
    stream_waker: std::task::Waker,
    stream_wake_task_executed: Arc<AtomicBool>,
    stream_wake_task_should_shutdown: Arc<AtomicBool>,
}
// Implementation note
//
// EventStream needs two wakers to work correctly.
//
// 1. futures::Stream waker
//
//    Stream::poll_next returns Poll::Pending when no event is available. In
//    that case a Task is sent to the background thread (spawned when the
//    stream is created), which blocks in poll_internal(None, &EventFilter)
//    until an event is available and then wakes the executor so that the
//    task can be resumed.
//
// 2. poll_internal waker
//
//    Suppose no event is available, Poll::Pending was returned, and the
//    background thread is blocked in poll_internal when the user drops the
//    EventStream. Drop sets the shutdown flag and wakes poll_internal
//    (forcing it to return Ok(false)) so that the thread leaves its polling
//    loop.
impl Stream for EventStream {
    type Item = io::Result<Event>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Non-blocking check: a zero timeout makes poll_internal return immediately.
        match poll_internal(Some(Duration::from_secs(0)), &EventFilter) {
            Ok(true) => match read_internal(&EventFilter) {
                Ok(InternalEvent::Event(event)) => Poll::Ready(Some(Ok(event))),
                Err(e) => Poll::Ready(Some(Err(e))),
                #[cfg(unix)]
                _ => unreachable!(),
            },
            Ok(false) => {
                // Schedule a wake-up task only if one is not already pending.
                // `unwrap_or_else(|x| x)` yields the previous flag value in both cases.
                if !self
                    .stream_wake_task_executed
                    .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
                    // https://github.com/rust-lang/rust/issues/80486#issuecomment-752244166
                    .unwrap_or_else(|x| x)
                {
                    let stream_waker = cx.waker().clone();
                    let stream_wake_task_executed = self.stream_wake_task_executed.clone();
                    let stream_wake_task_should_shutdown =
                        self.stream_wake_task_should_shutdown.clone();

                    stream_wake_task_should_shutdown.store(false, Ordering::SeqCst);

                    let _ = self.task_sender.send(Task {
                        stream_waker,
                        stream_wake_task_executed,
                        stream_wake_task_should_shutdown,
                    });
                }
                Poll::Pending
            }
            Err(e) => Poll::Ready(Some(Err(e))),
        }
    }
}
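impl Drop for EventStream {
    fn drop(&mut self) {
        // Ask the background thread to stop polling, then interrupt a
        // `poll_internal` call that may currently be blocked.
        self.stream_wake_task_should_shutdown
            .store(true, Ordering::SeqCst);
        let _ = self.poll_internal_waker.wake();
    }
}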