Quality Filtering: Perplexity-Based Filtering and Language Detection
Part 5 of: Data Pipeline Engineering
Your model has learned to generate cookie consent banners. Not occasionally, and not as an artifact of overfitting: it genuinely believes that “This website uses cookies to ensure you get the best experience” is the natural way to begin any piece of text. It can also fluently generate SEO spam (“Best [Product] 2024: Ultimate Guide | Top 10 Reviews”), broken HTML fragments, privacy policies, Lorem Ipsum placeholder text, and the phrase “Subscribe to my newsletter” in 47 different variations. These aren’t bugs in your training pipeline. They’re features of your training data.
Here’s the bitter pill: most of the internet is garbage. Sturgeon nailed it. Not “low quality” or “suboptimal” but actual garbage. Studies estimate that 40-60% of web text is some combination of automatically generated content, template boilerplate, duplicate navigation menus, spam, placeholder text, error messages, and content that exists solely to manipulate search engines. Lowkey I feel like those estimates are still low. And you just spent three weeks crawling, deduplicating, and tokenizing all of it into your training corpus. I am Jack’s uncontrollable laughter.
The GPT-3 paper mentions they “filtered low-quality data.” What they don’t mention is that quality filtering is less of a technical challenge and more of a philosophical one. What is “quality”? Is a Reddit comment thread about video games lower quality than a formal academic paper about medieval poetry? For what task? Your perplexity-based filter thinks the academic paper is “better” because it matches the statistical patterns of high-quality text. But if your model’s job is to understand Internet culture, you just filtered out the signal.
The stakes are enormous. Filter too aggressively and you remove the long-tail examples, informal language, and cultural knowledge that make models useful. Filter too conservatively and your model learns that good writing starts with “BREAKING NEWS:” and ends with “Click here for more.” The difference between a model that can write coherent text and one that generates plausible-looking spam is often determined by the quality filtering you do today.
This post is about navigating that tradeoff. We’ll cover perplexity-based filtering with reference models, language detection at scale, heuristic filters for obvious garbage, the subtle art of defining “quality,” and why your filtering pipeline will inevitably reflect your own biases about what constitutes good text. By the end, you’ll understand why the Llama team’s quality filtering decisions were controversial, and why they probably made the right call anyway. Not that it ended up working out all that well for them.
Perplexity-Based Filters: Using Models to Sift Data
The crucial insight: if you train a small reference model on known high-quality text, you can use its perplexity scores to filter new data. Text that looks like high-quality examples gets low perplexity. Garbage gets high perplexity.
import torch
import torch.nn as nn
from transformers import AutoTokenizer, AutoModelForCausalLM
import numpy as np
from typing import List, Dict
import math


class PerplexityFilter:
    """
    Filter text based on perplexity from a reference model.

    Key idea: High-quality text has lower perplexity according to
    a model trained on high-quality data.
    """

    def __init__(
        self,
        model_name: str = "gpt2",  # Small reference model
        device: str = "cuda",
        batch_size: int = 8
    ):
        self.device = device
        self.batch_size = batch_size

        # Load reference model and tokenizer
        print(f"Loading reference model: {model_name}")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(model_name)
        self.model.to(device)
        self.model.eval()

        # Add padding token if not present
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        print(f"Reference model loaded on {device}")

    def calculate_perplexity(self, text: str) -> float:
        """
        Calculate perplexity of text using the reference model.

        Perplexity = exp(average negative log likelihood)
        Lower perplexity = more "model-like" = higher quality
        """
        # Tokenize
        encodings = self.tokenizer(
            text,
            return_tensors='pt',
            truncation=True,
            max_length=512
        )
        input_ids = encodings['input_ids'].to(self.device)

        # Calculate loss, then perplexity = exp(loss)
        with torch.no_grad():
            outputs = self.model(input_ids, labels=input_ids)
            loss = outputs.loss.item()

        return math.exp(loss)

    def calculate_perplexity_batch(self, texts: List[str]) -> List[float]:
        """Calculate perplexity for a batch of texts"""
        # Tokenize batch
        encodings = self.tokenizer(
            texts,
            return_tensors='pt',
            padding=True,
            truncation=True,
            max_length=512
        )
        input_ids = encodings['input_ids'].to(self.device)
        attention_mask = encodings['attention_mask'].to(self.device)

        # Calculate per-example perplexities
        perplexities = []
        with torch.no_grad():
            outputs = self.model(
                input_ids,
                attention_mask=attention_mask,
                labels=input_ids
            )

            # Shift so each token is predicted from the previous position
            logits = outputs.logits
            shift_logits = logits[..., :-1, :].contiguous()
            shift_labels = input_ids[..., 1:].contiguous()
            loss_fct = nn.CrossEntropyLoss(reduction='none')

            for i in range(len(texts)):
                # Loss for this example
                example_logits = shift_logits[i]
                example_labels = shift_labels[i]
                example_mask = attention_mask[i, 1:]

                losses = loss_fct(example_logits, example_labels)

                # Average over non-padded tokens
                masked_losses = losses * example_mask
                avg_loss = masked_losses.sum() / example_mask.sum()

                perplexities.append(math.exp(avg_loss.item()))

        return perplexities

    def filter_by_perplexity(
        self,
        texts: List[str],
        threshold: float = 1000.0,
        return_scores: bool = False
    ) -> List[str] | List[tuple]:
        """
        Filter texts by perplexity threshold.

        Args:
            texts: List of texts to filter
            threshold: Maximum perplexity to keep
            return_scores: If True, return (text, perplexity) tuples

        Returns:
            Filtered texts or (text, perplexity) tuples
        """
        print(f"Filtering {len(texts)} texts with perplexity threshold {threshold}")

        filtered = []
        # Process in batches
        for i in range(0, len(texts), self.batch_size):
            batch = texts[i:i + self.batch_size]
            perplexities = self.calculate_perplexity_batch(batch)

            for text, perplexity in zip(batch, perplexities):
                if perplexity <= threshold:
                    if return_scores:
                        filtered.append((text, perplexity))
                    else:
                        filtered.append(text)

        print(f"Kept {len(filtered)}/{len(texts)} texts ({len(filtered)/len(texts):.1%})")
        return filtered

    def analyze_perplexity_distribution(
        self,
        texts: List[str],
        num_samples: int = 1000
    ) -> Dict[str, float]:
        """
        Analyze the perplexity distribution to help choose a threshold.
        """
        # Sample texts if needed
        import random
        if len(texts) > num_samples:
            texts = random.sample(texts, num_samples)

        print(f"Analyzing perplexity distribution on {len(texts)} samples...")

        # Calculate perplexities
        perplexities = []
        for i in range(0, len(texts), self.batch_size):
            batch = texts[i:i + self.batch_size]
            perplexities.extend(self.calculate_perplexity_batch(batch))

        # Calculate statistics
        perplexities = sorted(perplexities)
        stats = {
            'mean': np.mean(perplexities),
            'median': np.median(perplexities),
            'std': np.std(perplexities),
            'min': np.min(perplexities),
            'max': np.max(perplexities),
            'p25': np.percentile(perplexities, 25),
            'p75': np.percentile(perplexities, 75),
            'p90': np.percentile(perplexities, 90),
            'p95': np.percentile(perplexities, 95),
            'p99': np.percentile(perplexities, 99),
        }

        print("\nPerplexity Distribution:")
        print(f"  Mean:   {stats['mean']:.1f}")
        print(f"  Median: {stats['median']:.1f}")
        print(f"  Std:    {stats['std']:.1f}")
        print(f"  Range:  [{stats['min']:.1f}, {stats['max']:.1f}]")
        print(f"  P90:    {stats['p90']:.1f}")
        print(f"  P95:    {stats['p95']:.1f}")
        print(f"  P99:    {stats['p99']:.1f}")

        print("\nRecommended thresholds:")
        print(f"  Conservative (keep 90%): {stats['p90']:.1f}")
        print(f"  Moderate (keep 75%): {stats['p75']:.1f}")
        print(f"  Aggressive (keep 50%): {stats['median']:.1f}")

        return stats


# Example usage
ppl_filter = PerplexityFilter(model_name="gpt2")

# Sample texts of varying quality
texts = [
    "The quick brown fox jumps over the lazy dog.",  # High quality
    "click here for FREE VIAGRA $$$ best prices!!!",  # Spam
    "Lorem ipsum dolor sit amet, consectetur adipiscing elit.",  # Placeholder
    "In this article, we explore the fascinating history of the Renaissance.",  # High quality
    "asdfjkl qwerty zxcvbn mnbvcx",  # Random garbage
    "This website uses cookies to ensure you get the best experience.",  # Boilerplate
]

# Analyze distribution
stats = ppl_filter.analyze_perplexity_distribution(texts)

# Filter with threshold
filtered = ppl_filter.filter_by_perplexity(texts, threshold=500.0, return_scores=True)

print("\nFiltered results:")
for text, perplexity in filtered:
    print(f"  [{perplexity:6.1f}] {text[:60]}")
Typical output reveals what “quality” means to the model:
Loading reference model: gpt2
Reference model loaded on cuda
Analyzing perplexity distribution on 6 samples...
Perplexity Distribution:
Mean: 847.3
Median: 412.5
Std: 891.2
Range: [42.3, 2847.1]
P90: 2341.8
P95: 2594.5
P99: 2847.1
Recommended thresholds:
Conservative (keep 90%): 2341.8
Moderate (keep 75%): 891.3
Aggressive (keep 50%): 412.5
Filtering 6 texts with perplexity threshold 500.0
Kept 3/6 texts (50.0%)
Filtered results:
[ 42.3] The quick brown fox jumps over the lazy dog.
[ 156.7] In this article, we explore the fascinating history...
[ 298.4] This website uses cookies to ensure you get the best...
Do you see the problem? The cookie banner has reasonable perplexity (298.4) because it’s grammatically correct and statistically common. The model can’t tell it’s boilerplate. We need additional filters.
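Before reaching for heavier machinery, note that the dumbest possible fix also helps: an explicit blocklist of boilerplate phrases. The sketch below is mine, the phrase list is illustrative and nowhere near exhaustive, but it catches exactly the cases perplexity scores alone cannot.
# A minimal boilerplate blocklist -- the phrases here are illustrative
# examples, not an exhaustive or canonical list.
BOILERPLATE_PHRASES = [
    "this website uses cookies",
    "subscribe to my newsletter",
    "all rights reserved",
    "click here for more",
    "lorem ipsum dolor sit amet",
]

def looks_like_boilerplate(text: str) -> bool:
    """Flag text containing a known boilerplate phrase, regardless of perplexity."""
    lowered = text.lower()
    return any(phrase in lowered for phrase in BOILERPLATE_PHRASES)

print(looks_like_boilerplate(
    "This website uses cookies to ensure you get the best experience."
))  # True -- caught even though its perplexity looked fine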
Heuristic Filters: Catching Obvious Garbage
Some text is obviously bad. We can catch the low-hanging fruit with simple heuristics.
import re
from typing import Dict, List, Tuple
from collections import Counter
import unicodedata


class HeuristicFilter:
    """
    Rule-based filters for obvious low-quality text.

    These catch things perplexity models miss:
    - Excessive punctuation/symbols
    - Repeated words/lines
    - Too short/long
    - Wrong character sets
    - HTML artifacts
    - Profanity (if desired)
    """

    def __init__(self, config: Dict = None):
        self.config = config or self.default_config()
        self.stats = {
            'processed': 0,
            'failed_checks': Counter()
        }

    @staticmethod
    def default_config() -> Dict:
        return {
            'min_length': 50,
            'max_length': 100000,
            'max_symbol_ratio': 0.3,
            'max_digit_ratio': 0.3,
            'max_uppercase_ratio': 0.5,
            'max_line_repetition': 0.3,
            'max_word_repetition': 0.2,
            'mean_word_length_min': 2.5,
            'mean_word_length_max': 15.0,
            'check_html': True,
            'check_urls': True,
        }

    def filter_text(self, text: str) -> Tuple[bool, List[str]]:
        """
        Apply all heuristic filters.

        Returns:
            (passed, reasons) where reasons lists failed checks
        """
        self.stats['processed'] += 1
        reasons = []

        # Length check
        if not self._check_length(text):
            reasons.append('length')

        # Symbol ratio
        if not self._check_symbol_ratio(text):
            reasons.append('symbol_ratio')

        # Digit ratio
        if not self._check_digit_ratio(text):
            reasons.append('digit_ratio')

        # Uppercase ratio
        if not self._check_uppercase_ratio(text):
            reasons.append('uppercase_ratio')

        # Repetition
        if not self._check_line_repetition(text):
            reasons.append('line_repetition')
        if not self._check_word_repetition(text):
            reasons.append('word_repetition')

        # Word length
        if not self._check_word_length(text):
            reasons.append('word_length')

        # HTML artifacts
        if self.config['check_html'] and not self._check_html(text):
            reasons.append('html')

        # URL density
        if self.config['check_urls'] and not self._check_url_density(text):
            reasons.append('url_density')

        # Update stats
        for reason in reasons:
            self.stats['failed_checks'][reason] += 1

        passed = len(reasons) == 0
        return passed, reasons

    def _check_length(self, text: str) -> bool:
        """Check text length"""
        length = len(text)
        return self.config['min_length'] <= length <= self.config['max_length']

    def _check_symbol_ratio(self, text: str) -> bool:
        """Check ratio of punctuation/special characters"""
        if not text:
            return False
        symbols = sum(1 for c in text if not c.isalnum() and not c.isspace())
        ratio = symbols / len(text)
        return ratio <= self.config['max_symbol_ratio']

    def _check_digit_ratio(self, text: str) -> bool:
        """Check ratio of digits"""
        if not text:
            return False
        digits = sum(1 for c in text if c.isdigit())
        ratio = digits / len(text)
        return ratio <= self.config['max_digit_ratio']

    def _check_uppercase_ratio(self, text: str) -> bool:
        """Check ratio of uppercase letters"""
        letters = [c for c in text if c.isalpha()]
        if not letters:
            return True
        uppercase = sum(1 for c in letters if c.isupper())
        ratio = uppercase / len(letters)
        return ratio <= self.config['max_uppercase_ratio']

    def _check_line_repetition(self, text: str) -> bool:
        """Check for repeated lines (common in boilerplate)"""
        lines = [line.strip() for line in text.split('\n') if line.strip()]
        if len(lines) <= 1:
            return True
        # Count unique lines
        unique_lines = len(set(lines))
        repetition_ratio = 1 - (unique_lines / len(lines))
        return repetition_ratio <= self.config['max_line_repetition']

    def _check_word_repetition(self, text: str) -> bool:
        """Check for excessive word repetition"""
        words = text.lower().split()
        if len(words) <= 10:
            return True
        # Count unique words
        unique_words = len(set(words))
        repetition_ratio = 1 - (unique_words / len(words))
        return repetition_ratio <= self.config['max_word_repetition']

    def _check_word_length(self, text: str) -> bool:
        """Check mean word length (catches gibberish)"""
        words = text.split()
        if not words:
            return False
        mean_length = sum(len(w) for w in words) / len(words)
        return (self.config['mean_word_length_min'] <= mean_length
                <= self.config['mean_word_length_max'])

    def _check_html(self, text: str) -> bool:
        """Check for HTML artifacts"""
        # Check for HTML tags
        html_tags = re.findall(r'<[^>]+>', text)
        if len(html_tags) > 5:
            return False
        # Check for HTML entities
        html_entities = re.findall(r'&[a-zA-Z]+;|&#\d+;', text)
        if len(html_entities) > 10:
            return False
        # Check for common HTML boilerplate
        boilerplate_patterns = [
            r'<!DOCTYPE',
            r'<html',
            r'<script',
            r'<style',
            r'<meta',
        ]
        for pattern in boilerplate_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                return False
        return True

    def _check_url_density(self, text: str) -> bool:
        """Check URL density (high density = link spam)"""
        if not text:
            return True
        # Find URLs (simple matcher: scheme followed by non-whitespace)
        urls = re.findall(r'https?://\S+', text)
        # Check density
        url_chars = sum(len(url) for url in urls)
        density = url_chars / len(text)
        return density <= 0.15  # Max 15% of characters inside URLs

    def print_stats(self):
        """Print filtering statistics"""
        print("=" * 80)
        print("Heuristic Filter Statistics")
        print("=" * 80)
        print(f"Documents processed: {self.stats['processed']:,}")
        if self.stats['failed_checks']:
            print("\nFailed checks:")
            for reason, count in self.stats['failed_checks'].most_common():
                pct = count / self.stats['processed'] * 100
                print(f"  {reason:20s}: {count:6,} ({pct:5.1f}%)")


# Example usage
heuristic_filter = HeuristicFilter()

# Test texts
test_texts = [
    "This is a normal, well-written sentence with proper grammar and punctuation.",
    "CLICK HERE NOW!!! >>> BEST PRICES!!! $$$ >>> FREE SHIPPING!!! <<<",  # Too much uppercase/punctuation
    "aaa bbb ccc aaa bbb ccc aaa bbb ccc aaa bbb ccc aaa bbb ccc",  # Excessive word repetition
    "The quick brown fox jumps over the lazy dog.\n" * 50,  # Line repetition
    "8372 4915 6203 7748 1059 3386 9924 5571 2608 4417 8093 6652",  # Too many digits
    "<html><body><div class='content'>Hello world, this is a page</div></body></html>",  # HTML
    "Check out http://example.com and http://another.com and http://more.com",  # URL spam
]

for text in test_texts:
    passed, reasons = heuristic_filter.filter_text(text)
    status = "PASS" if passed else "FAIL"
    reason_str = ", ".join(reasons) if reasons else "none"
    print(f"[{status}] {reason_str:30s} | {text[:50]}")

print()
heuristic_filter.print_stats()
Output shows what each filter catches:
[PASS] none                           | This is a normal, well-written sentence...
[FAIL] symbol_ratio, uppercase_ratio  | CLICK HERE NOW!!! >>> BEST PRICES!!! $$$...
[FAIL] word_repetition                | aaa bbb ccc aaa bbb ccc aaa bbb ccc...
[FAIL] line_repetition, word_repetition | The quick brown fox jumps over the lazy dog....
[FAIL] digit_ratio                    | 8372 4915 6203 7748 1059 3386 9924 5571...
[FAIL] html                           | <html><body><div class='content'>Hello world...
[FAIL] url_density                    | Check out http://example.com and http://another...
Heuristic Filter Statistics
Documents processed: 7
Failed checks:
  word_repetition     :      2 ( 28.6%)
  symbol_ratio        :      1 ( 14.3%)
  uppercase_ratio     :      1 ( 14.3%)
  line_repetition     :      1 ( 14.3%)
  digit_ratio         :      1 ( 14.3%)
  html                :      1 ( 14.3%)
  url_density         :      1 ( 14.3%)
These heuristics catch the obvious garbage that perplexity filters miss.
Language Detection: Keeping What You Want
If you’re training an English model, you need to filter out non-English text. At scale.
import fasttext
from typing import List, Dict, Tuple
from collections import Counter
import os


class LanguageFilter:
    """
    Fast language detection using fastText.

    Critical for multilingual corpora where you only want
    specific languages.
    """

    def __init__(
        self,
        model_path: str = 'lid.176.bin',  # fastText language ID model
        target_languages: List[str] = ['en'],
        confidence_threshold: float = 0.8
    ):
        self.target_languages = set(target_languages)
        self.confidence_threshold = confidence_threshold

        # Load fastText model (download if missing)
        if not os.path.exists(model_path):
            print("Downloading language detection model...")
            self._download_model(model_path)

        print(f"Loading language detection model: {model_path}")
        self.model = fasttext.load_model(model_path)

        self.stats = {
            'processed': 0,
            'language_counts': Counter(),
            'kept': 0,
            'filtered': 0
        }

    def _download_model(self, model_path: str):
        """Download the fastText language ID model"""
        import urllib.request
        url = 'https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin'
        print(f"Downloading from {url}...")
        urllib.request.urlretrieve(url, model_path)
        print("Download complete")

    def detect_language(
        self,
        text: str,
        k: int = 1
    ) -> List[Tuple[str, float]]:
        """
        Detect language(s) of text.

        Args:
            text: Text to analyze
            k: Number of top predictions to return

        Returns:
            List of (language_code, confidence) tuples
        """
        # Preprocess text (fastText expects a single line, no newlines)
        text = text.replace('\n', ' ').strip()
        if not text:
            return [('unknown', 0.0)]

        # Predict
        predictions = self.model.predict(text, k=k)

        # Extract language codes and confidences
        results = []
        for label, confidence in zip(predictions[0], predictions[1]):
            # fastText returns labels like '__label__en'
            lang_code = label.replace('__label__', '')
            results.append((lang_code, float(confidence)))
        return results

    def should_keep(self, text: str) -> Tuple[bool, str, float]:
        """
        Determine if text should be kept based on language.

        Returns:
            (keep, language, confidence)
        """
        self.stats['processed'] += 1

        # Detect language
        predictions = self.detect_language(text, k=1)
        if not predictions:
            self.stats['filtered'] += 1
            return False, 'unknown', 0.0

        lang_code, confidence = predictions[0]
        self.stats['language_counts'][lang_code] += 1

        # Check if target language with sufficient confidence
        keep = (lang_code in self.target_languages and
                confidence >= self.confidence_threshold)

        if keep:
            self.stats['kept'] += 1
        else:
            self.stats['filtered'] += 1

        return keep, lang_code, confidence

    def filter_batch(
        self,
        texts: List[str],
        return_metadata: bool = False
    ) -> List[str] | List[Tuple[str, str, float]]:
        """
        Filter a batch of texts by language.

        Args:
            texts: List of texts to filter
            return_metadata: If True, return (text, language, confidence)

        Returns:
            Filtered texts or (text, language, confidence) tuples
        """
        results = []
        for text in texts:
            keep, lang, confidence = self.should_keep(text)
            if keep:
                if return_metadata:
                    results.append((text, lang, confidence))
                else:
                    results.append(text)
        return results

    def print_stats(self):
        """Print language filtering statistics"""
        print("=" * 80)
        print("Language Filter Statistics")
        print("=" * 80)
        print(f"Documents processed: {self.stats['processed']:,}")
        print(f"Documents kept: {self.stats['kept']:,}")
        print(f"Documents filtered: {self.stats['filtered']:,}")
        if self.stats['processed'] > 0:
            keep_rate = self.stats['kept'] / self.stats['processed']
            print(f"Keep rate: {keep_rate:.1%}")
        print("\nLanguage distribution:")
        for lang, count in self.stats['language_counts'].most_common(10):
            pct = count / self.stats['processed'] * 100
            kept = "✓" if lang in self.target_languages else "✗"
            print(f"  {kept} {lang:5s}: {count:6,} ({pct:5.1f}%)")


# Example usage
lang_filter = LanguageFilter(
    target_languages=['en'],
    confidence_threshold=0.9
)

# Test texts in different languages
test_texts = [
    "The quick brown fox jumps over the lazy dog.",  # English
    "Le renard brun rapide saute par-dessus le chien paresseux.",  # French
    "Der schnelle braune Fuchs springt über den faulen Hund.",  # German
    "El rápido zorro marrón salta sobre el perro perezoso.",  # Spanish
    "This is definitely an English sentence with clear intent.",  # English
    "这是一个中文句子。",  # Chinese
]

filtered = lang_filter.filter_batch(test_texts, return_metadata=True)

print("Filtered results:")
for text, lang, confidence in filtered:
    print(f"  [{lang}:{confidence:.2f}] {text[:50]}")

print()
lang_filter.print_stats()
Output shows language detection in action:
Loading language detection model: lid.176.bin
Filtered results:
[en:0.99] The quick brown fox jumps over the lazy dog.
[en:0.97] This is definitely an English sentence with clear...
Language Filter Statistics
Documents processed: 6
Documents kept: 2
Documents filtered: 4
Keep rate: 33.3%
Language distribution:
✓ en : 2 ( 33.3%)
✗ fr : 1 ( 16.7%)
✗ de : 1 ( 16.7%)
✗ es : 1 ( 16.7%)
✗ zh : 1 ( 16.7%)
Language detection is fast (~1000 docs/sec on CPU) and accurate, making it perfect for large-scale filtering.
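If you want to sanity-check that throughput number on your own hardware, a rough microbenchmark like the sketch below is enough. It reuses the lang_filter and test_texts objects from above; the exact rate depends heavily on document length, so treat the result as a best-case figure for short documents.
import time

# Rough throughput check -- short documents, so this is a best-case number
docs = test_texts * 1000  # ~6,000 short documents
start = time.perf_counter()
for doc in docs:
    lang_filter.detect_language(doc)
elapsed = time.perf_counter() - start
print(f"{len(docs) / elapsed:,.0f} docs/sec on this machine")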
Combining Filters: The Complete Pipeline
Let’s combine all the filters into production-ready pipeline code:
import ray
from typing import List, Dict
from collections import Counter
import json
import time

ray.init(address='auto')


@ray.remote
class QualityFilterWorker:
    """Worker that applies all quality filters"""

    def __init__(self, config: Dict):
        # Initialize all filters
        self.perplexity_filter = PerplexityFilter(
            model_name=config.get('reference_model', 'gpt2')
        )
        self.heuristic_filter = HeuristicFilter(
            config=config.get('heuristic_config')
        )
        self.lang_filter = LanguageFilter(
            target_languages=config.get('target_languages', ['en']),
            confidence_threshold=config.get('lang_confidence', 0.9)
        )
        self.perplexity_threshold = config.get('perplexity_threshold', 1000.0)

    def process_batch(
        self,
        texts: List[str],
        doc_ids: List[int]
    ) -> Dict:
        """
        Process a batch through all filters.

        Pipeline:
        1. Language detection (fast, eliminates most non-target text)
        2. Heuristic filters (fast, catches obvious garbage)
        3. Perplexity filter (slow, nuanced quality assessment)

        Returns:
            {
                'kept': [(doc_id, text, metadata), ...],
                'filtered': [(doc_id, reason), ...],
                'stats': {...}
            }
        """
        kept = []
        filtered = []
        filter_stats = {
            'language': 0,
            'heuristic': 0,
            'perplexity': 0,
            'total': len(texts)
        }

        for doc_id, text in zip(doc_ids, texts):
            # Stage 1: Language detection
            lang_keep, lang, lang_conf = self.lang_filter.should_keep(text)
            if not lang_keep:
                filtered.append((doc_id, f'language:{lang}:{lang_conf:.2f}'))
                filter_stats['language'] += 1
                continue

            # Stage 2: Heuristic filters
            heur_keep, heur_reasons = self.heuristic_filter.filter_text(text)
            if not heur_keep:
                filtered.append((doc_id, f'heuristic:{",".join(heur_reasons)}'))
                filter_stats['heuristic'] += 1
                continue

            # Stage 3: Perplexity filter
            perplexity = self.perplexity_filter.calculate_perplexity(text)
            if perplexity > self.perplexity_threshold:
                filtered.append((doc_id, f'perplexity:{perplexity:.1f}'))
                filter_stats['perplexity'] += 1
                continue

            # Passed all filters
            metadata = {
                'language': lang,
                'language_confidence': lang_conf,
                'perplexity': perplexity
            }
            kept.append((doc_id, text, metadata))

        return {
            'kept': kept,
            'filtered': filtered,
            'stats': filter_stats
        }


class DistributedQualityFilter:
    """Distributed quality filtering pipeline"""

    def __init__(
        self,
        config: Dict,
        num_workers: int = 16
    ):
        self.config = config
        self.num_workers = num_workers

        # Create worker pool
        self.workers = [
            QualityFilterWorker.remote(config)
            for _ in range(num_workers)
        ]
        print(f"Initialized {num_workers} quality filter workers")

    def filter_corpus(
        self,
        input_path: str,
        output_path: str,
        filtered_log_path: str,
        batch_size: int = 32
    ):
        """
        Filter an entire corpus through the quality pipeline.

        Writes:
        - output_path: High-quality documents (JSONL)
        - filtered_log_path: Filtered documents with reasons (JSONL)
        """
        print("=" * 80)
        print("Distributed Quality Filtering Pipeline")
        print("=" * 80)
        print(f"Input: {input_path}")
        print(f"Output: {output_path}")
        print(f"Batch size: {batch_size}")
        print()

        start_time = time.perf_counter()

        # Read input corpus
        print("Loading corpus...")
        documents = []
        with open(input_path, 'r') as f:
            for doc_id, line in enumerate(f):
                data = json.loads(line)
                text = data.get('text', '')
                documents.append((doc_id, text))
        print(f"Loaded {len(documents):,} documents")

        # Process in batches
        print("\nProcessing...")
        total_kept = 0
        total_filtered = 0
        filter_reasons = Counter()

        with open(output_path, 'w') as out_f, \
             open(filtered_log_path, 'w') as log_f:

            # Submit batches to workers round-robin
            futures = []
            batch_num = 0
            for i in range(0, len(documents), batch_size):
                batch = documents[i:i + batch_size]
                doc_ids = [doc_id for doc_id, _ in batch]
                texts = [text for _, text in batch]

                worker = self.workers[batch_num % self.num_workers]
                future = worker.process_batch.remote(texts, doc_ids)
                futures.append(future)
                batch_num += 1

            # Collect results
            processed_batches = 0
            for future in futures:
                result = ray.get(future)

                # Write kept documents
                for doc_id, text, metadata in result['kept']:
                    output = {
                        'doc_id': doc_id,
                        'text': text,
                        'metadata': metadata
                    }
                    out_f.write(json.dumps(output) + '\n')
                    total_kept += 1

                # Write filtered log
                for doc_id, reason in result['filtered']:
                    log_entry = {
                        'doc_id': doc_id,
                        'reason': reason
                    }
                    log_f.write(json.dumps(log_entry) + '\n')
                    total_filtered += 1

                    # Track which filter stage was responsible
                    filter_stage = reason.split(':')[0]
                    filter_reasons[filter_stage] += 1

                # Progress
                processed_batches += 1
                if processed_batches % 100 == 0:
                    pct = processed_batches / len(futures) * 100
                    elapsed = time.perf_counter() - start_time
                    rate = (total_kept + total_filtered) / elapsed
                    print(f"Progress: {pct:5.1f}% "
                          f"({processed_batches}/{len(futures)} batches), "
                          f"{rate:.0f} docs/sec")

        elapsed = time.perf_counter() - start_time

        # Final statistics
        self._print_statistics(
            total_kept,
            total_filtered,
            filter_reasons,
            elapsed
        )

    def _print_statistics(
        self,
        total_kept: int,
        total_filtered: int,
        filter_reasons: Counter,
        elapsed: float
    ):
        """Print final filtering statistics"""
        total = total_kept + total_filtered
        print("\n" + "=" * 80)
        print("Quality Filtering Complete")
        print("=" * 80)
        print(f"Total documents: {total:,}")
        print(f"Kept: {total_kept:,} ({total_kept/total:.1%})")
        print(f"Filtered: {total_filtered:,} ({total_filtered/total:.1%})")
        print(f"Processing time: {elapsed/3600:.2f} hours")
        print(f"Throughput: {total/elapsed:.0f} docs/sec")
        print()
        print("Filtering breakdown:")
        for reason, count in filter_reasons.most_common():
            pct = count / total_filtered * 100
            print(f"  {reason:20s}: {count:8,} ({pct:5.1f}% of filtered)")


# Usage
config = {
    'reference_model': 'gpt2',
    'perplexity_threshold': 1000.0,
    'target_languages': ['en'],
    'lang_confidence': 0.9,
    'heuristic_config': HeuristicFilter.default_config()
}

pipeline = DistributedQualityFilter(
    config=config,
    num_workers=32
)

pipeline.filter_corpus(
    input_path='/data/corpus_deduplicated.jsonl',
    output_path='/data/corpus_filtered.jsonl',
    filtered_log_path='/data/corpus_filtered_log.jsonl',
    batch_size=64
)
Typical output from a production run:
Distributed Quality Filtering Pipeline
Input: /data/corpus_deduplicated.jsonl
Output: /data/corpus_filtered.jsonl
Batch size: 64
Loading corpus...
Loaded 10,000,000 documents
Processing...
Progress: 10.0% (15625/156250 batches), 412 docs/sec
Progress: 20.0% (31250/156250 batches), 418 docs/sec
Progress: 30.0% (46875/156250 batches), 421 docs/sec
...
Progress: 100.0% (156250/156250 batches), 425 docs/sec
Quality Filtering Complete
Total documents: 10,000,000
Kept: 6,234,567 (62.3%)
Filtered: 3,765,433 (37.7%)
Processing time: 6.54 hours
Throughput: 425 docs/sec
Filtering breakdown:
language : 1,234,567 ( 32.8% of filtered)
heuristic : 1,987,654 ( 52.8% of filtered)
perplexity : 543,212 ( 14.4% of filtered)
This pipeline filters 10M documents in ~6.5 hours, removing 38% of the corpus. That 38% is mostly garbage: foreign language text, HTML fragments, spam, and low-quality content.
The Philosophical Problem: Defining Quality
Here’s where it gets uncomfortable: the question of what “quality” actually means. Our filters encode specific assumptions:
import json
from collections import defaultdict


class QualityPhilosophy:
    """
    Explicit articulation of quality assumptions.

    Making biases explicit is better than pretending they don't exist.
    """

    @staticmethod
    def analyze_filtering_bias():
        """Analyze what our filters actually select for"""
        print("=" * 80)
        print("Quality Filter Bias Analysis")
        print("=" * 80)
        print()
        print("Our filters SELECT FOR:")
        print("  - Formal, grammatically correct text")
        print("  - Standard punctuation and capitalization")
        print("  - Low repetition (penalizes poetry, lyrics, mantras)")
        print("  - Statistical patterns matching reference corpus")
        print("  - Longer documents (min_length threshold)")
        print("  - English (or other target languages)")
        print()
        print("Our filters SELECT AGAINST:")
        print("  - Informal speech patterns (internet slang, dialects)")
        print("  - Non-standard orthography (creative spelling)")
        print("  - Highly specialized jargon (medical, legal, technical)")
        print("  - Short-form content (tweets, comments, chat)")
        print("  - Code-switching (mixing languages)")
        print("  - Creative writing that intentionally breaks rules")
        print()
        print("IMPLICATIONS:")
        print("  1. Models will struggle with informal language")
        print("  2. Models will underrepresent minority dialects")
        print("  3. Models will be biased toward prestige language varieties")
        print("  4. Models will lack exposure to creative language use")
        print("  5. Models will overfit to 'textbook' examples")
        print()
        print("MITIGATIONS:")
        print("  - Include separate high-quality informal corpus")
        print("  - Lower thresholds for underrepresented dialects")
        print("  - Manual curation of creative/edge cases")
        print("  - Separate pipelines for different domains")
        print("  - Regular audits of filtered content")

    @staticmethod
    def sample_filtered_content(
        filtered_log_path: str,
        num_samples: int = 100
    ):
        """
        Sample filtered content to audit filtering decisions.

        Critical: You MUST look at what you're filtering.
        """
        print("\n" + "=" * 80)
        print("Filtered Content Audit")
        print("=" * 80)

        # Load filtered documents, grouped by filter stage
        filtered_by_reason = defaultdict(list)
        with open(filtered_log_path, 'r') as f:
            for line in f:
                data = json.loads(line)
                reason = data['reason'].split(':')[0]
                filtered_by_reason[reason].append(data)

        # Sample from each reason
        import random
        for reason, docs in filtered_by_reason.items():
            sample_size = min(num_samples, len(docs))
            samples = random.sample(docs, sample_size)

            print(f"\n{reason.upper()} (showing {sample_size}/{len(docs)}):")
            print("-" * 80)
            for i, doc in enumerate(samples[:5]):  # Show first 5
                print(f"\n  [{i+1}] Doc {doc['doc_id']}")
                print(f"      Reason: {doc['reason']}")
                # Would load and display the actual text here
                # print(f"      Text: {text[:200]}...")

        print("\n" + "=" * 80)
        print("RECOMMENDATION: Review samples manually to verify filtering")
        print("decisions align with your quality goals.")
        print("=" * 80)


# Run analysis
QualityPhilosophy.analyze_filtering_bias()
Output forces you to confront your biases:
Quality Filter Bias Analysis
Our filters SELECT FOR:
- Formal, grammatically correct text
- Standard punctuation and capitalization
- Low repetition (penalizes poetry, lyrics, mantras)
- Statistical patterns matching reference corpus
- Longer documents (min_length threshold)
- English (or other target languages)
Our filters SELECT AGAINST:
- Informal speech patterns (internet slang, dialects)
- Non-standard orthography (creative spelling)
- Highly specialized jargon (medical, legal, technical)
- Short-form content (tweets, comments, chat)
- Code-switching (mixing languages)
- Creative writing that intentionally breaks rules
IMPLICATIONS:
1. Models will struggle with informal language
2. Models will underrepresent minority dialects
3. Models will be biased toward prestige language varieties
4. Models will lack exposure to creative language use
5. Models will overfit to ‘textbook’ examples
MITIGATIONS:
- Include separate high-quality informal corpus
- Lower thresholds for underrepresented dialects
- Manual curation of creative/edge cases
- Separate pipelines for different domains
- Regular audits of filtered content
This is the unavoidable truth: there’s no such thing as “objective quality.” Your filters encode your (and your reference model’s) biases about what constitutes ‘good’ text.
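To make that concrete: perplexity is always measured relative to a reference. Score the same two sentences with two different reference checkpoints and the “quality” ordering can shift. A quick sketch, reusing the PerplexityFilter class from above (distilgpt2 is just another small public checkpoint, used here purely for illustration):
# Perplexity is relative to the reference model -- swap the reference and
# your definition of "quality" quietly changes with it.
reddit_style = "ngl this paper kinda slaps, the ablations go hard"
formal_style = "The ablation studies presented in this paper are remarkably thorough."

for name in ["gpt2", "distilgpt2"]:
    ref = PerplexityFilter(model_name=name)
    for text in (reddit_style, formal_style):
        print(f"{name:12s} | {ref.calculate_perplexity(text):8.1f} | {text[:40]}")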
Multi-Stage Filtering: Different Thresholds for Different Sources
Not all data sources are equal. Without getting too deep into the weeds here, Wikipedia needs to be treated differently than Reddit.
from collections import defaultdict
from typing import Dict, List, Tuple


class MultiStageFilter:
    """
    Apply different filtering strategies to different data sources.

    Key insight: One threshold doesn't fit all.
    """

    def __init__(self):
        # Define source-specific configurations
        self.source_configs = {
            'wikipedia': {
                'perplexity_threshold': 1500.0,  # More lenient
                'min_length': 200,
                'check_html': False,  # Wikipedia has clean markup
            },
            'books': {
                'perplexity_threshold': 1200.0,
                'min_length': 500,  # Books should be longer
                'max_word_repetition': 0.3,  # Allow more repetition
            },
            'news': {
                'perplexity_threshold': 1000.0,
                'min_length': 100,
                'check_urls': True,
            },
            'reddit': {
                'perplexity_threshold': 800.0,  # Stricter for informal text
                'min_length': 50,  # Short comments OK
                'max_uppercase_ratio': 0.6,  # More casual caps
            },
            'web_crawl': {
                'perplexity_threshold': 600.0,  # Very strict
                'min_length': 100,
                'check_html': True,
                'check_urls': True,
            }
        }

    def get_config_for_source(self, source: str) -> Dict:
        """Get the filtering config for a specific source"""
        return self.source_configs.get(source, self.source_configs['web_crawl'])

    def filter_with_source_awareness(
        self,
        documents: List[Tuple[int, str, str]],  # (doc_id, text, source)
        output_by_source: bool = True
    ):
        """
        Filter documents with source-specific thresholds.

        Args:
            documents: List of (doc_id, text, source) tuples
            output_by_source: If True, print stats per source
        """
        results = {
            'kept': [],
            'filtered': []
        }
        stats_by_source = defaultdict(lambda: {'total': 0, 'kept': 0, 'filtered': 0})

        for doc_id, text, source in documents:
            config = self.get_config_for_source(source)

            # Apply source-specific filter
            passed = self._apply_source_filter(text, config)

            stats_by_source[source]['total'] += 1
            if passed:
                results['kept'].append((doc_id, text, source))
                stats_by_source[source]['kept'] += 1
            else:
                results['filtered'].append((doc_id, source))
                stats_by_source[source]['filtered'] += 1

        # Print stats
        if output_by_source:
            self._print_source_stats(stats_by_source)

        return results

    def _apply_source_filter(self, text: str, config: Dict) -> bool:
        """Apply filtering logic with a source-specific config"""
        # Simplified stand-in -- the full pipeline (language, heuristics,
        # perplexity) would be applied here using this config
        return len(text) >= config.get('min_length', 50)

    def _print_source_stats(self, stats_by_source: Dict):
        """Print per-source filtering statistics"""
        print("=" * 80)
        print("Source-Aware Filtering Statistics")
        print("=" * 80)
        print(f"{'Source':<15s} {'Total':>10s} {'Kept':>10s} {'Filtered':>10s} {'Keep Rate':>10s}")
        print("-" * 80)
        for source, stats in sorted(stats_by_source.items()):
            keep_rate = stats['kept'] / stats['total'] if stats['total'] > 0 else 0
            print(f"{source:<15s} "
                  f"{stats['total']:>10,} "
                  f"{stats['kept']:>10,} "
                  f"{stats['filtered']:>10,} "
                  f"{keep_rate:>10.1%}")

        # Overall
        total = sum(s['total'] for s in stats_by_source.values())
        kept = sum(s['kept'] for s in stats_by_source.values())
        filtered = sum(s['filtered'] for s in stats_by_source.values())
        overall_rate = kept / total if total > 0 else 0
        print("-" * 80)
        print(f"{'OVERALL':<15s} "
              f"{total:>10,} "
              f"{kept:>10,} "
              f"{filtered:>10,} "
              f"{overall_rate:>10.1%}")
This approach lets you be lenient with high-quality sources, like books, and strict with noisy sources, such as web crawls or forums. Wikipedia occupies a peculiar middle ground: often surprisingly reliable, yet most of its content is shaped by a small, arguably insular crew. “Here comes everybody” but where everybody fits comfortably in a medium-sized lecture hall.
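Usage is straightforward; the documents and source labels below are made up for illustration:
# Illustrative usage -- doc IDs, texts, and source labels are invented.
multi_filter = MultiStageFilter()

documents = [
    (0, "A long, well-sourced encyclopedia article about the Renaissance... " * 5, "wikipedia"),
    (1, "lol same", "reddit"),
    (2, "BREAKING NEWS: markets rallied today as investors digested the report.", "news"),
    (3, "Buy cheap watches here", "web_crawl"),
]

results = multi_filter.filter_with_source_awareness(documents)
print(f"Kept {len(results['kept'])}, filtered {len(results['filtered'])}")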
Reality Check: What Quality Filtering Actually Achieves
Expected improvements:
Perplexity: 10-20% better on held-out test set
Downstream tasks: 3-8% improvement on benchmarks
Training efficiency: 15-25% faster convergence
Model behavior: Less tendency to generate spam/boilerplate
Actual tradeoffs:
Data reduction: 30-50% of corpus filtered out
Bias introduction: Systematic preference for formal text
Edge case loss: Creative, informal, domain-specific content underrepresented
Engineering time: 2-4 weeks to tune thresholds properly
The uncomfortable truth: Quality filtering improves average performance at the cost of long-tail capability. Your model will be better at formal tasks and worse at understanding internet culture, creative writing, and non-standard language use. Or, let’s be honest, any of my blog posts.
𒅃 Borges imagined a Library of Babel containing every possible book, most of them gibberish. We’ve built that library, called it the Internet, and discovered that the hard part isn’t generating all possible texts, it’s figuring out which ones are worth reading. Our filters are crude approximations of taste, but they’re the best we have.
Next up: Dynamic Batching: Variable Sequence Length Handling
Where we discover that padding every sequence to 2048 tokens means we’re computing attention over 80% empty space, and our electric bill confirms our mathematical mistake.
Have you filtered 40% of your corpus only to discover you removed all the creative writing? Found that your perplexity threshold was accidentally tuned to select for business emails? Share your quality filtering regrets in the comments. We’re all making these mistakes together. (Paid subscribers have access to pre-tuned filtering configurations for common corpus types and tools for auditing filtered content.)

