import base64
import os
import random
from typing import Optional

import faust
# Models describe how messages are serialized:
# {"user_id": "3fae...", "username": "my_new_username"}
class UserEvent(faust.Record):
    """A single user interaction event.

    Only ``user_id`` is mandatory. The remaining fields default to ``None``
    so that an event can carry just the attributes that changed (for
    example a username change without any follower information).
    Consumers are expected to check each optional field for truthiness
    before using it.
    """

    # Identifier of the user the event is about.
    user_id: str
    # New username, present only when the username changed.
    username: Optional[str] = None
    # Follower identifier, when the event carries one.
    follower_id: Optional[str] = None
    # Followed-user identifier, when the event carries one.
    following_id: Optional[str] = None
# Faust application instance, pointed at the Kafka broker.
app = faust.App(
    'es_event_processor',
    broker='kafka://broker',
)

# Topic of UserEvent values with string keys.
topic = app.topic('user_events', key_type=str, value_type=UserEvent)
def add_value(user_docs, user_id, key, value, op):
    """Record ``value`` under ``key`` in the document of ``user_id``.

    The per-user document is created on first use. ``user_docs`` is
    mutated in place and nothing is returned.

    :param user_docs: dict mapping user id -> dict of aggregated fields
    :param user_id: id of the user whose document is updated
    :param key: field name inside the user's document
    :param value: value to store or append
    :param op: ``'set'`` overwrites the field; ``'append'`` adds ``value``
        to a list stored in the field, creating the list when the field is
        missing or empty. Any other value leaves the field untouched.
    """
    doc = user_docs.setdefault(user_id, {})
    if op == 'set':
        doc[key] = value
    elif op == 'append':
        # .get() instead of indexing: the field usually does not exist yet
        # on the first append, and indexing would raise KeyError.
        if not doc.get(key):
            doc[key] = []
        doc[key].append(value)
@app.agent(topic)
async def user_event_consumer(user_events):
    """
    Aggregate user events batch by batch so they can be stored in ES.

    This is deliberately a very simple approach; aggregating into another
    kafka topic or using a RocksDB table would be valid alternatives.

    :param user_events: stream of user events delivered to this agent
    :return:
    """
    print("starting agent")
    tracked_fields = ('username', 'follower_id', 'following_id')

    # take up to 1000 messages per batch, with a 30s timeout
    async for batch in user_events.take(1000, within=30.0):
        user_docs = {}
        for event in batch:
            # Only fields that carry a truthy value are copied over.
            for field in tracked_fields:
                field_value = getattr(event, field)
                if field_value:
                    add_value(user_docs, event.user_id, field, field_value, 'set')

        # ES Load logic goes here
        # user_docs already has the aggregated data ready to bulk load
        # into the index or even multiple indexes
        print(len(user_docs))
@app.timer(interval=0.5)
async def example_sender(app):
    """
    Send one randomly generated user event to the topic.

    Simulates user interactions with the system such as a username
    change or a new follower.

    :param app: argument supplied by the timer; not used in the body
    :return:
    """
    def random_handle():
        # 'c' followed by 6 random bytes, url-safe base64 encoded.
        return f'c{base64.urlsafe_b64encode(os.urandom(6)).decode()}'

    print("preparing msg")
    user_id = random.choice(['er56kmn', 'oiuh76n', 'df47kj'])

    # A username is only attached when the draw is 5 or higher.
    optional_fields = {}
    if random.randint(0,10) >= 5:
        optional_fields['username'] = random_handle()

    event = UserEvent(
        user_id=user_id,
        follower_id=random_handle(),
        following_id=random_handle(),
        **optional_fields,
    )
    await topic.send(key=user_id, value=event)
    print("sent msg")
# Run the app's main entry point only when this file is executed as a script.
if __name__ == '__main__': app.main()
|