viernes, 4 de marzo de 2016

[Spark MLlib] Filtrado Colaborativo (I): Sistema de Recomendación

Continuando mi aventura dentro del mundo del Machine Learning y tras [Spark MLlib] Árboles de Decisión: Ejemplo de Clasificación hoy voy a realizar y tratar de explicar el típico ejemplo de un sistema de recomendación gracias al uso de filtrado colaborativo (collaborative filtering).

Para ello me apoyaré en el 'desconocidísimo' dataset MovieLens y de nuevo en la librería MLlib de Spark que incluye para este tipo de soluciones el algoritmo ALS. También podríamos implementar dicho sistema de recomendación con Mahout, poderosísima herramienta de Machine Learning dentro del ecosistema de Big Data, que personalmente y para este problema en concreto presenta un mayor abanico de posibilidades de solución o al menos de algoritmos para atacarlo.
Collaborative Filtering
Filtrado Colaborativo
http://www.gravityrd.com/technology
Bueno, ¡al turrón!

1) Partiendo de la estructura de directorios del ejemplo anterior, ver [Spark MLlib] Árboles de Decisión: Ejemplo de Clasificación, crearemos la carpeta Carpeta_Proyecto/src/main/scala/mllib/collaborativeFiltering.

2) Nos descargaremos cualquiera de los conjunto de datos de MovieLens, en mi caso elegí "ml-latest.zip (size: 144 MB)", y extraeremos su contenido en el directorio Carpeta_Proyecto/data/movielens.

2.1) Antes de continuar deberemos realizar una pequeña edición sobre los ficheros ratings.csv y movies.csv. La primera línea de ambos ficheros es la definición de sus columnas por lo que deberemos eliminarla.

3) Fichero build.sbt. No sufre ninguna modificación respecto al del anterior ejemplo.

4) Copiar-pegar el siguiente código dentro del fichero recommendation.scala ubicado en el directorio Carpeta_Proyecto/src/main/scala/mllib/collaborativeFiltering/

package mllib.collaborativeFiltering

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}

object recommendation {
  def main (args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val sparkConfig = new SparkConf().setMaster("local[2]").setAppName("recommendation")
    val sc = new SparkContext(sparkConfig)    // Inicializar la aplicación de Spark mediante su objeto necesario

    // Leer el fichero de votaciones a partir del cual se crearán por cada línea un objeto de tipo Ratings cuyos argumentos deben ser: identificador de usuario, id. de producto (en nuestro caso moviesid), y valoración. Todos los argumentos deben ser de tipo numérico. En caso de no serlo alguno de ellos, deberemos en primer lugar proceder a su codificación o mapeo. Ej. variables carrierMap, originMap o destMap de la entrada anterior.
    val mlRatings = sc.textFile("data/movielens/ratings.csv")
    val ratings = mlRatings.map { line =>
      // userId,movieId,rating,timestamp
      val fields = line.split(",")
      Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
    }

    // Dividir el dataset inicial de valoraciones en tres conjuntos: entrenamiento, trest y validación
    val splits = ratings.randomSplit(Array(0.6, 0.2, 0.2))
    val (trainingData, testData, validationData) = (splits(0), splits(1), splits(2))

    // En el ejemplo que se propone en la web de Spark al respecto se implementa un único modelo, es decir, se determina el valor de las variables necesarias y se lleva a cabo su ejecución contra el conjunto de datos completo. Mientras que aquí he complicado un poco el código y lo que se busca es jugar con diferentes valores de las variables y con el conjunto de datos de entrenamiento. Se ejecuta de manera secuencial cada una de las combinaciones posibles quedándose al final con el modelo que mejor resultado o validación Root Mean Squared Error (RMSE) obtenga. Inconveniente, el tiempo de ejecución.

    val ranks = List(10, 15)    // Valor razonable entre 10-200. A mayor valor, generalmente mejor resultado, pero a costa de un mayor consumo de memoria
    val lambdas = List(0.01, 1.0)    // Su valor se debe establecer en función de las aproximaciones o resultados que vayamos obteniendo
    val numIters = List(10, 20)    // Valor habitual: 10; Con pocas iteraciones se consigue converger hacia una solución razonable
    var bestModel: Option[MatrixFactorizationModel] = None
    var bestValidationRmse = Double.MaxValue
    var bestRank = 0
    var bestLambda = -1.0
    var bestNumIter = -1
    for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
      val model = ALS.train(trainingData, rank, numIter, lambda)    // Ejecución del algoritmo ALS y construcción del modelo respecto del conjunto de datos de entrenamiento
      val validationRmse = computeRmse(model, validationData, false)    // Comparación o validación Root Mean Squared Error del modelo recién creado contra el conjunto de datos de validación
      println("RMSE (validation) = " + validationRmse + " for the model trained with rank = "
        + rank + ", lambda = " + lambda + ", and numIter = " + numIter + ".")
      if (validationRmse < bestValidationRmse) {Compute
        bestModel = Some(model)
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lambda
        bestNumIter = numIter
      }
    }

