jueves, 26 de noviembre de 2015

[Hadoop] Rack Awareness

Rack Awareness / Rack Topology
http://www.slideshare.net/tutorialvillage/hadoop-hdfs-concepts
Los scripts de topología son usados por Hadoop para determinar la localización de los nodos que lo forman. Esta información, a su vez, es usada a la hora de llevar a cabo la replicación de los bloques de datos así como por el JobTracker al asignar tareas a los nodos.

La política por defecto de Hadoop para replicar (factor 3) los bloques a groso modo es la siguiente: Almacena el primer bloque en un nodo, ej. N1 ubicado en el rack1; A continuación replica ese bloque en un nodo diferente a su vez hayado en otro rack, ej. N2 y rack2; La tercera réplica la realiza sobre otro nodo, pero en este caso ubicado en el mismo rack que el primero, ej. N3 y rack1. En el caso de haberse definido un mayor número de réplicas, éstas serán aleatoriamente asignadas a otros nodos.

Rack topology viene a definir cómo físicamente las máquinas están conectadas en racks en nuestro CPD, proporcionando un conocimiento sobre como de cerca o lejos están nuestros nodos, los unos de los otros, hablando siempre en términos de conectividad de red. La comprensión de este punto puede llegar a ser especialmente crítico cuando hablamos o planteamos dominios de fallo en nuestro sistema.

A continuación se muestran los pasos necesarios para implementar esta funcionalidad. Parto de un pequeño clúster de pruebas formado por:

  • vlihdp01.domain => NameNode y DataNode
  • vlihdp02.domain => DataNode
  • vlihdp03.domain => DataNode
HADOOP: home & version
Lo primero será crear un fichero, /opt/hadoop/etc/hadoop/topology.csv, en donde definiremos la relación de nodos que forman nuestro clúster Hadoop con respecto la localización o rack al que pertenecen.
topology.csv
A continuación crearemos un script, 100% personalizable, que a partir del fichero anterior nos devuelva el rack al que pertenece un nodo o lista de nodos pasados como argumentos. Podéis usar el del siguiente enlace ó bien el aquí mostrado a continuación y obtenido del libro Hadoop Operations:

$ vim /opt/hadoop/etc/hadoop/topology.py
#!/usr/bin/python

import sys

class RackTopology:
  # Make sure you include the absolute path to topology.csv.
  DEFAULT_TOPOLOGY_FILE = '/opt/hadoop/etc/hadoop/topology.csv'
  DEFAULT_RACK = '/default-rack'
  def __init__(self, filename = DEFAULT_TOPOLOGY_FILE):
    self._filename = filename
    self._mapping = dict()

    self._load_topology(filename)

  def _load_topology(self, filename):
    '''
    Load a CSV-ish mapping file. Should be
    hostname or IP and the second the rack
    it's discarded. Each field is stripped
    the file fails to load for any reason,
    '''
    try:
      f = file(filename, 'r')

      for line in f:
        fields = line.split(',')

        if len(fields) == 2:
          self._mapping[fields[0].strip()] = fields[1].strip()
    except:
      pass

  def rack_of(self, host):
    '''
    Look up and a hostname or IP address in the mapping and return its rack.
    '''
    if self._mapping.has_key(host):
      return self._mapping[host]
    else:
      return RackTopology.DEFAULT_RACK

if __name__ == '__main__':
  app = RackTopology()

  logFile = open('/tmp/topology.log', 'a')
  for node in sys.argv[1:]:
    rack = app.rack_of(node)
    logFile.write(node + ' => ' + rack + '\n')
    print rack

  logFile.close()

¡Importante! Asegurarse que la variable DEFAULT_TOPOLOGY_FILE quede correctamente configurada con el path completo del archivo anteriormente creado con la topología de nuestros nodos. En mi caso: /opt/hadoop/etc/hadoop/topology.csv

Lo siguiente será darle permisos de ejecución a dicho script:
$ chmod 755 /opt/hadoop/etc/hadoop/topology.py
Podemos realizar una serie de pruebas de su correcto funcionamiento:
$ python /opt/hadoop/etc/hadoop/topology.py vlihdp01.domain
/rack1
$ python /opt/hadoop/etc/hadoop/topology.py vlihdp02.domain
/rack1
$ python /opt/hadoop/etc/hadoop/topology.py vlihdp03.domain
/rack2
python /opt/hadoop/etc/hadoop/topology.py vlihdp01.domain vlihdp02.domain vlihdp03.domain
/rack1
/rack1
/rack2

