viernes, 11 de marzo de 2016

[Spark MLlib] Filtrado Colaborativo (II): Recomendación por Usuario

Continuamos con este breve tutorial o introducción al filtrado colaborativo con Spark. Bien, la semana pasada en la entrada [Spark MLlib] Filtrado Colaborativo (I): Sistema de Recomendación construimos nuestro sistema de recomendación o más bien el generador de modelos. Esta semana tocará hacer uso de él y recomendar productos, en nuestro ejemplo películas, a nuestros usuarios a partir de sus votaciones.

Continuaremos también trabajando con la estructura de directorios y archivos ya definido, lo que vendría a ser nuestro proyecto dentro de nuestra herramienta IDE favorita. Antes de continuar, me gustaría matizar que este tutorial NO está orientado a la programación en scala ni a sus best practices a la hora de desarrollar. Todos los ejemplos son mejorables desde la primera línea. Por ejemplo, algo tan sencillo como recibir los parámetros con los que juego como argumentos de entrada, definir un único método principal, pues voy declarando funciones main por cada funcionalidad o algoritmo que trato de explicar, etc, pero lo dicho, no, no me voy a entretener ni por un instante en escribir el código correctamente, sólo pretendo dar unas pinceladas sobre cómo funcionan ciertos algoritmos o cómo alcanzar una determinada funcionalidad.

1) En esta ocasión creamos el archivo getRecommendationsByUserId.scala dentro de la misma carpeta donde creamos nuestro generador de modelos, es decir, en el directorio Carpeta_Proyecto/src/main/scala/mllib/collaborativeFiltering/

package mllib.collaborativeFiltering

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}
import org.apache.spark.{SparkContext, SparkConf}

object getRecommendationsByUserId {
  def main (args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val sparkConfig = new SparkConf().setMaster("local[2]").setAppName("getRecommendationsByUserId")
    val sc = new SparkContext(sparkConfig)

    val mlRatings = sc.textFile("data/movielens/ratings.csv")
    val ratings = mlRatings.map { line =>
      // userId,movieId,rating,timestamp
      val fields = line.split(",")
      Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
    }.cache()

    val mlMovies = sc.textFile("data/movielens/movies.csv")
    val movies = mlMovies.map { line =>
      // movieId,title,genres
      val fields = line.split(",")
      (fields(0).toInt, fields(1))
    }.collect.toMap

    val model = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")    // Recuperando el modelo

    val userId = 25560     // Seleccionamos el usuario id. Interesante pasarlo al script como argumento ;)
    val moviesForUser = ratings.keyBy(_.user).lookup(userId)
    println("El usuario ha votado a " + moviesForUser.size + " peliculas:")
    moviesForUser.sortBy(-_.rating).take(10).map(rating => (movies(rating.product), rating.rating)).foreach(println)    // Observamos que tipo de películas le gustan al usuario

    val topKRecs = model.recommendProducts(userId,10)    // El método para obtener las recomendaciones ya viene implementado de serie en la librería. Obtenemos el top10 de productos, en nuestro caso películas, recomendados. También podríamos pasar el número de recomendaciones, topN  ;) ;)
    topKRecs.map(rating => (movies(rating.product), rating.rating)).foreach(println)

  }
}

2) Ejecutar la aplicación
sbt compile
...
[success] Total time: 1 s, completed 11-mar-2016 10:14:13

sbt run
...
Multiple main classes detected, select one to run:

 [1] mllib.DecisionTrees.classificationFlightsOnTime
 [2] mllib.collaborativeFiltering.getRecommendationsByUserId
 [3] mllib.collaborativeFiltering.recommendation

