martes, 20 de diciembre de 2016

XGBOOST & Hadoop/YARN (II): Ejemplo

En la anterior entrada vimos cómo instalar la librería XGBOOST sobre CentOS con soporte HDFS. Bien, en ésta trataremos de ver su ejecución a través de algún ejemplo, eso sí, sin entrar a valorar el resultado o si se puede mejorar el modelo, variables, etc, simplemente se trata de demostrar la funcionalidad de poder ejecutar la librería XGBOOST en modo distribuido.

DMLC-YARN.JAR
Lo primero que deberemos hacer es crear el paquete dmlc-yarn.jar necesario para la ejecución de XGBOOST en modo distribuido, YARN.

     cd /usr/local/xgboost/dmlc-core/tracker/yarn/
     ./build.sh // NO preocuparse por los warnings.

     ls -al dmlc-yarn.jar // Debemos comprobar que se crea el fichero dmlc-yarn.jar
     rw-r--r-- 1 root root 21292 dic 16 13:39 dmlc-yarn.jar



Ejemplo

Ejemplo de cómo ejecutar XGBOOST en modo YARN con los conjuntos de datos almacenados previamente en HDFS. Además, el modelo resultante también será salvado en el HDFS.

Nota: El ejemplo mostrado a continuación fue ejecutado bajo el usuario root, totalmente desaconsejable su uso, pero así que cada uno lo adapte a su entorno ;)

1) Creo la estructura de directorios necesarios para el ejemplo en el HDFS:
     hadoop fs -mkdir /user/root/xgboost
     hadoop fs -mkdir /user/root/xgboost/data
     hadoop fs -mkdir /user/root/xgboost/data/train
     hadoop fs -mkdir /user/root/xgboost/data/test
     hadoop fs -mkdir /user/root/xgboost/model
     hadoop fs -chmod 777 /user/root/xgboost/model

2) Cargo los siguientes datasets de ejemplo que vienen, por defecto, con XGBOOST.
     hadoop fs -put /usr/local/xgboost/demo/data/agaricus.txt.train /user/root/xgboost/data/train/
     hadoop fs -put /usr/local/xgboost/demo/data/agaricus.txt.test /user/root/xgboost/data/test/

3) Creo la configuración necesaria para llevar a cabo el ejemplo.
     export LD_LIBRARY_PATH=/usr/hdp/2.4.2.0-258/usr/lib/
     vim /usr/local/xgboost/demo/distributed-training/hdfs.conf
          # General Parameters, see comment for each definition
          # choose the booster, can be gbtree or gblinear
          booster = gbtree
          # choose logistic regression loss function for binary classification
          objective = binary:logistic

          # Tree Booster Parameters
          # step size shrinkage
          eta = 1.0
          # minimum loss reduction required to make a further partition
          gamma = 1.0
          # minimum sum of instance weight(hessian) needed in a child
          min_child_weight = 1
          # maximum depth of a tree
          max_depth = 3

          # Task Parameters
          # the number of round to do boosting
          num_round = 2
          # 0 means do not save any model except the final round model
          save_period = 0
          # The path of training data
          data = "hdfs://namenode01.domain.com/user/root/xgboost/data/train"
          # The path of validation data, used to monitor training process
          eval[test] = "hdfs://namenode01.domain.com/root/xgboost/data/test"
          model_dir = "hdfs://namenode01.domain.com/user/root/xgboost/model"
          # evaluate on training data as well each round
          eval_train = 1

¡¡¡Importante!!! Incluir en el path del HDFS el FQDN de nuestro NameNode principal o bien el del grupo si hemos configurado la alta disponibilidad.

4) Ahora ya sí, se puede proceder a ejecutar el ejemplo:
/usr/local/xgboost/dmlc-core/tracker/dmlc-submit --cluster yarn --num-workers 2 /usr/local/xgboost/xgboost /usr/local/xgboost/demo/distributed-training/hdfs.conf
2016-12-16 13:43:19,708 INFO start listen on 192.168.4.245:9091
16/12/16 13:43:21 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/12/16 13:43:22 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
16/12/16 13:43:23 INFO impl.TimelineClientImpl: Timeline service address: http://namenode01.domain.com:8188/ws/v1/timeline/
16/12/16 13:43:23 INFO client.RMProxy: Connecting to ResourceManager at namenode01.domain.com/192.168.4.245:8050
16/12/16 13:43:24 INFO dmlc.Client: jobname=DMLC[nworker=2]:xgboost,username=root
16/12/16 13:43:24 INFO dmlc.Client: Submitting application application_1481716095353_0075
16/12/16 13:43:24 INFO impl.YarnClientImpl: Submitted application application_1481716095353_0075
2016-12-16 13:43:29,410 INFO @tracker All of 2 nodes getting started
2016-12-16 13:43:33,723 INFO [13:43:33] [0] test-error:0.016139 train-error:0.014433
2016-12-16 13:43:33,904 INFO [13:43:33] [1] test-error:0.000000 train-error:0.001228
2016-12-16 13:43:34,253 INFO @tracker All nodes finishes job
2016-12-16 13:43:34,253 INFO @tracker 4.84310793877 secs between node start and job finish
Application application_1481716095353_0075 finished with state FINISHED at 1481888615119