Por último, toca modificar la configuración, core-site.xml, de Hadoop. Para ello añadiremos el parámetro net.topology.script.file.name y estableceremos su valor al path completo del script recien creado:
core-site.xml
Ya sólo quedar comprobar su correcto funcionamiento.
$ hadoop dfsadmin -printTopology
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

15/11/25 22:55:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Rack: /rack1
   10.0.3.11:50010 (vlihdp01.domain)
   10.0.3.12:50010 (vlihdp02.domain)

Rack: /rack2
   10.0.3.13:50010 (vlihdp03.domain)

hadoop dfsadmin -report
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

15/11/25 22:57:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Configured Capacity: 144813367296 (134.87 GB)
Present Capacity: 132053340160 (122.98 GB)
DFS Remaining: 132053200896 (122.98 GB)
DFS Used: 139264 (136 KB)
DFS Used%: 0.00%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0

-------------------------------------------------
Live datanodes (3):

Name: 10.0.3.13:50010 (vlihdp03.domain)
Hostname: vlihdp03.domain
Rack: /rack2
Decommission Status : Normal
Configured Capacity: 48271122432 (44.96 GB)
DFS Used: 49152 (48 KB)
Non DFS Used: 4243726336 (3.95 GB)
DFS Remaining: 44027346944 (41.00 GB)
DFS Used%: 0.00%
DFS Remaining%: 91.21%
Configured Cache Capacity: 0 (0 B)
Cache Used: 0 (0 B)
Cache Remaining: 0 (0 B)
Cache Used%: 100.00%
Cache Remaining%: 0.00%
Xceivers: 1
Last contact: Thu Nov 25 22:57:53 CET 2015


Name: 10.0.3.12:50010 (vlihdp02.domain)
Hostname: vlihdp02.domain
Rack: /rack1
Decommission Status : Normal
Configured Capacity: 48271122432 (44.96 GB)
DFS Used: 49152 (48 KB)
Non DFS Used: 4243742720 (3.95 GB)
DFS Remaining: 44027330560 (41.00 GB)
DFS Used%: 0.00%
DFS Remaining%: 91.21%
Configured Cache Capacity: 0 (0 B)
Cache Used: 0 (0 B)
Cache Remaining: 0 (0 B)
Cache Used%: 100.00%
Cache Remaining%: 0.00%
Xceivers: 1
Last contact: Thu Nov 25 22:57:54 CET 2015


Name: 10.0.3.11:50010 (vlihdp01.domain)
Hostname: vlihdp01.domain
Rack: /rack1
Decommission Status : Normal
Configured Capacity: 48271122432 (44.96 GB)
DFS Used: 40960 (40 KB)
Non DFS Used: 4272558080 (3.98 GB)
DFS Remaining: 43998523392 (40.98 GB)
DFS Used%: 0.00%
DFS Remaining%: 91.15%
Configured Cache Capacity: 0 (0 B)
Cache Used: 0 (0 B)
Cache Remaining: 0 (0 B)
Cache Used%: 100.00%
Cache Remaining%: 0.00%
Xceivers: 1

Last contact: Thu Nov 25 22:57:53 CET 2015

miércoles, 18 de noviembre de 2015

Integrando Kafka (producer) y Spark Streaming (consumer), Parte 1

Breve tutorial que tratará de mostrar como integrar estas dos potentes herramientas: Kafka y Spark Streaming. La idea es aclarar o ampliar en cierta forma la información contenida en la guía Spark Streaming + Kafka Integration Guide a través de un sencillo ejemplo.

En esta primera parte cubriré como procesar con Spark Streaming (consumer) la información proveniente de Kafka (producer).

Preparación del Entorno
Software necesario:


Descargar e instalar las diferentes herramientas anteriormente mencionadas. En mi caso he usado el directorio /usr/local como directorio base en el que desplegarlas:
Directorio /usr/local
Acordarse también de exportar las variables de entorno adecuadas:
Variables de entorno
Servidores Zookeeper & Kafka
Iniciar el servidor de Zookeeper:
  # zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
