根据用户播放次数数据使用协同过滤算法完成音乐推荐。

数据集

Audioscrobbler数据集

下载 Audioscrobbler 数据集

user_artist_data.txt

它包含141000个用户和160万个艺术家,记录了约2420万条用户播放艺术家歌曲的信息,其中包括播放次数信息。播放次数较多意味着该用户更喜欢对应艺术家的作品。

useridartistidplaycount
用户ID艺术家ID播放次数
1000002155

artist_data.txt

该文件包含两列: artistid artist_name 艺术家ID 艺术家名字。文件中给出了每个艺术家的 ID 和对应的名字。此文件用于ID与名字的转换。

artistidartist_name
艺术家ID艺术家名
113499906Crazy Life

artist_alias.txt

该文件包含两列: badid, goodid 坏ID 好ID 。该文件包含已知错误拼写的艺术家ID及其对应艺术家的正规的,用于将拼写错误的艺术家 ID 或ID 变体对应到该艺术家正确的ID。

badidgoodid
坏ID好ID
10927641000311

算法

交替最小二乘推荐算法 (Alternating Least Squares,ALS)

人们虽然经常听音乐,但很少给音乐评分。因此 Audioscrobbler 数据集覆盖了更多的用户和艺术家,也包含了更多的总体信息,虽然单条记录的信息比较少。这种类型的数据通常被称为隐式反馈数据,因为用户和艺术家的关系是通过其他行动隐含体现出来的,而不是通过显式的评分或点赞得到的。

根据两个用户的相似行为判断他们有相同的偏好,学习算法不需要用户和艺术家的属性信息。这类算法通常称为协同过滤算法

潜在因素模型:试图通过数据相对少的未被观察到的底层原因,来解释大量用户和产品之间可观察到的交互。因子分析方法背后的理论是,有关观测变量之间的相互依赖性的信息可以稍后用于减少数据集中的变量集。

矩阵分解模型:数学上,算法把用户和产品数据当成一个大矩阵 R,矩阵第 i 行和第 j 列上的元素有值,代表用户 i 播放过艺术家 j 的音乐。矩阵 R 是稀疏的:R中大多数元素都是 0,因为相对于所有可能的用户 - 艺术家组合,只有很少一部分组合会出现在数据中。算法将 R 分解为两个小矩阵 U 和P的乘积。矩阵 U 和矩阵P非常“瘦”。因为 A 有很多行和列,但 U 和 P 的行很多而列很少(列数用 k 表示)。这 k 个列就是潜在因素,用于解释数据中的交互关系。由于 k 的值小,矩阵分解算法只能是某种近似。

为了使低秩矩阵P和U尽可能的逼近R,可以通过最小化如下损失函数L来完成。

λ

损失函数公式与上图对应:表示用户i的偏好隐含向量,表示艺术家j包含的隐含特征向量,表示用户i对艺术家j的评分,是用户i对艺术家j评分的近似。其中λ是正则化项的系数,损失函数一般需要加入正则化项来避免过拟合等问题。

于是就简化为一个最小化损失函数L的优化问题。用户-特征矩阵和特征-艺术家矩阵的乘积的结果是对整个稠密的用户-艺术家相互关系矩阵的完整估计。该乘积可以理解成艺术家与其属性之间的一个映射,然后按用户属性进行加权。

通常没有确切的解,因为U和P通常不够大,不足以完全表示R,应该尽可能逼近R。然而不幸的是,想直接同时得到 U 和 P 的最优解是不可能的。

如果 P 已知,求U的最优解是非常容易的,反之亦然。但 P 和 U事先都是未知的。

虽然 P 是未知的,但可以把P初始化为随机行向量矩阵。接着运用简单的线性代数,就能在给定 R 和 P 的条件下求出 U 的最优解。实际上,U 的第 i 行是 R 的第 i 行和 P 的函数。

因此可以很容易分开计算 U 的每一行。因为 U 的每一行可以分开计算,所以我们可以将其并行化,而并行化是大规模计算的一大优点。

