Если у вас уже есть Redis в стеке и вы пишете на PHP, то желание «быстро собрать очередь» из Redis Streams выглядит естественно. Streams дают конкурентных потребителей, подтверждение обработки (ack), «висящие» сообщения в pending списке и автоматическое повторное назначение с XAUTOCLAIM. Во многих задачах это проще и быстрее, чем разворачивать отдельный брокер. В этой статье я покажу, как построить надежную очередь на Redis Streams в PHP: от продюсера и consumer group до ретраев, идемпотентности и мониторинга. В конце — честное сравнение с RabbitMQ: когда Streams хороши, а когда брокер сообщений все же предпочтительнее.
Коротко о модели Redis Streams
Streams — это лог структурированных записей (entry) с монотонно растущими идентификаторами, обычно создаваемыми автоматически (XADD mystream * field value ...). Каждая запись попадает в «ленточку» потока; читатели используют либо обычное чтение (XREAD), либо чтение от имени группы (XREADGROUP).
Consumer group — это способ распределить сообщения между конкурентными воркерами: один элемент обрабатывает только один consumer в группе. Когда consumer читает запись через XREADGROUP GROUP group consumer STREAMS mystream >, запись попадает в Pending Entries List (PEL) группы. Пока она не подтверждена (XACK), система считает ее «в обработке».
Если consumer умер или «забыл» сделать XACK, запись остается в PEL и может быть «переназначена» другому потребителю (redelivery) после тайм-аута бездействия через XAUTOCLAIM (или старым способом XCLAIM).
Доставка в Redis Streams — «at-least-once». Это означает, что повторы возможны, и вам нужна идемпотентность обработчика.
Когда Redis Streams удобнее RabbitMQ
Практика показывает, что Redis Streams отлично подходят для:
- Быстрых внутренних очередей в веб-проектах на PHP, где Redis уже используется для кэша/сессий.
- Задач, где важны простота деплоя и минимальная операционка (без отдельного брокера).
- Фан-аутов и событийной обработки с базовыми требованиями к маршрутизации.
- Очередей с высокой скоростью продюсинга, коротким временем жизни сообщений и «подтираемым» хвостом.
А вот если у вас сложная маршрутизация, пер-сообщенческие TTL/приоритеты, подтверждения издателя, стабильная долговечность и наблюдаемость уровня «классического» брокера — RabbitMQ зачастую лучше. Ниже я подробно сравню особенности.
Схема очереди: один поток — одна тема
Базовый дизайн: на каждую «тему» (тип событий) один stream. Для горизонтального масштабирования можно шардировать: например, orders:0 … orders:7 по хэшу ключа заказа, чтобы сохранить локальный порядок по ключу и увеличить пропускную способность при росте воркеров.
Внутри группы у каждого воркера свой consumer name — это просто строка (например, api-1, api-2). Сообщения распределяются конкурентно: один воркер получает один элемент, другой — следующий, и так далее.