Zookeeper Server Start
En otra shell diferente iniciar el servidor de Kafka:
  # kafka-server-start.sh $KAFKA_HOME/config/server.properties
Kakfa Server Start (1)
Kakfa Server Start (2)
Sin cerrar ninguna de las otras líneas de comandos, abrir dos nuevas más:

En la primera de ellas crearemos el topic 'demo':
  # kafka-topics.sh --zookeeper localhost:2181 --create --topic demo --partitions 1 --replication-factor 1

En esa misma terminal haremos a continuación uso del propio script de Kafka, kafka-console-producer.sh, para generar futuros mensajes:
  # kafka-console-producer.sh --broker-list localhost:9092 --topic demo
Start Producer
Spark Streaming Examples
Mientras que en la segunda terminal arrancaremos el proceso que deberá consumir los mensajes que vayamos generando:
  # $SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.KafkaWordCount localhost:2181 groupTest demo 1
Start Consumer
Por último, ya sólo faltaría ir generando mensajes en la ventana del producer y ver como son procesados en la del consumer:




jueves, 12 de febrero de 2015

[Cloudera Manager & Event Server] Error starting EventServer

Problema: Tras una caída inesperada del servidor en donde se encuentra desplegado el Event Server, éste no puede iniciarse satisfactoriamente.

   # more /var/log/cloudera-scm-eventserver/mgmt-cmf-mgmt-EVENTSERVER-cloudera1.log.out
   ...
   2015-02-07 11:27:45,483 ERROR com.cloudera.cmf.eventcatcher.server.EventCatcherService: Error starting EventServer
   java.lang.ArrayIndexOutOfBoundsException: 525
           at org.apache.lucene.search.FieldCacheImpl$LongCache.createValue(FieldCacheImpl.java:510)
           at org.apache.lucene.search.FieldCacheImpl$Cache.get(FieldCacheImpl.java:191)
           at org.apache.lucene.search.FieldCacheImpl.getLongs(FieldCacheImpl.java:478)
           at org.apache.lucene.search.FieldCacheImpl$LongCache.createValue(FieldCacheImpl.java:495)
           at org.apache.lucene.search.FieldCacheImpl$Cache.get(FieldCacheImpl.java:191)
           at org.apache.lucene.search.FieldCacheImpl.getLongs(FieldCacheImpl.java:478)
           at org.apache.lucene.search.FieldCacheImpl.getLongs(FieldCacheImpl.java:472)
           at com.cloudera.cmf.eventcatcher.server.SearcherManager.warm(SearcherManager.java:78)
           at com.cloudera.cmf.eventcatcher.server.SearcherManager.<init>(SearcherManager.java:59)
           at com.cloudera.cmf.eventcatcher.server.SingleIndexManager.<init>(SingleIndexManager.java:113)
           at com.cloudera.cmf.eventcatcher.server.EventCatcherService.<init>(EventCatcherService.java:229)
           at com.cloudera.cmf.eventcatcher.server.EventCatcherService.main(EventCatcherService.java:130)
   ...

Solución: Avisar que se perderá el histórico de salud y demás eventos registrados.

1) bash# mv /var/lib/cloudera-scm-eventserver /var/lib/cloudera-scm-eventserver-old

2) Reiniciar el Event Server. Cloudera Manager volverá a crear el directorio durante dicho reinicio.

lunes, 12 de enero de 2015

[Flume + MongoDB] Plugin Flume NG MongoDB Sink

[Flume + MongoDB] Plugin Flume NG MongoDB Sink
[Flume + MongoDB] Plugin Flume NG MongoDB Sink

Instalar Flume en CentOS:

Visitar Apache Flume - Downloads, copiar la URL de los binarios de la versión deseada y descargarla:
   bash# cd /usr/local
   bash# wget http://apache.rediris.es/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz

Extraer su contenido:
   bash# tar xzf apache-flume-1.5.2-bin.tar.gz
   bash# mv apache-flume-1.5.2-bin/ apache-flume-1.5.2
   bash# ln -s apache-flume-1.5.2/ flume
   bash# ls -al flume
   lrwxrwxrwx 1 root root 19 ene  9 16:52 flume -&gt; apache-flume-1.5.2/

Establecer las variables de entorno:
   bash# 
