Databricks : Il se connecte aux API en mode Azure AD sans PAT tout le monde hallucine (PRANK)

De base pour se connecter à un workspace Databricks et utiliser les API, il faut utiliser un token appelé aussi PAT que l’on va passer dans le header des appels REST (cf. mon article sur le CI/CD et Databricks dans lequel j’utilise cette méthode : ici )

Voir ici pour s’authentifier avec un PAT de façon “classique” : https://docs.databricks.com/dev-tools/api/latest/authentication.html 

Seulement les problèmes avec ce PAT :

  • c’est d’abord qu’il est attaché à un utilisateur et à un workspace donc pour de l’administration et de la sécurité c’est pas top. Si l’utilisateur quitte l’entreprise par exemple on ne peut pas supprimer ou retirer les droits sur son compte sans tout casser.
  • de plus il n’est possible de le créer qu’à la main à partir de l’UI du workspace Databricks, ou via les API si on a déjà un PAT. Or, il n’y a pas de PAT disponible directement à la suite de la création d’un workspace. DONC c’est un gros problème si l’on cherche à automatiser une chaine complète dans un workflow CI/CD ! Par exemple, lors d’un déploiement d’un nouveau projet en production, nous ne pourrons pas créer le workspace et déployer dessus des notebooks ou configurer des clusters directement après, vu que nous n’avons pas de PAT Sad smile

Heureusement, il existe aujourd’hui un moyen de se connecter autrement qu’avec le PAT : directement avec une authentification Azure AD et un compte de service Azure qu’on appelle un service principal (SP). 

Dans cet article on va réaliser toutes les différentes actions en Powershell afin de montrer que cela est scriptable et intégrable dans Azure DevOps par exemple. Mais vu que nous sommes sur des techno REST, il est bien sûr possible de faire ça avec n’importe quel langage.

J’ajoute que dans cet article je me concentre sur l’authentification via Azure AD aux API, je pars du principe que la ressource Databricks existe déjà. Mais nous verrons dans de prochains articles comment automatiser la création des ressources et d’architectures complètes via des templates ARM ou Terraform.

C’est parti, il va donc falloir :

Créer un SP dans Azure AD et enregistrer son application ID et son secret key :

# Connect to Azure
Connect-AzAccount

# Create SP if not exists
if ($null -eq (Get-AzADServicePrincipal -DisplayName $ServicePrincipleName)) {
    $mySP = New-AzADServicePrincipal -DisplayName $servicePrincipleName -ErrorAction 'Stop'
    $ClientId = $mySP.ApplicationId
    $ClientSecret = [pscredential]::new($servicePrincipleName, $mySP.Secret).GetNetworkCredential().Password
}

Mettre le service principal en owner (propriétaire) du workspace Databricks :

# Set RBAC on Databricks workspace

New-AzRoleAssignment -ApplicationId $ClientId `
    -RoleDefinitionName "Owner" `
    -ResourceGroupName $ResourceGroupName `
    -ResourceName  $WorkspaceName `
    -ResourceType "Microsoft.Databricks/workspaces"

Se connecter au service au workspace Databricks avec le service principal, pour cela il va falloir dans un premier temps faire 2 appels REST au à l’entité d’authentification Azure avec votre SP : https://login.microsoftonline.com/<TenantId>/oauth2/token et récupérer 2 tokens:

– un token pour le service correspondant à l’application AD du service Databricks enregistré nativement dans l’Azure AD de votre tenant. Généralement, cette application a l’id 2ff814a6-3304-4ab8-85cb-cd0e6f879c1d, mais il est aussi possible de retrouver cet id dans Azure AD en recherchant l’application AzureDatabricks :

VUw7BcTyBm

– un token pour le service de gestion Azure https://management.core.windows.net/ (qui sert d’ailleurs pour toutes les connexions classiques aux API Azure)

# Get AzureDatabricks app token
$RequestAccessTokenUri = "https://login.microsoftonline.com/$TenantId/oauth2/token"
$Resource = "https://management.core.windows.net/"
$DBXressource = "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d" # CF. AzureDatabricks AzureAD application 

$body = "grant_type=client_credentials&client_id=$ClientId&client_secret=$ClientSecret&resource=$DBXressource"

$Token = Invoke-RestMethod -Method Post -Uri $RequestAccessTokenUri -Body $body -ContentType 'application/x-www-form-urlencoded'

Write-Host "Print Token" -ForegroundColor Green
Write-Output $Token.access_token
$apiKey = $Token.access_token

# Get Azure Management token

$bodyManagement = "grant_type=client_credentials&client_id=$ClientId&client_secret=$ClientSecret&resource=$Resource"

$Token = Invoke-RestMethod -Method Post -Uri $RequestAccessTokenUri -Body $bodyManagement -ContentType 'application/x-www-form-urlencoded'

Write-Host "Print Token" -ForegroundColor Green
Write-Output $Token.access_token
$apiKeyManagement = $Token.access_token

Enfin avec ces 2 tokens, vous allez pouvoir faire des appels aux API Databricks en passant ces tokens dans le header de vos appels :

# Call Azure Databricks API

$headers = @{
    "Authorization"="Bearer $apiKey";
    "X-Databricks-Azure-SP-Management-Token"=$apiKeyManagement;
    "X-Databricks-Azure-Workspace-Resource-Id"="/subscriptions/$SubscriptionId/resourceGroups/$ResourceGroupName/providers/Microsoft.Databricks/workspaces/$WorkspaceName"
}
$uri = "$uriroot/2.0/dbfs/list?path=/"
Invoke-RestMethod -Method 'Get' -Uri $uri -Headers $headers 

Voici le script Powershell complet :

# Variables
$dbxurl = "https://westeurope.azuredatabricks.net" # Modify the region : westeurope, northeurope...
$uriroot = "$dbxurl/api" 