5) Comprobar la existencia del modelo recién creado. El nombre del fichero, por defecto, sigue el patrón <num_rounds>.model siendo <num_rounds> el valor establecido para dicha variable en el fichero de configuracion.
     hadoop fs -ls /user/root/xgboost/model
     Found 1 items
     -rw-r--r-- 2 yarn root 1501 2016-12-16 13:43 /user/root/xgboost/model/0002.model

El modelo puede ser cargado posteriormente en R, Python o Julia.

lunes, 19 de diciembre de 2016

XGBOOST & Hadoop/YARN (I)

Este primer tutorial trata de explicar los pasos necesarios para desplegar la librería XGBOOST sobre CentOS con soporte HDFS, y más concretamente sobre un clúster Hadoop / YARN, pues pese a existir la "Installation Guide" en su página principal sobre cómo hacerlo, ésta 'sólo' cubre los sistemas operativos Ubuntu/Debian, Windows y OSX.

Entorno

Antes de pasar a realizar cualquier tipo de acción, me gustaría detallar cual es el entorno sobre el que voy a trabajar y desplegar la librería XGBOOST,

  • CentOS 6.7
  • Hortonworks Data Platform (HDP) v.2.4.2 => Hadoop v2.7.1

Requisitos

G++ >= 4.6

Decir que CentOS 6.7 dispone de un compilador GCC y G++ bastante 'desactualizados' en sus repositorios oficiales, por lo que recurrí al siguiente procedimiento para la instalación de una versión 'más reciente' y superior incluso a la requerida:

     yum install wget git -y // En caso de no disponer previamente de estos paquetes
     cd /etc/yum.repos.d
     wget https://people.centos.org/tru/devtools-2/devtools-2.repo

Instalamos a continuación los siguientes paquetes:
     devtoolset-2-gcc.x86_64
     devtoolset-2-gcc-c++.x86_64
     devtoolset-2-gcc-plugin-devel.x86_64
     devtoolset-2-binutils.x86_64
     devtoolset-2-binutils-devel.x86_64

Una vez instalados, comprobar la versión desplegada:
     /opt/rh/devtoolset-2/root/usr/bin/gcc -v
     
     gcc version 4.8.2 20140120 (Red Hat 4.8.2-15) (GCC)

Python v2.7
Además, en CentOS 6.7, por defecto, el Python incluido en los repositorios suele ser el de la versión 'obsoleta' 2.6.6, por lo que también deberemos actualizarla para poder trabajar posteriormente con la librería XGBOOST. Las versiones aceptadas por XGBOOST son Python 2.7 o superior, o Python 3.4 o superior.

En mi caso vamos a recurrir a la versión 2.7.6 de Python:
     yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel
     cd /opt
     wget --no-check-certificate https://www.python.org/ftp/python/2.7.6/Python-2.7.6.tar.xz
     tar xf Python-2.7.6.tar.xz
     cd Python-2.7.6
     ./configure --prefix=/usr/local
     make && make altinstall

¡¡¡Importante!!! Usar altinstall en lugar de install porque sino acabaremos con dos versiones diferentes de Python instaladas en nuestro sistema y ambas nombradas Python.

Tras este tipo de instalación convivirán en nuestro sistema ambas versiones:
     python -V
     Python 2.6.6

     python2.7 -V
     Python 2.7.6

Pip 2.7
Para facilitar las futuras instalaciones de paquetes de Python, es aconsejable también actualizar la versión de Pip.
     wget https://bitbucket.org/pypa/setuptools/downloads/ez_setup.py
     /usr/local/bin/python2.7 ez_setup.py
     /usr/local/bin/easy_install-2.7 pip

Comprobamos su correcto funcionamiento gracias a la instalación del paquete argparse que será necesario posteriormente para ejecutar y obtener la ayuda del comando xgboost.
     pip2.7 install argparse