ALS是求解的著名算法,固定P或U对其对应的隐含向量求偏导数并令导数为0,得到求解公式:
λ

λ

随机对P、Q初始化,随后交替进行优化直到收敛。收敛标准是均方误差小于预定义阈值,或者到达最大迭代次数。

推荐质量评价指标AUC

AUC指标是一个[0,1]之间的实数,代表如果随机挑选一个正样本和一个负样本,分类算法将这个正样本排在负样本前面的概率。值越大,表示分类算法更有可能将正样本排在前面,也即算法准确性越好。

随机抽出一对样本(一个正样本,一个负样本),然后用训练得到的分类器来对这两个样本进行预测,预测得到正样本的概率大于负样本概率的概率。

在有M个正样本,N个负样本的数据集里。一共有M×N对样本(一对样本,一个正样本与一个负样本)。统计这M×N对样本里,正样本的预测概率大于负样本的预测概率的个数。

其中,

实验过程

数据预处理

artist_data.txt文件

数据最终处理成以逗号分割

artist_data.txt文件

两列之间的间隔有的是空格有的是Tab,第二列数据中包含空格

因第二列数据中含有逗号和空格,数据最终处理成以Tab分割

去除第一列不是数字的行

artist_alias.txt 文件

将拼写错误的艺术家 ID 或 ID 变体对应到该艺术家的规范 ID

两列之间的间隔有的是空格有的是Tab

包含数据缺失的列

在数据处理时对拼写错误ID进行映射,用别名数据集将所有的艺术家 ID 转换成正规 ID。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
aa={}
with open("/export/work/F/1/data/artist_alias.txt") as f:
line = f.readline()
while line:
if len(line.split())==2:
aa[line.split()[0]]=line.split()[1]
line = f.readline()

f3=open("/export/work/F/1/data/user_artist_data.txt.data","w+")
with open("/export/work/F/1/data/user_artist_data.txt") as f2:
line = f2.readline()
while line:
it=line.split()
if it[1] in aa:
it[1]=aa[it[1]]
print(it[0]+","+it[1]+","+it[2],file=f3)
line = f2.readline()
f3.close()


f5=open("/export/work/F/1/data/artist_data.txt.data","w+")
with open("/export/work/F/1/data/artist_data.txt") as f4:
line = f4.readline()
while line:
it=line.split()
s=""
for i in range(len(it)):
s+=it[i]
if i==0:
s+=" "
elif i==len(it)-1:
s+=""
else:
s+=" "
print(s,file=f5)
line = f4.readline()

f7=open("/export/work/F/1/data/artist_data.txt.data2","w+")
with open("/export/work/F/1/data/artist_data.txt.data") as f6:
line = f6.readline()
while line:
it=line.split(" ")
try:
a=int(it[0])
print(str(a)+" "+it[1],file=f7,end="")
except:
pass
line = f6.readline()

预处理后得到的数据集

artist_data

user_artist_data

获取数据文件,并上传至HDFS

读入数据,转换成DataFrame备用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from pyspark.sql.types import Row
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType,IntegerType
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
# 转换成DataFrame
name1=["user", "item", "rating"]
name2=["id","name"]

conf = SparkConf().setAppName("applicaiton").set("spark.executor.heartbeatInterval","500000").set("spark.network.timeout","500000")
sc = SparkContext.getOrCreate(conf)
spark = SparkSession(sc)
uaRDD = sc.textFile("/1/user_artist_data.txt.data")
fields = list(map( lambda fieldName : StructField(fieldName, IntegerType(), nullable = True), name1))
schema = StructType(fields)
rowRDD = uaRDD.map(lambda line : line.split(",")).map(lambda attr : Row(int(attr[0]),int(attr[1]),int(attr[2])))
uaDF = spark.createDataFrame(rowRDD, schema)


