观察者是一种行为设计模式,它允许一些对象通知其他对象其状态的变化。
在 Rust中,定义订阅者的一种便捷方法是将函数 作为可调用对象,并通过复杂的逻辑将其传递给事件发布者。
首先看看一个直观天真实现:
有一个可观察observable的集合和一个观察者Observer。我希望观察者成为trait Observer. 当某些事件发生时,可观察对象应该能够通知每个观察者:
impl Observer for A { fn event(&mut self, ev: &String) { println!("Got event from observable: {}", ev); } }
struct Observable { observers: Vec<dyn Observer>, // How to contain references to observers? (this line is invalid) }
impl Observable { fn new() -> Observable { Observable { observers: Vec::new(), } }
fn add_observer(&mut self, o: &dyn Observer) { // incorrect line too self.observers.push(o); }
fn remove_observer(&mut self, o: &dyn Observer) { // incorrect line too self.observers.remove(o); }
fn notify_observers(&self, ev: &String) { for o in &mut self.observers { o.event(ev); } } }
|
但是编译器错误:
error[E0277]: the size for values of type `(dyn Observer + 'static)` cannot be known at compilation time --> src/lib.rs:24:5 | 24 | observers: Vec<dyn Observer>, // How to contain references to observers? | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ doesn't have a size known at compile-time | = help: the trait `std::marker::Sized` is not implemented for `(dyn Observer + 'static)` = note: to learn more, visit <https://doc.rust-lang.org/book/second-edition/ch19-04-advanced-types.htmldynamically-sized-types-and-the-sized-trait> = note: required by `std::vec::Vec`
|
观察者模式可能会带来Rust所有权挑战。
在垃圾收集的语言中,通常会Observable引用Observer(通知它)和Observer引用Observable(注销自身)......这会在所有权方面造成一些挑战(谁比谁活得长?),这就需要完全整体“取消注册通知”。
简单的解决方案
Observable和有不同的Observer生命周期,没有人拥有另一个或被期望比另一个长寿:
use std::rc::Weak;
struct Event;
trait Observable { fn register(&mut self, observer: Weak<dyn Observer>); }
trait Observer { fn notify(&self, event: &Event); }
|
关键是将观察者分配到一个Rc中,然后将Weak(弱引用)交给Observable。
如果观察者需要在事件中被修改,那么要么它需要内部可变性,要么它需要被包装成一个RefCell(将Weak<RefCell<dyn Observer>>传递给观察者)。
当通知时,Observable会定期意识到有死的弱引用(Observer已经消失了),那么它就可以懒惰地删除这些引用。
Weak<dyn Observer>缺点:
- 它需要一个Rc
- 您必须重复代码来创建不同的观察者结构。
- 它需要类型擦除
回调函数
使用callback函数。它简单而强大,没有生命周期问题或类型擦除。
pub struct Notifier<E> { subscribers: Vec<Box<dyn Fn(&E)>>, }
impl<E> Notifier<E> { pub fn new() -> Notifier<E> { Notifier { subscribers: Vec::new(), } }
pub fn register<F>(&mut self, callback: F) where F: 'static + Fn(&E), { self.subscribers.push(Box::new(callback)); }
pub fn notify(&self, event: E) { for callback in &self.subscribers { callback(&event); } } }
|
lambda 函数
在这个观察者示例中,订阅者观察者是 lambda 函数,是订阅活动事件的显式函数。显式函数对象也可以取消订阅(或者某些函数类型可能存在限制)。
observer.rs:
use std::collections::HashMap;
/// An event type.一个事件类型 #[derive(PartialEq, Eq, Hash, Clone)] pub enum Event { Load, Save, }
/// 一个订阅者(监听器)具有可调用函数的类型。 pub type Subscriber = fn(file_path: String);
/// 发布者向订阅者(监听器)发送事件。 #[derive(Default)] pub struct Publisher { events: HashMap<Event, Vec<Subscriber>>, }
impl Publisher { pub fn subscribe(&mut self, event_type: Event, listener: Subscriber) { self.events.entry(event_type.clone()).or_default(); self.events.get_mut(&event_type).unwrap().push(listener); }
pub fn unsubscribe(&mut self, event_type: Event, listener: Subscriber) { self.events .get_mut(&event_type) .unwrap() .retain(|&x| x != listener); }
pub fn notify(&self, event_type: Event, file_path: String) { let listeners = self.events.get(&event_type).unwrap(); for listener in listeners { listener(file_path.clone()); } } }
|
结合泛型实现
来自https://github.com/lpxxn/rust-design-pattern:
trait IObserver { fn update(&self); }
trait ISubject<'a, T: IObserver> { fn attach(&mut self, observer: &'a T); fn detach(&mut self, observer: &'a T); fn notify_observers(&self); }
struct Subject<'a, T: IObserver> { observers: Vec<&'a T>, } impl<'a, T: IObserver + PartialEq> Subject<'a, T> { fn new() -> Subject<'a, T> { Subject { observers: Vec::new(), } } }
impl<'a, T: IObserver + PartialEq> ISubject<'a, T> for Subject<'a, T> { fn attach(&mut self, observer: &'a T) { self.observers.push(observer); } fn detach(&mut self, observer: &'a T) { if let Some(idx) = self.observers.iter().position(|x| *x == observer) { self.observers.remove(idx); } } fn notify_observers(&self) { for item in self.observers.iter() { item.update(); } } }
#[derive(PartialEq)] struct ConcreteObserver { id: i32, } impl IObserver for ConcreteObserver { fn update(&self) { println!("Observer id:{} received event!", self.id); } }
fn main() { let mut subject = Subject::new(); let observer_a = ConcreteObserver { id: 1 }; let observer_b = ConcreteObserver { id: 2 };
subject.attach(&observer_a); subject.attach(&observer_b); subject.notify_observers();
subject.detach(&observer_b); subject.notify_observers(); }
|
知识点,生命周期与作用域范围 scope区别:
// 生命周期在下面的注释中用行来表示创建 // 和每个变量的销毁。 // `i`有最长的寿命,因为它的作用域范围Scope被完全包括了`borrow1`和`borrow2` //borrow1 与 borrow2相比 ,生命周期无关,因为它们是不相干的。 fn main() { let i = 3; // `i` 的生命周期开始. ────────────────┐ // │ { // │ let borrow1 = &i; // 通过& 借用变量i `borrow1` 生命周期开始. ──┐│ // ││ println!("borrow1: {}", borrow1); // ││ } // `borrow1 生命周期结束. ──────────────────────────────────┘│ // │ // │ { // │ let borrow2 = &i; // `borrow2` 生命周期开始. ──┐│ // ││ println!("borrow2: {}", borrow2); // ││ } // `borrow2` 生命周期结束. ─────────────────────────────────┘│ // │ } // i的生命周期结束. ─────────────────────────────────────┘
|
我们通过&借用一个变量i,借用的生命周期取决于它的声明位置。
因此,只要借用者再被销毁之前,借用就一直有效。
但是,借用的作用域scope 范围取决于使用引用的位置。
我们用'a来表达对对象生命周期的约束:
fn foo<'a>(/**/); // ^-- Lifetime.
|
'a 被读成 ‘生命周期lifetime a’
<>用于声明生命周期。这表示foo有一个生命周期'a.
例如,如果我们把一个结构的引用作为输入,并返回它的一部分的引用,我们必须说,我们返回的东西只能活到我们获得引用的结构的长度。
struct Bar { value: i32, }
fn get_value<'a>(my_bar: &'a Bar) -> &'a i32 { &my_bar.value }
|
总结
观察者模式在实践中主要是通过消息队列 实现的,有EDA Eventsourcing CQRS Kafka ,通过引入第三方组件消息系统,实现了观察者和被观察者之间的异步通讯。
观察者模式没有强调同步或异步,其实观察者模式与异步结合才真正发挥其含义,因为观察者和被观察者肯定不能同步,或处于空闲,同时关注一件事,这违背观察这个词语含义,有可能一个观察者观察监察很多被观察者,不可能在其中一个被观察者中停留很长时间,只能等被观察者发事件消息通知。