Java
Debido a la instalación de la suite HDP de Hortonworks ya dispondremos de una versión de Java instalada en nuestro clúster, pero no viene mal repasar y asegurarse de su correcta instalación y configuración de las variables de entorno, $JAVA_HOME y $PATH.

[XGBOOST]
XGBOOST con soporte HDFS

Una vez cumplidos con los requerimientos, podemos pasar a construir la librería compartida de XGBOOST sobre CentOS con soporte HDFS.

     cd /usr/local
     git clone --recursive https://github.com/dmlc/xgboost
     cd xgboost

     cp make/config.mk ./config.mk

     vim config.mk // Descomentar las siguientes líneas y modificar su contenido de acuerdo a:

          export CC=/opt/rh/devtoolset-2/root/usr/bin/gcc

          export CPP=/opt/rh/devtoolset-2/root/usr/bin/cpp

          export CXX=/opt/rh/devtoolset-2/root/usr/bin/c++
          USE_HDFS = 1

     cd dmlc-core/
     cp make/config.mk config.mk
     vim config.mk // Descomentar las siguientes líneas y modificar su contenido de acuerdo a:
          export CC=/opt/rh/devtoolset-2/root/usr/bin/gcc
          export CPP=/opt/rh/devtoolset-2/root/usr/bin/cpp
          export CXX=/opt/rh/devtoolset-2/root/usr/bin/c++
          USE_HDFS = 1

     cd /usr/local/xgboost
     make clean_all
     make -j4


Para comprobar que se ha construido la librería satisfactoriamente, deberemos asegurarnos que se ha generado el archivo libxgboost.so, en nuestro caso:

     ls /usr/local/xgboost/lib/libxgboost.so

     -rwxr-xr-x 1 root root 2374684 dic 15 16:48 lib/libxgboost.so

Una vez finalizada su construcción satisfactoriamente, deberemos editar el fichero dmlc-submit para que use la versión 2.7 de Python en lugar de la 2.6. Para ello bastará con modificar la primera línea de dicho fichero:
     vim /usr/local/xgboost/dmlc-core/tracker/dmlc-submit
          #!/usr/bin/env python2.7

Problema hdfs.h / libdmlc.a
En un primer intento a la hora de construir la librería, me surgió el error que podéis observar más abajo. Realmente el fallo no se debe a la falta del archivo o libreria libdmlc.a. Observando con detenimiento un poco más arriba del comentado error, detecté otro fallo "hdfs.h: No existe el fichero o el directorio" el cual desencadena el error final.

Problema:
     In file included from src/io.cc:16:0:
     src/io/hdfs_filesys.h:10:18: fatal error: hdfs.h: No existe el fichero o el directorio
     #include <hdfs.h>
     
     c++: error: dmlc-core/libdmlc.a: No existe el fichero o el directorio
     make: *** [lib/libxgboost.so] Error 1
     make: *** Se espera a que terminen otras tareas....

Solución: Me bastó con buscar la librería hdfs.h en el sistema y modificar la definición de la siguiente línea dentro del fichero de configuración dmlc.mk:
     vim /usr/local/xgboost/dmlc-core/make/dmlc.mk
          HDFS_INC_PATH=/usr/hdp/2.4.2.0-258/usr/include

Por útlimo, volvemos a ejecutar los comandos necesarios para construir la librería compartida de XGBOOST.
     make clean_all
     make -j4


[+Info] Para ver un ejemplo de su ejecución en modo YARN: XGBOOST & Hadoop/YARN (II): Ejemplo

domingo, 17 de abril de 2016

[YARN] Error "java.io.ioexception couldn't set IO streams"

Esta semana me he topado con un problema en un cliente a la hora de lanzar numerosos procesos o aplicaciones de Spark. Todas ellas ejecutadas bajo modo YARN. La primera parte de dichas apps se ejecutaban y finalizaban para bien o para mal transcurrido un cierto tiempo, pero alcanzado un punto y sin saber el por qué, muchos de los nuevos trabajos que lanzaba finalizaban inmediatamente y de manera errónea. En un principio la única causa era la falta de recursos.

Esto no tenía lógica alguna, pues en caso de no haber recursos suficientes, los trabajos deberían quedarse en estado ACCEPTED y una vez el YARN fuera capaz de asignárselos pasar al estado RUNNING, vamos, quedar encolados, no morir directamente. Además, un simple top en los servidores o gracias a nuestra herramienta de monitorización demostraba la 'incoherencia' del mensaje.

