kb.erickguedes.com
Node.js: Backend JavaScript em Produção

Assincronismo, Streams e Sistema de Arquivos

Aula 3 de 7

Padrões Assíncronos

Callbacks (estilo antigo)

const fs = require('fs');

fs.readFile('arquivo.txt', 'utf8', (err, data) => {
  if (err) {
    console.error('Erro:', err);
    return;
  }
  console.log('Conteúdo:', data);
});

Promises

const fs = require('fs/promises');

async function readFile() {
  try {
    const data = await fs.readFile('arquivo.txt', 'utf8');
    console.log('Conteúdo:', data);
  } catch (err) {
    console.error('Erro:', err);
  }
}

Async/Await

const fetch = require('node-fetch');

async function fetchUser(id) {
  const response = await fetch(`https://api.example.com/users/${id}`);
  if (!response.ok) throw new Error(`HTTP ${response.status}`);
  return response.json();
}

async function main() {
  try {
    const user = await fetchUser(1);
    console.log(user);
  } catch (err) {
    console.error('Falha ao buscar usuário:', err);
  }
}

main();

Event Emitter

const EventEmitter = require('events');

class Logger extends EventEmitter {
  log(message) {
    this.emit('message', { message, timestamp: new Date() });
  }
}

const logger = new Logger();

logger.on('message', (data) => {
  console.log(`${data.timestamp.toISOString()}: ${data.message}`);
});

logger.on('message', (data) => {
  // Salvar em banco (segundo listener)
  saveToDatabase(data);
});

logger.log('Servidor iniciado');

Streams

Streams permitem processar dados em partes, sem carregar tudo em memória.

Tipos de Stream

TipoEntradaSaídaUso
ReadableFonte de dadosDadosLeitura de arquivo, HTTP request
WritableDadosDestinoEscrita em arquivo, HTTP response
TransformDados transformadosDados transformadosCompressão, criptografia
DuplexAmbasAmbasSocket TCP

Readable Stream

const fs = require('fs');
const { Readable } = require('stream');

// Criar readable a partir de um arquivo
const readStream = fs.createReadStream('grande-arquivo.log', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 64KB chunks
});

readStream.on('data', (chunk) => {
  console.log('Chunk recebido:', chunk.length, 'bytes');
});

readStream.on('end', () => {
  console.log('Leitura completa');
});

readStream.on('error', (err) => {
  console.error('Erro na stream:', err);
});

Writable Stream

const fs = require('fs');

const writeStream = fs.createWriteStream('saida.txt', { flags: 'a' });

writeStream.write('Linha 1\n');
writeStream.write('Linha 2\n');
writeStream.end('Final\n');

writeStream.on('finish', () => {
  console.log('Escrita completa');
});

Transform Stream (pipe)

const { Transform, pipeline } = require('stream');
const fs = require('fs');

// Transform que converte para maiúsculas
const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

// Pipe: ler -> transformar -> escrever
const readStream = fs.createReadStream('entrada.txt');
const writeStream = fs.createWriteStream('saida.txt');

readStream
  .pipe(upperCaseTransform)
  .pipe(writeStream);

Pipeline (recomendado)

const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');

async function compressFile(input, output) {
  try {
    await pipeline(
      fs.createReadStream(input),
      zlib.createGzip(),
      fs.createWriteStream(output)
    );
    console.log('Compressão concluída');
  } catch (err) {
    console.error('Pipeline falhou:', err);
  }
}

compressFile('dados.txt', 'dados.txt.gz');

// Múltiplos transforms
await pipeline(
  fs.createReadStream('video.mp4'),
  zlib.createGzip(),
  transformEncrypt,
  fs.createWriteStream('video.mp4.enc')
);

Buffer

Buffers representam dados binários na memória.

// Criar buffers
const buf1 = Buffer.alloc(10); // preenchido com zeros
const buf2 = Buffer.from('Hello');
const buf3 = Buffer.from([0x48, 0x65, 0x6c, 0x6c, 0x6f]);

console.log(buf2.toString()); // Hello
console.log(buf2.toString('base64')); // SGVsbG8=
console.log(buf2.toString('hex')); // 48656c6c6f

// Concatenar
const combined = Buffer.concat([buf2, Buffer.from(' World')]);
console.log(combined.toString()); // Hello World

// Fatias (sem copiar)
const slice = buf1.slice(2, 5);

Sistema de Arquivos (fs)

const fs = require('fs/promises');
const path = require('path');

// Leitura
const data = await fs.readFile('arquivo.txt', 'utf8');

// Escrita
await fs.writeFile('arquivo.txt', 'conteúdo', 'utf8');

// Append
await fs.appendFile('arquivo.txt', 'mais conteúdo\n');

// Listar diretório
const files = await fs.readdir('./');
console.log(files);

// Estatísticas
const stats = await fs.stat('arquivo.txt');
console.log(stats.isFile(), stats.size, stats.mtime);

// Watch (observar mudanças)
const watcher = fs.watch('./', { recursive: true });
for await (const event of watcher) {
  console.log(`Arquivo ${event.filename} foi ${event.eventType}`);
}
// Leitura eficiente com createReadStream
const fs = require('fs');
const { createReadStream } = require('fs');
const readline = require('readline');

async function processLineByLine(filePath) {
  const fileStream = createReadStream(filePath);

  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity
  });

  for await (const line of rl) {
    console.log('Linha:', line);
  }
}

Lab: Exercício - Processador de Logs com Streams

Crie um processador que lê um arquivo de log, filtra erros e comprime:

// src/log-processor.js
const { pipeline } = require('stream/promises');
const { Transform } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
const readline = require('readline');

class ErrorFilter extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(line, encoding, callback) {
    if (line.includes('ERROR') || line.includes('FATAL')) {
      this.push(line + '\n');
    }
    callback();
  }
}

class JsonFormatter extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(line, encoding, callback) {
    const parts = line.split(' - ');
    const entry = {
      timestamp: parts[0],
      level: parts[1],
      message: parts.slice(2).join(' - '),
      parsedAt: new Date().toISOString()
    };
    this.push(JSON.stringify(entry) + '\n');
    callback();
  }
}

async function processLogs(inputFile, outputFile) {
  const readStream = fs.createReadStream(inputFile, { encoding: 'utf8' });

  const rl = readline.createInterface({
    input: readStream,
    crlfDelay: Infinity
  });

  const writeStream = fs.createWriteStream(outputFile);
  const gzip = zlib.createGzip();
  const errorFilter = new ErrorFilter();
  const jsonFormatter = new JsonFormatter();

  await pipeline(
    rl,
    errorFilter,
    jsonFormatter,
    gzip,
    writeStream
  );

  console.log(`Logs processados: ${outputFile}.gz`);
}

processLogs('server.log', 'errors.json.gz').catch(console.error);
# Testar processador de logs
node src/log-processor.js

Streams permitem processar dados que não caberiam em memória. Sempre use pipeline em vez de .pipe() para tratamento automático de erros. Prefira fs/promises para operações simples e createReadStream + readline para arquivos grandes linha a linha.