CloudFog API Gateway

Limited Time

200+ AI Models Integration Hub

Claim Offer Now
Resolvedmysql

Laravel 11 项目性能问题:如何优化 Redis 数据流处理速度?🚀

极客Kevin

1/11/2025

113 views6 likes

嘿,大家好!👋

我最近在搞一个 Laravel 11 项目,使用 Redis 7 来处理数据流。项目需要快速且高级的报告功能,所以我写了一个自定义的 upsert worker。这个 worker 主要是用来处理 StatisticAggregate 模型的数据聚合。简单来说,就是随着一天的进展,数据会被聚合,比如会话数据会被更新或者插入。

问题是,到了高峰期,Redis 的流数据填充速度比处理速度快得多。😩 每分钟大约有 5000 条数据进入流,但我每分钟只能处理 2000 条。虽然数据库看起来没有瓶颈,因为在系统负载下降时,积压的数据会被处理完,但我还是搞不清楚问题出在哪儿。

这是我目前的 worker 代码:

namespace App\Console\Commands\StatisticAggregates; class StatisticAggregatesStreamWork extends Command implements Isolatable { // ... existing code ... public function handle(): int { // ... existing code ... while (true) { // ... existing code ... StatisticAggregateStream::digest(); // ... existing code ... } } }

还有 digest 函数:

namespace App\Services; class StatisticAggregateStream { // ... existing code ... public static function digest(): int { $total = 0; while (true) { $entries = collect(Redis::connection('stream')->xrange(self::ingestKey(), '-', '+', self::$chunk)); if ($entries->isEmpty()) { return $total; } $keys = $entries->keys(); self::store( $entries->map(fn (array $payload) => unserialize($payload['data'])) ); Redis::connection('stream')->xdel(self::ingestKey(), $keys->toArray()); if ($entries->count() < self::$chunk) { return $total + $entries->count(); } $total = $total + $entries->count(); } } }

我试过调整 chunk 大小,甚至尝试过优化数据库索引,但效果不明显。😅 有没有大佬能帮我看看我是不是漏掉了什么关键点?或者有什么优化建议?

PS: 我已经在考虑是不是要换个思路,比如用队列来处理这些数据,但还没来得及尝试。🙏

谢谢大家的帮助!

1 Answers

全栈Alex

1/11/2025

Best Answer2

Answer #1 - Best Answer

嘿,你好啊!👋

我太理解你遇到的 Redis 数据流处理速度问题了 - 我之前也栽在这上面!在高峰期处理大量数据时,性能瓶颈确实让人头疼。让我来分享一些我自己的经验和建议,希望能帮到你。

1. 优化 Redis 连接

首先,确保你的 Redis 连接是持久的。频繁的连接和断开会导致性能下降。你可以在 Laravel 的 Redis 配置中设置持久连接。

2. 增加并发处理

考虑使用 Laravel 的队列系统来并发处理数据。通过将数据流分成多个队列任务,你可以更好地利用多核 CPU 的优势。

namespace App\Jobs; class ProcessStatisticAggregate implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; protected $data; public function __construct($data) { $this->data = $data; } public function handle() { // 处理单个数据条目 StatisticAggregateStream::store($this->data); } }

digest 方法中,你可以将每个数据条目分发到队列中:

namespace App\Services; class StatisticAggregateStream { // ... existing code ... public static function digest(): int { $total = 0; while (true) { $entries = collect(Redis::connection('stream')->xrange(self::ingestKey(), '-', '+', self::$chunk)); if ($entries->isEmpty()) { return $total; } $keys = $entries->keys(); // 使用队列处理每个条目 $entries->each(function ($payload) { ProcessStatisticAggregate::dispatch(unserialize($payload['data'])); }); Redis::connection('stream')->xdel(self::ingestKey(), $keys->toArray()); if ($entries->count() < self::$chunk) { return $total + $entries->count(); } $total = $total + $entries->count(); } } }

3. 调整 Redis 配置

确保你的 Redis 配置是为高吞吐量优化的。增加 maxmemory 和调整 maxmemory-policy 可以帮助 Redis 更好地管理内存。

4. 使用批量处理

如果可能,尝试批量处理数据而不是逐条处理。这样可以减少数据库的写入次数,提高整体效率。

5. 监控和分析

使用 Redis 的监控工具(如 redis-cli monitor)来分析瓶颈所在。你可能会发现某些命令执行时间过长。

6. 常见错误提醒

  • 确保没有在循环中创建新的 Redis 连接。
  • 检查是否有未处理的异常导致 worker 停止。

希望这些建议能帮到你!如果你有其他问题或者需要进一步的帮助,随时联系我。加油!🚀

祝你好运,期待你的项目顺利上线!😊

CloudFog API Gateway 🔥 New User Special

💥 New User Offer: Get $1 Credit for ¥0.5

Claim Offer Now