Indagando más en los logs de la aplicación de Spark, para una mejor comprensión y visualización me apoyo en Livy, the REST Spark Server, llegué a observar el siguiente error: "java.io.ioexception couldn't set io streams". Ya tenía algo más de donde poder tirar.

Lo primero que hice fue revisar la configuración del usuario con el que se ejecutan los jobs o aplicaciones de Spark en cuanto a lo que respecta al uso o límites establecidos para el uso de recursos:
  # ulimit -a
  core file size          (blocks, -c) 0
  data seg size           (kbytes, -d) unlimited
  scheduling priority             (-e) 0
  file size               (blocks, -f) unlimited
  pending signals                 (-i) 63238
  max locked memory       (kbytes, -l) 64
  max memory size         (kbytes, -m) unlimited
  open files                      (-n) 1024
  pipe size            (512 bytes, -p) 8
  POSIX message queues     (bytes, -q) 819200
  real-time priority              (-r) 0
  stack size              (kbytes, -s) 8192
  cpu time               (seconds, -t) unlimited
  max user processes              (-u) 1024
  virtual memory          (kbytes, -v) unlimited
  file locks                      (-x) unlimited

¡Y voilà! En seguida algo llamó mi atención: max user processes (-u) 1024. Valor establecido demasiado bajo.

Traté de configurar dicho valor en "caliente", pero... quedó descartado:
  # ulimit -u 65536
  bash: ulimit: max user processes: no se puede modificar el límite: Operación no permitida

¡Ah! Y no aplica el intentarlo con sudo ;)



Tocaba pues, configurar los valores adecuados a través del fichero correspondiente. Bien, existen dos opciones:

1) /etc/security/limits.conf - Fichero de propiedades general.

2) /etc/security/limits.d/<userName>.conf - Fichero de propiedades relacionadas únicamente, o al menos debería ser así, con el usuario <userName>.

Básicamente estos ficheros sirven para controlar, limitar y/o repartir los recursos del sistema en caso de estar éste compartido entre diferentes usuarios y/o aplicaciones, evitando así que por ejemplo uno haga un uso excesivo de CPU viéndose perjudicado el resto de usuarios del sistema. Algo que en un entorno BigData con Hadoop y demás herramientas analíticas o de procesamiento, va a ser de lo más normal ;)

Si abrimos el primer fichero, veremos que se nos explica un poco la sintaxis del mismo.
  ...
  #Each line describes a limit for a user in the form:
  #
  #<domain>        <type>  <item>  <value>
  ...
  #        - nproc - max number of processes // para el caso que estamos tratando
  ...

Para un mejor entendimiento y/o administración o por una simple manía mía, soy partidario de la segunda opción.

Ahora bien, al trabajar sobre una plataforma con Cloudera, ya existía el fichero /etc/security/limits.d/cloudera-scm.conf por lo que para evitar tener que 'pensar' en los valores a establecer, decidí probar copiando dicha configuración a la del usuario correspondiente.

  # sudo cp /etc/security/limits.d/clouderascm.conf /etc/security/limits.d/spark-user.conf

Sino, una configuración posible sería:

  username soft nofile 32768
  username soft nproc 65536
  username hard nofile 1048576
  username hard nproc unlimited
  username hard memlock unlimited
  username soft memlock unlimited

Por último, para que los cambios tuvieran efecto, fue necesario reiniciar la máquina.

Una vez hecho esto, el sistema permitió una mayor carga de trabajo y donde antes daba el error "java.io.ioexception couldn't set io streams" ahora los trabajos eran encolados, estado ACCEPTED.

domingo, 10 de abril de 2016

[Apache NiFi] Caso Práctico: Twitter => Kafka

Apache NiFi
Breve tutorial sobre cómo trabajar con la herramienta Apache NiFi. Y como la mejor forma de aprender creo que es a través de ejemplos, os mostraré a continuación un caso práctico: cómo filtrar e importar tweets a una cola o broker de Apache Kafka.

Lo primero deberemos disponer de una cuenta de Twitter así como habernos registrado en su programa para desarrolladores, Twitter Application Management, disponiendo así de las cuatro famosas Keys and Access Tokens: Consumer Key, Consumer Secret, Access Token y Access Token Secret.

Lo siguiente será instalar las herramientas, Apache Kafka y Apache NiFi, así como configurar el resto de nuestro entorno. Ahora bien, para este sencillo ejemplo de iniciación el entorno que planteo es también bastante sencillo, únicamente se necesitará una máquina en la que convivirán todas las herramientas.


