Transformer-code

Introduction

Today we read through the code accompanying "Attention is All You Need", i.e. the Transformer (PyTorch code repository).

Directory Structure

The transformer folder contains the following files:

  • Constants.py: model constant definitions
  • Layers.py: encoder and decoder layer definitions
  • Models.py: model definitions
  • Modules.py: utility modules
  • Optim.py: optimization module
  • SubLayers.py: sub-layers such as multi-head attention
  • Translator.py: beam-search decoding for translation

The top-level directory contains the following files:

  • train.py: training entry point; instantiates the model, optimizer, etc., and runs the optimization loop
  • learn_bpe.py: learns the BPE vocabulary
  • apply_bpe.py: encodes text using the learned BPE vocabulary
  • translate.py: loads a trained model and performs translation

Below we go through the model-related files one by one; the remaining files are left for a later post.

Model Walkthrough

The files are covered below in dependency order.

Constants.py

The file is very simple: it defines the four special tokens for padding, unknown, beginning of sequence, and end of sequence.

PAD_WORD = '<blank>'  # padding token
UNK_WORD = '<unk>'    # unknown token
BOS_WORD = '<s>'      # beginning-of-sequence token
EOS_WORD = '</s>'     # end-of-sequence token

Modules.py

Defines the scaled dot-product attention class.

import torch
import torch.nn as nn
import torch.nn.functional as F


class ScaledDotProductAttention(nn.Module):
    ''' Scaled Dot-Product Attention '''

    def __init__(self, temperature, attn_dropout=0.1):
        super().__init__()
        self.temperature = temperature           # softmax temperature, \sqrt{d_k} in the paper
        self.dropout = nn.Dropout(attn_dropout)  # the paper also uses a dropout rate of 0.1

    def forward(self, q, k, v, mask=None):
        '''
        q, k: bsz x n_head x lq x d_k
        v:    bsz x n_head x lq x d_v
        '''
        attn = torch.matmul(q / self.temperature, k.transpose(2, 3))

        if mask is not None:
            attn = attn.masked_fill(mask == 0, -1e9)  # mask padded/future positions to avoid information leakage

        attn = self.dropout(F.softmax(attn, dim=-1))  # dropout on the attention weights
        output = torch.matmul(attn, v)

        return output, attn
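
To get a feel for the shapes, here is a quick sketch of my own (not part of the repository): random query/key/value tensors with the layout given in the docstring are pushed through the module; the output keeps the value dimension while the attention matrix is lq x lq.

import torch
from transformer.Modules import ScaledDotProductAttention  # assuming the package layout listed above

bsz, n_head, lq, d_k, d_v = 2, 8, 10, 64, 64
q = torch.randn(bsz, n_head, lq, d_k)
k = torch.randn(bsz, n_head, lq, d_k)
v = torch.randn(bsz, n_head, lq, d_v)

attention = ScaledDotProductAttention(temperature=d_k ** 0.5)
output, attn = attention(q, k, v)   # no mask: every position attends to every position
print(output.shape)                 # torch.Size([2, 8, 10, 64])
print(attn.shape)                   # torch.Size([2, 8, 10, 10])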

SubLayers.py

Definition of the multi-head attention module:


import torch
import torch.nn as nn
import torch.nn.functional as F

from transformer.Modules import ScaledDotProductAttention


class MultiHeadAttention(nn.Module):
    ''' Multi-Head Attention module

    n_head:  number of attention heads
    d_model: dimension of the word embeddings
    d_k:     dimension of the query/key vectors
    d_v:     dimension of the value vectors; d_v * n_head is the dimension of the attention output
    '''

    def __init__(self, n_head, d_model, d_k, d_v, dropout=0.1):
        super().__init__()

        self.n_head = n_head
        self.d_k = d_k
        self.d_v = d_v
        # query / key / value projection matrices
        self.w_qs = nn.Linear(d_model, n_head * d_k, bias=False)
        self.w_ks = nn.Linear(d_model, n_head * d_k, bias=False)
        self.w_vs = nn.Linear(d_model, n_head * d_v, bias=False)
        self.fc = nn.Linear(n_head * d_v, d_model, bias=False)  # output projection

        self.attention = ScaledDotProductAttention(temperature=d_k ** 0.5)  # attention computation

        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(d_model, eps=1e-6)  # layer normalization

    def forward(self, q, k, v, mask=None):

        d_k, d_v, n_head = self.d_k, self.d_v, self.n_head
        sz_b, len_q, len_k, len_v = q.size(0), q.size(1), k.size(1), v.size(1)

        residual = q

        # Pass through the pre-attention projection: b x lq x (n*dv)
        # Separate different heads: b x lq x n x dv
        q = self.w_qs(q).view(sz_b, len_q, n_head, d_k)
        k = self.w_ks(k).view(sz_b, len_k, n_head, d_k)
        v = self.w_vs(v).view(sz_b, len_v, n_head, d_v)

        # Transpose for attention dot product: b x n x lq x dv
        q, k, v = q.transpose(1, 2), k.transpose(1, 2), v.transpose(1, 2)

        if mask is not None:
            mask = mask.unsqueeze(1)  # For head axis broadcasting.
        # q is now the attention output
        q, attn = self.attention(q, k, v, mask=mask)

        # Transpose to move the head dimension back: b x lq x n x dv
        # Combine the last two dimensions to concatenate all the heads together: b x lq x (n*dv)
        q = q.transpose(1, 2).contiguous().view(sz_b, len_q, -1)
        q = self.dropout(self.fc(q))
        q += residual

        q = self.layer_norm(q)

        return q, attn
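
A minimal self-attention shape check (my own sketch, not from the repo): a batch of token embeddings is used as query, key, and value at once, with an all-ones padding mask of shape b x 1 x lk that broadcasts over the query positions.

import torch
from transformer.SubLayers import MultiHeadAttention  # assuming the package layout listed above

bsz, seq_len, d_model, n_head, d_k, d_v = 2, 10, 512, 8, 64, 64
x = torch.randn(bsz, seq_len, d_model)
mask = torch.ones(bsz, 1, seq_len, dtype=torch.bool)   # True = keep, False = mask out

mha = MultiHeadAttention(n_head, d_model, d_k, d_v)
out, attn = mha(x, x, x, mask=mask)   # self-attention: q = k = v = x
print(out.shape)                      # torch.Size([2, 10, 512])
print(attn.shape)                     # torch.Size([2, 8, 10, 10])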

Definition of the position-wise feed-forward sub-layer (which also applies the residual connection and layer normalization):

class PositionwiseFeedForward(nn.Module):
    ''' A two-feed-forward-layer module '''

    def __init__(self, d_in, d_hid, dropout=0.1):
        super().__init__()
        self.w_1 = nn.Linear(d_in, d_hid)  # position-wise
        self.w_2 = nn.Linear(d_hid, d_in)  # position-wise
        self.layer_norm = nn.LayerNorm(d_in, eps=1e-6)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):

        residual = x

        x = self.w_2(F.relu(self.w_1(x)))
        x = self.dropout(x)
        x += residual  # residual connection

        x = self.layer_norm(x)

        return x
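
The feed-forward sub-layer is applied to every position independently and preserves the model dimension so that the residual connection works. A quick sanity check of my own:

import torch
from transformer.SubLayers import PositionwiseFeedForward  # assuming the package layout listed above

ffn = PositionwiseFeedForward(d_in=512, d_hid=2048)
x = torch.randn(2, 10, 512)   # b x lq x d_model
print(ffn(x).shape)           # torch.Size([2, 10, 512]) -- same shape in and out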

Layers.py

The encoder layer class (very simple: it just chains multi-head attention and the feed-forward network):

''' Define the Layers '''
import torch.nn as nn
import torch
from transformer.SubLayers import MultiHeadAttention, PositionwiseFeedForward


class EncoderLayer(nn.Module):
    ''' Compose with two layers '''

    def __init__(self, d_model, d_inner, n_head, d_k, d_v, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.slf_attn = MultiHeadAttention(n_head, d_model, d_k, d_v, dropout=dropout)
        self.pos_ffn = PositionwiseFeedForward(d_model, d_inner, dropout=dropout)

    def forward(self, enc_input, slf_attn_mask=None):
        enc_output, enc_slf_attn = self.slf_attn(
            enc_input, enc_input, enc_input, mask=slf_attn_mask)
        enc_output = self.pos_ffn(enc_output)
        return enc_output, enc_slf_attn


The decoder layer class:

class DecoderLayer(nn.Module):
    ''' Compose with three layers '''

    def __init__(self, d_model, d_inner, n_head, d_k, d_v, dropout=0.1):
        super(DecoderLayer, self).__init__()
        self.slf_attn = MultiHeadAttention(n_head, d_model, d_k, d_v, dropout=dropout)
        self.enc_attn = MultiHeadAttention(n_head, d_model, d_k, d_v, dropout=dropout)  # decoder attention over the encoder output
        self.pos_ffn = PositionwiseFeedForward(d_model, d_inner, dropout=dropout)

    def forward(
            self, dec_input, enc_output,
            slf_attn_mask=None, dec_enc_attn_mask=None):
        dec_output, dec_slf_attn = self.slf_attn(
            dec_input, dec_input, dec_input, mask=slf_attn_mask)  # decoder self-attention
        dec_output, dec_enc_attn = self.enc_attn(
            dec_output, enc_output, enc_output, mask=dec_enc_attn_mask)  # decoder attention over the encoder output
        dec_output = self.pos_ffn(dec_output)  # feed-forward network
        return dec_output, dec_slf_attn, dec_enc_attn
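
Note that in the second sub-layer the query comes from the decoder while key and value come from the encoder output, so target and source lengths may differ. A sketch of my own (all sizes are illustrative):

import torch
from transformer.Layers import DecoderLayer  # assuming the package layout listed above

dec_layer = DecoderLayer(d_model=512, d_inner=2048, n_head=8, d_k=64, d_v=64)

bsz, len_trg, len_src = 2, 7, 10
dec_input = torch.randn(bsz, len_trg, 512)
enc_output = torch.randn(bsz, len_src, 512)

# causal mask for the self-attention: position i may only attend to positions <= i
slf_mask = torch.tril(torch.ones(1, len_trg, len_trg)).bool()

dec_out, slf_attn, enc_attn = dec_layer(dec_input, enc_output, slf_attn_mask=slf_mask)
print(dec_out.shape)    # torch.Size([2, 7, 512])
print(enc_attn.shape)   # torch.Size([2, 8, 7, 10]) -- target positions attending over source positions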

Models.py

Models.py is the core file where the model is assembled.

Utility functions:

import torch
import torch.nn as nn
import numpy as np

from transformer.Layers import EncoderLayer, DecoderLayer


def get_pad_mask(seq, pad_idx):  # padding mask for the sequence (padding positions are False/0)
    return (seq != pad_idx).unsqueeze(-2)


def get_subsequent_mask(seq):  # causal mask: position i may only attend to positions <= i
    ''' For masking out the subsequent info. '''
    sz_b, len_s = seq.size()
    subsequent_mask = (1 - torch.triu(
        torch.ones((1, len_s, len_s), device=seq.device), diagonal=1)).bool()
    return subsequent_mask
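
A tiny worked example of what the two helpers return for one padded sequence (my own illustration):

import torch
from transformer.Models import get_pad_mask, get_subsequent_mask  # assuming the helpers above live in Models.py

seq = torch.tensor([[5, 7, 9, 0]])        # one sequence of length 4, padded with pad_idx = 0
pad_mask = get_pad_mask(seq, pad_idx=0)   # shape 1 x 1 x 4
# tensor([[[ True,  True,  True, False]]])

sub_mask = get_subsequent_mask(seq)       # shape 1 x 4 x 4, lower triangular
# tensor([[[ True, False, False, False],
#          [ True,  True, False, False],
#          [ True,  True,  True, False],
#          [ True,  True,  True,  True]]])

trg_mask = pad_mask & sub_mask            # combined mask used for the decoder self-attention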

Positional encoding:

class PositionalEncoding(nn.Module):

    def __init__(self, d_hid, n_position=200):
        super(PositionalEncoding, self).__init__()

        # Not a parameter
        # An ordinary member tensor would neither be saved with the model nor be moved
        # to the GPU via .cuda(); registering it with register_buffer enables both.
        self.register_buffer('pos_table', self._get_sinusoid_encoding_table(n_position, d_hid))

    def _get_sinusoid_encoding_table(self, n_position, d_hid):
        ''' Sinusoid position encoding table '''
        # TODO: make it with torch instead of numpy

        def get_position_angle_vec(position):
            return [position / np.power(10000, 2 * (hid_j // 2) / d_hid) for hid_j in range(d_hid)]

        sinusoid_table = np.array([get_position_angle_vec(pos_i) for pos_i in range(n_position)])
        sinusoid_table[:, 0::2] = np.sin(sinusoid_table[:, 0::2])  # dim 2i
        sinusoid_table[:, 1::2] = np.cos(sinusoid_table[:, 1::2])  # dim 2i+1

        return torch.FloatTensor(sinusoid_table).unsqueeze(0)

    def forward(self, x):
        return x + self.pos_table[:, :x.size(1)].clone().detach()
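
The table is the sinusoidal encoding from the paper: PE(pos, 2i) = sin(pos / 10000^(2i/d_model)) and PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model)). A small sanity check of my own (zero embeddings expose the raw table):

import torch
from transformer.Models import PositionalEncoding  # assuming the class above lives in Models.py

pos_enc = PositionalEncoding(d_hid=512, n_position=200)
x = torch.zeros(1, 10, 512)   # b x lq x d_model, all zeros
out = pos_enc(x)
print(out.shape)              # torch.Size([1, 10, 512])
print(out[0, 3, 0])           # PE(pos=3, dim=0) = sin(3 / 10000^0) = sin(3) ~= 0.1411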

The Transformer encoder:

class Encoder(nn.Module):
    ''' A encoder model with self attention mechanism. '''

    def __init__(
            self, n_src_vocab, d_word_vec, n_layers, n_head, d_k, d_v,
            d_model, d_inner, pad_idx, dropout=0.1, n_position=200, scale_emb=False):

        super().__init__()

        self.src_word_emb = nn.Embedding(n_src_vocab, d_word_vec, padding_idx=pad_idx)  # word embedding table
        self.position_enc = PositionalEncoding(d_word_vec, n_position=n_position)
        self.dropout = nn.Dropout(p=dropout)
        self.layer_stack = nn.ModuleList([
            EncoderLayer(d_model, d_inner, n_head, d_k, d_v, dropout=dropout)  # stack of encoder layers
            for _ in range(n_layers)])
        self.layer_norm = nn.LayerNorm(d_model, eps=1e-6)
        self.scale_emb = scale_emb
        self.d_model = d_model

    def forward(self, src_seq, src_mask, return_attns=False):

        enc_slf_attn_list = []

        # -- Forward
        enc_output = self.src_word_emb(src_seq)
        if self.scale_emb:
            enc_output *= self.d_model ** 0.5
        enc_output = self.dropout(self.position_enc(enc_output))
        enc_output = self.layer_norm(enc_output)

        for enc_layer in self.layer_stack:  # run through the encoder stack
            enc_output, enc_slf_attn = enc_layer(enc_output, slf_attn_mask=src_mask)
            enc_slf_attn_list += [enc_slf_attn] if return_attns else []

        if return_attns:
            return enc_output, enc_slf_attn_list
        return enc_output,

The Transformer decoder:

class Decoder(nn.Module):
    ''' A decoder model with self attention mechanism. '''
    # Structure is analogous to the Encoder above.

    def __init__(
            self, n_trg_vocab, d_word_vec, n_layers, n_head, d_k, d_v,
            d_model, d_inner, pad_idx, n_position=200, dropout=0.1, scale_emb=False):

        super().__init__()

        self.trg_word_emb = nn.Embedding(n_trg_vocab, d_word_vec, padding_idx=pad_idx)
        self.position_enc = PositionalEncoding(d_word_vec, n_position=n_position)
        self.dropout = nn.Dropout(p=dropout)
        self.layer_stack = nn.ModuleList([
            DecoderLayer(d_model, d_inner, n_head, d_k, d_v, dropout=dropout)
            for _ in range(n_layers)])
        self.layer_norm = nn.LayerNorm(d_model, eps=1e-6)
        self.scale_emb = scale_emb
        self.d_model = d_model

    def forward(self, trg_seq, trg_mask, enc_output, src_mask, return_attns=False):

        dec_slf_attn_list, dec_enc_attn_list = [], []

        # -- Forward
        dec_output = self.trg_word_emb(trg_seq)
        if self.scale_emb:
            dec_output *= self.d_model ** 0.5  # trick from the Transformer paper: scale up the word embeddings
        dec_output = self.dropout(self.position_enc(dec_output))
        dec_output = self.layer_norm(dec_output)

        for dec_layer in self.layer_stack:
            dec_output, dec_slf_attn, dec_enc_attn = dec_layer(
                dec_output, enc_output, slf_attn_mask=trg_mask, dec_enc_attn_mask=src_mask)
            dec_slf_attn_list += [dec_slf_attn] if return_attns else []
            dec_enc_attn_list += [dec_enc_attn] if return_attns else []

        if return_attns:
            return dec_output, dec_slf_attn_list, dec_enc_attn_list
        return dec_output,

Transformer:

class Transformer(nn.Module):
    ''' A sequence to sequence model with attention mechanism. '''

    def __init__(
            self, n_src_vocab, n_trg_vocab, src_pad_idx, trg_pad_idx,
            d_word_vec=512, d_model=512, d_inner=2048,
            n_layers=6, n_head=8, d_k=64, d_v=64, dropout=0.1, n_position=200,
            trg_emb_prj_weight_sharing=True, emb_src_trg_weight_sharing=True,
            scale_emb_or_prj='prj'):

        super().__init__()

        self.src_pad_idx, self.trg_pad_idx = src_pad_idx, trg_pad_idx

        # In section 3.4 of paper "Attention Is All You Need", there is such detail:
        # "In our model, we share the same weight matrix between the two
        # embedding layers and the pre-softmax linear transformation...
        # In the embedding layers, we multiply those weights by \sqrt{d_model}".
        #
        # Options here:
        #   'emb': multiply \sqrt{d_model} to embedding output
        #   'prj': multiply (\sqrt{d_model} ^ -1) to linear projection output
        #   'none': no multiplication

        assert scale_emb_or_prj in ['emb', 'prj', 'none']
        scale_emb = (scale_emb_or_prj == 'emb') if trg_emb_prj_weight_sharing else False
        self.scale_prj = (scale_emb_or_prj == 'prj') if trg_emb_prj_weight_sharing else False
        self.d_model = d_model

        self.encoder = Encoder(
            n_src_vocab=n_src_vocab, n_position=n_position,
            d_word_vec=d_word_vec, d_model=d_model, d_inner=d_inner,
            n_layers=n_layers, n_head=n_head, d_k=d_k, d_v=d_v,
            pad_idx=src_pad_idx, dropout=dropout, scale_emb=scale_emb)

        self.decoder = Decoder(
            n_trg_vocab=n_trg_vocab, n_position=n_position,
            d_word_vec=d_word_vec, d_model=d_model, d_inner=d_inner,
            n_layers=n_layers, n_head=n_head, d_k=d_k, d_v=d_v,
            pad_idx=trg_pad_idx, dropout=dropout, scale_emb=scale_emb)

        self.trg_word_prj = nn.Linear(d_model, n_trg_vocab, bias=False)  # projection onto the target vocabulary

        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)  # Xavier weight initialization

        assert d_model == d_word_vec, \
            'To facilitate the residual connections, \
             the dimensions of all module outputs shall be the same.'

        # weight sharing between the target embedding and the output projection
        if trg_emb_prj_weight_sharing:
            # Share the weight between target word embedding & last dense layer
            self.trg_word_prj.weight = self.decoder.trg_word_emb.weight

        if emb_src_trg_weight_sharing:
            self.encoder.src_word_emb.weight = self.decoder.trg_word_emb.weight

    def forward(self, src_seq, trg_seq):

        src_mask = get_pad_mask(src_seq, self.src_pad_idx)
        trg_mask = get_pad_mask(trg_seq, self.trg_pad_idx) & get_subsequent_mask(trg_seq)

        enc_output, *_ = self.encoder(src_seq, src_mask)
        dec_output, *_ = self.decoder(trg_seq, trg_mask, enc_output, src_mask)
        seq_logit = self.trg_word_prj(dec_output)
        if self.scale_prj:
            seq_logit *= self.d_model ** -0.5

        return seq_logit.view(-1, seq_logit.size(2))
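
Putting it all together, an end-to-end sketch with dummy data (my own; the vocabulary size, pad index, and sequence lengths are made up, and only 2 layers are used to keep it small):

import torch
from transformer.Models import Transformer  # assuming the package layout listed above

model = Transformer(
    n_src_vocab=1000, n_trg_vocab=1000,
    src_pad_idx=0, trg_pad_idx=0,
    d_word_vec=512, d_model=512, d_inner=2048,
    n_layers=2, n_head=8, d_k=64, d_v=64)

src_seq = torch.randint(1, 1000, (2, 10))   # b x src_len
trg_seq = torch.randint(1, 1000, (2, 7))    # b x trg_len

logits = model(src_seq, trg_seq)
print(logits.shape)   # torch.Size([14, 1000]) -- (b * trg_len) x n_trg_vocab, ready for cross-entropy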

Optim.py

A simple wrapper class around the optimizer, used to dynamically adjust the learning rate.

'''A wrapper class for scheduled optimizer '''
import numpy as np


class ScheduledOptim():
    '''A simple wrapper class for learning rate scheduling'''

    def __init__(self, optimizer, lr_mul, d_model, n_warmup_steps):
        self._optimizer = optimizer
        self.lr_mul = lr_mul
        self.d_model = d_model
        self.n_warmup_steps = n_warmup_steps
        self.n_steps = 0

    def step_and_update_lr(self):
        "Step with the inner optimizer"
        self._update_learning_rate()
        self._optimizer.step()

    def zero_grad(self):
        "Zero out the gradients with the inner optimizer"
        self._optimizer.zero_grad()

    def _get_lr_scale(self):
        d_model = self.d_model
        n_steps, n_warmup_steps = self.n_steps, self.n_warmup_steps
        return (d_model ** -0.5) * min(n_steps ** (-0.5), n_steps * n_warmup_steps ** (-1.5))

    def _update_learning_rate(self):
        ''' Learning rate scheduling per step '''

        self.n_steps += 1
        lr = self.lr_mul * self._get_lr_scale()

        for param_group in self._optimizer.param_groups:
            param_group['lr'] = lr
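
The schedule follows the paper: lr = lr_mul * d_model^(-0.5) * min(step^(-0.5), step * n_warmup_steps^(-1.5)), i.e. a linear warmup followed by inverse-square-root decay. A hedged sketch of how it might be wired up (the Adam hyper-parameters are the ones from the paper; the toy model and the other values are mine):

import torch
import torch.nn as nn
import torch.optim as optim

from transformer.Optim import ScheduledOptim  # assuming the package layout listed above

model = nn.Linear(512, 512)   # stand-in for the Transformer above
optimizer = ScheduledOptim(
    optim.Adam(model.parameters(), betas=(0.9, 0.98), eps=1e-09),
    lr_mul=2.0, d_model=512, n_warmup_steps=4000)

for step in range(3):   # toy training loop
    optimizer.zero_grad()
    loss = model(torch.randn(4, 512)).pow(2).mean()
    loss.backward()
    optimizer.step_and_update_lr()   # adjust the learning rate, then step the inner optimizer
    print(optimizer._optimizer.param_groups[0]['lr'])   # grows linearly during warmup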

Summary

Reading through the code surfaced several details that are easy to gloss over in the paper, such as the embedding scaling and the various dropout layers. Reading the code really is worthwhile.