Como criar um pipeline de dados robusto para treinar seus modelos de machine learning
Introdução
Você já se perguntou como os grandes modelos de linguagem conseguem tantos dados para treinamento? A resposta frequentemente está no web scraping – a arte de extrair dados da web de forma automatizada.
Neste tutorial, vou te mostrar como construir um algoritmo completo usando Python e Firecrawl para coletar dados, processá-los e prepará-los para fine-tuning de modelos MML (Multi-Modal Learning).
🎯 O Que Você Vai Aprender
- Configurar ambiente de web scraping
- Usar Firecrawl para extração eficiente
- Processar e limpar dados
- Estruturar dados para fine-tuning
- Boas práticas e considerações éticas
🛠️ Configurando o Ambiente
📁 Estrutura do Projeto
web_scraping_fine_tuning/
│
├── requirements.txt
├── config.py
├── scraper.py
├── data_enhancer.py
├── pipeline.py
└── .env1. requirements.txt
firecrawl>=0.1.0
beautifulsoup4>=4.12.0
requests>=2.31.0
pandas>=2.0.0
numpy>=1.24.0
transformers>=4.30.0
datasets>=2.12.0
python-dotenv>=1.0.0
tqdm>=4.65.0
python-dateutil>=2.8.2pip install -r requirements.txt2. config.py
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
"""Configurações para o scraper"""
# API Configuration
FIRECRAWL_API_KEY = os.getenv('FIRECRAWL_API_KEY')
# Scraping Limits
MAX_PAGES_PER_DOMAIN = 100
REQUEST_DELAY = 1.0 # seconds
TIMEOUT = 30
# Data Quality
MIN_CONTENT_LENGTH = 100
MAX_CONTENT_LENGTH = 10000
# Output
OUTPUT_FORMAT = 'huggingface'
CHUNK_SIZE = 512
# Logging
LOG_LEVEL = 'INFO'3. scraper.py
import os
import json
import logging
import pandas as pd
import requests
from typing import List, Dict, Any, Optional
from datetime import datetime
from tqdm import tqdm
import time
class DataScraper:
def __init__(self, api_key: str):
"""
Inicializa o scraper com a API key do Firecrawl
Args:
api_key: Chave da API do Firecrawl
"""
self.api_key = api_key
self.base_url = "https://api.firecrawl.dev/v1"
self.scraped_data = []
self.logger = logging.getLogger(__name__)
def _make_request(self, endpoint: str, data: Dict) -> Optional[Dict]:
"""
Faz requisição para a API do Firecrawl
Args:
endpoint: Endpoint da API
data: Dados da requisição
Returns:
Resposta da API ou None em caso de erro
"""
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.api_key}'
}
url = f"{self.base_url}/{endpoint}"
try:
response = requests.post(url, headers=headers, json=data, timeout=60)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
self.logger.error(f"Erro na requisição para {endpoint}: {str(e)}")
if hasattr(e, 'response') and e.response is not None:
self.logger.error(f"Resposta da API: {e.response.text}")
return None
def scrape_website(self, url: str, params: Dict = None) -> Optional[Dict]:
"""
Faz scraping de uma única página usando a API atualizada do Firecrawl
Args:
url: URL da página para scraping
params: Parâmetros adicionais para o scraping
Returns:
Dict com os dados extraídos ou None em caso de erro
"""
default_params = {
'formats': ['markdown', 'html'],
'onlyMainContent': True,
'includeTags': ['h1', 'h2', 'h3', 'p', 'code', 'pre']
}
if params:
default_params.update(params)
try:
self.logger.info(f"Fazendo scraping de: {url}")
request_data = {
'url': url,
'formats': default_params['formats'],
'onlyMainContent': default_params['onlyMainContent']
}
result = self._make_request('scrape', request_data)
if result and result.get('success'):
return result.get('data')
else:
self.logger.warning(f"Scraping não bem-sucedido para: {url}")
self.logger.debug(f"Resposta: {result}")
return None
except Exception as e:
self.logger.error(f"Erro ao fazer scraping de {url}: {str(e)}")
return None
def crawl_website(self, url: str, params: Dict = None) -> Optional[Dict]:
"""
Faz crawling de um website inteiro
Args:
url: URL base para crawling
params: Parâmetros adicionais
Returns:
Dict com resultados do crawl
"""
default_params = {
'limit': 10,
'scrapeOptions': {
'formats': ['markdown'],
'onlyMainContent': True
}
}
if params:
default_params.update(params)
try:
self.logger.info(f"Iniciando crawl de: {url}")
request_data = {
'url': url,
'limit': default_params['limit'],
'scrapeOptions': default_params['scrapeOptions']
}
# Inicia o crawl
result = self._make_request('crawl', request_data)
if result and result.get('success'):
crawl_id = result.get('id')
self.logger.info(f"Crawl iniciado com ID: {crawl_id}")
# Aguarda conclusão do crawl
return self._wait_for_crawl_completion(crawl_id)
else:
self.logger.error(f"Falha ao iniciar crawl para: {url}")
return None
except Exception as e:
self.logger.error(f"Erro no crawl de {url}: {str(e)}")
return None
def _wait_for_crawl_completion(self, crawl_id: str, max_wait: int = 300) -> Optional[Dict]:
"""
Aguarda a conclusão de um crawl
Args:
crawl_id: ID do crawl
max_wait: Tempo máximo de espera em segundos
Returns:
Resultados do crawl ou None
"""
start_time = time.time()
while time.time() - start_time < max_wait:
try:
headers = {
'Authorization': f'Bearer {self.api_key}'
}
response = requests.get(
f"{self.base_url}/crawl/{crawl_id}",
headers=headers,
timeout=30
)
if response.status_code == 200:
result = response.json()
if result.get('status') == 'completed':
self.logger.info(f"Crawl {crawl_id} concluído!")
return result
elif result.get('status') in ['failed', 'cancelled']:
self.logger.error(f"Crawl {crawl_id} falhou: {result.get('status')}")
return None
else:
self.logger.info(f"Crawl em progresso... Status: {result.get('status')}")
else:
self.logger.warning(f"Status code inesperado: {response.status_code}")
except Exception as e:
self.logger.error(f"Erro ao verificar status do crawl: {str(e)}")
# Aguarda antes da próxima verificação
time.sleep(5)
self.logger.error(f"Timeout aguardando crawl {crawl_id}")
return None
def scrape_library_docs(self, base_urls: List[str], max_pages: int = 50) -> List[Dict]:
"""
Faz scraping de documentação de bibliotecas
Args:
base_urls: Lista de URLs base para scraping
max_pages: Número máximo de páginas por URL
Returns:
Lista de dicionários com dados processados
"""
all_data = []
for base_url in base_urls:
self.logger.info(f"Iniciando scraping de: {base_url}")
# Primeiro tenta fazer crawling
crawl_result = self.crawl_website(base_url, {'limit': max_pages})
if crawl_result and 'data' in crawl_result:
for page in tqdm(crawl_result['data'], desc=f"Processando {base_url}"):
page_data = self._process_page_data(page)
if page_data:
all_data.append(page_data)
else:
# Fallback: scraping página por página
self.logger.info(f"Usando fallback para scraping individual: {base_url}")
page_data = self.scrape_website(base_url)
if page_data:
processed_data = self._process_page_data(page_data)
if processed_data:
all_data.append(processed_data)
self.scraped_data = all_data
self.logger.info(f"Scraping concluído! Total de {len(all_data)} páginas coletadas.")
return all_data
def _process_page_data(self, page: Dict) -> Optional[Dict]:
"""
Processa e limpa os dados extraídos de uma página
Args:
page: Dicionário com dados da página
Returns:
Dicionário processado ou None se inválido
"""
try:
# A estrutura da resposta pode variar
content = page.get('markdown') or page.get('html') or page.get('content', '')
if not content or len(content.strip()) < 50:
return None
processed = {
'url': page.get('url', ''),
'title': page.get('metadata', {}).get('title', page.get('title', '')),
'content': content,
'content_type': 'markdown' if page.get('markdown') else 'html',
'timestamp': datetime.now().isoformat(),
'source': 'web_scraping',
'content_length': len(content)
}
# Limpeza do conteúdo
processed['content'] = self._clean_content(processed['content'])
return processed
except Exception as e:
self.logger.error(f"Erro ao processar página: {str(e)}")
return None
def _clean_content(self, content: str) -> str:
"""
Limpa e normaliza o conteúdo extraído
Args:
content: Texto a ser limpo
Returns:
Texto limpo
"""
if not content:
return ""
# Remove múltiplos espaços em branco e quebras de linha excessivas
content = ' '.join(content.split())
# Remove caracteres especiais problemáticos
problematic_chars = ['\u200b', '\ufeff', '\u2028', '\u2029']
for char in problematic_chars:
content = content.replace(char, '')
return content.strip()
def prepare_fine_tuning_data(self, output_format: str = 'huggingface', chunk_size: int = 512):
"""
Prepara os dados para fine-tuning em diferentes formatos
Args:
output_format: Formato de saída ('huggingface', 'jsonl', 'csv')
chunk_size: Tamanho dos chunks para divisão do texto
Returns:
Dados no formato solicitado
"""
if not self.scraped_data:
self.logger.warning("Nenhum dado disponível para preparação")
return None
# Cria DataFrame
df = pd.DataFrame(self.scraped_data)
# Filtra dados muito curtos ou muito longos
df = df[(df['content_length'] > 100) & (df['content_length'] < 10000)]
# Remove duplicatas baseadas no conteúdo
df = df.drop_duplicates(subset=['content'])
self.logger.info(f"Dados após filtragem: {len(df)} documentos")
if output_format == 'huggingface':
return self._prepare_hf_dataset(df, chunk_size)
elif output_format == 'jsonl':
return self._prepare_jsonl(df, chunk_size)
elif output_format == 'csv':
return self._prepare_csv(df)
else:
raise ValueError(f"Formato não suportado: {output_format}")
def _prepare_hf_dataset(self, df: pd.DataFrame, chunk_size: int = 512):
"""
Prepara dataset no formato Hugging Face
Args:
df: DataFrame com os dados
chunk_size: Tamanho dos chunks
Returns:
Dataset no formato Hugging Face
"""
try:
from datasets import Dataset
except ImportError:
self.logger.error("Biblioteca 'datasets' não instalada. Instale com: pip install datasets")
return None
training_data = []
for _, row in tqdm(df.iterrows(), total=len(df), desc="Preparando dataset HF"):
chunks = self._split_into_chunks(row['content'], chunk_size)
for i, chunk in enumerate(chunks):
training_data.append({
'text': chunk,
'source': row['url'],
'chunk_id': i,
'total_chunks': len(chunks),
'metadata': {
'title': row['title'],
'content_length': len(chunk),
'original_length': row['content_length'],
'timestamp': row['timestamp']
}
})
self.logger.info(f"Dataset HF criado com {len(training_data)} exemplos")
return Dataset.from_list(training_data)
def _split_into_chunks(self, text: str, chunk_size: int = 512) -> List[str]:
"""
Divide texto em chunks para treinamento
Args:
text: Texto a ser dividido
chunk_size: Tamanho de cada chunk
Returns:
Lista de chunks
"""
if not text:
return []
words = text.split()
chunks = []
for i in range(0, len(words), chunk_size):
chunk = ' '.join(words[i:i + chunk_size])
if len(chunk) > 50: # Ignora chunks muito pequenos
chunks.append(chunk)
return chunks
def _prepare_jsonl(self, df: pd.DataFrame, chunk_size: int = 512) -> str:
"""
Prepara dados no formato JSONL
Args:
df: DataFrame com os dados
chunk_size: Tamanho dos chunks
Returns:
Caminho do arquivo salvo
"""
output_data = []
for _, row in tqdm(df.iterrows(), total=len(df), desc="Preparando JSONL"):
chunks = self._split_into_chunks(row['content'], chunk_size)
for i, chunk in enumerate(chunks):
output_data.append({
'text': chunk,
'metadata': {
'url': row['url'],
'title': row['title'],
'chunk_id': i,
'total_chunks': len(chunks),
'source': 'web_scraping',
'timestamp': row['timestamp'],
'content_length': len(chunk)
}
})
output_file = f'fine_tuning_data_{datetime.now().strftime("%Y%m%d_%H%M%S")}.jsonl'
with open(output_file, 'w', encoding='utf-8') as f:
for item in output_data:
f.write(json.dumps(item, ensure_ascii=False) + '\n')
self.logger.info(f"Arquivo JSONL salvo: {output_file}")
return output_file
def _prepare_csv(self, df: pd.DataFrame) -> str:
"""
Prepara dados no formato CSV
Args:
df: DataFrame com os dados
Returns:
Caminho do arquivo salvo
"""
output_file = f'fine_tuning_data_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
df.to_csv(output_file, index=False, encoding='utf-8')
self.logger.info(f"Arquivo CSV salvo: {output_file}")
return output_file
def save_dataset_info(self, dataset, output_path: str):
"""
Salva informações e métricas do dataset
Args:
dataset: Dataset a ser analisado
output_path: Caminho para salvar as informações
"""
try:
info = {
'total_examples': len(dataset),
'sources': len(set(dataset['source'])),
'total_chunks': sum(dataset['total_chunks']),
'avg_text_length': sum(len(text) for text in dataset['text']) / len(dataset),
'creation_date': datetime.now().isoformat(),
'data_sources': list(set(dataset['source']))
}
info_file = os.path.join(output_path, 'dataset_info.json')
with open(info_file, 'w', encoding='utf-8') as f:
json.dump(info, f, indent=2, ensure_ascii=False)
self.logger.info(f"Informações do dataset salvas em: {info_file}")
except Exception as e:
self.logger.error(f"Erro ao salvar informações do dataset: {str(e)}")4. data_enhancer.py
import logging
import hashlib
from typing import List, Dict, Any
from datasets import Dataset
class DataEnhancer:
"""
Classe para enriquecer e melhorar os dados coletados
"""
def __init__(self):
self.quality_metrics = {}
self.logger = logging.getLogger(__name__)
def calculate_quality_metrics(self, dataset: Dataset) -> Dict[str, Any]:
"""
Calcula métricas de qualidade dos dados
Args:
dataset: Dataset a ser analisado
Returns:
Dicionário com métricas de qualidade
"""
texts = dataset['text']
if not texts:
return {}
text_lengths = [len(text) for text in texts]
word_counts = [len(text.split()) for text in texts]
metrics = {
'total_documents': len(texts),
'avg_length': sum(text_lengths) / len(texts),
'max_length': max(text_lengths),
'min_length': min(text_lengths),
'avg_word_count': sum(word_counts) / len(word_counts),
'empty_documents': sum(1 for text in texts if len(text.strip()) == 0),
'short_documents': sum(1 for text in texts if len(text.strip()) < 50),
'unique_sources': len(set(dataset['source']))
}
self.quality_metrics = metrics
self.logger.info(f"Métricas de qualidade calculadas: {metrics}")
return metrics
def filter_low_quality(self, dataset: Dataset, min_length: int = 50, max_length: int = 5000) -> Dataset:
"""
Filtra documentos de baixa qualidade
Args:
dataset: Dataset a ser filtrado
min_length: Comprimento mínimo do texto
max_length: Comprimento máximo do texto
Returns:
Dataset filtrado
"""
def quality_filter(example):
text_length = len(example['text'])
return min_length <= text_length <= max_length
initial_count = len(dataset)
filtered_dataset = dataset.filter(quality_filter)
filtered_count = len(filtered_dataset)
self.logger.info(f"Filtragem: {initial_count} -> {filtered_count} documentos")
return filtered_dataset
def deduplicate(self, dataset: Dataset) -> Dataset:
"""
Remove duplicatas baseadas em hash do conteúdo
Args:
dataset: Dataset a ser deduplicado
Returns:
Dataset deduplicado
"""
seen_hashes = set()
unique_data = []
for example in dataset:
# Cria hash do conteúdo (primeiros 1000 caracteres para eficiência)
content_to_hash = example['text'][:1000] + example['source']
content_hash = hashlib.md5(content_to_hash.encode('utf-8')).hexdigest()
if content_hash not in seen_hashes:
seen_hashes.add(content_hash)
unique_data.append(example)
initial_count = len(dataset)
final_count = len(unique_data)
self.logger.info(f"Deduplicação: {initial_count} -> {final_count} documentos")
return Dataset.from_list(unique_data)
def enhance_with_metadata(self, dataset: Dataset) -> Dataset:
"""
Adiciona metadados enriquecidos ao dataset
Args:
dataset: Dataset a ser enriquecido
Returns:
Dataset com metadados adicionais
"""
def add_enhanced_metadata(example):
text = example['text']
# Metadados básicos
enhanced_metadata = {
'word_count': len(text.split()),
'char_count': len(text),
'avg_word_length': sum(len(word) for word in text.split()) / len(text.split()) if text.split() else 0,
'has_code': '```' in text or 'def ' in text or 'class ' in text,
'language': 'python' if any(keyword in text.lower() for keyword in ['def ', 'class ', 'import ', 'from ']) else 'unknown'
}
# Combina com metadados existentes
if 'metadata' in example:
example['metadata'].update(enhanced_metadata)
else:
example['metadata'] = enhanced_metadata
return example
return dataset.map(add_enhanced_metadata)5. pipeline.py
import logging
import os
import pandas as pd
from datetime import datetime
from config import Config
from scraper import DataScraper
from data_enhancer import DataEnhancer
# Configuração de logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('scraping_pipeline.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def create_fine_tuning_pipeline(config: Config):
"""
Pipeline completo de coleta e preparação de dados para fine-tuning
Args:
config: Configurações do pipeline
Returns:
Tuple (dataset, métricas finais)
"""
# 1. Inicialização
logger.info("🚀 Iniciando pipeline de fine-tuning")
scraper = DataScraper(config.FIRECRAWL_API_KEY)
enhancer = DataEnhancer()
# 2. Definição de fontes (documentações de bibliotecas Python)
sources = [
"https://docs.python.org/3/tutorial/",
"https://pandas.pydata.org/docs/user_guide/",
"https://numpy.org/doc/stable/user/",
"https://scikit-learn.org/stable/user_guide.html",
"https://requests.readthedocs.io/en/latest/",
"https://matplotlib.org/stable/users/index.html"
]
# 3. Coleta de dados
logger.info("📥 Fase 1: Coleta de dados")
try:
raw_data = scraper.scrape_library_docs(
sources,
max_pages=config.MAX_PAGES_PER_DOMAIN
)
if not raw_data:
logger.error("❌ Nenhum dado foi coletado")
return None, None
except Exception as e:
logger.error(f"❌ Erro na coleta de dados: {str(e)}")
return None, None
# 4. Preparação inicial
logger.info("🔧 Fase 2: Preparação de dados")
try:
dataset = scraper.prepare_fine_tuning_data(
output_format=config.OUTPUT_FORMAT,
chunk_size=config.CHUNK_SIZE
)
if dataset is None:
logger.error("❌ Falha na preparação dos dados")
return None, None
except Exception as e:
logger.error(f"❌ Erro na preparação dos dados: {str(e)}")
return None, None
# 5. Melhoria de qualidade
logger.info("✨ Fase 3: Melhoria de qualidade")
try:
# Calcula métricas iniciais
initial_metrics = enhancer.calculate_quality_metrics(dataset)
logger.info(f"📊 Métricas iniciais: {initial_metrics}")
# Filtra dados de baixa qualidade
filtered_dataset = enhancer.filter_low_quality(
dataset,
config.MIN_CONTENT_LENGTH,
config.MAX_CONTENT_LENGTH
)
# Remove duplicatas
deduplicated_dataset = enhancer.deduplicate(filtered_dataset)
# Adiciona metadados enriquecidos
enhanced_dataset = enhancer.enhance_with_metadata(deduplicated_dataset)
except Exception as e:
logger.error(f"❌ Erro no enhancement de dados: {str(e)}")
return None, None
# 6. Salvamento final
logger.info("💾 Fase 4: Salvamento")
try:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_path = f"fine_tuning_dataset_{timestamp}"
# Cria diretório se não existir
os.makedirs(output_path, exist_ok=True)
# Salva dataset
if config.OUTPUT_FORMAT == 'huggingface':
enhanced_dataset.save_to_disk(output_path)
else:
# Para outros formatos, salva via scraper
scraper.save_dataset_info(enhanced_dataset, output_path)
# Calcula métricas finais
final_metrics = enhancer.calculate_quality_metrics(enhanced_dataset)
# Salva relatório
report = {
'pipeline_execution': {
'timestamp': timestamp,
'sources': sources,
'config': {
'max_pages': config.MAX_PAGES_PER_DOMAIN,
'chunk_size': config.CHUNK_SIZE,
'min_length': config.MIN_CONTENT_LENGTH,
'max_length': config.MAX_CONTENT_LENGTH
}
},
'metrics': {
'initial': initial_metrics,
'final': final_metrics
},
'files': {
'dataset_path': output_path,
'log_file': 'scraping_pipeline.log'
}
}
report_file = os.path.join(output_path, 'pipeline_report.json')
with open(report_file, 'w', encoding='utf-8') as f:
import json
json.dump(report, f, indent=2, ensure_ascii=False)
logger.info("✅ Pipeline concluído com sucesso!")
logger.info(f"📁 Dataset salvo em: {output_path}")
logger.info(f"📊 Métricas finais: {final_metrics}")
logger.info(f"📄 Relatório salvo em: {report_file}")
return enhanced_dataset, final_metrics
except Exception as e:
logger.error(f"❌ Erro no salvamento: {str(e)}")
return None, None
def main():
"""
Função principal para execução do pipeline
"""
# Verifica se a API key está configurada
if not Config.FIRECRAWL_API_KEY:
logger.error("❌ FIRECRAWL_API_KEY não encontrada. Configure no arquivo .env")
return
logger.info("🔧 Iniciando configuração...")
config = Config()
# Executa o pipeline
dataset, metrics = create_fine_tuning_pipeline(config)
if dataset and metrics:
logger.info("🎉 Pipeline executado com sucesso!")
# Exibe resumo final
print("\n" + "="*50)
print("📈 RESUMO FINAL DO PIPELINE")
print("="*50)
print(f"📊 Total de exemplos: {metrics['total_documents']}")
print(f"📏 Comprimento médio: {metrics['avg_length']:.0f} caracteres")
print(f"🔤 Média de palavras: {metrics['avg_word_count']:.0f}")
print(f"🌐 Fontes únicas: {metrics['unique_sources']}")
print(f"📁 Dataset salvo em: fine_tuning_dataset_*")
print("="*50)
else:
logger.error("❌ Pipeline falhou")
return 1
return 0
if __name__ == "__main__":
exit(main())6. Arquivo .env
# Configurações do Firecrawl
FIRECRAWL_API_KEY=sua_api_key_aqui
# Configurações de scraping (opcionais)
MAX_PAGES_PER_DOMAIN=100
REQUEST_DELAY=1.0
TIMEOUT=30
# Configurações de qualidade de dados
MIN_CONTENT_LENGTH=100
MAX_CONTENT_LENGTH=10000
CHUNK_SIZE=5127. ✅ Execução
python pipeline.pySe tudo deu certo a saida é esta:

