lunes, 12 de enero de 2015

[Flume + MongoDB] Plugin Flume NG MongoDB Sink

[Flume + MongoDB] Plugin Flume NG MongoDB Sink
[Flume + MongoDB] Plugin Flume NG MongoDB Sink

Instalar Flume en CentOS:

Visitar Apache Flume - Downloads, copiar la URL de los binarios de la versión deseada y descargarla:
   bash# cd /usr/local
   bash# wget http://apache.rediris.es/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz

Extraer su contenido:
   bash# tar xzf apache-flume-1.5.2-bin.tar.gz
   bash# mv apache-flume-1.5.2-bin/ apache-flume-1.5.2
   bash# ln -s apache-flume-1.5.2/ flume
   bash# ls -al flume
   lrwxrwxrwx 1 root root 19 ene  9 16:52 flume -> apache-flume-1.5.2/

Establecer las variables de entorno:
   bash# 
vim /etc/profile.d/flume.sh
   ... (añadir las siguientes líneas)
   export FLUME_HOME=/usr/local/flume
   export PATH=$PATH:$FLUME_HOME/bin

   bash# source /etc/profile

Editar la configuración:
   bash# cd flume
   bash# cp conf/flume-env.sh.template conf/flume-env.sh
   bash# vim conf/flume-env.sh
      ... (editar la siguiente línea)
   JAVA_HOME=/usr/java/default

Plugin Flume NG MongoDB Sink

En la propia web del plugin flume-ng-mongodb-sink sólo se nos listan los 5 pasos que deberemos dar para instalar y configurar el plugin en cuestión, pero sin darnos mayor detalle sobre los mismos.

