Spark实战电影点评系统(一)

发布时间:2025-08-06 06:22

观看一些具有创新性的实验电影,挑战传统观影体验。 #生活乐趣# #电影推荐#

一、通过RDD实战电影点评系统

  日常的数据来源有很多渠道,如网络爬虫、网页埋点、系统日志等。下面的案例中使用的是用户观看电影和点评电影的行为数据,数据来源于网络上的公开数据,共有3个数据文件:uers.dat、ratings.dat和movies.dat。

  其中,uers.dat的格式如下: UserID::Gender::Age::Occupation::Zip-code ,这个文件里共有6040个用户的信息,每行中用“::”隔开的详细信息包括ID、性别(F、M分别表示女性、男性)、年龄(使用7个年龄段标记)、职业和邮编。

    

    ratings.dat的格式如下: UserID::MovieID::Rating::Timestamp ,这个文件共有一百万多条记录,记录的是评分信息,即用户ID、电影ID、评分(满分是5分)和时间戳。

    

  movies.dat的格式如下: MovieID::Title::Genres ,这个文件记录的是电影信息,即电影ID、电影名称和电影类型。

  

  首先初始化Spark,以及读取文件。创建一个Scala的object类,在main方法中配置SparkConf和SparkContext,这里指定程序在本地运行,并且把程序名字设置为“RDD_Movie_Users_Analyzer”。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

val conf = new SparkConf().setMaster("local[*]").setAppName("RDD_Movie_User_Analyzer")

val spark = SparkSession.builder.config(conf).getOrCreate()

val sc = spark.sparkContext

sc.setLogLevel("WARN")

val usersRDD = sc.textFile("./src/test1/users.dat")

val moviesRDD = sc.textFile("./src/test1/movies.dat")

val ratingsRDD = sc.textFile("./src/test1/ratings.dat") 

  首先我们来写一个案例计算,并打印出所有电影中评分最高的前10个电影名和平均评分。

  第一步:从ratingsRDD中取出MovieID和rating,从moviesRDD中取出MovieID和Name,如果后面的代码重复使用这些数据,则可以把它们缓存起来。首先把使用map算子上面的RDD中的每一个元素(即文件中的每一行)以“::”为分隔符进行拆分,然后再使用map算子从拆分后得到的数组中取出需要用到的元素,并把得到的RDD缓存起来

  第二步:从ratings的数据中使用map算子获取到形如(movieID,(rating,1))格式的RDD,然后使用reduceByKey把每个电影的总评分以及点评人数算出来。此时得到的RDD格式为(movieID,Sum(ratings),Count(ratings)))。

  第三步:把每个电影的Sum(ratings)和Count(ratings)相除,得到包含了电影ID和平均评分的RDD:

  第四步:把avgRatings与movieInfo通过关键字(key)连接到一起,得到形如(movieID, (MovieName,AvgRating))的RDD,然后格式化为(AvgRating,MovieName),并按照key(也就是平均评分)降序排列,最终取出前10个并打印出来。

1

2

3

4

5

6

7

8

println("所有电影中平均得分最高(口碑最好)的电影:")

val movieInfo = moviesRDD.map(_.split("::")).map(x=>(x(0),x(1))).cache()

val ratings = ratingsRDD.map(_.split("::")).map(x=>(x(0),x(1),x(2))).cache()

val moviesAndRatings = ratings.map(x=>(x._2,(x._3.toDouble,1))).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))

val avgRatings = moviesAndRatings.map(x=>(x._1,x._2._1.toDouble/x._2._2))

avgRatings.join(movieInfo).map(item=>(item._2._1,item._2._2))

          .sortByKey(false).take(10)

          .foreach(record=>println(record._2+"评分为:"+record._1))

   

  接下来我们来看另外一个功能的实现:分析最受男性喜爱的电影Top10和最受女性喜爱的电影Top10。

  首先来分析一下:单从ratings中无法计算出最受男性或者女性喜爱的电影Top10,因为该RDD中没有Gender信息,如果需要使用Gender信息进行Gender的分类,此时一定需要聚合。当然,我们力求聚合使用的是mapjoin(分布式计算的一大痛点是数据倾斜,map端的join一定不会数据倾斜),这里是否可使用mapjoin?不可以,因为map端的join是使用broadcast把相对小得多的变量广播出去,这样可以减少一次shuffle,这里,用户的数据非常多,所以要使用正常的join。 

  使用join连接ratings和users之后,对分别过滤出男性和女性的记录进行处理:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