vim /etc/profile.d/flume.sh
   ... (añadir las siguientes líneas)
   export FLUME_HOME=/usr/local/flume
   export PATH=$PATH:$FLUME_HOME/bin

   bash# source /etc/profile

Editar la configuración:
   bash# cd flume
   bash# cp conf/flume-env.sh.template conf/flume-env.sh
   bash# vim conf/flume-env.sh
      ... (editar la siguiente línea)
   JAVA_HOME=/usr/java/default

Plugin Flume NG MongoDB Sink

En la propia web del plugin flume-ng-mongodb-sink sólo se nos listan los 5 pasos que deberemos dar para instalar y configurar el plugin en cuestión, pero sin darnos mayor detalle sobre los mismos.

Mi propósito a continuación es mostrarlo paso a paso.
  1. Clone the repository
    Recordar que en caso de estar detrás de un proxy deberemos exportar las variables oportunas:
    bash# git config --global http.proxy http://user:password@host:port
    bash# git config --global https.proxy https://user:password@host:port

    bash# cd /usr/local
    bash# git clone https://github.com/leonlee/flume-ng-mongodb-sink.git
  2. Install latest Maven and build source by 'mvn package'
    Visitar la web del proyecto Apache Maven => Downloads y copiar la URL de los binarios de la versión deseada.

    bash# 
    wget http://ftp.cixug.es/apache/maven/maven-3/3.2.5/binaries/apache-maven-3.2.5-bin.tar.gz

    bash# tar xzf apache-maven-3.2.5-bin.tar.gz
    bash# 
    ln -s apache-maven-3.2.5 maven
    bash# ls -al maven

    lrwxrwxrwx 1 user group 18 ene  9 16:10 maven -&gt; apache-maven-3.2.5

    bash# 
    vim /etc/profile.d/maven.sh
    ... (añadir las siguientes líneas)
    export M2_HOME=/usr/local/maven
    export PATH=$PATH:$M2_HOME/bin


    bash# source /etc/profile
    bash# mvn -version     (verificamos su correcta instalación)
    Apache Maven 3.2.5...

    bash# 
    vi ~/.m2/settings.xml     (en caso de estar detrás de un proxy crear el siguiente fichero y modificarlo de acuerdo a nuestro entorno)
        
     <settings>
           <proxies>
             <proxy>
               <active>true</active>
               <protocol>http</protocol>
               <host>host.domain</host>
               <port>port</port>
               <username>username</username>
               <password>password</password>
               <nonProxyHosts></nonProxyHosts>
             </proxy>
           </proxies>
         </settings>


    bash# cd flume-ng-mongodb-sink
    bash# mvn package
    [INFO] Scanning for projects...
    [INFO]                                                                      
    [INFO] ------------------------------------------------------------------------
    [INFO] Building Flume NG MongoDB sink 1.0.0
    [INFO] ------------------------------------------------------------------------
    Downloading: https://repo.maven.apache.org/maven2/org/apache/maven/plugins/maven-resources-plugin/2.6/maven-resources-plugin-2.6.pom
    ...
    [INFO] Building jar: /usr/local/flume-ng-mongodb-sink/target/flume-ng-mongodb-sink-1.0.0.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 31.738 s
    [INFO] Finished at: 2015-01-09T16:33:15+01:00
    [INFO] Final Memory: 14M/139M
    [INFO] ------------------------------------------------------------------------

    bash# 
  3. Generate classpath by 'mvn dependency:build-classpath'
    bash# mvn dependency:build-classpath
    [INFO] Scanning for projects...
    [INFO]                                                                         
    [INFO] ------------------------------------------------------------------------
    [INFO] Building Flume NG MongoDB sink 1.0.0
    [INFO] ------------------------------------------------------------------------
    [INFO] 
    [INFO] --- maven-dependency-plugin:2.8:build-classpath (default-cli) @ flume-ng-mongodb-sink ---
    [INFO] Dependencies classpath:
    /root/.m2/repository/org/apache/flume/flume-ng-sdk/1.3.0/flume-ng-sdk-1.3.0.jar:/root/.m2/repository/org/apache/avro/avro/1.7.2/avro-1.7.2.jar:/root/.m2/repository/org/codehaus/jackson/jackson-core-asl/1.8.8/jackson-core-asl-1.8.8.jar:/root/.m2/repository/org/codehaus/jackson/jackson-mapper-asl/1.8.8/jackson-mapper-asl-1.8.8.jar:/root/.m2/repository/com/thoughtworks/paranamer/paranamer/2.3/paranamer-2.3.jar:/root/.m2/repository/org/xerial/snappy/snappy-java/1.0.4.1/snappy-java-1.0.4.1.jar:/root/.m2/repository/org/apache/avro/avro-ipc/1.7.2/avro-ipc-1.7.2.jar:/root/.m2/repository/org/apache/velocity/velocity/1.7/velocity-1.7.jar:/root/.m2/repository/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1.jar:/root/.m2/repository/commons-lang/commons-lang/2.4/commons-lang-2.4.jar:/root/.m2/repository/io/netty/netty/3.4.0.Final/netty-3.4.0.Final.jar:/root/.m2/repository/org/apache/flume/flume-ng-core/1.3.0/flume-ng-core-1.3.0.jar:/root/.m2/repository/org/apache/flume/flume-ng-configuration/1.3.0/flume-ng-configuration-1.3.0.jar:/root/.m2/repository/com/google/guava/guava/10.0.1/guava-10.0.1.jar:/root/.m2/repository/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar:/root/.m2/repository/commons-io/commons-io/2.1/commons-io-2.1.jar:/root/.m2/repository/log4j/log4j/1.2.16/log4j-1.2.16.jar:/root/.m2/repository/commons-cli/commons-cli/1.2/commons-cli-1.2.jar:/root/.m2/repository/joda-time/joda-time/2.1/joda-time-2.1.jar:/root/.m2/repository/org/mortbay/jetty/servlet-api/2.5-20110124/servlet-api-2.5-20110124.jar:/root/.m2/repository/org/mortbay/jetty/jetty-util/6.1.26/jetty-util-6.1.26.jar:/root/.m2/repository/org/mortbay/jetty/jetty/6.1.26/jetty-6.1.26.jar:/root/.m2/repository/com/google/code/gson/gson/2.2.2/gson-2.2.2.jar:/root/.m2/repository/org/apache/mina/mina-core/2.0.4/mina-core-2.0.4.jar:/root/.m2/repository/org/slf4j/slf4j-api/1.6.1/slf4j-api-1.6.1.jar:/root/.m2/repository/org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar:/root/.m2/repository/org/testng/testng/6.7/testng-6.7.jar:/root/.m2/repository/junit/junit/4.10/junit-4.10.jar:/root/.m2/repository/org/hamcrest/hamcrest-core/1.1/hamcrest-core-1.1.jar:/root/.m2/repository/org/beanshell/bsh/2.0b4/bsh-2.0b4.jar:/root/.m2/repository/com/beust/jcommander/1.12/jcommander-1.12.jar:/root/.m2/repository/org/yaml/snakeyaml/1.6/snakeyaml-1.6.jar:/root/.m2/repository/org/mongodb/mongo-java-driver/2.10.1/mongo-java-driver-2.10.1.jar:/root/.m2/repository/com/googlecode/json-simple/json-simple/1.1.1/json-simple-1.1.1.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 1.062 s
    [INFO] Finished at: 2015-01-09T16:35:47+01:00
    [INFO] Final Memory: 14M/211M

    [INFO] ------------------------------------------------------------------------
  4. Append classpath in $FLUME_HOME/conf/flume-env.sh

    bash# vim /usr/local/flume/conf
    /flume-env.sh
          ... (editar la siguiente línea)
       FLUME_CLASSPATH=/usr/local/flume-ng-mongodb-sink/target/classes/

  5. Add the sink definition according to Configuration
    bash# vim /usr/local/flume/conf/flume-conf.properties
    ...(añadir las siguientes líneas)
    agent1.sources = source1
    agent1.channels = channel1
    agent1.sinks = sinkMongo

    # Source config
    agent1.sources.source1.type = netcat
    agent1.sources.source1.bind = <serverName>
    agent1.sources.source1.port = 1982
    agent1.sources.source1.channels = channel1

    # Channel Config
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.capacity = 1000000
    agent1.channels.channel1.transactionCapacity = 800
    agent1.channels.channel1.keep-alive = 3

    # Flumen NG MongoDB Sink Config
    agent1.sinks.sinkMongo.channel = channel1
    agent1.sinks.sinkMongo.type = org.riderzen.flume.sink.MongoSink
    agent1.sinks.sinkMongo.host = <serverName>
    agent1.sinks.sinkMongo.port = 27017
    agent1.sinks.sinkMongo.model = single
    agent1.sinks.sinkMongo.db = events
    agent1.sinks.sinkMongo.collection = events
    agent1.sinks.sinkMongo.batch = 100
    agent1.sinks.sinkMongo.timestampField = "yyyy-MM-dd HH:mm:ss"
  6. TEST
    Lo primero será tratar de levantar el agente Flume:
    bash# flume-ng agent -n agent1 -f /usr/local/flume/conf/flume-conf.properties -c /usr/local/flume/conf
    Info: Sourcing environment configuration script /usr/local/flume/conf/flume-env.sh
    + exec /usr/java/default/bin/java -Xmx20m -cp '/usr/local/flume/conf:/usr/local/flume/lib/*:/usr/local/flume-ng-mongodb-sink/target/classes' -Djava.library.path= org.apache.flume.node.Application -n agent1 -f /usr/local/flume/conf/flume-conf.properties


    Aparentemente el agente parece haberse levantado correctamente, pero para validarlo os aconsejo que examinéis el contenido del fichero /usr/local/flume-ng-mongodb-sink/logs/flume.log.

    bash# more /usr/local/flume-ng-mongodb-sink/logs/flume.log

    En caso de detectar el siguiente error: 

    "09 ene 2015 16:45:27,366 ERROR [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:145)  - Failed to start agent because dependencies were not found in classpath. Error follows.
    java.lang.NoClassDefFoundError: com/mongodb/BasicDBObject"


    Deberemos copiar la librería del driver de MongoDB:

    bash# find / -name "*mongo*.jar"
    /root/.m2/repository/org/mongodb/mongo-java-driver/2.10.1/mongo-java-driver-2.10.1.jar
    bash# cp /root/.m2/repository/org/mongodb/mongo-java-driver/2.10.1/mongo-java-driver-2.10.1.jar /usr/local/flume/lib

    De nuevo tratamos de levantar al agente:
    bash# flume-ng agent -n agent1 -f /usr/local/flume/conf/flume-conf.properties -c /usr/local/flume/confInfo: Sourcing environment configuration script /usr/local/flume/conf/flume-env.sh
    + exec /usr/java/default/bin/java -Xmx20m -cp '/usr/local/flume/conf:/usr/local/flume/lib/*:/usr/local/flume-ng-mongodb-sink/target/classes' -Djava.library.path= org.apache.flume.node.Application -n agent1 -f /usr/local/flume/conf/flume-conf.properties


    bash# tail /usr/local/flume-ng-mongodb-sink/logs/flume.log...
    09 ene 2015 16:55:09,825 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.source.NetcatSource.start:164)  - Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/:1982]09 ene 2015 16:55:09,845 INFO  [lifecycleSupervisor-1-3] (org.riderzen.flume.sink.MongoSink.start:134)  - Started MongSink_0.

    Desde otra terminal abrimos una conexión (telnet o nc) contra el servidor y puerto configurado como origen de los datos. El formato del mensaje que enviemos deberá seguir la estructura JSON. Ejemplo:
    bash# nc <serverName> 1982
    {"evento":"Hello World!"}

    OK

    {"evento":"prueba", "severtidad":"INFO"}
    OK

    A continuación si abrimos una conexión con la NoSQL de MongoDB y examinamos el contenido de la base de datos "events" y collección "events", deberemos observar lo siguiente:
    bash# mongo <serverName>/events

    MongoDB shell version: 2.6.6
    connecting to: <serverName>/events

    show collections
    events
    system.indexes

    db.events.find()
    { "_id" : ObjectId("54b3a21fe4b0f77f70218bcb"), 
    "evento" : "Hello World!", ""yyyy-MM-dd HH:mm:ss"" : ISODate("2015-01-12T10:29:48.258Z") }{ "_id" : ObjectId("54b3a21fe4b0f77f70218bcc"), "evento" : "prueba", "severtidad" : "INFO", ""yyyy-MM-dd HH:mm:ss"" : ISODate("2015-01-12T10:29:48.258Z") }