aRDD = sc.textFile("/1/artist_data.txt.data2")
fields = list(map( lambda fieldName : StructField(fieldName, IntegerType(), nullable = True) if fieldName=="id" else StructField(fieldName, StringType(), nullable = True) , name2))
schema = StructType(fields)
rowRDD =aRDD.map(lambda line : line.split(" ")).map(lambda attr : Row(int(attr[0]),attr[1]))
aDF = spark.createDataFrame(rowRDD, schema)

展示数据格式基本统计信息

数据格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
uaDF.show()

+-------+-------+------+
| user| item|rating|
+-------+-------+------+
|1000002| 1| 55|
|1000002|1000006| 33|
|1000002|1000007| 8|
|1000002|1000009| 144|
|1000002|1000010| 314|
|1000002|1000013| 8|
|1000002|1000014| 42|
|1000002|1000017| 69|
|1000002|1000024| 329|
|1000002|1000025| 1|
|1000002|1000028| 17|
|1000002|1000031| 47|
|1000002|1000033| 15|
|1000002|1000042| 1|
|1000002|1000045| 1|
|1000002|1000054| 2|
|1000002|1000055| 25|
|1000002|1000056| 4|
|1000002|1000059| 2|
|1000002|1000062| 71|
+-------+-------+------+
only showing top 20 rows

基本统计信息

用户数

1
2
3
4
a=uaDF.select(uaDF.user).distinct().count()
print(a)

148111

艺术家数目

1
2
3
4
b=uaDF.select(uaDF.item).distinct().count()
print(b)

1568126

每用户平均播放次数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
uaDF.drop("item").groupBy("user").agg({"rating":"mean"}).show()

+-------+------------------+
| user| avg(rating)|
+-------+------------------+
|1000190|55.355432780847146|
|1001043|6.0131578947368425|
|1001129| 12.32748538011696|
|1001139| 8.652557319223986|
|1002431|12.833333333333334|
|1002605|3.5392670157068062|
|1004666| 9.79409594095941|
|1005158|1.9245283018867925|
|1005439|28.333333333333332|
|1005697|11.733333333333333|
|1005853| 2.5|
|1007007| 2.443396226415094|
|1007847|14.333333333333334|
|1008081|31.232876712328768|
|1008233| 90.0|
|1008804| 9.0|
|1009408| 4.666666666666667|
|1012261|3.2887640449438202|
|1015587| 9.46|
|1016416| 8.241935483870968|
+-------+------------------+
only showing top 20 rows

每艺术家平均播放次数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
uaDF.drop("user").groupBy("item").agg({"rating":"mean"}).show()

+-------+------------------+
| item| avg(rating)|
+-------+------------------+
|1001129|10.578309692671395|
|1003373|2.3333333333333335|
|1007972|18.156831042845596|
|1029443| 20.54196642685851|
|1076507| 2.969264544456641|
|1318111|5.6902654867256635|
| 833| 9.483282674772036|
|1239413| 3.821794871794872|
|1000636| 2.0|
|1002431|1.7142857142857142|
|1005697| 3.5|
|1040360| 1.0|
|1043263|1.9166666666666667|
|1245208|19.613390928725703|
| 463| 34.3479262672811|
|1043126|14.580645161290322|
|1001601| 3.573529411764706|
|1091589| 2.5|
|1004021| 6.96403785488959|
|1012885| 4.744927536231884|
+-------+------------------+
only showing top 20 rows

构建ALS模型

构建ALS模型,并记录所耗时间。初始参数:Rank 10, maxiter 15, RegParm 0.01 Alpha 1.0。

1
2
3
4
5
6
7
8
9
from  pyspark.ml.recommendation import ALS,ALSModel
import random
import time

start = time.time()
als = ALS(rank=10,maxIter=15,regParam=0.01,alpha=1.0,seed=int(random.random()*100))
model=als.fit(uaDF)
end = time.time()
print ("时间:"+str(end-start))

输出结果:

1
时间:785.1817960739136

这样我们就构建了一个ALSModel 模型。

模型用两个不同的DataFrame,它们分别表示“用户 - 特征”和“产品 - 特征”这两个大型矩阵。

检查推荐结果

