viernes, 26 de febrero de 2016

[Spark MLlib] Árboles de Decisión: Ejemplo de Clasificación

Al poco de adentrarnos en esto del mundo del Big Data y de los primeros conceptos con los que nos familiarizaremos serán sus famosas 3 V'svariedadvelocidad y volumen; aunque en seguida una cuarta saldrá a la luz, valor. Esta última es a la que personalmente suelo relacionar con Machine Learning. Es verdad que también se puede sacar valor del Big Data, por ejemplo, como mera capa de almacenamiento (volumen), pero las empresas hoy en día buscan descubrir, nutrirse y enriquecerse gracias al valor del dato. Mientras tanto, las tres primeras las suelo relacionar más con el mundo de los sistemas y de su arquitectura de los cuales provengo.

Así pues, el área del Machine Learning me queda lejos, bastante lejos, pero como es de costumbre en la informática toca familiarizarse con todo, formación continua e inacabable, como si no fuera suficiente con el ecosistema y las nuevas herramientas que emergen (FlinkNiFiArrow...). Pues bien, ahora y como culturilla me encuentro tratando de adquirir unos mínimos conocimientos al respecto. Y que mejor forma de hacerlo que dejar constancia de los ejercicios que vaya realizando, pues me gusta aprender a través de ejemplos, tratando, o creyendo más bien, haberlos entendido gracias a su explicación poco a poco, por lo que si algún data scientist o experto en el área lee alguna "burrada" en esta o futuras entradas que por favor me corrija.

Como podéis intuir por el título de la entrada he empezado con los famosos árboles de decisión. Se trata de un modelo predictivo cuyo punto fuerte recae sobre la sencillez en su representación gráfica así como por el formato de sus reglas en casi un lenguaje natural.

Bueno, voy a dejarme de tanta literatura y centrarme en el ejemplo, pues si queréis más información o detalle basta con que introduzcáis esos conceptos en vuestro buscador favorito y tendréis miles de entradas al respecto.

El ejemplo mostrado a continuación se encuentra implementado en lenguaje Scala (v2.11.7), sbt (v0.13.9) y hace uso además de la librería MLlib que incluye Spark (v1.6). Doy por hecho la correcta instalación de todas estas herramientas en vuestro entorno, pero si os puede ayudar en algo, preguntad en los comentarios. Por mi parte y en lo que respecta a esta entrada trataré de ser lo más genérico posible, abstrayéndome del sistema operativo o de la herramienta de desarrollo utilizados.

La finalidad del ejemplo consiste en construir un modelo predictivo que nos ayude a anticiparnos a conocer si un vuelo será retrasado o no, entendiendo por ello un retraso en la llegada superior a los 40 minutos. El ejemplo está basado en el de la entrada: MapR Blog - Apache Spark Machine Learning Tutorial.

1) Empezaremos creando una nueva carpeta para nuestro proyecto, Carpeta_Proyecto. Dentro de esta crearemos los siguientes subdirectorios: data y src/main/scala/mllib/DecisionTrees.

2) De la siguiente URL Bureau of Transportation Statistics-Airline On-Time Performance generaremos y descargaremos nuestro conjunto de datos.

En la parte superior podréis filtrar por geografía (estado), año y mes. También podéis seleccionar más o menos columnas que como veréis luego en el ejemplo, nos quedaremos con las que consideremos interesantes. Para este ejemplo se han de seleccionar las columnas: Dayofmonth, DayOfWeek, Carrier, Tailnum, FlightNum, OriginAirportID, OriginState, DestAirportID, DestState, CRSDepTime, DepTime, DepDelayMinutes, CRSArrTime, ArrTime, ArrDelay, CRSElapsedTime y Distance.

Ubicaremos el dataset recien generado en Carpeta_Proyecto/data. En mi caso como me descargué los datos filtrados por el año 2015 y mes de diciembre lo nombre ontime-201512.csv

3) En el directorio raíz del proyecto crearemos el archivo build.sbt:
import sbt._
import Keys._

name := "Carpeta_Proyecto"   // Reemplazar por el título del proyecto que le hayáis asignado