jueves, 8 de enero de 2015

[Cloudera & Flume] WebService to HDFS

Flume
Flume


«Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application» +info.



Flume Web Service to HDFS
[Flume] Web Service to HDFS
En caso de no disponer del servicio Flume en nuestra plataforma Cloudera, lo primero que haremos será desplegarlo.
  1. Pulsaremos sobre la lista desplegable ubicada junto al nombre de nuestro clúster.
  2. Seleccionaremos la opción "Agregar un servicio".
  3. Una vez se nos abra el asistente para agregar un servicio, marcaremos la opción Flume y pulsaremos el botón "Continuar".
    Cloudera - Agregar Servicio
    Cloudera - Agregar Servicio
  4. A continuación seleccionaremos un conjunto de dependencias de acuerdo a nuestra arquitectura y pulsaremos "Continuar".
  5. Nuestro siguiente paso será decidir sobre qué servidor se desplegará y configurará el agente. De nuevo pulsaremos el botón "Continuar".
    Cloudera - Agregar Servicio
    Cloudera - Agregar Servicio
  6. Nuestro último paso dentro de este asistente será pulsar el botón de "Finalizar".
    Cloudera - Agregar Servicio
    Cloudera - Agregar Servicio
Una vez desplegado el servicio, es hora de configurarlo. Lo primero, pinchar sobre el servicio Flume.
Cloudera - Servicios
A continuación nos dirigiremos a su pestaña de "Configuración" y seleccionaremos la opción "Ver y editar".
Cloudera & Flume
Cloudera & Flume
 Ahora, pincharemos "Agent Default Group" del menú que nos aparecerá a la izquierda.
