解决方案是使用RediSearch和Redis Streams,该解决方案由多个协作组件组成。
代码可在此 GitHub 存储库中找到 — https://github.com/abhirockzz/redis-streams-in-action
以下是各个组件的摘要:
- Twitter Stream Consumer:一个Rust应用程序,用于使用流式 Twitter 数据并将它们传递给 Redis Streams。
- 推文处理器:来自 Redis Streams 的推文由 Java 应用程序处理
- 监控服务:最后一部分是一个Go应用程序,用于监控推文处理器服务的进度并确保重新处理任何失败的记录。
在 Redis 5.0 中引入的 Redis Streams 提供了最好的 Pub/Sub 和列表以及可靠的消息传递、消息重播的持久性、用于负载平衡的消费者组、用于监控的待处理条目列表等等!它的不同之处在于它是一种append-only log数据结构。简而言之,生产者可以添加记录(使用XADD),消费者可以订阅到达流的新项目(使用XREAD)。它支持范围查询(XRANGE等),并且由于消费者组,一组应用程序可以分配处理负载(XREADGROUP)并可以监控其状态(XPENDING等)。
Redis Streams
让我们回顾一下Redis Streams的一些命令,按功能分组以便于理解:
添加条目
- 只有一种方法可以将消息添加到 Redis 流。XADD将指定的流条目附加到指定键的流中。如果键不存在,作为运行此命令的副作用,键是使用流值创建的。
读取条目
- XRANGE返回匹配给定 ID 范围的流条目(-和+特殊 ID 分别表示流中可能的最小 ID 和最大可能 ID)
- XREVRANGE与 完全一样XRANGE,但不同的是以相反的顺序返回条目(首先使用结束 ID,然后使用开始 ID)
- XREAD从一个或多个流中读取数据,只返回 ID 大于调用者报告的最后接收到的 ID 的条目。
- XREADGROUP是该XREAD命令的特殊版本,支持消费者组。您可以创建客户端组,这些客户端使用到达给定流的消息的不同部分
管理 Redis 流
- XACK从PEL流消费者组的 Pending Entries List ( ) 中删除一条或多条消息。
- XGROUP用于管理与 Redis 流关联的消费者组。
- XPENDING用于检查待处理消息列表,以观察和了解流消费者组发生的情况。
- XCLAIM用于获取消息的所有权并继续处理。
- XAUTOCLIAM转移与指定条件匹配的待处理流条目的所有权。从概念上讲,XAUTOCLAIM相当于调用XPENDING然后XCLAIM
删除
RediSearch
Redis 拥有一套通用的数据结构,从简单的字符串一直到强大的抽象,如 Redis Streams。本机数据类型可以带您走很长的路,但有些用例可能需要解决方法。一个例子是要求在 Redis 中使用二级索引,以便超越基于键的搜索/查找以获得更丰富的查询功能。虽然您可以使用排序集、列表等来完成工作,但您需要考虑一些权衡因素。
RediSearch由于一流的二级索引引擎,可作为 Redis 模块使用,提供灵活的搜索功能。它的一些主要功能包括全文搜索、自动完成和地理索引。还有许多其他功能的详细探索超出了本博客系列的范围。我强烈建议您阅读文档以进一步探索。现在,这里是一些RediSearch命令的快速概述。您将在后续的博客文章中看到它们的作用。
两个最重要的命令包括创建索引和执行搜索查询:
您可以对索引执行其他操作:
- FT.DROPINDEX删除索引。请注意,默认情况下,它不会删除与索引关联的文档哈希
- FT.INFO返回有关索引的信息和统计信息,例如文档数量、不同术语的数量等。
- FT.ALTER SCHEMA ADD向索引添加一个新字段。这会导致将来的文档更新在索引和重新索引现有文档时使用新字段。
要使用自动完成功能,您可以使用“建议”:
RediSearch支持同义词,它是由一组组组成的数据结构,每个组包含同义词。FT.SYNUPDATE可用于创建或更新具有附加术语的同义词组。
如果您想要查询拼写检查更正(类似于“您的意思是”功能),您可以使用FT.SPELLCHECK对查询执行拼写更正,返回拼写错误的建议。
字典是一组术语。字典可用于修改 RediSearch 查询拼写更正的行为,方法是在潜在的拼写更正建议中包含或排除其内容。您可以分别使用FT.DICTADD和FT.DICTDEL添加和删除术语。