Mi propósito a continuación es mostrarlo paso a paso.
  1. Clone the repository
    Recordar que en caso de estar detrás de un proxy deberemos exportar las variables oportunas:
    bash# git config --global http.proxy http://user:password@host:port
    bash# git config --global https.proxy https://user:password@host:port

    bash# cd /usr/local
    bash# git clone https://github.com/leonlee/flume-ng-mongodb-sink.git
  2. Install latest Maven and build source by 'mvn package'
    Visitar la web del proyecto Apache Maven => Downloads y copiar la URL de los binarios de la versión deseada.

    bash# 
    wget http://ftp.cixug.es/apache/maven/maven-3/3.2.5/binaries/apache-maven-3.2.5-bin.tar.gz

    bash# tar xzf apache-maven-3.2.5-bin.tar.gz
    bash# 
    ln -s apache-maven-3.2.5 maven
    bash# ls -al maven

    lrwxrwxrwx 1 user group 18 ene  9 16:10 maven -> apache-maven-3.2.5

    bash# 
    vim /etc/profile.d/maven.sh
    ... (añadir las siguientes líneas)
    export M2_HOME=/usr/local/maven
    export PATH=$PATH:$M2_HOME/bin


    bash# source /etc/profile
    bash# mvn -version     (verificamos su correcta instalación)
    Apache Maven 3.2.5...

    bash# 
    vi ~/.m2/settings.xml     (en caso de estar detrás de un proxy crear el siguiente fichero y modificarlo de acuerdo a nuestro entorno)
        
     <settings>
           <proxies>
             <proxy>
               <active>true</active>
               <protocol>http</protocol>
               <host>host.domain</host>
               <port>port</port>
               <username>username</username>
               <password>password</password>
               <nonProxyHosts></nonProxyHosts>
             </proxy>
           </proxies>
         </settings>


    bash# cd flume-ng-mongodb-sink
    bash# mvn package
    [INFO] Scanning for projects...
    [INFO]                                                                      
    [INFO] ------------------------------------------------------------------------
    [INFO] Building Flume NG MongoDB sink 1.0.0
    [INFO] ------------------------------------------------------------------------
    Downloading: https://repo.maven.apache.org/maven2/org/apache/maven/plugins/maven-resources-plugin/2.6/maven-resources-plugin-2.6.pom
    ...
    [INFO] Building jar: /usr/local/flume-ng-mongodb-sink/target/flume-ng-mongodb-sink-1.0.0.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 31.738 s
    [INFO] Finished at: 2015-01-09T16:33:15+01:00
    [INFO] Final Memory: 14M/139M
    [INFO] ------------------------------------------------------------------------

    bash# 
  3. Generate classpath by 'mvn dependency:build-classpath'
    bash# mvn dependency:build-classpath
    [INFO] Scanning for projects...
    [INFO]                                                                         
    [INFO] ------------------------------------------------------------------------
    [INFO] Building Flume NG MongoDB sink 1.0.0
    [INFO] ------------------------------------------------------------------------
    [INFO] 
    [INFO] --- maven-dependency-plugin:2.8:build-classpath (default-cli) @ flume-ng-mongodb-sink ---
    [INFO] Dependencies classpath:
    /root/.m2/repository/org/apache/flume/flume-ng-sdk/1.3.0/flume-ng-sdk-1.3.0.jar:/root/.m2/repository/org/apache/avro/avro/1.7.2/avro-1.7.2.jar:/root/.m2/repository/org/codehaus/jackson/jackson-core-asl/1.8.8/jackson-core-asl-1.8.8.jar:/root/.m2/repository/org/codehaus/jackson/jackson-mapper-asl/1.8.8/jackson-mapper-asl-1.8.8.jar:/root/.m2/repository/com/thoughtworks/paranamer/paranamer/2.3/paranamer-2.3.jar:/root/.m2/repository/org/xerial/snappy/snappy-java/1.0.4.1/snappy-java-1.0.4.1.jar:/root/.m2/repository/org/apache/avro/avro-ipc/1.7.2/avro-ipc-1.7.2.jar:/root/.m2/repository/org/apache/velocity/velocity/1.7/velocity-1.7.jar:/root/.m2/repository/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1.jar:/root/.m2/repository/commons-lang/commons-lang/2.4/commons-lang-2.4.jar:/root/.m2/repository/io/netty/netty/3.4.0.Final/netty-3.4.0.Final.jar:/root/.m2/repository/org/apache/flume/flume-ng-core/1.3.0/flume-ng-core-1.3.0.jar:/root/.m2/repository/org/apache/flume/flume-ng-configuration/1.3.0/flume-ng-configuration-1.3.0.jar:/root/.m2/repository/com/google/guava/guava/10.0.1/guava-10.0.1.jar:/root/.m2/repository/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar:/root/.m2/repository/commons-io/commons-io/2.1/commons-io-2.1.jar:/root/.m2/repository/log4j/log4j/1.2.16/log4j-1.2.16.jar:/root/.m2/repository/commons-cli/commons-cli/1.2/commons-cli-1.2.jar:/root/.m2/repository/joda-time/joda-time/2.1/joda-time-2.1.jar:/root/.m2/repository/org/mortbay/jetty/servlet-api/2.5-20110124/servlet-api-2.5-20110124.jar:/root/.m2/repository/org/mortbay/jetty/jetty-util/6.1.26/jetty-util-6.1.26.jar:/root/.m2/repository/org/mortbay/jetty/jetty/6.1.26/jetty-6.1.26.jar:/root/.m2/repository/com/google/code/gson/gson/2.2.2/gson-2.2.2.jar:/root/.m2/repository/org/apache/mina/mina-core/2.0.4/mina-core-2.0.4.jar:/root/.m2/repository/org/slf4j/slf4j-api/1.6.1/slf4j-api-1.6.1.jar:/root/.m2/repository/org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar:/root/.m2/repository/org/testng/testng/6.7/testng-6.7.jar:/root/.m2/repository/junit/junit/4.10/junit-4.10.jar:/root/.m2/repository/org/hamcrest/hamcrest-core/1.1/hamcrest-core-1.1.jar:/root/.m2/repository/org/beanshell/bsh/2.0b4/bsh-2.0b4.jar:/root/.m2/repository/com/beust/jcommander/1.12/jcommander-1.12.jar:/root/.m2/repository/org/yaml/snakeyaml/1.6/snakeyaml-1.6.jar:/root/.m2/repository/org/mongodb/mongo-java-driver/2.10.1/mongo-java-driver-2.10.1.jar:/root/.m2/repository/com/googlecode/json-simple/json-simple/1.1.1/json-simple-1.1.1.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 1.062 s
    [INFO] Finished at: 2015-01-09T16:35:47+01:00
    [INFO] Final Memory: 14M/211M

    [INFO] ------------------------------------------------------------------------
  4. Append classpath in $FLUME_HOME/conf/flume-env.sh

    bash# vim /usr/local/flume/conf
    /flume-env.sh
          ... (editar la siguiente línea)
       FLUME_CLASSPATH=/usr/local/flume-ng-mongodb-sink/target/classes/

  5. Add the sink definition according to Configuration
    bash# vim /usr/local/flume/conf/flume-conf.properties
    ...(añadir las siguientes líneas)
    agent1.sources = source1
    agent1.channels = channel1
    agent1.sinks = sinkMongo

    # Source config
    agent1.sources.source1.type = netcat
    agent1.sources.source1.bind = <serverName>
    agent1.sources.source1.port = 1982
    agent1.sources.source1.channels = channel1

    # Channel Config
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.capacity = 1000000
    agent1.channels.channel1.transactionCapacity = 800
    agent1.channels.channel1.keep-alive = 3

    # Flumen NG MongoDB Sink Config
    agent1.sinks.sinkMongo.channel = channel1
    agent1.sinks.sinkMongo.type = org.riderzen.flume.sink.MongoSink
    agent1.sinks.sinkMongo.host = <serverName>
    agent1.sinks.sinkMongo.port = 27017
    agent1.sinks.sinkMongo.model = single
    agent1.sinks.sinkMongo.db = events
    agent1.sinks.sinkMongo.collection = events
    agent1.sinks.sinkMongo.batch = 100
    agent1.sinks.sinkMongo.timestampField = "yyyy-MM-dd HH:mm:ss"
  6. TEST
    Lo primero será tratar de levantar el agente Flume:
    bash# flume-ng agent -n agent1 -f /usr/local/flume/conf/flume-conf.properties -c /usr/local/flume/conf
    Info: Sourcing environment configuration script /usr/local/flume/conf/flume-env.sh
    + exec /usr/java/default/bin/java -Xmx20m -cp '/usr/local/flume/conf:/usr/local/flume/lib/*:/usr/local/flume-ng-mongodb-sink/target/classes' -Djava.library.path= org.apache.flume.node.Application -n agent1 -f /usr/local/flume/conf/flume-conf.properties


    Aparentemente el agente parece haberse levantado correctamente, pero para validarlo os aconsejo que examinéis el contenido del fichero /usr/local/flume-ng-mongodb-sink/logs/flume.log.

    bash# more /usr/local/flume-ng-mongodb-sink/logs/flume.log

    En caso de detectar el siguiente error: 

    "09 ene 2015 16:45:27,366 ERROR [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:145)  - Failed to start agent because dependencies were not found in classpath. Error follows.
    java.lang.NoClassDefFoundError: com/mongodb/BasicDBObject"


    Deberemos copiar la librería del driver de MongoDB:

    bash# find / -name "*mongo*.jar"
    /root/.m2/repository/org/mongodb/mongo-java-driver/2.10.1/mongo-java-driver-2.10.1.jar
    bash# cp /root/.m2/repository/org/mongodb/mongo-java-driver/2.10.1/mongo-java-driver-2.10.1.jar /usr/local/flume/lib

    De nuevo tratamos de levantar al agente:
    bash# flume-ng agent -n agent1 -f /usr/local/flume/conf/flume-conf.properties -c /usr/local/flume/confInfo: Sourcing environment configuration script /usr/local/flume/conf/flume-env.sh
    + exec /usr/java/default/bin/java -Xmx20m -cp '/usr/local/flume/conf:/usr/local/flume/lib/*:/usr/local/flume-ng-mongodb-sink/target/classes' -Djava.library.path= org.apache.flume.node.Application -n agent1 -f /usr/local/flume/conf/flume-conf.properties


    bash# tail /usr/local/flume-ng-mongodb-sink/logs/flume.log...
    09 ene 2015 16:55:09,825 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.source.NetcatSource.start:164)  - Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/:1982]09 ene 2015 16:55:09,845 INFO  [lifecycleSupervisor-1-3] (org.riderzen.flume.sink.MongoSink.start:134)  - Started MongSink_0.

    Desde otra terminal abrimos una conexión (telnet o nc) contra el servidor y puerto configurado como origen de los datos. El formato del mensaje que enviemos deberá seguir la estructura JSON. Ejemplo:
    bash# nc <serverName> 1982
    {"evento":"Hello World!"}

    OK

    {"evento":"prueba", "severtidad":"INFO"}
    OK

    A continuación si abrimos una conexión con la NoSQL de MongoDB y examinamos el contenido de la base de datos "events" y collección "events", deberemos observar lo siguiente:
    bash# mongo <serverName>/events

    MongoDB shell version: 2.6.6
    connecting to: <serverName>/events

    show collections
    events
    system.indexes

    db.events.find()
    { "_id" : ObjectId("54b3a21fe4b0f77f70218bcb"), 
    "evento" : "Hello World!", ""yyyy-MM-dd HH:mm:ss"" : ISODate("2015-01-12T10:29:48.258Z") }{ "_id" : ObjectId("54b3a21fe4b0f77f70218bcc"), "evento" : "prueba", "severtidad" : "INFO", ""yyyy-MM-dd HH:mm:ss"" : ISODate("2015-01-12T10:29:48.258Z") }