Apache NiFi & Twitter & Apache Kafka

En la pasada entrada "Integrando Kafka (producer) y Spark Streaming (consumer)" ya dejé unas breves instrucciones sobre cómo instalar Apache Kafka así que os remito a ella en caso de duda o falta de conocimiento sobre cómo hacerlo.


Sobre cómo instalar Apache NiFi, más de lo mismo con este tipo de proyectos:

  # cd /usr/local/
  # wget http://apache.rediris.es/nifi/0.6.0/nifi-0.6.0-bin.tar.gz     ¡Atención! La URL del paquete de descarga puede cambiar con el paso del tiempo
  # tar xvzf nifi-0.6.0-bin.tar.gz     Descomprimir el paquete
  # ln -s nifi nifi-0.6.0
  # cd nifi
  # bin/nifi.sh install     Para instalarlo como un servicio más del sistema
  # service nifi start     Iniciar el servicio
  # chkconfig nifi on     Activar el servicio para que arranque tras cada inicio del servidor

A continuación y si no hemos cambiado nada de la configuración que viene por defecto, podremos acceder a la interfaz de usuario:

Apache NiFi
http://127.0.0.1:8080/nifi/
Decir que básicamente esta interfaz presenta los siguientes cuatro apartados o grupos:
Apache NiFi
Captura obtenida de https://nifi.apache.org/docs.html
Bien, el primer paso será añadir un procesador (icono más a la izquierda). Para ello basta con pinchar sobre él y arrastrarlo hacia la cuadrícula principal:
Nos aparecerá la siguiente ventana
Podremos filtrar nuestra búsqueda de procesador en la parte superior derecha. Para el ejercicio propuesto filtraremos por twitter y pulsaremos el botón Add.
En nuestra ventana principal o entorno de Apache NiFi aparecerá una cajita correspondiente al procesador seleccionado. Veréis también como aparece en ella un símbolo de alerta, de momento ignorarlo, se 'queja' de la falta de conexiones.
Nuestra siguiente acción será configurar el procesador. Botón derecho sobre éste y  pulsar Configure.
En la primera pestaña podremos configurar el nombre que le queremos dar, ej. getTweets.
En la pestaña de propiedades configuraremos los campos Consumer Key, Consumer Secret, Access Token y Access Token Secret que Twitter nos facilitará tras habernos registrado en su Twitter Application Management.
Además, en el campo Twitter Endpoint seleccionaremos de la lista desplegable la opción Sample Endpoint. En caso de querer realizar cualquier tipo de filtro ya sea por determinados términos, IDs de usuario o localización, seleccionaremos la opción Filter Endpoint. Para finalizar pulsaremos el botón Apply.
Regresamos a la ventana principal donde el procesador aparecerá con el nombre asignado.
Añadiremos un nuevo procesador. En esta ocasión filtraremos por la cadena evaluate y seleccionaremos el tipo EvaluateJsonPath.
A continuación procedemos como antes para configurar el procesador.
De la pestaña de propiedades y del campo Destination, elegiremos la opción fileflow-atrribute.
Además, añadiremos dos nuevas propiedades que se corresponderán con los campos text y lang del JSON o tweet que recibiremos. Para ello en la parte superior derecha pulsaremos el botón New property, asignaremos un nombre twitter.lang y su valor $.lang a la nueva propiedad. Repetiremos este último paso para la propiedad twitter.text cuyo valor será $.text.
Antes de cerrar esta ventana, pulsamos sobre la primera. Le asignaremos un nombre, ej. evaluateJSON, y de los tres flags ubicados en la parte derecha, bajo el apartado Auto terminate relationships, activaremos failure y unmatched. Más adelante veremos por qué no activamos el matched y/o para qué sirven estos.
Nuestro siguiente paso será enlazar o conectar los dos procesadores creados. Para ello seleccionaremos al primero, vemos como aparece por la mitad de la caja un icono con una flecha, bien, lo seleccionamos.
Sin soltarlo, arrastraremos la flecha hacia la segunda caja o procesador en donde dejaremos de pulsar nuestro botón izquierdo del ratón.
Una nueva ventana nos aparecerá de manera inmediata. En ella y dentro de la pestaña de configuración seleccionaremos la política que queremos aplicar, para ello arrastraremos la caja FirstInFirstOutPrioritizer de la lista de Available prioritizers al área Selected Prioritizers. Por último pulsaremos el botón Add.
Este será el resultado o la nueva apariencia de nuestra ventana principal. Vemos como el símbolo de alerta del procesador getTweets ha desaparecido, pero no en el segundo, evaluateJSON, pues nos falta decir o configurar lo que pasa a continuación para la relación matched.
Añadiremos un nuevo procesador. En esta ocasión filtraremos por route, seleccionaremos RouteOnAttribute y pulsaremos el botón Add.
Configuramos una vez más el nuevo procesador incorporado. Le asignamos un nombre, ej. filterTweets y activamos el flag unmatched de la parte derecha.
A continuación pulsamos sobre la pestaña de propiedades y añadimos una nueva. Asignamos tweet como nombre de la misma y como valor la expresión "${twitter.text:isEmpty():not():and(${twitter.lang:equals("es")})}". Con esto conseguimos filtrar aquellos tweets que estén vacíos y cuyo idioma no haya sido marcado como español. Esto último podríamos haberlo realizado al principio, justo a la hora de configurar el procesador getTweets. Pulsamos Ok y Apply.
A continuación conectaremos los procesadores evaluateJSON y filterTweets. De la nueva ventana que nos emergerá, en la pestaña de detalles veremos el campo "For relationships", que casualidades de la vida -con el paso del tiempo se demuestra que hay pocas en esta vida- aparecen los valores de matched, unmatched y failure, justo los mismos que antes en el campo Auto termination relationships del procesador evaluateJSON. Activaremos el flag matched, es decir, proseguiremos procesando únicamente la información que nos cuadre, es decir, aquella que disponga de los campos lang y text. No nos confundamos o creamos que esta relación se refiere a que el campo text no esté vacío y el lenguaje sea el español, eso vendrá más adelante.
Dentro de la pestaña de configuración seleccionaremos de nuevo la política que queremos aplicar. Como en el caso anterior arrastraremos la caja FirstInFirstOutPrioritizer de la lista de Available prioritizers al área Selected Prioritizers y pulsaremos el botón Add.
Añadiremos ahora un cuarto y último procesador. En esta ocasión filtramos por kafka y seleccionamos la opción PutKafka.
Accedemos a la configuración de este recién creado procesador. Le asignamos un nombre, ej. tweetsKafka, y ahora sí de la sección derecha Auto termination relationships activamos los dos flags, pues queremos que este procesador sea el final de nuestro flujo de trabajo.
De la pestaña de propiedades configuraremos los campos Known Brokers, Topic y Client Name de acuerdo a nuestro entorno de Kafka desplegado.
Establecemos a continuación la última conexión entre los procesadores filterTweets y tweetsKafka. Activaremos enla pestaña de detalles el flag de la relación tweet, nombre de la propiedad ;) que añadimos anteriormente al primero de estos dos procesadores, de tal modo que ahora sí, sólo pasaremos o filtraremos los tweets con lenguaje igual a español y cuyo texto no estén vacíos.
Y una vez más, dentro de la pestaña de configuración seleccionaremos de nuevo la política que queremos aplicar. Como en el caso anterior arrastraremos la caja FirstInFirstOutPrioritizer de la lista de Available prioritizers al área Selected Prioritizers y pulsaremos el botón Add.
Nuestro flujo de trabajo quedará similar a éste. Un primer procesador para establecer la conexión con la API de Twitter. Un segundo para machear los campos deseados de los tweets recibidos. Un tercero para filtrarlos. Y un cuarto para enviarlos a la capa de almacenamiento final. Como es lógico se podría continuar poniendo un quinto que fuera un consumidor de Kafka, procesador GetKafka, y así sucesivamente de acuerdo a lo que buscáramos, pero por no complicar más este ejemplo decido acabarlo en ese cuarto procesador.
Para validar el funcionamiento del flujo implementado, ponemos en primer lugar nuestro entorno de Apache Kafka en marcha.
  1) Iniciamos en una consola el servidor Zookeeper.
  2) Iniciamos en una segunda el servidor de Kafka.
  3) Y en una tercera iniciamos un consumidor de mensajes de la cola o tema tweets.
Arrancamos todos los procesadores. Para ello pulsaremos sobre cada uno con el botón derecho y le daremos a la opción "Start".
De un momento a otro empezaremos a observar como en la consola del consumidor de tweets aparecen estos.
Pues eso es todo por hoy. Espero que os ayude o al menos os sirva como toma de contacto con esta nueva herramienta del ecosistema Big Data y que bajo mi punto de vista aporta infinitamente más que otras, por ejemplo Oozie.