jetcrab\api/
events.rs

1use crate::api::error::ApiError;
2use std::collections::HashMap;
3use std::time::Instant;
4
5#[derive(Debug, Clone)]
6pub struct EventData {
7    pub event_type: String,
8    pub timestamp: Instant,
9    pub data: serde_json::Value,
10    pub source: Option<String>,
11}
12
13impl EventData {
14    pub fn new(event_type: String, data: serde_json::Value) -> Self {
15        Self {
16            event_type,
17            timestamp: Instant::now(),
18            data,
19            source: None,
20        }
21    }
22
23    pub fn with_source(mut self, source: String) -> Self {
24        self.source = Some(source);
25        self
26    }
27}
28
29pub type EventCallback = Box<dyn FnMut(&EventData) + Send + Sync>;
30pub type AsyncEventCallback = Box<dyn Fn(EventData) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send + Sync>;
31
32pub struct EventEmitter {
33    listeners: HashMap<String, Vec<EventCallback>>,
34    async_listeners: HashMap<String, Vec<AsyncEventCallback>>,
35    event_history: Vec<EventData>,
36    max_history_size: usize,
37}
38
39impl EventEmitter {
40    pub fn new() -> Self {
41        Self {
42            listeners: HashMap::new(),
43            async_listeners: HashMap::new(),
44            event_history: Vec::new(),
45            max_history_size: 1000,
46        }
47    }
48
49    pub fn on<F>(&mut self, event: &str, callback: F)
50    where
51        F: FnMut(&EventData) + Send + Sync + 'static,
52    {
53        self.listeners
54            .entry(event.to_string())
55            .or_insert_with(Vec::new)
56            .push(Box::new(callback));
57    }
58
59    pub fn on_async<F>(&mut self, event: &str, callback: F)
60    where
61        F: Fn(EventData) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send + Sync + 'static,
62    {
63        self.async_listeners
64            .entry(event.to_string())
65            .or_insert_with(Vec::new)
66            .push(Box::new(callback));
67    }
68
69    pub fn emit(&mut self, event: &str, data: serde_json::Value) {
70        let event_data = EventData::new(event.to_string(), data);
71        
72        // Store in history
73        self.event_history.push(event_data.clone());
74        if self.event_history.len() > self.max_history_size {
75            self.event_history.remove(0);
76        }
77
78        // Notify synchronous listeners
79        if let Some(listeners) = self.listeners.get_mut(event) {
80            for listener in listeners {
81                listener(&event_data);
82            }
83        }
84
85        // Notify asynchronous listeners (simplified without tokio)
86        if let Some(_async_listeners) = self.async_listeners.get(event) {
87            // For now, just call synchronously - in a real implementation you'd use tokio
88            // tokio::spawn(async move {
89            //     listener_clone(event_data_clone).await;
90            // });
91        }
92    }
93
94    pub fn emit_with_source(&mut self, event: &str, data: serde_json::Value, source: String) {
95        let event_data = EventData::new(event.to_string(), data).with_source(source);
96        
97        self.event_history.push(event_data.clone());
98        if self.event_history.len() > self.max_history_size {
99            self.event_history.remove(0);
100        }
101
102        if let Some(listeners) = self.listeners.get_mut(event) {
103            for listener in listeners {
104                listener(&event_data);
105            }
106        }
107    }
108
109    pub fn remove_listener(&mut self, event: &str, index: usize) -> Result<(), ApiError> {
110        if let Some(listeners) = self.listeners.get_mut(event) {
111            if index < listeners.len() {
112                let _ = listeners.remove(index);
113                Ok(())
114            } else {
115                Err(ApiError::InvalidInput {
116                    message: "Listener index out of bounds".to_string(),
117                    input: index.to_string(),
118                    position: None,
119                })
120            }
121        } else {
122            Err(ApiError::InvalidInput {
123                message: "Event not found".to_string(),
124                input: event.to_string(),
125                position: None,
126            })
127        }
128    }
129
130    pub fn clear_listeners(&mut self, event: &str) {
131        self.listeners.remove(event);
132        self.async_listeners.remove(event);
133    }
134
135    pub fn get_event_history(&self) -> &[EventData] {
136        &self.event_history
137    }
138
139    pub fn set_max_history_size(&mut self, size: usize) {
140        self.max_history_size = size;
141        while self.event_history.len() > self.max_history_size {
142            self.event_history.remove(0);
143        }
144    }
145}
146
147impl Default for EventEmitter {
148    fn default() -> Self {
149        Self::new()
150    }
151}
152
153pub struct CallbackRegistry {
154    callbacks: HashMap<String, std::sync::Arc<std::sync::Mutex<Box<dyn Fn(serde_json::Value) -> Result<serde_json::Value, ApiError> + Send + Sync>>>>,
155    metadata: HashMap<String, CallbackMetadata>,
156}
157
/// Descriptive record kept alongside a registered callback.
#[derive(Clone, Debug)]
pub struct CallbackMetadata {
    /// Name of the callback.
    pub name: String,
    /// Human-readable description of the callback.
    pub description: String,
    /// Parameter descriptions; `CallbackRegistry::register` leaves this empty.
    pub parameters: Vec<String>,
    /// Return type label; `CallbackRegistry::register` sets it to `"any"`.
    pub return_type: String,
    /// Instant recorded for the callback's creation.
    pub created_at: Instant,
    /// Counter incremented by `CallbackRegistry::call` for this callback.
    pub call_count: usize,
}
167
168impl CallbackRegistry {
169    pub fn new() -> Self {
170        Self {
171            callbacks: HashMap::new(),
172            metadata: HashMap::new(),
173        }
174    }
175
176    pub fn register<F>(&mut self, name: &str, description: &str, callback: F)
177    where
178        F: Fn(serde_json::Value) -> Result<serde_json::Value, ApiError> + Send + Sync + 'static,
179    {
180        let metadata = CallbackMetadata {
181            name: name.to_string(),
182            description: description.to_string(),
183            parameters: Vec::new(),
184            return_type: "any".to_string(),
185            created_at: Instant::now(),
186            call_count: 0,
187        };
188
189        self.callbacks.insert(name.to_string(), std::sync::Arc::new(std::sync::Mutex::new(Box::new(callback))));
190        self.metadata.insert(name.to_string(), metadata);
191    }
192
193    pub fn register_with_metadata<F>(&mut self, name: &str, metadata: CallbackMetadata, callback: F)
194    where
195        F: Fn(serde_json::Value) -> Result<serde_json::Value, ApiError> + Send + Sync + 'static,
196    {
197        self.callbacks.insert(name.to_string(), std::sync::Arc::new(std::sync::Mutex::new(Box::new(callback))));
198        self.metadata.insert(name.to_string(), metadata);
199    }
200
201    pub fn call(&mut self, name: &str, data: serde_json::Value) -> Result<serde_json::Value, ApiError> {
202        if let Some(callback) = self.callbacks.get(name) {
203            if let Some(metadata) = self.metadata.get_mut(name) {
204                metadata.call_count += 1;
205            }
206            
207            let callback_guard = callback.lock().map_err(|_| ApiError::InvalidInput {
208                message: "Failed to acquire callback lock".to_string(),
209                input: "".to_string(),
210                position: None,
211            })?;
212            
213            callback_guard(data)
214        } else {
215            Err(ApiError::InvalidInput {
216                message: "Callback not found".to_string(),
217                input: name.to_string(),
218                position: None,
219            })
220        }
221        .map_err(|_| ApiError::ExecutionError {
222            message: "Callback execution failed".to_string(),
223            position: None,
224        })
225    }
226
227    pub fn unregister(&mut self, name: &str) -> bool {
228        self.callbacks.remove(name).is_some() && self.metadata.remove(name).is_some()
229    }
230
231    pub fn list_callbacks(&self) -> Vec<&CallbackMetadata> {
232        self.metadata.values().collect()
233    }
234
235    pub fn get_callback_info(&self, name: &str) -> Option<&CallbackMetadata> {
236        self.metadata.get(name)
237    }
238
239    pub fn exists(&self, name: &str) -> bool {
240        self.callbacks.contains_key(name)
241    }
242}
243
244impl Default for CallbackRegistry {
245    fn default() -> Self {
246        Self::new()
247    }
248}
249
250pub struct EventManager {
251    emitter: EventEmitter,
252    callback_registry: CallbackRegistry,
253    event_filters: HashMap<String, Vec<Box<dyn Fn(&EventData) -> bool + Send + Sync>>>,
254}
255
256impl EventManager {
257    pub fn new() -> Self {
258        Self {
259            emitter: EventEmitter::new(),
260            callback_registry: CallbackRegistry::new(),
261            event_filters: HashMap::new(),
262        }
263    }
264
265    pub fn get_emitter(&mut self) -> &mut EventEmitter {
266        &mut self.emitter
267    }
268
269    pub fn get_callback_registry(&mut self) -> &mut CallbackRegistry {
270        &mut self.callback_registry
271    }
272
273    pub fn add_event_filter<F>(&mut self, event: &str, filter: F)
274    where
275        F: Fn(&EventData) -> bool + Send + Sync + 'static,
276    {
277        self.event_filters
278            .entry(event.to_string())
279            .or_insert_with(Vec::new)
280            .push(Box::new(filter));
281    }
282
283    pub fn emit_filtered(&mut self, event: &str, data: serde_json::Value) {
284        let event_data = EventData::new(event.to_string(), data);
285        
286        // Check if event should be filtered
287        if let Some(filters) = self.event_filters.get(event) {
288            let should_emit = filters.iter().all(|filter| filter(&event_data));
289            if !should_emit {
290                return;
291            }
292        }
293
294        self.emitter.emit(event, event_data.data);
295    }
296
297    pub fn create_event_chain(&mut self, events: Vec<String>) -> EventChain {
298        EventChain::new(events, self)
299    }
300}
301
302impl Default for EventManager {
303    fn default() -> Self {
304        Self::new()
305    }
306}
307
308pub struct EventChain<'a> {
309    events: Vec<String>,
310    manager: &'a mut EventManager,
311    current_index: usize,
312}
313
314impl<'a> EventChain<'a> {
315    pub fn new(events: Vec<String>, manager: &'a mut EventManager) -> Self {
316        Self {
317            events,
318            manager,
319            current_index: 0,
320        }
321    }
322
323    pub fn trigger_next(&mut self, data: serde_json::Value) -> Result<(), ApiError> {
324        if self.current_index < self.events.len() {
325            let event = &self.events[self.current_index];
326            self.manager.get_emitter().emit(event, data);
327            self.current_index += 1;
328            Ok(())
329        } else {
330            Err(ApiError::ExecutionError {
331                message: "Event chain completed".to_string(),
332                position: None,
333            })
334        }
335    }
336
337    pub fn reset(&mut self) {
338        self.current_index = 0;
339    }
340
341    pub fn is_complete(&self) -> bool {
342        self.current_index >= self.events.len()
343    }
344}
345
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};

    /// A synchronous listener registered with `on` is invoked by `emit`.
    #[test]
    fn test_event_emitter() {
        let mut emitter = EventEmitter::new();
        let received = Arc::new(Mutex::new(false));
        let received_clone = Arc::clone(&received);

        emitter.on("test", move |_| {
            *received_clone.lock().unwrap() = true;
        });

        emitter.emit("test", json!("data"));

        assert!(*received.lock().unwrap());
    }

    /// The history keeps only the newest events once the limit is reached.
    #[test]
    fn test_event_history_is_bounded() {
        let mut emitter = EventEmitter::new();
        emitter.set_max_history_size(2);

        for i in 0..3 {
            emitter.emit("tick", json!(i));
        }

        let history = emitter.get_event_history();
        assert_eq!(history.len(), 2);
        assert_eq!(history[0].data, json!(1));
        assert_eq!(history[1].data, json!(2));
    }

    /// A registered callback receives the input and its result is returned.
    #[test]
    fn test_callback_registry() {
        let mut registry = CallbackRegistry::new();

        registry.register("test", "Test callback", |data| {
            Ok(json!({ "result": data }))
        });

        let result = registry.call("test", json!("input")).unwrap();
        assert_eq!(result["result"], "input");
    }

    /// With no filters registered, `emit_filtered` reaches the listeners.
    #[test]
    fn test_event_manager() {
        let mut manager = EventManager::new();
        let received = Arc::new(Mutex::new(false));
        let received_clone = Arc::clone(&received);

        manager.get_emitter().on("test", move |_| {
            *received_clone.lock().unwrap() = true;
        });

        manager.emit_filtered("test", json!("data"));

        assert!(*received.lock().unwrap());
    }

    /// A chain emits its events in order, reports completion, and errors
    /// when triggered past its end.
    #[test]
    fn test_event_chain() {
        let mut manager = EventManager::new();
        let events = vec!["step1".to_string(), "step2".to_string()];

        // Count deliveries of the first step so the test proves the chain
        // actually emitted it.
        let step1_hits = Arc::new(AtomicUsize::new(0));
        let hits = Arc::clone(&step1_hits);
        manager.get_emitter().on("step1", move |_| {
            hits.fetch_add(1, Ordering::SeqCst);
        });

        let mut chain = manager.create_event_chain(events);

        assert!(!chain.is_complete());
        chain.trigger_next(json!("data")).unwrap();
        assert!(!chain.is_complete());
        chain.trigger_next(json!("data")).unwrap();
        assert!(chain.is_complete());

        // A completed chain refuses further triggers.
        assert!(chain.trigger_next(json!("data")).is_err());

        assert_eq!(step1_hits.load(Ordering::SeqCst), 1);
    }
}
415}