Easy-to-read Transformers implementation, written by Grok 3.0.

try:
    import cupy as cp
except ImportError:
    import numpy as cp

try:
    from rich.progress import Progress, BarColumn, TextColumn, TimeRemainingColumn, TimeElapsedColumn
except ImportError:
    Progress = None

import os
import pickle
import math

# Constants grouped by usage
# Model architecture
WEIGHT_INIT_SCALE = 0.01     # Scale for initializing weights
POS_ENCODING_BASE = 10000.0  # Base for positional encoding frequencies
DROPOUT_RATE = 0.1           # Dropout rate for regularization
EMBED_SCALE = 64             # Increased scale factor for embedding dimension
BASE_NUM_HEADS = 2           # Base number of attention heads
HEADS_PER_ENTROPY = 2        # Heads added per entropy unit
FF_MULTIPLIER = 4            # Multiplier for feedforward dimension
BASE_NUM_LAYERS = 1          # Base number of transformer layers
LAYERS_PER_ENTROPY = 3       # Layers added per entropy unit

# Training
LEARNING_RATE = 0.003        # Increased learning rate for faster convergence
EPSILON = 1e-8               # Small value for numerical stability
NUM_EPOCHS = 100             # Number of training epochs
BATCH_SIZE = 16              # Reduced batch size for more updates
VALIDATION_SPLIT = 0.2       # Fraction of data for validation
PATIENCE = 10                # Number of epochs to wait for early stopping

# Sequence lengths
MIN_SEQ_LEN = 10             # Minimum sequence length for training
MAX_SEQ_LEN_DEFAULT = 50     # Default maximum sequence length if text-based computation is small

# Generation
TEMPERATURE = 0.5            # Sampling temperature for generation
MAX_GENERATION_LENGTH = 20   # Maximum length for generated text

# Dataset
TEXT_REPETITIONS = 20        # Number of times to repeat default text
DEFAULT_TEXT = (
    "hello world this is a test for transformer "
    "the quick brown fox jumps over the lazy dog "
    "machine learning is fun and exciting to explore "
    "coding in python makes life easier every day "
    "data science opens new doors to innovation "
    "artificial intelligence shapes the future now "
) * TEXT_REPETITIONS  # Diverse phrases, repeated for more data


class ProgressHandler:
    """Handles progress bar display using rich.progress, or no-op if unavailable."""

    def __init__(self, totalEpochs, totalBatches):
        self.totalEpochs = totalEpochs
        self.totalBatches = totalBatches
        self.progress = None
        self.epochTask = None
        self.batchTask = None

    def __enter__(self):
        if Progress is not None:
            self.progress = Progress(
                TextColumn("[progress.description]{task.description}"),
                BarColumn(),
                TimeElapsedColumn(),
                TimeRemainingColumn(),
            )
            self.progress.__enter__()
            self.epochTask = self.progress.add_task(
                f"Epoch 1/{self.totalEpochs}, Train Loss: 0.000000, Val Loss: 0.000000",
                total=self.totalEpochs, completed=0
            )
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.progress is not None:
            self.progress.__exit__(exc_type, exc_val, exc_tb)

    def startBatch(self):
        if self.progress is not None:
            self.batchTask = self.progress.add_task(
                f"Batch 1/{self.totalBatches}", total=self.totalBatches, completed=0
            )

    def updateBatch(self, batchIdx):
        if self.progress is not None and self.batchTask is not None:
            self.progress.update(
                self.batchTask, advance=1,
                description=f"Batch {batchIdx}/{self.totalBatches}", completed=batchIdx
            )

    def endBatch(self):
        if self.progress is not None and self.batchTask is not None:
            self.progress.update(self.batchTask, visible=False)

    def updateEpoch(self, epoch, trainLoss, valLoss):
        if self.progress is not None and self.epochTask is not None:
            self.progress.update(
                self.epochTask, advance=1,
                description=f"Epoch {epoch}/{self.totalEpochs}, Train Loss: {trainLoss:.6f}, Val Loss: {valLoss:.6f}",
                completed=epoch
            )


def softmax(x, axis=None):
    """Custom softmax for CuPy/NumPy compatibility."""
    expX = cp.exp(x - cp.max(x, axis=axis, keepdims=True))
    return expX / cp.sum(expX, axis=axis, keepdims=True)


class LayerNorm:
    """Layer normalization with learnable parameters."""

    def __init__(self, embedDim, epsilon=EPSILON):
        self.embedDim = embedDim
        self.epsilon = epsilon
        self.gamma = cp.ones((1, 1, embedDim))   # Scale parameter
        self.beta = cp.zeros((1, 1, embedDim))   # Shift parameter
        self.cache = {}  # For backprop

    def forward(self, x):
        # Normalize: (x - mean) / sqrt(var + epsilon) * gamma + beta
        mean = cp.mean(x, axis=-1, keepdims=True)
        var = cp.var(x, axis=-1, keepdims=True)
        xNorm = (x - mean) / cp.sqrt(var + self.epsilon)
        out = xNorm * self.gamma + self.beta
        self.cache['x'] = x
        self.cache['mean'] = mean
        self.cache['var'] = var
        self.cache['xNorm'] = xNorm
        return out

    def backward(self, dOut):
        # Backprop through layer norm
        x, mean, var, xNorm = self.cache['x'], self.cache['mean'], self.cache['var'], self.cache['xNorm']
        N = self.embedDim
        # Gradients for gamma and beta
        dGamma = cp.sum(dOut * xNorm, axis=(0, 1), keepdims=True)
        dBeta = cp.sum(dOut, axis=(0, 1), keepdims=True)
        # Gradient for xNorm
        dxNorm = dOut * self.gamma
        # Gradient for variance
        dVar = cp.sum(dxNorm * (x - mean) * -0.5 * (var + self.epsilon)**(-1.5), axis=-1, keepdims=True)
        # Gradient for mean
        dMean = cp.sum(dxNorm * -1 / cp.sqrt(var + self.epsilon), axis=-1, keepdims=True)
        dx = dxNorm / cp.sqrt(var + self.epsilon) + dVar * 2 * (x - mean) / N + dMean / N
        return dx, [('gamma', dGamma), ('beta', dBeta)]

    def _collectWeights(self):
        # Collect layer norm weights
        return [('gamma', self.gamma), ('beta', self.beta)]


class ModelConfig:
    """Configuration class for the transformer model architecture."""

    def __init__(self, embedDim, numHeads, ffDim, numLayers, maxSeqLen, dropoutRate=DROPOUT_RATE):
        self.embedDim = embedDim
        self.numHeads = numHeads
        self.ffDim = ffDim
        self.numLayers = numLayers
        self.maxSeqLen = maxSeqLen
        self.dropoutRate = dropoutRate


class TrainingConfig:
    """Configuration class for training parameters."""

    def __init__(self, numEpochs=NUM_EPOCHS, batchSize=BATCH_SIZE, learningRate=LEARNING_RATE):
        self.numEpochs = numEpochs
        self.batchSize = batchSize
        self.learningRate = learningRate


class Tokenizer:
    """Tokenizer class for converting text to token IDs and vice versa, including entropy computation for model scaling."""

    def __init__(self, text):
        # Split text into words (including spaces) for word-based tokenization
        words = text.split()
        self.vocab = sorted(set(words + [' ']))  # Include space as a token
        self.vocabSize = len(self.vocab)
        self.wordToIdx = {w: i for i, w in enumerate(self.vocab)}
        self.idxToWord = {i: w for i, w in enumerate(self.vocab)}
        self.entropy = self._computeEntropy(words)
        self.seqLen = min(50, max(MIN_SEQ_LEN, int(math.sqrt(len(words)))))  # Scale with word count
        self.maxSeqLen = max(self.seqLen, MAX_SEQ_LEN_DEFAULT)  # Ensure sufficient capacity
        self.modelConfig = self._computeModelConfig()

    def tokenize(self, text):
        # Convert text to token IDs (word-based)
        words = text.split()
        return [self.wordToIdx[w] for w in words if w in self.wordToIdx]

    def detokenize(self, tokenIds):
        # Convert token IDs to text
        return ' '.join([self.idxToWord[idx.item()] for idx in tokenIds if idx.item() in self.idxToWord])

    def save(self, filePath):
        # Save tokenizer config, ModelConfig, and sequence lengths to a file
        with open(filePath, 'wb') as f:
            pickle.dump({
                'vocab': self.vocab,
                'wordToIdx': self.wordToIdx,
                'idxToWord': self.idxToWord,
                'seqLen': self.seqLen,
                'maxSeqLen': self.maxSeqLen,
                'modelConfig': {
                    'embedDim': self.modelConfig.embedDim,
                    'numHeads': self.modelConfig.numHeads,
                    'ffDim': self.modelConfig.ffDim,
                    'numLayers': self.modelConfig.numLayers,
                    'maxSeqLen': self.modelConfig.maxSeqLen,
                    'dropoutRate': self.modelConfig.dropoutRate
                }
            }, f)

    def load(self, filePath):
        # Load tokenizer config, ModelConfig, and sequence lengths from a file
        with open(filePath, 'rb') as f:
            data = pickle.load(f)
        self.vocab = data['vocab']
        self.vocabSize = len(self.vocab)
        self.wordToIdx = data['wordToIdx']
        self.idxToWord = data['idxToWord']
        self.entropy = 0.0  # Not saved, set to 0 as placeholder
        self.seqLen = data['seqLen']
        self.maxSeqLen = data['maxSeqLen']
        self.modelConfig = ModelConfig(
            embedDim=data['modelConfig']['embedDim'],
            numHeads=data['modelConfig']['numHeads'],
            ffDim=data['modelConfig']['ffDim'],
            numLayers=data['modelConfig']['numLayers'],
            maxSeqLen=data['modelConfig']['maxSeqLen'],
            dropoutRate=data['modelConfig']['dropoutRate']
        )
        return self

    def _computeEntropy(self, words):
        # Compute word-level entropy of input text
        wordCount = {}
        totalWords = len(words)
        for w in words:
            wordCount[w] = wordCount.get(w, 0) + 1
        entropy = 0.0
        for count in wordCount.values():
            prob = count / totalWords
            entropy -= prob * math.log2(prob + EPSILON)  # Add EPSILON to avoid log(0)
        return entropy

    def _computeModelConfig(self):
        # Compute ModelConfig based on cached entropy
        entropy = self.entropy
        embedDim = EMBED_SCALE * math.ceil(entropy)                              # Scale embedding size with entropy
        numHeads = BASE_NUM_HEADS + math.floor(entropy / HEADS_PER_ENTROPY)      # More heads for higher entropy
        ffDim = FF_MULTIPLIER * embedDim                                         # Standard transformer ratio
        numLayers = BASE_NUM_LAYERS + math.floor(entropy / LAYERS_PER_ENTROPY)   # Deeper model for higher entropy
        maxSeqLen = self.maxSeqLen                                               # Use computed maxSeqLen
        return ModelConfig(embedDim, numHeads, ffDim, numLayers, maxSeqLen, DROPOUT_RATE)


class PositionalEncoding:
    """Positional Encoding class to add position information to embeddings."""

    def __init__(self, maxSeqLen, embedDim):
        self.maxSeqLen = maxSeqLen
        self.embedDim = embedDim
        self.posEncoding = self._buildPosEncoding()

    def _buildPosEncoding(self):
        # Create sinusoidal encodings to represent sequence positions
        pos = cp.arange(self.maxSeqLen)[:, None]
        divTerm = cp.exp(cp.arange(0, self.embedDim, 2) * -(cp.log(POS_ENCODING_BASE) / self.embedDim))
        posEncoding = cp.zeros((self.maxSeqLen, self.embedDim))
        posEncoding[:, 0::2] = cp.sin(pos * divTerm)
        posEncoding[:, 1::2] = cp.cos(pos * divTerm)
        return posEncoding

    def forward(self, inputData):
        # Add positional encodings to input embeddings
        _, seqLen, _ = inputData.shape
        return inputData + self.posEncoding[:seqLen]

    def backward(self, dOutput):
        # Gradient for positional encoding addition is 1 for the input
        return dOutput


class MultiHeadAttention:
    """Multi-Head Attention class for self-attention mechanism."""

    def __init__(self, embedDim, numHeads, dropoutRate=DROPOUT_RATE):
        self.embedDim = embedDim
        self.numHeads = numHeads
        self.headDim = embedDim // numHeads  # Assumes embedDim is divisible by numHeads
        self.dropoutRate = dropoutRate
        self.queryWeight = cp.random.randn(embedDim, embedDim) * WEIGHT_INIT_SCALE
        self.keyWeight = cp.random.randn(embedDim, embedDim) * WEIGHT_INIT_SCALE
        self.valueWeight = cp.random.randn(embedDim, embedDim) * WEIGHT_INIT_SCALE
        self.outputWeight = cp.random.randn(embedDim, embedDim) * WEIGHT_INIT_SCALE
        self.cache = {}         # To store intermediates for backprop
        self.dropoutMasks = {}  # To store dropout masks for backprop

    def forward(self, queryInput, keyInput, valueInput, mask=None, training=True):
        # Compute multi-head self-attention
        batchSize, seqLen, _ = queryInput.shape
        query = cp.dot(queryInput, self.queryWeight).reshape(batchSize, seqLen, self.numHeads, self.headDim).transpose(0, 2, 1, 3)
        key = cp.dot(keyInput, self.keyWeight).reshape(batchSize, seqLen, self.numHeads, self.headDim).transpose(0, 2, 1, 3)
        value = cp.dot(valueInput, self.valueWeight).reshape(batchSize, seqLen, self.numHeads, self.headDim).transpose(0, 2, 1, 3)
        scores = cp.matmul(query, key.transpose(0, 1, 3, 2)) / cp.sqrt(self.headDim)
        if mask is not None:
            scores = scores + mask
        attentionWeights = softmax(scores, axis=-1)
        if training:
            attentionWeights, mask1 = self._applyDropout(attentionWeights)
            self.dropoutMasks['attention'] = mask1
        attentionOutput = cp.matmul(attentionWeights, value)
        attentionOutput = attentionOutput.transpose(0, 2, 1, 3).reshape(batchSize, seqLen, self.embedDim)
        output = cp.dot(attentionOutput, self.outputWeight)
        if training:
            output, mask2 = self._applyDropout(output)
            self.dropoutMasks['output'] = mask2
        # Cache for backprop
        if training:
            self.cache['queryInput'] = queryInput
            self.cache['keyInput'] = keyInput
            self.cache['valueInput'] = valueInput
            self.cache['query'] = query
            self.cache['key'] = key
            self.cache['value'] = value
            self.cache['scores'] = scores
            self.cache['attentionWeights'] = attentionWeights
            self.cache['attentionOutput'] = attentionOutput
            self.cache['mask'] = mask
        return output, attentionWeights

    def backward(self, dOutput, training=True):
        # Backprop through multi-head attention
        batchSize, seqLen, _ = dOutput.shape
        # Dropout on output
        if training:
            dOutput = self._applyDropoutGrad(dOutput, self.dropoutMasks['output'])
        # Output projection
        dAttentionOutput = cp.dot(dOutput, self.outputWeight.T)
        attentionOutput_flat = self.cache['attentionOutput'].reshape(-1, self.embedDim)
        dOutput_flat = dOutput.reshape(-1, self.embedDim)
        dOutputWeight = cp.dot(attentionOutput_flat.T, dOutput_flat)
        # Reshape attention output grad
        dAttentionOutput = dAttentionOutput.reshape(batchSize, seqLen, self.numHeads, self.headDim).transpose(0, 2, 1, 3)
        # Attention matmul grad
        dValue = cp.matmul(self.cache['attentionWeights'].transpose(0, 1, 3, 2), dAttentionOutput)
        dAttentionWeights = cp.matmul(dAttentionOutput, self.cache['value'].transpose(0, 1, 3, 2))
        # Dropout on attention weights
        if training:
            dAttentionWeights = self._applyDropoutGrad(dAttentionWeights, self.dropoutMasks['attention'])
        # Softmax grad
        dScores = self._softmaxBackward(self.cache['attentionWeights'], dAttentionWeights)
        # Mask not learnable
        if self.cache['mask'] is not None:
            inf_mask = self.cache['mask'] == float('-inf')
            inf_mask = cp.broadcast_to(inf_mask, dScores.shape)
            dScores[inf_mask] = 0
        # Scale grad
        dScores /= cp.sqrt(self.headDim)
        # Matmul for query and key
        dQuery = cp.matmul(dScores, self.cache['key'])
        dKey = cp.matmul(dScores.transpose(0, 1, 3, 2), self.cache['query'])
        # Transpose back
        dQuery = dQuery.transpose(0, 2, 1, 3).reshape(batchSize, seqLen, self.embedDim)
        dKey = dKey.transpose(0, 2, 1, 3).reshape(batchSize, seqLen, self.embedDim)
        dValue = dValue.transpose(0, 2, 1, 3).reshape(batchSize, seqLen, self.embedDim)
        # Linear projections grad
        dQueryInput = cp.dot(dQuery, self.queryWeight.T)
        queryInput_flat = self.cache['queryInput'].reshape(-1, self.embedDim)
        dQuery_flat = dQuery.reshape(-1, self.embedDim)
        dQueryWeight = cp.dot(queryInput_flat.T, dQuery_flat)
        dKeyInput = cp.dot(dKey, self.keyWeight.T)
        keyInput_flat = self.cache['keyInput'].reshape(-1, self.embedDim)
        dKey_flat = dKey.reshape(-1, self.embedDim)
        dKeyWeight = cp.dot(keyInput_flat.T, dKey_flat)
        dValueInput = cp.dot(dValue, self.valueWeight.T)
        valueInput_flat = self.cache['valueInput'].reshape(-1, self.embedDim)
        dValue_flat = dValue.reshape(-1, self.embedDim)
        dValueWeight = cp.dot(valueInput_flat.T, dValue_flat)
        # Since self-attention, dInput = dQueryInput + dKeyInput + dValueInput
        dInput = dQueryInput + dKeyInput + dValueInput
        return dInput, [('queryWeight', dQueryWeight), ('keyWeight', dKeyWeight),
                        ('valueWeight', dValueWeight), ('outputWeight', dOutputWeight)]

    def _softmaxBackward(self, softmaxOutput, dSoftmax):
        # Softmax gradient along the last axis: s * (dY - sum(dY * s))
        s = softmaxOutput
        return s * (dSoftmax - cp.sum(dSoftmax * s, axis=-1, keepdims=True))

    def _applyDropout(self, inputData):
        # Randomly drop elements to prevent overfitting, return mask for backprop
        mask = cp.random.random(inputData.shape) > self.dropoutRate
        return inputData * mask / (1 - self.dropoutRate), mask

    def _applyDropoutGrad(self, dOutput, mask):
        # Apply dropout gradient using saved mask
        return dOutput * mask / (1 - self.dropoutRate)

    def _collectWeights(self):
        # Collect attention weights
        return [
            ('queryWeight', self.queryWeight),
            ('keyWeight', self.keyWeight),
            ('valueWeight', self.valueWeight),
            ('outputWeight', self.outputWeight)
        ]


class FeedForward:
    """FeedForward class for the position-wise feed-forward network."""

    def __init__(self, embedDim, ffDim, dropoutRate=DROPOUT_RATE):
        self.embedDim = embedDim
        self.ffDim = ffDim
        self.dropoutRate = dropoutRate
        self.firstFeedForwardWeight = cp.random.randn(embedDim, ffDim) * WEIGHT_INIT_SCALE
        self.secondFeedForwardWeight = cp.random.randn(ffDim, embedDim) * WEIGHT_INIT_SCALE
        self.cache = {}         # For backprop
        self.dropoutMasks = {}  # To store dropout masks for backprop

    def forward(self, inputData, training=True):
        # Apply two-layer feedforward network with ReLU
        hidden = cp.dot(inputData, self.firstFeedForwardWeight)
        hidden = cp.maximum(0, hidden)  # ReLU
        if training:
            hidden, mask1 = self._applyDropout(hidden)
            self.dropoutMasks['hidden'] = mask1
        output = cp.dot(hidden, self.secondFeedForwardWeight)
        if training:
            output, mask2 = self._applyDropout(output)
            self.dropoutMasks['output'] = mask2
        if training:
            self.cache['inputData'] = inputData
            self.cache['hidden'] = hidden
        return output

    def backward(self, dOutput, training=True):
        # Backprop through feedforward
        if training:
            dOutput = self._applyDropoutGrad(dOutput, self.dropoutMasks['output'])
        dHidden = cp.dot(dOutput, self.secondFeedForwardWeight.T)
        hidden_flat = self.cache['hidden'].reshape(-1, self.ffDim)
        dOutput_flat = dOutput.reshape(-1, self.embedDim)
        dSecondWeight = cp.dot(hidden_flat.T, dOutput_flat)
        # ReLU grad
        dHidden[self.cache['hidden'] <= 0] = 0
        if training:
            dHidden = self._applyDropoutGrad(dHidden, self.dropoutMasks['hidden'])
        dInput = cp.dot(dHidden, self.firstFeedForwardWeight.T)
        input_flat = self.cache['inputData'].reshape(-1, self.embedDim)
        dHidden_flat = dHidden.reshape(-1, self.ffDim)
        dFirstWeight = cp.dot(input_flat.T, dHidden_flat)
        return dInput, [('firstFeedForwardWeight', dFirstWeight), ('secondFeedForwardWeight', dSecondWeight)]

    def _applyDropout(self, inputData):
        # Randomly drop elements to prevent overfitting, return mask for backprop
        mask = cp.random.random(inputData.shape) > self.dropoutRate
        return inputData * mask / (1 - self.dropoutRate), mask

    def _applyDropoutGrad(self, dOutput, mask):
        # Apply dropout gradient using saved mask
        return dOutput * mask / (1 - self.dropoutRate)

    def _collectWeights(self):
        # Collect feedforward weights
        return [
            ('firstFeedForwardWeight', self.firstFeedForwardWeight),
            ('secondFeedForwardWeight', self.secondFeedForwardWeight)
        ]


class TransformerBlock:
    """TransformerBlock class combining attention and feedforward with residual connections and normalization."""

    def __init__(self, modelConfig):
        self.embedDim = modelConfig.embedDim
        self.numHeads = modelConfig.numHeads
        self.ffDim = modelConfig.ffDim
        self.dropoutRate = modelConfig.dropoutRate
        self.attention = MultiHeadAttention(self.embedDim, self.numHeads, self.dropoutRate)
        self.norm1 = LayerNorm(self.embedDim)
        self.feedForward = FeedForward(self.embedDim, self.ffDim, self.dropoutRate)
        self.norm2 = LayerNorm(self.embedDim)
        self.cache = {}  # For backprop

    def forward(self, inputData, mask=None, training=True):
        # Combine attention and feedforward with layer normalization
        attentionOutput, _ = self.attention.forward(inputData, inputData, inputData, mask, training)
        postAttn = inputData + attentionOutput
        normOutput = self.norm1.forward(postAttn)
        ffOutput = self.feedForward.forward(normOutput, training)
        postFF = normOutput + ffOutput
        output = self.norm2.forward(postFF)
        if training:
            self.cache['inputData'] = inputData
            self.cache['attentionOutput'] = attentionOutput
            self.cache['postAttn'] = postAttn
            self.cache['normOutput'] = normOutput
            self.cache['ffOutput'] = ffOutput
            self.cache['postFF'] = postFF
        return output

    def backward(self, dOutput, training=True):
        # Backprop through transformer block
        # Second layer norm
        dPostFF, norm2Grads = self.norm2.backward(dOutput)
        # FF + residual
        dFfOutput = dPostFF
        dNormOutput = dPostFF
        # FF back
        dNormOutputFf, ffGrads = self.feedForward.backward(dFfOutput, training)
        dNormOutput += dNormOutputFf
        # First layer norm
        dPostAttn, norm1Grads = self.norm1.backward(dNormOutput)
        # Attention + residual
        dAttentionOutput = dPostAttn
        dInput = dPostAttn
        # Attention back
        dInputAtt, attGrads = self.attention.backward(dAttentionOutput, training)
        dInput += dInputAtt
        # Prefix the two layer norms' gradients so their parameters stay distinct
        norm1Grads = [('norm1_' + name, grad) for name, grad in norm1Grads]
        norm2Grads = [('norm2_' + name, grad) for name, grad in norm2Grads]
        return dInput, attGrads + norm1Grads + ffGrads + norm2Grads

    def _collectWeights(self):
        # Collect weights from attention, norms, and feedforward (norms prefixed to keep names unique)
        weights = self.attention._collectWeights()
        weights.extend([('norm1_' + name, weight) for name, weight in self.norm1._collectWeights()])
        weights.extend(self.feedForward._collectWeights())
        weights.extend([('norm2_' + name, weight) for name, weight in self.norm2._collectWeights()])
        return weights


class Transformer:
    """Transformer class representing the full model with embeddings, positional encoding, blocks, and output."""

    def __init__(self, tokenizer):
        self.modelConfig = tokenizer.modelConfig
        self.tokenizer = tokenizer
        self.embedding = self._buildEmbedding()
        self.posEncoding = PositionalEncoding(self.modelConfig.maxSeqLen, self.modelConfig.embedDim)
        self.layers = self._buildLayers()
        self.outputLayer = self._buildOutputLayer()
        self.cache = {}         # For backprop
        self.dropoutMasks = {}  # To store dropout masks for backprop

    def _buildEmbedding(self):
        # Initialize embedding matrix
        return cp.random.randn(self.tokenizer.vocabSize, self.modelConfig.embedDim) * WEIGHT_INIT_SCALE

    def _buildLayers(self):
        # Create multiple transformer blocks
        return [TransformerBlock(self.modelConfig) for _ in range(self.modelConfig.numLayers)]

    def _buildOutputLayer(self):
        # Initialize output projection layer
        return cp.random.randn(self.modelConfig.embedDim, self.tokenizer.vocabSize) * WEIGHT_INIT_SCALE

    def forward(self, inputIds, mask=None, training=True):
        # Forward pass: embedding -> pos encoding -> layers -> output
        embedded = self.embedding[inputIds]
        if training:
            embedded, mask1 = self._applyDropout(embedded)
            self.dropoutMasks['embedded'] = mask1
        posOutput = self.posEncoding.forward(embedded)
        output = posOutput
        for layer in self.layers:
            output = layer.forward(output, mask, training)
        logits = cp.dot(output, self.outputLayer)
        if training:
            self.cache['inputIds'] = inputIds
            self.cache['embedded'] = embedded
            self.cache['posOutput'] = posOutput
            self.cache['output'] = output
        return logits

    def backward(self, dLogits, training=True):
        # Backprop through the model
        dOutput = cp.dot(dLogits, self.outputLayer.T)
        output_flat = self.cache['output'].reshape(-1, self.modelConfig.embedDim)
        dLogits_flat = dLogits.reshape(-1, self.tokenizer.vocabSize)
        dOutputLayer = cp.dot(output_flat.T, dLogits_flat)
        # Back through layers
        layerGrads = []
        dLayer = dOutput
        for i in range(self.modelConfig.numLayers - 1, -1, -1):
            layer = self.layers[i]
            dLayer, lGrads = layer.backward(dLayer, training)
            prefixedGrads = [(f'layer{i}_{name}', grad) for name, grad in lGrads]
            layerGrads.extend(prefixedGrads)
        # Pos encoding
        dPos = self.posEncoding.backward(dLayer)
        # Dropout on embedded
        if training:
            dPos = self._applyDropoutGrad(dPos, self.dropoutMasks['embedded'])
        # Embedding grad
        dEmbedding = cp.zeros_like(self.embedding)
        for b in range(dPos.shape[0]):
            for s in range(dPos.shape[1]):
                idx = self.cache['inputIds'][b, s]
                dEmbedding[idx] += dPos[b, s]
        grads = [('embedding', dEmbedding), ('outputLayer', dOutputLayer)] + layerGrads
        return grads

    def _applyDropout(self, inputData):
        # Randomly drop elements to prevent overfitting, return mask for backprop
        mask = cp.random.random(inputData.shape) > self.modelConfig.dropoutRate
        return inputData * mask / (1 - self.modelConfig.dropoutRate), mask

    def _applyDropoutGrad(self, dOutput, mask):
        # Apply dropout gradient using saved mask
        return dOutput * mask / (1 - self.modelConfig.dropoutRate)

    def generate(self, inputIds, maxLength=MAX_GENERATION_LENGTH, temperature=TEMPERATURE):
        # Generate text by sampling tokens with temperature
        for _ in range(maxLength - inputIds.shape[1]):
            logits = self.forward(inputIds, _createCausalMask(inputIds.shape[1]), training=False)
            scaledLogits = logits[:, -1, :] / temperature
            probs = softmax(scaledLogits, axis=-1)
            nextToken = cp.random.choice(cp.arange(self.tokenizer.vocabSize), size=1, p=probs[0])[0]
            inputIds = cp.concatenate([inputIds, cp.array([[nextToken]])], axis=1)
        return inputIds

    def predict(self, prompt, maxLength=MAX_GENERATION_LENGTH, temperature=TEMPERATURE):
        # Generate text from a user prompt
        if not prompt or not all(w in self.tokenizer.wordToIdx for w in prompt.split()):
            prompt = "hello"  # Default prompt if empty or invalid
        inputIds = cp.array([self.tokenizer.tokenize(prompt)])
        generatedIds = self.generate(inputIds, maxLength, temperature)
        generatedText = self.tokenizer.detokenize(generatedIds[0])
        return generatedText

    def _collectWeights(self):
        # Collect all model weights
        weights = [
            ('embedding', self.embedding),
            ('outputLayer', self.outputLayer)
        ]
        for i, layer in enumerate(self.layers):
            for name, weight in layer._collectWeights():
                weights.append((f'layer{i}_{name}', weight))
        return weights

    def _setWeights(self, weights):
        # Set model weights from a dictionary
        self.embedding = weights['embedding']
        self.outputLayer = weights['outputLayer']
        for i, layer in enumerate(self.layers):
            layer.attention.queryWeight = weights[f'layer{i}_queryWeight']
            layer.attention.keyWeight = weights[f'layer{i}_keyWeight']
            layer.attention.valueWeight = weights[f'layer{i}_valueWeight']
            layer.attention.outputWeight = weights[f'layer{i}_outputWeight']
            layer.feedForward.firstFeedForwardWeight = weights[f'layer{i}_firstFeedForwardWeight']
            layer.feedForward.secondFeedForwardWeight = weights[f'layer{i}_secondFeedForwardWeight']
            layer.norm1.gamma = weights[f'layer{i}_norm1_gamma']
            layer.norm1.beta = weights[f'layer{i}_norm1_beta']
            layer.norm2.gamma = weights[f'layer{i}_norm2_gamma']
            layer.norm2.beta = weights[f'layer{i}_norm2_beta']

    def saveModel(self, filePath):
        # Save model weights to a file
        weights = {name: weight for name, weight in self._collectWeights()}
        cp.savez(filePath, **weights)

    def loadModel(self, filePath):
        # Load model weights from a file
        weights = cp.load(filePath)
        self._setWeights(weights)


def _createCausalMask(seqLen):
    # Create mask to prevent attending to future tokens
    mask = cp.triu(cp.ones((seqLen, seqLen)) * float('-inf'), k=1)
    return mask[None, None, :, :]


def _computeLoss(logits, targetData):
    # Compute cross-entropy loss
    batchSize, seqLen, vocabSize = logits.shape
    probs = softmax(logits, axis=-1)
    logProbs = cp.log(probs + EPSILON)  # Add small value to avoid log(0)
    targetOneHot = cp.zeros_like(logProbs)
    targetOneHot[cp.arange(batchSize)[:, None], cp.arange(seqLen)[None, :], targetData] = 1
    loss = -cp.sum(logProbs * targetOneHot, axis=-1)
    return loss


def _computeValidationLoss(model, valInput, valTarget, batchSize):
    # Compute average loss on validation data
    numValSamples = valInput.shape[0]
    valLoss = 0.0
    for i in range(0, numValSamples, batchSize):
        batchValInput = valInput[i:i+batchSize]
        batchValTarget = valTarget[i:i+batchSize]
        logits = model.forward(batchValInput, _createCausalMask(batchValInput.shape[1]), training=False)
        loss = _computeLoss(logits, batchValTarget)
        valLoss += loss.mean().item() * batchValInput.shape[0]
    return valLoss / numValSamples if numValSamples > 0 else 0.0


def _computeGradients(logits, targetData, inputData, model):
    # Compute gradients for all weights using backpropagation
    batchSize, seqLen, vocabSize = logits.shape
    probs = softmax(logits, axis=-1)
    targetOneHot = cp.zeros_like(probs)
    targetOneHot[cp.arange(batchSize)[:, None], cp.arange(seqLen)[None, :], targetData] = 1
    dLogits = (probs - targetOneHot) / batchSize
    # Backprop through the model
    grads = model.backward(dLogits)
    return grads


def _updateWeights(model, grads, learningRate):
    # Update all weights using gradient descent
    weights = {name: weight for name, weight in model._collectWeights()}
    for name, grad in grads:
        grad = cp.clip(grad, -1.0, 1.0)  # Clip gradients to prevent explosion
        weights[name] -= learningRate * grad
    model._setWeights(weights)


def _getDataSets(text, tokenizer, seqLen):
    # Prepare training and validation data using tokenizer and text
    tokenizedText = tokenizer.tokenize(text)
    inputData = cp.array([tokenizedText[i:i+seqLen] for i in range(0, len(tokenizedText)-seqLen, 1)])
    targetData = cp.array([tokenizedText[i+1:i+seqLen+1] for i in range(0, len(tokenizedText)-seqLen, 1)])
    numSamples = inputData.shape[0]
    splitIndex = int(numSamples * (1 - VALIDATION_SPLIT))
    trainInput = inputData[:splitIndex]
    trainTarget = targetData[:splitIndex]
    valInput = inputData[splitIndex:]
    valTarget = targetData[splitIndex:]
    return trainInput, trainTarget, valInput, valTarget


def trainModel(model, text, tokenizer, trainingConfig):
    # Train model with mini-batches, compute validation loss, and apply early stopping
    trainInput, trainTarget, valInput, valTarget = _getDataSets(text, tokenizer, tokenizer.seqLen)
    numTrainSamples = trainInput.shape[0]
    totalBatches = math.ceil(numTrainSamples / trainingConfig.batchSize)
    bestValLoss = float('inf')
    epochsWithoutImprovement = 0
    with ProgressHandler(trainingConfig.numEpochs, totalBatches) as progress:
        for epoch in range(trainingConfig.numEpochs):
            epochLoss = 0.0
            progress.startBatch()
            for i in range(0, numTrainSamples, trainingConfig.batchSize):
                batchInput = trainInput[i:i+trainingConfig.batchSize]
                batchTarget = trainTarget[i:i+trainingConfig.batchSize]
                logits = model.forward(batchInput, _createCausalMask(batchInput.shape[1]))
                loss = _computeLoss(logits, batchTarget)
                if cp.any(cp.isnan(loss)):
                    progress.updateEpoch(epoch + 1, float('nan'), float('nan'))
                    return  # Stop training if NaN loss detected
                grads = _computeGradients(logits, batchTarget, batchInput, model)
                _updateWeights(model, grads, trainingConfig.learningRate)
                epochLoss += loss.mean().item() * batchInput.shape[0]
                progress.updateBatch(i // trainingConfig.batchSize + 1)
            avgTrainLoss = epochLoss / numTrainSamples if numTrainSamples > 0 else 0.0
            avgValLoss = _computeValidationLoss(model, valInput, valTarget, trainingConfig.batchSize)
            progress.endBatch()
            progress.updateEpoch(epoch + 1, avgTrainLoss, avgValLoss)
            # Early stopping
            if avgValLoss < bestValLoss:
                bestValLoss = avgValLoss
                epochsWithoutImprovement = 0
            else:
                epochsWithoutImprovement += 1
                if epochsWithoutImprovement >= PATIENCE:
                    if progress.progress is not None:
                        progress.progress.update(
                            progress.epochTask,
                            description=f"Early stopping at Epoch {epoch + 1}/{trainingConfig.numEpochs}, "
                                        f"Train Loss: {avgTrainLoss:.6f}, Val Loss: {avgValLoss:.6f}"
                        )
                    break


def loadTrainingText(filePath):
    # Load text from file or use default
    if os.path.exists(filePath):
        with open(filePath, 'r', encoding='utf-8') as f:
            return f.read().strip()
    return DEFAULT_TEXT


def _main():
    # Setup model and data
    modelPath = "transformers.npz"
    tokenPath = "transformers.tkn"
    # Load or create tokenizer and model
    if os.path.exists(modelPath) and os.path.exists(tokenPath):
        print("Loading saved tokenizer and model...")
        tokenizer = Tokenizer("").load(tokenPath)
        model = Transformer(tokenizer)
        model.loadModel(modelPath)
    else:
        print("Creating new tokenizer and training model...")
        text = loadTrainingText("input.txt")
        tokenizer = Tokenizer(text)
        tokenizer.save(tokenPath)
        model = Transformer(tokenizer)
        trainingConfig = TrainingConfig()
        trainModel(model, text, tokenizer, trainingConfig)
        model.saveModel(modelPath)
    # Get user prompt
    prompt = input("Enter a prompt (or press Enter for 'hello'): ").strip()
    generatedText = model.predict(prompt)
    print(f"Generated text: {generatedText}")


if __name__ == "__main__":
    _main()
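
For readers who want to sanity-check the pieces before committing to the full training run, here is a minimal smoke test. It is a sketch, not part of the original program: it assumes the listing above has been saved as grok_transformer.py (the module name is ours), builds the tokenizer and model from the bundled DEFAULT_TEXT, runs a single causally masked forward pass in inference mode, and checks the logits and loss shapes. The exact model dimensions depend on the entropy-derived configuration, so the assertions only check shapes, not values.

# Hypothetical smoke test (not part of the original listing).
# Assumes the code above is saved as grok_transformer.py in the same directory.
from grok_transformer import (
    cp, Tokenizer, Transformer, DEFAULT_TEXT, _createCausalMask, _computeLoss
)

tokenizer = Tokenizer(DEFAULT_TEXT)
model = Transformer(tokenizer)

seqLen = tokenizer.seqLen
tokens = tokenizer.tokenize(DEFAULT_TEXT)
inputIds = cp.array([tokens[:seqLen]])        # Shape: (1, seqLen)
targetIds = cp.array([tokens[1:seqLen + 1]])  # Next-token targets

logits = model.forward(inputIds, _createCausalMask(seqLen), training=False)
loss = _computeLoss(logits, targetIds)

assert logits.shape == (1, seqLen, tokenizer.vocabSize)
assert loss.shape == (1, seqLen)
print("mean untrained loss:", float(loss.mean()))  # Roughly ln(vocabSize) before training

An untrained model should produce a mean loss near the log of the vocabulary size; once trainModel has run, the same check gives a quick before-and-after comparison.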