双塔召回模型
双塔模型在推荐领域中是一个十分经典的模型,无论是在召回还是粗排阶段,都会是首选。这主要是得益于双塔模型结构,使得能够在线预估时满足低延时的要求。但也是因为其模型结构的问题,使得无法考虑到 user 和 item 特之间的特征交叉,使得影响模型最终效果,因此很多工作尝试调整经典双塔模型结构,在保持在线预估低延时的同时,保证双塔两侧之间有效的信息交叉。下面针对于经典双塔模型以及一些改进版本进行介绍。
经典双塔模型
DSSM (Deep Structured Semantic Model) 是由微软研究院于 CIKM 在 2013 年提出的一篇工作,该模型主要用来解决 NLP 领域语义相似度任务 ,利用深度神经网络将文本表示为低维度的向量,用来提升搜索场景下文档和 query 匹配的问题。DSSM 模型的原理主要是:通过用户搜索行为中 query 和 doc 的日志数据,通过深度学习网络将 query 和 doc 映射到到共同维度的语义空间中,通过最大化 query 和 doc 语义向量之 间的余弦相似度,从而训练得到隐含语义模型,即 query 侧特征的 embedding 和 doc 侧特征的 embedding,进而可以获取语句的低维 语义向量表达 sentence embedding,可以预测两句话的语义相似度。模型结构如下所示:

从上图可以看出,该网络结构比较简单,是一个由几层 DNN 组成网络,我们将要搜索文本 (Query) 和要匹配的文本 (Document) 的 embedding 输入到网络,网络输出为 128 维的向量,然后通过向量之间计算余弦相似度来计算向量之间距离,可以看作每一个 query 和 document 之间相似分数,然后在做 softmax。
而在推荐系统中,最为关键的问题是如何做好用户与 item 的匹配问题,因此对于推荐系统中 DSSM 模型的则是为 user 和 item 分别构建独立的子网络塔式结构,利用 user 和 item 的曝光或点击日期进行训练,最终得到 user 侧的 embedding 和 item 侧的 embedding。因此在推荐系统中,常见的模型结构如下所示:

从模型结构上来看,主要包括两个部分:user 侧塔和 item 侧塔,对于每个塔分别是一个 DNN 结构。通过两侧的特征输入,通过 DNN 模块到 user 和 item 的 embedding,然后计算两者之间的相似度 (常用內积或者余弦值,下面会说这两种方式的联系和区别),因此对于 user 和 item 两侧最终得到的 embedding 维度需要保持一致,即最后一层全连接层隐藏单元个数相同。
在召回模型中,将这种检索行为视为多类分类问题,类似于 YouTubeDNN 模型。将物料库中所有的 item 视为一个类别,因此损失函数需要计算每个类的概率值:

