
Collaborative Filtering: Fundamentals, Similarity Measures, and Distributed Implementation on Spark

This article introduces the basic concepts of collaborative filtering, explains user‑based and item‑based approaches, presents co‑occurrence, Euclidean, Pearson, and Cosine similarity formulas, and provides complete Scala implementations for these metrics and association‑rule mining on the Spark platform, along with practical scalability tips.

DataFunTalk

The article begins with an overview of collaborative filtering, describing it as a classic recommendation technique that predicts a user's preference for an item by analyzing similar users or items. Two main variants are introduced: User‑based CF (UserCF) and Item‑based CF (ItemCF), each illustrated with diagrams.
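To make ItemCF concrete, here is a minimal plain-Scala sketch (function and variable names are illustrative, not from the article): a candidate item j is scored by summing, over every item i the user has interacted with, the i-j similarity weighted by the user's preference for i.

```scala
// Minimal ItemCF scoring sketch (illustrative names):
// score(u, j) = sum over i in history(u) of sim(i, j) * pref(u, i)
def itemCFScore(history: Map[String, Double],
                sim: Map[(String, String), Double]): Map[String, Double] =
  history.toSeq
    .flatMap { case (i, pref) =>
      // collect every (i, j) similarity edge leaving an item the user has seen
      sim.collect { case ((i0, j), w) if i0 == i => (j, w * pref) }
    }
    .groupBy(_._1)
    .map { case (j, xs) => j -> xs.map(_._2).sum }     // accumulate per candidate
    .filter { case (j, _) => !history.contains(j) }    // drop already-seen items
```

For example, with similarities `("A","B") -> 0.5` and `("A","C") -> 0.2` and a history `Map("A" -> 2.0)`, item B scores 1.0 and item C scores 0.4.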

Four similarity measures are detailed:

Co‑occurrence similarity: w(i,j) = |N(i)∩N(j)| / sqrt(|N(i)|·|N(j)|)

Euclidean distance similarity: d(x,y) = sqrt(∑(x(i)-y(i))²) and sim(x,y) = n / (1+d(x,y)), where n is the number of co-rated dimensions

Pearson correlation coefficient, ranging from –1 to +1

Cosine similarity: T(x,y) = ∑x(i)y(i) / (sqrt(∑x(i)²)·sqrt(∑y(i)²))
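For reference, the four formulas above can be sketched in plain Scala on small dense vectors and item sets (illustrative only; the distributed versions below operate on sparse user-item data):

```scala
// Co-occurrence: |N(i) ∩ N(j)| / sqrt(|N(i)| * |N(j)|)
def cooccurrence(ni: Set[String], nj: Set[String]): Double =
  ni.intersect(nj).size / math.sqrt(ni.size.toDouble * nj.size)

// Euclidean: sim = n / (1 + d), with n = number of co-rated dimensions
def euclideanSim(x: Seq[Double], y: Seq[Double]): Double = {
  val d = math.sqrt(x.zip(y).map { case (a, b) => (a - b) * (a - b) }.sum)
  x.length / (1.0 + d)
}

// Pearson: covariance divided by the product of standard deviations
def pearson(x: Seq[Double], y: Seq[Double]): Double = {
  val mx = x.sum / x.length; val my = y.sum / y.length
  val num = x.zip(y).map { case (a, b) => (a - mx) * (b - my) }.sum
  num / (math.sqrt(x.map(a => (a - mx) * (a - mx)).sum) *
         math.sqrt(y.map(b => (b - my) * (b - my)).sum))
}

// Cosine: dot product over the product of vector norms
def cosine(x: Seq[Double], y: Seq[Double]): Double =
  x.zip(y).map { case (a, b) => a * b }.sum /
    (math.sqrt(x.map(a => a * a).sum) * math.sqrt(y.map(a => a * a).sum))
```

Note that the Euclidean similarity here follows the article's n/(1+d) form and can exceed 1, whereas Pearson lies in [-1, 1] and cosine (for non-negative preferences) in [0, 1].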

For each metric, a distributed computation design on Spark is described. The process generally involves grouping user‑item interactions by user, generating all unordered item pairs (upper‑triangle only), aggregating co‑occurrence counts, and then applying the respective formula to obtain similarity scores. After computing the upper‑triangle matrix, the lower‑triangle is produced by swapping item identifiers, yielding a full similarity matrix.

Scala implementation for co‑occurrence similarity (UserCF):

// Shared imports for the methods in this article
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
import scala.collection.mutable.{ArrayBuffer, WrappedArray}

/**
 * Co-occurrence similarity.
 * w(i,j) = |N(i)∩N(j)| / sqrt(|N(i)|*|N(j)|)
 * @param user_ds user preference records
 * @return the item-to-item similarity Dataset[ItemSimi]
 */
 def CooccurrenceSimilarity(user_ds: Dataset[ItemPref]): Dataset[ItemSimi] = {
   import user_ds.sparkSession.implicits._
   // 1 (user: item) => (user: item set)
   val user_ds1 = user_ds.groupBy("userid").agg(collect_set("itemid")).withColumnRenamed("collect_set(itemid)", "itemid_set")
   // 2 item-item pairs, upper triangle only
   val user_ds2 = user_ds1.flatMap { row =>
     val itemlist = row.getAs[scala.collection.mutable.WrappedArray[String]](1).toArray.sorted
     val result = new ArrayBuffer[(String, String, Double)]()
     for (i <- 0 to itemlist.length - 2) {
       for (j <- i + 1 to itemlist.length - 1) {
         result += ((itemlist(i), itemlist(j), 1.0))
       }
     }
     result
   }.withColumnRenamed("_1", "itemidI").withColumnRenamed("_2", "itemidJ").withColumnRenamed("_3", "score")
   // 3 co-occurrence count per item pair (upper triangle)
   val user_ds3 = user_ds2.groupBy("itemidI", "itemidJ").agg(sum("score").as("sumIJ"))
   // 4 total occurrence count per item
   val user_ds0 = user_ds.withColumn("score", lit(1)).groupBy("itemid").agg(sum("score").as("score"))
   // 5 co-occurrence similarity
   val user_ds4 = user_ds3.join(user_ds0.withColumnRenamed("itemid", "itemidJ").withColumnRenamed("score", "sumJ").select("itemidJ", "sumJ"), "itemidJ")
   val user_ds5 = user_ds4.join(user_ds0.withColumnRenamed("itemid", "itemidI").withColumnRenamed("score", "sumI").select("itemidI", "sumI"), "itemidI")
   // apply the formula: sumIJ / sqrt(sumI * sumJ)
   val user_ds6 = user_ds5.withColumn("result", col("sumIJ") / sqrt(col("sumI") * col("sumJ")))
   // 6 merge upper and lower triangles
   val user_ds8 = user_ds6.select("itemidI", "itemidJ", "result").union(user_ds6.select(col("itemidJ").as("itemidI"), col("itemidI").as("itemidJ"), col("result")))
   // 7 return result
   val out = user_ds8.select("itemidI", "itemidJ", "result").map { row =>
     val itemidI = row.getString(0)
     val itemidJ = row.getString(1)
     val similar = row.getDouble(2)
     ItemSimi(itemidI, itemidJ, similar)
   }
   out
 }
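The article never shows the input and output case classes these methods rely on; a plausible minimal definition, consistent with the column names the code reads and writes, would be:

```scala
// Hypothetical case classes matching the fields used above;
// the article assumes them but does not define them.
case class ItemPref(userid: String, itemid: String, pref: Double)
case class ItemSimi(itemidI: String, itemidJ: String, similar: Double)
case class ItemAssociation(itemidI: String, itemidJ: String,
                           support: Double, confidence: Double,
                           lift: Double, similar: Double)
```

With these in scope, the method is invoked simply as `CooccurrenceSimilarity(prefDs)` on a `Dataset[ItemPref]` built from the raw interaction log.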

Scala implementation for Cosine similarity:

/**
 * Cosine similarity.
 * T(x,y) = ∑x(i)y(i) / (sqrt(∑x(i)²) * sqrt(∑y(i)²))
 */
 def CosineSimilarity(user_ds: Dataset[ItemPref]): Dataset[ItemSimi] = {
   import user_ds.sparkSession.implicits._
   // 1 prepare data: (user: set of "item:pref" strings)
   val user_ds1 = user_ds.withColumn("iv", concat_ws(":", $"itemid", $"pref"))
     .groupBy("userid").agg(collect_set("iv")).withColumnRenamed("collect_set(iv)", "itemid_set")
     .select("userid", "itemid_set")
   // 2 item-item pairs with scores, upper triangle only
   val user_ds2 = user_ds1.flatMap { row =>
     val itemlist = row.getAs[scala.collection.mutable.WrappedArray[String]](1).toArray.sorted
     val result = new ArrayBuffer[(String, String, Double, Double)]()
     for (i <- 0 to itemlist.length - 2) {
       for (j <- i + 1 to itemlist.length - 1) {
         result += ((itemlist(i).split(":")(0), itemlist(j).split(":")(0), itemlist(i).split(":")(1).toDouble, itemlist(j).split(":")(1).toDouble))
       }
     }
     result
   }.withColumnRenamed("_1", "itemidI").withColumnRenamed("_2", "itemidJ").withColumnRenamed("_3", "scoreI").withColumnRenamed("_4", "scoreJ")
   // 3 apply the cosine formula
   val user_ds3 = user_ds2
     .groupBy("itemidI", "itemidJ")
     .agg(
       sum($"scoreI" * $"scoreJ").as("sum_xy"),
       sum($"scoreI" * $"scoreI").as("sum_x"),
       sum($"scoreJ" * $"scoreJ").as("sum_y")
     )
     .withColumn("result", $"sum_xy" / (sqrt($"sum_x") * sqrt($"sum_y")))
   // 4 merge upper and lower triangles
   val user_ds8 = user_ds3.select("itemidI", "itemidJ", "result")
     .union(user_ds3.select($"itemidJ".as("itemidI"), $"itemidI".as("itemidJ"), $"result"))
   // 5 return result
   val out = user_ds8.select("itemidI", "itemidJ", "result").map { row =>
     val itemidI = row.getString(0)
     val itemidJ = row.getString(1)
     val similar = row.getDouble(2)
     ItemSimi(itemidI, itemidJ, similar)
   }
   out
 }

Scala implementation for Euclidean distance similarity:

/**
 * Euclidean distance similarity.
 * d(x,y) = sqrt(∑(x(i)-y(i))²)
 * sim(x,y) = n / (1 + d(x,y)), where n is the number of co-rated dimensions
 */
 def EuclideanDistanceSimilarity(user_ds: Dataset[ItemPref]): Dataset[ItemSimi] = {
   import user_ds.sparkSession.implicits._
   // 1 prepare data: (user: set of "item:pref" strings)
   val user_ds1 = user_ds.withColumn("iv", concat_ws(":", $"itemid", $"pref"))
     .groupBy("userid").agg(collect_set("iv")).withColumnRenamed("collect_set(iv)", "itemid_set")
     .select("userid", "itemid_set")
   // 2 item-item pairs with scores, upper triangle only
   val user_ds2 = user_ds1.flatMap { row =>
     val itemlist = row.getAs[scala.collection.mutable.WrappedArray[String]](1).toArray.sorted
     val result = new ArrayBuffer[(String, String, Double, Double)]()
     for (i <- 0 to itemlist.length - 2) {
       for (j <- i + 1 to itemlist.length - 1) {
         result += ((itemlist(i).split(":")(0), itemlist(j).split(":")(0), itemlist(i).split(":")(1).toDouble, itemlist(j).split(":")(1).toDouble))
       }
     }
     result
   }.withColumnRenamed("_1", "itemidI").withColumnRenamed("_2", "itemidJ").withColumnRenamed("_3", "scoreI").withColumnRenamed("_4", "scoreJ")
   // 3 compute the distance and convert it to a similarity
   val user_ds3 = user_ds2.withColumn("cnt", lit(1))
     .groupBy("itemidI", "itemidJ")
     .agg(
       sqrt(sum(($"scoreI" - $"scoreJ") * ($"scoreI" - $"scoreJ"))).as("dist"),
       sum($"cnt").as("cntSum")
     )
     .withColumn("result", $"cntSum" / (lit(1.0) + $"dist"))
   // 4 merge upper and lower triangles
   val user_ds8 = user_ds3.select("itemidI", "itemidJ", "result")
     .union(user_ds3.select($"itemidJ".as("itemidI"), $"itemidI".as("itemidJ"), $"result"))
   // 5 return result
   val out = user_ds8.select("itemidI", "itemidJ", "result").map { row =>
     val itemidI = row.getString(0)
     val itemidJ = row.getString(1)
     val similar = row.getDouble(2)
     ItemSimi(itemidI, itemidJ, similar)
   }
   out
 }

Scala implementation for association‑rule mining (support, confidence, lift) and similarity:

/**
 * Association-rule mining.
 * Computes support, confidence, lift, and co-occurrence similarity.
 */
 def AssociationRules(user_ds: Dataset[ItemPref]): Dataset[ItemAssociation] = {
   import user_ds.sparkSession.implicits._
   // 1 (user: item set)
   val user_ds1 = user_ds.groupBy("userid").agg(collect_set("itemid")).withColumnRenamed("collect_set(itemid)", "itemid_set")
   // 2 item-item pairs, upper triangle only
   val user_ds2 = user_ds1.flatMap { row =>
     val itemlist = row.getAs[WrappedArray[String]](1).toArray.sorted
     val result = new ArrayBuffer[(String, String, Double)]()
     for (i <- 0 to itemlist.length - 2) {
       for (j <- i + 1 to itemlist.length - 1) {
         result += ((itemlist(i), itemlist(j), 1.0))
       }
     }
     result
   }.withColumnRenamed("_1", "itemidI").withColumnRenamed("_2", "itemidJ").withColumnRenamed("_3", "score")
   // 3 co-occurrence count per item pair
   val user_ds3 = user_ds2.groupBy("itemidI", "itemidJ").agg(sum("score").as("sumIJ"))
   // 4 occurrence count per item
   val user_ds0 = user_ds.withColumn("score", lit(1)).groupBy("itemid").agg(sum("score").as("score"))
   val user_all = user_ds1.count
   // 5 support
   val user_ds4 = user_ds3.select("itemidI", "itemidJ", "sumIJ")
     .union(user_ds3.select($"itemidJ".as("itemidI"), $"itemidI".as("itemidJ"), $"sumIJ"))
     .withColumn("support", $"sumIJ" / user_all.toDouble)
   // 6 confidence
   val user_ds5 = user_ds4.join(user_ds0.withColumnRenamed("itemid", "itemidI").withColumnRenamed("score", "sumI"), "itemidI")
     .withColumn("confidence", $"sumIJ" / $"sumI")
   // 7 lift
   val user_ds6 = user_ds5.join(user_ds0.withColumnRenamed("itemid", "itemidJ").withColumnRenamed("score", "sumJ"), "itemidJ")
     .withColumn("lift", $"confidence" / ($"sumJ" / user_all.toDouble))
   // 8 co-occurrence similarity
   val user_ds8 = user_ds6.withColumn("similar", col("sumIJ") / sqrt(col("sumI") * col("sumJ")))
   // 9 return result
   val out = user_ds8.select("itemidI", "itemidJ", "support", "confidence", "lift", "similar").map { row =>
     val itemidI = row.getString(0)
     val itemidJ = row.getString(1)
     val support = row.getDouble(2)
     val confidence = row.getDouble(3)
     val lift = row.getDouble(4)
     val similar = row.getDouble(5)
     ItemAssociation(itemidI, itemidJ, support, confidence, lift, similar)
   }
   out
 }
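To sanity-check those three measures, here is a tiny worked example with invented numbers: 4 users in total, items I and J co-occurring for 2 of them, I appearing for 3 users and J for 2.

```scala
// Toy numbers, invented for illustration
val totalUsers = 4.0
val sumIJ = 2.0 // users who interacted with both I and J
val sumI  = 3.0 // users who interacted with I
val sumJ  = 2.0 // users who interacted with J

val support    = sumIJ / totalUsers               // 2/4 = 0.5
val confidence = sumIJ / sumI                     // P(J|I) = 2/3
val lift       = confidence / (sumJ / totalUsers) // (2/3)/(1/2) = 4/3
val similar    = sumIJ / math.sqrt(sumI * sumJ)   // 2/sqrt(6)
```

A lift above 1, as here, means seeing I makes J more likely than its base rate, which is what makes the rule I => J worth recommending on.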

The article concludes with a discussion on scalability challenges when vectors become extremely high‑dimensional (e.g., billions of users) and suggests practical solutions such as user/item sampling or advanced techniques like Facebook’s Faiss for approximate nearest‑neighbor search.
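One of the sampling mitigations mentioned above, capping each user's history before the pair-generation step, can be sketched as follows (function name and signature are illustrative): keeping only each user's K most-preferred items bounds the per-user pair count at K·(K-1)/2, which tames the quadratic blow-up for heavy users.

```scala
// Illustrative sketch: keep each user's K most-preferred items so the
// flatMap pair-generation step emits at most K*(K-1)/2 pairs per user.
def capHistory(prefs: Seq[(String, Double)], k: Int): Seq[String] =
  prefs.sortBy(-_._2).take(k).map(_._1)
```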

Author information and a promotional book‑giveaway are provided at the end, but the technical content remains the core focus of the piece.

Tags: big data, collaborative filtering, recommender systems, Spark, Scala, similarity measures
Written by DataFunTalk

Dedicated to sharing and discussing big data and AI technology applications, aiming to empower a million data scientists. Regularly hosts live tech talks and curates articles on big data, recommendation/search algorithms, advertising algorithms, NLP, intelligent risk control, autonomous driving, and machine learning/deep learning.
