Filas, Pub/Sub e Streams
Aula 3 de 6
List como Fila
FIFO Simples
# Producer: adicionar tarefas
RPUSH tasks "processar:pedido:123"
RPUSH tasks "enviar:email:user:456"
RPUSH tasks "gerar:relatorio"
# Consumer: processar (blocking)
BRPOP tasks 0
# Retorna: 1) "tasks" 2) "processar:pedido:123"
# Blocking pop (0 = infinito)
# Consumer com timeout
BRPOP tasks 5 # espera 5 segundos, retorna nil se vazio
Reliable Queue (RPOPLPUSH)
# Padrão: usar RPOPLPUSH para criar fila confiável
# Processa tarefa e SÓ remove quando concluir
# 1. Adicionar tarefas
RPUSH queue:tasks "task:1"
RPUSH queue:tasks "task:2"
# 2. Mover para processing (atômico)
RPOPLPUSH queue:tasks queue:processing
# Retorna: "task:1" e move para processing
# 3. Se processar com sucesso → remover de processing
LREM queue:processing 1 "task:1"
# 4. Se falhar → devolver para fila
RPOPLPUSH queue:processing queue:tasks
# BRPOPLPUSH (blocking version)
BRPOPLPUSH queue:tasks queue:processing 0
Retry Pattern
# Filas com retry
# Producer
RPUSH email:queue '{"to":"[email protected]","subject":"Bem-vindo!"}'
# Consumer com retry
while true; do
# Pegar tarefa
task=$(BRPOPLPUSH email:queue email:processing 0)
if process_email "$task"; then
# Sucesso: remover da processing
LREM email:processing 1 "$task"
else
# Falha: verificar retry count
echo "Falha ao processar: $task"
# Incrementar contador de tentativas
attempts=$(REDIS HINCRBY email:retry "$task" attempts 1)
if [[ $attempts -lt 3 ]]; then
# Devolver para fila principal
RPUSH email:queue "$task"
else
# Mover para dead letter queue
RPUSH email:dead "$task"
fi
LREM email:processing 1 "$task"
fi
done
Pub/Sub
# Publisher: envia mensagem para canal
PUBLISH channel:notifications "Nova mensagem"
PUBLISH channel:orders '{"order_id":123,"status":"paid"}'
# Subscriber 1: subscribe simples
SUBSCRIBE channel:notifications
# Reading messages... (blocking)
# 1) "subscribe"
# 2) "channel:notifications"
# 3) (integer) 1
# Próximo:
# 1) "message"
# 2) "channel:notifications"
# 3) "Nova mensagem"
# Subscriber 2: padrões glob
PSUBSCRIBE channel:*
# Recebe todas as mensagens de canais começando com "channel:"
# Unsubscribe
UNSUBSCRIBE channel:notifications
PUNSUBSCRIBE channel:*
Pub/Sub — Limitações
# ⚠️ Pub/Sub é fire-and-forget:
# - Mensagens não são persistidas
# - Se subscriber está offline, perde a mensagem
# - Não há confirmação (ack)
# - Não há replay
# ❌ NÃO usar Pub/Sub para:
# - Filas de tarefas (use List ou Stream)
# - Mensagens importantes (use Stream com ack)
# - Eventos que precisam de histórico
# ✅ Usar Pub/Sub para:
# - Notificações em tempo real
# - Broadcast para múltiplos consumidores
# - Chat, notificações de UI
# - Sinais de cache invalidation
Streams
Produzir e Consumir
# XADD: adicionar entrada ao stream
XADD orders * order_id 123 user "maria" amount 299.90
XADD orders * order_id 124 user "joao" amount 150.00
# XLEN: comprimento
XLEN orders
# XRANGE: ler entradas por range
XRANGE orders - + # todas
XRANGE orders - + COUNT 5 # primeiras 5
XRANGE orders 1717000000000-0 + # a partir de timestamp
# XREVRANGE: ordem reversa
XREVRANGE orders + - COUNT 5 # últimas 5
# XREAD: ler com blocking
XREAD BLOCK 0 STREAMS orders 0 # blocking desde o início
XREAD BLOCK 0 STREAMS orders $ # apenas mensagens novas
Consumer Groups
# Criar grupo de consumidores
XGROUP CREATE orders group1 $
XGROUP CREATE orders group1 0 MKSTREAM # criar a partir do início
# Consumir via grupo
XREADGROUP GROUP group1 consumer1 COUNT 1 BLOCK 0 STREAMS orders >
# Confirmar processamento (ACK)
XACK orders group1 1717000000000-0
# Ver pending entries (não confirmadas)
XPENDING orders group1
# Ver pending com detalhes
XPENDING orders group1 - + 10
# Reivindicar mensagens de consumidor falho
XCLAIM orders group1 consumer2 60000 1717000000000-0
# 60000ms = 60s de idle para considerar falho
# Ver informações do grupo
XINFO GROUPS orders
XINFO CONSUMERS orders group1
Dead Letter Pattern
# Stream com dead letter
# Consumer com tentativas
LIMIT=3
while true; do
# Ler mensagem do grupo
result=$(XREADGROUP GROUP processors group1 COUNT 1 BLOCK 0 STREAMS events ">")
entry_id=$(echo "$result" | ... ) # extrair ID
if process_message "$result"; then
# Sucesso: ACK
XACK events processors "$entry_id"
else
# Incrementar tentativas (via hash externo)
attempts=$(redis-cli HINCRBY events:retry "$entry_id" attempts 1)
if [[ $attempts -ge $LIMIT ]]; then
# Mover para dead letter stream
XADD events:dead * source "$entry_id" error "max_retries"
XACK events processors "$entry_id"
redis-cli DEL "events:retry"
fi
fi
done
Stream vs Kafka
| Feature | Redis Stream | Apache Kafka |
|---|---|---|
| Persistência | RDB/AOF | Log commitado em disco |
| Ordenação | Sim (por entry ID) | Sim (por offset) |
| Consumer Groups | Sim | Sim |
| Replay | Sim | Sim |
| Retenção | XTRIM / MAXLEN | Configurável (time/size) |
| Throughput | ~1M msg/s | ~1M msg/s |
| Particionamento | Sim (múltiplos streams) | Sim (partitions) |
| Garantia | At-least-once | Exactly-once (com transactions) |
Bull/BullMQ
# BullMQ: fila robusta para Node.js baseada em Redis
# Instalação
npm install bullmq
# Producer
cat << 'EOF' > producer.js
const { Queue } = require('bullmq');
const queue = new Queue('email-queue', {
connection: { host: 'localhost', port: 6379 }
});
async function main() {
// Job simples
await queue.add('send-email', {
to: '[email protected]',
subject: 'Bem-vindo!'
});
// Job com delay
await queue.add('send-reminder', {
to: '[email protected]'
}, {
delay: 3600000 // 1 hora
});
// Job repetitivo
await queue.add('daily-report', {}, {
repeat: { pattern: '0 8 * * *' }
});
// Job com prioridade
await queue.add('urgent-task', data, {
priority: 1
});
console.log('Jobs adicionados!');
process.exit(0);
}
main().catch(console.error);
EOF
# Worker
cat << 'EOF' > worker.js
const { Worker } = require('bullmq');
const worker = new Worker('email-queue', async job => {
console.log(`Processando job ${job.id} (${job.name})`);
// Simular processamento
await sendEmail(job.data.to, job.data.subject);
// Progresso
await job.updateProgress(100);
return { sent: true };
}, {
connection: { host: 'localhost', port: 6379 },
concurrency: 10, // processar 10 jobs em paralelo
limiter: {
max: 5, // max jobs por
duration: 1000 // segundo
}
});
worker.on('completed', job => {
console.log(`Job ${job.id} concluído`);
});
worker.on('failed', (job, err) => {
console.error(`Job ${job.id} falhou: ${err.message}`);
});
EOF
# Job Scheduler (repeatable jobs)
cat << 'EOF' > scheduler.js
const { QueueScheduler } = require('bullmq');
const scheduler = new QueueScheduler('reports-queue', {
connection: { host: 'localhost', port: 6379 }
});
// Scheduler gerencia jobs repetitivos no Redis
// Mantém cron e dispara no horário correto
console.log('Scheduler ativo');
EOF
Lab: Sistema de Notificações com Stream + Consumer Group
cat > notification-system.sh << 'SCRIPT'
#!/bin/bash
set -euo pipefail
echo "=== SISTEMA DE NOTIFICAÇÕES ==="
# 1. Criar stream e consumer group
echo ""
echo "--- 1. Setup ---"
redis-cli << 'REDIS'
XGROUP CREATE notifications email_group $ MKSTREAM
XGROUP CREATE notifications sms_group $ MKSTREAM
XGROUP CREATE notifications push_group $ MKSTREAM
REDIS
# 2. Produzir notificações (simular eventos)
echo ""
echo "--- 2. Produzindo notificações ---"
for i in $(seq 1 10); do
type=""
case $((RANDOM % 3)) in
0) type="email" ;;
1) type="sms" ;;
2) type="push" ;;
esac
redis-cli XADD notifications * \
type "$type" \
user_id "$((1000 + i))" \
message "Notificação $i enviada via $type" \
priority "$((RANDOM % 5 + 1))"
done
echo "Stream size: $(redis-cli XLEN notifications)"
# 3. Consumir como grupo (cada grupo vê todas as mensagens)
echo ""
echo "--- 3. Consumer Email ---"
redis-cli XREADGROUP GROUP email_group consumer1 COUNT 5 STREAMS notifications ">"
echo ""
echo "--- 4. Consumer SMS ---"
redis-cli XREADGROUP GROUP sms_group consumer1 COUNT 5 STREAMS notifications ">"
# 4. ACK das processadas
echo ""
echo "--- 5. ACK das mensagens processadas ---"
ENTRIES=$(redis-cli XPENDING notifications email_group | head -1)
# ACK apenas da primeira (demonstração)
# 5. Ver pending
echo ""
echo "--- 6. Pending entries ---"
redis-cli XPENDING notifications email_group
# 6. Monitorar grupos
echo ""
echo "--- 7. Informações do Stream ---"
redis-cli XINFO GROUPS notifications
# 7. Cleanup
echo ""
echo "--- 8. Limpeza ---"
redis-cli DEL notifications
echo "Stream removido."
SCRIPT
chmod +x notification-system.sh
Listas (BRPOP) são filas simples. RPOPLPUSH/BRPOPLPUSH criam filas confiáveis com retry/redelivery. Pub/Sub é broadcast fire-and-forget. Streams oferecem consumer groups com ACK, pending entries e replay — a solução mais completa para mensageria no Redis.