O diretório “fine_tuning_dataset_20251103_230714” e os arquivos contidos nele são gerados pela biblioteca datasets da Hugging Face e formam um dataset completo e otimizado. Vou explicar cada um em detalhes:
fine_tuning_dataset_20251103_230714/
|
├── data-00000-of-00001.arrow
├── dataset_info.json
├── pipeline_report.json
└── state.json📊 dataset_info.json
Propósito: Metadados e informações gerais sobre o dataset.
O que contém:
- ✅ Estrutura dos dados (schema) – define todos os campos e tipos
- ✅ Estatísticas – número de exemplos, tamanho em bytes
- ✅ Metadados – versão, licença, descrição
- ✅ Informações de splits – como os dados estão divididos
🗃️ data-00000-of-00001.arrow
Propósito: Os dados reais do dataset em formato Apache Arrow.
Formato: Apache Arrow (formato binário de colunas)
Por que Arrow?:
- 🚀 Performance: Leitura/escrita extremamente rápida
- 💾 Eficiência de memória: Processamento sem carregar tudo na RAM
- 🔄 Streaming: Permite processar dados maiores que a memória
- 📊 Columnar: Otimizado para operações em colunas (como pandas)
Estrutura interna:
Coluna 'text': ["conteúdo do chunk 1", "conteúdo do chunk 2", ...]
Coluna 'source': ["https://url1.com", "https://url2.com", ...]
Coluna 'chunk_id': [0, 1, 2, 3, 4, ...]
Coluna 'total_chunks': [5, 5, 5, 5, 5, ...]
Coluna 'metadata': [{"title": "...", "content_length": 150}, ...]Vantagens:
- ✅ Compressão automática
- ✅ Tipos de dados preservados
- ✅ Acesso aleatório rápido
- ✅ Compatível com pandas, PyTorch, TensorFlow
📋 state.json
Propósito: Estado interno do dataset e configurações do processamento.
O que controla:
- ✅ Fingerprint: Hash único que identifica o dataset
- ✅ Arquivos de dados: Lista dos arquivos .arrow
- ✅ Configurações de formatação: Como os dados são apresentados
- ✅ Split: Qual parte do dataset (train, test, validation)
Importância do fingerprint:
- Detecta automaticamente se os dados mudaram
- Gerencia cache de processamento
- Garante reprodutibilidade
📈 pipeline_report.json
Propósito: Relatório completo do processo de scraping e preparação.
Informações valiosas:
- ✅ Reprodutibilidade: Configurações exatas usadas
- ✅ Eficiência do pipeline: Comparação antes/depois do processamento
- ✅ Qualidade dos dados: Métricas de limpeza e filtragem
- ✅ Rastreabilidade: Timestamps e fontes originais
🎯 Resumo da Importância
| Arquivo | Para que serve | Analogia |
|---|---|---|
| dataset_info.json | 📋 “Certidão de nascimento” do dataset | Como um README técnico |
| data-00000-of-00001.arrow | 💾 Dados reais otimizados | Como os livros numa biblioteca |
| state.json | ⚙️ Configurações e estado | Como o catálogo da biblioteca |
| pipeline_report.json | 📊 Histórico e métricas | Como o relatório do bibliotecário |
Essa estrutura garante que seu dataset seja:
- ✅ Reprodutível (mesmos dados, mesmos resultados)
- ✅ Eficiente (carregamento rápido, baixo uso de memória)
- ✅ Portátil (funciona em diferentes ambientes)
- ✅ Auditável (sabe-se exatamente a origem e processamento)
Perfeito para fine-tuning profissional de modelos MML! 🚀
⚠️ Considerações Importantes
Éticas e Legais
- Respeite robots.txt: Sempre verifique as regras do site
- Rate limiting: Não sobrecarregue os servidores
- Termos de uso: Respeite os termos de cada site
- Dados pessoais: Evite coletar informações pessoais
Técnicas
- Tratamento de erros: Sempre implemente retry mechanisms
- Qualidade de dados: Valide a qualidade antes do treinamento
- Monitoramento: Acompanhe métricas de qualidade
🎯 Conclusão
Este tutorial mostrou como criar um pipeline completo de web scraping para fine-tuning de modelos MML. Com Python e Firecrawl, você pode:
✅ Coletar dados de documentações técnicas
✅ Processar e limpar o conteúdo
✅ Preparar datasets para treinamento
✅ Garantir qualidade e diversidade dos dados
Lembre-se: a qualidade dos dados é tão importante quanto a quantidade. Invista tempo no pré-processamento e validação para obter os melhores resultados no seu fine-tuning.
Próximos passos: Experimente com diferentes fontes de dados, implemente técnicas de augmentation e explore diferentes estratégias de chunking para otimizar o treinamento.
Gostou deste tutorial? Conecte-se comigo no LinkedIn para mais conteúdos sobre ciência de dados e ML!