Enter number: 2
...
El usuario ha votado a 50 peliculas:
Star Wars: Episode IV - A New Hope (1977),5.0)
(Star Wars: Episode V - The Empire Strikes Back (1980),5.0)
("Princess Bride,5.0)
(Star Wars: Episode VI - Return of the Jedi (1983),5.0)
(Life Is Beautiful (La Vita è bella) (1997),5.0)
(X-Men (2000),5.0)
(Unbreakable (2000),5.0)
("Lord of the Rings: The Fellowship of the Ring,5.0)
(Spider-Man (2002),5.0)

("Lord of the Rings: The Two Towers,5.0)
...
("Roman Spring of Mrs. Stone,11.756657663464846)
(Babar The Movie (1989),11.171622844283377)
(Goodbye to Language 3D (2014),9.747998979226367)
(Garfield's Halloween Adventure (1985),9.736430143083723)
(Jimmy Carter Man from Plains (2007),9.72594699681999)
(Dangerous (1935),9.256450056766873)
(Bad Ronald (1974),9.089006045667578)
(End of the Line (2007),9.057430462126002)
(In the City of Sylvia (En la ciudad de Sylvia) (2007),8.963930587070152)
(Nine Dead Gay Guys (2003),8.895781428393946)
...
[success] Total time: 317 s, completed 11-mar-2016 10:41:22

Muy sencillo, ¿verdad?

Entradas relacionadas:

    [Spark MLlib] Filtrado Colaborativo (III): Recomendación por Ítem (pendiente)

viernes, 4 de marzo de 2016

[Spark MLlib] Filtrado Colaborativo (I): Sistema de Recomendación

Continuando mi aventura dentro del mundo del Machine Learning y tras [Spark MLlib] Árboles de Decisión: Ejemplo de Clasificación hoy voy a realizar y tratar de explicar el típico ejemplo de un sistema de recomendación gracias al uso de filtrado colaborativo (collaborative filtering).

Para ello me apoyaré en el 'desconocidísimo' dataset MovieLens y de nuevo en la librería MLlib de Spark que incluye para este tipo de soluciones el algoritmo ALS. También podríamos implementar dicho sistema de recomendación con Mahout, poderosísima herramienta de Machine Learning dentro del ecosistema de Big Data, que personalmente y para este problema en concreto presenta un mayor abanico de posibilidades de solución o al menos de algoritmos para atacarlo.
Collaborative Filtering
Filtrado Colaborativo
http://www.gravityrd.com/technology
Bueno, ¡al turrón!

1) Partiendo de la estructura de directorios del ejemplo anterior, ver [Spark MLlib] Árboles de Decisión: Ejemplo de Clasificación, crearemos la carpeta Carpeta_Proyecto/src/main/scala/mllib/collaborativeFiltering.

2) Nos descargaremos cualquiera de los conjunto de datos de MovieLens, en mi caso elegí "ml-latest.zip (size: 144 MB)", y extraeremos su contenido en el directorio Carpeta_Proyecto/data/movielens.

2.1) Antes de continuar deberemos realizar una pequeña edición sobre los ficheros ratings.csv y movies.csv. La primera línea de ambos ficheros es la definición de sus columnas por lo que deberemos eliminarla.

3) Fichero build.sbt. No sufre ninguna modificación respecto al del anterior ejemplo.

4) Copiar-pegar el siguiente código dentro del fichero recommendation.scala ubicado en el directorio Carpeta_Proyecto/src/main/scala/mllib/collaborativeFiltering/

package mllib.collaborativeFiltering

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}

object recommendation {
  def main (args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val sparkConfig = new SparkConf().setMaster("local[2]").setAppName("recommendation")
    val sc = new SparkContext(sparkConfig)    // Inicializar la aplicación de Spark mediante su objeto necesario

    // Leer el fichero de votaciones a partir del cual se crearán por cada línea un objeto de tipo Ratings cuyos argumentos deben ser: identificador de usuario, id. de producto (en nuestro caso moviesid), y valoración. Todos los argumentos deben ser de tipo numérico. En caso de no serlo alguno de ellos, deberemos en primer lugar proceder a su codificación o mapeo. Ej. variables carrierMap, originMap o destMap de la entrada anterior.
    val mlRatings = sc.textFile("data/movielens/ratings.csv")
    val ratings = mlRatings.map { line =>
      // userId,movieId,rating,timestamp
      val fields = line.split(",")
      Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
    }

    // Dividir el dataset inicial de valoraciones en tres conjuntos: entrenamiento, trest y validación
    val splits = ratings.randomSplit(Array(0.6, 0.2, 0.2))
    val (trainingData, testData, validationData) = (splits(0), splits(1), splits(2))

    // En el ejemplo que se propone en la web de Spark al respecto se implementa un único modelo, es decir, se determina el valor de las variables necesarias y se lleva a cabo su ejecución contra el conjunto de datos completo. Mientras que aquí he complicado un poco el código y lo que se busca es jugar con diferentes valores de las variables y con el conjunto de datos de entrenamiento. Se ejecuta de manera secuencial cada una de las combinaciones posibles quedándose al final con el modelo que mejor resultado o validación Root Mean Squared Error (RMSE) obtenga. Inconveniente, el tiempo de ejecución.

    val ranks = List(10, 15)    // Valor razonable entre 10-200. A mayor valor, generalmente mejor resultado, pero a costa de un mayor consumo de memoria
    val lambdas = List(0.01, 1.0)    // Su valor se debe establecer en función de las aproximaciones o resultados que vayamos obteniendo
    val numIters = List(10, 20)    // Valor habitual: 10; Con pocas iteraciones se consigue converger hacia una solución razonable
    var bestModel: Option[MatrixFactorizationModel] = None
    var bestValidationRmse = Double.MaxValue
    var bestRank = 0
    var bestLambda = -1.0
    var bestNumIter = -1
    for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {
      val model = ALS.train(trainingData, rank, numIter, lambda)    // Ejecución del algoritmo ALS y construcción del modelo respecto del conjunto de datos de entrenamiento
      val validationRmse = computeRmse(model, validationData, false)    // Comparación o validación Root Mean Squared Error del modelo recién creado contra el conjunto de datos de validación
      println("RMSE (validation) = " + validationRmse + " for the model trained with rank = "
        + rank + ", lambda = " + lambda + ", and numIter = " + numIter + ".")
      if (validationRmse < bestValidationRmse) {Compute
        bestModel = Some(model)
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lambda
        bestNumIter = numIter
      }
    }

    val testRmse = computeRmse(bestModel.get, testData, false)    // Ejecutar el modelo ganador contra el conjunto de datos de test

    println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda
      + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".")