依据构建的模型,选择部分ID检查推荐结果。

看看模型给出的艺术家推荐直观上是否合理,检查一下用户播放过的艺术家,然后看看模型向用户推荐的艺术家。具体来看看用户 2093760 的例子。

1
2
userID = 2093760
a=uaDF.rdd.filter(lambda x:x[0]==userID).collect()

查看用户输出结果:

1
2
3
4
5
6
7
[
Row(user=2093760, item=1180, rating=1),
Row(user=2093760, item=1255340, rating=3),
Row(user=2093760, item=378, rating=1),
Row(user=2093760, item=813, rating=2),
Row(user=2093760, item=942, rating=7)
]

获取艺术家ID:

1
2
3
artistid=[]
for i in a:
artistid.append(i.item)

输出结果:

1
[1180, 1255340, 378, 813, 942] 

要提取该用户收听过的艺术家 ID 并打印他们的名字,这意味着先在输入数据中搜索该用户收听过的艺术家的 ID,然后用这些 ID 对艺术家集合进行过滤,这样我们就可以获取并按序打印这些艺术家的名字:

1
b=aDF.rdd.filter(**lambda** x: x[0] **in** artistid).collect() 

输出结果:

1
2
3
4
5
6
7
8
9
[Row(id=1180, name='David Gray'),  

Row(id=378, name='Blackalicious'),

Row(id=813, name='Jurassic 5'),

Row(id=1255340, name='The Saw Doctors'),

Row(id=942, name='Xzibit')]

用户播放过的艺术家既有大众流行音乐风格的也有嘻哈风格的。

使用Spark2.4.6自带的recommendForUserSubset方法,对所有艺术家评分,并返回向用户2093760推荐其中分值最高的前5位。

1
2
3
4
5
d=sc.parallelize([(2093760,1)]).toDF(['user']) 

t=model.recommendForUserSubset(d,5)

t.show()

输出结果:

