CloudFog API Gateway

Limited Time

200+ AI Models Integration Hub

Claim Offer Now
Resolvedmysql

Laravel 11 项目中 Redis 数据流处理过慢,如何优化 `StatisticAggregate` 模型的 upsert 操作?🚀

架构师David

1/23/2025

103 views3 likes

嘿,大家好!👋

我最近在搞一个 Laravel 11 项目,使用 Redis 7 来处理数据流。项目需要快速且高级的报告功能,所以我有个叫 StatisticAggregate 的模型来汇总数据。基本上,它会在一天中不断更新或创建会话数据。问题是,到了高峰期,Redis 流里的数据填得比我处理得还快!😅

我有几百条数据需要更新或插入,所以用 upsert 似乎是个好主意,直接对接 MySQL 8 数据库。数据会被追加到 Redis 流中,然后用一个自定义的 artisan 命令(类似 Laravel Pulse)来无限循环地拉取数据块并 upsert 到数据库。

问题是,随着时间的推移,流里的数据越来越多,处理 1000 条数据块大约需要 20-30 秒,也就是每分钟 2000 条,但高峰期每分钟有 5000 条或更多的数据进入流。😩

这看起来不像是数据库的瓶颈,因为即使 Redis 里有大量积压数据,表里有 10 万行数据,到了晚上系统负载下降时,积压的数据就会被处理完。如果是表慢的话,应该会在这里显现出来。

我是不是漏掉了什么?有没有大佬能帮我看看我的代码?🙏

我的 worker 命令 php artisan statistic:aggregates:stream:work 是这样的:

namespace App\Console\Commands\StatisticAggregates; // ... existing code ... class StatisticAggregatesStreamWork extends Command implements Isolatable { // ... existing code ... public function handle(): int { // ... existing code ... while (true) { // ... existing code ... StatisticAggregateStream::digest(); // ... existing code ... } } }

还有 digest 函数:

namespace App\Services; // ... existing code ... class StatisticAggregateStream { // ... existing code ... public static function digest(): int { $total = 0; while (true) { $entries = collect(Redis::connection('stream')->xrange(self::ingestKey(), '-', '+', self::$chunk)); if ($entries->isEmpty()) { return $total; } $keys = $entries->keys(); self::store( $entries->map(fn (array $payload) => unserialize($payload['data'])) ); Redis::connection('stream')->xdel(self::ingestKey(), $keys->toArray()); if ($entries->count() < self::$chunk) { return $total + $entries->count(); } $total = $total + $entries->count(); } } }

我已经尝试过调整 chunk 大小,甚至考虑过增加 worker 的数量,但似乎都没有解决问题。有没有人遇到过类似的情况?或者有什么优化建议吗?🤔

PS: 我真心希望能在下个高峰期之前解决这个问题,不然老板要找我谈心了。😅

谢谢大家的帮助!🙏

1 Answers

架构师Alex

1/23/2025

Best Answer13

Answer #1 - Best Answer

嘿,你好啊!👋

我太理解你遇到的 Redis 数据流处理过慢的问题了——我之前也在类似的项目中遇到过这种情况!在高峰期,数据流如洪水般涌入,而我们的处理速度却跟不上,真是让人头疼。😅

解决方案

首先,确保你的 Redis 和 MySQL 都是最新的,并且配置优化过。接下来,我有几个建议可以帮助你提升性能:

  1. 批量处理:你已经在用 upsert,这很好,但你可以尝试进一步优化批量处理的方式。确保你在一次数据库操作中处理尽可能多的数据。

  2. 异步处理:考虑使用队列系统(如 Laravel 的队列)来异步处理数据。这样可以在高峰期更好地分配负载。

  3. Redis 管道:使用 Redis 的管道功能来批量获取和删除数据,这样可以减少网络往返的次数。

  4. 优化数据库索引:确保你的数据库表有适当的索引,特别是在 upsert 操作中涉及的列上。

  5. 增加并发:如果可能,增加 worker 的数量来提高并发处理能力。

代码示例

以下是一些代码示例,展示如何使用 Redis 管道和优化批量处理:

namespace App\Services; // ... existing code ... class StatisticAggregateStream { // ... existing code ... public static function digest(): int { $total = 0; while (true) { // 使用 Redis 管道来批量获取数据 $entries = collect(Redis::connection('stream')->pipeline(function ($pipe) { $pipe->xrange(self::ingestKey(), '-', '+', self::$chunk); })); if ($entries->isEmpty()) { return $total; } $keys = $entries->keys(); // 批量处理数据 self::store( $entries->map(fn (array $payload) => unserialize($payload['data'])) ); // 使用 Redis 管道来批量删除数据 Redis::connection('stream')->pipeline(function ($pipe) use ($keys) { $pipe->xdel(self::ingestKey(), $keys->toArray()); }); if ($entries->count() < self::$chunk) { return $total + $entries->count(); } $total = $total + $entries->count(); } } }

个人经验提示

  • 监控和日志:使用工具监控 Redis 和 MySQL 的性能,找出瓶颈。
  • 测试不同的 chunk 大小:在不同的负载下测试不同的 chunk 大小,找到最佳值。
  • 避免长时间锁定:确保你的数据库操作不会长时间锁定表,影响其他操作。

希望这些建议能帮到你!如果还有其他问题,随时来问,我很乐意帮忙。加油!🚀

祝你在下个高峰期顺利度过,不用再被老板找去“谈心”了。😄

CloudFog API Gateway 🔥 New User Special

💥 New User Offer: Get $1 Credit for ¥0.5

Claim Offer Now