其中
以上就是推荐系统中经典的双塔模型,之所以在实际应用中非常常见,是因为在海量的候选数据进行召回的场景下,速度很快,效果说不上极端好,但一般而言效果也够用了。之所以双塔模型在服务时速度很快,是因为模型结构简单 (两侧没有特征交叉),但这也带来了问题,双塔的结构无法考虑两侧特征之间的交互信息,在一定程度上牺牲掉模型的部分精准性。例如在精排模型中,来自 user 侧和 item 侧的特征会在第一层 NLP 层就可以做细粒度的特征交互,而对于双塔模型,user 侧和 item 侧的特征只会在最后的內积计算时发生,这就导致很多有用的信息在经过 DNN 结构时就已经被其他特征所模糊了,因此双塔结构由于其结构问题先天就会存在这样的问题。下面针对这个问题来看看一下现有模型的解决思路。
SENet 双塔模型
SENet 由 Momenta 在 2017 年提出,当时是一种应用于图像处理的新型网络结构。后来张俊林大佬将 SENet 引入了精排模型 FiBiNET 中,其作用是为了将大量长尾的低频特征抛弃,弱化不靠谱低频特征 embedding 的负面影响,强化高频特征的重要作用。那 SENet 结构到底是怎么样的呢,为什么可以起到特征筛选的作用?
从上图可以看出 SENET 主要分为三个步骤 Squeeze, Excitation, Re-weight:
Squeeze 阶段:我们对每个特征的 Embedding 向量进行数据压缩与信息汇总,即在 Embedding 维度计算均值:
$$z_i = F_{sq}(e_i) = \frac{1}{k} \sum_{t=1}^k e_i^{(t)}$$
其中 k 表示 Embedding 的维度,Squeeze 阶段是将每个特征的 Squeeze 转换成单一的数值。
Excitation 阶段:这阶段是根据上一阶段得到的向量进行缩放,即将上阶段的得到的
的向量 先压缩成 长度,然后在放回到 的维度,其中 表示压缩的程度。这个过程的具体操作就是经过两层 DNN。 $$A = F_{ex}(Z) = \sigma_2(W_2\sigma_1(W_1Z)) $$
该过程可以理解为:对于当前所有输入的特征,通过相互发生关联,来动态地判断哪些特征重要,哪些特征不重要,而这体现在 Excitation 阶段的输出结果
,其反应每个特征对应的重要性权重。Re-weight 阶段:是将 Excitation 阶段得到的每个特征对应的权重
再乘回到特征对应的 Embedding 里,就完成了对特征重要性的加权操作。
$$V=F_{ReWeight }(A,E)=[a_1 \cdot e_1,⋯,a_f \cdot e_f]=[v_1,⋯,v_f]$$
以上简单的介绍了一下 SENet 结构,可以发现这种结构可以通过对特征 embedding 先压缩,再交互,再选择,进而实现特征选择的效果。
此外张俊林大佬还将 SENet 应用于双塔模型中 (SENet 双塔模型:在推荐领域召回粗排的应用及其它),模型结构如下所示:

从上图可以发现,具体地是将双塔中的 user 塔和 Item 侧塔的特征输入部分加上一个 SENet 模块,通过 SENet 网络,动态地学习这些特征的重要性,通过小权重抑制噪音或者无效低频特征,通过大权重放大重要特征影响的目的。
之所以 SENet 双塔模型是有效的呢?张俊林老师的解释是:双塔模型的问题在于 User 侧特征和 Item 侧特征交互太晚,在高层交互,会造成细节信息,也就是具体特征信息的损失,影响两侧特征交叉的效果。而 SENet 模块在最底层就进行了特征的过滤,使得很多无效低频特征即使被过滤掉,这样更多有用的信息被保留到了双塔的最高层,使得两侧的交叉效果很好;同时由于 SENet 模块选择出更加重要的信息,使得 User 侧和 Item 侧特征之间的交互表达方面增强了 DNN 双塔的能力。
因此 SENet 双塔模型主要是从特征选择的角度,提高了两侧特征交叉的有效性,减少了噪音对有效信息的干扰,进而提高了双塔模型的效果。此外,除了这样的方式,还可以通过增加通道的方式来增强两侧的信息交互。即对于 user 和 item 两侧不仅仅使用一个 DNN 结构,而是可以通过不同结构 (如 FM,DCN 等) 来建模 user 和 item 的自身特征交叉,例如下图所示:

这样对于 user 和 item 侧会得到多个 embedding,类似于多兴趣的概念。通过得到的多个 user 和 item 的 embedding,然后分别计算余弦值再相加 (两侧的 Embedding 维度需要对齐),进而增加了双塔两侧的信息交互。而这种方法在腾讯进行过尝试,他们提出的 “并联” 双塔就是按照这样的思路,感兴趣的可以了解一下。
多目标的双塔模型
现如今多任务学习在实际的应用场景也十分的常见,主要是因为实际场景中业务复杂,往往有很多的衡量指标,例如点击,评论,收藏,关注,转发等。在多任务学习中,往往会针对不同的任务使用一个独有的 tower,然后优化不同任务损失。那么针对双塔模型应该如何构建多任务学习框架呢?