$ServicePrincipleName = "MySP"
$TenantId = "" # Enter Tenant Id.
$ClientId = "" # Enter Client Id if exists
$ClientSecret = "" # Enter Client Secret if exists
$SubscriptionId = "" # Enter Subscription ID
$DBXressource = "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d" # CF. AzureDatabricks AzureAD application 
$ResourceGroupName = "" # Enter the RG name where the Databricks ressource is
$WorkspaceName = "" # Enter the name of the Databricks ressource
$Resource = "https://management.core.windows.net/"

$RequestAccessTokenUri = "https://login.microsoftonline.com/$TenantId/oauth2/token"

# Connect to Azure
Connect-AzAccount

# Create SP if not exists
if ($null -eq (Get-AzADServicePrincipal -DisplayName $ServicePrincipleName)) {
    $mySP = New-AzADServicePrincipal -DisplayName $servicePrincipleName -ErrorAction 'Stop'
    $ClientId = $mySP.ApplicationId
    $ClientSecret = [pscredential]::new($servicePrincipleName, $mySP.Secret).GetNetworkCredential().Password
}

# Set RBAC on Databricks workspace

New-AzRoleAssignment -ApplicationId $ClientId `
    -RoleDefinitionName "Owner" `
    -ResourceGroupName $ResourceGroupName `
    -ResourceName  $WorkspaceName `
    -ResourceType "Microsoft.Databricks/workspaces"

# Get AzureDatabricks app token

$body = "grant_type=client_credentials&client_id=$ClientId&client_secret=$ClientSecret&resource=$DBXressource"

$Token = Invoke-RestMethod -Method Post -Uri $RequestAccessTokenUri -Body $body -ContentType 'application/x-www-form-urlencoded'

Write-Host "Print Token" -ForegroundColor Green
Write-Output $Token.access_token
$apiKey = $Token.access_token

# Get Azure Management token

$bodyManagement = "grant_type=client_credentials&client_id=$ClientId&client_secret=$ClientSecret&resource=$Resource"

$Token = Invoke-RestMethod -Method Post -Uri $RequestAccessTokenUri -Body $bodyManagement -ContentType 'application/x-www-form-urlencoded'

Write-Host "Print Token" -ForegroundColor Green
Write-Output $Token.access_token
$apiKeyManagement = $Token.access_token

# Call Azure Databricks API

$headers = @{
    "Authorization"="Bearer $apiKey";
    "X-Databricks-Azure-SP-Management-Token"=$apiKeyManagement;
    "X-Databricks-Azure-Workspace-Resource-Id"="/subscriptions/$SubscriptionId/resourceGroups/$ResourceGroupName/providers/Microsoft.Databricks/workspaces/$WorkspaceName"
}
$uri = "$uriroot/2.0/dbfs/list?path=/"
Invoke-RestMethod -Method 'Get' -Uri $uri -Headers $headers 

Et voila !

Une bonne pratique par la suite est d’aller générer un PAT Databricks classique avec cette méthode via l’API “api/2.0/token/create” (la doc ici) et l’enregistrer dans un secret Azure Key Vault pour que d’autre applicatifs l’utilisent comme Azure Data Factory par exemple.

Pour comprendre ce mode de connexion, j’ai du faire du reverse engineering sur le module Powershell azure.databricks.cicd.tools qui propose aussi une authentification Azure AD. Je vous recommande d’ailleurs vraiment ce module qui est très pratique : https://www.powershellgallery.com/packages/azure.databricks.cicd.tools/

Databricks : TOP 3 des façons de créer un environnement de développement ou de test !

Si vous êtes intéressés par Databricks et que vous souhaitez vous y mettre, il existe aujourd’hui plusieurs façons de se créer un environnement de test et de développement :

  1. Utiliser Azure Databricks directement
  2. Utiliser Databricks Community Edition 
  3. Installer un environnement local sur votre ordinateur

Bien sur toutes ces solutions ont chacune leurs avantages et inconvénients, et c’est que nous allons voir dans cet article !

Azure Databricks

Azure Databricks sera la version la plus complète et la plus proche de ce que vous pourrez faire réellement avec Databricks dans Azure.

