极客Kevin
1/11/2025
嘿,大家好!👋
我最近在搞一个 Laravel 11 项目,使用 Redis 7 来处理数据流。项目需要快速且高级的报告功能,所以我写了一个自定义的 upsert worker。这个 worker 主要是用来处理 StatisticAggregate
模型的数据聚合。简单来说,就是随着一天的进展,数据会被聚合,比如会话数据会被更新或者插入。
问题是,到了高峰期,Redis 的流数据填充速度比处理速度快得多。😩 每分钟大约有 5000 条数据进入流,但我每分钟只能处理 2000 条。虽然数据库看起来没有瓶颈,因为在系统负载下降时,积压的数据会被处理完,但我还是搞不清楚问题出在哪儿。
这是我目前的 worker 代码:
namespace App\Console\Commands\StatisticAggregates; class StatisticAggregatesStreamWork extends Command implements Isolatable { // ... existing code ... public function handle(): int { // ... existing code ... while (true) { // ... existing code ... StatisticAggregateStream::digest(); // ... existing code ... } } }
还有 digest
函数:
namespace App\Services; class StatisticAggregateStream { // ... existing code ... public static function digest(): int { $total = 0; while (true) { $entries = collect(Redis::connection('stream')->xrange(self::ingestKey(), '-', '+', self::$chunk)); if ($entries->isEmpty()) { return $total; } $keys = $entries->keys(); self::store( $entries->map(fn (array $payload) => unserialize($payload['data'])) ); Redis::connection('stream')->xdel(self::ingestKey(), $keys->toArray()); if ($entries->count() < self::$chunk) { return $total + $entries->count(); } $total = $total + $entries->count(); } } }
我试过调整 chunk 大小,甚至尝试过优化数据库索引,但效果不明显。😅 有没有大佬能帮我看看我是不是漏掉了什么关键点?或者有什么优化建议?
PS: 我已经在考虑是不是要换个思路,比如用队列来处理这些数据,但还没来得及尝试。🙏
谢谢大家的帮助!
全栈Alex
1/11/2025
嘿,你好啊!👋
我太理解你遇到的 Redis 数据流处理速度问题了 - 我之前也栽在这上面!在高峰期处理大量数据时,性能瓶颈确实让人头疼。让我来分享一些我自己的经验和建议,希望能帮到你。
首先,确保你的 Redis 连接是持久的。频繁的连接和断开会导致性能下降。你可以在 Laravel 的 Redis 配置中设置持久连接。
考虑使用 Laravel 的队列系统来并发处理数据。通过将数据流分成多个队列任务,你可以更好地利用多核 CPU 的优势。
namespace App\Jobs; class ProcessStatisticAggregate implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; protected $data; public function __construct($data) { $this->data = $data; } public function handle() { // 处理单个数据条目 StatisticAggregateStream::store($this->data); } }
在 digest
方法中,你可以将每个数据条目分发到队列中:
namespace App\Services; class StatisticAggregateStream { // ... existing code ... public static function digest(): int { $total = 0; while (true) { $entries = collect(Redis::connection('stream')->xrange(self::ingestKey(), '-', '+', self::$chunk)); if ($entries->isEmpty()) { return $total; } $keys = $entries->keys(); // 使用队列处理每个条目 $entries->each(function ($payload) { ProcessStatisticAggregate::dispatch(unserialize($payload['data'])); }); Redis::connection('stream')->xdel(self::ingestKey(), $keys->toArray()); if ($entries->count() < self::$chunk) { return $total + $entries->count(); } $total = $total + $entries->count(); } } }
确保你的 Redis 配置是为高吞吐量优化的。增加 maxmemory
和调整 maxmemory-policy
可以帮助 Redis 更好地管理内存。
如果可能,尝试批量处理数据而不是逐条处理。这样可以减少数据库的写入次数,提高整体效率。
使用 Redis 的监控工具(如 redis-cli monitor
)来分析瓶颈所在。你可能会发现某些命令执行时间过长。
希望这些建议能帮到你!如果你有其他问题或者需要进一步的帮助,随时联系我。加油!🚀
祝你好运,期待你的项目顺利上线!😊