Cloudera & Flume
Cloudera & Flume
De la lista de campos que nos aparecerán, nos centraremos en el de "Archivo de configuración".
Cloudera & Flume - Configuración por Defecto
Cloudera & Flume - Configuración por defecto
En dicho campo podremos pegar la configuración de nuestro agente Flume en caso de disponer de una y sino podremos proceder a ello.

Antes de continuar, decir que esta configuración que se nos establece por defecto, permite registrar en el log de flume de nuestro servidor los mensajes locales recibidos a través del puerto 9999.

He aquí la demostración:
  1. Abrir una conexión SSH con el servidor donde hemos desplegado el agente de Flume.
  2. bash# tail /var/log/flume-ng/flume-cmf-flume-AGENT-<servername>.log
    2015-01-07 17:13:16,170 INFO org.apache.flume.node.Application: Starting Channel channel1
    2015-01-07 17:13:16,197 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: channel1: Successfully registered new MBean.
    2015-01-07 17:13:16,197 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: channel1 started
    2015-01-07 17:13:16,197 INFO org.apache.flume.node.Application: Starting Sink sink1
    2015-01-07 17:13:16,197 INFO org.apache.flume.node.Application: Starting Source source1
    2015-01-07 17:13:16,198 INFO org.apache.flume.source.NetcatSource: Source starting
    2015-01-07 17:13:16,207 INFO org.apache.flume.source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:9999]
    2015-01-07 17:13:16,217 INFO org.mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
    2015-01-07 17:13:16,241 INFO org.mortbay.log: jetty-6.1.26.cloudera.2
    2015-01-07 17:13:16,252 INFO org.mortbay.log: Started SelectChannelConnector@0.0.0.0:41414
  3. bash# telnet 127.0.0.1 9999
    Trying 127.0.0.1...
    Connected to 127.0.0.1
    Escape character is '^]'.
    Hello World! <enter>
    OK
  4. bash# tail /var/log/flume-ng/flume-cmf-flume-AGENT-<servername>.log
    ...
    2015-01-07 17:13:16,252 INFO org.mortbay.log: Started SelectChannelConnector@0.0.0.0:41414
    2015-01-07 17:20:44,360 INFO org.apache.flume.sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64 21 0D          Hello World!. }