Pour le créer, il vous faudra d’abord une souscription Azure, soit celle de votre entreprise, soit un version trial (=> https://azure.microsoft.com/fr-fr/offers/ms-azr-0044p/). Vous allez ensuite créer une ressource Azure Databricks dans votre souscription et vous aurez accès à un workspace. Vous pourrez en plus créer tous les autres composants Azure dont vous aurez besoins comme Azure Data Lake Store, Azure Data Factory, Azure Key Vault…

Avantages :

  • Simple à créer : il suffit de créer une ressource Azure Databricks
  • C’est la seule solution qui permet une expérience et une intégration complète avec Azure enrichissant fortement Databricks :
      • Azure Data Factory => ELT
      • Azure AD => Sécurisation des utilisateurs
      • Azure DevOps => CI/CD
      • Azure Key Vault => Sécurisation des connexions
  • Si vous créez votre workspace dans la même région Azure que vos données, vous ne payerez pas de coût de sorti de vos données.
  • Complètement scalable avec toutes les créations de clusters possibles : autant de taille et de noeuds que l’on veut, des clusters dynamiques, interactifs…
  • Utilisation des Token et donc toutes les fonctionnalités d’automatisation via les API ou autre sont possibles.

Inconvénients : 

  • La solution la plus couteuse, car elle est payante dès la création d’un cluster et que l’on veut exécuter un notebook. En même temps, c’est la plus scalable et donc forcément si vous voulez essayer de gros cluster pour traiter de grosses volumétries, cela se paye.

Databricks Community Edition

En dehors de Azure, Databricks propose un environnement très proche de la version Azure que l’on peut utiliser gratuitement : https://databricks.com/product/faq/community-edition

Ces workspaces sont hébergés sur AWS et vous donnent accès à de “petits” cluster de 1 noeud avec quelques Go de RAM (6 Go il me semble).

Il vous suffit de vous inscrire avec un simple mail (ca marche même avec un gmail !) et de vous connecter avec : https://community.cloud.databricks.com/login.html 

Avantages :

  • Simple à créer : juste un mail et quelques clics.
  • La plus part des fonctionnalités Databricks sont disponibles : le runtime Spark Databricks complet, l’expérience de développement du workspace complet (notebooks, metastore, gestion des librairies…)
  • C’est totalement gratuit.

Inconvénients :

  • Databricks Community tourne sur AWS sur de petits clusters, donc attention aux volumétries :
    • les sorties de données d’Azure sont payantes si vous attaquez des données
      Azure en source
    • l’environnement n’est pas scalable donc pas possible d’attaquer de
      réelles volumétries
  • Il n’est pas possible de générer des Token PAT donc :
    • pas de fonctionnalités d’administration
    • pas d’accès aux API
    • pas d’interaction avec Azure Data Factory et autres produits “externes”.

Environnent local

Databricks étant une solution utilisant le moteur Spark, vous pouvez aussi installer sur votre propre ordinateur une version standalone de Spark open-source (communauté) et d’autres composants comme Anaconda et Jupyter vous permettant de simuler des notebooks, comme dans un vrai workspace Databricks.

Spark peut s’installer sur Windows, Linux et Mac OSX. Personnellement je suis sur Windows 10 et je préfère utiliser Windows Bash (WSL 2) et tout installer dessus plutôt que des versions Windows. Car généralement les documentations que l’on va trouver sur le sujet sur internet sont sur Linux, et cela garanti une plus grande compatibilité avec d’autres briques de l’écosystème, comme Hadoop ou Blobfuse pour monter un chemin réseau vers un blob par exemple.

Vous pouvez aussi utiliser des images Docker si vous voulez et si vous maitrisez la création d’image et de conteneur, car les images déjà existantes avec tous les composants déjà installés sont rares.

Avantages :

  • C’est totalement gratuit : juste le prix de votre ordinateur… mais en fait si votre ordi c’est un MacBook ou un serveur à 16 Cores et 128Go de RAM, on va dire que c’est pas si gratuit que ça Open-mouthed smile
  • Vos réalisations restent en local sur votre ordinateur, et vous pouvez les synchroniser avec un repo GIT comme n’importe quel documents et fichiers.

Inconvénients :

  • Assez compliqué à installer et à maintenir, il faut tout faire soit-même, les documentations d’installation des composants ne sont pas toujours simple à prendre en main.
  • Même problématique que pour la version Community de Databricks, donc attention aussi aux volumétries :
      • les sorties de données d’Azure sont payantes si vous attaquez des données Azure en source
      • l’environnement n’est pas scalable donc pas possible d’attaquer de réelles volumétries
  • Pas de partage de notebook possible en mode web.
  • Pas complètement compatible avec la version Spark de Databricks, qui propose des fonctionnalité en avance non présentes dans la version open-source de Spark.

Conclusion

Pour un projet complet avec pour objectif d’aller jusqu’en production et/ou avec de grosses volumétries et/ou une compatibilité avec d’autres outils Azure comme Azure Data Factory, Azure Key Vault.. : Azure Databricks

Pour de l’apprentissage et découvrir Databricks gratuitement : Databricks Community Edition

Pour ceux qui veulent creuser la version Spark open-source, et si vous aimez installer des trucs et vous prendre la tête avec de la configuration système : Environnement Local

Azure Data Factory : Il essaie de récupérer un secret Azure Key Vault dans un Pipeline, ça tourne mal !

Azure Key Vault (AKV) est un produit Azure assez pratique pour stocker de façon sécurisée des secrets (entre autres) et Azure Data Factory (ADF) permet nativement d’y accéder. Ca s’utilise généralement dans la configuration de nos linked services ADF, cf. la doc officielle pour voir comment faire : https://docs.microsoft.com/en-us/azure/data-factory/store-credentials-in-key-vault

On va donc pouvoir utiliser AKV pour gérer toute la configuration de nos ADF : les users, les mots de passe, les chaines de connexions, les chemins, les tokens…

47306e90e47927fd0cd4b65648854841

Bon récupérer des secrets dans des linked services c’est cool…

…Mais maintenant, il y a des cas où ce n’est pas suffisant, et je vais vous montrer comment allez un peu plus loin, et récupérer des secrets directement dans un pipeline pour ensuite utiliser la valeur du secret dans n’importe quelle activité !

Il n’existe malheureusement pas, au moment où j’écris cet article, de moyen natif pour faire cela, par contre c’est possible via une simple activité web. Pour ce faire, on va falloir :

  1. S’assurer que le MSI de notre ADF a le bien droit de récupérer les secrets (GET) de votre AKV, voir le l’étape 2 de la doc ‘”In your key vault -> Access policies -> Add new -> search this managed identity application ID to grant Get permission in Secret permissions dropdown” et surtout oubliez pas de cliquer sur “save” pour enregistrer votre affectation :
  2. https://docs.microsoft.com/en-us/azure/data-factory/store-credentials-in-key-vault#steps

  3. Ajouter une activité web dans votre pipeline ADF avec la conf suivante :
    1. URL : l’url de votre secret, https://<AKVName>.vault.azure.net/secrets/<SecretName>?api-version=2016-10-01 en remplaçant <AKVName> par le nom de votre AKV et <SecretName> par le nom de votre secret
    2. Method : GET
    3. Authentication : MSI
    4. Resource : https://vault.azure.net

chrome_tINXRL6kx2

Vous pourrez ensuite récupérer la valeur du secret dans les activités suivantes et dans n’importe quelle expression avec la formule @activity(‘<NomDeVotreActiviteWeb>’).output.value, en remplaçant <NomDeVotreActiviteWeb> par le nom de votre activité web.

Si vous voulez sécuriser encore plus la valeur de votre secret je vous conseille d’aller dans l’onglet General de votre activité web et de cocher les paramètres “Secure output” et “Secure input”, sinon la valeur de votre secret sera affichée en clair dans le monitoring de votre pipeline. Ce qui n’est pas top si on va chercher un mot de passe par exemple Smile 

chrome_Y5dqX2NMIq

Voici au final, le code Json de votre activité web :

{
    "name": "GetSecret",
    "type": "WebActivity",
    "dependsOn": [],
    "policy": {
        "timeout": "7.00:00:00",
        "retry": 0,
        "retryIntervalInSeconds": 30,
        "secureOutput": true,
        "secureInput": true
    },
    "userProperties": [],
    "typeProperties": {
        "url": "https://<AKVName>.vault.azure.net/secrets/<SecretName>?api-version=2016-10-01",
        "method": "GET",
        "authentication": {
            "type": "MSI",
            "resource": "https://vault.azure.net"
        }
    }
}

Attention aussi avec l’activité “set value” de ADF, qui elle aussi affiche la valeur qu’on affecte dans une variable en clair dans la log. La sécurité s’est important !

Jupyter Notebooks : #LIFEHACK exécuter du SQL sur des Dataframes Pandas directement dans une cellule magic

Aujourd’hui petit article un peu spécial, dans lequel je vous propose de vous montrer comment requêter des Dataframes Pandas en SQL !

Pour ce faire, il va falloir utiliser en plus de Pandas la librairie pandasql (https://pypi.org/project/pandasql/). Elle est plutôt simple à utiliser, par exemple :

import pandas as pd
import numpy as np
from pandasql import sqldf

df_customer = pd.DataFrame([
                        [1, 'Customer A', '123 Street', np.nan],
                        [2, 'Customer B', '444 Street', '333 Street'],
                        [3, 'Customer C', '444 Street', '666 Street']
                        ], 
                columns=['ID', 'Customer', 'Billing Address', 'Shipping Address'])

query = "select * from df_customer"				
sqldf(query, globals())

Ainsi vous aller créer d’abord vos Dataframes (“df_customer” dans l’exemple), puis exécuter une requête SQL dans un string via la commande “sqldf()” qui vous retournera le résultat sous forme d’un nouveau Dataframe. Donc c’est top si vous n’avez pas le courage d’apprendre comment transformer vos données avec le langage Pandas, et que vous maitriser déjà le SQL Open-mouthed smile. Attention, pandasql utilise la syntaxe de sqlite (https://www.sqlite.org/lang.html), donc pour les adeptes de SQL Server, c’est proche mais ce n’est pas exactement du TSQL.

Maintenant pour rendre l’usage de cette fonctionnalité encore plus interactive, on va utiliser et développer une classe custom magic afin de pouvoir lancer du code SQL dans une cellule de nos notebooks grâce au mot clé %%sql.

Pour cela, on va donc d’abord créer un script de classe python à côté appelé “sqlpandas.py”, qui contiendra le bout de code suivant :

from IPython.core.magic import line_magic, line_cell_magic, Magics, magics_class
import pandasql as ps

@magics_class
class sqlpandas(Magics):

   @line_cell_magic  
   def sql(self, line, cell=None):
        return ps.sqldf(cell or line, get_ipython().user_ns)

ip = get_ipython()
ip.register_magics(sqlpandas)

Voila ensuite comment utiliser tout cela dans un notebook Jupyter :

Vous voyez que c’est plutôt simple à utiliser et que cela fonctionne, il est même possible de récupérer les résultats d’une cellule dans un autre Dataframe. Retrouvez tous les bouts de code dans mon github ici : https://github.com/fabienadato/magicSQLJupyter 

Plus d’info sur les classes custom magic : https://ipython.readthedocs.io/en/stable/config/custommagics.html

Databricks : CI/CD avec Azure DevOps et 3 méthodes de déploiement de notebooks en masse, la 3eme va vous étonner !

Avant de parler de méthode de déploiement, on va parler un peu de Databricks et de CI/CD.

Il n’y a pas toujours besoin de mettre en place du CI/CD, surtout lorsqu’on a des usages “self-service” et que l’on travaille directement production. Mais par exemple lorsqu’on a besoin d’automatiser des traitements (ça doit tourner tous les jours en automatique ou en continu via du streaming) et que l’on veut pas tout casser en production en cas d’erreur suite à une modification de code. Dans ces cas là, il vaut mieux travailler sur différents environnements identiques et indépendants. Et notamment au moins avec un environnement de dev pour pouvoir coder et tester son code tranquillement avant de modifier la prod. Beaucoup de personnes écrivent des livres entiers sur le sujet, mais nous on va simplifier et résumer le CI/CD à 2 choses :

  1. bien gérer la vie de son code grâce à un contrôleur de sources (un repo) => je ne perds plus de code, je trace les modifications, je travaille à plusieurs…
  2. déployer facilement et de façon automatisée le code dans les différents environnements => j’appuie sur un bouton et ça part en prod !

Databricks est compatible avec ces usages de développement et de déploiement en continue pour tous vos projets de data science et de data engineer.

Tout d’abord, il est possible de se connecter nativement à Github, Bitbucket Cloud ou dans Azure DevOps et de synchroniser ses notebooks automatiquement dans un repository GIT. Personnellement, j’utilise surtout Azure DevOps qui est simple à utiliser et sécurisé donc adapté à un usage en entreprise. Pour savoir comment mettre ca en place, la documentation Databricks est plutôt complète: https://docs.azuredatabricks.net/user-guide/notebooks/azure-devops-services-version-control.html#notebook-integration

Attention, il va  falloir mettre en place la synchro pour chaque notebook et après modification du notebook dans Databricks, surtout ne pas oublier de cliquer sur “Save now”  et mettre un commentaire (un vrai commentaire avec la description de vos modifications, pas de “zezg” ou “ghherhe” !) pour lancer un commit et synchroniser avec le script du repo, sinon bah ça fait rien Smile !

chrome_TdhduWhbxW

Une fois synchronisés, les notebooks se retrouvent alors dans le repo sous forme de fichiers textes Python, Scala, R ou SQL en fonction du type de notebook (même si ce notebook contient des blocks d’un autre langage).

rdjj1g4dYe

Tout ces principes de synchro seront à mettre en place dans le workspace Databricks de développement seulement, pas besoin pour les autres environnements.

Maintenant lorsque vos développements sont terminés, vous allez vouloir déployer vos notebooks sur d’autre workspaces Databricks pour l’UAT, la Préprod, la Prod … il est possible d’utiliser aussi Azure DevOps et ses pipelines de Build et de Release :

  • La partie Build va aller chercher les scripts dans le repo et générer un objet qu’on appelle un Artifact (l’équivalent d’un package de déploiement)
  • La partie Release elle va récupérer l’Artifact et le déployer dans le workspace Databricks de l’environnement souhaité.

POWERPNT_HuxGoRoZ8D

Sans rentrer trop dans les détails, le Build est simple, il suffit d’aller chercher les scripts dans le repo, la Release est un peu plus compliquée car il faut une méthode pour déployer les composants (code, notebooks…) et avec Azure DevOps il existe déjà des composants proposés par la communauté dans le marketplace :

Moi je vous propose une troisième solution de déploiement en masse de notebooks un peu plus “roots” via un script PowerShell :

Param(
    [string]$apiKey,
    [string]$rootDirectory,
    [string]$uriroot
)

Set-Location -Path $rootDirectory

# Create directory
foreach($directory in Get-ChildItem ".\" -Recurse -Directory)
{
    write-host $directory.FullName.Replace($rootDirectory,"")
    $body = @{
        path = $directory.FullName.Replace($rootDirectory,"").replace("\","/")
    }
    $bodyjson = (ConvertTo-Json $body)
    $headers = @{
        'Authorization' = "Bearer $apiKey"
    }
    $uri = "$uriroot/2.0/workspace/mkdirs"
    Invoke-RestMethod -Method 'Post' -Uri $uri -Headers $headers -Body $bodyjson
}

# Deploy Notebooks
foreach($file in Get-ChildItem ".\" -Recurse -File -Filter "*.*")
{
     
    write-host $file.FullName.Replace($rootDirectory,"")
    
    $EncodedText = [System.Convert]::ToBase64String([System.IO.File]::ReadAllBytes($file.FullName))
    
    $lng = ""
    if($file.Extension -eq ".py")
    {
        $lng = "PYTHON"
    }
    elseif($file.Extension -eq ".scala")
    {
        $lng = "SCALA"
    }
    elseif($file.Extension -eq ".sql")
    {
        $lng = "SQL"
    }
    elseif($file.Extension -eq ".r")
    {
        $lng = "R"
    }
    else {
        continue
    }
    $body = @{
        content = $EncodedText
        path = $file.FullName.Replace($rootDirectory,"").replace("\","/").replace($file.Extension,"")
        language = $lng
        overwrite = $TRUE
        format = "SOURCE"
    }
    $bodyjson = (ConvertTo-Json $body)
    $headers = @{
        'Authorization' = "Bearer $apiKey"
    }
    $uri = "$uriroot/2.0/workspace/import"
    Invoke-RestMethod -Method 'Post' -Uri $uri -Headers $headers -Body $bodyjson

}

L’intérêt de ce script PowerShell par rapport aux autres solutions est surtout qu’il fonctionnera aussi en dehors de Azure DevOps Smile

Les paramètres en entré sont :

En local il s’appelle de cette façon : .\deploynotebooks.ps1 -apiKey « <accesKey> » -rootDirectory « C:\pathTo\notebooks » -uriroot https://westeurope.azuredatabricks.net/api

Et dans Azure DevOps, mettez le script quelque part dans votre repo et vous pourrez passer par une tache d’exécution de script PowerShell de votre pipeline de release :

9IcLrYTvC6

Databricks : configurer le répertoire par défaut dataware

Par défaut lorsque l’on crée une table sans spécifier le paramètre location dans Databricks en SparkSQL, les fichiers de données sont stockée dans le DBFS interne du workspace Databricks configuré dans le metastore. On peut retrouver les fichiers de données dans le répertoire dbfs:/user/hive/warehouse/.

L’intérêt du répertoire par défaut géré par le metastore, c’est qu’il va automatiquement créer l’arborescence de dossiers/fichiers sur le stockage lorsque l’on crée une table et même gérer les partitions.

  • Table “normale” : dbfs:/user/hive/warehouse/<database_name>.db/<table_name>/
  • Table partitionnée : dbfs:/user/hive/warehouse/<database_name>.db/<table_name>/<partition_name>=<partition_key>/

Ce comportement est bien sûr un héritage du metastore Hive qui fonctionne de la même façon.

Or, ce n’est pas une bonne pratique de stocker de la données dans le DBFS interne car il n’est pas facilement accessible de l’extérieur ni compatible nativement avec d’autres technologie comme Azure Data Factory, Azure Stream Analytics ou Azure Function. Il vaut donc mieux utiliser à la place un autre stockage externe comme Azure Storage Blob ou Azure Data Lake Store. https://docs.azuredatabricks.net/user-guide/databricks-file-system.html#dbfs-root

On peut bien sûr spécifier le chemin de stockage de chaque table grâce à le mot clé LOCATION du CREATE TABLE, mais c’est un peu fastidieux et si l’on veut garder le fonctionnement automatique de création d’arborescence du metastore avec un stockage externe, il va falloir :

  1. Configurer les accès au stockage externe (blob ou adls)
  2. Changer la valeur de la configuration du répertoire par défaut spark.sql.warehouse.dir en mettant le chemin racine du stockage externe

Voici un exemple de config pour le blob storage :

spark.hadoop.fs.azure.account.key.<storageaccountname>.blob.core.windows.net <storagekey>
spark.sql.warehouse.dir wasb://<containeurname>@<storageaccountname>.blob.core.windows.net/<rep>

Cette configuration peut être directement paramétrée dans la config Spark d’un cluster :

ApplicationFrameHost_o4fxFwUZCY

Ainsi la configuration sera valable automatiquement pour tous les notebooks qui s’exécute sur le cluster et toutes les nouvelles tables créées dans une nouvelles database seront correctement stockées dans le bon stockage externe ici : wasb://<containeurname>@<storageaccountname>.blob.core.windows.net/<rep>/<database_name>.db/<table_name>/

Si la database existait déjà avant le changement de configuration vers le stockage externe, alors cela ne fonctionnera pas pour les nouvelles tables car elles continueront à utiliser le répertoire source de la base dbfs:/user/hive/warehouse/<database_name>.db/…

Voici un bout de code scala pour vérifier les locations des bases de données du metastore :

display(spark.catalog.listDatabases)

Et un bout de code SQL pour tester que cela fonctionne correctement :

create database if not exists testwasbdb;
create table if not exists testwasbdb.testtable (col1 int);
describe DETAIL testwasbdb.testtable;

chrome_sPO72pCORT

Databricks (SPARK) : réparer toutes les tables partitionnées d’un seul coup

Si vous avez des tables partitionnées dans votre metastore HIVE-Spark et que vos données sont écrites sur le stockage dans des sous-répertoires partitionnées (/<pathToTable>/<partitionName>=<value>/) par un autre traitement que Spark ou Hive (Azure Stream Analytics, Event Hub, Azure Function ….), par exemple :

  • /logs/day=2019-08-10/
  • /logs/day=2019-08-11/

Les tables partitionnées ne verront pas les données des nouveaux répertoires tant qu’il n’existera pas de partition spécifiquement crée pour ce sous répertoire.

Mais heureusement il est possible de recréer automatiquement toutes les partition d’une table partitionnée avec la commande MSCK REPAIR <TABLENAME>, ce qui est plutôt pratique mais c’est à faire à la main pour chaque table ! https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.0/using-hiveql/content/hive-msck-repair.html

Maintenant, si vous voulez réparer toutes les partitions de toutes les tables partitionnées d’un seul coup, je vous propose le script pyspark suivant qui boucle sur toutes les bases et toutes les tables afin de les réparer (seulement si elles sont partitionnées bien sûr) :

#Repair partition foreach tables

df_databases = spark.sql("show databases")

for database in df_databases.collect():
  dbname = database.databaseName
  print(dbname)
  query = "use " + dbname 
  spark.sql(query)
  df_tables = spark.sql("show tables")
  
  for table in df_tables.collect():
    tablename = table.tableName
    print("_" + tablename)
    query = "DESCRIBE DETAIL " + tablename
    
    try:
      df_desc = spark.sql(query)
      nb_partitionColumn = len(df_desc.select("partitionColumns").take(1)[0][0])
      if nb_partitionColumn > 0:
        print("__" + tablename + " : is a partition table")
        query = "MSCK REPAIR TABLE " + tablename
        spark.sql(query).collect()
      else:
        print("__" + tablename + " : is not a partition table")
    except:
      print("__" + tablename + " : error with the table")

Ce script est compatible avec tout ce qui lance du Spark et du PySpark, donc Azure Databricks ! Il peut aussi être utilisé lors d’une migration de HDInsight Spark vers Databricks ou d’un workspace Databricks vers un autre workspace :

  1. Vous rejouez les create table sur le workspace cible en faisant bien attention à faire pointer les locations des tables vers les stockages sources existants (Azure Blob / Azure Data Lake Store…)
  2. Vous jouez le script dans un notebook python
  3. Magie toutes les partitions sont recrées !

Power BI + Elasticsearch !!

Elasticsearch est un moteur open source NoSQL proposé par l’entreprise Elastic basé sur la technologie Apache Lucene : https://www.elastic.co/fr/products/elasticsearch

Il est très simple à installer et à utiliser. Si vous ne connaissez pas du tout Elastisearch, n’hésitez pas à suivre ce rapide tuto avant de continuer cet article : http://joelabrahamsson.com/elasticsearch-101/

Pour résumer, Elasticsearch permet de stocker des documents Json dans ce qu’on appelle des index, puis de les requêter via web service HTTP REST.

Et le top, c’est que Http REST + Json fait de Power BI un client nativement compatible avec Elasticsearch ! En effet, Power BI peut appeler des web services Http REST grâce à la fonction Web.Contents(), puis récupérer le résultat de la requête en Json et le parser en dataset (lignes-colonnes).

Principes de requêtage avec Elasticsearch

Pour l’article j’ai stocké 20 606 tweets contenant le mot “car” (voiture en anglais) dans un index appelé “test-index” (très mal nommé Smile ) :

Kopf_index

(J’utilise ici le plugin Koft pour visualiser mes index et administrer mon cluster Elasticsearch => https://github.com/lmenezes/elasticsearch-kopf)

Mon cluster Elasticsearch tourne en local et est accessible sur le port 9200 via l’url http://localhost:9200/. Pour récupérer les tweets, je vais ajouter le nom de mon index à l’url et utiliser la méthode “_search”. J’ajoute aussi “?pretty” pour mettre en forme le résultat. En copiant http://localhost:9200/test-index/_search?pretty dans mon navigateur, s’affichent des résultats sous forme de Json. Si l’on regarde bien je n’obtiens que 10 tweets sur les 20 606.

ES_search

Afin de récupérer plus d’éléments, nous allons utiliser les paramètres from (à partir de quel élément) et size (nombre d’éléments à récupérer), et faire plusieurs appels (requêtage par “page”). Par exemple, pour afficher 50 éléments par lot de 10, nous allons faire 5 appels avec les paramètres from et size suivants :

from size url
0 10 http://localhost:9200/test-index/_search?from=0&size=10
10 10 http://localhost:9200/test-index/_search?from=10&size=10
20 10 http://localhost:9200/test-index/_search?from=20&size=10
30 10 http://localhost:9200/test-index/_search?from=30&size=10
40 10 http://localhost:9200/test-index/_search?from=40&size=10

Il aurait été plus simple de faire qu’un seul appel avec size = 20 606, sauf que size est limité à 10 000. Nous allons donc devoir faire au moins 3 appels pour récupérer tous les tweets (10 000 + 10 000 + 606).

Attention ! Par défaut il y a une règle sur l’index qui empêche les paramètres size + from de dépasser 10 000. Pour y remédier, il faut modifier le paramètre “max_result_window” de l’index et lui affecter une plus grande valeur, avec une requête PUT :

PUT /test-index/_settings
{
    "index": {
        "max_result_window" : "1000000000"
        
    }
}

 

ES_max_result_window

Nous allons aussi ajouter dans notre appel REST la possibilité d’envoyer un body qui contiendra la query DSL (l’équivalent du SQL pour Elasticsearch), plus d’infos : https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html

2 petits exemple de query DSL

  • renvoie tout :
POST test-index/_search
{
   "query": {
     "bool": {
       "must": [
         {
           "match_all": { }
         }
       ]
     }
   }
}
  • recherche le mot “mustang” dans le texte des tweet :
POST test-index/_search
 {
  "query": {
     "bool": {
       "must": [
         {
           "match": {
             "text": "mustang"
           }
         }
       ]
     }
   }
 }

Utilisons Power BI

Nous allons créer dans Power BI Desktop une requête vide.

PowerBI_Requete_vide

Copier ensuite la requête M suivante dans l’éditeur avancé :

let
    url = "http://localhost:9200/test-index/_search",
    req = "{""query"": {""bool"": {""must"": [{""match_all"": {}}]}}}",

    totalnb = Json.Document(Web.Contents(url,[Content=Text.ToBinary(req)]))[hits][total],
    max_res = 10000,
    nb_page_temp = Int32.From(totalnb/max_res),
    rest = Number.Mod(totalnb , max_res ),

    nbpages = if Number.Mod(totalnb , max_res ) > 0 then nb_page_temp + 1 else nb_page_temp,

    Source = List.Generate(()=>nbpages, each _ > 0, each _ - 1),
    #"Converti en table" = Table.FromList(Source, Splitter.SplitByNothing(), null, null, ExtraValues.Error),
    #"Colonnes renommées" = Table.RenameColumns(#"Converti en table",{{"Column1", "page"}}),
    #"Personnalisée ajoutée" = Table.AddColumn(#"Colonnes renommées", "req", each Json.Document(Web.Contents(url & "?from=" & Text.From(([page]-1)*max_res) & "&size=" & Text.From(max_res),[Content=Text.ToBinary(req)]))),
    #"req développé" = Table.ExpandRecordColumn(#"Personnalisée ajoutée", "req", {"hits"}, {"req.hits"}),
    #"req.hits développé" = Table.ExpandRecordColumn(#"req développé", "req.hits", {"total", "max_score", "hits"}, {"total", "max_score", "hits"}),
    #"hits développé" = Table.ExpandListColumn(#"req.hits développé", "hits"),
    #"hits développé1" = Table.ExpandRecordColumn(#"hits développé", "hits", {"_index", "_type", "_id", "_score", "_source"}, {"_index", "_type", "_id", "_score", "_source"}),
    #"_source développé1" = Table.ExpandRecordColumn(#"hits développé1", "_source", {"contributors", "truncated", "text", "is_quote_status", "in_reply_to_status_id", "id", "favorite_count", "source", "retweeted", "coordinates", "timestamp_ms", "entities", "in_reply_to_screen_name", "id_str", "retweet_count", "in_reply_to_user_id", "favorited", "user", "geo", "in_reply_to_user_id_str", "possibly_sensitive", "lang", "created_at", "filter_level", "in_reply_to_status_id_str", "place", "retweeted_status", "display_text_range", "extended_entities", "quoted_status_id", "quoted_status", "quoted_status_id_str", "extended_tweet"}, {"_source.contributors", "_source.truncated", "_source.text", "_source.is_quote_status", "_source.in_reply_to_status_id", "_source.id", "_source.favorite_count", "_source.source", "_source.retweeted", "_source.coordinates", "_source.timestamp_ms", "_source.entities", "_source.in_reply_to_screen_name", "_source.id_str", "_source.retweet_count", "_source.in_reply_to_user_id", "_source.favorited", "_source.user", "_source.geo", "_source.in_reply_to_user_id_str", "_source.possibly_sensitive", "_source.lang", "_source.created_at", "_source.filter_level", "_source.in_reply_to_status_id_str", "_source.place", "_source.retweeted_status", "_source.display_text_range", "_source.extended_entities", "_source.quoted_status_id", "_source.quoted_status", "_source.quoted_status_id_str", "_source.extended_tweet"}),
    #"_source.user développé" = Table.ExpandRecordColumn(#"_source développé1", "_source.user", {"name", "profile_image_url"}, {"_source.user.name", "_source.user.profile_image_url"})
in
    #"_source.user développé"

Remplacer les valeurs des variables url et req par votre url de votre cluster ES et votre query DSL (en doublant les « ).

    url = "http://localhost:9200/test-index/_search",
    req = "{""query"": {""bool"": {""must"": [{""match_all"": {}}]}}}",

Le principe de la requête est le suivant :

  1. l’étape totalnb va requêter l’index pour récupérer le nombre total d’éléments retournés par la requête :   ES_search_nbtotal
  2. le nombre d’appels nécessaires (pages) va être calculé dans la variable nbpage (3 dans mon exemple).
  3. une liste est ensuite générée avec le décompte de 1 à nbpage des pages à réaliser : PowerBI_ES_Page
  4. on ajoute une colonne calculée qui va, pour chaque page, exécuter l’appel REST avec les bonnes valeurs des paramètres from et size : PowerBI_ES_Page_requete
  5. Il ne reste plus qu’à naviguer dans la structure du Json en cliquant sur les boutons développer PowerBI_Developper_bouton, aller rechercher les attributs nécessaires et réaliser le rapport : PowerBI_ES_Rapport On peut d’ailleurs vérifier dans le tableau en haut à gauche que les 3 pages ont bien été chargées avec 10 000, 10 000 et 606 éléments.

C’est plutôt simple à réaliser, et le parsing Json de Power BI est l’un des plus intuitifs que je connaisse.

Le gros plus: le rafraichissement de données fonctionne une fois le rapport déployé sur le portail Power BI Services ! Il utilisera la Gateway d’Entreprise si le cluster est on premise, ou alors pourra directement se connecter dessus si le cluster est ouvert sur internet (attention !) ou par exemple hébergé sur le cloud Elastic avec le module de sécurité Shield.

En bonus, la requête M fonctionne aussi dans Excel avec son module de requête (anciennement Power Query), ce qui peut être pratique pour de l’export de données par exemple.Excel_ES_Page_requete

Cette requête permet de s’affranchir de la limitation des 10 000 éléments, mais attention le but n’est pas d’exporter l’ensemble des données d’un index dans un rapport Power BI ! Il est recommandé d’ensuite utiliser les fonctions d’agrégations d’Elasticsearch (https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html) pour ramener moins de données et optimiser les temps de réponses, exactement comme on le ferait avec un SELECT GROUP BY et une base de données SQL classique.

Afficher un message d’erreur personnalisé dans la visualisation R de Power BI

Par défaut dans Power BI, lorsqu’un élément visuel de script R plante, un message d’erreur apparaît dans la zone du graphique avec la possibilité d’afficher les détails techniques de l’erreur :

PowerBI_R_error_thumb4

Cela peut être pratique pour débugger son code R lors du développement du rapport. Mais dans certains cas, les graphiques R peuvent fonctionner sur un ensemble de données, puis ne plus fonctionner dans d’autres cas lorsque l’on filtre dynamiquement les données (via des filtres dans le rapport).

Par exemple, une prédiction réalisée avec la librairie R forecast, prédisant les ventes de produits avec une saisonnalité à la semaine, peut planter s’il y a moins de 2 périodes dans le jeu de données. Le script R fonctionnera avec certains produits, mais il plantera pour ceux qui ne justifient pas de la condition de la fonction de forecast. Dans ce cas la on préférera afficher un message customisé (par exemple “Nous sommes désolé mais le forecast n’est pas disponible pour ce produit”) à l’utilisateur plutôt qu’une erreur technique R.

Nous allons donc utiliser 2 principes de codes R :

  • Un try catch pour catcher l’erreur technique. Vous noterez que contrairement à beaucoup d’autres langages, le try catch de R fonctionne sous forme d’une fonction tryCatch() :

result = tryCatch({

#code à réaliser

}, warning = function(w) {

#ce qui se passe en cas de warning

}, error = function(e) {

#ce qui se passe en cas d’erreur

}, finally = {

#ce qui se passe à la fin dans tous les cas

})

  • Un bout de code affichant un plot avec un message. L’idée est de créer une zone de graphique vide avec par() et plot(), puis d’y afficher un texte avec la fonction text() :

par(mar = c(0,0,0,0))

plot(c(0, 1), c(0, 1), ann = F, bty = 'n', type = 'n', xaxt = 'n', yaxt = 'n')

text(x = 0.5, y = 0.5, paste("Texte du message"), cex = 1.6, col = "black")

Voici le code complet :

result = tryCatch({

  #bout de code R

}, error = function(e) {

  par(mar = c(0,0,0,0))

  plot(c(0, 1), c(0, 1), ann = F, bty = 'n', type = 'n', xaxt = 'n', yaxt = 'n')

  text(x = 0.5, y = 0.5, paste("Nous sommes désolé mais le forecast n’est pas disponible pour ce produit"),

        cex = 1.6, col = "black")

})

Ce qui donne :

PowerBI_R_error_catch_thumb2

Plus d’infos sur le try catch en R : http://mazamascience.com/WorkingWithData/?p=912

Afficher une simple table dans la visualisation R de Power BI

Pour afficher une table avec notre jeu de données, il est possible d’utiliser le package R gridExtra et sa fonction grid.table(). C’est très simple à utiliser, il suffit de créer un élément visuel de script R, faire glisser les champs du modèle dans l’élément, puis de copier les lignes de code suivantes dans la partie code :

library(gridExtra)

grid.table(dataset[1:10,])

PowerBI_Table_Plot_zoom_thumb

PowerBI_Table_Plot_thumb2

Cet affichage ne semble pas adapté aux tables avec plus d’une dizaine de lignes, c’est pourquoi je n’affiche que les 10 premières lignes du dataset (dataset[1:10,]). A vous de trier le jeu de données pour afficher un TOP 10 par exemple.

Plus d’infos ici :

Older posts