S’abonner à Google Pub/Sub

Azure Databricks fournit un connecteur intégré pour s’abonner à Google Pub/Sub dans Databricks Runtime 13.3 LTS et versions ultérieures. Ce connecteur fournit une sémantique de traitement une seule fois pour les enregistrements de l’abonné.

Remarque

Pub/Sub peut publier des enregistrements dupliqués, et les enregistrements peuvent arriver à l’abonné dans le désordre. Vous devez écrire du code Azure Databricks pour gérer les enregistrements dupliqués et dans le désordre.

Exemple de syntaxe

L’exemple de code suivant illustre la syntaxe de base pour la configuration d’une lecture Structured Streaming à partir de Pub/Sub :

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

Pour plus d’options de configuration, consultez Configurer les options de lecture de diffusion en continu Pub/Sub.

Configurer l’accès à Pub/Sub

Databricks recommande d’utiliser des secrets lors de la définition des options d’autorisation. Les options suivantes sont requises pour autoriser une connexion :

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Le tableau suivant décrit les rôles requis pour les informations d’identification configurées :

Rôles Obligatoire ou facultatif Utilisation
roles/pubsub.viewer ou roles/viewer Requis Vérifier si l’abonnement existe et obtenir l’abonnement
roles/pubsub.subscriber Requis Extraire des données d’un abonnement
roles/pubsub.editor ou roles/editor Facultatif Active la création d’un abonnement s’il n’en existe pas et permet également l’utilisation de deleteSubscriptionOnStreamStop pour supprimer des abonnements lors de l’arrêt du flux

Schéma Pub/Sub

Le schéma du flux correspond aux enregistrements extraits de Pub/Sub, comme décrit dans le tableau suivant :

Champ Type
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Configurer les options de diffusion en continu Pub/Sub

Le tableau suivant décrit d’autres options prises en charge pour Pub/Sub. Toutes les options sont configurées dans le cadre d’une lecture Structured Streaming à l’aide de la syntaxe .option("<optionName>", "<optionValue>").

Remarque

Certaines options de configuration Pub/Sub utilisent le concept d’extractions au lieu de micro-lots. Cela reflète les détails de l’implémentation interne et les options fonctionnent de la même façon que les corollaires dans d’autres connecteurs Structured Streaming, sauf que les enregistrements sont récupérés, puis traités.

Option Valeur par défaut Description
numFetchPartitions Définissez sur la moitié du nombre d’Exécuteurs présents lors de l’initialisation du flux. Le nombre de tâches Spark parallèles qui extraient des enregistrements à partir d’un abonnement.
deleteSubscriptionOnStreamStop false Si true, l’abonnement passé au flux est supprimé lorsque la tâche de diffusion en continu se termine.
maxBytesPerTrigger Aucune Une limite réversible de la taille du lot à traiter pendant chaque micro-lot déclenché.
maxRecordsPerFetch 1 000 Le nombre d’enregistrements à extraire par tâche avant de traiter les enregistrements.
maxFetchPeriod 10 secondes La durée pendant laquelle chaque tâche doit extraire avant de traiter les enregistrements. Databricks recommande d’utiliser la valeur par défaut.

Sémantique de traitement par lots incrémentiel pour Pub/Sub

Vous pouvez utiliser Trigger.AvailableNow pour consommer des enregistrements disponibles à partir des sources Pub/Sub en tant que lot incrémentiel.

Azure Databricks enregistre le timestamp lorsque vous commencez une lecture avec le paramètre Trigger.AvailableNow. Les enregistrements traités par le lot incluent toutes les données extraites précédemment et tous les enregistrements récemment publiés avec un timestamp inférieur au timestamp de démarrage du flux enregistré.

Consultez Configuration du traitement par lots incrémentiel.

Surveiller les métriques de diffusion en continu

Les métriques de progression Structured Streaming indiquent le nombre d’enregistrements récupérés et prêts à être traités, la taille des enregistrements extraits et prêts à être traités, ainsi que le nombre de doublons vus depuis le début du flux. Voici un exemple de ces métriques :

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Limites

L’exécution spéculative (spark.speculation) n’est pas prise en charge avec Pub/Sub.