    bestModel.get.save(sc, "target/tmp/myCollaborativeFilter")    // Salvar el modelo

    sc.stop()    // Detener la aplicación de Spark
  }

  // Función para el cálculo del error cuadrático medio ó RMSE

  // En primer lugar se ejecuta el modelo en cuestión contra el conjunto de datos traspasado. A continuación se comparan los valores predecidos con las valoraciones reales hechas por los usuarios, obteniéndose así el error cuadrático medio final.
  def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], implicitPrefs: Boolean)
    : Double = {

    def mapPredictedRating(r: Double): Double = {
      if (implicitPrefs) math.max(math.min(r, 1.0), 0.0) else r
    }

    val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
    val predictionsAndRatings = predictions.map{ x =>
      ((x.user, x.product), mapPredictedRating(x.rating))
    }.join(data.map(x => ((x.user, x.product), x.rating))).values
    math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean())
  }
}

5) Ejecutar la aplicación
# sbt compile
...
[success] Total time: 5 s, completed 04-mar-2016 09:59:18

# sbt run
...
Multiple main classes detected, select one to run:   // En el caso de haber continuado a partir del proyecto de la anterior entrada, [Spark MLlib] Árboles de Decisión: Ejemplo de Clasificación, nos aparecerá este mensaje.

 [1] mllib.DecisionTrees.classificationFlightsOnTime
 [2] mllib.collaborativeFiltering.recommendation
Enter number: 2    // Seleccionaremos la opción que corresponda a nuestro sistema de recomendación
...
RMSE (validation) = 0.8652560855462604 for the model trained with rank = 10, lambda = 0.01, and numIter = 10.
RMSE (validation) = 0.8629134178168192 for the model trained with rank = 10, lambda = 0.01, and numIter = 20.
RMSE (validation) = 1.3281799325759402 for the model trained with rank = 10, lambda = 1.0, and numIter = 10.
RMSE (validation) = 1.3281784808552248 for the model trained with rank = 10, lambda = 1.0, and numIter = 20.
RMSE (validation) = 0.8842091343625045 for the model trained with rank = 15, lambda = 0.01, and numIter = 10.
RMSE (validation) = 0.8788401499772798 for the model trained with rank = 15, lambda = 0.01, and numIter = 20.
RMSE (validation) = 1.3281800451766226 for the model trained with rank = 15, lambda = 1.0, and numIter = 10.
RMSE (validation) = 1.3281800690626133 for the model trained with rank = 15, lambda = 1.0, and numIter = 20.

The best model was trained with rank = 10 and lambda = 0.01, and numIter = 20, and its RMSE on the test set is 0.8628875199400583.
...
16/03/04 10:18:13 INFO FileOutputCommitter: Saved output of task 'attempt_201603041018_1009_m_000000_0' to file:/Carpeta_Proyecto/target/tmp/myCollaborativeFilter/data/user/_temporary/0/task_201603041018_1009_m_000000
16/03/04 10:18:13 INFO FileOutputCommitter: Saved output of task 'attempt_201603041018_1009_m_000001_0' to file:/Carpeta_Proyecto/target/tmp/myCollaborativeFilter/data/user/_temporary/0/task_201603041018_1009_m_000001
...

Con esto habremos conseguido definir nuestro modelo, salvarlo y así poder recuperarlo, como veremos en futuros capítulos, para por ejemplo hacer recomendaciones online.

Siguientes capítulos:

    [Spark MLlib] Filtrado Colaborativo (II): Recomendación por Usuario

    [Spark MLlib] Filtrado Colaborativo (III): Recomendación por Ítem (pendiente)