Considérations relatives à la production pour Structured Streaming

Cet article contient des recommandations pour la planification des charges de travail Structured Streaming avec des travaux sur Azure Databricks.

Databricks recommande de toujours effectuer ce qui suit :

  • Supprimez le code inutile des notebooks qui retourne des résultats, tels que display et count.
  • N’exécutez pas de charges de travail Structured Streaming avec un calcul à usage général. Planifiez toujours les flux en tant que travaux avec le calcul des travaux.
  • Planifiez les travaux à l’aide du mode Continuous.
  • N’activez pas la mise à l’échelle automatique pour le calcul des travaux Structured Streaming.

Certaines charges de travail bénéficient de ce qui suit :

Azure Databricks a introduit Delta Live Tables afin de réduire la complexité de la gestion de l’infrastructure de production pour les charges de travail de Structured Streaming. Databricks recommande d’utiliser Delta Live Tables pour les nouveaux pipelines de Structured Streaming. Consultez l’article Qu’est-ce que Delta Live Tables ?.

Remarque

La mise à l’échelle automatique du calcul présente des limitations pour la réduction de la taille du cluster pour les charges de travail Structured Streaming. Databricks recommande d’utiliser Delta Live Tables avec mise à l’échelle automatique améliorée pour les charges de travail de diffusion en continu. Consultez Optimiser l’utilisation du cluster des pipelines Delta Live Tables avec mise à l’échelle automatique améliorée.

Conception de charges de travail de diffusion en continu pour anticiper les échecs

Databricks recommande de toujours configurer les travaux de diffusion en continu pour qu’ils redémarrent automatiquement en cas d’échec. Certaines fonctionnalités, notamment l’évolution du schéma, supposent que les charges de travail Structured Streaming sont configurées pour effectuer des nouvelles tentatives automatiquement. Consultez Configurer des travaux de Structured Streaming pour redémarrer des requêtes de diffusion en continu en cas d’échec.

Certaines opérations, telles que foreachBatch, fournissent « au moins une » garantie plutôt qu’exactement une. Pour ces opérations, vous devez vous assurer que votre pipeline de traitement est idempotent. Consultez Utiliser foreachBatch pour écrire dans des récepteurs de données arbitraires.

Remarque

Lorsqu’une requête redémarre, le micro-lot planifié pendant l’exécution précédente est traité. Si votre travail a échoué en raison d’une erreur de mémoire insuffisante, ou si vous avez annulé manuellement un travail en raison d’un micro-lot surdimensionné, vous devrez peut-être effectuer un scale-up du calcul pour traiter correctement le micro-lot.

Si vous modifiez les configurations entre les exécutions, celles-ci s’appliquent au premier nouveau lot planifié. Consultez Récupérer après des modifications dans une requête de Structured Streaming.

Quand une tâche effectue-t-elle une nouvelle tentative ?

Vous pouvez planifier plusieurs tâches dans le cadre d’un travail Azure Databricks. Lorsque vous configurez un travail à l’aide du déclencheur continu, vous ne pouvez pas définir de dépendances entre tâches.

Vous pouvez choisir de planifier plusieurs flux dans un même travail à l’aide de l’une des approches suivantes :

  • Plusieurs tâches : définissez un travail avec plusieurs tâches qui exécutent des charges de travail de diffusion en continu à l’aide du déclencheur continu.
  • Plusieurs requêtes : définissez plusieurs requêtes de diffusion en continu dans le code source pour une seule tâche.

Vous pouvez également combiner ces stratégies. Le tableau suivant compare ces approches.

Tâches multiples Requêtes multiples
Comment le calcul est-il partagé ? Databricks recommande de déployer des travaux de calcul dimensionnés à la bonne taille pour chaque tâche de diffusion en continu. Vous pouvez éventuellement partager le calcul entre tâches. Toutes les requêtes partagent le même calcul. Vous pouvez éventuellement affecter des requêtes à des pools de planificateurs.
Comment les nouvelles tentatives sont-elles gérées ? Toutes les tâches doivent échouer avant que le travail lance une nouvelle tentative. La tâche est tentée à nouveau si une requête échoue.

Configurer des travaux de Structured Streaming pour redémarrer des requêtes de diffusion en continu en cas d’échec

Databricks recommande de configurer toutes les charges de travail de diffusion en continu à l’aide du déclencheur continu. Consultez Exécuter des travaux en continu.

Le déclencheur continu fournit le comportement suivant par défaut :

  • Empêche les exécutions simultanées multiples du travail.
  • Démarre une nouvelle exécution en cas d’échec d’une précédente.
  • Utilise l’interruption exponentielle pour les nouvelles tentatives.

Databricks recommande de toujours utiliser le calcul des travaux au lieu du calcul à usage unique lors de la planification des workflows. Lors de l’échec du travail et de la nouvelle tentative, de nouvelles ressources de calcul sont déployées.

Remarque

Vous n’avez pas besoin d’utiliser streamingQuery.awaitTermination() ou spark.streams.awaitAnyTermination(). Les travaux empêchent automatiquement un cycle d’aboutir quand une requête de diffusion en continu est active.

Utiliser des pools de planificateurs pour plusieurs requêtes de travail de streaming

Vous pouvez configurer des pools de planification pour affecter la capacité de calcul aux requêtes lors de l’exécution de plusieurs requêtes de diffusion en continu à partir du même code source.

Par défaut, toutes les requêtes démarrées dans un notebook s’exécutent dans le même pool de planification équitable. Les travaux Apach Spark générés par des déclencheurs à partir de toutes les requêtes de diffusion en continu dans un notebook s’exécutent l’un après l’autre dans l’ordre FIFO (premier entré, premier sorti). Cela peut occasionner des retards inutiles dans les requêtes, car celles-ci ne partagent pas efficacement les ressources de cluster.

Les pools de planificateurs vous permettent de déclarer les requêtes de Structured Streaming qui partagent des ressources de calcul.

L’exemple suivant attribue query1 à un pool dédié, tandis que query2 et query3 partagent un pool de planificateurs.

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

Notes

La configuration de la propriété locale doit se trouver dans la cellule du notebook dans laquelle vous lancez votre requête de diffusion en continu.

Pour plus d’informations, consultez la documentation sur le planificateur équitable d’Apache.