domingo, 10 de abril de 2016

[Apache NiFi] Caso Práctico: Twitter => Kafka

Apache NiFi
Breve tutorial sobre cómo trabajar con la herramienta Apache NiFi. Y como la mejor forma de aprender creo que es a través de ejemplos, os mostraré a continuación un caso práctico: cómo filtrar e importar tweets a una cola o broker de Apache Kafka.

Lo primero deberemos disponer de una cuenta de Twitter así como habernos registrado en su programa para desarrolladores, Twitter Application Management, disponiendo así de las cuatro famosas Keys and Access Tokens: Consumer Key, Consumer Secret, Access Token y Access Token Secret.

Lo siguiente será instalar las herramientas, Apache Kafka y Apache NiFi, así como configurar el resto de nuestro entorno. Ahora bien, para este sencillo ejemplo de iniciación el entorno que planteo es también bastante sencillo, únicamente se necesitará una máquina en la que convivirán todas las herramientas.


Apache NiFi & Twitter & Apache Kafka

En la pasada entrada "Integrando Kafka (producer) y Spark Streaming (consumer)" ya dejé unas breves instrucciones sobre cómo instalar Apache Kafka así que os remito a ella en caso de duda o falta de conocimiento sobre cómo hacerlo.


Sobre cómo instalar Apache NiFi, más de lo mismo con este tipo de proyectos:

  # cd /usr/local/
  # wget http://apache.rediris.es/nifi/0.6.0/nifi-0.6.0-bin.tar.gz     ¡Atención! La URL del paquete de descarga puede cambiar con el paso del tiempo
  # tar xvzf nifi-0.6.0-bin.tar.gz     Descomprimir el paquete
  # ln -s nifi nifi-0.6.0
  # cd nifi
  # bin/nifi.sh install     Para instalarlo como un servicio más del sistema
  # service nifi start     Iniciar el servicio
  # chkconfig nifi on     Activar el servicio para que arranque tras cada inicio del servidor

A continuación y si no hemos cambiado nada de la configuración que viene por defecto, podremos acceder a la interfaz de usuario:

