Assincronismo, Streams e Sistema de Arquivos
Aula 3 de 7
Padrões Assíncronos
Callbacks (estilo antigo)
const fs = require('fs');
fs.readFile('arquivo.txt', 'utf8', (err, data) => {
if (err) {
console.error('Erro:', err);
return;
}
console.log('Conteúdo:', data);
});
Promises
const fs = require('fs/promises');
async function readFile() {
try {
const data = await fs.readFile('arquivo.txt', 'utf8');
console.log('Conteúdo:', data);
} catch (err) {
console.error('Erro:', err);
}
}
Async/Await
const fetch = require('node-fetch');
async function fetchUser(id) {
const response = await fetch(`https://api.example.com/users/${id}`);
if (!response.ok) throw new Error(`HTTP ${response.status}`);
return response.json();
}
async function main() {
try {
const user = await fetchUser(1);
console.log(user);
} catch (err) {
console.error('Falha ao buscar usuário:', err);
}
}
main();
Event Emitter
const EventEmitter = require('events');
class Logger extends EventEmitter {
log(message) {
this.emit('message', { message, timestamp: new Date() });
}
}
const logger = new Logger();
logger.on('message', (data) => {
console.log(`${data.timestamp.toISOString()}: ${data.message}`);
});
logger.on('message', (data) => {
// Salvar em banco (segundo listener)
saveToDatabase(data);
});
logger.log('Servidor iniciado');
Streams
Streams permitem processar dados em partes, sem carregar tudo em memória.
Tipos de Stream
| Tipo | Entrada | Saída | Uso |
|---|---|---|---|
| Readable | Fonte de dados | Dados | Leitura de arquivo, HTTP request |
| Writable | Dados | Destino | Escrita em arquivo, HTTP response |
| Transform | Dados transformados | Dados transformados | Compressão, criptografia |
| Duplex | Ambas | Ambas | Socket TCP |
Readable Stream
const fs = require('fs');
const { Readable } = require('stream');
// Criar readable a partir de um arquivo
const readStream = fs.createReadStream('grande-arquivo.log', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 64KB chunks
});
readStream.on('data', (chunk) => {
console.log('Chunk recebido:', chunk.length, 'bytes');
});
readStream.on('end', () => {
console.log('Leitura completa');
});
readStream.on('error', (err) => {
console.error('Erro na stream:', err);
});
Writable Stream
const fs = require('fs');
const writeStream = fs.createWriteStream('saida.txt', { flags: 'a' });
writeStream.write('Linha 1\n');
writeStream.write('Linha 2\n');
writeStream.end('Final\n');
writeStream.on('finish', () => {
console.log('Escrita completa');
});
Transform Stream (pipe)
const { Transform, pipeline } = require('stream');
const fs = require('fs');
// Transform que converte para maiúsculas
const upperCaseTransform = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
// Pipe: ler -> transformar -> escrever
const readStream = fs.createReadStream('entrada.txt');
const writeStream = fs.createWriteStream('saida.txt');
readStream
.pipe(upperCaseTransform)
.pipe(writeStream);
Pipeline (recomendado)
const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');
async function compressFile(input, output) {
try {
await pipeline(
fs.createReadStream(input),
zlib.createGzip(),
fs.createWriteStream(output)
);
console.log('Compressão concluída');
} catch (err) {
console.error('Pipeline falhou:', err);
}
}
compressFile('dados.txt', 'dados.txt.gz');
// Múltiplos transforms
await pipeline(
fs.createReadStream('video.mp4'),
zlib.createGzip(),
transformEncrypt,
fs.createWriteStream('video.mp4.enc')
);
Buffer
Buffers representam dados binários na memória.
// Criar buffers
const buf1 = Buffer.alloc(10); // preenchido com zeros
const buf2 = Buffer.from('Hello');
const buf3 = Buffer.from([0x48, 0x65, 0x6c, 0x6c, 0x6f]);
console.log(buf2.toString()); // Hello
console.log(buf2.toString('base64')); // SGVsbG8=
console.log(buf2.toString('hex')); // 48656c6c6f
// Concatenar
const combined = Buffer.concat([buf2, Buffer.from(' World')]);
console.log(combined.toString()); // Hello World
// Fatias (sem copiar)
const slice = buf1.slice(2, 5);
Sistema de Arquivos (fs)
const fs = require('fs/promises');
const path = require('path');
// Leitura
const data = await fs.readFile('arquivo.txt', 'utf8');
// Escrita
await fs.writeFile('arquivo.txt', 'conteúdo', 'utf8');
// Append
await fs.appendFile('arquivo.txt', 'mais conteúdo\n');
// Listar diretório
const files = await fs.readdir('./');
console.log(files);
// Estatísticas
const stats = await fs.stat('arquivo.txt');
console.log(stats.isFile(), stats.size, stats.mtime);
// Watch (observar mudanças)
const watcher = fs.watch('./', { recursive: true });
for await (const event of watcher) {
console.log(`Arquivo ${event.filename} foi ${event.eventType}`);
}
// Leitura eficiente com createReadStream
const fs = require('fs');
const { createReadStream } = require('fs');
const readline = require('readline');
async function processLineByLine(filePath) {
const fileStream = createReadStream(filePath);
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
});
for await (const line of rl) {
console.log('Linha:', line);
}
}
Lab: Exercício - Processador de Logs com Streams
Crie um processador que lê um arquivo de log, filtra erros e comprime:
// src/log-processor.js
const { pipeline } = require('stream/promises');
const { Transform } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
const readline = require('readline');
class ErrorFilter extends Transform {
constructor() {
super({ objectMode: true });
}
_transform(line, encoding, callback) {
if (line.includes('ERROR') || line.includes('FATAL')) {
this.push(line + '\n');
}
callback();
}
}
class JsonFormatter extends Transform {
constructor() {
super({ objectMode: true });
}
_transform(line, encoding, callback) {
const parts = line.split(' - ');
const entry = {
timestamp: parts[0],
level: parts[1],
message: parts.slice(2).join(' - '),
parsedAt: new Date().toISOString()
};
this.push(JSON.stringify(entry) + '\n');
callback();
}
}
async function processLogs(inputFile, outputFile) {
const readStream = fs.createReadStream(inputFile, { encoding: 'utf8' });
const rl = readline.createInterface({
input: readStream,
crlfDelay: Infinity
});
const writeStream = fs.createWriteStream(outputFile);
const gzip = zlib.createGzip();
const errorFilter = new ErrorFilter();
const jsonFormatter = new JsonFormatter();
await pipeline(
rl,
errorFilter,
jsonFormatter,
gzip,
writeStream
);
console.log(`Logs processados: ${outputFile}.gz`);
}
processLogs('server.log', 'errors.json.gz').catch(console.error);
# Testar processador de logs
node src/log-processor.js
Streams permitem processar dados que não caberiam em memória. Sempre use
pipelineem vez de.pipe()para tratamento automático de erros. Prefirafs/promisespara operações simples ecreateReadStream+readlinepara arquivos grandes linha a linha.