这种模型结构,可以针对多目标进行联合建模,通过多任务学习的结构,一方面可以利用不同任务之间的信息共享,为一些稀疏特征提供其他任务中的迁移信息,另一方面可以在召回时,直接使用一个模型得到多个目标预测,解决了多个模型维护困难的问题。也就是说,在线上通过这一个模型就可以同时得到多个指标,例如视频场景,一个模型就可以直接得到点赞,品论,转发等目标的预测值,进而通过这些值计算分数获得最终的 Top-K 召回结果。
双塔模型的细节
关于双塔模型,其模型结构相比排序模型来说很简单,没有过于复杂的结构。但除了结构,有一些细节部分容易被忽视,而这些细节部分往往比模型结构更加重要,因此下面主要介绍一下双塔模型中需要主要的一些细节问题。
归一化与温度系数
在 Google 的双塔召回模型中,重点介绍了两个 trick,将 user 和 item 侧输出的 embedding 进行归一化以及对于內积值除以温度系数,实验证明这两种方式可以取得十分好的效果。那为什么这两种方法会使得模型的效果更好呢?
归一化:对 user 侧和 item 侧的输入 embedding,进行 L2 归一化
$$u(x,\theta) \leftarrow = \frac{u(x,\theta)}{||u(x,\theta)||_2}$$
$$v(x,\theta) \leftarrow = \frac{v(x,\theta)}{||v(x,\theta)||_2}$$
温度系数:在归一化之后的向量计算內积之后,除以一个固定的超参
,论文中命名为温度系数。 $$s(u,v) = \frac{<u(x,\theta), v(x,\theta)>}{r}$$
那为什么需要进行上述的两个操作呢?
归一化的操作主要原因是因为向量点积距离是非度量空间,不满足三角不等式,而归一化的操作使得点击行为转化成了欧式距离。
首先向量点积是向量对应位相乘并求和,即向量內积。而向量內积不保序,例如空间上三个点 (A=(10,0),B=(0,10),C=(11,0)),利用向量点积计算的距离 dis (A,B) < dis (A,C),但是在欧式距离下这是错误的。而归一化的操作则会让向量点积转化为欧式距离,例如
表示归一化 user 的 embedding, 表示归一化 item 的 embedding,那么两者之间的欧式距离 如下, 可以看出归一化的向量点积已转化成了欧式距离。那没啥非要转为欧式距离呢?这是因为 ANN 一般是通过计算欧式距离进行检索,这样转化成欧式空间,保证训练和检索一致。
模型的应用
在实际的工业应用场景中,分为离线训练和在线服务两个环节。
- 在离线训练阶段,同过训练数据,训练好模型参数。然后将候选库中所有的 item 集合离线计算得到对应的 embedding,并存储进 ANN 检索系统,比如 faiss。为什么将离线计算 item 集合,主要是因为 item 的会相对稳定,不会频繁的变动,而对于用户而言,如果将用户行为作为 user 侧的输入,那么 user 的 embedding 会随着用户行为的发生而不断变化,因此对于 user 侧的 embedding 需要实时的计算。
- 在线服务阶段,正是因为用户的行为变化需要被即使的反应在用户的 embedding 中,以更快的反应用户当前的兴趣,即可以实时地体现用户即时兴趣的变化。因此在线服务阶段需要实时的通过拼接用户特征,输入到 user 侧的 DNN 当中,进而得到 user embedding,在通过 user embedding 去 faiss 中进行 ANN 检索,召回最相似的 K 个 item embedding。
可以看到双塔模型结构十分的适合实际的应用场景,在快速服务的同时,还可以更快的反应用户即时兴趣的变化。
负样本采样
相比于排序模型而言,召回阶段的模型除了在结构上的不同,在样本选择方面也存在着很大的差异,可以说样本的选择很大程度上会影响召回模型的效果。对于召回模型而言,其负样本并不能和排序模型一样只使用展现未点击样本,因为召回模型在线上面临的数据分布是全部的 item,而不仅仅是展现未点击样本。因此在离线训练时,需要让其保证和线上分布尽可能一致,所以在负样本的选择样要尽可能的增加很多未被曝光的 item。下面简单的介绍一些常见的采样方法:
全局随机采样
全局随机采样指:从全局候选 item 里面随机抽取一定数量 item 做为召回模型的负样本。这样的方式实现简单,也可以让模型尽可能的和线上保持一致的分布,尽可能的多的让模型对于全局 item 有区分的能力。例如 YoutubeDNN 算法。
但这样的方式也会存在一定的问题,由于候选的 item 属于长尾数据,即 “八二定律”,也就是说少数热门物料占据了绝大多数的曝光与点击。因此存随机的方式只能让模型在学到粗粒度上差异,对一些尾部 item 并不友好。
全局随机采样 + 热门打压
针对于全局随机采样的不足,一个直观的方法是针对于 item 的热度 item 进行打压,即对于热门的 item 很多用户可能会点击,需要进行一定程度的欠采样,使得模型更加关注一些非热门的 item。 此外在进行负样本采样时,应该对一些热门 item 进行适当的过采样,这可以尽可能的让模型对于负样本有更加细粒度的区分。例如在 word2vec 中,负采样方法是根据 word 的频率,对 negative words 进行随机抽样,降 低 negative words 量级。
之所以热门 item 做负样本时,要适当过采样,增加负样本难度。因为对于全量的 item,模型可以轻易的区分一些和用户兴趣差异性很大的 item,难点在于很难区分一些和用户兴趣相似的 item。因此在训练模型时,需要适当的增加一些难以区分的负样本来提升模型面对相似 item 的分区能力。
Hard Negative 增强样本
Hard Negative 指的是选取一部分匹配度适中的 item,能够增加模型在训练时的难度,提升模型能学习到 item 之间细粒度上的差异。至于 如何选取在工业界也有很多的解决方案。
例如 Airbnb 根据业务逻辑来采样一些 hard negative (增加与正样本同城的房间作为负样本,增强了正负样本在地域上的相似性;增加与正样本同城的房间作为负样本,增强了正负样本在地域上的相似性,),详细内容可以查看原文
例如百度和 facebook 依靠模型自己来挖掘 Hard Negative,都是用上一版本的召回模型筛选出 "没那么相似" 的 <user,item> 对,作为额外负样本,用于训练下一版本召回模型。 详细可以查看 Mobius 和 EBR
Batch 内随机选择负采样
基于 batch 的负采样方法是将 batch 内选择除了正样本之外的其它 Item,做为负样本,其本质就是利用其他样本的正样本随机采样作为自己的负样本。这样的方法可以作为负样本的选择方式,特别是在如今分布式训练以及增量训练的场景中是一个非常值得一试的方法。但这种方法也存在他的问题,基于 batch 的负采样方法受 batch 的影响很大,当 batch 的分布与整体的分布差异很大时就会出现问题,同时 batch 内负采样也会受到热门 item 的影响,需要考虑打压热门 item 的问题。至于解决的办法,Google 的双塔召回模型中给出了答案,想了解的同学可以去学习一下。
总的来说负样本的采样方法,不光是双塔模型应该重视的工作,而是所有召回模型都应该仔细考虑的方法。
代码实现
下面使用一点资讯提供的数据,实践一下 DSSM 召回模型。该模型的实现主要参考:DeepCtr 和 DeepMatch 模块。
模型训练数据
1、数据预处理 用户侧主要包含一些用户画像属性(用户性别,年龄,所在省市,使用设备及系统);新闻侧主要包括新闻的创建时间,题目,所属 一级、二级类别,题片个数以及关键词。下面主要是对着两部分数据的简单处理:
def proccess(file):
if file=="user_info_data_5w.csv":
data = pd.read_csv(file_path + file, sep="\t",index_col=0)
data["age"] = data["age"].map(lambda x: get_pro_age(x))
data["gender"] = data["gender"].map(lambda x: get_pro_age(x))
data["province"]=data["province"].fillna(method='ffill')
data["city"]=data["city"].fillna(method='ffill')
data["device"] = data["device"].fillna(method='ffill')
data["os"] = data["os"].fillna(method='ffill')
return data
elif file=="doc_info.txt":
data = pd.read_csv(file_path + file, sep="\t")
data.columns = ["article_id", "title", "ctime", "img_num","cate","sub_cate", "key_words"]
select_column = ["article_id", "title_len", "ctime", "img_num","cate","sub_cate", "key_words"]
# 去除时间为nan的新闻以及除脏数据
data= data[(data["ctime"].notna()) & (data["ctime"] != 'Android')]
data['ctime'] = data['ctime'].astype('str')
data['ctime'] = data['ctime'].apply(lambda x: int(x[:10]))
data['ctime'] = pd.to_datetime(data['ctime'], unit='s', errors='coerce')
# 这里存在nan字符串和异常数据
data["sub_cate"] = data["sub_cate"].astype(str)
data["sub_cate"] = data["sub_cate"].apply(lambda x: pro_sub_cate(x))
data["img_num"] = data["img_num"].astype(str)
data["img_num"] = data["img_num"].apply(photoNums)
data["title_len"] = data["title"].apply(lambda x: len(x) if isinstance(x, str) else 0)
data["cate"] = data["cate"].fillna('其他')
return data[select_column]
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
2、构造训练样本 该部分主要是根据用户的交互日志中前 6 天的数据作为训练集,第 7 天的数据作为测试集,来构造模型的训练测试样本。
def dealsample(file, doc_data, user_data, s_data_str = "2021-06-24 00:00:00", e_data_str="2021-06-30 23:59:59", neg_num=5):
# 先处理时间问题
data = pd.read_csv(file_path + file, sep="\t",index_col=0)
data['expo_time'] = data['expo_time'].astype('str')
data['expo_time'] = data['expo_time'].apply(lambda x: int(x[:10]))
data['expo_time'] = pd.to_datetime(data['expo_time'], unit='s', errors='coerce')
s_date = datetime.datetime.strptime(s_data_str,"%Y-%m-%d %H:%M:%S")
e_date = datetime.datetime.strptime(e_data_str,"%Y-%m-%d %H:%M:%S") + datetime.timedelta(days=-1)
t_date = datetime.datetime.strptime(e_data_str,"%Y-%m-%d %H:%M:%S")
# 选取训练和测试所需的数据
all_data_tmp = data[(data["expo_time"]>=s_date) & (data["expo_time"]<=t_date)]
# 处理训练数据集 防止穿越样本
# 1. merge 新闻信息,得到曝光时间和新闻创建时间; inner join 去除doc_data之外的新闻
all_data_tmp = all_data_tmp.join(doc_data.set_index("article_id"),on="article_id",how='inner')
# 发现还存在 ctime大于expo_time的交互存在 去除这部分错误数据
all_data_tmp = all_data_tmp[(all_data_tmp["ctime"]<=all_data_tmp["expo_time"])]
# 2. 去除与新闻的创建时间在测试数据时间内的交互 ()
train_data = all_data_tmp[(all_data_tmp["expo_time"]>=s_date) & (all_data_tmp["expo_time"]<=e_date)]
train_data = train_data[(train_data["ctime"]<=e_date)]
print("有效的样本数:",train_data["expo_time"].count())
# 负采样
if os.path.exists(file_path + "neg_sample.pkl") and os.path.getsize(file_path + "neg_sample.pkl"):
neg_samples = pd.read_pickle(file_path + "neg_sample.pkl")
# train_neg_samples.insert(loc=2, column="click", value=[0] * train_neg_samples["user_id"].count())
else:
# 进行负采样的时候对于样本进行限制,只对一定时间范围之内的样本进行负采样
doc_data_tmp = doc_data[(doc_data["ctime"]>=datetime.datetime.strptime("2021-06-01 00:00:00","%Y-%m-%d %H:%M:%S"))]
neg_samples = negSample_like_word2vec(train_data, doc_data_tmp[["article_id"]].values, user_data[["user_id"]].values, neg_num=neg_num)
neg_samples = pd.DataFrame(neg_samples, columns= ["user_id","article_id","click"])
neg_samples.to_pickle(file_path + "neg_sample.pkl")
train_pos_samples = train_data[train_data["click"] == 1][["user_id","article_id", "expo_time", "click"]] # 取正样本
neg_samples_df = train_data[train_data["click"] == 0][["user_id","article_id", "click"]]
train_neg_samples = pd.concat([neg_samples_df.sample(n=train_pos_samples["click"].count()) ,neg_samples],axis=0) # 取负样本
print("训练集正样本数:",train_pos_samples["click"].count())
print("训练集负样本数:",train_neg_samples["click"].count())
train_data_df = pd.concat([train_neg_samples,train_pos_samples],axis=0)
train_data_df = train_data_df.sample(frac=1) # shuffle
print("训练集总样本数:",train_data_df["click"].count())
test_data_df = all_data_tmp[(all_data_tmp["expo_time"]>e_date) & (all_data_tmp["expo_time"]<=t_date)][["user_id","article_id", "expo_time", "click"]]
print("测试集总样本数:",test_data_df["click"].count())
print("测试集总样本数:",test_data_df["click"].count())
all_data_df = pd.concat([train_data_df, test_data_df],axis=0)
print("总样本数:",all_data_df["click"].count())
return all_data_df
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
3、负样本采样 该部分主要采用基于 item 的展现次数对全局 item 进行负采样。
def negSample_like_word2vec(train_data, all_items, all_users, neg_num=10):
"""
为所有item计算一个采样概率,根据概率为每个用户采样neg_num个负样本,返回所有负样本对
1. 统计所有item在交互中的出现频次
2. 根据频次进行排序,并计算item采样概率(频次出现越多,采样概率越低,打压热门item)
3. 根据采样概率,利用多线程为每个用户采样 neg_num 个负样本
"""
pos_samples = train_data[train_data["click"] == 1][["user_id","article_id"]]
pos_samples_dic = {}
for idx,u in enumerate(pos_samples["user_id"].unique().tolist()):
pos_list = list(pos_samples[pos_samples["user_id"] == u]["article_id"].unique().tolist())
if len(pos_list) >= 30: # 30是拍的 需要数据统计的支持确定
pos_samples_dic[u] = pos_list[30:]
else:
pos_samples_dic[u] = pos_list
# 统计出现频次
article_counts = train_data["article_id"].value_counts()
df_article_counts = pd.DataFrame(article_counts)
dic_article_counts = dict(zip(df_article_counts.index.values.tolist(),df_article_counts.article_id.tolist()))
for item in all_items:
if item[0] not in dic_article_counts.keys():
dic_article_counts[item[0]] = 0
# 根据频次排序, 并计算每个item的采样概率
tmp = sorted(list(dic_article_counts.items()), key=lambda x:x[1], reverse=True) # 降序
n_articles = len(tmp)
article_prob = {}
for idx, item in enumerate(tmp):
article_prob[item[0]] = cal_pos(idx, n_articles)
# 为每个用户进行负采样
article_id_list = [a[0] for a in article_prob.items()]
article_pro_list = [a[1] for a in article_prob.items()]
pos_sample_users = list(pos_samples_dic.keys())
all_users_list = [u[0] for u in all_users]
print("start negative sampling !!!!!!")
pool = multiprocessing.Pool(core_size)
res = pool.map(SampleOneProb((pos_sample_users,article_id_list,article_pro_list,pos_samples_dic,neg_num)), tqdm(all_users_list))
pool.close()
pool.join()
neg_sample_dic = {}
for idx, u in tqdm(enumerate(all_users_list)):
neg_sample_dic[u] = res[idx]
return [[k,i,0] for k,v in neg_sample_dic.items() for i in v]
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
DSSM 模型
1、模型构建
模型构建部分主要是将输入的 user 特征以及 item 特征处理完之后分别送入两侧的 DNN 结构。
def DSSM(user_feature_columns, item_feature_columns, dnn_units=[64, 32],
temp=10, task='binary'):
# 构建所有特征的Input层和Embedding层
feature_encode = FeatureEncoder(user_feature_columns + item_feature_columns)
feature_input_layers_list = list(feature_encode.feature_input_layer_dict.values())
# 特征处理
user_dnn_input, item_dnn_input = process_feature(user_feature_columns,\
item_feature_columns, feature_encode)
# 构建模型的核心层
if len(user_dnn_input) >= 2:
user_dnn_input = Concatenate(axis=1)(user_dnn_input)
else:
user_dnn_input = user_dnn_input[0]
if len(item_dnn_input) >= 2:
item_dnn_input = Concatenate(axis=1)(item_dnn_input)
else:
item_dnn_input = item_dnn_input[0]
user_dnn_input = Flatten()(user_dnn_input)
item_dnn_input = Flatten()(item_dnn_input)
user_dnn_out = DNN(dnn_units)(user_dnn_input)
item_dnn_out = DNN(dnn_units)(item_dnn_input)
# 计算相似度
scores = CosinSimilarity(temp)([user_dnn_out, item_dnn_out]) # (B,1)
# 确定拟合目标
output = PredictLayer()(scores)
# 根据输入输出构建模型
model = Model(feature_input_layers_list, output)
return model
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
2、CosinSimilarity 相似度计算
在余弦相似度计算,主要是注意使用归一化以及温度系数的技巧。
def call(self, inputs, **kwargs):
"""inputs 是一个列表"""
query, candidate = inputs
# 计算两个向量的二范数
query_norm = tf.norm(query, axis=self.axis) # (B, 1)
candidate_norm = tf.norm(candidate, axis=self.axis)
# 计算向量点击,即內积操作
scores = tf.reduce_sum(tf.multiply(query, candidate), axis=-1)#(B,1)
# 相似度除以二范数, 防止除零
scores = tf.divide(scores, query_norm * candidate_norm + 1e-8)
# 对score的范围限制到(-1, 1)之间
scores = tf.clip_by_value(scores, -1, 1)
# 乘以温度系数
score = scores * self.temperature
return score
2
3
4
5
6
7
8
9
10
11
12
13
14
15
模型训练
1、稀疏特征编码 该部分主要是针对于用户侧和新闻侧的稀疏特征进行编码,并将训练样本 join 上两侧的特征。
# 数据和测试数据
data, user_data, doc_data = get_all_data()
# 1.Label Encoding for sparse features,and process sequence features with `gen_date_set` and `gen_model_input`
feature_max_idx = {}
feature_encoder = {}
user_sparse_features = ["user_id", "device", "os", "province", "city", "age", "gender"]
for feature in user_sparse_features:
lbe = LabelEncoder()
user_data[feature] = lbe.fit_transform(user_data[feature]) + 1
feature_max_idx[feature] = user_data[feature].max() + 1
feature_encoder[feature] = lbe
doc_sparse_features = ["article_id", "cate", "sub_cate"]
doc_dense_features = ["title_len", "img_num"]
for feature in doc_sparse_features:
lbe = LabelEncoder()
if feature in ["cate","sub_cate"]:
# 这里面会出现一些float的数据,导致无法编码
doc_data[feature] = lbe.fit_transform(doc_data[feature].astype(str)) + 1
else:
doc_data[feature] = lbe.fit_transform(doc_data[feature]) + 1
feature_max_idx[feature] = doc_data[feature].max() + 1
feature_encoder[feature] = lbe
data["article_id"] = feature_encoder["article_id"].transform(data["article_id"].tolist())
data["user_id"] = feature_encoder["user_id"].transform(data["user_id"].tolist())
# join 用户侧和新闻侧的特征
data = data.join(user_data.set_index("user_id"), on="user_id", how="inner")
data = data.join(doc_data.set_index("article_id"), on="article_id", how="inner")
sparse_features = user_sparse_features + doc_sparse_features
dense_features = doc_dense_features
features = sparse_features + dense_features
mms = MinMaxScaler(feature_range=(0, 1))
data[dense_features] = mms.fit_transform(data[dense_features])
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
2、配置特征以及模型训练 构建模型所需的输入特征,同时构建 DSSM 模型及训练。
embedding_dim = 8
user_feature_columns = [SparseFeat('user_id', feature_max_idx['user_id'], embedding_dim),
SparseFeat("gender", feature_max_idx['gender'], embedding_dim),
SparseFeat("age", feature_max_idx['age'], embedding_dim),
SparseFeat("device", feature_max_idx['device'], embedding_dim),
SparseFeat("os", feature_max_idx['os'], embedding_dim),
SparseFeat("province", feature_max_idx['province'], embedding_dim),
SparseFeat("city", feature_max_idx['city'], embedding_dim), ]
item_feature_columns = [SparseFeat('article_id', feature_max_idx['article_id'], embedding_dim),
DenseFeat('img_num', 1),
DenseFeat('title_len', 1),
SparseFeat('cate', feature_max_idx['cate'], embedding_dim),
SparseFeat('sub_cate', feature_max_idx['sub_cate'], embedding_dim)]
model = DSSM(user_feature_columns, item_feature_columns,
user_dnn_hidden_units=(32, 16, embedding_dim), item_dnn_hidden_units=(32, 16, embedding_dim)) # FM(user_feature_columns,item_feature_columns)
model.compile(optimizer="adagrad", loss = "binary_crossentropy", metrics=[tf.keras.metrics.Recall(), tf.keras.metrics.Precision()] ) #
history = model.fit(train_model_input, train_label, batch_size=256, epochs=4, verbose=1, validation_split=0.2, )
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
3、生成 embedding 用于召回 利用训练过的模型获取所有 item 的 embeddings,同时获取所有测试集的 user embedding,保存之后用于之后的召回工作。
all_item_model_input = {"article_id": item_profile['article_id'].values,
"img_num": item_profile['img_num'].values,
"title_len": item_profile['title_len'].values,
"cate": item_profile['cate'].values,
"sub_cate": item_profile['sub_cate'].values,}
user_embedding_model = Model(inputs=model.user_input, outputs=model.user_embedding)
item_embedding_model = Model(inputs=model.item_input, outputs=model.item_embedding)
user_embs = user_embedding_model.predict(test_user_model_input, batch_size=2 ** 12)
item_embs = item_embedding_model.predict(all_item_model_input, batch_size=2 ** 12)
user_idx_2_rawid, doc_idx_2_rawid = {}, {}
for i in range(len(user_embs)):
user_idx_2_rawid[i] = test_user_model_input["user_id"][i]
for i in range(len(item_embs)):
doc_idx_2_rawid[i] = all_item_model_input["article_id"][i]
# 保存一份
pickle.dump((user_embs, user_idx_2_rawid, feature_encoder["user_id"]), open(file_path + 'user_embs.pkl', 'wb'))
pickle.dump((item_embs, doc_idx_2_rawid, feature_encoder["article_id"]), open(file_path + 'item_embs.pkl', 'wb'))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ANN 召回
1、为测试集用户召回 通过 annoy tree 为所有的 item 构建索引,并通过测试集中所有的 user embedding 为每个用户召回一定数量的 item。
def get_DSSM_recall_res(user_embs, doc_embs, user_idx_2_rawid, doc_idx_2_rawid, topk):
"""近邻检索,这里用annoy tree"""
# 把doc_embs构建成索引树
f = user_embs.shape[1]
t = AnnoyIndex(f, 'angular')
for i, v in enumerate(doc_embs):
t.add_item(i, v)
t.build(10)
# 每个用户向量, 返回最近的TopK个item
user_recall_items_dict = collections.defaultdict(dict)
for i, u in enumerate(user_embs):
recall_doc_scores = t.get_nns_by_vector(u, topk, include_distances=True)
# recall_doc_scores是(([doc_idx], [scores])), 这里需要转成原始doc的id
raw_doc_scores = list(recall_doc_scores)
raw_doc_scores[0] = [doc_idx_2_rawid[i] for i in raw_doc_scores[0]]
# 转换成实际用户id
user_recall_items_dict[user_idx_2_rawid[i]] = dict(zip(*raw_doc_scores))
# 默认是分数从小到大排的序, 这里要从大到小
user_recall_items_dict = {k: sorted(v.items(), key=lambda x: x[1], reverse=True) for k, v in user_recall_items_dict.items()}
pickle.dump(user_recall_items_dict, open(file_path + 'DSSM_u2i_dict.pkl', 'wb'))
return user_recall_items_dict
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2、测试召回结果 为测试集用户的召回结果进行测试。
user_recall_items_dict = get_DSSM_recall_res(user_embs, item_embs, user_idx_2_rawid, doc_idx_2_rawid, topk=TOP_NUM)
test_true_items = {line[0]:line[1] for line in test_set}
s = []
precision = []
for i, uid in tqdm(enumerate(list(user_recall_items_dict.keys()))):
# try:
pred = [x for x, _ in user_recall_items_dict[uid]]
filter_item = None
recall_score = recall_N(test_true_items[uid], pred, N=TOP_NUM)
s.append(recall_score)
precision_score = precision_N(test_true_items[uid], pred, N=TOP_NUM)
precision.append(precision_score)
print("recall", np.mean(s))
print("precision", np.mean(precision))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16