Ahora bien, como podemos ver el valor de la variable tier1.sources.source1.bind que Cloudera nos establece por defecto en el agente, es 127.0.0.1, vamos, Flume sólo atenderá a peticiones locales. Por lo tanto habremos de modificar su valor para que responda a través del interfaz deseado de nuestro servidor o bien establecer el valor de 0.0.0.0 para que responda por cualquiera de sus interfaces. Con esto lograremos poder volcar información desde cualquier otro punto de nuestra infraestructura en este sistema.

De igual modo procederemos con la variable tier1.sources.source1.port si deseamos modificar el puerto por el que responda nuestro agente.

Cualquier cambio que hagamos en este apartado de configuración, acordarse de pulsar sobre el botón "Guardar cambios" y si aplicara, reiniciar el servicio Flume.

Así pues, ya seríamos capaces de recibir mensajes a través de un puerto y registrarlos en nuestro sistema, pero NO volcarlos sobre nuestro HDFS.

Nuestro siguiente paso por lo tanto sería configurar el recurso denominado sink de nuestro agente para llevar acabo dicha acción.
  1. Configuraremos su tipo:tier1.sinks.sink1.type = hdfs
  2. Para que Flume almacene los eventos recibidos como texto en los archivos de nuestro HDFS:
    tier1.sinks.sink1.hdfs.fileType = DataStream
    tier1.sinks.sink1.hdfs.writeFormat = Text
  3. Definiremos el path de nuestro HDFS en donde queremos dejar registrados los eventos que recibiremos.
    tier1.sinks.sink1.hdfs.path = hdfs://<serverip>:8020/user/flume/syslog/%y%m%d


    Nota: Recordad crear y configurar los permisos del directorio deseado, en mi caso de /user/flume/syslog/

    En caso de parametrizar la variable 
    tier1.sinks.sink1.hdfs.path con algún campo o variable relacionada con timestamps (%Y = año, %y = sólo los dos últimos dígitos del año, %m = mes, %d = día, %H = hora, %M = minuto, %S = segundos, %s = segundos desde 1970-01-01 00:00:00 UTC) será recomendable también incluir la variable tier1.sinks.sink1.hdfs.useLocalTimeStamp a true pues sin ella y si los mensajes que enviemos carecen de la cabecera necesaria, nos dará el siguiente error:
    ERROR org.apache.flume.sink.hdfs.HDFSEventSink: process failed
    java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
  4. Estableceremos si así lo deseamos un prefijo para el nombre de los archivos:
    tier1.sinks.sink1.hdfs.filePrefix = events
  5. Salvamos la configuración y reiniciamos el servicio flume.