Producer на PHP (PhpRedis)
Ниже — минимальный продюсер. Сериализация — любая удобная: JSON, MessagePack, Protobuf. В примере — JSON. Важный момент: добавляйте уникальный «бизнес-идентификатор» события для идемпотентности (например, event_id).
// php -d detect_unicode=0 producer.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379, 1.5);
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1); // не рвем блокирующие операции
$stream = 'orders';
$payload = [
'event' => 'order.created',
'event_id' => bin2hex(random_bytes(16)),
'order_id' => 12345,
'sum' => 4999,
'ts' => time()
];
$id = $redis->xAdd($stream, '*', [
'v' => json_encode($payload, JSON_UNESCAPED_UNICODE)
]);
echo "Appended to $stream with ID $id\n";
Ключи в записи лучше нормализовать и минимизировать (например, хранить целиком JSON в поле v) — это экономит память и ускоряет операции.
Создаем consumer group и читаем сообщения
Группа создается один раз. Параметр $ означает «начать читать только новые сообщения», 0-0 — «зачитать историю».
$redis = new Redis();
$redis->connect('127.0.0.1', 6379, 1.5);
$stream = 'orders';
$group = 'billing';
try {
// MKSTREAM создаст поток, если его еще нет
$ok = $redis->xGroup('CREATE', $stream, $group, '$', true);
} catch (RedisException $e) {
// Группа может уже существовать — это нормально в идемпотентном деплое
}
Блокирующий consumer: читаем батчом, подтверждаем XACK после успешной обработки. На случай падений — периодически «автоклеймим» висящие записи из PEL.
$redis = new Redis();
$redis->connect('127.0.0.1', 6379, 1.5);
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
$stream = 'orders';
$group = 'billing';
$consumer = gethostname() . ':' . getmypid();
$blockMs = 5000; // блокируемся до 5 сек
$batch = 50; // размер пачки
$minIdle = 60000; // 60 сек бездействия для redelivery
while (true) {
// 1) Пытаемся забирать новые
$data = $redis->xReadGroup($group, $consumer, [$stream => '>'], $batch, $blockMs);
if ($data) {
foreach ($data[$stream] as $id => $fields) {
$ok = handle($fields['v']); // ваша бизнес-логика
if ($ok) {
$redis->xAck($stream, $group, [$id]);
} else {
// Не ack: останется в PEL, потом заберем в ретраях
}
}
}
// 2) Периодически подбираем «зависшие» записи через XAUTOCLAIM
// Возвращает массив: [0 => new_start_id, 1 => [id => fields, ...]]
$claimed = $redis->xAutoClaim($stream, $group, $consumer, $minIdle, '0-0', $batch);
if (!empty($claimed[1])) {
foreach ($claimed[1] as $id => $fields) {
$ok = handle($fields['v']);
if ($ok) {
$redis->xAck($stream, $group, [$id]);
}
}
}
}
function handle($json) {
$payload = json_decode($json, true);
// Пример: идемпотентная обработка
return process_order_created($payload);
}
Заметьте: XAUTOCLAIM доступен в Redis 6.2+. Если ваша версия ниже, используйте XPENDING и XCLAIM, но это чуть сложнее.
Идемпотентность: обязательна
Из-за «at-least-once» повторы возможны. Поэтому обработчик должен быть идемпотентным. На практике удобны три уровня защиты:
- Стабильный уникальный идентификатор события (
event_id) в payload. Генерирует продюсер. - Быстрая дедупликация в Redis:
SETNX dedup:{event_id} 1 EX 86400. Если ключ уже есть — пропускаем обработку. - Идемпотентная бизнес-логика: например, UPSERT в БД, «проведен ли платеж» флаг, идемпотентные API с ключом идемпотентности.
function process_order_created(array $p) {
$dedupKey = 'dedup:' . $p['event_id'];
$r = new Redis();
$r->connect('127.0.0.1', 6379, 1.5);
// Пытаемся зарезервировать обработку
$ok = $r->set($dedupKey, 1, ['nx', 'ex' => 86400]);
if (!$ok) {
// Уже обрабатывали
return true;
}
// Бизнес-операции должны быть сами по себе идемпотентны
$ok = db_upsert_order($p['order_id'], $p['sum'], $p['ts']);
if (!$ok) {
// Бизнес-ошибка: разрешаем повторную попытку, ключ можно удалить для повторной обработки
$r->del($dedupKey);
return false;
}
return true;
}
Если вы вызываете внешние API (например, отправка писем или платежей), используйте их механизмы идемпотентности — ключ идемпотентности, детерминированные идентификаторы, «проверку статуса» вместо повторной команды.
Ретраи, PEL и DLQ
В Streams для ретраев есть два основных подхода:
- Оставлять запись в PEL до следующей попытки и подбирать ее через
XAUTOCLAIMпо истеченииmin-idle. - После N неудачных попыток переносить запись в выделенный «DLQ»-поток, например
orders:dlq, для ручной/асинхронной разборки.
Счетчик попыток можно хранить в payload (например, retry), увеличивать и пересоздавать запись через XADD (идентификатор события при этом неизменный). Либо опираться на XPENDING, который показывает, сколько раз запись была доставлена.
// Пример простого backoff: 1, 5, 30, 300 секунд
$delays = [1, 5, 30, 300];
function retry_or_dlq(Redis $r, $stream, $group, $id, array $payload, int $attempt) {
global $delays;
if ($attempt >= count($delays)) {
$r->xAdd($stream . ':dlq', '*', ['v' => json_encode($payload)]);
$r->xAck($stream, $group, [$id]);
return;
}
$payload['retry'] = $attempt + 1;
$payload['next_ts'] = time() + $delays[$attempt];
$r->xAdd($stream, '*', ['v' => json_encode($payload)]);
$r->xAck($stream, $group, [$id]);
}
Отложенные задачи (delayed) удобнее держать в отдельном sorted set по времени и периодически перемещать в stream по наступлению срока. Это надежнее, чем «крутить» записи в PEL, и проще управлять расписанием.
Тримминг и рост памяти
Streams растут. Чтобы обуздать память, используйте тримминг:
XADD mystream MAXLEN ~ 1000000 field value— «мягкий» тримминг до примерно миллиона записей.XTRIM mystream MAXLEN 500000— жестко обрежет хвост до полумиллиона записей.
«Мягкий» вариант быстрее и почти всегда достаточно точен. Критично понимать: тримминг не удаляет записи, которые еще висят в PEL. Поэтому держите PEL чистым: либо XACK после успеха, либо переносите в DLQ.
По персистентности: если сообщения ценны сами по себе (не только как триггеры), включайте AOF с appendfsync everysec и регулярные снапшоты RDB. Помните, что Redis — in-memory. При рестарте из RDB вы потеряете все, что не успело попасть в снапшот, а при AOF с режимом everysec — теоретически до 1 секунды данных.
Производительность PHP-воркеров
Несколько практических советов:
- Читайте батчами (
COUNT) и подтверждайте батчами (XACKс несколькими ID) — меньше сетевых раунд-трипов. - Используйте блокирующий
XREADGROUPс таймаутом; не делайте «busy loop». - Долгоживущие процессы PHP следите за утечками: периодически перезапускайте воркеры или используйте лимит обработанных задач.
- Разделяйте CPU-интенсивные задачи и IO-ограниченные; масштабируйте количество воркеров относительно профиля нагрузки.
- Стабилизируйте порядок по ключу (шардинг) если есть зависимость шагов по одному объекту (например, все события по
order_idв один шард).
Операционка: XPENDING, XINFO и метрики
Для ежедневного контроля полезны:
XPENDING mystream group— диапазон ID в PEL, количество и min/max idle.XINFO CONSUMERS mystream group— список consumers и их pending.XINFO GROUPS mystream— общая статистика по группам.
Если видите рост PEL и «idle» уходит в минуты — значит, воркеры не успевают, или сообщения падают и возвращаются в ретраи. В первом случае масштабируйте; во втором — улучшайте идемпотентность и обработку ошибок.

