Hello,

Suite à l'article sur la présentation de la pile Elastic et l'installation d'Elasticsearch que vous pouvez consulter ici, nous présenterons dans ce document, la mise en place de Logstash et sa configuration avec un fichier de rapport Nessus et un fichier d'accès Apache.

Logstash est disponible sous plusieurs plateformes. Nous utilisons le même serveur pour Elasticsearch et Kibana. De plus, Logstash est disponible pour plusieurs plateformes. Pour information si les évènements à collecter à Logstash sont sur un serveur distinct, il est recommandé d'y installer Beats [1] qui peut communiquer avec Logstash et/ou Elasticsearch.

 

Installation

Installer la clé de signature:

wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | apt-key add -

L'excution de cette commande doit retourner : OK

 

Installer le package apt-transport-https:

apt-get install apt-transport-https

 

Ajouter cette ligne dans le fichier de dépôt de paquets /etc/apt/sources.list.d/elastic-5.x.list:

echo "deb https://artifacts.elastic.co/packages/5.x/apt stable main" | tee -a /etc/apt/sources.list.d/elastic-5.x.list

Nous obtenons : deb https://artifacts.elastic.co/packages/5.x/apt stable main

 

Effectuer une mise à jour puis un téléchargement de Logstash

apt-get update && apt-get install logstash

 

Démarrage - Statut Logstash

Pour démarrer Logstash au lancement de votre serveur (sous Debian, nous utilisons systemd):

/bin/systemctl enable logstash.service

Pour un système d'exploitation qui utilise sysV init :

update-rc.d logstash defaults 95 10

 

Pour démarrrer Logstash sous systemd:

systemctl start logstash.service

 

Pour un système qui utilise upstart, utiliser la commande:

initctl start logstash

 

Pour Sysv init, utiliser la commande:

/etc/init.d/logstash start

 

Afin de vérifier le statut de Logstash, utiliser la commande suivante qui doit retourner "active (running)":

systemctl status logstash.service

logstash.service - logstash

   Loaded: loaded (/etc/systemd/system/logstash.service; disabled)

Active: active (running) since Sat 2017-03-25 17:54:17 CET; 5s ago

Main PID: 1148 (java)

   CGroup: /system.slice/logstash.service

           └─1148 /usr/bin/java -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+DisableExplicitGC -Djava.awt.headless=true -Dfile.encoding=UTF-8 -XX:+HeapDumpOnOut...

Mar 25 17:54:17 elk systemd[1]: Started logstash.

 

 

Collecte des logs

Logstash utilise un "pipeline" afin de collecter les logs. Ce pipeline est constitué de trois éléments [2] : l'entrée, le filtre et la sortie.

  • L'entrée : sources de données à parser/collecter
  • Le filtre : paser les données
  • La sortie : le resultat du parse est enovyé dans une destination (Elasticsearch, CSV, email, etc)

 

Application de Logstash à Apache

Dans cet exemple nous allons collecter les évènements du fichier /var/log/apache2/access.log. Suite à l'explication ci-dessous concernant le pipeline Logstash:

  • Entrée = /var/log/apache2/access.log
  • Filtre = utilisation du plugin "grok" pour parser ce fichier. Précisement le pattern suivant "%{HTTPD_COMMONLOG}"

Pour lister les plugins [3] par défaut de Logstash disponibles sous Debian, utiliser la commande :

/usr/share/logstash/bin/logstash-plugin list

Pour plus d'informations sur grok et ses patterns, consulter [4] et [5].

  • Output = les données parsées sont envoyées à elasticsearch (dans notre exemple : 192.168.154.141:9200)

 

Ces trois éléments sont inscrits dans ce fichier : /etc/logstash/conf.d/apache_access.conf

input {

   file {

       path => '/var/log/apache2/access.log'

   }

}

filter {

   grok {

       match => { "message" => "%{COMBINEDAPACHELOG}"}

   }

  

}

output {

   elasticsearch {

       hosts => [ "192.168.154.141:9200" ]

   }

}

 

Enfin, nous donnons les droits d'accès au fichier /etc/logstash/conf.d/apache_access.conf au compte de service logstash:

gpasswd -a logstash adm

Puis arrêt et redemarrage de logstash. Ensuite, la commande suivante peut être executée afin de générer du trafic à destination de notre serveur web Apache:

for x in {1..10} ; do curl -s 192.168.154.141 ; done

 

Vérifications Elasticsearch

Suite au trafic généré, des logs ont été collectés, parsés et envoyés vers Elasticsearch. Nous pouvons donc vérifier qu'un index a bien été créé. Pour affichier la liste des index présents dans Elasticsearch, utiliser la commande curl '192.168.154.141:9200/_cat/indices?v' (atttendion modifier l'adresse IP par celle de votre serveur).

health status index               uuid                   pri rep docs.count docs.deleted store.size pri.store.size

yellow open   logstash-2017.03.25 09Vbl2EaT8uJJqOpWEKaFg   5   1         2           0     20.4kb        20.4kb

yellow open   .kibana             OrKL-oj9TsybY7hNoy-cNQ   1   1         1           0     3.1kb         3.1kb

L'index logstash-2017.03.25 a été créé.

 

Il est également possible de lister les requêtes vers le serveur web qui se sont bien terminées (code de retour 200) : curl -XGET 'localhost:9200/logstash-$DATE/_search?pretty&q=response=200'

