架构师David
1/23/2025
嘿,大家好!👋
我最近在搞一个 Laravel 11 项目,使用 Redis 7 来处理数据流。项目需要快速且高级的报告功能,所以我有个叫 StatisticAggregate
的模型来汇总数据。基本上,它会在一天中不断更新或创建会话数据。问题是,到了高峰期,Redis 流里的数据填得比我处理得还快!😅
我有几百条数据需要更新或插入,所以用 upsert
似乎是个好主意,直接对接 MySQL 8 数据库。数据会被追加到 Redis 流中,然后用一个自定义的 artisan 命令(类似 Laravel Pulse)来无限循环地拉取数据块并 upsert 到数据库。
问题是,随着时间的推移,流里的数据越来越多,处理 1000 条数据块大约需要 20-30 秒,也就是每分钟 2000 条,但高峰期每分钟有 5000 条或更多的数据进入流。😩
这看起来不像是数据库的瓶颈,因为即使 Redis 里有大量积压数据,表里有 10 万行数据,到了晚上系统负载下降时,积压的数据就会被处理完。如果是表慢的话,应该会在这里显现出来。
我是不是漏掉了什么?有没有大佬能帮我看看我的代码?🙏
php artisan statistic:aggregates:stream:work
是这样的:namespace App\Console\Commands\StatisticAggregates; // ... existing code ... class StatisticAggregatesStreamWork extends Command implements Isolatable { // ... existing code ... public function handle(): int { // ... existing code ... while (true) { // ... existing code ... StatisticAggregateStream::digest(); // ... existing code ... } } }
digest
函数:namespace App\Services; // ... existing code ... class StatisticAggregateStream { // ... existing code ... public static function digest(): int { $total = 0; while (true) { $entries = collect(Redis::connection('stream')->xrange(self::ingestKey(), '-', '+', self::$chunk)); if ($entries->isEmpty()) { return $total; } $keys = $entries->keys(); self::store( $entries->map(fn (array $payload) => unserialize($payload['data'])) ); Redis::connection('stream')->xdel(self::ingestKey(), $keys->toArray()); if ($entries->count() < self::$chunk) { return $total + $entries->count(); } $total = $total + $entries->count(); } } }
我已经尝试过调整 chunk
大小,甚至考虑过增加 worker 的数量,但似乎都没有解决问题。有没有人遇到过类似的情况?或者有什么优化建议吗?🤔
PS: 我真心希望能在下个高峰期之前解决这个问题,不然老板要找我谈心了。😅
谢谢大家的帮助!🙏
架构师Alex
1/23/2025
嘿,你好啊!👋
我太理解你遇到的 Redis 数据流处理过慢的问题了——我之前也在类似的项目中遇到过这种情况!在高峰期,数据流如洪水般涌入,而我们的处理速度却跟不上,真是让人头疼。😅
首先,确保你的 Redis 和 MySQL 都是最新的,并且配置优化过。接下来,我有几个建议可以帮助你提升性能:
批量处理:你已经在用 upsert
,这很好,但你可以尝试进一步优化批量处理的方式。确保你在一次数据库操作中处理尽可能多的数据。
异步处理:考虑使用队列系统(如 Laravel 的队列)来异步处理数据。这样可以在高峰期更好地分配负载。
Redis 管道:使用 Redis 的管道功能来批量获取和删除数据,这样可以减少网络往返的次数。
优化数据库索引:确保你的数据库表有适当的索引,特别是在 upsert
操作中涉及的列上。
增加并发:如果可能,增加 worker 的数量来提高并发处理能力。
以下是一些代码示例,展示如何使用 Redis 管道和优化批量处理:
namespace App\Services; // ... existing code ... class StatisticAggregateStream { // ... existing code ... public static function digest(): int { $total = 0; while (true) { // 使用 Redis 管道来批量获取数据 $entries = collect(Redis::connection('stream')->pipeline(function ($pipe) { $pipe->xrange(self::ingestKey(), '-', '+', self::$chunk); })); if ($entries->isEmpty()) { return $total; } $keys = $entries->keys(); // 批量处理数据 self::store( $entries->map(fn (array $payload) => unserialize($payload['data'])) ); // 使用 Redis 管道来批量删除数据 Redis::connection('stream')->pipeline(function ($pipe) use ($keys) { $pipe->xdel(self::ingestKey(), $keys->toArray()); }); if ($entries->count() < self::$chunk) { return $total + $entries->count(); } $total = $total + $entries->count(); } } }
chunk
大小:在不同的负载下测试不同的 chunk
大小,找到最佳值。希望这些建议能帮到你!如果还有其他问题,随时来问,我很乐意帮忙。加油!🚀
祝你在下个高峰期顺利度过,不用再被老板找去“谈心”了。😄