println("========================================")

println("所有电影中最受男性喜爱的电影Top10:")

val usersGender = usersRDD.map(_.split("::")).map(x=>(x(0),x(1)))

val genderRatings = ratings.map(x=>(x._1,(x._1,x._2,x._3))).join(usersGender).cache()

val maleFilteredRatings = genderRatings.filter(x=>x._2._2.equals("M")).map(x=>x._2._1)

val femaleFilteredRatings = genderRatings.filter(x=>x._2._2.equals("F")).map(x=>x._2._1)

maleFilteredRatings.map(x=>(x._2,(x._3.toDouble,1))).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))

                  .map(x=>(x._1,x._2._1.toDouble/x._2._2))

                  .join(movieInfo)

                  .map(item=>(item._2._1,item._2._2))

                  .sortByKey(false)

                  .take(10)

                  .foreach(record=>println(record._2+"评分为:"+record._1))

println("========================================")

println("所有电影中最受女性喜爱的电影Top10:")

femaleFilteredRatings.map(x=>(x._2,(x._3.toDouble,1))).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))

                  .map(x=>(x._1,x._2._1.toDouble/x._2._2))

                  .join(movieInfo)

                  .map(item=>(item._2._1,item._2._2))

                  .sortByKey(false)

                  .take(10)

                  .foreach(record=>println(record._2+"评分为:"+record._1))

   

  在现实业务场景中,二次排序非常重要,并且经常遇到。下面来模拟一下这些场景,实现对电影评分数据进行二次排序,以Timestamp和Rating两个维度降序排列,值得一提的是,Java版本的二次排序代码非常烦琐,而使用Scala实现就会很简捷,首先我们需要一个继承自Ordered和Serializable的类。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

class SecondarySortKey(val first:Double,val second:Double) extends Ordered[SecondarySortKey] with Serializable{

  override def compare(other:SecondarySortKey):Int={

    if(this.first-other.first!=0){

      (this.first-other.first).toInt

    }else {

      if(this.second-other.second>0){

        Math.ceil(this.second-other.second).toInt

      }else if (this.second-other.second<0) {

        Math.floor(this.second-other.second).toInt

      }else {

        (this.second-other.second).toInt

      }

    }

  }

}

  然后再把RDD的每条记录里想要排序的字段封装到上面定义的类中作为key,把该条记录整体作为value。  

1

2

3

4

5

6

7

8

9

10

println("========================================")

println("对电影评分数据以Timestamp和Rating两个维度进行二次降序排列:")

val pairWithSortkey = ratingsRDD.map(line=>{

  val spilted = line.split("::")

  (new SecondarySortKey(spilted(3).toDouble,spilted(2).toDouble),line)

})

val sorted = pairWithSortkey.sortByKey(false)

val sortedResult = sorted.map(sortedline => sortedline._2)

sortedResult.take(10).foreach(println)

  取出排序后的RDD的value,此时这些记录已经是按照时间戳和评分排好序的,最终打印出的结果如图所示,从图中可以看到已经按照timestamp和评分降序排列了。

   


如果您觉得阅读本文对您有帮助,请点一下“推荐”按钮,您的“推荐”将是我最大的写作动力!欢迎各位转载,但是未经作者本人同意,转载文章之后必须在文章页面明显位置给出作者和原文连接,否则保留追究法律责任的权利。

网址:Spark实战电影点评系统(一) https://klqsh.com/news/view/108197

相关内容

电影评论
电影影评观后感
好电影影评
《当幸福来敲门》影评:生活的真实与挑战
传统音乐正成为电影叙事的点睛之笔
电影评价的四大维度:专业影迷必看
One Spark这是属于我们的流金岁月
「电影点评」这些热门电影,我给满分!
《如何写影评》:电影向左,影评向右
写影评,享受电影的另一种乐趣

随便看看