Я реализовал простой NFA который распознает слова параллельно. Весь код на GitHub, здесь.
Основное
recognizesметод сообщает, еслиword(&str) признан NFA. Внутри этого метода я настроил пул потоков и канал и позвонитьrecognizes_in_parallelметод.В
recognizes_in_parallelзанимаетwordиstateи обрабатывает две вещи:- Он порождает новый
threadкаждый раз, когда функция перехода приводит к более чем одному состоянию, т. е. когда возникает недетерминизм. - Он сообщает, если начать с
stateаргумент, автомат распознаетwordаргумент.
- Он порождает новый
В
recognizes_in_parallelметод отправляет результат попытки распознать слово по каналу.Затем основной поток получает до тех пор, пока
trueотправлено, или когда все потоки повесили трубку или отправилиfalse.
Я очень новичок в Ржавчина, только что закончил читать Книга Ржавчины пару недель назад, и я хочу знать, можно ли улучшить читаемость, эффективность и организованность этого кода.
Можно ли написать качественные модульные тесты для распараллеливания? Могу ли я здесь улучшить использование замков? Могу ли я написать менее подробную инициализацию? Я думаю, что злоупотребил Copy а также Clone черт характера, есть ли способ уменьшить их использование?
Следует помнить о двух вещах:
ThreadPoolне являетсяSync, поэтому нам нуженMutex(или так я понял).- Я не понимаю, почему следующее изменение нарушает код, т.е. не останавливается.
// Works!!
pool.lock().unwrap().execute(move || {
root_nfa.recognize_in_parallel(&word[..], root_nfa.start, tx, pool_clone);
});
---
// Does not work :(
let pool = pool.lock().unwrap();
pool.execute(move || {
root_nfa.recognize_in_parallel(&word[..], root_nfa.start, tx, pool_clone);
});
Наконец, вот соответствующий код:
// state.rs
#[cfg(test)]
#[path = "./state.test.rs"]
mod tests;
#[derive(PartialEq, Eq, Debug, Hash, Copy, Clone)]
pub struct State {
pub id: i32,
}
impl State {
pub fn new(id: i32) -> State {
State { id }
}
}
// symbol.rs
#[derive(PartialEq, Eq, Debug, Hash, Copy, Clone)]
pub enum Symbol {
Epsilon,
Identifier(char),
}
// nfa.rs
use crate::{state::State, symbol::Symbol};
use std::collections::{HashMap, HashSet};
use std::sync::{mpsc, Arc, Mutex};
use threadpool::ThreadPool;
#[cfg(test)]
#[path = "./nfa.test.rs"]
mod tests;
const WORKER_COUNT: usize = 1024;
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct NFA {
states: HashSet<State>,
alphabet: HashSet<Symbol>,
start: State,
transition_function: HashMap<(State, Symbol), HashSet<State>>,
accepting_states: HashSet<State>,
}
impl NFA {
pub fn new(
states: HashSet<State>,
alphabet: HashSet<Symbol>,
start: State,
transition_function: HashMap<(State, Symbol), HashSet<State>>,
accepting_states: HashSet<State>,
) -> NFA {
NFA {
states,
alphabet,
start,
transition_function,
accepting_states,
}
}
pub fn add_transition(
&mut self,
source_state: State,
symbol: Symbol,
destination_states: HashSet<State>,
) {
self.transition_function
.insert((source_state, symbol), destination_states);
}
fn step(&self, ch: char, state: State) -> HashSet<State> {
let symbol_states = self
.transition_function
.get(&(state, Symbol::Identifier(ch)))
.expect(&format!("No transition found for ({:?}, {:?})", state, ch));
match self.transition_function.get(&(state, Symbol::Epsilon)) {
Some(set) => symbol_states.union(set).cloned().collect(),
None => symbol_states.to_owned(),
}
}
fn recognize_in_parallel<'a>(
&self,
word: &str,
state: State,
tx: mpsc::Sender<bool>,
pool: Arc<Mutex<ThreadPool>>,
) {
let mut state = state;
for ch in word.chars() {
let next_states = self.step(ch, state);
if next_states.len() == 1 {
state = *next_states.iter().next().unwrap();
} else {
let mut next_states = next_states.iter();
state = *next_states.next().unwrap();
for &state in next_states {
let tx = tx.clone();
let child_nfa = self.clone();
let word = word.to_owned();
let pool_clone = Arc::clone(&pool);
pool.lock().unwrap().execute(move || {
child_nfa.recognize_in_parallel(&word[1..], state, tx, pool_clone);
})
}
}
}
tx.send(self.accepting_states.contains(&state)).unwrap();
}
pub fn recognizes(&self, word: &str) -> bool {
let pool = threadpool::ThreadPool::new(WORKER_COUNT);
let pool = Arc::new(Mutex::new(pool));
let (tx, rx) = mpsc::channel();
let root_nfa = self.clone();
let word = word.to_owned();
let pool_clone = Arc::clone(&pool);
pool.lock().unwrap().execute(move || {
root_nfa.recognize_in_parallel(&word[..], root_nfa.start, tx, pool_clone);
});
let did_recognize = rx.iter().any(|did_recognize| did_recognize);
// If we don't join here, `rx` would be droped
// and a thread might try to send to a closed channel.
pool.lock().unwrap().join();
did_recognize
}
}