{

"took" : 117,

"timed_out" : false,

"_shards" : {

   "total" : 5,

   "successful" : 5,

   "failed" : 0

},

"hits" : {

   "total" : 2,

   "max_score" : 0.39556286,

   "hits" : [

     {

       "_index" : "logstash-2017.03.25",

       "_type" : "logs",

       "_id" : "AVsGiw5egA8QiyyfhqRJ",

       "_score" : 0.39556286,

       "_source" : {

         "request" : "/",

         "agent" : "\"curl/7.38.0\"",

         "auth" : "-",

         "ident" : "-",

         "verb" : "GET",

         "message" : "192.168.154.141 - - [25/Mar/2017:18:37:07 +0100] \"GET / HTTP/1.1\" 200 10956 \"-\" \"curl/7.38.0\"",

         "path" : "/var/log/apache2/access.log",

         "referrer" : "\"-\"",

         "@timestamp" : "2017-03-25T17:37:08.164Z",

         "response" : "200",

         "bytes" : "10956",

         "clientip" : "192.168.154.141",

         "@version" : "1",

         "host" : "elk",

         "httpversion" : "1.1",

         "timestamp" : "25/Mar/2017:18:37:07 +0100"

       }

     },

     {

       "_index" : "logstash-2017.03.25",

       "_type" : "logs",

       "_id" : "AVsGipckgA8QiyyfhqRI",

       "_score" : 0.39556286,

       "_source" : {

         "request" : "/",

         "agent" : "\"curl/7.38.0\"",

         "auth" : "-",

         "ident" : "-",

         "verb" : "GET",

         "message" : "::1 - - [25/Mar/2017:18:36:36 +0100] \"GET / HTTP/1.1\" 200 10956 \"-\" \"curl/7.38.0\"",

         "path" : "/var/log/apache2/access.log",

         "referrer" : "\"-\"",

         "@timestamp" : "2017-03-25T17:36:37.117Z",

         "response" : "200",

         "bytes" : "10956",

         "clientip" : "::1",

         "@version" : "1",

         "host" : "elk",

         "httpversion" : "1.1",

         "timestamp" : "25/Mar/2017:18:36:36 +0100"

       }

     }

   ]

}

}

 

Application de Logstash à Nessus

Avant de réaliser cette étape. Il faudrait exporter le rapport Nessus au format CSV.

De même que précédemment, nous avons créé le fichier /etc/logstash/conf.d/nessus_report_csv.conf qui contient les trois éléments du pipeline Logstash.

  • Input = fichier CSV (nommé ici test.csv) obtenu en "parsant" un rapport CSV Nessus. Pour réaliser ce parse nous avons utilisé Powershell; précisement le module Import-csv:

Import-Csv apache-srv_35nil4.csv | select "Plugin ID", CVE, CVSS, Risk, Host, Protocol, Port, Name | convertto-csv -delimiter "," > test.csv

Dans cette commande, nous récuperons les champs "Plugin ID", CVE, CVSS, Risk, Host, Protocol, Port et Name du fichier apache-srv_35nil4.csv. Ensuite, nous convertissons le resultat en un autre fichier CSV (test.csv) qui a pour délimiteur la virgule.

 

Voici un extrait du contenu de test.csv:

#TYPE Selected.System.Management.Automation.PSCustomObject
"Plugin ID","CVE","CVSS","Risk","Host","Protocol","Port","Name"
"10107","","","None","192.168.154.141","tcp","80","HTTP Server Type and Version"
"10114","CVE-1999-0524","","None","192.168.154.141","icmp","0","ICMP Timestamp Request Remote Date Disclosure"

 

Ce fichier est ensuite copié dans un repertoire accessible en lecture par Logstash (pour cet exemple : /home/elk/nessus/).

  • Filter = plugin CSV qui permettra de récuperer le contenu des champs "Plugin ID", CVE, CVSS, Risk, Host, Protocol, Port et Name. Le séparateur est la virgule.
  • Output = le résultat est redirigé vers Elasticsearch pour indexation. De plus, nous avons donné un nom à cet index "nessus-%{+YYYY.MM.dd}" (le paramètre "YYYY.MM.dd" variera avec la date)

 

Le fichier /etc/logstash/conf.d/nessus_report_csv.conf contient alors les directives suivantes:

 input {
  file {
    path => "/home/elk/nessus/*.csv"
    start_position => "beginning"
  }
}
filter {
  csv {
    separator => ","
    #Plugin ID,CVE,CVSS,Risk,Host,Protocol,Port,Name,Synopsis,Description,Solution,See Also,Plugin Output
    columns => ["Plugin ID","CVE","CVSS","Risk","Host","Protocol","Port","Name"]
  }
}
output {
    elasticsearch {
        hosts => [ "192.168.154.141:9200" ]
        index => "nessus-%{+YYYY.MM.dd}"
    }
}

 

Conclusion

Nous avons mis en place Logstash afin de collecter, parser des évènements Apache ou Nessus. Ceux-ci ont ensuite été indexés par Elasticsearch. Nous verrons dans le prochain article le resultat de cet indexation via l'interface Kibana.

 

Take care!

Michel

 

Ressources

Liens consultés entre le 26/03/2017 et le 01/04/2017

1. https://www.elastic.co/products/beats

2. https://www.elastic.co/guide/en/logstash/current/first-event.html

3. https://www.elastic.co/guide/en/logstash/current/working-with-plugins.html

4. https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html

5. https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

6. https://www.linode.com/docs/databases/elasticsearch/visualizing-apache-webserver-logs-in-the-elk-stack-on-debian-8

Add comment

You are encouraged to comment. However, I will exercise my right to moderate and edit comments which are offensive or not constructive.


Security code
Refresh