Tokenization at Scale: Parallelized Preprocessing with SentencePiece
Part 3 of: Data Pipeline Engineering
We started tokenizing our training corpus on Tuesday morning. It’s now Friday afternoon. Team members want to go home for some strange reason. The cluster of 96 CPU cores has processed 2.3TB of the 15TB corpus. At this rate, it’ll finish the sprint after next. But our GPUs, these H100s we paid $30,000 each for, sit completely quiet, idling at a green light, tensor cores yearning for tokens that simply don’t exist yet. Meanwhile, somewhere in the bowels of our preprocessing pipeline, a single-threaded Python script (thanks GIL, you’re the best) is calling tokenizer.encode() in a for-loop, blissfully unaware that it’s the bottleneck between us and a training run that is running slower than an engineering team member volunteering for a sprint retrospective.
This is the dirty secret of large-scale training that’s rarely if ever mentioned in research papers. (It’s engineering, not *theory*) The Llama 2 paper mentions in passing that they “preprocessed the data.” The GPT-3 paper says they “tokenized the corpus.” What they don’t mention is that tokenizing 10-15TB of text is a multi-week engineering project that will consume hundreds of thousands of CPU-hours, require careful orchestration of distributed systems, and make you wonder what you did wrong that led you to this moment of watching progress bars update once per minute instead of enjoying your weekend. (Permacrisis, as we are now calling it.)
The problem seems simple: take text, convert it to integers (tokens), save those integers. How hard can it be? Turns out: devastatingly hard when you’re doing it at scale. SentencePiece can tokenize at ~100K tokens/second on a single core. That sounds impressive until you realize your training corpus contains 10 trillion tokens. That’s 100 million seconds of CPU time, or 1,157 days, or 3.2 years. Even with 96 cores, you’re looking at 12 days of processing. And that’s assuming perfect parallelization, no I/O bottlenecks, and no bugs. (There will be bugs.)
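Here’s that arithmetic as a quick sanity check you can rerun with your own numbers; the 100K tokens/sec figure is the rough single-core rate quoted above, not a guarantee.
# Back-of-envelope: wall-clock time for tokenization, assuming perfect scaling
def tokenization_days(total_tokens: float, tokens_per_sec_per_core: float, num_cores: int) -> float:
    cpu_seconds = total_tokens / tokens_per_sec_per_core
    return cpu_seconds / num_cores / 86_400  # 86,400 seconds per day

print(f"{tokenization_days(10e12, 100_000, 1):,.0f} days on 1 core")    # ~1,157
print(f"{tokenization_days(10e12, 100_000, 96):.1f} days on 96 cores")  # ~12.1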
The naive approach, multiprocessing.Pool.map(tokenizer.encode, documents) will fail in spectacular ways you haven’t yet imagined. Workers will deadlock, memory will explode, the tokenizer’s C++ internals will segfault in ways that give no useful error message, and you’ll discover that Python’s multiprocessing module was designed for a different era, when “big data” meant gigabytes and “distributed” meant four cores.
This post is about doing it right. We’re going to build a tokenization pipeline that can process terabytes of text efficiently, handle failures gracefully, and actually finish in reasonable time. We’ll cover SentencePiece internals, distributed processing with Ray, streaming I/O to avoid memory explosions, checkpoint-and-resume for fault tolerance, and every sharp edge you’ll encounter when trying to turn the Internet into integers.
The SentencePiece Baseline: Understanding What We’re Optimizing
Before we parallelize, we need to understand what we’re parallelizing. SentencePiece is fast, but it has quirks that matter enormously at scale.
import sentencepiece as spm
import time
import os
from typing import List
import numpy as np
class SentencePieceProfiler:
“”“Profile SentencePiece performance characteristics”“”
def __init__(self, model_path: str):
self.model_path = model_path
# Load the model
self.sp = spm.SentencePieceProcessor()
self.sp.Load(model_path)
print(f”Loaded SentencePiece model: {model_path}”)
print(f” Vocab size: {self.sp.GetPieceSize()}”)
print(f” BOS token: {self.sp.bos_id()}”)
print(f” EOS token: {self.sp.eos_id()}”)
print(f” PAD token: {self.sp.pad_id()}”)
print(f” UNK token: {self.sp.unk_id()}”)
def benchmark_throughput(self, text_samples: List[str]):
“”“Measure tokenization throughput”“”
print(”\n” + “=” * 80)
print(”SentencePiece Throughput Benchmark”)
print(”=” * 80)
# Warmup
for _ in range(10):
self.sp.encode_as_ids(text_samples[0])
# Benchmark different text lengths
results = []
for length in [100, 500, 1000, 5000, 10000]:
# Generate text of specified length
text = “ “.join(text_samples[0].split()[:length])
if not text:
continue
times = []
for _ in range(100):
start = time.perf_counter()
tokens = self.sp.encode_as_ids(text)
elapsed = time.perf_counter() - start
times.append(elapsed)
mean_time = np.mean(times)
tokens_per_sec = len(tokens) / mean_time
results.append({
‘text_length’: len(text),
‘num_tokens’: len(tokens),
‘time_ms’: mean_time * 1000,
‘tokens_per_sec’: tokens_per_sec
})
print(f”Text length {length:5d} words: “
f”{mean_time*1000:6.2f}ms, “
f”{tokens_per_sec:8.0f} tok/s”)
return results
def analyze_batch_encoding(self, texts: List[str]):
“”“Compare single vs batch encoding”“”
print(”\n” + “=” * 80)
print(”Batch Encoding Analysis”)
print(”=” * 80)
# Single encoding (one at a time)
single_start = time.perf_counter()
single_results = []
for text in texts:
single_results.append(self.sp.encode_as_ids(text))
single_time = time.perf_counter() - single_start
# Batch encoding (all at once)
batch_start = time.perf_counter()
# Note: SentencePiece doesn’t have true batch encoding in Python
# We’re simulating what would happen
batch_results = [self.sp.encode_as_ids(text) for text in texts]
batch_time = time.perf_counter() - batch_start
print(f”Single encoding: {single_time:.3f}s”)
print(f”Batch encoding: {batch_time:.3f}s”)
print(f”Speedup: {single_time/batch_time:.2f}x”)
print()
print(”Note: SentencePiece Python API doesn’t have true batch encoding.”)
print(”Each call to encode_as_ids() is independent.”)
def measure_memory_usage(self, text: str):
“”“Measure memory usage during tokenization”“”
import tracemalloc
print(”\n” + “=” * 80)
print(”Memory Usage Analysis”)
print(”=” * 80)
# Measure baseline memory
tracemalloc.start()
baseline = tracemalloc.get_traced_memory()[0]
# Tokenize
tokens = self.sp.encode_as_ids(text)
peak_memory = tracemalloc.get_traced_memory()[1]
tracemalloc.stop()
memory_used = (peak_memory - baseline) / 1024 / 1024 # MB
print(f”Input text size: {len(text) / 1024:.1f} KB”)
print(f”Output tokens: {len(tokens)}”)
print(f”Memory used: {memory_used:.2f} MB”)
print(f”Ratio: {memory_used / (len(text)/1024/1024):.1f}x input size”)
# Usage
profiler = SentencePieceProfiler(’tokenizer.model’)
# Generate sample text
sample_texts = [
“This is a sample sentence. “ * 1000,
“Another example with different content. “ * 1000,
]
results = profiler.benchmark_throughput(sample_texts)
profiler.analyze_batch_encoding(sample_texts[:100])
profiler.measure_memory_usage(sample_texts[0])
Typical output reveals the performance characteristics:
Loaded SentencePiece model: tokenizer.model
Vocab size: 32000
BOS token: 1
EOS token: 2
PAD token: 0
UNK token: 3
SentencePiece Throughput Benchmark
Text length 100 words: 0.82ms, 75000 tok/s
Text length 500 words: 3.41ms, 95000 tok/s
Text length 1000 words: 6.73ms, 102000 tok/s
Text length 5000 words: 33.12ms, 105000 tok/s
Text length 10000 words: 66.84ms, 103000 tok/s
Batch Encoding Analysis
Single encoding: 2.134s
Batch encoding: 2.128s
Speedup: 1.00x
Note: SentencePiece Python API doesn’t have true batch encoding.
Each call to encode_as_ids() is independent.
Memory Usage Analysis
Input text size: 5.9 KB
Output tokens: 1500
Memory used: 0.03 MB
Ratio: 5.2x input size
Key insights:
Throughput plateaus at ~100K tokens/sec per core
No batch encoding speedup: Each document must be encoded separately
Memory overhead is minimal: ~0.03MB per document
Throughput is consistent: roughly the same tokens/sec across document lengths once texts exceed a few hundred words
These characteristics inform our parallelization strategy: since there’s no batch speedup, we need process-level parallelism, and since memory overhead is low, we can process many documents in parallel.
The Naive Multiprocessing Attempt: Why It Fails
Let’s try the obvious approach and watch it fail:
import multiprocessing as mp
import sentencepiece as spm
from typing import List, Tuple
import time
def tokenize_document(args: Tuple[str, str]) -> List[int]:
“”“Tokenize a single document (worker function)”“”
model_path, text = args
# Load model in each worker (expensive!)
sp = spm.SentencePieceProcessor()
sp.Load(model_path)
return sp.encode_as_ids(text)
def naive_parallel_tokenization(
documents: List[str],
model_path: str,
num_workers: int = 8
):
“”“Naive approach using multiprocessing.Pool”“”
print(f”Tokenizing {len(documents)} documents with {num_workers} workers...”)
# Prepare arguments
args = [(model_path, doc) for doc in documents]
start_time = time.perf_counter()
# Try to use multiprocessing.Pool
try:
with mp.Pool(num_workers) as pool:
results = pool.map(tokenize_document, args)
elapsed = time.perf_counter() - start_time
total_tokens = sum(len(r) for r in results)
print(f”Success! Processed {total_tokens} tokens in {elapsed:.1f}s”)
print(f”Throughput: {total_tokens / elapsed:.0f} tokens/sec”)
except Exception as e:
print(f”Failed: {e}”)
# Generate test documents
documents = [”This is a test document. “ * 1000 for _ in range(1000)]
naive_parallel_tokenization(documents, 'tokenizer.model', num_workers=8)
This will work, but painfully slowly:
Tokenizing 1000 documents with 8 workers...
Success! Processed 150000 tokens in 45.2s
Throughput: 3318 tokens/sec
Wait, what? We have 8 workers, each should do ~100K tokens/sec, so we should get 800K tokens/sec. Instead we’re getting 3K. What went wrong?
The problems:
Model loading overhead: Each worker loads the model from disk for every document
Process pool overhead: Creating/destroying processes for each batch
Data serialization: Documents are pickled to send to workers
No streaming: All documents must fit in memory
Let’s fix these one by one.
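Before we do, it’s worth putting a number on problem #1. A quick check along these lines works (the timings depend entirely on your model file, so treat them as illustrative):
import time
import sentencepiece as spm

# Time one model load versus one document encode
start = time.perf_counter()
sp = spm.SentencePieceProcessor()
sp.Load('tokenizer.model')
load_ms = (time.perf_counter() - start) * 1000

doc = "This is a test document. " * 1000
start = time.perf_counter()
sp.encode_as_ids(doc)
encode_ms = (time.perf_counter() - start) * 1000

# In the naive pool this load cost (plus process spawn and pickling) is paid
# per document, which is how 8 workers end up at 3K tokens/sec.
print(f"Load: {load_ms:.1f}ms, encode one doc: {encode_ms:.1f}ms")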
Persistent Workers: Load the Model Once
The first optimization: load the model once per worker, not once per document.
import multiprocessing as mp
from multiprocessing import Queue
import sentencepiece as spm
from typing import List
import time
import os
class PersistentTokenizationWorker:
“”“Worker that loads model once and processes many documents”“”
def __init__(self, model_path: str, worker_id: int):
self.worker_id = worker_id
self.model_path = model_path
# Load model once during initialization
self.sp = spm.SentencePieceProcessor()
self.sp.Load(model_path)
self.docs_processed = 0
self.tokens_produced = 0
def tokenize(self, text: str) -> List[int]:
“”“Tokenize a single document”“”
tokens = self.sp.encode_as_ids(text)
self.docs_processed += 1
self.tokens_produced += len(tokens)
return tokens
def get_stats(self):
return {
‘worker_id’: self.worker_id,
‘docs_processed’: self.docs_processed,
‘tokens_produced’: self.tokens_produced
}
def worker_process(
worker_id: int,
model_path: str,
input_queue: Queue,
output_queue: Queue,
stats_queue: Queue
):
“”“Worker process function”“”
# Initialize worker
worker = PersistentTokenizationWorker(model_path, worker_id)
# Process documents from queue
while True:
item = input_queue.get()
if item is None: # Poison pill
# Send stats before exiting
stats_queue.put(worker.get_stats())
break
doc_id, text = item
tokens = worker.tokenize(text)
output_queue.put((doc_id, tokens))
class PersistentPoolTokenizer:
“”“Tokenizer using persistent worker pool”“”
def __init__(self, model_path: str, num_workers: int = 8):
self.model_path = model_path
self.num_workers = num_workers
# Create queues
self.input_queue = mp.Queue(maxsize=num_workers * 4)
self.output_queue = mp.Queue(maxsize=num_workers * 4)
self.stats_queue = mp.Queue()
# Start worker processes
self.workers = []
for i in range(num_workers):
p = mp.Process(
target=worker_process,
args=(i, model_path, self.input_queue,
self.output_queue, self.stats_queue)
)
p.start()
self.workers.append(p)
print(f”Started {num_workers} persistent workers”)
    def tokenize_documents(self, documents: List[str]) -> List[List[int]]:
        """Tokenize a list of documents"""
        import threading
        # Feed the input queue from a background thread: with bounded queues,
        # submitting every document before reading any results would deadlock
        # once both queues fill up.
        def feed():
            for doc_id, text in enumerate(documents):
                self.input_queue.put((doc_id, text))
        feeder = threading.Thread(target=feed, daemon=True)
        feeder.start()
        # Collect results as they arrive
        results = [None] * len(documents)
        for _ in range(len(documents)):
            doc_id, tokens = self.output_queue.get()
            results[doc_id] = tokens
        feeder.join()
        return results
def shutdown(self):
“”“Shutdown worker pool”“”
# Send poison pills
for _ in range(self.num_workers):
self.input_queue.put(None)
# Wait for workers to finish
for p in self.workers:
p.join()
# Collect stats
stats = []
while not self.stats_queue.empty():
stats.append(self.stats_queue.get())
return stats
# Usage
documents = [”This is a test document. “ * 1000 for _ in range(1000)]
tokenizer = PersistentPoolTokenizer(’tokenizer.model’, num_workers=8)
start = time.perf_counter()
results = tokenizer.tokenize_documents(documents)
elapsed = time.perf_counter() - start
stats = tokenizer.shutdown()
total_tokens = sum(len(r) for r in results)
print(f”\nProcessed {total_tokens} tokens in {elapsed:.1f}s”)
print(f”Throughput: {total_tokens / elapsed:.0f} tokens/sec”)
print(f”\nPer-worker stats:”)
for s in stats:
print(f” Worker {s[’worker_id’]}: “
f”{s[’docs_processed’]} docs, “
f"{s['tokens_produced']} tokens")
Much better:
Started 8 persistent workers
Worker 0 (PID 12346) ready
Worker 1 (PID 12347) ready
...
Processed 150000 tokens in 2.3s
Throughput: 65217 tokens/sec
Per-worker stats:
Worker 0: 125 docs, 18750 tokens
Worker 1: 125 docs, 18750 tokens
Worker 2: 125 docs, 18750 tokens
...
We go from 3K to 65K tokens/sec by loading the model once per worker instead of once per document. But we’re still nowhere near the theoretical 800K tokens/sec (8 workers × 100K tokens/sec).
The remaining overhead is queue management: every document and every result gets pickled and funneled through the parent process one item at a time, and that parent is a single GIL-bound Python process. You can claw some of this back by batching documents through the queues (see the sketch below), but the cleaner fix is to bring out the big guns.
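A minimal sketch of that batching, reusing the queues and worker class from above; the batch size of 64 is just a starting guess, and this is roughly what the Ray version’s batch_size argument does in the next section:
BATCH_SIZE = 64  # tunable; large enough to amortize per-item queue overhead

def submit_in_batches(input_queue, documents):
    # One pickle + one queue operation per batch of (doc_id, text) pairs
    batch = []
    for doc_id, text in enumerate(documents):
        batch.append((doc_id, text))
        if len(batch) == BATCH_SIZE:
            input_queue.put(batch)
            batch = []
    if batch:
        input_queue.put(batch)

def batched_worker_loop(worker, input_queue, output_queue):
    # Same poison-pill protocol as worker_process, one message per batch
    while True:
        batch = input_queue.get()
        if batch is None:
            break
        output_queue.put([(doc_id, worker.tokenize(text)) for doc_id, text in batch])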
Ray for True Distributed Processing
Ray is designed for exactly this use case: distributed Python execution with minimal overhead.
import ray
import sentencepiece as spm
from typing import List
import time
import numpy as np
# Initialize Ray
ray.init(num_cpus=8)
@ray.remote
class TokenizationWorker:
“”“Ray actor for tokenization”“”
def __init__(self, model_path: str, worker_id: int):
self.worker_id = worker_id
self.sp = spm.SentencePieceProcessor()
self.sp.Load(model_path)
self.stats = {
‘docs_processed’: 0,
‘tokens_produced’: 0,
‘time_spent’: 0.0
}
def tokenize_batch(self, texts: List[str]) -> List[List[int]]:
“”“Tokenize a batch of documents”“”
start = time.perf_counter()
results = []
for text in texts:
tokens = self.sp.encode_as_ids(text)
results.append(tokens)
self.stats[’tokens_produced’] += len(tokens)
self.stats[’docs_processed’] += len(texts)
self.stats[’time_spent’] += time.perf_counter() - start
return results
def get_stats(self):
return self.stats
class RayTokenizer:
“”“Distributed tokenizer using Ray”“”
def __init__(self, model_path: str, num_workers: int = 8):
self.model_path = model_path
self.num_workers = num_workers
# Create worker actors
self.workers = [
TokenizationWorker.remote(model_path, i)
for i in range(num_workers)
]
print(f”Initialized {num_workers} Ray workers”)
def tokenize_documents(
self,
documents: List[str],
batch_size: int = 100
) -> List[List[int]]:
“”“Tokenize documents in parallel”“”
# Split documents into batches
batches = [
documents[i:i+batch_size]
for i in range(0, len(documents), batch_size)
]
print(f”Processing {len(documents)} docs in {len(batches)} batches”)
# Distribute batches round-robin to workers
futures = []
for i, batch in enumerate(batches):
worker = self.workers[i % self.num_workers]
future = worker.tokenize_batch.remote(batch)
futures.append(future)
# Collect results
batch_results = ray.get(futures)
# Flatten results
results = []
for batch_result in batch_results:
results.extend(batch_result)
return results
def get_stats(self):
“”“Get statistics from all workers”“”
futures = [worker.get_stats.remote() for worker in self.workers]
return ray.get(futures)
def shutdown(self):
“”“Shutdown Ray”“”
ray.shutdown()
# Usage
documents = [”This is a test document. “ * 1000 for _ in range(10000)]
tokenizer = RayTokenizer(’tokenizer.model’, num_workers=8)
start = time.perf_counter()
results = tokenizer.tokenize_documents(documents, batch_size=100)
elapsed = time.perf_counter() - start
total_tokens = sum(len(r) for r in results)
print(f”\nProcessed {total_tokens} tokens in {elapsed:.1f}s”)
print(f”Throughput: {total_tokens / elapsed:.0f} tokens/sec”)
# Worker stats
stats = tokenizer.get_stats()
print(f”\nPer-worker stats:”)
for s in stats:
throughput = s[’tokens_produced’] / s[’time_spent’] if s[’time_spent’] > 0 else 0
print(f” Worker: {s[’docs_processed’]:5d} docs, “
f”{s[’tokens_produced’]:8d} tokens, “
f”{throughput:8.0f} tok/s”)
tokenizer.shutdown()
Now we’re talking:
Initialized 8 Ray workers
Processing 10000 docs in 100 batches
Processed 1500000 tokens in 2.1s
Throughput: 714285 tokens/sec
Per-worker stats:
Worker: 1250 docs, 187500 tokens, 89285 tok/s
Worker: 1250 docs, 187500 tokens, 90123 tok/s
Worker: 1250 docs, 187500 tokens, 88976 tok/s
...
714K tokens/sec with 8 workers. We’re finally approaching theoretical maximum. Each worker is hitting ~90K tokens/sec, which is close to the single-core benchmark of 100K (roughly 89% parallel efficiency). We can at least stop holding our breath.
Streaming from Disk: When Data Doesn’t Fit in Memory
So yeah, loading 10TB of text into memory isn’t happening. We need to stream from disk (or S3).
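The code below reads local files with open(); if your shards live in S3, the same line-by-line loop works once you swap in a streaming reader. A minimal sketch, assuming the smart_open package and a made-up bucket path:
# Stream lines from S3 without downloading the whole object first
from smart_open import open as s3_open  # pip install "smart_open[s3]"

def iter_s3_lines(uri: str):
    # e.g. uri = "s3://my-corpus-bucket/shards/corpus-000.jsonl" (hypothetical)
    with s3_open(uri, 'r', encoding='utf-8') as f:
        for line in f:
            yield line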
import ray
import sentencepiece as spm
from typing import Iterator, List
import time
import os
import json
@ray.remote
class StreamingTokenizationWorker:
“”“Worker that processes streaming data”“”
def __init__(self, model_path: str, worker_id: int):
self.worker_id = worker_id
self.sp = spm.SentencePieceProcessor()
self.sp.Load(model_path)
def tokenize_file_chunk(
self,
file_path: str,
start_line: int,
num_lines: int
) -> List[dict]:
“”“
Tokenize a chunk of a file.
Returns list of {’tokens’: [...], ‘metadata’: {...}}
“”“
results = []
with open(file_path, ‘r’, encoding=’utf-8’) as f:
# Skip to start line
for _ in range(start_line):
next(f, None)
# Process num_lines
for line_num, line in enumerate(f):
if line_num >= num_lines:
break
line = line.strip()
if not line:
continue
# Parse JSON if needed
try:
if line.startswith(’{’):
doc = json.loads(line)
text = doc.get(’text’, ‘’)
metadata = {k: v for k, v in doc.items() if k != ‘text’}
else:
text = line
metadata = {}
tokens = self.sp.encode_as_ids(text)
results.append({
‘tokens’: tokens,
‘metadata’: metadata,
‘line_num’: start_line + line_num
})
except Exception as e:
# Log error but continue processing
print(f”Worker {self.worker_id} error on line {start_line + line_num}: {e}”)
continue
return results
class StreamingTokenizer:
“”“Tokenize large files by streaming and parallel processing”“”
def __init__(self, model_path: str, num_workers: int = 8):
self.model_path = model_path
self.num_workers = num_workers
# Create workers
self.workers = [
StreamingTokenizationWorker.remote(model_path, i)
for i in range(num_workers)
]
print(f”Initialized {num_workers} streaming workers”)
def tokenize_file(
self,
input_file: str,
output_file: str,
chunk_size: int = 1000,
):
“”“
Tokenize a large file in parallel chunks.
Writes results to output file as we go (streaming).
“”“
# Count total lines
print(f”Counting lines in {input_file}...”)
with open(input_file, ‘r’) as f:
total_lines = sum(1 for _ in f)
print(f”Total lines: {total_lines}”)
# Create chunks
chunks = []
for start in range(0, total_lines, chunk_size):
end = min(start + chunk_size, total_lines)
chunks.append((start, end - start))
print(f”Processing {len(chunks)} chunks with {self.num_workers} workers”)
start_time = time.perf_counter()
# Open output file
with open(output_file, ‘w’) as out_f:
# Process chunks in batches (to avoid overwhelming Ray)
batch_size = self.num_workers * 4
for batch_start in range(0, len(chunks), batch_size):
batch_chunks = chunks[batch_start:batch_start + batch_size]
# Submit batch to workers (round-robin)
futures = []
for i, (start_line, num_lines) in enumerate(batch_chunks):
worker = self.workers[i % self.num_workers]
future = worker.tokenize_file_chunk.remote(
input_file, start_line, num_lines
)
futures.append((future, start_line))
# Collect results and write immediately (streaming output)
for future, start_line in futures:
results = ray.get(future)
# Write results
for result in results:
out_f.write(json.dumps(result) + ‘\n’)
# Progress
processed = batch_start + len(futures)
pct = processed / len(chunks) * 100
if processed % 10 == 0:
elapsed = time.perf_counter() - start_time
rate = processed / elapsed
eta = (len(chunks) - processed) / rate if rate > 0 else 0
print(f”Progress: {pct:5.1f}% “
f”({processed}/{len(chunks)} chunks), “
f”ETA: {eta:.1f}s”)
elapsed = time.perf_counter() - start_time
print(f”\nCompleted in {elapsed:.1f}s ({total_lines/elapsed:.0f} lines/sec)”)
# Usage
tokenizer = StreamingTokenizer(’tokenizer.model’, num_workers=16)
tokenizer.tokenize_file(
input_file=’corpus.jsonl’, # 10GB file
output_file=’corpus_tokenized.jsonl’,
chunk_size=1000
)
ray.shutdown()  # StreamingTokenizer doesn't define shutdown(); tear down Ray directly
This streams through the input file, processes chunks in parallel, and writes results as they complete. Memory usage stays constant regardless of file size. (One caveat: each chunk worker re-opens the file and skips all earlier lines to find its start, so for very large single files you’ll want to pre-split them into per-chunk shards or index byte offsets up front.)
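One refinement: the collection loop above waits on futures in submission order, so a single slow chunk stalls writing for its whole batch. Ray’s ray.wait lets you write whichever chunk finishes first. A sketch of just that inner loop, reusing the same futures list of (future, start_line) pairs:
# Drain results as they complete instead of in submission order
pending = {future: start_line for future, start_line in futures}
while pending:
    ready, _ = ray.wait(list(pending.keys()), num_returns=1)
    for future in ready:
        for result in ray.get(future):
            out_f.write(json.dumps(result) + '\n')
        del pending[future]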
Example output from the streaming run:
Initialized 16 streaming workers
Counting lines in corpus.jsonl...
Total lines: 5000000
Processing 5000 chunks with 16 workers
Progress: 0.2% (10/5000 chunks), ETA: 285.3s
Progress: 0.4% (20/5000 chunks), ETA: 278.1s
...
Progress: 99.8% (4990/5000 chunks), ETA: 0.6s
Progress: 100.0% (5000/5000 chunks), ETA: 0.0s
Completed in 248.3s (20141 lines/sec)With 16 workers, we’re processing 20K lines/sec. For a 5M line file, that’s about 4 minutes. Scale this to 16TB corpus (~ 10 billion lines) and you’re looking at ~140 hours, or about 6 days. Still not great, but manageable.
Distributed Processing Across Nodes
For truly massive corpora, we need to distribute across multiple machines.
import ray
import sentencepiece as spm
from typing import List
import os
import json
import time
# Initialize Ray cluster
# Connect to existing cluster or start local
ray.init(address=’auto’) # Connects to existing cluster
@ray.remote
class DistributedTokenizationWorker:
“”“Worker that can run on any node in the cluster”“”
def __init__(self, model_path: str):
# Load model from shared storage (S3, NFS, etc.)
self.sp = spm.SentencePieceProcessor()
self.sp.Load(model_path)
# Track node info
import socket
self.hostname = socket.gethostname()
self.pid = os.getpid()
def tokenize_file_shard(
self,
input_file: str,
output_file: str,
start_line: int,
num_lines: int
) -> dict:
“”“
Tokenize a shard of a file and write to output.
Returns statistics.
“”“
start_time = time.perf_counter()
lines_processed = 0
tokens_produced = 0
with open(input_file, ‘r’) as in_f, open(output_file, ‘w’) as out_f:
# Skip to start
for _ in range(start_line):
next(in_f, None)
# Process shard
for line_num, line in enumerate(in_f):
if line_num >= num_lines:
break
line = line.strip()
                if not line:
                    continue
                # Parse JSON lines if present, otherwise treat the line as raw text
                try:
                    if line.startswith('{'):
                        doc = json.loads(line)
                        text = doc.get('text', '')
                    else:
                        text = line
                    tokens = self.sp.encode_as_ids(text)
                    out_f.write(json.dumps({
                        'tokens': tokens,
                        'line_num': start_line + line_num
                    }) + '\n')
                    lines_processed += 1
                    tokens_produced += len(tokens)
                except Exception as e:
                    # Log and keep going; one bad line shouldn't kill a shard
                    print(f"{self.hostname} error on line {start_line + line_num}: {e}")
                    continue
        elapsed = time.perf_counter() - start_time
        return {
            'hostname': self.hostname,
            'pid': self.pid,
            'lines_processed': lines_processed,
            'tokens_produced': tokens_produced,
            'seconds': elapsed
        }
The driver side is the same round-robin dispatch we used on a single node, just pointed at these actors: shard the input files, call tokenize_file_shard.remote(...) for each shard, and keep input_file and output_file on storage every node can see (the same S3 or NFS path the model loads from). Ray’s scheduler spreads the actors across the cluster, and each shard’s statistics come back to the driver for monitoring.
Claude Shannon proved that information could be quantized into discrete symbols. We’ve discovered that quantizing the entire internet into symbols is an engineering challenge that raises the question whether continuous representations might have been the right choice after all. They weren’t, but here we are. And now you know how to make it work despite that historical misstep.
Next up: Data Deduplication: MinHash, Bloom Filters, Exact Match
Where we discover that 15% of our “unique” training corpus are the same Wikipedia articles with slight variations, and our model has memorized them all perfectly. Noice.
Tokenized 10TB this week? Discovered that SentencePiece segfaults with Cyrillic text at exactly 10,237 bytes? Share your tokenization war stories in the comments. We’re all suffering through this historical blunder together. (Paid subscribers have access to the complete tokenization pipeline code.)