version := "1.0"

scalaVersion := "2.11.7"   // Configurar de acuerdo a vuestra versión de scala

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % "1.6.0",   // Configurar de acuerdo a vuestra versión de scala y spark
  "org.apache.spark" % "spark-mllib_2.11" % "1.6.0"   // Configurar de acuerdo a vuestra versión de scala y spark
)

4) Por último, cread el fichero classificationFlightsOnTime.scala dentro de Carpeta_Proyecto/src/main/scala/mllib/DecisionTrees/ y copiar-pegar el código de la aplicación.

package mllib.DecisionTrees

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.{SparkConf, SparkContext}

object classificationFlightsOnTime {
  def main (args: Array[String]): Unit = {

    val sparkConfig = new SparkConf()
      .setMaster("local[2]")   // Configuración del clúster de Spark
      .setAppName("ClasificacionFlightsOnTime")   // Nombre otorgado a la app de Spark durante su ejecución
    val sc = new SparkContext(sparkConfig)  // Inicializar el necesario objeto de Spark Context

    val textRDD = sc.textFile("data/ontime-201512.csv")   // Asegurarse en poner la ruta y nombre correctos del fichero generado en el punto 2)
    val flightsRDD = textRDD.map(parseFlight).cache() // Gracias a la función parseFlight y clase Flight definiremos cual es el esquema (variables y tipos) de nuestro fichero CSV

    // A continuación crearemos una serie de asociaciones entre el valor de ciertas variables o columnas de tipo cadena con un identificador único, pudiendo reconvertir así más adelante el valor de dichas columnas de tipo string a entero.
    // La primera de estas asociaciones es para la columna Carrier
    var carrierMap: Map[String, Int] = Map()
    var index: Int = 0
    flightsRDD.map(flight => flight.carrier).distinct.collect.foreach(x => { carrierMap += (x -> index); index += 1 })

    // Idem para la columna OriginState del CSV que en la clase Flight se corresponde con el atributo origin
    var originMap: Map[String, Int] = Map()
    var index1: Int = 0
    flightsRDD.map(flight => flight.origin).distinct.collect.foreach(x => { originMap += (x -> index1); index1 += 1 })

    // Idem para DestState respecto del CSV o dest en la clase Flight
    var destMap: Map[String, Int] = Map()
    var index2: Int = 0
    flightsRDD.map(flight => flight.dest).distinct.collect.foreach(x => { destMap += (x -> index2); index2 += 1 })

    //  El siguiente paso será construir el array o vector con aquellas características que sean de nuestro interés. Es aquí por lo tanto donde filtraremos o quitaremos aquellas columnas que hayamos podido seleccionar de más a la hora de descargar nuestro dataset. También se trata del punto en donde convertiremos las columnas/abritutos Carrier, Origin y Dest de tipo cadena a entero, pues el vector finalmente que habremos de pasar al algoritmo tiene que ser de tipo numérico.
    val mlprep = flightsRDD.map(flight => {
      val monthday = flight.dofM.toInt - 1 // category
      val weekday = flight.dofW.toInt - 1 // category
      val crsdeptime1 = flight.crsdeptime.toInt
      val crsarrtime1 = flight.crsarrtime.toInt
      val carrier1 = carrierMap(flight.carrier) // category
      val crselapsedtime1 = flight.crselapsedtime.toDouble
      val origin1 = originMap(flight.origin) // category
      val dest1 = destMap(flight.dest) // category
      val delayed = if (flight.depdelaymins.toDouble > 40) 1.0 else 0.0    // Recordar nuestra definición de vuelo retrasado: si el retraso de éste era mayor a 40 minutos. Bien, aquí es donde transformaremos también ese retraso de minutos a verdadero o falso de acuerdo a dicha condición.
      Array(delayed.toDouble, monthday.toDouble, weekday.toDouble, crsdeptime1.toDouble, crsarrtime1.toDouble,
        carrier1.toDouble, crselapsedtime1.toDouble, origin1.toDouble, dest1.toDouble)
    })

    // Ahora nos encargaremos de asociar cada uno de los vectores anteriormente creados a lo que se denomina como LabeledPoint, que podríamos decir que se trata de la etiqueta que clasifica al vector en cuestión. En nuestro caso el LabeledPoint será 1.0 o 0.0 de acuerdo a si el vuelo fue retrasado o no, respectivamente. Gracias a esta acción construimos el conjunto de datos final.
    val mldata = mlprep.map(x => LabeledPoint(x(0), Vectors.dense(x(1), x(2), x(3), x(4), x(5), x(6), x(7), x(8))))

    // Partimos nuestro reciente creado conjunto de datos en los conjuntos de entrenamiento y test al 70% y 30% respectivamente.
    val splits = mldata.randomSplit(Array(0.7, 0.3))
    val (trainingData, testData) = (splits(0), splits(1))

    // Configuramos los parámetros que necesita el árbol de decisión
    var categoricalFeaturesInfo = Map[Int, Int]()    // Identificará que características del vector (segundo parámetro de la variable mldata) son categóricas y que rango de valores pueden tomar
    categoricalFeaturesInfo += (0 -> 31)    // El 0 se refiere a la posición 0 del vector que se corresponde con la columna monthday y 31 al rango de valores diferentes que puede tomar ésta 
    categoricalFeaturesInfo += (1 -> 7)    // En este caso, 1 igual columna weekday y lógicamente puede tomar 7 valores diferentes
    categoricalFeaturesInfo += (4 -> carrierMap.size)    // 4 igual a columna carrier1 y gracias a haber convertido sus valores de tipo string a entero, sabemos de antemano cuántos valores diferentes puede tomar
    categoricalFeaturesInfo += (6 -> originMap.size)    // Columna origin1
    categoricalFeaturesInfo += (7 -> destMap.size)    // Columna dest1

    // Resto de parámetros del árbol de decisión
    val numClasses = 2    // Vuelo retrasado o no
    val impurity = "gini"    // Tipo de medición de impurezas respecto a la homogeneidad de las etiquetas en el nodo
    val maxDepth = 4    // Profundidad del árbol
    val maxBins = 7000    // Número máximo de "bins"

    // Entrenamiento del modelo
    val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins)