jueves, 8 de enero de 2015

[Cloudera & Flume] WebService to HDFS

Flume
Flume


«Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application» +info.



Flume Web Service to HDFS
[Flume] Web Service to HDFS
En caso de no disponer del servicio Flume en nuestra plataforma Cloudera, lo primero que haremos será desplegarlo.
  1. Pulsaremos sobre la lista desplegable ubicada junto al nombre de nuestro clúster.
  2. Seleccionaremos la opción "Agregar un servicio".
  3. Una vez se nos abra el asistente para agregar un servicio, marcaremos la opción Flume y pulsaremos el botón "Continuar".
    Cloudera - Agregar Servicio
    Cloudera - Agregar Servicio
  4. A continuación seleccionaremos un conjunto de dependencias de acuerdo a nuestra arquitectura y pulsaremos "Continuar".
  5. Nuestro siguiente paso será decidir sobre qué servidor se desplegará y configurará el agente. De nuevo pulsaremos el botón "Continuar".
    Cloudera - Agregar Servicio
    Cloudera - Agregar Servicio
  6. Nuestro último paso dentro de este asistente será pulsar el botón de "Finalizar".
    Cloudera - Agregar Servicio
    Cloudera - Agregar Servicio
Una vez desplegado el servicio, es hora de configurarlo. Lo primero, pinchar sobre el servicio Flume.
Cloudera - Servicios
A continuación nos dirigiremos a su pestaña de "Configuración" y seleccionaremos la opción "Ver y editar".
Cloudera & Flume
Cloudera & Flume
 Ahora, pincharemos "Agent Default Group" del menú que nos aparecerá a la izquierda.
