APIs in Production
Lesson 6 of 7
Caching
Caching strategies:
├── Client Cache (Browser) → Cache-Control headers
├── CDN Cache (CloudFront, Cloudflare) → Edge locations
├── Reverse Proxy (Nginx, Varnish) → Server-side caching
└── Application Cache (Redis, Memcached) → Data caching
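The application-cache layer above is often used in a cache-aside pattern: read from the cache first, and only hit the data source on a miss. The sketch below is a minimal in-memory illustration, not a Redis client; `TTLCache` and `get_or_load` are hypothetical names introduced here, and the injectable `clock` exists only so expiry can be exercised without sleeping.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class TTLCache:
    """In-memory stand-in for an application cache such as Redis or Memcached."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock  # injectable so expiry can be tested without sleeping
        self._store: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self._store[key]  # expired: drop the entry and report a miss
            return None
        return value

    def set(self, key: str, value: Any) -> None:
        self._store[key] = (self.clock() + self.ttl, value)


def get_or_load(cache: TTLCache, key: str, loader: Callable[[], Any]) -> Any:
    """Cache-aside: return the cached value, or load and cache it on a miss."""
    value = cache.get(key)
    if value is None:
        value = loader()  # e.g. a database query
        cache.set(key, value)
    return value
```

One limitation of this sketch: a loader that legitimately returns `None` is never cached, because `None` doubles as the miss marker.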
Cache Headers
# Cache-Control
response.headers["Cache-Control"] = "public, max-age=3600, must-revalidate"
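# public: any cache (browser, CDN, proxy) may store the response
# private: only the user's browser may store it
# no-cache: may be stored, but must be revalidated with the server before each reuse
# no-store: never store (sensitive data)
# max-age: freshness lifetime in seconds
# must-revalidate: once stale, do not reuse without revalidating
# ETag (validation)
ETag: "abc123"
If-None-Match: "abc123" # request carrying the ETag
# Response: 304 Not Modified (if the content has not changed)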
# Last-Modified
Last-Modified: Sat, 01 Jun 2024 14:30:00 GMT
If-Modified-Since: Sat, 01 Jun 2024 14:30:00 GMT
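The ETag exchange above can be sketched server-side as a function that decides between 200 and 304. This is an illustrative, framework-free sketch: `make_etag` and `conditional_response` are hypothetical helpers, and deriving the ETag from a SHA-256 of the body is an assumption (any stable version identifier would do).

```python
import hashlib
from typing import Dict, Optional, Tuple


def make_etag(body: bytes) -> str:
    """Derive a quoted ETag from the response body."""
    return '"' + hashlib.sha256(body).hexdigest()[:16] + '"'


def conditional_response(
    body: bytes, if_none_match: Optional[str]
) -> Tuple[int, Dict[str, str], bytes]:
    """Return (status, headers, payload) for a GET with an optional If-None-Match."""
    etag = make_etag(body)
    headers = {"ETag": etag, "Cache-Control": "public, max-age=3600, must-revalidate"}
    if if_none_match is not None:
        # The header may list several ETags; a W/ prefix marks a weak validator.
        candidates = [tag.strip() for tag in if_none_match.split(",")]
        candidates = [c[2:] if c.startswith("W/") else c for c in candidates]
        if "*" in candidates or etag in candidates:
            return 304, headers, b""  # client copy is still valid: send no body
    return 200, headers, body
```

The 304 path still sends the `ETag` and `Cache-Control` headers so the client can refresh its stored metadata, but the body is empty, which is where the bandwidth saving comes from.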
Rate Limiting in Production
# Rate limiting strategies:
# 1. Fixed Window: counts requests in fixed time windows
#    Problem: bursts at window boundaries (up to 2x the limit across two adjacent windows)
# 2. Sliding Window: the window moves with each request
#    Fairer, but more complex
# 3. Token Bucket: requests consume tokens, which are refilled at a steady rate
#    Allows bursts up to the bucket capacity; smoother overall
# 4. Leaky Bucket: a queue drained at a constant rate
#    Smooth output, no bursts
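To make the sliding-window strategy concrete, here is a minimal single-process sketch of the "sliding window log" variant, which stores one timestamp per accepted request. `SlidingWindowLimiter` is a hypothetical name, and the caller supplies `now` so behaviour is deterministic; a production version would typically keep this state in a shared store.

```python
from collections import deque
from typing import Deque


class SlidingWindowLimiter:
    """Allow at most `limit` requests in any `window_seconds`-long interval."""

    def __init__(self, limit: int, window_seconds: float):
        self.limit = limit
        self.window = window_seconds
        self._timestamps: Deque[float] = deque()  # accepted requests, oldest first

    def allow(self, now: float) -> bool:
        # Drop timestamps that have slid out of the window.
        while self._timestamps and self._timestamps[0] <= now - self.window:
            self._timestamps.popleft()
        if len(self._timestamps) < self.limit:
            self._timestamps.append(now)
            return True
        return False
```

With `limit=2` and a 10-second window, requests at t=8 and t=9 are accepted and a request at t=11 is rejected. A fixed-window counter that resets at t=10 would have accepted it, which is the boundary burst described in the list above.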
# Token Bucket (Redis + Lua)
local key = KEYS[1]
local rate = tonumber(ARGV[1])      -- refill rate, tokens per second
local capacity = tonumber(ARGV[2])  -- maximum tokens in the bucket
local now = tonumber(ARGV[3])       -- current time in seconds, supplied by the caller
local cost = tonumber(ARGV[4])      -- tokens this request consumes

-- A missing key means a new client: start with a full bucket.
local tokens = redis.call("get", key)
if not tokens then
    tokens = capacity
else
    tokens = tonumber(tokens)
end

-- Refill in proportion to the time elapsed since the last allowed request.
local last_refill = redis.call("get", key .. ":time")
if last_refill then
    local elapsed = now - tonumber(last_refill)
    tokens = math.min(capacity, tokens + elapsed * rate)
end

if tokens >= cost then
    redis.call("set", key, tokens - cost)
    redis.call("set", key .. ":time", now)
    return 1  -- allowed
else
    return 0  -- rate limited
end
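The same refill-and-consume logic can be mirrored in plain Python, which makes it easy to unit-test the arithmetic before deploying the Lua script. This is a single-process sketch under the assumption that `now` is passed in by the caller, as in the script; `TokenBucket` is a hypothetical name and it is not a replacement for the atomic Redis version.

```python
from typing import Optional


class TokenBucket:
    """In-memory mirror of the Redis + Lua token bucket above."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # maximum tokens the bucket can hold
        self.tokens = capacity    # a new bucket starts full
        self.last_refill: Optional[float] = None

    def allow(self, now: float, cost: float = 1.0) -> bool:
        tokens = self.tokens
        if self.last_refill is not None:
            elapsed = now - self.last_refill
            tokens = min(self.capacity, tokens + elapsed * self.rate)
        if tokens >= cost:
            # State is only persisted on success, exactly like the script,
            # so a rejected request never double-counts the refill.
            self.tokens = tokens - cost
            self.last_refill = now
            return True
        return False
```

With `rate=1` and `capacity=2`, two immediate requests pass, a third is rejected, and one more passes a full second later.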
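CORS: Cross-Origin Resource Sharing
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.empresa.com"],  # explicit origins are required when credentials are allowed
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
    expose_headers=["X-Request-Id"],  # response headers readable by browser JavaScript
    max_age=600,  # seconds the browser may cache the preflight result
)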
# Preflight (OPTIONS): the browser sends it before any non-simple request
# Response:
Access-Control-Allow-Origin: https://app.empresa.com
Access-Control-Allow-Methods: GET, POST, PUT, DELETE
Access-Control-Allow-Headers: Authorization, Content-Type
Access-Control-Max-Age: 600
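The decision a server makes on a preflight request can be sketched as a pure function. This is an illustration of the idea only, not how `CORSMiddleware` is implemented; `preflight_headers` and the module-level constants are hypothetical names that simply reuse the values from the configuration above.

```python
from typing import Dict, Optional

ALLOWED_ORIGINS = {"https://app.empresa.com"}
ALLOWED_METHODS = ["GET", "POST", "PUT", "DELETE"]
ALLOWED_HEADERS = {"authorization", "content-type"}  # compared case-insensitively
MAX_AGE_SECONDS = 600


def preflight_headers(
    origin: str, method: str, request_headers: str = ""
) -> Optional[Dict[str, str]]:
    """Return CORS headers for an acceptable preflight, or None to reject it.

    `method` is the value of Access-Control-Request-Method and
    `request_headers` the value of Access-Control-Request-Headers.
    """
    requested = {h.strip().lower() for h in request_headers.split(",") if h.strip()}
    if origin not in ALLOWED_ORIGINS or method not in ALLOWED_METHODS:
        return None
    if not requested <= ALLOWED_HEADERS:
        return None  # the client wants to send a header that is not allowed
    return {
        "Access-Control-Allow-Origin": origin,
        "Access-Control-Allow-Methods": ", ".join(ALLOWED_METHODS),
        "Access-Control-Allow-Headers": "Authorization, Content-Type",
        "Access-Control-Allow-Credentials": "true",
        "Access-Control-Max-Age": str(MAX_AGE_SECONDS),
        "Vary": "Origin",  # caches must not reuse this response for other origins
    }
```

Echoing back the specific origin, rather than `*`, matters because browsers refuse a wildcard origin on credentialed requests.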
Graceful Shutdown
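import asyncio
import signal

class Server:
    def __init__(self, db, server):
        self.db = db          # connection pool exposing an async close()
        self.server = server  # e.g. a uvicorn.Server instance

    async def shutdown(self, sig):
        print(f"Received {sig.name}, shutting down...")
        self.server.should_exit = True  # stop accepting new requests first
        await self.db.close()           # then release shared resources
        print("Server stopped")

def install_signal_handlers(server):
    # Call from inside the running event loop; add_signal_handler is Unix-only.
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        # s=sig binds the current signal; a bare lambda would capture only the last one.
        loop.add_signal_handler(sig, lambda s=sig: asyncio.create_task(server.shutdown(s)))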
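A graceful shutdown usually has three steps: stop accepting new work, let in-flight requests finish within a deadline, then cancel whatever remains. The sketch below illustrates that sequence with plain `asyncio`; `DrainingServer` and its methods are hypothetical, and `asyncio.sleep` stands in for real request handling.

```python
import asyncio
from typing import List, Set


class DrainingServer:
    def __init__(self) -> None:
        self.accepting = True
        self.in_flight: Set[asyncio.Task] = set()
        self.completed: List[str] = []

    async def _handle(self, name: str, seconds: float) -> None:
        await asyncio.sleep(seconds)  # stands in for real request work
        self.completed.append(name)

    def submit(self, name: str, seconds: float) -> bool:
        if not self.accepting:
            return False  # a real server would answer 503 or close the listener
        task = asyncio.create_task(self._handle(name, seconds))
        self.in_flight.add(task)
        task.add_done_callback(self.in_flight.discard)  # untrack when finished
        return True

    async def shutdown(self, timeout: float) -> None:
        self.accepting = False  # 1. stop taking new work
        if self.in_flight:
            # 2. give in-flight requests up to `timeout` seconds to finish
            _, pending = await asyncio.wait(set(self.in_flight), timeout=timeout)
            for task in pending:  # 3. cancel whatever is still running
                task.cancel()
            await asyncio.gather(*pending, return_exceptions=True)


async def demo() -> List[str]:
    server = DrainingServer()
    server.submit("fast", 0.01)
    server.submit("slow", 5.0)
    await server.shutdown(timeout=0.2)
    rejected = not server.submit("late", 0.01)
    return server.completed + (["late rejected"] if rejected else [])
```

Running `asyncio.run(demo())` completes the fast request, cancels the slow one at the deadline, and rejects work submitted after shutdown. The timeout should be shorter than the grace period the orchestrator gives the process before sending SIGKILL.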
Health Checks
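import time

start_time = time.time()  # recorded once, at process start

# /health endpoint (common industry convention)
# check_database, check_redis and check_disk_space are application helpers defined elsewhere.
@app.get("/health")
def health():
    db_ok = check_database()
    return {
        "status": "ok" if db_ok else "degraded",
        "version": "1.2.3",
        "uptime": time.time() - start_time,  # seconds since start
        "checks": {
            "database": "ok" if db_ok else "fail",
            "cache": check_redis(),
            "disk": check_disk_space(),
        }
    }

# /ready: can this instance receive traffic?
# In practice this should fail (e.g. with 503) while required dependencies are unavailable.
@app.get("/ready")
def ready():
    return {"status": "ok"}

# /live: is the process alive? Keep it cheap and free of dependency checks.
@app.get("/live")
def live():
    return {"status": "alive"}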
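For a readiness probe to be useful, a failed dependency has to turn into a non-2xx status, since load balancers and orchestrators generally look at the status code rather than the body. Below is a framework-free sketch of that aggregation; `readiness` is a hypothetical helper, and the check callables are assumed to return `True` when healthy.

```python
from typing import Callable, Dict, Tuple


def readiness(checks: Dict[str, Callable[[], bool]]) -> Tuple[int, Dict[str, object]]:
    """Run each dependency check and map the outcome to an HTTP status."""
    results: Dict[str, str] = {}
    for name, check in checks.items():
        try:
            results[name] = "ok" if check() else "fail"
        except Exception:
            results[name] = "fail"  # a check that raises counts as a failure
    ready = all(value == "ok" for value in results.values())
    status_code = 200 if ready else 503
    body = {"status": "ok" if ready else "unavailable", "checks": results}
    return status_code, body
```

A route handler could call this and pass the status code to the framework's response object, so that a failing database check takes the instance out of rotation without restarting it.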
Observability
# Structured logging (JSON)
import structlog
logger = structlog.get_logger()
logger.info("request", method="GET", path="/users", duration_ms=45, status=200)
# Metrics (Prometheus)
# Use the route template (e.g. /users/{id}) as the "path" label so label cardinality stays bounded.
from prometheus_client import Counter, Histogram
requests_total = Counter("http_requests_total", "Total requests", ["method", "path", "status"])
request_duration = Histogram("http_request_duration_seconds", "Request duration", ["method", "path"])
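The log event and the duration metric above both need the same measurement: how long a request took and how it ended. The sketch below shows one way to capture that with only the standard library, emitting one JSON line per request. It is a stand-in for illustration, not the structlog or prometheus_client API; `timed_request` and the `emit` parameter are hypothetical.

```python
import json
import time
from contextlib import contextmanager
from typing import Callable, Dict, Iterator


@contextmanager
def timed_request(
    method: str, path: str, emit: Callable[[str], None] = print
) -> Iterator[Dict[str, object]]:
    """Time a request and emit one structured (JSON) log line when it ends."""
    event: Dict[str, object] = {
        "event": "request", "method": method, "path": path, "status": 200,
    }
    start = time.perf_counter()
    try:
        yield event  # the handler may overwrite event["status"]
    except Exception:
        event["status"] = 500  # unhandled errors are logged as 500 and re-raised
        raise
    finally:
        event["duration_ms"] = round((time.perf_counter() - start) * 1000, 3)
        emit(json.dumps(event))
```

Usage: wrap the handler body in `with timed_request("GET", "/users") as event:` and set `event["status"]` before leaving the block. The same duration value could be passed to a histogram's `observe` call (in seconds) to keep logs and metrics consistent.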
APIs in production need caching, rate limiting, CORS, health checks, graceful shutdown, and observability. Health endpoints: /health (dependencies such as the database), /ready (can receive traffic), /live (process is alive). Observability: Prometheus metrics plus structured logging.