How to analyse CSV data with Elastic stack

Hi!

Imagine that we have to analyse a large amount of user data. In this article the data will be in CSV format.

For example, we can have data in the following format:

(0118) 008 0694 | 180504 | Hattersheim am Main | 6

The first column is the user’s phone number, the second is a date in YYMMDD format, the third is a city, and the last one is the billed amount for this phone number for this day.

It would be great to get some aggregated info by date, city, etc. To perform these aggregations on a huge amount of data we can use the ELK stack.

What is the ELK stack? According to the official website:

«ELK» is the acronym for three open source projects: Elasticsearch, Logstash, and Kibana. Elasticsearch is a search and analytics engine. Logstash is a server‑side data processing pipeline that ingests data from multiple sources simultaneously, transforms it, and then sends it to a «stash» like Elasticsearch. Kibana lets users visualize data with charts and graphs in Elasticsearch.

Well, we need to perform the following steps:

  1. Download and install Elasticsearch, Logstash and Kibana;
  2. Configure Logstash to put data from a CSV file into an Elasticsearch index;
  3. Perform some magic queries to Elasticsearch to aggregate the data.

Great. Let’s find our three components at the official page: elastic.co

Extract the archives (zip/tar) to some directories (I extracted them to ~/elk/kibana, ~/elk/elasticsearch, and so on).

It’s not necessary to change the default configuration for Kibana and Elasticsearch. First we run Elasticsearch, and then Kibana:

~/elk/elasticsearch/bin/elasticsearch
~/elk/kibana/bin/kibana

So, after a few seconds the terminal will look like this:

[screenshot: elkscreen]

After starting Elasticsearch we can find some info about this instance at localhost on port 9200:

{
  "name" : "MacBook-Air-Yuri.local",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "EKex54PqTSOsU9GoE6nuZg",
  "version" : {
    "number" : "7.2.0",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "508c38a",
    "build_date" : "2019-06-20T15:54:18.811730Z",
    "build_snapshot" : false,
    "lucene_version" : "8.0.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}

And on port 5601 we can find Kibana. Kibana connects to our running Elasticsearch instance. Let’s click the Monitoring tab on the left and get some info about the instance:

[screenshot: kibana]

We’ve found that everything is OK with our ES and Kibana instances, so let’s configure Logstash and put some data into an Elasticsearch index.

Q: What is an index?

A:  An index is like a table in a relational database. It has a mapping which contains a type, which contains the fields in the index. An index is a logical namespace which maps to one or more primary shards and can have zero or more replica shards.

Let’s configure pipeline. The Logstash event processing pipeline has three stages: inputs → filters → outputs. Inputs generate events, filters modify them, and outputs ship them elsewhere.

There is a pipeline directory in the Logstash installation path; that is where pipeline configurations are located. We can create an example_pipeline.conf file there:

input {
    file {
        path => "/home/usershome/somelocation/datadir/*.csv"
        start_position => "beginning"
        # "/dev/null" disables the sincedb bookmark, so the file is re-read from the beginning on every run
        sincedb_path => "/dev/null"
    }
}

filter {
    csv {
        # the sample data is separated by " | " (pipe with surrounding spaces)
        separator => " | "
        columns => ["phone", "date", "city", "amount"]
    }
}

output {
    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "cdr_index_v1"
    }
    stdout { }
}

Our target index is cdr_index_v1.

Now let’s tell Logstash that we have prepared a configuration for the new pipeline. There is also a config directory in the Logstash installation. We have to remove the example configuration files and make some changes in pipelines.yml:

 - pipeline.id: example_pipeline
   pipeline.workers: 2
   path.config: "/Users/b00blik/elk/logstash/pipeline/example_pipeline.conf"

OK, our configuration is prepared. Now let’s go to the logstash/bin directory and start the logstash executable. Something like the following will be printed while the data is being processed:

[screenshot: logstash]

And after that we can check our Index Management in Kibana:

[screenshot: kibana_index]

Let’s go to the «Dev Tools» tab and perform a query to select some data:

[screenshot: query_by_city]
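
The query in the screenshot simply selects documents by city. As a rough sketch (the city value here is just taken from our sample data; the exact query in the screenshot may differ), such a request could look like this:

GET cdr_index_v1/_search
{
  "query": {
    "match": {
      "city": "Norman"
    }
  }
}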

We performed a query to select data by city. hits->total->value is 2000, so 2000 documents matched the query (note that by default ES returns only the first 10 of them in the hits array).

Pay attention to the amount field in the documents – it contains a string value instead of a number, and date is also string-typed instead of a datetime.
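
We can confirm this by looking at the index mapping; with default dynamic mapping, string columns typically end up as text fields with a keyword sub-field:

GET cdr_index_v1/_mapping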

To perform aggregations on these fields we have to convert the data into the proper types. The conversion can be performed by reindexing.

Using the reindex API we can make a copy of the source index with some changes to the field types.

First, we have to prepare an ingest node. In the Elasticsearch installation directory set the following line in the config/elasticsearch.yml file:

node.ingest: true

If it was false, restart Elasticsearch.
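
Whether a node actually has the ingest role can be checked from Dev Tools, for example with the cat nodes API (the node.role column lists the roles; i stands for ingest):

GET _cat/nodes?v&h=name,node.role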

Go to the Dev Tools console. Let’s create a pipeline for data conversion:

PUT _ingest/pipeline/ingest-pipeline-Date
{
  "description": "convert Date",
  "processors": [
    {
      "date": {
        "field": "date",
        "target_field": "eventDate",
        "formats": ["yyMMdd"],
        "timezone": "UTC"
      }
    },
    {
      "convert": {
        "field": "amount",
        "type": "long"
      }
    }
  ]
}

And click the «play» button to perform the request. The answer will look like this:

{
  "acknowledged" : true
}
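
Before reindexing, the pipeline can be tried out on a sample document with the simulate API (a quick sanity check; the sample values below are just taken from our CSV format):

POST _ingest/pipeline/ingest-pipeline-Date/_simulate
{
  "docs": [
    {
      "_source": {
        "date": "180504",
        "amount": "6"
      }
    }
  ]
}

The response should show eventDate filled in and amount converted to a number.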

Next, let’s perform a reindex request to make a new index in ES with the updated field types:

POST _reindex?refresh=true
{
  "source": {
    "index": "cdr_index_v1"
  },
  "dest": {
    "index": "cdr_index_refiltered_v1",
    "pipeline": "ingest-pipeline-Date"
  }
}
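
The reindex response reports how many documents were created in the destination; we can also double-check the new index with a simple count request:

GET cdr_index_refiltered_v1/_count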

A new index can be found in Kibana:

[screenshot: indices]

Great! After querying ES, the docs will look like this:

{
        "_index" : "cdr_index_refiltered_v1",
        "_type" : "_doc",
        "_id" : "OBGy4GsB4cQZQeUNx0Pq",
        "_score" : 5.013716,
        "_source" : {
          "date" : "180503",
          "amount" : 9,
          "city" : "Norman",
          "message" : "(0171)313 8219 | 180503 | Norman | 9",
          "path" : "/Users/b00blik/elk/datadir/example.csv",
          "@timestamp" : "2019-07-11T11:02:21.152Z",
          "phone" : "(0171)313 8219",
          "@version" : "1",
          "host" : "Air-Yuri.Dlink",
          "eventDate" : "2018-05-03T00:00:00.000Z"
        }
}
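
The field types of the new index can be verified via its mapping as well; amount should now show up as long and eventDate as date:

GET cdr_index_refiltered_v1/_mapping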

Looks good. But this index exists only in Elasticsearch, and we have to tell Kibana that we have something new! We can do it by creating an index pattern. Select «Index Patterns» in the Kibana section of the Management console and create an index pattern:

[screenshot: ind_p1]

At the second step Kibana asks us to select a field with a timestamp. I selected nothing.

[screenshot: ind_p2]

Well, now we can make some visualizations. Open the «Visualize» tab in Kibana and click «Create new visualization»:

[screenshot: vis1]

After that, for example, let’s select «Horizontal bar» and our recently created index pattern.

We can make a visualization by cities. The axes can be changed in the left panel:

[screenshot: vis2]

Another type of data analysis is aggregation. For example, we can sum the amount value for every city. The query will look like this:

POST cdr_index_refiltered_v1/_search?size=0
{
  "aggs": {
    "by_city": {
      "terms": {
        "field": "city.keyword"
      },
      "aggs": {
        "sum_au": {
          "sum": {
            "field": "amount"
          }
        }
      }
    }
  }
}

And the result of the query will look like this:

{
  "took" : 70,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "by_city" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 274000,
      "buckets" : [
        {
          "key" : "Castelnovo del Friuli",
          "doc_count" : 4000,
          "sum_au" : {
            "value" : 20000.0
          }
        },
        {
          "key" : "Roveredo in Piano",
          "doc_count" : 4000,
          "sum_au" : {
            "value" : 24000.0
          }
        },
        {
          "key" : "Allentown",
          "doc_count" : 3000,
          "sum_au" : {
            "value" : 21000.0
          }
        },
        {
          "key" : "Devon",
          "doc_count" : 3000,
          "sum_au" : {
            "value" : 26000.0
          }
        },
        {
          "key" : "Flin Flon",
          "doc_count" : 3000,
          "sum_au" : {
            "value" : 16000.0
          }
        },
        {
          "key" : "Acciano",
          "doc_count" : 2000,
          "sum_au" : {
            "value" : 2000.0
          }
        },
        {
          "key" : "Alphen aan den Rijn",
          "doc_count" : 2000,
          "sum_au" : {
            "value" : 4000.0
          }
        },
        {
          "key" : "Alès",
          "doc_count" : 2000,
          "sum_au" : {
            "value" : 14000.0
          }
        },
        {
          "key" : "Aquila d'Arroscia",
          "doc_count" : 2000,
          "sum_au" : {
            "value" : 20000.0
          }
        },
        {
          "key" : "Bendigo",
          "doc_count" : 2000,
          "sum_au" : {
            "value" : 10000.0
          }
        }
      ]
    }
  }
}
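
Since eventDate is now a real date field, the same idea works over time as well: for example, the billed amount per day can be summed with a date_histogram aggregation (a sketch; on Elasticsearch versions before 7.2 the parameter is called interval instead of calendar_interval):

POST cdr_index_refiltered_v1/_search?size=0
{
  "aggs": {
    "by_day": {
      "date_histogram": {
        "field": "eventDate",
        "calendar_interval": "day"
      },
      "aggs": {
        "sum_au": {
          "sum": {
            "field": "amount"
          }
        }
      }
    }
  }
}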

Useful links:

  1. https://www.elastic.co/guide/en/elastic-stack/current/index.html
  2. https://www.elastic.co/guide/en/kibana/current/setup.html
  3. https://www.elastic.co/guide/en/kibana/current/visualize.html