reactive_graph_sys_file/behaviour/component/
fs_notify.rs

1use log::trace;
2use reactive_graph_behaviour_model_api::behaviour_validator;
3use reactive_graph_behaviour_model_api::prelude::*;
4use reactive_graph_behaviour_model_impl::entity_behaviour;
5use reactive_graph_graph::prelude::*;
6use reactive_graph_reactive_model_impl::ReactiveEntity;
7use serde_json::json;
8use uuid::Uuid;
9
10use reactive_graph_runtime_model::ActionProperties::TRIGGER;
11use reactive_graph_sys_file_model::FileProperties::FILENAME;
12
13use std::path::Path;
14use std::time::Duration;
15
16use crossbeam::channel::unbounded;
17use notify::Config;
18use notify::Event;
19use notify::RecommendedWatcher;
20use notify::RecursiveMode;
21use notify::Watcher;
22
23entity_behaviour!(FsNotify, FsNotifyFactory, FsNotifyFsm, FsNotifyBehaviourTransitions, FsNotifyValidator);
24
25behaviour_validator!(FsNotifyValidator, Uuid, ReactiveEntity, TRIGGER.as_ref(), FILENAME.as_ref());
26
27impl BehaviourInit<Uuid, ReactiveEntity> for FsNotifyBehaviourTransitions {}
28
29impl BehaviourConnect<Uuid, ReactiveEntity> for FsNotifyBehaviourTransitions {
30    fn connect(&self) -> Result<(), BehaviourConnectFailed> {
31        if let Some(filename) = self.reactive_instance.get(FILENAME).and_then(|v| v.as_str().map(String::from)) {
32            let filename = shellexpand::tilde(&filename);
33            let path = Path::new(filename.as_ref()).to_owned();
34
35            // TODO: disconnect -> stopper
36            // let (stopper_tx, stopper_rx) = unbounded();
37            let (notify_tx, notify_rx) = unbounded();
38
39            let mut watcher: RecommendedWatcher = RecommendedWatcher::new(
40                move |result: Result<Event, notify::Error>| {
41                    let _ = notify_tx.send(result);
42                },
43                Config::default(),
44            )
45            .map_err(|_| BehaviourConnectFailed {})?;
46            watcher.watch(&path, RecursiveMode::NonRecursive).map_err(|_| BehaviourConnectFailed {})?;
47
48            let reactive_instance = self.reactive_instance.clone();
49            async_std::task::spawn(async move {
50                loop {
51                    if let Ok(Ok(_notify_event)) = notify_rx.try_recv() {
52                        trace!("{:?} has changed", &path);
53                        reactive_instance.set(TRIGGER, json!(true));
54                    }
55                    async_std::task::sleep(Duration::from_millis(1000)).await;
56                    // TODO: disconnect -> stopper
57                    // match stopper_rx.try_recv() {
58                    //     // Stop thread
59                    //     Ok(_) => break,
60                    //     Err(_) => std::thread::sleep(Duration::from_millis(1000)),
61                    // }
62                }
63                // TODO: disconnect -> unwatch
64                // if let Err(err) = watcher.unwatch(&path) {
65                //     error!("Failed to unwatch {:?}: {:?}", &path, err);
66                // }
67            });
68        }
69
70        // let reactive_instance = self.reactive_instance.clone();
71        // self.property_observers.observe_with_handle(TRIGGER.as_ref(), move |v: &Value| {
72        //     if !v.is_boolean() {
73        //         // Invalid type
74        //         return;
75        //     }
76        //     if !v.as_bool().unwrap() {
77        //         // Counter only on true (= high)
78        //         return;
79        //     }
80        //     match reactive_instance.get(RESULT).and_then(|v| v.as_i64()) {
81        //         Some(current_value) => {
82        //             reactive_instance.set(RESULT, json!(current_value + 1));
83        //         }
84        //         None => {
85        //             reactive_instance.set(RESULT, json!(0));
86        //         }
87        //     }
88        // });
89        Ok(())
90    }
91}
92
93// impl BehaviourDisconnect<ReactiveEntityInstance> for FsNotifyBehaviourTransitions {
94//     fn disconnect(&self) -> Result<(), BehaviourDisconnectFailed> {
95//         // TODO: stopper -> stop thread
96//         OK(())
97//     }
98// }
99
100impl BehaviourShutdown<Uuid, ReactiveEntity> for FsNotifyBehaviourTransitions {}
101
102impl BehaviourTransitions<Uuid, ReactiveEntity> for FsNotifyBehaviourTransitions {}
103
104//
105// use crate::behaviour::component::FsNotifyProperties;
106// use reactive_graph_graph::PropertyInstanceSetter;
107// use reactive_graph_graph::ReactiveEntityInstance;
108// use crate::reactive::entity::Disconnectable;
109// use crate::reactive::BehaviourCreationError;
110//
111// pub const FS_NOTIFY: &str = "fs_notify";
112//
113// pub struct FsNotify {
114//     pub entity: Arc<ReactiveEntityInstance>,
115//
116//     stopper_tx: Sender<()>,
117// }
118//
119// impl FsNotify {
120//     pub fn new(e: Arc<ReactiveEntityInstance>, runtime: &Handle) -> Result<FsNotify, BehaviourCreationError> {
121//         let filename = e
122//             .properties
123//             .get(FsNotifyProperties::FILENAME.as_ref())
124//             .ok_or(BehaviourCreationError)?
125//             .as_string()
126//             .ok_or(BehaviourCreationError)?;
127//         let filename = shellexpand::tilde(&filename);
128//         let path = Path::new(filename.as_ref()).to_owned();
129//         let _ = e.properties.get(FsNotifyProperties::TRIGGER.as_ref()).ok_or(BehaviourCreationError)?;
130//
131//         let (stopper_tx, stopper_rx) = unbounded();
132//         let (notify_tx, notify_rx) = unbounded();
133//
134//         let mut watcher: RecommendedWatcher = RecommendedWatcher::new(
135//             move |result: std::result::Result<Event, notify::Error>| {
136//                 let _ = notify_tx.send(result);
137//             },
138//             Config::default(),
139//         )
140//         .map_err(|_| BehaviourCreationError)?;
141//         watcher.watch(&path, RecursiveMode::NonRecursive).map_err(|_| BehaviourCreationError)?;
142//
143//         let entity = e.clone();
144//         runtime.spawn(async move {
145//             loop {
146//                 if let Ok(Ok(_notify_event)) = notify_rx.try_recv() {
147//                     trace!("{:?} has changed", &path);
148//                     entity.set(FsNotifyProperties::TRIGGER, json!(true));
149//                 }
150//                 match stopper_rx.try_recv() {
151//                     // Stop thread
152//                     Ok(_) => break,
153//                     Err(_) => std::thread::sleep(Duration::from_millis(1000)),
154//                 }
155//             }
156//             if let Err(err) = watcher.unwatch(&path) {
157//                 error!("Failed to unwatch {:?}: {:?}", &path, err);
158//             }
159//         });
160//         Ok(FsNotify { entity: e, stopper_tx })
161//     }
162//
163//     pub fn unwatch(&self) {
164//         trace!("Stop watching {} with id {}", FS_NOTIFY, self.entity.id);
165//         let _ = self.stopper_tx.send(());
166//     }
167// }
168//
169// impl Disconnectable for FsNotify {
170//     fn disconnect(&self) {
171//         self.unwatch();
172//     }
173// }
174//
175// impl Drop for FsNotify {
176//     fn drop(&mut self) {
177//         self.disconnect();
178//     }
179// }