|
try: |
|
import cupy as cp |
|
except ImportError: |
|
import numpy as cp |
|
|
|
try: |
|
from rich.progress import Progress, BarColumn, TextColumn, TimeRemainingColumn, TimeElapsedColumn |
|
except ImportError: |
|
Progress = None |
|
|
|
import os |
|
import pickle |
|
import math |
|
|
|
# Constants grouped by usage |
|
|
|
# Model architecture |
|
WEIGHT_INIT_SCALE = 0.01 # Scale for initializing weights |
|
POS_ENCODING_BASE = 10000.0 # Base for positional encoding frequencies |
|
DROPOUT_RATE = 0.1 # Dropout rate for regularization |
|
EMBED_SCALE = 64  # Scale factor applied per unit of entropy to set the embedding dimension
|
BASE_NUM_HEADS = 2 # Base number of attention heads |
|
HEADS_PER_ENTROPY = 2  # Entropy units per additional attention head
|
FF_MULTIPLIER = 4 # Multiplier for feedforward dimension |
|
BASE_NUM_LAYERS = 1 # Base number of transformer layers |
|
LAYERS_PER_ENTROPY = 3  # Entropy units per additional transformer layer
|
|
|
# Training |
|
LEARNING_RATE = 0.003  # Learning rate for plain SGD updates
|
EPSILON = 1e-8 # Small value for numerical stability |
|
NUM_EPOCHS = 100 # Number of training epochs |
|
BATCH_SIZE = 16  # Mini-batch size for training
|
VALIDATION_SPLIT = 0.2 # Fraction of data for validation |
|
PATIENCE = 10 # Number of epochs to wait for early stopping |
|
|
|
# Sequence lengths |
|
MIN_SEQ_LEN = 10 # Minimum sequence length for training |
|
MAX_SEQ_LEN_DEFAULT = 50 # Default maximum sequence length if text-based computation is small |
|
|
|
# Generation |
|
TEMPERATURE = 0.5 # Sampling temperature for generation |
|
MAX_GENERATION_LENGTH = 20 # Maximum length for generated text |
|
|
|
# Dataset |
|
TEXT_REPETITIONS = 20 # Number of times to repeat default text |
|
DEFAULT_TEXT = ( |
|
"hello world this is a test for transformer " |
|
"the quick brown fox jumps over the lazy dog " |
|
"machine learning is fun and exciting to explore " |
|
"coding in python makes life easier every day " |
|
"data science opens new doors to innovation " |
|
"artificial intelligence shapes the future now " |
|
) * TEXT_REPETITIONS # Diverse phrases, repeated for more data |
|
|
|
class ProgressHandler: |
|
"""Handles progress bar display using rich.progress or no-op if unavailable.""" |
|
def __init__(self, totalEpochs, totalBatches): |
|
self.totalEpochs = totalEpochs |
|
self.totalBatches = totalBatches |
|
self.progress = None |
|
self.epochTask = None |
|
self.batchTask = None |
|
|
|
def __enter__(self): |
|
if Progress is not None: |
|
self.progress = Progress( |
|
TextColumn("[progress.description]{task.description}"), |
|
BarColumn(), |
|
TimeElapsedColumn(), |
|
TimeRemainingColumn(), |
|
) |
|
self.progress.__enter__() |
|
self.epochTask = self.progress.add_task( |
|
f"Epoch 1/{self.totalEpochs}, Train Loss: 0.000000, Val Loss: 0.000000", |
|
total=self.totalEpochs, |
|
completed=0 |
|
) |
|
return self |
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb): |
|
if self.progress is not None: |
|
self.progress.__exit__(exc_type, exc_val, exc_tb) |
|
|
|
def startBatch(self): |
|
if self.progress is not None: |
|
self.batchTask = self.progress.add_task( |
|
f"Batch 1/{self.totalBatches}", total=self.totalBatches, completed=0 |
|
) |
|
|
|
def updateBatch(self, batchIdx): |
|
if self.progress is not None and self.batchTask is not None: |
|
self.progress.update( |
|
self.batchTask, |
|
advance=1, |
|
description=f"Batch {batchIdx}/{self.totalBatches}", |
|
completed=batchIdx |
|
) |
|
|
|
def endBatch(self): |
|
if self.progress is not None and self.batchTask is not None: |
|
self.progress.update(self.batchTask, visible=False) |
|
|
|
def updateEpoch(self, epoch, trainLoss, valLoss): |
|
if self.progress is not None and self.epochTask is not None: |
|
self.progress.update( |
|
self.epochTask, |
|
advance=1, |
|
description=f"Epoch {epoch}/{self.totalEpochs}, Train Loss: {trainLoss:.6f}, Val Loss: {valLoss:.6f}", |
|
completed=epoch |
|
) |
|
|
|
def softmax(x, axis=None): |
|
"""Custom softmax for CuPy/NumPy compatibility.""" |
|
expX = cp.exp(x - cp.max(x, axis=axis, keepdims=True)) |
|
return expX / cp.sum(expX, axis=axis, keepdims=True) |
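# Quick sanity illustration (values rounded, not executed anywhere): subtracting the max does not
# change the result, and softmax([1, 2, 3]) ≈ [0.0900, 0.2447, 0.6652], which sums to 1.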
|
|
|
class LayerNorm: |
|
"""Layer normalization with learnable parameters.""" |
|
def __init__(self, embedDim, epsilon=EPSILON): |
|
self.embedDim = embedDim |
|
self.epsilon = epsilon |
|
self.gamma = cp.ones((1, 1, embedDim)) # Scale parameter |
|
self.beta = cp.zeros((1, 1, embedDim)) # Shift parameter |
|
self.cache = {} # For backprop |
|
|
|
def forward(self, x): |
|
# Normalize: (x - mean) / sqrt(var + epsilon) * gamma + beta |
|
mean = cp.mean(x, axis=-1, keepdims=True) |
|
var = cp.var(x, axis=-1, keepdims=True) |
|
xNorm = (x - mean) / cp.sqrt(var + self.epsilon) |
|
out = xNorm * self.gamma + self.beta |
|
|
|
self.cache['x'] = x |
|
self.cache['mean'] = mean |
|
self.cache['var'] = var |
|
self.cache['xNorm'] = xNorm |
|
return out |
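    # Worked example for intuition (not executed): for a feature vector [1, 2, 3], mean = 2 and
    # var = 2/3, so the normalized values are roughly [-1.2247, 0.0, 1.2247] before gamma scales
    # and beta shifts them per dimension.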
|
|
|
def backward(self, dOut): |
|
# Backprop through layer norm |
|
x, mean, var, xNorm = self.cache['x'], self.cache['mean'], self.cache['var'], self.cache['xNorm'] |
|
N = self.embedDim |
|
|
|
# Gradients for gamma and beta |
|
dGamma = cp.sum(dOut * xNorm, axis=(0, 1), keepdims=True) |
|
dBeta = cp.sum(dOut, axis=(0, 1), keepdims=True) |
|
|
|
# Gradient for xNorm |
|
dxNorm = dOut * self.gamma |
|
|
|
# Gradient for variance |
|
dVar = cp.sum(dxNorm * (x - mean) * -0.5 * (var + self.epsilon)**(-1.5), axis=-1, keepdims=True) |
|
|
|
# Gradient for mean |
|
dMean = cp.sum(dxNorm * -1 / cp.sqrt(var + self.epsilon), axis=-1, keepdims=True) |
|
dx = dxNorm / cp.sqrt(var + self.epsilon) + dVar * 2 * (x - mean) / N + dMean / N |
|
|
|
return dx, [('gamma', dGamma), ('beta', dBeta)] |
|
|
|
def _collectWeights(self): |
|
# Collect layer norm weights |
|
return [('gamma', self.gamma), ('beta', self.beta)] |
|
|
|
class ModelConfig: |
|
"""Configuration class for the transformer model architecture.""" |
|
def __init__(self, embedDim, numHeads, ffDim, numLayers, maxSeqLen, dropoutRate=DROPOUT_RATE): |
|
self.embedDim = embedDim |
|
self.numHeads = numHeads |
|
self.ffDim = ffDim |
|
self.numLayers = numLayers |
|
self.maxSeqLen = maxSeqLen |
|
self.dropoutRate = dropoutRate |
|
|
|
class TrainingConfig: |
|
"""Configuration class for training parameters.""" |
|
def __init__(self, numEpochs=NUM_EPOCHS, batchSize=BATCH_SIZE, learningRate=LEARNING_RATE): |
|
self.numEpochs = numEpochs |
|
self.batchSize = batchSize |
|
self.learningRate = learningRate |
|
|
|
class Tokenizer: |
|
"""Tokenizer class for converting text to token IDs and vice versa, including entropy computation for model scaling.""" |
|
def __init__(self, text): |
|
        # Split text into whitespace-delimited words for word-based tokenization
|
words = text.split() |
|
self.vocab = sorted(set(words + [' '])) # Include space as a token |
|
self.vocabSize = len(self.vocab) |
|
self.wordToIdx = {w: i for i, w in enumerate(self.vocab)} |
|
self.idxToWord = {i: w for i, w in enumerate(self.vocab)} |
|
self.entropy = self._computeEntropy(words) |
|
        self.seqLen = min(MAX_SEQ_LEN_DEFAULT, max(MIN_SEQ_LEN, int(math.sqrt(len(words)))))  # Scale with word count, clamped to [MIN_SEQ_LEN, MAX_SEQ_LEN_DEFAULT]
|
self.maxSeqLen = max(self.seqLen, MAX_SEQ_LEN_DEFAULT) # Ensure sufficient capacity |
|
self.modelConfig = self._computeModelConfig() |
|
|
|
def tokenize(self, text): |
|
# Convert text to token IDs (word-based) |
|
words = text.split() |
|
return [self.wordToIdx[w] for w in words if w in self.wordToIdx] |
|
|
|
def detokenize(self, tokenIds): |
|
# Convert token IDs to text |
|
return ' '.join([self.idxToWord[idx.item()] for idx in tokenIds if idx.item() in self.idxToWord]) |
|
|
|
def save(self, filePath): |
|
# Save tokenizer config, ModelConfig, and sequence lengths to a file |
|
with open(filePath, 'wb') as f: |
|
pickle.dump({ |
|
'vocab': self.vocab, |
|
'wordToIdx': self.wordToIdx, |
|
'idxToWord': self.idxToWord, |
|
'seqLen': self.seqLen, |
|
'maxSeqLen': self.maxSeqLen, |
|
'modelConfig': { |
|
'embedDim': self.modelConfig.embedDim, |
|
'numHeads': self.modelConfig.numHeads, |
|
'ffDim': self.modelConfig.ffDim, |
|
'numLayers': self.modelConfig.numLayers, |
|
'maxSeqLen': self.modelConfig.maxSeqLen, |
|
'dropoutRate': self.modelConfig.dropoutRate |
|
} |
|
}, f) |
|
|
|
def load(self, filePath): |
|
# Load tokenizer config, ModelConfig, and sequence lengths from a file |
|
with open(filePath, 'rb') as f: |
|
data = pickle.load(f) |
|
self.vocab = data['vocab'] |
|
self.vocabSize = len(self.vocab) |
|
self.wordToIdx = data['wordToIdx'] |
|
self.idxToWord = data['idxToWord'] |
|
self.entropy = 0.0 # Not saved, set to 0 as placeholder |
|
self.seqLen = data['seqLen'] |
|
self.maxSeqLen = data['maxSeqLen'] |
|
self.modelConfig = ModelConfig( |
|
embedDim=data['modelConfig']['embedDim'], |
|
numHeads=data['modelConfig']['numHeads'], |
|
ffDim=data['modelConfig']['ffDim'], |
|
numLayers=data['modelConfig']['numLayers'], |
|
maxSeqLen=data['modelConfig']['maxSeqLen'], |
|
dropoutRate=data['modelConfig']['dropoutRate'] |
|
) |
|
return self |
|
|
|
def _computeEntropy(self, words): |
|
# Compute word-level entropy of input text |
|
wordCount = {} |
|
totalWords = len(words) |
|
for w in words: |
|
wordCount[w] = wordCount.get(w, 0) + 1 |
|
entropy = 0.0 |
|
for count in wordCount.values(): |
|
prob = count / totalWords |
|
            entropy -= prob * math.log2(prob)  # prob is always > 0 here, so log2 is safe
|
return entropy |
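    # Worked example (illustrative): for the word stream "a a b", p(a) = 2/3 and p(b) = 1/3,
    # giving entropy = -(2/3)*log2(2/3) - (1/3)*log2(1/3) ≈ 0.918 bits.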
|
|
|
def _computeModelConfig(self): |
|
# Compute ModelConfig based on cached entropy |
|
        entropy = self.entropy
        numHeads = BASE_NUM_HEADS + math.floor(entropy / HEADS_PER_ENTROPY)  # More heads for higher entropy
        embedDim = EMBED_SCALE * math.ceil(entropy)  # Scale embedding size with entropy
        embedDim = max(numHeads, numHeads * math.ceil(embedDim / numHeads))  # Round up so the heads divide the embedding evenly
        ffDim = FF_MULTIPLIER * embedDim  # Standard transformer ratio
        numLayers = BASE_NUM_LAYERS + math.floor(entropy / LAYERS_PER_ENTROPY)  # Deeper model for higher entropy
        maxSeqLen = self.maxSeqLen  # Use computed maxSeqLen
        return ModelConfig(embedDim, numHeads, ffDim, numLayers, maxSeqLen, DROPOUT_RATE)
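    # Hypothetical sizing example (numbers chosen only for illustration): with entropy ≈ 4.8 this yields
    # numHeads = 2 + floor(4.8 / 2) = 4, embedDim = 64 * ceil(4.8) = 320 (already a multiple of 4),
    # ffDim = 4 * 320 = 1280, and numLayers = 1 + floor(4.8 / 3) = 2.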
|
|
|
class PositionalEncoding: |
|
"""Positional Encoding class to add position information to embeddings.""" |
|
def __init__(self, maxSeqLen, embedDim): |
|
self.maxSeqLen = maxSeqLen |
|
self.embedDim = embedDim |
|
self.posEncoding = self._buildPosEncoding() |
|
|
|
def _buildPosEncoding(self): |
|
# Create sinusoidal encodings to represent sequence positions |
|
pos = cp.arange(self.maxSeqLen)[:, None] |
|
divTerm = cp.exp(cp.arange(0, self.embedDim, 2) * -(cp.log(POS_ENCODING_BASE) / self.embedDim)) |
|
posEncoding = cp.zeros((self.maxSeqLen, self.embedDim)) |
|
posEncoding[:, 0::2] = cp.sin(pos * divTerm) |
|
posEncoding[:, 1::2] = cp.cos(pos * divTerm) |
|
return posEncoding |
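    # The construction above is the standard sinusoidal scheme:
    #   PE(pos, 2i)   = sin(pos / POS_ENCODING_BASE**(2i / embedDim))
    #   PE(pos, 2i+1) = cos(pos / POS_ENCODING_BASE**(2i / embedDim))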
|
|
|
def forward(self, inputData): |
|
# Add positional encodings to input embeddings |
|
_, seqLen, _ = inputData.shape |
|
return inputData + self.posEncoding[:seqLen] |
|
|
|
def backward(self, dOutput): |
|
        # The positional encodings are constants, so the gradient flows through unchanged
|
return dOutput |
|
|
|
class MultiHeadAttention: |
|
"""Multi-Head Attention class for self-attention mechanism.""" |
|
def __init__(self, embedDim, numHeads, dropoutRate=DROPOUT_RATE): |
|
self.embedDim = embedDim |
|
self.numHeads = numHeads |
|
self.headDim = embedDim // numHeads |
|
self.dropoutRate = dropoutRate |
|
self.queryWeight = cp.random.randn(embedDim, embedDim) * WEIGHT_INIT_SCALE |
|
self.keyWeight = cp.random.randn(embedDim, embedDim) * WEIGHT_INIT_SCALE |
|
self.valueWeight = cp.random.randn(embedDim, embedDim) * WEIGHT_INIT_SCALE |
|
self.outputWeight = cp.random.randn(embedDim, embedDim) * WEIGHT_INIT_SCALE |
|
self.cache = {} # To store intermediates for backprop |
|
self.dropoutMasks = {} # To store dropout masks for backprop |
|
|
|
def forward(self, queryInput, keyInput, valueInput, mask=None, training=True): |
|
# Compute multi-head self-attention |
|
batchSize, seqLen, _ = queryInput.shape |
|
query = cp.dot(queryInput, self.queryWeight).reshape(batchSize, seqLen, self.numHeads, self.headDim).transpose(0, 2, 1, 3) |
|
key = cp.dot(keyInput, self.keyWeight).reshape(batchSize, seqLen, self.numHeads, self.headDim).transpose(0, 2, 1, 3) |
|
value = cp.dot(valueInput, self.valueWeight).reshape(batchSize, seqLen, self.numHeads, self.headDim).transpose(0, 2, 1, 3) |
|
scores = cp.matmul(query, key.transpose(0, 1, 3, 2)) / cp.sqrt(self.headDim) |
|
if mask is not None: |
|
scores = scores + mask |
|
attentionWeights = softmax(scores, axis=-1) |
|
if training: |
|
attentionWeights, mask1 = self._applyDropout(attentionWeights) |
|
self.dropoutMasks['attention'] = mask1 |
|
attentionOutput = cp.matmul(attentionWeights, value) |
|
attentionOutput = attentionOutput.transpose(0, 2, 1, 3).reshape(batchSize, seqLen, self.embedDim) |
|
output = cp.dot(attentionOutput, self.outputWeight) |
|
if training: |
|
output, mask2 = self._applyDropout(output) |
|
self.dropoutMasks['output'] = mask2 |
|
|
|
# Cache for backprop |
|
if training: |
|
self.cache['queryInput'] = queryInput |
|
self.cache['keyInput'] = keyInput |
|
self.cache['valueInput'] = valueInput |
|
self.cache['query'] = query |
|
self.cache['key'] = key |
|
self.cache['value'] = value |
|
self.cache['scores'] = scores |
|
self.cache['attentionWeights'] = attentionWeights |
|
self.cache['attentionOutput'] = attentionOutput |
|
self.cache['mask'] = mask |
|
|
|
return output, attentionWeights |
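    # In equation form, each head computes softmax(Q @ K.T / sqrt(headDim) + mask) @ V; the heads are
    # then concatenated back to embedDim and projected by outputWeight, with dropout applied to the
    # attention weights and to the projected output during training.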
|
|
|
def backward(self, dOutput, training=True): |
|
# Backprop through multi-head attention |
|
batchSize, seqLen, _ = dOutput.shape |
|
|
|
# Dropout on output |
|
if training: |
|
dOutput = self._applyDropoutGrad(dOutput, self.dropoutMasks['output']) |
|
|
|
# Output projection |
|
dAttentionOutput = cp.dot(dOutput, self.outputWeight.T) |
|
attentionOutput_flat = self.cache['attentionOutput'].reshape(-1, self.embedDim) |
|
dOutput_flat = dOutput.reshape(-1, self.embedDim) |
|
dOutputWeight = cp.dot(attentionOutput_flat.T, dOutput_flat) |
|
|
|
# Reshape attention output grad |
|
dAttentionOutput = dAttentionOutput.reshape(batchSize, seqLen, self.numHeads, self.headDim).transpose(0, 2, 1, 3) |
|
|
|
# Attention matmul grad |
|
dValue = cp.matmul(self.cache['attentionWeights'].transpose(0, 1, 3, 2), dAttentionOutput) |
|
dAttentionWeights = cp.matmul(dAttentionOutput, self.cache['value'].transpose(0, 1, 3, 2)) |
|
|
|
# Dropout on attention weights |
|
if training: |
|
dAttentionWeights = self._applyDropoutGrad(dAttentionWeights, self.dropoutMasks['attention']) |
|
|
|
# Softmax grad |
|
dScores = self._softmaxBackward(self.cache['attentionWeights'], dAttentionWeights) |
|
|
|
# Mask not learnable |
|
if self.cache['mask'] is not None: |
|
inf_mask = self.cache['mask'] == float('-inf') |
|
inf_mask = cp.broadcast_to(inf_mask, dScores.shape) |
|
dScores[inf_mask] = 0 |
|
|
|
# Scale grad |
|
dScores /= cp.sqrt(self.headDim) |
|
|
|
# Matmul for query and key |
|
dQuery = cp.matmul(dScores, self.cache['key']) |
|
dKey = cp.matmul(dScores.transpose(0, 1, 3, 2), self.cache['query']) |
|
|
|
# Transpose back |
|
dQuery = dQuery.transpose(0, 2, 1, 3).reshape(batchSize, seqLen, self.embedDim) |
|
dKey = dKey.transpose(0, 2, 1, 3).reshape(batchSize, seqLen, self.embedDim) |
|
dValue = dValue.transpose(0, 2, 1, 3).reshape(batchSize, seqLen, self.embedDim) |
|
|
|
# Linear projections grad |
|
dQueryInput = cp.dot(dQuery, self.queryWeight.T) |
|
queryInput_flat = self.cache['queryInput'].reshape(-1, self.embedDim) |
|
dQuery_flat = dQuery.reshape(-1, self.embedDim) |
|
dQueryWeight = cp.dot(queryInput_flat.T, dQuery_flat) |
|
|
|
dKeyInput = cp.dot(dKey, self.keyWeight.T) |
|
keyInput_flat = self.cache['keyInput'].reshape(-1, self.embedDim) |
|
dKey_flat = dKey.reshape(-1, self.embedDim) |
|
dKeyWeight = cp.dot(keyInput_flat.T, dKey_flat) |
|
|
|
dValueInput = cp.dot(dValue, self.valueWeight.T) |
|
valueInput_flat = self.cache['valueInput'].reshape(-1, self.embedDim) |
|
dValue_flat = dValue.reshape(-1, self.embedDim) |
|
dValueWeight = cp.dot(valueInput_flat.T, dValue_flat) |
|
|
|
# Since self-attention, dInput = dQueryInput + dKeyInput + dValueInput |
|
dInput = dQueryInput + dKeyInput + dValueInput |
|
|
|
return dInput, [('queryWeight', dQueryWeight), ('keyWeight', dKeyWeight), ('valueWeight', dValueWeight), ('outputWeight', dOutputWeight)] |
|
|
|
    def _softmaxBackward(self, softmaxOutput, dSoftmax):
        # Softmax Jacobian-vector product along the last axis:
        # dScores = s * (dS - sum(dS * s, axis=-1))
        s = softmaxOutput
        ds = s * (dSoftmax - cp.sum(dSoftmax * s, axis=-1, keepdims=True))
        return ds
|
|
|
def _applyDropout(self, inputData): |
|
# Randomly drop elements to prevent overfitting, return mask for backprop |
|
mask = cp.random.random(inputData.shape) > self.dropoutRate |
|
return inputData * mask / (1 - self.dropoutRate), mask |
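    # This is inverted dropout: with dropoutRate = 0.1 the kept activations are scaled by
    # 1 / 0.9 ≈ 1.111, so the expected activation is unchanged and no rescaling is needed at inference.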
|
|
|
def _applyDropoutGrad(self, dOutput, mask): |
|
# Apply dropout gradient using saved mask |
|
return dOutput * mask / (1 - self.dropoutRate) |
|
|
|
def _collectWeights(self): |
|
# Collect attention weights |
|
return [ |
|
('queryWeight', self.queryWeight), |
|
('keyWeight', self.keyWeight), |
|
('valueWeight', self.valueWeight), |
|
('outputWeight', self.outputWeight) |
|
] |
|
|
|
class FeedForward: |
|
"""FeedForward class for the position-wise feed-forward network.""" |
|
def __init__(self, embedDim, ffDim, dropoutRate=DROPOUT_RATE): |
|
self.embedDim = embedDim |
|
self.ffDim = ffDim |
|
self.dropoutRate = dropoutRate |
|
self.firstFeedForwardWeight = cp.random.randn(embedDim, ffDim) * WEIGHT_INIT_SCALE |
|
self.secondFeedForwardWeight = cp.random.randn(ffDim, embedDim) * WEIGHT_INIT_SCALE |
|
self.cache = {} # For backprop |
|
self.dropoutMasks = {} # To store dropout masks for backprop |
|
|
|
def forward(self, inputData, training=True): |
|
# Apply two-layer feedforward network with ReLU |
|
hidden = cp.dot(inputData, self.firstFeedForwardWeight) |
|
hidden = cp.maximum(0, hidden) # ReLU |
|
if training: |
|
hidden, mask1 = self._applyDropout(hidden) |
|
self.dropoutMasks['hidden'] = mask1 |
|
output = cp.dot(hidden, self.secondFeedForwardWeight) |
|
if training: |
|
output, mask2 = self._applyDropout(output) |
|
self.dropoutMasks['output'] = mask2 |
|
|
|
if training: |
|
self.cache['inputData'] = inputData |
|
self.cache['hidden'] = hidden |
|
|
|
return output |
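    # Equivalent formula: FFN(x) = dropout(relu(x @ W1)) @ W2, with a second dropout on the output
    # during training; ffDim is FF_MULTIPLIER (4x) times embedDim, the usual transformer ratio.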
|
|
|
def backward(self, dOutput, training=True): |
|
# Backprop through feedforward |
|
if training: |
|
dOutput = self._applyDropoutGrad(dOutput, self.dropoutMasks['output']) |
|
|
|
dHidden = cp.dot(dOutput, self.secondFeedForwardWeight.T) |
|
hidden_flat = self.cache['hidden'].reshape(-1, self.ffDim) |
|
dOutput_flat = dOutput.reshape(-1, self.embedDim) |
|
dSecondWeight = cp.dot(hidden_flat.T, dOutput_flat) |
|
|
|
# ReLU grad |
|
dHidden[self.cache['hidden'] <= 0] = 0 |
|
|
|
if training: |
|
dHidden = self._applyDropoutGrad(dHidden, self.dropoutMasks['hidden']) |
|
|
|
dInput = cp.dot(dHidden, self.firstFeedForwardWeight.T) |
|
input_flat = self.cache['inputData'].reshape(-1, self.embedDim) |
|
dHidden_flat = dHidden.reshape(-1, self.ffDim) |
|
dFirstWeight = cp.dot(input_flat.T, dHidden_flat) |
|
|
|
return dInput, [('firstFeedForwardWeight', dFirstWeight), ('secondFeedForwardWeight', dSecondWeight)] |
|
|
|
def _applyDropout(self, inputData): |
|
# Randomly drop elements to prevent overfitting, return mask for backprop |
|
mask = cp.random.random(inputData.shape) > self.dropoutRate |
|
return inputData * mask / (1 - self.dropoutRate), mask |
|
|
|
def _applyDropoutGrad(self, dOutput, mask): |
|
# Apply dropout gradient using saved mask |
|
return dOutput * mask / (1 - self.dropoutRate) |
|
|
|
def _collectWeights(self): |
|
# Collect feedforward weights |
|
return [ |
|
('firstFeedForwardWeight', self.firstFeedForwardWeight), |
|
('secondFeedForwardWeight', self.secondFeedForwardWeight) |
|
] |
|
|
|
class TransformerBlock: |
|
"""TransformerBlock class combining attention and feedforward with residual connections and normalization.""" |
|
def __init__(self, modelConfig): |
|
self.embedDim = modelConfig.embedDim |
|
self.numHeads = modelConfig.numHeads |
|
self.ffDim = modelConfig.ffDim |
|
self.dropoutRate = modelConfig.dropoutRate |
|
self.attention = MultiHeadAttention(self.embedDim, self.numHeads, self.dropoutRate) |
|
self.norm1 = LayerNorm(self.embedDim) |
|
self.feedForward = FeedForward(self.embedDim, self.ffDim, self.dropoutRate) |
|
self.norm2 = LayerNorm(self.embedDim) |
|
self.cache = {} # For backprop |
|
|
|
def forward(self, inputData, mask=None, training=True): |
|
# Combine attention and feedforward with layer normalization |
|
attentionOutput, _ = self.attention.forward(inputData, inputData, inputData, mask, training) |
|
postAttn = inputData + attentionOutput |
|
normOutput = self.norm1.forward(postAttn) |
|
ffOutput = self.feedForward.forward(normOutput, training) |
|
postFF = normOutput + ffOutput |
|
output = self.norm2.forward(postFF) |
|
|
|
if training: |
|
self.cache['inputData'] = inputData |
|
self.cache['attentionOutput'] = attentionOutput |
|
self.cache['postAttn'] = postAttn |
|
self.cache['normOutput'] = normOutput |
|
self.cache['ffOutput'] = ffOutput |
|
self.cache['postFF'] = postFF |
|
|
|
return output |
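    # Block structure (post-norm): x -> LayerNorm(x + Attention(x)) -> LayerNorm(prev + FeedForward(prev)),
    # i.e. a residual connection around each sub-layer with normalization applied after the addition.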
|
|
|
def backward(self, dOutput, training=True): |
|
# Backprop through transformer block |
|
# Second layer norm |
|
dPostFF, norm2Grads = self.norm2.backward(dOutput) |
|
|
|
# FF + residual |
|
dFfOutput = dPostFF |
|
dNormOutput = dPostFF |
|
|
|
# FF back |
|
dNormOutputFf, ffGrads = self.feedForward.backward(dFfOutput, training) |
|
|
|
dNormOutput += dNormOutputFf |
|
|
|
# First layer norm |
|
dPostAttn, norm1Grads = self.norm1.backward(dNormOutput) |
|
|
|
# Attention + residual |
|
dAttentionOutput = dPostAttn |
|
dInput = dPostAttn |
|
|
|
# Attention back |
|
dInputAtt, attGrads = self.attention.backward(dAttentionOutput, training) |
|
|
|
dInput += dInputAtt |
|
|
|
        # Prefix the norm gradients so norm1 and norm2 parameters do not collide under the same name
        norm1Grads = [(f'norm1_{name}', grad) for name, grad in norm1Grads]
        norm2Grads = [(f'norm2_{name}', grad) for name, grad in norm2Grads]
        return dInput, attGrads + norm1Grads + ffGrads + norm2Grads
|
|
|
def _collectWeights(self): |
|
        # Collect weights from attention, norms, and feedforward, giving each norm a unique prefix
        weights = self.attention._collectWeights()
        weights.extend((f'norm1_{name}', weight) for name, weight in self.norm1._collectWeights())
        weights.extend(self.feedForward._collectWeights())
        weights.extend((f'norm2_{name}', weight) for name, weight in self.norm2._collectWeights())
        return weights
|
|
|
class Transformer: |
|
"""Transformer class representing the full model with embeddings, positional encoding, blocks, and output.""" |
|
def __init__(self, tokenizer): |
|
self.modelConfig = tokenizer.modelConfig |
|
self.tokenizer = tokenizer |
|
self.embedding = self._buildEmbedding() |
|
self.posEncoding = PositionalEncoding(self.modelConfig.maxSeqLen, self.modelConfig.embedDim) |
|
self.layers = self._buildLayers() |
|
self.outputLayer = self._buildOutputLayer() |
|
self.cache = {} # For backprop |
|
self.dropoutMasks = {} # To store dropout masks for backprop |
|
|
|
def _buildEmbedding(self): |
|
# Initialize embedding matrix |
|
return cp.random.randn(self.tokenizer.vocabSize, self.modelConfig.embedDim) * WEIGHT_INIT_SCALE |
|
|
|
def _buildLayers(self): |
|
# Create multiple transformer blocks |
|
return [TransformerBlock(self.modelConfig) for _ in range(self.modelConfig.numLayers)] |
|
|
|
def _buildOutputLayer(self): |
|
# Initialize output projection layer |
|
return cp.random.randn(self.modelConfig.embedDim, self.tokenizer.vocabSize) * WEIGHT_INIT_SCALE |
|
|
|
def forward(self, inputIds, mask=None, training=True): |
|
# Forward pass: embedding -> pos encoding -> layers -> output |
|
embedded = self.embedding[inputIds] |
|
if training: |
|
embedded, mask1 = self._applyDropout(embedded) |
|
self.dropoutMasks['embedded'] = mask1 |
|
posOutput = self.posEncoding.forward(embedded) |
|
output = posOutput |
|
for layer in self.layers: |
|
output = layer.forward(output, mask, training) |
|
logits = cp.dot(output, self.outputLayer) |
|
|
|
if training: |
|
self.cache['inputIds'] = inputIds |
|
self.cache['embedded'] = embedded |
|
self.cache['posOutput'] = posOutput |
|
self.cache['output'] = output |
|
|
|
return logits |
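    # Shape walk-through (B = batch size, S = sequence length): inputIds (B, S) -> embeddings
    # (B, S, embedDim) -> positional encoding and transformer blocks (B, S, embedDim)
    # -> logits (B, S, vocabSize).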
|
|
|
def backward(self, dLogits, training=True): |
|
# Backprop through the model |
|
dOutput = cp.dot(dLogits, self.outputLayer.T) |
|
output_flat = self.cache['output'].reshape(-1, self.modelConfig.embedDim) |
|
dLogits_flat = dLogits.reshape(-1, self.tokenizer.vocabSize) |
|
dOutputLayer = cp.dot(output_flat.T, dLogits_flat) |
|
|
|
# Back through layers |
|
layerGrads = [] |
|
dLayer = dOutput |
|
for i in range(self.modelConfig.numLayers - 1, -1, -1): |
|
layer = self.layers[i] |
|
dLayer, lGrads = layer.backward(dLayer, training) |
|
prefixedGrads = [(f'layer{i}_{name}', grad) for name, grad in lGrads] |
|
layerGrads.extend(prefixedGrads) |
|
|
|
# Pos encoding |
|
dPos = self.posEncoding.backward(dLayer) |
|
|
|
# Dropout on embedded |
|
if training: |
|
dPos = self._applyDropoutGrad(dPos, self.dropoutMasks['embedded']) |
|
|
|
# Embedding grad |
|
dEmbedding = cp.zeros_like(self.embedding) |
|
for b in range(dPos.shape[0]): |
|
for s in range(dPos.shape[1]): |
|
idx = self.cache['inputIds'][b, s] |
|
dEmbedding[idx] += dPos[b, s] |
|
|
|
grads = [('embedding', dEmbedding), ('outputLayer', dOutputLayer)] + layerGrads |
|
|
|
return grads |
|
|
|
def _applyDropout(self, inputData): |
|
# Randomly drop elements to prevent overfitting, return mask for backprop |
|
mask = cp.random.random(inputData.shape) > self.modelConfig.dropoutRate |
|
return inputData * mask / (1 - self.modelConfig.dropoutRate), mask |
|
|
|
def _applyDropoutGrad(self, dOutput, mask): |
|
# Apply dropout gradient using saved mask |
|
return dOutput * mask / (1 - self.modelConfig.dropoutRate) |
|
|
|
def generate(self, inputIds, maxLength=MAX_GENERATION_LENGTH, temperature=TEMPERATURE): |
|
# Generate text by sampling tokens with temperature |
|
for _ in range(maxLength - inputIds.shape[1]): |
|
logits = self.forward(inputIds, _createCausalMask(inputIds.shape[1]), training=False) |
|
scaledLogits = logits[:, -1, :] / temperature |
|
probs = softmax(scaledLogits, axis=-1) |
|
nextToken = cp.random.choice(cp.arange(self.tokenizer.vocabSize), size=1, p=probs[0])[0] |
|
inputIds = cp.concatenate([inputIds, cp.array([[nextToken]])], axis=1) |
|
return inputIds |
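    # Temperature illustration with hypothetical logits [2, 1]: temperature 1.0 gives probabilities
    # ≈ [0.73, 0.27], while temperature 0.5 sharpens them to softmax([4, 2]) ≈ [0.88, 0.12],
    # so lower temperatures make sampling more deterministic.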
|
|
|
def predict(self, prompt, maxLength=MAX_GENERATION_LENGTH, temperature=TEMPERATURE): |
|
# Generate text from a user prompt |
|
if not prompt or not all(w in self.tokenizer.wordToIdx for w in prompt.split()): |
|
prompt = "hello" # Default prompt if empty or invalid |
|
inputIds = cp.array([self.tokenizer.tokenize(prompt)]) |
|
generatedIds = self.generate(inputIds, maxLength, temperature) |
|
generatedText = self.tokenizer.detokenize(generatedIds[0]) |
|
return generatedText |
|
|
|
def _collectWeights(self): |
|
# Collect all model weights |
|
weights = [ |
|
('embedding', self.embedding), |
|
('outputLayer', self.outputLayer) |
|
] |
|
for i, layer in enumerate(self.layers): |
|
for name, weight in layer._collectWeights(): |
|
weights.append((f'layer{i}_{name}', weight)) |
|
return weights |
|
|
|
def _setWeights(self, weights): |
|
# Set model weights from a dictionary |
|
self.embedding = weights['embedding'] |
|
self.outputLayer = weights['outputLayer'] |
|
for i, layer in enumerate(self.layers): |
|
layer.attention.queryWeight = weights[f'layer{i}_queryWeight'] |
|
layer.attention.keyWeight = weights[f'layer{i}_keyWeight'] |
|
layer.attention.valueWeight = weights[f'layer{i}_valueWeight'] |
|
layer.attention.outputWeight = weights[f'layer{i}_outputWeight'] |
|
layer.feedForward.firstFeedForwardWeight = weights[f'layer{i}_firstFeedForwardWeight'] |
|
layer.feedForward.secondFeedForwardWeight = weights[f'layer{i}_secondFeedForwardWeight'] |
|
            layer.norm1.gamma = weights[f'layer{i}_norm1_gamma']
            layer.norm1.beta = weights[f'layer{i}_norm1_beta']
            layer.norm2.gamma = weights[f'layer{i}_norm2_gamma']
            layer.norm2.beta = weights[f'layer{i}_norm2_beta']
|
|
|
def saveModel(self, filePath): |
|
# Save model weights to a file |
|
weights = {name: weight for name, weight in self._collectWeights()} |
|
cp.savez(filePath, **weights) |
|
|
|
def loadModel(self, filePath): |
|
# Load model weights from a file |
|
weights = cp.load(filePath) |
|
self._setWeights(weights) |
|
|
|
def _createCausalMask(seqLen): |
|
# Create mask to prevent attending to future tokens |
|
mask = cp.triu(cp.ones((seqLen, seqLen)) * float('-inf'), k=1) |
|
return mask[None, None, :, :] |
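# For example, seqLen = 3 produces (before the batch and head axes are added):
#   [[0, -inf, -inf],
#    [0,    0, -inf],
#    [0,    0,    0]]
# so, once added to the scores, position i can only attend to positions <= i.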
|
|
|
def _computeLoss(logits, targetData): |
|
# Compute cross-entropy loss |
|
batchSize, seqLen, vocabSize = logits.shape |
|
    probs = softmax(logits, axis=-1)
    logProbs = cp.log(probs + EPSILON)  # Add small value to avoid log(0)
|
targetOneHot = cp.zeros_like(logProbs) |
|
targetOneHot[cp.arange(batchSize)[:, None], cp.arange(seqLen)[None, :], targetData] = 1 |
|
loss = -cp.sum(logProbs * targetOneHot, axis=-1) |
|
return loss |
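# With a one-hot target the per-token loss reduces to -log p(correct word); e.g. assigning the correct
# word probability 0.25 contributes about 1.386 (natural log) to that token's loss.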
|
|
|
def _computeValidationLoss(model, valInput, valTarget, batchSize): |
|
# Compute average loss on validation data |
|
numValSamples = valInput.shape[0] |
|
valLoss = 0.0 |
|
for i in range(0, numValSamples, batchSize): |
|
batchValInput = valInput[i:i+batchSize] |
|
batchValTarget = valTarget[i:i+batchSize] |
|
logits = model.forward(batchValInput, _createCausalMask(batchValInput.shape[1]), training=False) |
|
loss = _computeLoss(logits, batchValTarget) |
|
valLoss += loss.mean().item() * batchValInput.shape[0] |
|
return valLoss / numValSamples if numValSamples > 0 else 0.0 |
|
|
|
def _computeGradients(logits, targetData, inputData, model): |
|
# Compute gradients for all weights using backpropagation |
|
batchSize, seqLen, vocabSize = logits.shape |
|
probs = softmax(logits, axis=-1) |
|
targetOneHot = cp.zeros_like(probs) |
|
targetOneHot[cp.arange(batchSize)[:, None], cp.arange(seqLen)[None, :], targetData] = 1 |
|
dLogits = (probs - targetOneHot) / batchSize |
|
|
|
# Backprop through the model |
|
grads = model.backward(dLogits) |
|
|
|
return grads |
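# Softmax combined with cross-entropy has the convenient gradient (probs - oneHot), which is what is
# propagated backwards here (scaled by 1 / batchSize).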
|
|
|
def _updateWeights(model, grads, learningRate): |
|
# Update all weights using gradient descent |
|
weights = {name: weight for name, weight in model._collectWeights()} |
|
for name, grad in grads: |
|
grad = cp.clip(grad, -1.0, 1.0) # Clip gradients to prevent explosion |
|
weights[name] -= learningRate * grad |
|
model._setWeights(weights) |
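# The update rule is plain SGD with elementwise clipping: w <- w - LEARNING_RATE * clip(dL/dw, -1, 1);
# there is no momentum or adaptive scaling.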
|
|
|
def _getDataSets(text, tokenizer, seqLen): |
|
# Prepare training and validation data using tokenizer and text |
|
tokenizedText = tokenizer.tokenize(text) |
|
inputData = cp.array([tokenizedText[i:i+seqLen] for i in range(0, len(tokenizedText)-seqLen, 1)]) |
|
targetData = cp.array([tokenizedText[i+1:i+seqLen+1] for i in range(0, len(tokenizedText)-seqLen, 1)]) |
|
numSamples = inputData.shape[0] |
|
splitIndex = int(numSamples * (1 - VALIDATION_SPLIT)) |
|
trainInput = inputData[:splitIndex] |
|
trainTarget = targetData[:splitIndex] |
|
valInput = inputData[splitIndex:] |
|
valTarget = targetData[splitIndex:] |
|
return trainInput, trainTarget, valInput, valTarget |
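# Windowing example (illustrative): with tokens [t0, t1, t2, t3, t4] and seqLen = 3 the samples are
# input [t0, t1, t2] -> target [t1, t2, t3] and input [t1, t2, t3] -> target [t2, t3, t4]; the target
# is always the input shifted left by one token.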
|
|
|
def trainModel(model, text, tokenizer, trainingConfig): |
|
# Train model with mini-batches, compute validation loss, and apply early stopping |
|
trainInput, trainTarget, valInput, valTarget = _getDataSets(text, tokenizer, tokenizer.seqLen) |
|
numTrainSamples = trainInput.shape[0] |
|
totalBatches = math.ceil(numTrainSamples / trainingConfig.batchSize) |
|
bestValLoss = float('inf') |
|
epochsWithoutImprovement = 0 |
|
|
|
with ProgressHandler(trainingConfig.numEpochs, totalBatches) as progress: |
|
for epoch in range(trainingConfig.numEpochs): |
|
epochLoss = 0.0 |
|
progress.startBatch() |
|
for i in range(0, numTrainSamples, trainingConfig.batchSize): |
|
batchInput = trainInput[i:i+trainingConfig.batchSize] |
|
batchTarget = trainTarget[i:i+trainingConfig.batchSize] |
|
logits = model.forward(batchInput, _createCausalMask(batchInput.shape[1])) |
|
loss = _computeLoss(logits, batchTarget) |
|
if cp.any(cp.isnan(loss)): |
|
progress.updateEpoch(epoch + 1, float('nan'), float('nan')) |
|
return # Stop training if NaN loss detected |
|
grads = _computeGradients(logits, batchTarget, batchInput, model) |
|
_updateWeights(model, grads, trainingConfig.learningRate) |
|
epochLoss += loss.mean().item() * batchInput.shape[0] |
|
progress.updateBatch(i // trainingConfig.batchSize + 1) |
|
avgTrainLoss = epochLoss / numTrainSamples if numTrainSamples > 0 else 0.0 |
|
avgValLoss = _computeValidationLoss(model, valInput, valTarget, trainingConfig.batchSize) |
|
progress.endBatch() |
|
progress.updateEpoch(epoch + 1, avgTrainLoss, avgValLoss) |
|
|
|
# Early stopping |
|
if avgValLoss < bestValLoss: |
|
bestValLoss = avgValLoss |
|
epochsWithoutImprovement = 0 |
|
else: |
|
epochsWithoutImprovement += 1 |
|
if epochsWithoutImprovement >= PATIENCE: |
|
if progress.progress is not None: |
|
progress.progress.update( |
|
progress.epochTask, |
|
description=f"Early stopping at Epoch {epoch + 1}/{trainingConfig.numEpochs}, Train Loss: {avgTrainLoss:.6f}, Val Loss: {avgValLoss:.6f}" |
|
) |
|
break |
|
|
|
def loadTrainingText(filePath): |
|
# Load text from file or use default |
|
if os.path.exists(filePath): |
|
with open(filePath, 'r', encoding='utf-8') as f: |
|
return f.read().strip() |
|
return DEFAULT_TEXT |
|
|
|
def _main(): |
|
# Setup model and data |
|
modelPath = "transformers.npz" |
|
tokenPath = "transformers.tkn" |
|
|
|
# Load or create tokenizer and model |
|
if os.path.exists(modelPath) and os.path.exists(tokenPath): |
|
print("Loading saved tokenizer and model...") |
|
tokenizer = Tokenizer("").load(tokenPath) |
|
model = Transformer(tokenizer) |
|
model.loadModel(modelPath) |
|
else: |
|
print("Creating new tokenizer and training model...") |
|
text = loadTrainingText("input.txt") |
|
tokenizer = Tokenizer(text) |
|
tokenizer.save(tokenPath) |
|
model = Transformer(tokenizer) |
|
trainingConfig = TrainingConfig() |
|
trainModel(model, text, tokenizer, trainingConfig) |
|
model.saveModel(modelPath) |
|
|
|
# Get user prompt |
|
prompt = input("Enter a prompt (or press Enter for 'hello'): ").strip() |
|
generatedText = model.predict(prompt) |
|
print(f"Generated text: {generatedText}") |
|
|
|
if __name__ == "__main__": |
|
_main() |