kb.erickguedes.com
Redis: Cache, Fila e Tempo Real

Filas, Pub/Sub e Streams

Aula 3 de 6

List como Fila

FIFO Simples

# Producer: adicionar tarefas
RPUSH tasks "processar:pedido:123"
RPUSH tasks "enviar:email:user:456"
RPUSH tasks "gerar:relatorio"

# Consumer: processar (blocking)
BRPOP tasks 0
# Retorna: 1) "tasks" 2) "processar:pedido:123"
# Blocking pop (0 = infinito)

# Consumer com timeout
BRPOP tasks 5        # espera 5 segundos, retorna nil se vazio

Reliable Queue (RPOPLPUSH)

# Padrão: usar RPOPLPUSH para criar fila confiável
# Processa tarefa e SÓ remove quando concluir

# 1. Adicionar tarefas
RPUSH queue:tasks "task:1"
RPUSH queue:tasks "task:2"

# 2. Mover para processing (atômico)
RPOPLPUSH queue:tasks queue:processing
# Retorna: "task:1" e move para processing

# 3. Se processar com sucesso → remover de processing
LREM queue:processing 1 "task:1"

# 4. Se falhar → devolver para fila
RPOPLPUSH queue:processing queue:tasks

# BRPOPLPUSH (blocking version)
BRPOPLPUSH queue:tasks queue:processing 0

Retry Pattern

# Filas com retry

# Producer
RPUSH email:queue '{"to":"[email protected]","subject":"Bem-vindo!"}'

# Consumer com retry
while true; do
    # Pegar tarefa
    task=$(BRPOPLPUSH email:queue email:processing 0)

    if process_email "$task"; then
        # Sucesso: remover da processing
        LREM email:processing 1 "$task"
    else
        # Falha: verificar retry count
        echo "Falha ao processar: $task"

        # Incrementar contador de tentativas
        attempts=$(REDIS HINCRBY email:retry "$task" attempts 1)
        if [[ $attempts -lt 3 ]]; then
            # Devolver para fila principal
            RPUSH email:queue "$task"
        else
            # Mover para dead letter queue
            RPUSH email:dead "$task"
        fi
        LREM email:processing 1 "$task"
    fi
done

Pub/Sub

# Publisher: envia mensagem para canal
PUBLISH channel:notifications "Nova mensagem"
PUBLISH channel:orders '{"order_id":123,"status":"paid"}'

# Subscriber 1: subscribe simples
SUBSCRIBE channel:notifications
# Reading messages... (blocking)
# 1) "subscribe"
# 2) "channel:notifications"
# 3) (integer) 1
# Próximo:
# 1) "message"
# 2) "channel:notifications"
# 3) "Nova mensagem"

# Subscriber 2: padrões glob
PSUBSCRIBE channel:*
# Recebe todas as mensagens de canais começando com "channel:"

# Unsubscribe
UNSUBSCRIBE channel:notifications
PUNSUBSCRIBE channel:*

Pub/Sub — Limitações

# ⚠️ Pub/Sub é fire-and-forget:
# - Mensagens não são persistidas
# - Se subscriber está offline, perde a mensagem
# - Não há confirmação (ack)
# - Não há replay

# ❌ NÃO usar Pub/Sub para:
#   - Filas de tarefas (use List ou Stream)
#   - Mensagens importantes (use Stream com ack)
#   - Eventos que precisam de histórico

# ✅ Usar Pub/Sub para:
#   - Notificações em tempo real
#   - Broadcast para múltiplos consumidores
#   - Chat, notificações de UI
#   - Sinais de cache invalidation

Streams

Produzir e Consumir

# XADD: adicionar entrada ao stream
XADD orders * order_id 123 user "maria" amount 299.90
XADD orders * order_id 124 user "joao" amount 150.00

# XLEN: comprimento
XLEN orders

# XRANGE: ler entradas por range
XRANGE orders - +              # todas
XRANGE orders - + COUNT 5     # primeiras 5
XRANGE orders 1717000000000-0 +   # a partir de timestamp

# XREVRANGE: ordem reversa
XREVRANGE orders + - COUNT 5  # últimas 5

# XREAD: ler com blocking
XREAD BLOCK 0 STREAMS orders 0         # blocking desde o início
XREAD BLOCK 0 STREAMS orders $         # apenas mensagens novas

Consumer Groups

# Criar grupo de consumidores
XGROUP CREATE orders group1 $
XGROUP CREATE orders group1 0 MKSTREAM  # criar a partir do início

# Consumir via grupo
XREADGROUP GROUP group1 consumer1 COUNT 1 BLOCK 0 STREAMS orders >

# Confirmar processamento (ACK)
XACK orders group1 1717000000000-0

# Ver pending entries (não confirmadas)
XPENDING orders group1

# Ver pending com detalhes
XPENDING orders group1 - + 10

# Reivindicar mensagens de consumidor falho
XCLAIM orders group1 consumer2 60000 1717000000000-0
# 60000ms = 60s de idle para considerar falho

# Ver informações do grupo
XINFO GROUPS orders
XINFO CONSUMERS orders group1

Dead Letter Pattern

# Stream com dead letter

# Consumer com tentativas
LIMIT=3