Система перезапуска воркеров через systemd
Для предсказуемости в продакшене держите воркеры как службы. Пример systemd юнита:
[Unit]
Description=PHP Redis Streams worker (billing)
After=network.target redis.service
[Service]
Type=simple
ExecStart=/usr/bin/php /srv/app/worker-billing.php
Restart=always
RestartSec=2
KillSignal=SIGINT
Environment=APP_ENV=prod
WorkingDirectory=/srv/app
[Install]
WantedBy=multi-user.target
Больше про оркестрацию, рестарты и логи — в материале как поднимать и мониторить PHP-воркеры через systemd и Supervisor. Если Redis у вас уже используется для кэша и сессий, пригодится разбор тонкостей в статье Redis для PHP: сессии и object cache.
Паттерны
Отложенная доставка (delay)
Храним задачи в sorted set с членом = сериализованный payload и score = unix time. Периодический воркер выбирает готовые записи и XADD в целевой stream. Так же удобно реализовывать «не раньше чем».
Переотправка без дубликатов
Если нужна гарантия «не больше одного сайд-эффекта», используйте «идемпотентный ключ» как часть бизнес-инварианта (например, payment_id) и проверяйте перед выполнением. Потенциальные дубликаты по потоку больше не страшны.
Разделение по типам
Лучше разные потоки для разных бизнес-событий, чем «мега-поток» со свитчом по типу. Так легче тримминг, проще наблюдать PEL и масштабировать воркеров независимо.
Сравнение с RabbitMQ: нюансы
Где Streams сильны:
- Скорость и простота: один Redis, минимум операционки.
- Гибкий consumer group, «батчи» чтения, низкая латентность.
- Прекрасная интеграция там, где Redis уже «центр кэша» и краткоживущие сообщения.
Где RabbitMQ выигрывает:
- Роутинг и топологии (fanout, topic, headers), dead-letter queues «из коробки».
- TTL, приоритеты, подтверждения издателя, задержанные обменники.
- Отдельный дисковый брокер, целевая наблюдаемость и mature-экосистема.
Если вам важны «точно один раз» семантики — их нет ни там, ни там «в общем случае». В обоих мирах вы придете к идемпотентности приложения. Однако RabbitMQ предоставляет больше средств управления жизненным циклом сообщений и маршрутизацией, Streams — минималистичны и очень быстры.
Правильный вопрос не «Streams или RabbitMQ вообще», а «что проще и надежнее для моей конкретной нагрузки и требований».
Чеклист запуска в продакшене
- Версия Redis 6.2+ для
XAUTOCLAIM. Включите AOF, если важна минимальная потеря данных. - Определите политику тримминга
MAXLEN ~ Nдля каждого потока. - Сделайте идемпотентность: уникальные
event_id,SETNXдедуп, идемпотентные операции в БД и внешних API. - Ретраи: лимит попыток, backoff, DLQ-поток, алерты на рост DLQ.
- Мониторинг: метрики PEL, лагов потребителей, ошибок и длительностей.
- Конкурентность воркеров: батчи чтения и ack, отсутствие busy loop.
- Управление конфигурацией Redis: память, eviction policy, персистентность, репликация/фейловер.
- Тесты на отказоустойчивость: падение воркера, повторная доставка, сетевые флаппы.
Расширения: шардирование и порядок
Если важен порядок по ключу (например, все изменения заказа последовательно), используйте шардирование по хэшу ключа на несколько потоков и гарантируйте, что один consumer обрабатывает один шард. Это даст линейный порядок внутри шарда и масштаб по числу шардов.
Итоги
Redis Streams в сочетании с PHP — мощный и простой инструмент для очередей: вы получите быстрый продюсинг, конкурентных потребителей, подтверждения обработки, редоставку «зависших» задач и отличную производительность. За это придется заплатить дисциплиной на уровне приложения: аккуратная идемпотентность, продуманные ретраи с DLQ, регулярный тримминг и мониторинг PEL.
Если вам нужна сложная маршрутизация, богатые политики TTL/приоритетов и зрелая экосистема управления сообщениями, RabbitMQ останется сильным выбором. Но для огромного класса задач внутри веб-проектов Streams выигрывают простотой и скоростью внедрения. При росте нагрузки удобно изолировать воркеры на отдельном облачном VDS и масштабироваться горизонтально.