Por último para validar nuestra configuración:
  1. Abrir una conexión SSH con el servidor donde hemos desplegado el agente de Flume.
  2. bash# tail /var/log/flume-ng/flume-cmf-flume-AGENT-<servername>.log
    ...
    2015-01-07 17:16:09,184 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: SINK, name: sink1 started

    2015-01-07 17:16:09,190 INFO org.apache.flume.source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:9999]
    2015-01-07 17:16:09,200 INFO org.mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
    2015-01-07 17:16:09,224 INFO org.mortbay.log: jetty-6.1.26.cloudera.2

    2015-01-07 17:16:09,234 INFO org.mortbay.log: Started SelectChannelConnector@0.0.0.0:41414
  3. bash# telnet 127.0.0.1 9999
    Trying 127.0.0.1...
    Connected to 127.0.0.1
    Escape character is '^]'.
    Hello World! <enter>
    OK
  4. bash# tail /var/log/flume-ng/flume-cmf-flume-AGENT-<servername>.log
    ...
    2015-01-07 17:16:09,234 INFO org.mortbay.log: Started SelectChannelConnector@0.0.0.0:41414
    2015-01-07 17:17
    :00,261 INFO org.apache.flume.sink.hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false

    2015-01-07 17:17:00,329 INFO org.apache.flume.sink.hdfs.BucketWriter: Creating hdfs://<serverip>:8020/user/flume/syslog/150107/events.1420707900262.tmp
  5. bash# hadoop fs -ls /user/flume/syslog/150107/ Found 1 items
    -rw-r--r--   3 flume flume         14 2015-01-07 17:17 /user/flume/syslog/150107/events.1420707900262
  6. bash# hadoop fs -cat /user/flume/syslog/150107/events.1420707900262Hello World!
Aquí os dejo la captura de la configuración final:
Cloudera & Flume - Configuración WebService to HDFS
Cloudera & Flume - Configuración WebService to HDFS