while true; do
    # Ler mensagem do grupo
    result=$(XREADGROUP GROUP processors group1 COUNT 1 BLOCK 0 STREAMS events ">")

    entry_id=$(echo "$result" | ... )  # extrair ID

    if process_message "$result"; then
        # Sucesso: ACK
        XACK events processors "$entry_id"
    else
        # Incrementar tentativas (via hash externo)
        attempts=$(redis-cli HINCRBY events:retry "$entry_id" attempts 1)

        if [[ $attempts -ge $LIMIT ]]; then
            # Mover para dead letter stream
            XADD events:dead * source "$entry_id" error "max_retries"
            XACK events processors "$entry_id"
            redis-cli DEL "events:retry"
        fi
    fi
done

Stream vs Kafka

FeatureRedis StreamApache Kafka
PersistênciaRDB/AOFLog commitado em disco
OrdenaçãoSim (por entry ID)Sim (por offset)
Consumer GroupsSimSim
ReplaySimSim
RetençãoXTRIM / MAXLENConfigurável (time/size)
Throughput~1M msg/s~1M msg/s
ParticionamentoSim (múltiplos streams)Sim (partitions)
GarantiaAt-least-onceExactly-once (com transactions)

Bull/BullMQ

# BullMQ: fila robusta para Node.js baseada em Redis

# Instalação
npm install bullmq

# Producer
cat << 'EOF' > producer.js
const { Queue } = require('bullmq');

const queue = new Queue('email-queue', {
    connection: { host: 'localhost', port: 6379 }
});

async function main() {
    // Job simples
    await queue.add('send-email', {
        to: '[email protected]',
        subject: 'Bem-vindo!'
    });

    // Job com delay
    await queue.add('send-reminder', {
        to: '[email protected]'
    }, {
        delay: 3600000  // 1 hora
    });

    // Job repetitivo
    await queue.add('daily-report', {}, {
        repeat: { pattern: '0 8 * * *' }
    });

    // Job com prioridade
    await queue.add('urgent-task', data, {
        priority: 1
    });

    console.log('Jobs adicionados!');
    process.exit(0);
}

main().catch(console.error);
EOF

# Worker
cat << 'EOF' > worker.js
const { Worker } = require('bullmq');

const worker = new Worker('email-queue', async job => {
    console.log(`Processando job ${job.id} (${job.name})`);

    // Simular processamento
    await sendEmail(job.data.to, job.data.subject);

    // Progresso
    await job.updateProgress(100);

    return { sent: true };
}, {
    connection: { host: 'localhost', port: 6379 },
    concurrency: 10,        // processar 10 jobs em paralelo
    limiter: {
        max: 5,             // max jobs por
        duration: 1000      // segundo
    }
});

worker.on('completed', job => {
    console.log(`Job ${job.id} concluído`);
});

worker.on('failed', (job, err) => {
    console.error(`Job ${job.id} falhou: ${err.message}`);
});
EOF

# Job Scheduler (repeatable jobs)
cat << 'EOF' > scheduler.js
const { QueueScheduler } = require('bullmq');

const scheduler = new QueueScheduler('reports-queue', {
    connection: { host: 'localhost', port: 6379 }
});

// Scheduler gerencia jobs repetitivos no Redis
// Mantém cron e dispara no horário correto
console.log('Scheduler ativo');
EOF

Lab: Sistema de Notificações com Stream + Consumer Group

cat > notification-system.sh << 'SCRIPT'
#!/bin/bash
set -euo pipefail

echo "=== SISTEMA DE NOTIFICAÇÕES ==="

# 1. Criar stream e consumer group
echo ""
echo "--- 1. Setup ---"
redis-cli << 'REDIS'
XGROUP CREATE notifications email_group $ MKSTREAM
XGROUP CREATE notifications sms_group $ MKSTREAM
XGROUP CREATE notifications push_group $ MKSTREAM
REDIS

# 2. Produzir notificações (simular eventos)
echo ""
echo "--- 2. Produzindo notificações ---"
for i in $(seq 1 10); do
    type=""
    case $((RANDOM % 3)) in
        0) type="email" ;;
        1) type="sms" ;;
        2) type="push" ;;
    esac

    redis-cli XADD notifications * \
        type "$type" \
        user_id "$((1000 + i))" \
        message "Notificação $i enviada via $type" \
        priority "$((RANDOM % 5 + 1))"
done

echo "Stream size: $(redis-cli XLEN notifications)"

# 3. Consumir como grupo (cada grupo vê todas as mensagens)
echo ""
echo "--- 3. Consumer Email ---"
redis-cli XREADGROUP GROUP email_group consumer1 COUNT 5 STREAMS notifications ">"

echo ""
echo "--- 4. Consumer SMS ---"
redis-cli XREADGROUP GROUP sms_group consumer1 COUNT 5 STREAMS notifications ">"

# 4. ACK das processadas
echo ""
echo "--- 5. ACK das mensagens processadas ---"
ENTRIES=$(redis-cli XPENDING notifications email_group | head -1)
# ACK apenas da primeira (demonstração)

# 5. Ver pending
echo ""
echo "--- 6. Pending entries ---"
redis-cli XPENDING notifications email_group

# 6. Monitorar grupos
echo ""
echo "--- 7. Informações do Stream ---"
redis-cli XINFO GROUPS notifications

# 7. Cleanup
echo ""
echo "--- 8. Limpeza ---"
redis-cli DEL notifications
echo "Stream removido."
SCRIPT

chmod +x notification-system.sh

Listas (BRPOP) são filas simples. RPOPLPUSH/BRPOPLPUSH criam filas confiáveis com retry/redelivery. Pub/Sub é broadcast fire-and-forget. Streams oferecem consumer groups com ACK, pending entries e replay — a solução mais completa para mensageria no Redis.