Apache NiFi
http://127.0.0.1:8080/nifi/
Decir que básicamente esta interfaz presenta los siguientes cuatro apartados o grupos:
Apache NiFi
Captura obtenida de https://nifi.apache.org/docs.html
Bien, el primer paso será añadir un procesador (icono más a la izquierda). Para ello basta con pinchar sobre él y arrastrarlo hacia la cuadrícula principal:
Nos aparecerá la siguiente ventana
Podremos filtrar nuestra búsqueda de procesador en la parte superior derecha. Para el ejercicio propuesto filtraremos por twitter y pulsaremos el botón Add.
En nuestra ventana principal o entorno de Apache NiFi aparecerá una cajita correspondiente al procesador seleccionado. Veréis también como aparece en ella un símbolo de alerta, de momento ignorarlo, se 'queja' de la falta de conexiones.
Nuestra siguiente acción será configurar el procesador. Botón derecho sobre éste y  pulsar Configure.
En la primera pestaña podremos configurar el nombre que le queremos dar, ej. getTweets.
En la pestaña de propiedades configuraremos los campos Consumer Key, Consumer Secret, Access Token y Access Token Secret que Twitter nos facilitará tras habernos registrado en su Twitter Application Management.
Además, en el campo Twitter Endpoint seleccionaremos de la lista desplegable la opción Sample Endpoint. En caso de querer realizar cualquier tipo de filtro ya sea por determinados términos, IDs de usuario o localización, seleccionaremos la opción Filter Endpoint. Para finalizar pulsaremos el botón Apply.
Regresamos a la ventana principal donde el procesador aparecerá con el nombre asignado.
Añadiremos un nuevo procesador. En esta ocasión filtraremos por la cadena evaluate y seleccionaremos el tipo EvaluateJsonPath.
A continuación procedemos como antes para configurar el procesador.
De la pestaña de propiedades y del campo Destination, elegiremos la opción fileflow-atrribute.
Además, añadiremos dos nuevas propiedades que se corresponderán con los campos text y lang del JSON o tweet que recibiremos. Para ello en la parte superior derecha pulsaremos el botón New property, asignaremos un nombre twitter.lang y su valor $.lang a la nueva propiedad. Repetiremos este último paso para la propiedad twitter.text cuyo valor será $.text.
Antes de cerrar esta ventana, pulsamos sobre la primera. Le asignaremos un nombre, ej. evaluateJSON, y de los tres flags ubicados en la parte derecha, bajo el apartado Auto terminate relationships, activaremos failure y unmatched. Más adelante veremos por qué no activamos el matched y/o para qué sirven estos.
Nuestro siguiente paso será enlazar o conectar los dos procesadores creados. Para ello seleccionaremos al primero, vemos como aparece por la mitad de la caja un icono con una flecha, bien, lo seleccionamos.
Sin soltarlo, arrastraremos la flecha hacia la segunda caja o procesador en donde dejaremos de pulsar nuestro botón izquierdo del ratón.
Una nueva ventana nos aparecerá de manera inmediata. En ella y dentro de la pestaña de configuración seleccionaremos la política que queremos aplicar, para ello arrastraremos la caja FirstInFirstOutPrioritizer de la lista de Available prioritizers al área Selected Prioritizers. Por último pulsaremos el botón Add.
Este será el resultado o la nueva apariencia de nuestra ventana principal. Vemos como el símbolo de alerta del procesador getTweets ha desaparecido, pero no en el segundo, evaluateJSON, pues nos falta decir o configurar lo que pasa a continuación para la relación matched.
Añadiremos un nuevo procesador. En esta ocasión filtraremos por route, seleccionaremos RouteOnAttribute y pulsaremos el botón Add.
Configuramos una vez más el nuevo procesador incorporado. Le asignamos un nombre, ej. filterTweets y activamos el flag unmatched de la parte derecha.
A continuación pulsamos sobre la pestaña de propiedades y añadimos una nueva. Asignamos tweet como nombre de la misma y como valor la expresión "${twitter.text:isEmpty():not():and(${twitter.lang:equals("es")})}". Con esto conseguimos filtrar aquellos tweets que estén vacíos y cuyo idioma no haya sido marcado como español. Esto último podríamos haberlo realizado al principio, justo a la hora de configurar el procesador getTweets. Pulsamos Ok y Apply.
A continuación conectaremos los procesadores evaluateJSON y filterTweets. De la nueva ventana que nos emergerá, en la pestaña de detalles veremos el campo "For relationships", que casualidades de la vida -con el paso del tiempo se demuestra que hay pocas en esta vida- aparecen los valores de matched, unmatched y failure, justo los mismos que antes en el campo Auto termination relationships del procesador evaluateJSON. Activaremos el flag matched, es decir, proseguiremos procesando únicamente la información que nos cuadre, es decir, aquella que disponga de los campos lang y text. No nos confundamos o creamos que esta relación se refiere a que el campo text no esté vacío y el lenguaje sea el español, eso vendrá más adelante.
Dentro de la pestaña de configuración seleccionaremos de nuevo la política que queremos aplicar. Como en el caso anterior arrastraremos la caja FirstInFirstOutPrioritizer de la lista de Available prioritizers al área Selected Prioritizers y pulsaremos el botón Add.
Añadiremos ahora un cuarto y último procesador. En esta ocasión filtramos por kafka y seleccionamos la opción PutKafka.
Accedemos a la configuración de este recién creado procesador. Le asignamos un nombre, ej. tweetsKafka, y ahora sí de la sección derecha Auto termination relationships activamos los dos flags, pues queremos que este procesador sea el final de nuestro flujo de trabajo.
De la pestaña de propiedades configuraremos los campos Known Brokers, Topic y Client Name de acuerdo a nuestro entorno de Kafka desplegado.
Establecemos a continuación la última conexión entre los procesadores filterTweets y tweetsKafka. Activaremos enla pestaña de detalles el flag de la relación tweet, nombre de la propiedad ;) que añadimos anteriormente al primero de estos dos procesadores, de tal modo que ahora sí, sólo pasaremos o filtraremos los tweets con lenguaje igual a español y cuyo texto no estén vacíos.
Y una vez más, dentro de la pestaña de configuración seleccionaremos de nuevo la política que queremos aplicar. Como en el caso anterior arrastraremos la caja FirstInFirstOutPrioritizer de la lista de Available prioritizers al área Selected Prioritizers y pulsaremos el botón Add.
Nuestro flujo de trabajo quedará similar a éste. Un primer procesador para establecer la conexión con la API de Twitter. Un segundo para machear los campos deseados de los tweets recibidos. Un tercero para filtrarlos. Y un cuarto para enviarlos a la capa de almacenamiento final. Como es lógico se podría continuar poniendo un quinto que fuera un consumidor de Kafka, procesador GetKafka, y así sucesivamente de acuerdo a lo que buscáramos, pero por no complicar más este ejemplo decido acabarlo en ese cuarto procesador.
Para validar el funcionamiento del flujo implementado, ponemos en primer lugar nuestro entorno de Apache Kafka en marcha.
  1) Iniciamos en una consola el servidor Zookeeper.
  2) Iniciamos en una segunda el servidor de Kafka.
  3) Y en una tercera iniciamos un consumidor de mensajes de la cola o tema tweets.
Arrancamos todos los procesadores. Para ello pulsaremos sobre cada uno con el botón derecho y le daremos a la opción "Start".
De un momento a otro empezaremos a observar como en la consola del consumidor de tweets aparecen estos.
Pues eso es todo por hoy. Espero que os ayude o al menos os sirva como toma de contacto con esta nueva herramienta del ecosistema Big Data y que bajo mi punto de vista aporta infinitamente más que otras, por ejemplo Oozie.

0 comentarios:

Publicar un comentario