Cloudera & Flume
Cloudera & Flume
De la lista de campos que nos aparecerán, nos centraremos en el de "Archivo de configuración".
Cloudera & Flume - Configuración por Defecto
Cloudera & Flume - Configuración por defecto
En dicho campo podremos pegar la configuración de nuestro agente Flume en caso de disponer de una y sino podremos proceder a ello.

Antes de continuar, decir que esta configuración que se nos establece por defecto, permite registrar en el log de flume de nuestro servidor los mensajes locales recibidos a través del puerto 9999.

He aquí la demostración:
  1. Abrir una conexión SSH con el servidor donde hemos desplegado el agente de Flume.
  2. bash# tail /var/log/flume-ng/flume-cmf-flume-AGENT-<servername>.log
    2015-01-07 17:13:16,170 INFO org.apache.flume.node.Application: Starting Channel channel1
    2015-01-07 17:13:16,197 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: channel1: Successfully registered new MBean.
    2015-01-07 17:13:16,197 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: channel1 started
    2015-01-07 17:13:16,197 INFO org.apache.flume.node.Application: Starting Sink sink1
    2015-01-07 17:13:16,197 INFO org.apache.flume.node.Application: Starting Source source1
    2015-01-07 17:13:16,198 INFO org.apache.flume.source.NetcatSource: Source starting
    2015-01-07 17:13:16,207 INFO org.apache.flume.source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:9999]
    2015-01-07 17:13:16,217 INFO org.mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
    2015-01-07 17:13:16,241 INFO org.mortbay.log: jetty-6.1.26.cloudera.2
    2015-01-07 17:13:16,252 INFO org.mortbay.log: Started SelectChannelConnector@0.0.0.0:41414
  3. bash# telnet 127.0.0.1 9999
    Trying 127.0.0.1...
    Connected to 127.0.0.1
    Escape character is '^]'.
    Hello World! <enter>
    OK
  4. bash# tail /var/log/flume-ng/flume-cmf-flume-AGENT-<servername>.log
    ...
    2015-01-07 17:13:16,252 INFO org.mortbay.log: Started SelectChannelConnector@0.0.0.0:41414
    2015-01-07 17:20:44,360 INFO org.apache.flume.sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64 21 0D          Hello World!. }