    val testRmse = computeRmse(bestModel.get, testData, false)    // Ejecutar el modelo ganador contra el conjunto de datos de test

    println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda
      + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".")

    bestModel.get.save(sc, "target/tmp/myCollaborativeFilter")    // Salvar el modelo

    sc.stop()    // Detener la aplicación de Spark
  }

  // Función para el cálculo del error cuadrático medio ó RMSE

  // En primer lugar se ejecuta el modelo en cuestión contra el conjunto de datos traspasado. A continuación se comparan los valores predecidos con las valoraciones reales hechas por los usuarios, obteniéndose así el error cuadrático medio final.
  def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], implicitPrefs: Boolean)
    : Double = {

    def mapPredictedRating(r: Double): Double = {
      if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r
    }

    val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
    val predictionsAndRatings = predictions.map{ x =>
      ((x.user, x.product), mapPredictedRating(x.rating))
    }.join(data.map(x => ((x.user, x.product), x.rating))).values
    math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean())
  }
}

5) Ejecutar la aplicación
# sbt compile
...
[success] Total time: 5 s, completed 04-mar-2016 09:59:18

# sbt run
...
Multiple main classes detected, select one to run:   // En el caso de haber continuado a partir del proyecto de la anterior entrada, [Spark MLlib] Árboles de Decisión: Ejemplo de Clasificación, nos aparecerá este mensaje.

 [1] mllib.DecisionTrees.classificationFlightsOnTime
 [2] mllib.collaborativeFiltering.recommendation
Enter number: 2    // Seleccionaremos la opción que corresponda a nuestro sistema de recomendación
...
RMSE (validation) = 0.8652560855462604 for the model trained with rank = 10, lambda = 0.01, and numIter = 10.
RMSE (validation) = 0.8629134178168192 for the model trained with rank = 10, lambda = 0.01, and numIter = 20.
RMSE (validation) = 1.3281799325759402 for the model trained with rank = 10, lambda = 1.0, and numIter = 10.
RMSE (validation) = 1.3281784808552248 for the model trained with rank = 10, lambda = 1.0, and numIter = 20.
RMSE (validation) = 0.8842091343625045 for the model trained with rank = 15, lambda = 0.01, and numIter = 10.
RMSE (validation) = 0.8788401499772798 for the model trained with rank = 15, lambda = 0.01, and numIter = 20.
RMSE (validation) = 1.3281800451766226 for the model trained with rank = 15, lambda = 1.0, and numIter = 10.
RMSE (validation) = 1.3281800690626133 for the model trained with rank = 15, lambda = 1.0, and numIter = 20.

The best model was trained with rank = 10 and lambda = 0.01, and numIter = 20, and its RMSE on the test set is 0.8628875199400583.
...
16/03/04 10:18:13 INFO FileOutputCommitter: Saved output of task 'attempt_201603041018_1009_m_000000_0' to file:/Carpeta_Proyecto/target/tmp/myCollaborativeFilter/data/user/_temporary/0/task_201603041018_1009_m_000000
16/03/04 10:18:13 INFO FileOutputCommitter: Saved output of task 'attempt_201603041018_1009_m_000001_0' to file:/Carpeta_Proyecto/target/tmp/myCollaborativeFilter/data/user/_temporary/0/task_201603041018_1009_m_000001
...

Con esto habremos conseguido definir nuestro modelo, salvarlo y así poder recuperarlo, como veremos en futuros capítulos, para por ejemplo hacer recomendaciones online.

Siguientes capítulos:

    [Spark MLlib] Filtrado Colaborativo (II): Recomendación por Usuario

    [Spark MLlib] Filtrado Colaborativo (III): Recomendación por Ítem (pendiente)

1 comentario:

  1. La función computeRmse podría ser reemplazada por la clase que incluye la propia librería de MLlib: RegressionMetrics. Se le debe pasar como argumento un RDD de tipo clave-valor que represente los valores predichos y reales para cada una de las valoraciones.

    Código de ejemplo:
    import org.apache.spark.mllib.evaluation.RegressionMetrics
    val predictedAndTrue = ratingsAndPredictions.map { case ((user, product),
    (predicted, actual)) => (predicted, actual) }
    val regressionMetrics = new RegressionMetrics(predictedAndTrue)

    println("Mean Squared Error = " + regressionMetrics.meanSquaredError)
    println("Root Mean Squared Error = " + regressionMetrics.rootMeanSquaredError)

    ResponderEliminar