MySQL的事件溯源Event Sourcing表结构

-- EVENT STORE事件存储表
    CREATE TABLE events (
        sequence BIGINT UNSIGNED PRIMARY KEY,
        event_type VARCHAR(32) NOT NULL,
        aggregate_type VARCHAR(32) NOT NULL,
        aggregate_id CHAR(36) NULL,
        payload JSON NOT NULL,
        recorded_at DATETIME NOT NULL,
        KEY idx_event_type (event_type),
        KEY idx_aggregate_id (aggregate_id),
        KEY idx_aggregate_type (aggregate_type)
    ) //

-- EVENT STORE TRIGGERS触发器
    -- Before Insert
    CREATE TRIGGER events_before_insert BEFORE INSERT ON events FOR EACH ROW
    BEGIN
        DECLARE next_sequence BIGINT DEFAULT 0;

        SELECT COALESCE(new.sequence, COALESCE(MAX(sequence), 0) + 1) FROM events INTO next_sequence;

        SET new.sequence = next_sequence;
        SET new.recorded_at = COALESCE(new.recorded_at, NOW());
    END
//

    -- Before Update
    CREATE TRIGGER events_before_update BEFORE UPDATE ON events FOR EACH ROW
    BEGIN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = '[E001] The event store is immutable (append only), updating events is not allowed.';
    END
//

    -- Before Delete
    CREATE TRIGGER events_before_delete BEFORE DELETE ON events FOR EACH ROW
    BEGIN
        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = '[E001] The event store is immutable (append only), deleting events is not allowed.';
    END
//

-- EVENT STORE RECORDING MECHANISM
    CREATE PROCEDURE record_event (IN event_type VARCHAR(32), IN aggregate_type VARCHAR(32), IN aggregate_id CHAR(36), IN payload JSON)
    BEGIN
        DECLARE recorded_at DATETIME DEFAULT NOW();

        DECLARE EXIT HANDLER FOR SQLEXCEPTION
        BEGIN
            ROLLBACK;
            RESIGNAL;
        END;

        START TRANSACTION;
            INSERT INTO events (event_type, aggregate_type, aggregate_id, payload, recorded_at) VALUES (event_type, aggregate_type, aggregate_id, payload, recorded_at);
            CALL process_event(event_type, aggregate_id, payload, recorded_at);
        COMMIT;
    END
//

-- EVENT HANDLING
    CREATE PROCEDURE process_event (IN event_type VARCHAR(32), IN aggregate_id CHAR(36), IN payload JSON, IN recorded_at DATETIME)
    BEGIN
        DECLARE handler_name VARCHAR(38) DEFAULT CONCAT('handle', event_type);

        DECLARE EXIT HANDLER FOR 1305 BEGIN END; -- Procedure does not exist, which is okay.

        SET @handler_sql = CONCAT('CALL ', handler_name, '(?,?,?)');

        PREPARE prepared_statement FROM @handler_sql;

        SET @aggregate_id = aggregate_id;
        SET @payload = payload;
        SET @recorded_at = recorded_at;

        EXECUTE prepared_statement USING @aggregate_id, @payload, @recorded_at;
    END
//

-- HELPERS
    CREATE PROCEDURE map_error (errors JSON)
    BEGIN
        GET DIAGNOSTICS CONDITION 1
                @error_number = MYSQL_ERRNO,
                @error_message = MESSAGE_TEXT;

        SET @error_message = JSON_UNQUOTE(COALESCE(JSON_EXTRACT(errors, CONCAT('$.e', @error_number)), @error_message));

        SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = @error_message;
    END
//










-- SAMPLE IMPLEMENTATION
    CREATE TABLE customer (
        customer_id CHAR(36) PRIMARY KEY,
        first_name VARCHAR(64) NOT NULL,
        last_name VARCHAR(64) NOT NULL,
        phone_number VARCHAR(15) NOT NULL,
        email VARCHAR(255) NOT NULL,
        password CHAR(98) NOT NULL,
        registered DATETIME NOT NULL,
        UNIQUE KEY idx_email (email)
    )
//

    -- This is the signature all handlers should implement: (IN aggregate_id CHAR(36), IN payload JSON, IN recorded_at DATETIME)
    CREATE PROCEDURE handleUserRegistered (IN aggregate_id CHAR(36), IN payload JSON, IN recorded_at DATETIME)
    BEGIN
        DECLARE EXIT HANDLER FOR SQLEXCEPTION
        BEGIN
            CALL map_error(JSON_OBJECT(
                -- map e{error_code} to a more domain specific message
                'e1062', 'A customer with this e-mail address already exists'
            ));
        END;

        INSERT INTO customer (customer_id, first_name, last_name, phone_number, email, password, registered)
        VALUES (
            aggregate_id,
            payload->>
"$.firstName",
            payload->>
"$.lastName",
            payload->>
"$.phoneNumber",
            payload->>
"$.email",
            payload->>
"$.passwordHash",
            recorded_at
        );
    END
//

DELIMITER ;

-- This yields an entry in the customer table
CALL record_event ('UserRegistered', 'User', '5d1e0bcd-6cfb-47af-b4f8-8159f390163a', '{
"firstName": "John", "lastName": "Doe", "phoneNumber": "+31612345678", "email": "john@doe.com", "passwordHash": "[xxx]"}');

-- Trying to create this customer again results in an error with the following message 'A customer with this e-mail address already exists'
CALL record_event ('UserRegistered', 'User', '5d1e0bcd-6cfb-47af-b4f8-8159f390163a', '{
"firstName": "John", "lastName": "Doe", "phoneNumber": "+31612345678", "email": "john@doe.com", "passwordHash": "[xxx]"}');