Ahora bien, como podemos ver el valor de la variable tier1.sources.source1.bind que Cloudera nos establece por defecto en el agente, es 127.0.0.1, vamos, Flume sólo atenderá a peticiones locales. Por lo tanto habremos de modificar su valor para que responda a través del interfaz deseado de nuestro servidor o bien establecer el valor de 0.0.0.0 para que responda por cualquiera de sus interfaces. Con esto lograremos poder volcar información desde cualquier otro punto de nuestra infraestructura en este sistema.

De igual modo procederemos con la variable tier1.sources.source1.port si deseamos modificar el puerto por el que responda nuestro agente.

Cualquier cambio que hagamos en este apartado de configuración, acordarse de pulsar sobre el botón "Guardar cambios" y si aplicara, reiniciar el servicio Flume.

Así pues, ya seríamos capaces de recibir mensajes a través de un puerto y registrarlos en nuestro sistema, pero NO volcarlos sobre nuestro HDFS.

Nuestro siguiente paso por lo tanto sería configurar el recurso denominado sink de nuestro agente para llevar acabo dicha acción.
  1. Configuraremos su tipo:tier1.sinks.sink1.type = hdfs
  2. Para que Flume almacene los eventos recibidos como texto en los archivos de nuestro HDFS:
    tier1.sinks.sink1.hdfs.fileType = DataStream
    tier1.sinks.sink1.hdfs.writeFormat = Text
  3. Definiremos el path de nuestro HDFS en donde queremos dejar registrados los eventos que recibiremos.
    tier1.sinks.sink1.hdfs.path = hdfs://<serverip>:8020/user/flume/syslog/%y%m%d


    Nota: Recordad crear y configurar los permisos del directorio deseado, en mi caso de /user/flume/syslog/

    En caso de parametrizar la variable 
    tier1.sinks.sink1.hdfs.path con algún campo o variable relacionada con timestamps (%Y = año, %y = sólo los dos últimos dígitos del año, %m = mes, %d = día, %H = hora, %M = minuto, %S = segundos, %s = segundos desde 1970-01-01 00:00:00 UTC) será recomendable también incluir la variable tier1.sinks.sink1.hdfs.useLocalTimeStamp a true pues sin ella y si los mensajes que enviemos carecen de la cabecera necesaria, nos dará el siguiente error:
    ERROR org.apache.flume.sink.hdfs.HDFSEventSink: process failed
    java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
  4. Estableceremos si así lo deseamos un prefijo para el nombre de los archivos:
    tier1.sinks.sink1.hdfs.filePrefix = events
  5. Salvamos la configuración y reiniciamos el servicio flume.
Por último para validar nuestra configuración:
  1. Abrir una conexión SSH con el servidor donde hemos desplegado el agente de Flume.
  2. bash# tail /var/log/flume-ng/flume-cmf-flume-AGENT-<servername>.log
    ...
    2015-01-07 17:16:09,184 INFO org.apache.flume.instrumentation.MonitoredCounterGroup: Component type: SINK, name: sink1 started

    2015-01-07 17:16:09,190 INFO org.apache.flume.source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:9999]
    2015-01-07 17:16:09,200 INFO org.mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
    2015-01-07 17:16:09,224 INFO org.mortbay.log: jetty-6.1.26.cloudera.2

    2015-01-07 17:16:09,234 INFO org.mortbay.log: Started SelectChannelConnector@0.0.0.0:41414
  3. bash# telnet 127.0.0.1 9999
    Trying 127.0.0.1...
    Connected to 127.0.0.1
    Escape character is '^]'.
    Hello World! <enter>
    OK
  4. bash# tail /var/log/flume-ng/flume-cmf-flume-AGENT-<servername>.log
    ...
    2015-01-07 17:16:09,234 INFO org.mortbay.log: Started SelectChannelConnector@0.0.0.0:41414
    2015-01-07 17:17
    :00,261 INFO org.apache.flume.sink.hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false

    2015-01-07 17:17:00,329 INFO org.apache.flume.sink.hdfs.BucketWriter: Creating hdfs://<serverip>:8020/user/flume/syslog/150107/events.1420707900262.tmp
  5. bash# hadoop fs -ls /user/flume/syslog/150107/ Found 1 items
    -rw-r--r--   3 flume flume         14 2015-01-07 17:17 /user/flume/syslog/150107/events.1420707900262
  6. bash# hadoop fs -cat /user/flume/syslog/150107/events.1420707900262Hello World!
Aquí os dejo la captura de la configuración final:
Cloudera & Flume - Configuración WebService to HDFS
Cloudera & Flume - Configuración WebService to HDFS