Power BI + Elasticsearch !!

Elasticsearch est un moteur open source NoSQL proposé par l’entreprise Elastic basé sur la technologie Apache Lucene : https://www.elastic.co/fr/products/elasticsearch

Il est très simple à installer et à utiliser. Si vous ne connaissez pas du tout Elastisearch, n’hésitez pas à suivre ce rapide tuto avant de continuer cet article : http://joelabrahamsson.com/elasticsearch-101/

Pour résumer, Elasticsearch permet de stocker des documents Json dans ce qu’on appelle des index, puis de les requêter via web service HTTP REST.

Et le top, c’est que Http REST + Json fait de Power BI un client nativement compatible avec Elasticsearch ! En effet, Power BI peut appeler des web services Http REST grâce à la fonction Web.Contents(), puis récupérer le résultat de la requête en Json et le parser en dataset (lignes-colonnes).

Principes de requêtage avec Elasticsearch

Pour l’article j’ai stocké 20 606 tweets contenant le mot “car” (voiture en anglais) dans un index appelé “test-index” (très mal nommé Smile ) :

Kopf_index

(J’utilise ici le plugin Koft pour visualiser mes index et administrer mon cluster Elasticsearch => https://github.com/lmenezes/elasticsearch-kopf)

Mon cluster Elasticsearch tourne en local et est accessible sur le port 9200 via l’url http://localhost:9200/. Pour récupérer les tweets, je vais ajouter le nom de mon index à l’url et utiliser la méthode “_search”. J’ajoute aussi “?pretty” pour mettre en forme le résultat. En copiant http://localhost:9200/test-index/_search?pretty dans mon navigateur, s’affichent des résultats sous forme de Json. Si l’on regarde bien je n’obtiens que 10 tweets sur les 20 606.

ES_search

Afin de récupérer plus d’éléments, nous allons utiliser les paramètres from (à partir de quel élément) et size (nombre d’éléments à récupérer), et faire plusieurs appels (requêtage par “page”). Par exemple, pour afficher 50 éléments par lot de 10, nous allons faire 5 appels avec les paramètres from et size suivants :

from size url
0 10 http://localhost:9200/test-index/_search?from=0&size=10
10 10 http://localhost:9200/test-index/_search?from=10&size=10
20 10 http://localhost:9200/test-index/_search?from=20&size=10
30 10 http://localhost:9200/test-index/_search?from=30&size=10
40 10 http://localhost:9200/test-index/_search?from=40&size=10

Il aurait été plus simple de faire qu’un seul appel avec size = 20 606, sauf que size est limité à 10 000. Nous allons donc devoir faire au moins 3 appels pour récupérer tous les tweets (10 000 + 10 000 + 606).

Attention ! Par défaut il y a une règle sur l’index qui empêche les paramètres size + from de dépasser 10 000. Pour y remédier, il faut modifier le paramètre “max_result_window” de l’index et lui affecter une plus grande valeur, avec une requête PUT :

PUT /test-index/_settings
{
    "index": {
        "max_result_window" : "1000000000"
        
    }
}

 

ES_max_result_window

Nous allons aussi ajouter dans notre appel REST la possibilité d’envoyer un body qui contiendra la query DSL (l’équivalent du SQL pour Elasticsearch), plus d’infos : https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html

2 petits exemple de query DSL

  • renvoie tout :
POST test-index/_search
{
   "query": {
     "bool": {
       "must": [
         {
           "match_all": { }
         }
       ]
     }
   }
}
  • recherche le mot “mustang” dans le texte des tweet :
POST test-index/_search
 {
  "query": {
     "bool": {
       "must": [
         {
           "match": {
             "text": "mustang"
           }
         }
       ]
     }
   }
 }

Utilisons Power BI

Nous allons créer dans Power BI Desktop une requête vide.

PowerBI_Requete_vide

Copier ensuite la requête M suivante dans l’éditeur avancé :

let
    url = "http://localhost:9200/test-index/_search",
    req = "{""query"": {""bool"": {""must"": [{""match_all"": {}}]}}}",

    totalnb = Json.Document(Web.Contents(url,[Content=Text.ToBinary(req)]))[hits][total],
    max_res = 10000,
    nb_page_temp = Int32.From(totalnb/max_res),
    rest = Number.Mod(totalnb , max_res ),

    nbpages = if Number.Mod(totalnb , max_res ) > 0 then nb_page_temp + 1 else nb_page_temp,

    Source = List.Generate(()=>nbpages, each _ > 0, each _ - 1),
    #"Converti en table" = Table.FromList(Source, Splitter.SplitByNothing(), null, null, ExtraValues.Error),
    #"Colonnes renommées" = Table.RenameColumns(#"Converti en table",{{"Column1", "page"}}),
    #"Personnalisée ajoutée" = Table.AddColumn(#"Colonnes renommées", "req", each Json.Document(Web.Contents(url & "?from=" & Text.From(([page]-1)*max_res) & "&size=" & Text.From(max_res),[Content=Text.ToBinary(req)]))),
    #"req développé" = Table.ExpandRecordColumn(#"Personnalisée ajoutée", "req", {"hits"}, {"req.hits"}),
    #"req.hits développé" = Table.ExpandRecordColumn(#"req développé", "req.hits", {"total", "max_score", "hits"}, {"total", "max_score", "hits"}),
    #"hits développé" = Table.ExpandListColumn(#"req.hits développé", "hits"),
    #"hits développé1" = Table.ExpandRecordColumn(#"hits développé", "hits", {"_index", "_type", "_id", "_score", "_source"}, {"_index", "_type", "_id", "_score", "_source"}),
    #"_source développé1" = Table.ExpandRecordColumn(#"hits développé1", "_source", {"contributors", "truncated", "text", "is_quote_status", "in_reply_to_status_id", "id", "favorite_count", "source", "retweeted", "coordinates", "timestamp_ms", "entities", "in_reply_to_screen_name", "id_str", "retweet_count", "in_reply_to_user_id", "favorited", "user", "geo", "in_reply_to_user_id_str", "possibly_sensitive", "lang", "created_at", "filter_level", "in_reply_to_status_id_str", "place", "retweeted_status", "display_text_range", "extended_entities", "quoted_status_id", "quoted_status", "quoted_status_id_str", "extended_tweet"}, {"_source.contributors", "_source.truncated", "_source.text", "_source.is_quote_status", "_source.in_reply_to_status_id", "_source.id", "_source.favorite_count", "_source.source", "_source.retweeted", "_source.coordinates", "_source.timestamp_ms", "_source.entities", "_source.in_reply_to_screen_name", "_source.id_str", "_source.retweet_count", "_source.in_reply_to_user_id", "_source.favorited", "_source.user", "_source.geo", "_source.in_reply_to_user_id_str", "_source.possibly_sensitive", "_source.lang", "_source.created_at", "_source.filter_level", "_source.in_reply_to_status_id_str", "_source.place", "_source.retweeted_status", "_source.display_text_range", "_source.extended_entities", "_source.quoted_status_id", "_source.quoted_status", "_source.quoted_status_id_str", "_source.extended_tweet"}),
    #"_source.user développé" = Table.ExpandRecordColumn(#"_source développé1", "_source.user", {"name", "profile_image_url"}, {"_source.user.name", "_source.user.profile_image_url"})
in
    #"_source.user développé"

Remplacer les valeurs des variables url et req par votre url de votre cluster ES et votre query DSL (en doublant les « ).

    url = "http://localhost:9200/test-index/_search",
    req = "{""query"": {""bool"": {""must"": [{""match_all"": {}}]}}}",

Le principe de la requête est le suivant :

  1. l’étape totalnb va requêter l’index pour récupérer le nombre total d’éléments retournés par la requête :   ES_search_nbtotal
  2. le nombre d’appels nécessaires (pages) va être calculé dans la variable nbpage (3 dans mon exemple).
  3. une liste est ensuite générée avec le décompte de 1 à nbpage des pages à réaliser : PowerBI_ES_Page
  4. on ajoute une colonne calculée qui va, pour chaque page, exécuter l’appel REST avec les bonnes valeurs des paramètres from et size : PowerBI_ES_Page_requete
  5. Il ne reste plus qu’à naviguer dans la structure du Json en cliquant sur les boutons développer PowerBI_Developper_bouton, aller rechercher les attributs nécessaires et réaliser le rapport : PowerBI_ES_Rapport On peut d’ailleurs vérifier dans le tableau en haut à gauche que les 3 pages ont bien été chargées avec 10 000, 10 000 et 606 éléments.

C’est plutôt simple à réaliser, et le parsing Json de Power BI est l’un des plus intuitifs que je connaisse.

Le gros plus: le rafraichissement de données fonctionne une fois le rapport déployé sur le portail Power BI Services ! Il utilisera la Gateway d’Entreprise si le cluster est on premise, ou alors pourra directement se connecter dessus si le cluster est ouvert sur internet (attention !) ou par exemple hébergé sur le cloud Elastic avec le module de sécurité Shield.

En bonus, la requête M fonctionne aussi dans Excel avec son module de requête (anciennement Power Query), ce qui peut être pratique pour de l’export de données par exemple.Excel_ES_Page_requete

Cette requête permet de s’affranchir de la limitation des 10 000 éléments, mais attention le but n’est pas d’exporter l’ensemble des données d’un index dans un rapport Power BI ! Il est recommandé d’ensuite utiliser les fonctions d’agrégations d’Elasticsearch (https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html) pour ramener moins de données et optimiser les temps de réponses, exactement comme on le ferait avec un SELECT GROUP BY et une base de données SQL classique.

FADATA

Fabien Adato est Consultant Data et BI chez AZEO. Après avoir intégré la société CGI Business Consulting où il rejoint une équipe dédiée à la Business Intelligence, il fait ses premières armes sur la solution Microsoft SQL Server. Pendant plus de 4 ans, il acquiert des compétences techniques et fonctionnelles sur toute la chaîne de valeur de la BI (ETL, Base de Données, Cube Olap et Rapport). Il a pu se spécialiser également sur de nouvelles technologies de la Data et notamment Hadoop, Pig et Hive, ainsi que des technologies self-service BI, le No-SQL comme la base de données MongoDB et le moteur d’indexation ElasticSearch. Il entre chez AZEO en 2014 pour y assurer le poste de Consultant DATA / BI, autour des technologies Microsoft SQL Server, Power BI et Azure Cortana Intelligence.

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *