Azure Data Factory : Il essaie de récupérer un secret Azure Key Vault dans un Pipeline, ça tourne mal !

Azure Key Vault (AKV) est un produit Azure assez pratique pour stocker de façon sécurisée des secrets (entre autres) et Azure Data Factory (ADF) permet nativement d’y accéder. Ca s’utilise généralement dans la configuration de nos linked services ADF, cf. la doc officielle pour voir comment faire : https://docs.microsoft.com/en-us/azure/data-factory/store-credentials-in-key-vault

On va donc pouvoir utiliser AKV pour gérer toute la configuration de nos ADF : les users, les mots de passe, les chaines de connexions, les chemins, les tokens…

Bon récupérer des secrets dans des linked services c’est cool…

…Mais maintenant, il y a des cas où ce n’est pas suffisant, et je vais vous montrer comment allez un peu plus loin, et récupérer des secrets directement dans un pipeline pour ensuite utiliser la valeur du secret dans n’importe quelle activité !

Il n’existe malheureusement pas, au moment où j’écris cet article, de moyen natif pour faire cela, par contre c’est possible via une simple activité web. Pour ce faire, on va falloir :

  1. S’assurer que le MSI de notre ADF a le bien droit de récupérer les secrets (GET) de votre AKV, voir le l’étape 2 de la doc ‘”In your key vault -> Access policies -> Add new -> search this managed identity application ID to grant Get permission in Secret permissions dropdown” et surtout oubliez pas de cliquer sur “save” pour enregistrer votre affectation :
  2. https://docs.microsoft.com/en-us/azure/data-factory/store-credentials-in-key-vault#steps

  3. Ajouter une activité web dans votre pipeline ADF avec la conf suivante :
    1. URL : l’url de votre secret, https://<AKVName>.vault.azure.net/secrets/<SecretName>?api-version=2016-10-01 en remplaçant <AKVName> par le nom de votre AKV et <SecretName> par le nom de votre secret
    2. Method : GET
    3. Authentication : MSI
    4. Resource : https://vault.azure.net

chrome_tINXRL6kx2

Vous pourrez ensuite récupérer la valeur du secret dans les activités suivantes et dans n’importe quelle expression avec la formule @activity(‘<NomDeVotreActiviteWeb>’).output.value, en remplaçant <NomDeVotreActiviteWeb> par le nom de votre activité web.

Si vous voulez sécuriser encore plus la valeur de votre secret je vous conseille d’aller dans l’onglet General de votre activité web et de cocher les paramètres “Secure output” et “Secure input”, sinon la valeur de votre secret sera affichée en clair dans le monitoring de votre pipeline. Ce qui n’est pas top si on va chercher un mot de passe par exemple Smile 

chrome_Y5dqX2NMIq

Voici au final, le code Json de votre activité web :

{
    "name": "GetSecret",
    "type": "WebActivity",
    "dependsOn": [],
    "policy": {
        "timeout": "7.00:00:00",
        "retry": 0,
        "retryIntervalInSeconds": 30,
        "secureOutput": true,
        "secureInput": true
    },
    "userProperties": [],
    "typeProperties": {
        "url": "https://<AKVName>.vault.azure.net/secrets/<SecretName>?api-version=2016-10-01",
        "method": "GET",
        "authentication": {
            "type": "MSI",
            "resource": "https://vault.azure.net"
        }
    }
}

Attention aussi avec l’activité “set value” de ADF, qui elle aussi affiche la valeur qu’on affecte dans une variable en clair dans la log. La sécurité s’est important !

Jupyter Notebooks : #LIFEHACK exécuter du SQL sur des Dataframes Pandas directement dans une cellule magic

Aujourd’hui petit article un peu spécial, dans lequel je vous propose de vous montrer comment requêter des Dataframes Pandas en SQL !

Pour ce faire, il va falloir utiliser en plus de Pandas la librairie pandasql (https://pypi.org/project/pandasql/). Elle est plutôt simple à utiliser, par exemple :

import pandas as pd
import numpy as np
from pandasql import sqldf

df_customer = pd.DataFrame([
                        [1, 'Customer A', '123 Street', np.nan],
                        [2, 'Customer B', '444 Street', '333 Street'],
                        [3, 'Customer C', '444 Street', '666 Street']
                        ], 
                columns=['ID', 'Customer', 'Billing Address', 'Shipping Address'])

query = "select * from df_customer"				
sqldf(query, globals())

Ainsi vous aller créer d’abord vos Dataframes (“df_customer” dans l’exemple), puis exécuter une requête SQL dans un string via la commande “sqldf()” qui vous retournera le résultat sous forme d’un nouveau Dataframe. Donc c’est top si vous n’avez pas le courage d’apprendre comment transformer vos données avec le langage Pandas, et que vous maitriser déjà le SQL Open-mouthed smile. Attention, pandasql utilise la syntaxe de sqlite (https://www.sqlite.org/lang.html), donc pour les adeptes de SQL Server, c’est proche mais ce n’est pas exactement du TSQL.

Maintenant pour rendre l’usage de cette fonctionnalité encore plus interactive, on va utiliser et développer une classe custom magic afin de pouvoir lancer du code SQL dans une cellule de nos notebooks grâce au mot clé %%sql.

Pour cela, on va donc d’abord créer un script de classe python à côté appelé “sqlpandas.py”, qui contiendra le bout de code suivant :

from IPython.core.magic import line_magic, line_cell_magic, Magics, magics_class
import pandasql as ps

@magics_class
class sqlpandas(Magics):

   @line_cell_magic  
   def sql(self, line, cell=None):
        return ps.sqldf(cell or line, get_ipython().user_ns)

ip = get_ipython()
ip.register_magics(sqlpandas)

Voila ensuite comment utiliser tout cela dans un notebook Jupyter :

Vous voyez que c’est plutôt simple à utiliser et que cela fonctionne, il est même possible de récupérer les résultats d’une cellule dans un autre Dataframe. Retrouvez tous les bouts de code dans mon github ici : https://github.com/fabienadato/magicSQLJupyter 

Plus d’info sur les classes custom magic : https://ipython.readthedocs.io/en/stable/config/custommagics.html

Databricks : CI/CD avec Azure DevOps et 3 méthodes de déploiement de notebooks en masse, la 3eme va vous étonner !

Avant de parler de méthode de déploiement, on va parler un peu de Databricks et de CI/CD.

Il n’y a pas toujours besoin de mettre en place du CI/CD, surtout lorsqu’on a des usages “self-service” et que l’on travaille directement production. Mais par exemple lorsqu’on a besoin d’automatiser des traitements (ça doit tourner tous les jours en automatique ou en continu via du streaming) et que l’on veut pas tout casser en production en cas d’erreur suite à une modification de code. Dans ces cas là, il vaut mieux travailler sur différents environnements identiques et indépendants. Et notamment au moins avec un environnement de dev pour pouvoir coder et tester son code tranquillement avant de modifier la prod. Beaucoup de personnes écrivent des livres entiers sur le sujet, mais nous on va simplifier et résumer le CI/CD à 2 choses :

  1. bien gérer la vie de son code grâce à un contrôleur de sources (un repo) => je ne perds plus de code, je trace les modifications, je travaille à plusieurs…
  2. déployer facilement et de façon automatisée le code dans les différents environnements => j’appuie sur un bouton et ça part en prod !

Databricks est compatible avec ces usages de développement et de déploiement en continue pour tous vos projets de data science et de data engineer.

Tout d’abord, il est possible de se connecter nativement à Github, Bitbucket Cloud ou dans Azure DevOps et de synchroniser ses notebooks automatiquement dans un repository GIT. Personnellement, j’utilise surtout Azure DevOps qui est simple à utiliser et sécurisé donc adapté à un usage en entreprise. Pour savoir comment mettre ca en place, la documentation Databricks est plutôt complète: https://docs.azuredatabricks.net/user-guide/notebooks/azure-devops-services-version-control.html#notebook-integration

Attention, il va  falloir mettre en place la synchro pour chaque notebook et après modification du notebook dans Databricks, surtout ne pas oublier de cliquer sur “Save now”  et mettre un commentaire (un vrai commentaire avec la description de vos modifications, pas de “zezg” ou “ghherhe” !) pour lancer un commit et synchroniser avec le script du repo, sinon bah ça fait rien Smile !

chrome_TdhduWhbxW

Une fois synchronisés, les notebooks se retrouvent alors dans le repo sous forme de fichiers textes Python, Scala, R ou SQL en fonction du type de notebook (même si ce notebook contient des blocks d’un autre langage).

rdjj1g4dYe

Tout ces principes de synchro seront à mettre en place dans le workspace Databricks de développement seulement, pas besoin pour les autres environnements.

Maintenant lorsque vos développements sont terminés, vous allez vouloir déployer vos notebooks sur d’autre workspaces Databricks pour l’UAT, la Préprod, la Prod … il est possible d’utiliser aussi Azure DevOps et ses pipelines de Build et de Release :

  • La partie Build va aller chercher les scripts dans le repo et générer un objet qu’on appelle un Artifact (l’équivalent d’un package de déploiement)
  • La partie Release elle va récupérer l’Artifact et le déployer dans le workspace Databricks de l’environnement souhaité.

POWERPNT_HuxGoRoZ8D

Sans rentrer trop dans les détails, le Build est simple, il suffit d’aller chercher les scripts dans le repo, la Release est un peu plus compliquée car il faut une méthode pour déployer les composants (code, notebooks…) et avec Azure DevOps il existe déjà des composants proposés par la communauté dans le marketplace :

Moi je vous propose une troisième solution de déploiement en masse de notebooks un peu plus “roots” via un script PowerShell :

Param(
    [string]$apiKey,
    [string]$rootDirectory,
    [string]$uriroot
)

Set-Location -Path $rootDirectory

# Create directory
foreach($directory in Get-ChildItem ".\" -Recurse -Directory)
{
    write-host $directory.FullName.Replace($rootDirectory,"")
    $body = @{
        path = $directory.FullName.Replace($rootDirectory,"").replace("\","/")
    }
    $bodyjson = (ConvertTo-Json $body)
    $headers = @{
        'Authorization' = "Bearer $apiKey"
    }
    $uri = "$uriroot/2.0/workspace/mkdirs"
    Invoke-RestMethod -Method 'Post' -Uri $uri -Headers $headers -Body $bodyjson
}

# Deploy Notebooks
foreach($file in Get-ChildItem ".\" -Recurse -File -Filter "*.*")
{
     
    write-host $file.FullName.Replace($rootDirectory,"")
    
    $EncodedText = [System.Convert]::ToBase64String([System.IO.File]::ReadAllBytes($file.FullName))
    
    $lng = ""
    if($file.Extension -eq ".py")
    {
        $lng = "PYTHON"
    }
    elseif($file.Extension -eq ".scala")
    {
        $lng = "SCALA"
    }
    elseif($file.Extension -eq ".sql")
    {
        $lng = "SQL"
    }
    elseif($file.Extension -eq ".r")
    {
        $lng = "R"
    }
    else {
        continue
    }
    $body = @{
        content = $EncodedText
        path = $file.FullName.Replace($rootDirectory,"").replace("\","/").replace($file.Extension,"")
        language = $lng
        overwrite = $TRUE
        format = "SOURCE"
    }
    $bodyjson = (ConvertTo-Json $body)
    $headers = @{
        'Authorization' = "Bearer $apiKey"
    }
    $uri = "$uriroot/2.0/workspace/import"
    Invoke-RestMethod -Method 'Post' -Uri $uri -Headers $headers -Body $bodyjson

}

L’intérêt de ce script PowerShell par rapport aux autres solutions est surtout qu’il fonctionnera aussi en dehors de Azure DevOps Smile

Les paramètres en entré sont :

En local il s’appelle de cette façon : .\deploynotebooks.ps1 -apiKey « <accesKey> » -rootDirectory « C:\pathTo\notebooks » -uriroot https://westeurope.azuredatabricks.net/api

Et dans Azure DevOps, mettez le script quelque part dans votre repo et vous pourrez passer par une tache d’exécution de script PowerShell de votre pipeline de release :

9IcLrYTvC6

Databricks : configurer le répertoire par défaut dataware

Par défaut lorsque l’on crée une table sans spécifier le paramètre location dans Databricks en SparkSQL, les fichiers de données sont stockée dans le DBFS interne du workspace Databricks configuré dans le metastore. On peut retrouver les fichiers de données dans le répertoire dbfs:/user/hive/warehouse/.

L’intérêt du répertoire par défaut géré par le metastore, c’est qu’il va automatiquement créer l’arborescence de dossiers/fichiers sur le stockage lorsque l’on crée une table et même gérer les partitions.

  • Table “normale” : dbfs:/user/hive/warehouse/<database_name>.db/<table_name>/
  • Table partitionnée : dbfs:/user/hive/warehouse/<database_name>.db/<table_name>/<partition_name>=<partition_key>/

Ce comportement est bien sûr un héritage du metastore Hive qui fonctionne de la même façon.

Or, ce n’est pas une bonne pratique de stocker de la données dans le DBFS interne car il n’est pas facilement accessible de l’extérieur ni compatible nativement avec d’autres technologie comme Azure Data Factory, Azure Stream Analytics ou Azure Function. Il vaut donc mieux utiliser à la place un autre stockage externe comme Azure Storage Blob ou Azure Data Lake Store. https://docs.azuredatabricks.net/user-guide/databricks-file-system.html#dbfs-root

On peut bien sûr spécifier le chemin de stockage de chaque table grâce à le mot clé LOCATION du CREATE TABLE, mais c’est un peu fastidieux et si l’on veut garder le fonctionnement automatique de création d’arborescence du metastore avec un stockage externe, il va falloir :

  1. Configurer les accès au stockage externe (blob ou adls)
  2. Changer la valeur de la configuration du répertoire par défaut spark.sql.warehouse.dir en mettant le chemin racine du stockage externe

Voici un exemple de config pour le blob storage :

spark.hadoop.fs.azure.account.key.<storageaccountname>.blob.core.windows.net <storagekey>
spark.sql.warehouse.dir wasb://<containeurname>@<storageaccountname>.blob.core.windows.net/<rep>

Cette configuration peut être directement paramétrée dans la config Spark d’un cluster :

ApplicationFrameHost_o4fxFwUZCY

Ainsi la configuration sera valable automatiquement pour tous les notebooks qui s’exécute sur le cluster et toutes les nouvelles tables créées dans une nouvelles database seront correctement stockées dans le bon stockage externe ici : wasb://<containeurname>@<storageaccountname>.blob.core.windows.net/<rep>/<database_name>.db/<table_name>/

Si la database existait déjà avant le changement de configuration vers le stockage externe, alors cela ne fonctionnera pas pour les nouvelles tables car elles continueront à utiliser le répertoire source de la base dbfs:/user/hive/warehouse/<database_name>.db/…

Voici un bout de code scala pour vérifier les locations des bases de données du metastore :

display(spark.catalog.listDatabases)

Et un bout de code SQL pour tester que cela fonctionne correctement :

create database if not exists testwasbdb;
create table if not exists testwasbdb.testtable (col1 int);
describe DETAIL testwasbdb.testtable;

chrome_sPO72pCORT

Databricks (SPARK) : réparer toutes les tables partitionnées d’un seul coup

Si vous avez des tables partitionnées dans votre metastore HIVE-Spark et que vos données sont écrites sur le stockage dans des sous-répertoires partitionnées (/<pathToTable>/<partitionName>=<value>/) par un autre traitement que Spark ou Hive (Azure Stream Analytics, Event Hub, Azure Function ….), par exemple :

  • /logs/day=2019-08-10/
  • /logs/day=2019-08-11/

Les tables partitionnées ne verront pas les données des nouveaux répertoires tant qu’il n’existera pas de partition spécifiquement crée pour ce sous répertoire.

Mais heureusement il est possible de recréer automatiquement toutes les partition d’une table partitionnée avec la commande MSCK REPAIR <TABLENAME>, ce qui est plutôt pratique mais c’est à faire à la main pour chaque table ! https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.0/using-hiveql/content/hive-msck-repair.html

Maintenant, si vous voulez réparer toutes les partitions de toutes les tables partitionnées d’un seul coup, je vous propose le script pyspark suivant qui boucle sur toutes les bases et toutes les tables afin de les réparer (seulement si elles sont partitionnées bien sûr) :

#Repair partition foreach tables

df_databases = spark.sql("show databases")

for database in df_databases.collect():
  dbname = database.databaseName
  print(dbname)
  query = "use " + dbname 
  spark.sql(query)
  df_tables = spark.sql("show tables")
  
  for table in df_tables.collect():
    tablename = table.tableName
    print("_" + tablename)
    query = "DESCRIBE DETAIL " + tablename
    
    try:
      df_desc = spark.sql(query)
      nb_partitionColumn = len(df_desc.select("partitionColumns").take(1)[0][0])
      if nb_partitionColumn > 0:
        print("__" + tablename + " : is a partition table")
        query = "MSCK REPAIR TABLE " + tablename
        spark.sql(query).collect()
      else:
        print("__" + tablename + " : is not a partition table")
    except:
      print("__" + tablename + " : error with the table")

Ce script est compatible avec tout ce qui lance du Spark et du PySpark, donc Azure Databricks ! Il peut aussi être utilisé lors d’une migration de HDInsight Spark vers Databricks ou d’un workspace Databricks vers un autre workspace :

  1. Vous rejouez les create table sur le workspace cible en faisant bien attention à faire pointer les locations des tables vers les stockages sources existants (Azure Blob / Azure Data Lake Store…)
  2. Vous jouez le script dans un notebook python
  3. Magie toutes les partitions sont recrées !

Power BI + Elasticsearch !!

Elasticsearch est un moteur open source NoSQL proposé par l’entreprise Elastic basé sur la technologie Apache Lucene : https://www.elastic.co/fr/products/elasticsearch

Il est très simple à installer et à utiliser. Si vous ne connaissez pas du tout Elastisearch, n’hésitez pas à suivre ce rapide tuto avant de continuer cet article : http://joelabrahamsson.com/elasticsearch-101/

Pour résumer, Elasticsearch permet de stocker des documents Json dans ce qu’on appelle des index, puis de les requêter via web service HTTP REST.

Et le top, c’est que Http REST + Json fait de Power BI un client nativement compatible avec Elasticsearch ! En effet, Power BI peut appeler des web services Http REST grâce à la fonction Web.Contents(), puis récupérer le résultat de la requête en Json et le parser en dataset (lignes-colonnes).

Principes de requêtage avec Elasticsearch

Pour l’article j’ai stocké 20 606 tweets contenant le mot “car” (voiture en anglais) dans un index appelé “test-index” (très mal nommé Smile ) :

Kopf_index

(J’utilise ici le plugin Koft pour visualiser mes index et administrer mon cluster Elasticsearch => https://github.com/lmenezes/elasticsearch-kopf)

Mon cluster Elasticsearch tourne en local et est accessible sur le port 9200 via l’url http://localhost:9200/. Pour récupérer les tweets, je vais ajouter le nom de mon index à l’url et utiliser la méthode “_search”. J’ajoute aussi “?pretty” pour mettre en forme le résultat. En copiant http://localhost:9200/test-index/_search?pretty dans mon navigateur, s’affichent des résultats sous forme de Json. Si l’on regarde bien je n’obtiens que 10 tweets sur les 20 606.

ES_search

Afin de récupérer plus d’éléments, nous allons utiliser les paramètres from (à partir de quel élément) et size (nombre d’éléments à récupérer), et faire plusieurs appels (requêtage par “page”). Par exemple, pour afficher 50 éléments par lot de 10, nous allons faire 5 appels avec les paramètres from et size suivants :

from size url
0 10 http://localhost:9200/test-index/_search?from=0&size=10
10 10 http://localhost:9200/test-index/_search?from=10&size=10
20 10 http://localhost:9200/test-index/_search?from=20&size=10
30 10 http://localhost:9200/test-index/_search?from=30&size=10
40 10 http://localhost:9200/test-index/_search?from=40&size=10

Il aurait été plus simple de faire qu’un seul appel avec size = 20 606, sauf que size est limité à 10 000. Nous allons donc devoir faire au moins 3 appels pour récupérer tous les tweets (10 000 + 10 000 + 606).

Attention ! Par défaut il y a une règle sur l’index qui empêche les paramètres size + from de dépasser 10 000. Pour y remédier, il faut modifier le paramètre “max_result_window” de l’index et lui affecter une plus grande valeur, avec une requête PUT :

PUT /test-index/_settings
{
    "index": {
        "max_result_window" : "1000000000"
        
    }
}

 

ES_max_result_window

Nous allons aussi ajouter dans notre appel REST la possibilité d’envoyer un body qui contiendra la query DSL (l’équivalent du SQL pour Elasticsearch), plus d’infos : https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html

2 petits exemple de query DSL

  • renvoie tout :
POST test-index/_search
{
   "query": {
     "bool": {
       "must": [
         {
           "match_all": { }
         }
       ]
     }
   }
}
  • recherche le mot “mustang” dans le texte des tweet :
POST test-index/_search
 {
  "query": {
     "bool": {
       "must": [
         {
           "match": {
             "text": "mustang"
           }
         }
       ]
     }
   }
 }

Utilisons Power BI

Nous allons créer dans Power BI Desktop une requête vide.

PowerBI_Requete_vide

Copier ensuite la requête M suivante dans l’éditeur avancé :

let
    url = "http://localhost:9200/test-index/_search",
    req = "{""query"": {""bool"": {""must"": [{""match_all"": {}}]}}}",

    totalnb = Json.Document(Web.Contents(url,[Content=Text.ToBinary(req)]))[hits][total],
    max_res = 10000,
    nb_page_temp = Int32.From(totalnb/max_res),
    rest = Number.Mod(totalnb , max_res ),

    nbpages = if Number.Mod(totalnb , max_res ) > 0 then nb_page_temp + 1 else nb_page_temp,

    Source = List.Generate(()=>nbpages, each _ > 0, each _ - 1),
    #"Converti en table" = Table.FromList(Source, Splitter.SplitByNothing(), null, null, ExtraValues.Error),
    #"Colonnes renommées" = Table.RenameColumns(#"Converti en table",{{"Column1", "page"}}),
    #"Personnalisée ajoutée" = Table.AddColumn(#"Colonnes renommées", "req", each Json.Document(Web.Contents(url & "?from=" & Text.From(([page]-1)*max_res) & "&size=" & Text.From(max_res),[Content=Text.ToBinary(req)]))),
    #"req développé" = Table.ExpandRecordColumn(#"Personnalisée ajoutée", "req", {"hits"}, {"req.hits"}),
    #"req.hits développé" = Table.ExpandRecordColumn(#"req développé", "req.hits", {"total", "max_score", "hits"}, {"total", "max_score", "hits"}),
    #"hits développé" = Table.ExpandListColumn(#"req.hits développé", "hits"),
    #"hits développé1" = Table.ExpandRecordColumn(#"hits développé", "hits", {"_index", "_type", "_id", "_score", "_source"}, {"_index", "_type", "_id", "_score", "_source"}),
    #"_source développé1" = Table.ExpandRecordColumn(#"hits développé1", "_source", {"contributors", "truncated", "text", "is_quote_status", "in_reply_to_status_id", "id", "favorite_count", "source", "retweeted", "coordinates", "timestamp_ms", "entities", "in_reply_to_screen_name", "id_str", "retweet_count", "in_reply_to_user_id", "favorited", "user", "geo", "in_reply_to_user_id_str", "possibly_sensitive", "lang", "created_at", "filter_level", "in_reply_to_status_id_str", "place", "retweeted_status", "display_text_range", "extended_entities", "quoted_status_id", "quoted_status", "quoted_status_id_str", "extended_tweet"}, {"_source.contributors", "_source.truncated", "_source.text", "_source.is_quote_status", "_source.in_reply_to_status_id", "_source.id", "_source.favorite_count", "_source.source", "_source.retweeted", "_source.coordinates", "_source.timestamp_ms", "_source.entities", "_source.in_reply_to_screen_name", "_source.id_str", "_source.retweet_count", "_source.in_reply_to_user_id", "_source.favorited", "_source.user", "_source.geo", "_source.in_reply_to_user_id_str", "_source.possibly_sensitive", "_source.lang", "_source.created_at", "_source.filter_level", "_source.in_reply_to_status_id_str", "_source.place", "_source.retweeted_status", "_source.display_text_range", "_source.extended_entities", "_source.quoted_status_id", "_source.quoted_status", "_source.quoted_status_id_str", "_source.extended_tweet"}),
    #"_source.user développé" = Table.ExpandRecordColumn(#"_source développé1", "_source.user", {"name", "profile_image_url"}, {"_source.user.name", "_source.user.profile_image_url"})
in
    #"_source.user développé"

Remplacer les valeurs des variables url et req par votre url de votre cluster ES et votre query DSL (en doublant les « ).

    url = "http://localhost:9200/test-index/_search",
    req = "{""query"": {""bool"": {""must"": [{""match_all"": {}}]}}}",

Le principe de la requête est le suivant :

  1. l’étape totalnb va requêter l’index pour récupérer le nombre total d’éléments retournés par la requête :   ES_search_nbtotal
  2. le nombre d’appels nécessaires (pages) va être calculé dans la variable nbpage (3 dans mon exemple).
  3. une liste est ensuite générée avec le décompte de 1 à nbpage des pages à réaliser : PowerBI_ES_Page
  4. on ajoute une colonne calculée qui va, pour chaque page, exécuter l’appel REST avec les bonnes valeurs des paramètres from et size : PowerBI_ES_Page_requete
  5. Il ne reste plus qu’à naviguer dans la structure du Json en cliquant sur les boutons développer PowerBI_Developper_bouton, aller rechercher les attributs nécessaires et réaliser le rapport : PowerBI_ES_Rapport On peut d’ailleurs vérifier dans le tableau en haut à gauche que les 3 pages ont bien été chargées avec 10 000, 10 000 et 606 éléments.

C’est plutôt simple à réaliser, et le parsing Json de Power BI est l’un des plus intuitifs que je connaisse.

Le gros plus: le rafraichissement de données fonctionne une fois le rapport déployé sur le portail Power BI Services ! Il utilisera la Gateway d’Entreprise si le cluster est on premise, ou alors pourra directement se connecter dessus si le cluster est ouvert sur internet (attention !) ou par exemple hébergé sur le cloud Elastic avec le module de sécurité Shield.

En bonus, la requête M fonctionne aussi dans Excel avec son module de requête (anciennement Power Query), ce qui peut être pratique pour de l’export de données par exemple.Excel_ES_Page_requete

Cette requête permet de s’affranchir de la limitation des 10 000 éléments, mais attention le but n’est pas d’exporter l’ensemble des données d’un index dans un rapport Power BI ! Il est recommandé d’ensuite utiliser les fonctions d’agrégations d’Elasticsearch (https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html) pour ramener moins de données et optimiser les temps de réponses, exactement comme on le ferait avec un SELECT GROUP BY et une base de données SQL classique.

Afficher un message d’erreur personnalisé dans la visualisation R de Power BI

Par défaut dans Power BI, lorsqu’un élément visuel de script R plante, un message d’erreur apparaît dans la zone du graphique avec la possibilité d’afficher les détails techniques de l’erreur :

PowerBI_R_error_thumb4

Cela peut être pratique pour débugger son code R lors du développement du rapport. Mais dans certains cas, les graphiques R peuvent fonctionner sur un ensemble de données, puis ne plus fonctionner dans d’autres cas lorsque l’on filtre dynamiquement les données (via des filtres dans le rapport).

Par exemple, une prédiction réalisée avec la librairie R forecast, prédisant les ventes de produits avec une saisonnalité à la semaine, peut planter s’il y a moins de 2 périodes dans le jeu de données. Le script R fonctionnera avec certains produits, mais il plantera pour ceux qui ne justifient pas de la condition de la fonction de forecast. Dans ce cas la on préférera afficher un message customisé (par exemple “Nous sommes désolé mais le forecast n’est pas disponible pour ce produit”) à l’utilisateur plutôt qu’une erreur technique R.

Nous allons donc utiliser 2 principes de codes R :

  • Un try catch pour catcher l’erreur technique. Vous noterez que contrairement à beaucoup d’autres langages, le try catch de R fonctionne sous forme d’une fonction tryCatch() :

result = tryCatch({

#code à réaliser

}, warning = function(w) {

#ce qui se passe en cas de warning

}, error = function(e) {

#ce qui se passe en cas d’erreur

}, finally = {

#ce qui se passe à la fin dans tous les cas

})

  • Un bout de code affichant un plot avec un message. L’idée est de créer une zone de graphique vide avec par() et plot(), puis d’y afficher un texte avec la fonction text() :

par(mar = c(0,0,0,0))

plot(c(0, 1), c(0, 1), ann = F, bty = 'n', type = 'n', xaxt = 'n', yaxt = 'n')

text(x = 0.5, y = 0.5, paste("Texte du message"), cex = 1.6, col = "black")

Voici le code complet :

result = tryCatch({

  #bout de code R

}, error = function(e) {

  par(mar = c(0,0,0,0))

  plot(c(0, 1), c(0, 1), ann = F, bty = 'n', type = 'n', xaxt = 'n', yaxt = 'n')

  text(x = 0.5, y = 0.5, paste("Nous sommes désolé mais le forecast n’est pas disponible pour ce produit"),

        cex = 1.6, col = "black")

})

Ce qui donne :

PowerBI_R_error_catch_thumb2

Plus d’infos sur le try catch en R : http://mazamascience.com/WorkingWithData/?p=912

Afficher une simple table dans la visualisation R de Power BI

Pour afficher une table avec notre jeu de données, il est possible d’utiliser le package R gridExtra et sa fonction grid.table(). C’est très simple à utiliser, il suffit de créer un élément visuel de script R, faire glisser les champs du modèle dans l’élément, puis de copier les lignes de code suivantes dans la partie code :

library(gridExtra)

grid.table(dataset[1:10,])

PowerBI_Table_Plot_zoom_thumb

PowerBI_Table_Plot_thumb2

Cet affichage ne semble pas adapté aux tables avec plus d’une dizaine de lignes, c’est pourquoi je n’affiche que les 10 premières lignes du dataset (dataset[1:10,]). A vous de trier le jeu de données pour afficher un TOP 10 par exemple.

Plus d’infos ici :

Utiliser un model ML entrainé en R avec PowerBI

Cet article fait suite au précédent article dans lequel j’explique comment envoyer un model entrainé dans Azure ML. Le principe sera plus ou moins le même ici sauf qu’il s’applique cette fois-ci à Power BI Smile .

Nous allons donc tout d’abord créer un modèle entrainé en R et l’enregistrer sur le disque. Reprenons par exemple le modèle de classification d’iris :

library(mda)

data(iris)

fit <- mda(Species~., data=iris)

save(fit, file = "C:/model.rda")

Assurez vous que les librairies R utilisées soient bien installées (la librairie utilisée dans cet exemple est la librairie “mda”).

Une fois le modèle enregistrer sur le disque, nous allons passer à la partie Power BI. Pour rappel, depuis quelques mois Power BI permet d’intégrer du code R à plusieurs endroits :

1 – Dans les requêtes :

  • En source de données

R_source_PowerBI

  • En étape de transformation

R_transform_PowerBI

2 – Dans les visualisations via “l’élément visuel de script R”

R_visual_PowerBI

Dans cet article, nous allons utiliser le mode étape de transformation R, vous vous demandez sans doute pourquoi ? Parce qu’il permet, contrairement aux autres modes source et visualisation, d’appliquer un traitement R sur n’importe quelle source Power BI (base de données, fichiers, R source…), puis d’utiliser tous les éléments visuels Power BI sur le jeu de données (natifs, custom, R visual…). Voici le processus :

    1. Tout d’abord ouvrir Power BI Desktop et charger les données à scorer dans une requête :
      R_PowerBI_Req1
    2. Cliquer ensuite sur “Exécuter un script R” dans l’onglet “Transformer” puis copier le code R qui charge le modèle et score les données :

      library(mda)

      load("C:/model.rda")

      pred <- predict(fit, dataset)

      out <- cbind(dataset,pred)

      out

      R_PowerBI_Req2

    3. Cliquer sur OK puis renseigner le niveau confidentialité (le niveau de confidentialité doit être le même pour les deux sources, “Public” par exemple) : PowerBI_niveau_confidentialite
    4. La requête listera en suite les dataframe présents dans le code R, cliquer sur “Table” en face du dataframe “out” pour naviguer dans le résultat du script : R_PowerBI_Req3
    5. Le jeu de données avec la colonne de prédiction apparaît alors : R_PowerBI_Req4

Vous n’avez plus qu’à enregistrer votre requête, puis réaliser les visuels qui vous intéressent. Par exemple une petite matrice de confusion :

R_PowerBI_Visual

Cette démo prouve que cela fonctionne en mode desktop. Prochaine étape : tester cela en mode Power BI Services (en ligne), et surtout que le rafraichissement des données fonctionne aussi… affaire à suivre !

Lancer un Batch Azure ML en PowerShell

Dans Azure ML, lorsque l’on veut scorer un important jeu de données de façon automatisé, il est recommandé d’utiliser le web service en mode “Batch”, plutôt que le mode “Request” (qui engendre beaucoup trop d’appel). La documentation des web services propose des exemples de codes en C#, R et Python… ce qui est très bien. Cependant, il manque le langage le plus important lorsque l’on cherche à scripter une tâche dans un univers Windows : le PowerShell !!!!! 

Avec le PowerShell, plus besoin d’installer Python/R sur le serveur ou de développer une appli console en C#, il est nativement installé dans les OS Windows. Un petit script “.ps1”, une ligne de commande, et le tour est joué. Pratique si l’on souhaite utiliser l’Agent SQL Server comme ordonnanceur par exemple.

Je me suis donc fortement inspiré du script Python. En voici la version PowerShell :

#copier la clé de l'API
$key='…'

#copier l'uri du mode batch (qui se termine par ‘/jobs’) sans la partie '?api-version=2.0'
$uri = '…'

#Soumet le job
$url = $uri + '?api-version=2.0'

$headers = @{'Authorization'='Bearer ' + $key}

#Renseigner le body si besoin..
$json ='{}'

$out_submit = Invoke-WebRequest -Uri  $url -Headers $headers -Method POST -ContentType 'application/json' -Body $json -TimeoutSec 0


#Demarre le job
$url = $uri + "/" + $out_submit.Content.Replace('"','') + "/start?api-version=2.0" 

$url

$out_start = Invoke-WebRequest -Uri $url -Headers $headers -Method POST -ContentType 'application/json'  -TimeoutSec 0

$out_start

#Boucle qui verifie le statut du job
$url = $uri + "/" + $out_submit.Content.Replace('"','') + "?api-version=2.0" 

$url

$StatutCode = ""
while (1 -eq 1) {
	$out_status = Invoke-WebRequest -Uri $url -Headers $headers  -TimeoutSec 0

	$res = $out_status.Content | ConvertFrom-Json
	$StatutCode = $res.StatusCode

	if  ($res.StatusCode -eq 0 -or $res.StatusCode -eq "NotStarted"){
		write-host "Job " $out_submit.Content.Replace('"','') " not yet started..."
	} 
	if  ($res.StatusCode -eq 1 -or $res.StatusCode -eq "Running"){
		write-host "Job " $out_submit.Content.Replace('"','') " running..."
	} 
	if  ($res.StatusCode -eq 2 -or $res.StatusCode -eq "Failed"){
		write-host "Job " $out_submit.Content.Replace('"','') " failed!"
		write-host "Error details : " $res.Details
	break
	} 
	if  ($res.StatusCode -eq 3 -or $res.StatusCode -eq "Cancelled"){
		write-host "Job " $out_submit.Content.Replace('"','') " cancelled!"
		break
	} 
	if  ($res.StatusCode -eq 4 -or $res.StatusCode -eq "Finished"){
		write-host "Job " $out_submit.Content.Replace('"','') " finished!"
		write-host "Result : " $res
		break
	} 
	Start-Sleep -Seconds 2
}

Il est bien sur possible de créer le même type de script pour le mode “Request”, mais à part pour du debug, j’y vois beaucoup moins d’intérêt.

Older posts