21-11-29
banq
Elasticseach 并不真正支持更新。在 Elasticsearch 中,更新总是意味着删除+创建,持续不断的文档更新可能会使 Elasticsearch 集群瘫痪。幸运的是,有办法避免这种情况。
最终的解决方案包括使用事件溯源设计模式将所有需要保存的更改转换为事件。此案例的应用程序状态将存储在 Elasticsearch 中。
解决方案的第二部分是我们如何保存该状态。
我们需要将对每个文档的所有更改组合到一个操作中。这意味着在某些情况下有一个值列表,而在其他情况下只保留最后一次更改(如用户名更改)。为了确保状态一致,我们需要保证针对同一文档的事件的顺序,否则我们可能会面临状态与我们的真实来源不匹配的风险。
该解决方案基于流的概念。我使用的是 Kafka,但 Redis Streams 或 AWS Kinesis 都可以正常工作。
这个想法是将所有新更改(如新关注者、用户名更改等)存储在分区主题中。确保您的分区键与文档 id 一致以保证顺序,但也要避免每个用户 id 一个分区,否则您将杀死 Kafka 集群。
顺序对于覆盖最后一个字段值(如用户名更改)的事件很重要。我们要确保我们坚持使用最后一个版本而不是中间版本。
为了处理这些消息,我们需要一个流处理解决方案。对于此示例,我将使用Faust,但它是如此简单的模式,我建议您使用更适合您的模式。
import base64 import os import random import faust # Models describe how messages are serialized: # {"user_id": "3fae...", username": "my_new_username"} class UserEvent(faust.Record): user_id: str username: str follower_id: str following_id: str app = faust.App('es_event_processor', broker='kafka://broker') topic = app.topic( 'user_events', key_type=str, value_type=UserEvent ) def add_value(user_docs, user_id, key, value, op): if user_id not in user_docs: user_docs[user_id] = {} if op == 'set': user_docs[user_id][key] = value elif op == 'append': if not user_docs[user_id][key]: user_docs[user_id][key] = [] user_docs[user_id][key].append(value) @app.agent(topic) async def user_event_consumer(user_events): """ Very simple way to aggregate user events and storing them into ES Other options like aggregating into another kafka topic or using RocksDB table is also valid :param user_events: :return: """ print("starting agent") # get 1000 messages with 30s timeout async for lst in user_events.take(1000, within=30.0): user_docs = {} for user_event in lst: if user_event.username: add_value(user_docs, user_event.user_id, 'username', user_event.username, 'set') if user_event.follower_id: add_value(user_docs, user_event.user_id, 'follower_id', user_event.follower_id, 'set') if user_event.following_id: add_value(user_docs, user_event.user_id, 'following_id', user_event.following_id, 'set') # ES Load logic goes here # user_docs already has the aggregated data ready to bulk load into the index # or even multiple indexes print(len(user_docs.keys())) @app.timer(interval=0.5) async def example_sender(app): """ Used to simulate user interactions with the system like username change, add follower, etc :param app: :return: """ print("preparing msg") user_id_lst = ['er56kmn', 'oiuh76n', 'df47kj'] user_id = random.choice(user_id_lst) extra_kwargs = {} if random.randint(0,10) >= 5: extra_kwargs['username'] = f'c{base64.urlsafe_b64encode(os.urandom(6)).decode()}' user_event = UserEvent( user_id=user_id, follower_id=f'c{base64.urlsafe_b64encode(os.urandom(6)).decode()}', following_id=f'c{base64.urlsafe_b64encode(os.urandom(6)).decode()}', **extra_kwargs ) await topic.send( key=user_id, value=user_event, ) print("sent msg") if __name__ == '__main__': app.main() |