reactive_graph_sys_file/behaviour/component/fs_notify.rs
1use log::trace;
2use reactive_graph_behaviour_model_api::behaviour_validator;
3use reactive_graph_behaviour_model_api::prelude::*;
4use reactive_graph_behaviour_model_impl::entity_behaviour;
5use reactive_graph_graph::prelude::*;
6use reactive_graph_reactive_model_impl::ReactiveEntity;
7use serde_json::json;
8use uuid::Uuid;
9
10use reactive_graph_runtime_model::ActionProperties::TRIGGER;
11use reactive_graph_sys_file_model::FileProperties::FILENAME;
12
13use std::path::Path;
14use std::time::Duration;
15
16use crossbeam::channel::unbounded;
17use notify::Config;
18use notify::Event;
19use notify::RecommendedWatcher;
20use notify::RecursiveMode;
21use notify::Watcher;
22
23entity_behaviour!(FsNotify, FsNotifyFactory, FsNotifyFsm, FsNotifyBehaviourTransitions, FsNotifyValidator);
24
25behaviour_validator!(FsNotifyValidator, Uuid, ReactiveEntity, TRIGGER.as_ref(), FILENAME.as_ref());
26
27impl BehaviourInit<Uuid, ReactiveEntity> for FsNotifyBehaviourTransitions {}
28
29impl BehaviourConnect<Uuid, ReactiveEntity> for FsNotifyBehaviourTransitions {
30 fn connect(&self) -> Result<(), BehaviourConnectFailed> {
31 if let Some(filename) = self.reactive_instance.get(FILENAME).and_then(|v| v.as_str().map(String::from)) {
32 let filename = shellexpand::tilde(&filename);
33 let path = Path::new(filename.as_ref()).to_owned();
34
35 // TODO: disconnect -> stopper
36 // let (stopper_tx, stopper_rx) = unbounded();
37 let (notify_tx, notify_rx) = unbounded();
38
39 let mut watcher: RecommendedWatcher = RecommendedWatcher::new(
40 move |result: Result<Event, notify::Error>| {
41 let _ = notify_tx.send(result);
42 },
43 Config::default(),
44 )
45 .map_err(|_| BehaviourConnectFailed {})?;
46 watcher.watch(&path, RecursiveMode::NonRecursive).map_err(|_| BehaviourConnectFailed {})?;
47
48 let reactive_instance = self.reactive_instance.clone();
49 async_std::task::spawn(async move {
50 loop {
51 if let Ok(Ok(_notify_event)) = notify_rx.try_recv() {
52 trace!("{:?} has changed", &path);
53 reactive_instance.set(TRIGGER, json!(true));
54 }
55 async_std::task::sleep(Duration::from_millis(1000)).await;
56 // TODO: disconnect -> stopper
57 // match stopper_rx.try_recv() {
58 // // Stop thread
59 // Ok(_) => break,
60 // Err(_) => std::thread::sleep(Duration::from_millis(1000)),
61 // }
62 }
63 // TODO: disconnect -> unwatch
64 // if let Err(err) = watcher.unwatch(&path) {
65 // error!("Failed to unwatch {:?}: {:?}", &path, err);
66 // }
67 });
68 }
69
70 // let reactive_instance = self.reactive_instance.clone();
71 // self.property_observers.observe_with_handle(TRIGGER.as_ref(), move |v: &Value| {
72 // if !v.is_boolean() {
73 // // Invalid type
74 // return;
75 // }
76 // if !v.as_bool().unwrap() {
77 // // Counter only on true (= high)
78 // return;
79 // }
80 // match reactive_instance.get(RESULT).and_then(|v| v.as_i64()) {
81 // Some(current_value) => {
82 // reactive_instance.set(RESULT, json!(current_value + 1));
83 // }
84 // None => {
85 // reactive_instance.set(RESULT, json!(0));
86 // }
87 // }
88 // });
89 Ok(())
90 }
91}
92
// impl BehaviourDisconnect<Uuid, ReactiveEntity> for FsNotifyBehaviourTransitions {
//     fn disconnect(&self) -> Result<(), BehaviourDisconnectFailed> {
//         // TODO: stopper -> stop thread
//         Ok(())
//     }
// }
99
100impl BehaviourShutdown<Uuid, ReactiveEntity> for FsNotifyBehaviourTransitions {}
101
102impl BehaviourTransitions<Uuid, ReactiveEntity> for FsNotifyBehaviourTransitions {}
103
104//
105// use crate::behaviour::component::FsNotifyProperties;
106// use reactive_graph_graph::PropertyInstanceSetter;
107// use reactive_graph_graph::ReactiveEntityInstance;
108// use crate::reactive::entity::Disconnectable;
109// use crate::reactive::BehaviourCreationError;
110//
111// pub const FS_NOTIFY: &str = "fs_notify";
112//
113// pub struct FsNotify {
114// pub entity: Arc<ReactiveEntityInstance>,
115//
116// stopper_tx: Sender<()>,
117// }
118//
119// impl FsNotify {
120// pub fn new(e: Arc<ReactiveEntityInstance>, runtime: &Handle) -> Result<FsNotify, BehaviourCreationError> {
121// let filename = e
122// .properties
123// .get(FsNotifyProperties::FILENAME.as_ref())
124// .ok_or(BehaviourCreationError)?
125// .as_string()
126// .ok_or(BehaviourCreationError)?;
127// let filename = shellexpand::tilde(&filename);
128// let path = Path::new(filename.as_ref()).to_owned();
129// let _ = e.properties.get(FsNotifyProperties::TRIGGER.as_ref()).ok_or(BehaviourCreationError)?;
130//
131// let (stopper_tx, stopper_rx) = unbounded();
132// let (notify_tx, notify_rx) = unbounded();
133//
134// let mut watcher: RecommendedWatcher = RecommendedWatcher::new(
135// move |result: std::result::Result<Event, notify::Error>| {
136// let _ = notify_tx.send(result);
137// },
138// Config::default(),
139// )
140// .map_err(|_| BehaviourCreationError)?;
141// watcher.watch(&path, RecursiveMode::NonRecursive).map_err(|_| BehaviourCreationError)?;
142//
143// let entity = e.clone();
144// runtime.spawn(async move {
145// loop {
146// if let Ok(Ok(_notify_event)) = notify_rx.try_recv() {
147// trace!("{:?} has changed", &path);
148// entity.set(FsNotifyProperties::TRIGGER, json!(true));
149// }
150// match stopper_rx.try_recv() {
151// // Stop thread
152// Ok(_) => break,
153// Err(_) => std::thread::sleep(Duration::from_millis(1000)),
154// }
155// }
156// if let Err(err) = watcher.unwatch(&path) {
157// error!("Failed to unwatch {:?}: {:?}", &path, err);
158// }
159// });
160// Ok(FsNotify { entity: e, stopper_tx })
161// }
162//
163// pub fn unwatch(&self) {
164// trace!("Stop watching {} with id {}", FS_NOTIFY, self.entity.id);
165// let _ = self.stopper_tx.send(());
166// }
167// }
168//
169// impl Disconnectable for FsNotify {
170// fn disconnect(&self) {
171// self.unwatch();
172// }
173// }
174//
175// impl Drop for FsNotify {
176// fn drop(&mut self) {
177// self.disconnect();
178// }
179// }