1use crate::api::error::ApiError;
2use std::collections::HashMap;
3use std::time::Instant;
4
5#[derive(Debug, Clone)]
6pub struct EventData {
7 pub event_type: String,
8 pub timestamp: Instant,
9 pub data: serde_json::Value,
10 pub source: Option<String>,
11}
12
13impl EventData {
14 pub fn new(event_type: String, data: serde_json::Value) -> Self {
15 Self {
16 event_type,
17 timestamp: Instant::now(),
18 data,
19 source: None,
20 }
21 }
22
23 pub fn with_source(mut self, source: String) -> Self {
24 self.source = Some(source);
25 self
26 }
27}
28
29pub type EventCallback = Box<dyn FnMut(&EventData) + Send + Sync>;
30pub type AsyncEventCallback = Box<dyn Fn(EventData) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send + Sync>;
31
32pub struct EventEmitter {
33 listeners: HashMap<String, Vec<EventCallback>>,
34 async_listeners: HashMap<String, Vec<AsyncEventCallback>>,
35 event_history: Vec<EventData>,
36 max_history_size: usize,
37}
38
39impl EventEmitter {
40 pub fn new() -> Self {
41 Self {
42 listeners: HashMap::new(),
43 async_listeners: HashMap::new(),
44 event_history: Vec::new(),
45 max_history_size: 1000,
46 }
47 }
48
49 pub fn on<F>(&mut self, event: &str, callback: F)
50 where
51 F: FnMut(&EventData) + Send + Sync + 'static,
52 {
53 self.listeners
54 .entry(event.to_string())
55 .or_insert_with(Vec::new)
56 .push(Box::new(callback));
57 }
58
59 pub fn on_async<F>(&mut self, event: &str, callback: F)
60 where
61 F: Fn(EventData) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> + Send + Sync + 'static,
62 {
63 self.async_listeners
64 .entry(event.to_string())
65 .or_insert_with(Vec::new)
66 .push(Box::new(callback));
67 }
68
69 pub fn emit(&mut self, event: &str, data: serde_json::Value) {
70 let event_data = EventData::new(event.to_string(), data);
71
72 self.event_history.push(event_data.clone());
74 if self.event_history.len() > self.max_history_size {
75 self.event_history.remove(0);
76 }
77
78 if let Some(listeners) = self.listeners.get_mut(event) {
80 for listener in listeners {
81 listener(&event_data);
82 }
83 }
84
85 if let Some(_async_listeners) = self.async_listeners.get(event) {
87 }
92 }
93
94 pub fn emit_with_source(&mut self, event: &str, data: serde_json::Value, source: String) {
95 let event_data = EventData::new(event.to_string(), data).with_source(source);
96
97 self.event_history.push(event_data.clone());
98 if self.event_history.len() > self.max_history_size {
99 self.event_history.remove(0);
100 }
101
102 if let Some(listeners) = self.listeners.get_mut(event) {
103 for listener in listeners {
104 listener(&event_data);
105 }
106 }
107 }
108
109 pub fn remove_listener(&mut self, event: &str, index: usize) -> Result<(), ApiError> {
110 if let Some(listeners) = self.listeners.get_mut(event) {
111 if index < listeners.len() {
112 let _ = listeners.remove(index);
113 Ok(())
114 } else {
115 Err(ApiError::InvalidInput {
116 message: "Listener index out of bounds".to_string(),
117 input: index.to_string(),
118 position: None,
119 })
120 }
121 } else {
122 Err(ApiError::InvalidInput {
123 message: "Event not found".to_string(),
124 input: event.to_string(),
125 position: None,
126 })
127 }
128 }
129
130 pub fn clear_listeners(&mut self, event: &str) {
131 self.listeners.remove(event);
132 self.async_listeners.remove(event);
133 }
134
135 pub fn get_event_history(&self) -> &[EventData] {
136 &self.event_history
137 }
138
139 pub fn set_max_history_size(&mut self, size: usize) {
140 self.max_history_size = size;
141 while self.event_history.len() > self.max_history_size {
142 self.event_history.remove(0);
143 }
144 }
145}
146
147impl Default for EventEmitter {
148 fn default() -> Self {
149 Self::new()
150 }
151}
152
153pub struct CallbackRegistry {
154 callbacks: HashMap<String, std::sync::Arc<std::sync::Mutex<Box<dyn Fn(serde_json::Value) -> Result<serde_json::Value, ApiError> + Send + Sync>>>>,
155 metadata: HashMap<String, CallbackMetadata>,
156}
157
/// Descriptive information stored alongside a registered callback.
#[derive(Clone, Debug)]
pub struct CallbackMetadata {
    /// Name of the callback.
    pub name: String,
    /// Free-form description of what the callback does.
    pub description: String,
    /// Names of the parameters the callback expects.
    pub parameters: Vec<String>,
    /// Textual description of the callback's return type.
    pub return_type: String,
    /// Point in time recorded for the registration.
    pub created_at: Instant,
    /// Counter of invocations of the callback.
    pub call_count: usize,
}
167
168impl CallbackRegistry {
169 pub fn new() -> Self {
170 Self {
171 callbacks: HashMap::new(),
172 metadata: HashMap::new(),
173 }
174 }
175
176 pub fn register<F>(&mut self, name: &str, description: &str, callback: F)
177 where
178 F: Fn(serde_json::Value) -> Result<serde_json::Value, ApiError> + Send + Sync + 'static,
179 {
180 let metadata = CallbackMetadata {
181 name: name.to_string(),
182 description: description.to_string(),
183 parameters: Vec::new(),
184 return_type: "any".to_string(),
185 created_at: Instant::now(),
186 call_count: 0,
187 };
188
189 self.callbacks.insert(name.to_string(), std::sync::Arc::new(std::sync::Mutex::new(Box::new(callback))));
190 self.metadata.insert(name.to_string(), metadata);
191 }
192
193 pub fn register_with_metadata<F>(&mut self, name: &str, metadata: CallbackMetadata, callback: F)
194 where
195 F: Fn(serde_json::Value) -> Result<serde_json::Value, ApiError> + Send + Sync + 'static,
196 {
197 self.callbacks.insert(name.to_string(), std::sync::Arc::new(std::sync::Mutex::new(Box::new(callback))));
198 self.metadata.insert(name.to_string(), metadata);
199 }
200
201 pub fn call(&mut self, name: &str, data: serde_json::Value) -> Result<serde_json::Value, ApiError> {
202 if let Some(callback) = self.callbacks.get(name) {
203 if let Some(metadata) = self.metadata.get_mut(name) {
204 metadata.call_count += 1;
205 }
206
207 let callback_guard = callback.lock().map_err(|_| ApiError::InvalidInput {
208 message: "Failed to acquire callback lock".to_string(),
209 input: "".to_string(),
210 position: None,
211 })?;
212
213 callback_guard(data)
214 } else {
215 Err(ApiError::InvalidInput {
216 message: "Callback not found".to_string(),
217 input: name.to_string(),
218 position: None,
219 })
220 }
221 .map_err(|_| ApiError::ExecutionError {
222 message: "Callback execution failed".to_string(),
223 position: None,
224 })
225 }
226
227 pub fn unregister(&mut self, name: &str) -> bool {
228 self.callbacks.remove(name).is_some() && self.metadata.remove(name).is_some()
229 }
230
231 pub fn list_callbacks(&self) -> Vec<&CallbackMetadata> {
232 self.metadata.values().collect()
233 }
234
235 pub fn get_callback_info(&self, name: &str) -> Option<&CallbackMetadata> {
236 self.metadata.get(name)
237 }
238
239 pub fn exists(&self, name: &str) -> bool {
240 self.callbacks.contains_key(name)
241 }
242}
243
244impl Default for CallbackRegistry {
245 fn default() -> Self {
246 Self::new()
247 }
248}
249
250pub struct EventManager {
251 emitter: EventEmitter,
252 callback_registry: CallbackRegistry,
253 event_filters: HashMap<String, Vec<Box<dyn Fn(&EventData) -> bool + Send + Sync>>>,
254}
255
256impl EventManager {
257 pub fn new() -> Self {
258 Self {
259 emitter: EventEmitter::new(),
260 callback_registry: CallbackRegistry::new(),
261 event_filters: HashMap::new(),
262 }
263 }
264
265 pub fn get_emitter(&mut self) -> &mut EventEmitter {
266 &mut self.emitter
267 }
268
269 pub fn get_callback_registry(&mut self) -> &mut CallbackRegistry {
270 &mut self.callback_registry
271 }
272
273 pub fn add_event_filter<F>(&mut self, event: &str, filter: F)
274 where
275 F: Fn(&EventData) -> bool + Send + Sync + 'static,
276 {
277 self.event_filters
278 .entry(event.to_string())
279 .or_insert_with(Vec::new)
280 .push(Box::new(filter));
281 }
282
283 pub fn emit_filtered(&mut self, event: &str, data: serde_json::Value) {
284 let event_data = EventData::new(event.to_string(), data);
285
286 if let Some(filters) = self.event_filters.get(event) {
288 let should_emit = filters.iter().all(|filter| filter(&event_data));
289 if !should_emit {
290 return;
291 }
292 }
293
294 self.emitter.emit(event, event_data.data);
295 }
296
297 pub fn create_event_chain(&mut self, events: Vec<String>) -> EventChain {
298 EventChain::new(events, self)
299 }
300}
301
302impl Default for EventManager {
303 fn default() -> Self {
304 Self::new()
305 }
306}
307
308pub struct EventChain<'a> {
309 events: Vec<String>,
310 manager: &'a mut EventManager,
311 current_index: usize,
312}
313
314impl<'a> EventChain<'a> {
315 pub fn new(events: Vec<String>, manager: &'a mut EventManager) -> Self {
316 Self {
317 events,
318 manager,
319 current_index: 0,
320 }
321 }
322
323 pub fn trigger_next(&mut self, data: serde_json::Value) -> Result<(), ApiError> {
324 if self.current_index < self.events.len() {
325 let event = &self.events[self.current_index];
326 self.manager.get_emitter().emit(event, data);
327 self.current_index += 1;
328 Ok(())
329 } else {
330 Err(ApiError::ExecutionError {
331 message: "Event chain completed".to_string(),
332 position: None,
333 })
334 }
335 }
336
337 pub fn reset(&mut self) {
338 self.current_index = 0;
339 }
340
341 pub fn is_complete(&self) -> bool {
342 self.current_index >= self.events.len()
343 }
344}
345
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::{Arc, Mutex};

    /// A listener registered with `on` runs when its event is emitted.
    #[test]
    fn test_event_emitter() {
        let fired = Arc::new(Mutex::new(false));
        let mut emitter = EventEmitter::new();

        let flag = Arc::clone(&fired);
        emitter.on("test", move |_| {
            let mut slot = flag.lock().unwrap();
            *slot = true;
        });

        emitter.emit("test", json!("data"));

        assert!(*fired.lock().unwrap());
    }

    /// A registered callback receives the input and its result is returned.
    #[test]
    fn test_callback_registry() {
        let mut registry = CallbackRegistry::new();
        registry.register("test", "Test callback", |data| {
            Ok(json!({ "result": data }))
        });

        let output = registry.call("test", json!("input")).unwrap();

        assert_eq!(output["result"], "input");
    }

    /// With no filters installed, `emit_filtered` reaches the listeners.
    #[test]
    fn test_event_manager() {
        let fired = Arc::new(Mutex::new(false));
        let mut manager = EventManager::new();

        let flag = Arc::clone(&fired);
        manager.get_emitter().on("test", move |_| {
            let mut slot = flag.lock().unwrap();
            *slot = true;
        });

        manager.emit_filtered("test", json!("data"));

        assert!(*fired.lock().unwrap());
    }

    /// A two-step chain is complete only after both steps were triggered.
    #[test]
    fn test_event_chain() {
        let mut manager = EventManager::new();
        manager.get_emitter().on("step1", move |_| {});

        let steps = vec!["step1".to_string(), "step2".to_string()];
        let mut chain = manager.create_event_chain(steps);

        assert!(!chain.is_complete());
        chain.trigger_next(json!("data")).unwrap();
        assert!(!chain.is_complete());
        chain.trigger_next(json!("data")).unwrap();
        assert!(chain.is_complete());
    }
}
415}