Created April 6, 2016 16:26
Shared via Rust Playground
//! This code demonstrates a data-flow-like programming pattern, in which each
//! vertex of a DAG is associated with a long-running thread. The vertices
//! forward information along the edges, and the leaves just print out any data
//! they see. In this example, the vertices perform no computation, whereas
//! normally they would.

use std::thread;
use std::sync;
use std::sync::mpsc;
use std::collections::HashMap;

/// Config holds information about the graph, and potentially other pieces of
/// information useful for the computation.
struct Config {
    tree: HashMap<u64, Vec<u64>>,
}

/// Manager keeps track of the vertex threads, and possibly stores other shared
/// state like database connection pools, etc. The Manager will resist dropping
/// until all the vertex threads have terminated.
struct Manager {
    config: Config,
    wait: sync::Mutex<Vec<thread::JoinHandle<()>>>,
}

impl Manager {
    /// start() sets up channels for all the edges in the graph, and spawns all
    /// the vertex threads. It returns channel send endpoints for all the roots
    /// of the DAG to allow users to inject data into the data flow graph.
    fn start(&self) -> HashMap<u64, mpsc::SyncSender<u64>> {
        // root input channels
        let mut sources = HashMap::new();
        // edge send channels
        let mut feed_forward = HashMap::new();
        // edge read channels
        let mut inputs = HashMap::<u64, mpsc::Receiver<u64>>::new();
        for (thing, depends_on) in self.config.tree.iter() {
            let (send_input, recv_input) = mpsc::sync_channel(0);
            // register inputs to this vertex
            if depends_on.is_empty() {
                sources.insert(*thing, send_input);
            } else {
                for d in depends_on.iter() {
                    feed_forward.entry(d).or_insert_with(Vec::new).push(send_input.clone());
                }
            }
            // store the input end of our incoming edges so "our" thread can
            // read from it later.
            inputs.insert(*thing, recv_input);
        }

        let mut w = self.wait.lock().unwrap();
        w.extend(inputs.into_iter()
            .map(|(thing, recv_input)| {
                // And now, the problem:
                //
                // We want to let these threads borrow Manager.
                // There could be many reasons for this, such as allowing
                // them to read the config. This should be safe,
                // because Manager waits for all the threads before
                // allowing itself to be dropped.
                //
                // Possible solutions:
                // - could wrap Manager in a sync::Arc, but then
                //   drop() won't be called, since each thread would
                //   hold an outstanding strong reference to the
                //   Manager.
                // - could do the above, but give the threads sync::Weak.
                //   However, then we incur the unnecessary overhead
                //   of modifying the ref count all the time. It also
                //   adds extra code for upgrades everywhere.
                // - could do the above, but have the threads cast the
                //   upgraded Weak to a *const on startup. We could
                //   then unsafely cast that back to &Manager.
                //   It's ugly, but it works with no overhead.
                //
                // Really though, I should be able to just pass
                // `&'a self` to the threads, and have the resulting
                // threads be limited to lifetime 'a. The borrow checker
                // should then make sure that the JoinHandle is joined
                // before 'a ends. This probably requires JoinHandle
                // (or a variant thereof) to join on drop(). (A sketch
                // of this scoped approach appears after main() below.)
                // Find all our output edges.
                let downstream = feed_forward.remove(&thing);
                thread::spawn(move || {
                    let downstream = downstream;
                    // Keep reading inputs.
                    for i in recv_input.iter() {
                        match downstream {
                            // If we have outgoing edges, forward.
                            Some(ref chs) => {
                                for ch in chs.iter() {
                                    ch.send(i.clone()).unwrap();
                                }
                            }
                            // Otherwise, print.
                            None => println!("leaf {} got {}", thing, i),
                        }
                    }
                })
            }));
        sources
    }
}

impl Drop for Manager {
    /// The senders returned from start() must be dropped before the Manager,
    /// otherwise the threads will never terminate, and the Manager will never
    /// be dropped.
    fn drop(&mut self) {
        let mut ws = self.wait.lock().unwrap();
        for w in ws.drain(..) {
            let _ = w.join();
        }
    }
}

fn main() {
    let mut tree = HashMap::new();
    tree.insert(111, vec![10, 11]);
    tree.insert(10, vec![1, 2]);
    tree.insert(11, vec![3]);
    tree.insert(1, vec![]);
    tree.insert(2, vec![]);
    tree.insert(3, vec![]);

    let m = Manager {
        config: Config { tree: tree },
        wait: sync::Mutex::new(Vec::new()),
    };
    let s = m.start();
    s[&1].send(9000).unwrap();
    // Need to drop the root inputs before dropping the manager.
    // This drains all the edges, causing all the threads to exit.
    drop(s);
    drop(m);
}
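
// ---------------------------------------------------------------------------
// A sketch of the "just pass `&'a self` to the threads" idea described in the
// comment block inside start(). This is illustrative only and not part of the
// original gist: it assumes a toolchain with std::thread::scope (stabilized in
// Rust 1.63, well after this gist was written; crates such as crossbeam
// offered equivalent scoped threads at the time). Threads spawned inside the
// scope may borrow stack data, because every one of them is joined before the
// scope returns, so the borrow checker can verify that the borrowed Manager
// outlives the workers. The function name `run_scoped` and its body are made
// up for this example.
#[allow(dead_code)]
fn run_scoped(m: &Manager) {
    thread::scope(|s| {
        // Each worker reads m.config through a plain shared reference; no
        // Arc, Weak, or raw-pointer casts are needed.
        for (&vertex, depends_on) in m.config.tree.iter() {
            s.spawn(move || {
                println!("vertex {} has {} dependencies", vertex, depends_on.len());
            });
        }
        // All threads spawned on `s` are implicitly joined here, before the
        // scope (and therefore the borrow of `m`) ends.
    });
}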