1
2
3
4
5
+-------+--------------------+                                                  
| user| recommendations|
+-------+--------------------+
|2093760|[[6674945, 4997.0...|
+-------+--------------------+

遍历打印一下:

1
t.select("recommendations").rdd.foreach(**lambda** x:**print**(x)) 

输出:

1
2
3
4
5
6
7
Row(recommendations=[
Row(item=6674945, rating=4997.056640625),
Row(item=1170225, rating=1805.596435546875),
Row(item=1153293, rating=1753.0908203125),
Row(item=6730413, rating=1233.61767578125),
Row(item=183, rating=1169.90234375)
])

结果全部是嘻哈风格。能看出,这些推荐都不怎么样。虽然推荐的艺术家都受人欢迎,但好像并没有针对用户的收听习惯进行个性化。

训练-验证切分

训练-验证切分,采用初始参数,重新训练模型。

为了利用输入数据,需要把它分成训练集和验证集。训练集只用于训练 ALS 模型,验证集用于评估模型。这里将 90% 的数据用于训练,剩余的 10% 用于交叉验证:

1
2
3
4
5
train,test=uaDF.randomSplit([0.9,0.1])
als = ALS(rank=10,maxIter=15,regParam=0.01,alpha=1.0,seed=int(random.random()*100),implicitPrefs=True)
model=als.fit(train)
train.cache()
test.cache()

计算AUC

接受一个交叉验证集和一个预测函数,交叉验证集代表每个用户对应的“正面的”或“好的”艺术家。预测函数把每个包含“用户 - 艺术家”对的 DataFrame 转换为一个同时包含“用户 - 艺术家”和“预测”的 DataFrame,“预测”表示“用户”与“艺术家”之间关联的强度值,这个值越高,代表推荐的排名越高。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
allArtistIDs = uaDF.select("item").distinct().collect()

import numpy
allArtistID = []
for i in range(len(allArtistIDs)):
allArtistID.append(allArtistIDs[i]["item"])
def f(a,b):
posItemIDSet = set(list(b))
negative = []
i = 0
while (i < len(allArtistID)) and (len(negative) < len(posItemIDSet)):
artistID = allArtistID[numpy.random.randint(1, high=len(allArtistID), size=None, dtype='l')]
if artistID not in posItemIDSet:
negative.append(artistID)
i += 1
s=list()
for i in negative:
s.append((a,i))
return s

# 计算AUC
import pyspark.sql.functions as func
def areaUnderCurve(positiveData,allArtistIDs,predictFunction):
positivePredictions = predictFunction(positiveData.select("user", "item")).withColumnRenamed("prediction", "positivePrediction")
negativeDatatmp = positiveData.select("user", "item").rdd.groupByKey().map(lambda x: f(x[0],x[1])).collect()
negativeDatalist=[]
for i in negativeDatatmp:
for j in i:
negativeDatalist.append(j)
negativeData=spark.createDataFrame(negativeDatalist,['user','item'])
negativePredictions = predictFunction(negativeData.select("user", "item")).withColumnRenamed("prediction", "negativePrediction")
joinedPredictions = positivePredictions.join(negativePredictions, "user").select("user", "positivePrediction", "negativePrediction")
allCounts = joinedPredictions.groupBy("user").agg(func.count(func.lit(1)).alias("total")).select("user", "total")
correctCounts = joinedPredictions.filter(joinedPredictions["positivePrediction"] > joinedPredictions["negativePrediction"]).groupBy("user").agg(func.count("user").alias("correct")).select("user", "correct")
meanAUCtemp = allCounts.join(correctCounts, "user", "left_outer")
meanAUC = meanAUCtemp.select("user", (meanAUCtemp["correct"] / meanAUCtemp["total"]).alias("auc")).agg(func.mean("auc")).first()
try:
joinedPredictions.unpersist()
except:
pass
return meanAUC

mostListenedAUC = areaUnderCurve(test, allArtistIDs, model.transform)
print(mostListenedAUC)

输出结果:

1
Row(avg(auc)=0.9098560946043145) 

有必要把上述方法和一个更简单方法做一个基准比对。举个例子,考虑下面的推荐方法:向每个用户推荐播放最多的艺术家。这个策略一点儿都不个性化,但它很简单,也可能有效。定义这个简单预测函数并评估它的 AUC 得分:

1
2
3
4
5
6
7
def predictMostListened(data):
listenCounts = train.groupBy("item").agg({"rating":"sum"}).withColumnRenamed("sum(rating)", "prediction").select("item", "prediction")
uaDF.join(listenCounts, ["item"], "left_outer").select("user", "item", "prediction")
listenCounts = uaDF.groupBy("item").agg({"rating":"sum"}).withColumnRenamed("sum(rating)", "prediction").select("item", "prediction")
return data.join(listenCounts, ["item"], "left_outer").select("user", "item", "prediction")
mostListenedAUC = areaUnderCurve(test, allArtistIDs, predictMostListened)
print(mostListenedAUC)

输出结果:

1
Row(avg(auc)=0.9578054887285846) 

结果得分大约是 0.96。这意味着,对 AUC 这个指标,非个性化的推荐表现已经不错了。然而,我们想要的是得分更高,也就是更为“个性化”的推荐。显然这个模型还有待改进。调整超参数,使推荐结果更合理。

选择超参数

Rank可选(5,30)RegParam可选(4.0,0.0001),alpha可选(1.0,40.0)。合计8种参数组合。

可以把 rank、regParam 和 alpha 看作模型的超参数。(maxIter 更像是对分解过程使用的资源的一种约束。)这些值不会体现在 ALSModel 的内部矩阵中,这些矩阵只是参数,其值由算法选定。超参数则是构建过程本身的参数。

1
2
3
4
5
6
7
8
9
def TrainALS(rank,regParam,alpha,dir):
als = ALS(rank=rank,maxIter=15,regParam=regParam,alpha=alpha,seed=int(random.random()*100),implicitPrefs=True)
model=als.fit(train)
model.save("/model/ALS/Try2/"+str(dir))
try:
model.userFactors.unpersist()
model.itemFactors.unpersist()
except:
pass

构建模型

1
2
3
4
5
6
dir=0
for rank in [5,30]:
for regParam in [4.0,0.0001]:
for alpha in [1.0,40.0]:
dir=dir+1
TrainALS(rank,regParam,alpha,dir)

加载模型计算AUC 得分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
try:
model=ALSModel.load("/model/ALS/Try2/1")
mostListenedAUC = areaUnderCurve(test, allArtistIDs, model.transform)
print((mostListenedAUC,(5,4.0,1.0)))
except:
print(("ERROR",(5,4.0,1.0)))
try:
model=ALSModel.load("/model/ALS/Try2/2")
mostListenedAUC = areaUnderCurve(test, allArtistIDs, model.transform)
print((mostListenedAUC,(5,4.0,40.0)))
except:
print(("ERROR",(5,4.0,40.0)))
try:
model=ALSModel.load("/model/ALS/Try2/3")
mostListenedAUC = areaUnderCurve(test, allArtistIDs, model.transform)
print((mostListenedAUC,(5,0.0001,1.0)))
except:
print(("ERROR",(5,0.0001,1.0)))
try:
model=ALSModel.load("/model/ALS/Try2/4")
mostListenedAUC = areaUnderCurve(test, allArtistIDs, model.transform)
print((mostListenedAUC,(5,0.0001,40.0)))
except:
print(("ERROR",(5,0.0001,40.0)))
try:
model=ALSModel.load("/model/ALS/Try2/5")
mostListenedAUC = areaUnderCurve(test, allArtistIDs, model.transform)
print((mostListenedAUC,(30,4.0,1.0)))
except:
print(("ERROR",(30,4.0,1.0)))
try:
model=ALSModel.load("/model/ALS/Try2/6")
mostListenedAUC = areaUnderCurve(test, allArtistIDs, model.transform)
print((mostListenedAUC,(30,4.0,40.0)))
except:
print(("ERROR",(30,4.0,40.0)))
try:
model=ALSModel.load("/model/ALS/Try2/7")
mostListenedAUC = areaUnderCurve(test, allArtistIDs, model.transform)
print((mostListenedAUC,(30,0.0001,1.0)))
except:
print(("ERROR",(30,0.0001,1.0)))
try:
model=ALSModel.load("/model/ALS/Try2/8")
mostListenedAUC = areaUnderCurve(test, allArtistIDs, model.transform)
print((mostListenedAUC,(30,0.0001,40.0)))
except:
print(("ERROR",(30,0.0001,40.0)))

输出结果:

1
2
3
4
5
6
7
8
(Row(avg(auc)=0.9122637924972641), (5, 4.0, 1.0))
(Row(avg(auc)=0.9154223563144587), (5, 4.0, 40.0))
(Row(avg(auc)=0.9057761909633262), (5, 0.0001, 1.0))
(Row(avg(auc)=0.9146676967584815), (5, 0.0001, 40.0))
(Row(avg(auc)=0.9230010545570864), (30, 4.0, 1.0))
(Row(avg(auc)=0.9275148741094371), (30, 4.0, 40.0))
(Row(avg(auc)=0.9125799456221803), (30, 0.0001, 1.0))
(Row(avg(auc)=0.9265221600644649), (30, 0.0001, 40.0))

可以看出rank=30,regParam=4.0,alpha=40.0时取得了最优的结果avg(auc)=0.9275148741094371.

虽然这些值的绝对差很小,但对于 AUC 值来说,仍然具有一定的意义。有意思的是,参数 alpha 取 40 的时候看起来总是比取 1 表现好。这说明了模型在强调用户听过什么时的表现要比强调用户没听过什么时要好。

产生推荐

选取10个用户展示推荐结果

1
2
3
4
model=ALSModel.load("/model/ALS/Try2/6")
d=sc.parallelize([(2093760,1),(1000002,1),(1006277,1),(1006282,1),(1006283,1),(1006285,1),(1041207,1),(1071489,1),(2025005,1),(2025007,1),]).toDF(['user'])
t=model.recommendForUserSubset(d,5)
t.select("recommendations").rdd.foreach(lambda x:print(x))

输出推荐结果:

1
2
3
4
5
6
7
8
9
10
Row(recommendations=[Row(item=1010991, rating=1.1815389394760132), Row(item=1245226, rating=1.139704942703247), Row(item=4629, rating=1.1092422008514404), Row(item=1113701, rating=1.1066040992736816), Row(item=1019715, rating=1.10056471824646)])
Row(recommendations=[Row(item=1010921, rating=1.225757122039795), Row(item=1166169, rating=1.1972994804382324), Row(item=1183949, rating=1.184556007385254), Row(item=1082446, rating=1.178223729133606), Row(item=3892, rating=1.1580229997634888)])
Row(recommendations=[Row(item=1028958, rating=1.146733283996582), Row(item=1086774, rating=1.1434733867645264), Row(item=1037761, rating=1.1272671222686768), Row(item=1184419, rating=1.1093372106552124), Row(item=1148170, rating=1.1065270900726318)])
Row(recommendations=[Row(item=1010295, rating=1.2554553747177124), Row(item=3722, rating=1.2187168598175049), Row(item=1024674, rating=1.2044183015823364), Row(item=1009445, rating=1.099776268005371), Row(item=1018746, rating=1.0894378423690796)])
Row(recommendations=[Row(item=1002068, rating=0.9575374126434326), Row(item=1005288, rating=0.9533681273460388), Row(item=2430, rating=0.9476644992828369), Row(item=1002270, rating=0.9428261518478394), Row(item=3909, rating=0.9334102869033813)])
Row(recommendations=[Row(item=1034635, rating=0.606441080570221), Row(item=1000107, rating=0.6010327935218811), Row(item=1000024, rating=0.59807950258255), Row(item=4154, rating=0.5966558456420898), Row(item=1000157, rating=0.5944067239761353)])
Row(recommendations=[Row(item=1034635, rating=0.290438175201416), Row(item=930, rating=0.2857869565486908), Row(item=4267, rating=0.2853849530220032), Row(item=1205, rating=0.2830250561237335), Row(item=1000113, rating=0.2829856276512146)])
Row(recommendations=[Row(item=1002909, rating=1.2324668169021606), Row(item=1653, rating=1.1938585042953491), Row(item=988, rating=1.1880080699920654), Row(item=1003367, rating=1.1807997226715088), Row(item=1009545, rating=1.1761128902435303)])
Row(recommendations=[Row(item=1017017, rating=0.8724791407585144), Row(item=1032349, rating=0.8424116373062134), Row(item=1028433, rating=0.8259942531585693), Row(item=1240603, rating=0.8185747265815735), Row(item=1029602, rating=0.8147093653678894)])
Row(recommendations=[Row(item=1013187, rating=1.2516499757766724), Row(item=1098360, rating=1.2394661903381348), Row(item=1289948, rating=1.2353206872940063), Row(item=1129243, rating=1.2332189083099365), Row(item=1245184, rating=1.1997216939926147)])

按照用户顺序将最喜欢的推荐结果输出到文件

1
2
3
4
5
6
def getp(x):
f=open("/export/work/result","a+")
print(x,file=f)
u=uaDF.select("user").distinct()
t=model.recommendForUserSubset(u,1)
t.rdd.foreach(lambda x:getp(x))

输出详见result文件,以下为部分输出:

1
2
3
4
5
Row(user=1000092, recommendations=[Row(item=1002400, rating=1.2386927604675293)]) 
Row(user=1000144, recommendations=[Row(item=5221, rating=1.1728830337524414)])
Row(user=3175, recommendations=[Row(item=1022207, rating=0.282743901014328)])
Row(user=1000164, recommendations=[Row(item=1034635, rating=0.36578264832496643)])
Row(user=7340, recommendations=[Row(item=1007903, rating=0.8722475171089172)])

评论