    // Visualización del árbol de decisión resultante
    println("Learned classification tree model:\n" + model.toDebugString)

    // Evaluación del modelo contra el conjunto de datos de test
    val labelAndPreds = testData.map { point =>
      val prediction = model.predict(point.features)
      (point.label, prediction)
    }

    // Cálculo del número de predicciones incorrectas
    val wrongPrediction =(labelAndPreds.filter{
      case (label, prediction) => ( label !=prediction)
    })
    println("Wrong predicions: " + wrongPrediction.count())

    // Cálculo del ratio de predicciones incorrectas
    val ratioWrong=wrongPrediction.count().toDouble/testData.count()
    println("Ratio wrong predictions: " + ratioWrong)
  }

    // Función para parsear las líneas del fichero en la clase Flights
  def parseFlight(str: String): Flight = {
    val line = str.split(",")
    Flight(line(0), line(1), line(2), line(3), line(4).toInt, line(5), line(6), line(7), line(8), line(9).toDouble,
      line(10).toDouble, line(11).toDouble, line(12).toDouble, line(13).toDouble, line(14).toDouble, line(15).toDouble,
      line(16).toInt)
  }
}

    // Definición del esquema de la clase Flights
case class Flight(dofM: String, dofW: String, carrier: String, tailnum: String, flnum: Int, org_id: String,
                  origin: String, dest_id: String, dest: String, crsdeptime: Double, deptime: Double,
                  depdelaymins: Double, crsarrtime: Double, arrtime: Double, arrdelay: Double, crselapsedtime: Double,
                  dist: Int)

5) Llega la hora de probar nuestra aplicación de Spark. Ejecutar desde la línea de comandos
# cd /ruta/a/Carpeta_Proyecto
# sbt compile
...
[success] Total time: 4 s, completed 26-feb-2016 10:27:14

