Rust语言之GoF设计模式:观察者模式


观察者是一种行为设计模式,它允许一些对象通知其他对象其状态的变化。

在 Rust中,定义订阅者的一种便捷方法是将函数 作为可调用对象,并通过复杂的逻辑将其传递给事件发布者。

首先看看一个直观天真实现
有一个可观察observable的集合和一个观察者Observer。我希望观察者成为trait Observer. 当某些事件发生时,可观察对象应该能够通知每个观察者:

impl Observer for A {
    fn event(&mut self, ev: &String) {
        println!("Got event from observable: {}", ev);
    }
}

struct Observable {
    observers: Vec<dyn Observer>, // How to contain references to observers? (this line is invalid)
}

impl Observable {
    fn new() -> Observable {
        Observable {
            observers: Vec::new(),
        }
    }

    fn add_observer(&mut self, o: &dyn Observer) {
        // incorrect line too
        self.observers.push(o);
    }

    fn remove_observer(&mut self, o: &dyn Observer) {
        // incorrect line too
        self.observers.remove(o);
    }

    fn notify_observers(&self, ev: &String) {
        for o in &mut self.observers {
            o.event(ev);
        }
    }
}

但是编译器错误:

error[E0277]: the size for values of type `(dyn Observer + 'static)` cannot be known at compilation time
  --> src/lib.rs:24:5
   |
24 |     observers: Vec<dyn Observer>, // How to contain references to observers?
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ doesn't have a size known at compile-time
   |
   = help: the trait `std::marker::Sized` is not implemented for `(dyn Observer + 'static)`
   = note: to learn more, visit <https://doc.rust-lang.org/book/second-edition/ch19-04-advanced-types.htmldynamically-sized-types-and-the-sized-trait>
   = note: required by `std::vec::Vec`

观察者模式可能会带来Rust所有权挑战。
在垃圾收集的语言中,通常会Observable引用Observer(通知它)和Observer引用Observable(注销自身)......这会在所有权方面造成一些挑战(谁比谁活得长?),这就需要完全整体“取消注册通知”。

简单的解决方案
Observable和有不同的Observer生命周期,没有人拥有另一个或被期望比另一个长寿:

use std::rc::Weak;

struct Event;

trait Observable {
    fn register(&mut self, observer: Weak<dyn Observer>);
}

trait Observer {
    fn notify(&self, event: &Event);
}

关键是将观察者分配到一个Rc中,然后将Weak(弱引用)交给Observable。

如果观察者需要在事件中被修改,那么要么它需要内部可变性,要么它需要被包装成一个RefCell(将Weak<RefCell<dyn Observer>>传递给观察者)。

当通知时,Observable会定期意识到有死的弱引用(Observer已经消失了),那么它就可以懒惰地删除这些引用。

Weak<dyn Observer>缺点:

  1. 它需要一个Rc
  2. 您必须重复代码来创建不同的观察者结构。
  3. 它需要类型擦除

回调函数
使用callback函数。它简单而强大,没有生命周期问题或类型擦除。

pub struct Notifier<E> {
    subscribers: Vec<Box<dyn Fn(&E)>>,
}

impl<E> Notifier<E> {
    pub fn new() -> Notifier<E> {
        Notifier {
            subscribers: Vec::new(),
        }
    }

    pub fn register<F>(&mut self, callback: F)
    where
        F: 'static + Fn(&E),
    {
        self.subscribers.push(Box::new(callback));
    }

    pub fn notify(&self, event: E) {
        for callback in &self.subscribers {
            callback(&event);
        }
    }
}

lambda 函数
在这个观察者示例中,订阅者观察者是 lambda 函数,是订阅活动事件的显式函数。显式函数对象也可以取消订阅(或者某些函数类型可能存在限制)。
observer.rs

use std::collections::HashMap;

/// An event type.一个事件类型
#[derive(PartialEq, Eq, Hash, Clone)]
pub enum Event {
    Load,
    Save,
}

/// 一个订阅者(监听器)具有可调用函数的类型。
pub type Subscriber = fn(file_path: String);

/// 发布者向订阅者(监听器)发送事件。
#[derive(Default)]
pub struct Publisher {
    events: HashMap<Event, Vec<Subscriber>>,
}

impl Publisher {
    pub fn subscribe(&mut self, event_type: Event, listener: Subscriber) {
        self.events.entry(event_type.clone()).or_default();
        self.events.get_mut(&event_type).unwrap().push(listener);
    }

    pub fn unsubscribe(&mut self, event_type: Event, listener: Subscriber) {
        self.events
            .get_mut(&event_type)
            .unwrap()
            .retain(|&x| x != listener);
    }

    pub fn notify(&self, event_type: Event, file_path: String) {
        let listeners = self.events.get(&event_type).unwrap();
        for listener in listeners {
            listener(file_path.clone());
        }
    }
}

结合泛型实现
来自https://github.com/lpxxn/rust-design-pattern

trait IObserver {
    fn update(&self);
}

trait ISubject<'a, T: IObserver> {
    fn attach(&mut self, observer: &'a T);
    fn detach(&mut self, observer: &'a T);
    fn notify_observers(&self);
}

struct Subject<'a, T: IObserver> {
    observers: Vec<&'a T>,
}
impl<'a, T: IObserver + PartialEq> Subject<'a, T> {
    fn new() -> Subject<'a, T> {
        Subject {
            observers: Vec::new(),
        }
    }
}

impl<'a, T: IObserver + PartialEq> ISubject<'a, T> for Subject<'a, T> {
    fn attach(&mut self, observer: &'a T) {
        self.observers.push(observer);
    }
    fn detach(&mut self, observer: &'a T) {
        if let Some(idx) = self.observers.iter().position(|x| *x == observer) {
            self.observers.remove(idx);
        }
    }
    fn notify_observers(&self) {
        for item in self.observers.iter() {
            item.update();
        }
    }
}

#[derive(PartialEq)]
struct ConcreteObserver {
    id: i32,
}
impl IObserver for ConcreteObserver {
    fn update(&self) {
        println!("Observer id:{} received event!", self.id);
    }
}

fn main() {
    let mut subject = Subject::new();
    let observer_a = ConcreteObserver { id: 1 };
    let observer_b = ConcreteObserver { id: 2 };

    subject.attach(&observer_a);
    subject.attach(&observer_b);
    subject.notify_observers();

    subject.detach(&observer_b);
    subject.notify_observers();
}

知识点,生命周期与作用域范围 scope区别:


// 生命周期在下面的注释中用行来表示创建
// 和每个变量的销毁。
// `i`有最长的寿命,因为它的作用域范围Scope被完全包括了`borrow1`和`borrow2` 
//borrow1 与 borrow2相比 ,生命周期无关,因为它们是不相干的。
fn main() {
    let i = 3; //  `i` 的生命周期开始. ────────────────┐
    //                                                     │
    { //                                                   │
        let borrow1 = &i; // 通过& 借用变量i  `borrow1` 生命周期开始. ──┐│
        //                                                ││
        println!("borrow1: {}", borrow1); //              ││
    } // `borrow1 生命周期结束. ──────────────────────────────────┘│
    //                                                     │
    //                                                     │
    { //                                                   │
        let borrow2 = &i; // `borrow2` 生命周期开始. ──┐│
        //                                                ││
        println!("borrow2: {}", borrow2); //              ││
    } // `borrow2` 生命周期结束. ─────────────────────────────────┘│
    //                                                     │
}   // i的生命周期结束. ─────────────────────────────────────┘

我们通过&借用一个变量i,借用的生命周期取决于它的声明位置。
因此,只要借用者再被销毁之前,借用就一直有效。
但是,借用的作用域scope 范围取决于使用引用的位置。

我们用'a来表达对对象生命周期的约束:

fn foo<'a>(/**/);
//      ^-- Lifetime.

'a 被读成 ‘生命周期lifetime a’
<>用于声明生命周期。这表示foo有一个生命周期'a.

例如,如果我们把一个结构的引用作为输入,并返回它的一部分的引用,我们必须说,我们返回的东西只能活到我们获得引用的结构的长度。

struct Bar {
    value: i32,
}

fn get_value<'a>(my_bar: &'a Bar) -> &'a i32 {
    &my_bar.value
}

总结
观察者模式在实践中主要是通过消息队列 实现的,有EDA Eventsourcing CQRS Kafka ,通过引入第三方组件消息系统,实现了观察者和被观察者之间的异步通讯。
观察者模式没有强调同步或异步,其实观察者模式与异步结合才真正发挥其含义,因为观察者和被观察者肯定不能同步,或处于空闲,同时关注一件事,这违背观察这个词语含义,有可能一个观察者观察监察很多被观察者,不可能在其中一个被观察者中停留很长时间,只能等被观察者发事件消息通知。