# sbt run
...
Learned classification tree model:
DecisionTreeModel classifier of depth 4 with 31 nodes
  If (feature 0 in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0})
   If (feature 2 <= 1035.0)
    If (feature 4 in {0.0,1.0,2.0,3.0})
     If (feature 0 in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0})
      Predict: 0.0
     Else (feature 0 not in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0})
      Predict: 0.0
    Else (feature 4 not in {0.0,1.0,2.0,3.0})
     If (feature 1 in {0.0})
      Predict: 0.0
     Else (feature 1 not in {0.0})
      Predict: 0.0
   Else (feature 2 > 1035.0)
    If (feature 0 in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0})
     If (feature 0 in {0.0,1.0,2.0})
      Predict: 0.0
     Else (feature 0 not in {0.0,1.0,2.0})
      Predict: 0.0
    Else (feature 0 not in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0})
     If (feature 4 in {0.0,1.0,2.0,3.0})
      Predict: 0.0
     Else (feature 4 not in {0.0,1.0,2.0,3.0})
      Predict: 0.0
  Else (feature 0 not in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0})
   If (feature 2 <= 1120.0)
    If (feature 2 <= 903.0)
     If (feature 0 in {21.0,22.0,23.0,24.0,25.0})
      Predict: 0.0
     Else (feature 0 not in {21.0,22.0,23.0,24.0,25.0})
      Predict: 0.0
    Else (feature 2 > 903.0)
     If (feature 0 in {21.0,22.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0})
      Predict: 0.0
     Else (feature 0 not in {21.0,22.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0})
      Predict: 0.0
   Else (feature 2 > 1120.0)
    If (feature 0 in {21.0,22.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0})
     If (feature 0 in {21.0,22.0,23.0,24.0,25.0})
      Predict: 0.0
     Else (feature 0 not in {21.0,22.0,23.0,24.0,25.0})
      Predict: 0.0
    Else (feature 0 not in {21.0,22.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0})
     If (feature 7 in {1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,22.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0,30.0,31.0,32.0,33.0,34.0,35.0,36.0,37.0,38.0,39.0,40.0,41.0,42.0,43.0,44.0,45.0,46.0})
      Predict: 0.0
     Else (feature 7 not in {1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0,20.0,21.0,22.0,23.0,24.0,25.0,26.0,27.0,28.0,29.0,30.0,31.0,32.0,33.0,34.0,35.0,36.0,37.0,38.0,39.0,40.0,41.0,42.0,43.0,44.0,45.0,46.0})
      Predict: 0.0
...
Wrong predicions: 14141
...
Ratio wrong predictions: 0.09830378866875217
...

5.1) Ahora por ejemplo establecemos la profundidad del árbol a 3:
...
Wrong predicions: 14207
...
Ratio wrong predictions: 0.09855022197558269

5.2) Profundidad del árbol a 7:
...
Wrong predicions: 14150
...
Ratio wrong predictions: 0.09854995751556601

5.3) Y a 9:
...
Wrong predicions: 14193
...
Ratio wrong predictions: 0.09897627581974644

Bueno, pues ya tendríamos construido nuestro modelo, aunque con la tasa o ratio de acierto reflejado... me deja muchas dudas de su utilidad, todo sería afinarlo más, ¿mayor o menor profundidad? De acuerdo a los valores de ratio observados, podríamos establecer un valor bajo, pues son prácticamente idénticos y ante un mayor nivel de profundidad lo único que se consigue es complicarnos la comprensión del árbol de decisión, ¿otras variables? Es posible. He ahí el buen hacer o no de los data scientist y de sus modelos. ¡Ah! Y de la CALIDAD de los datos, ese "pequeño e insignificante" detalle ;)

Por cierto, si el modelo resultante fuera considerado correcto y se quisiera hacer uso de él en el futuro, podríamos salvarlo a disco, en este ejemplo model.save(sc, "target/tmp/myDecisionTreeClassificationFlightsOnTime"), y para recuperarlo, val model = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationFlightsOnTime"), evitando así el tener que volver a entrenar el modelo.

Espero que os haya servido o al menos para haceros una idea de cómo trabajar con Spark MLlib y con árboles de decisión.

1 comentario: