S’abonner à Google Pub/Sub
Azure Databricks fournit un connecteur intégré pour s’abonner à Google Pub/Sub dans Databricks Runtime 13.3 LTS et versions ultérieures. Ce connecteur fournit une sémantique de traitement une seule fois pour les enregistrements de l’abonné.
Remarque
Pub/Sub peut publier des enregistrements dupliqués, et les enregistrements peuvent arriver à l’abonné dans le désordre. Vous devez écrire du code Azure Databricks pour gérer les enregistrements dupliqués et dans le désordre.
Exemple de syntaxe
L’exemple de code suivant illustre la syntaxe de base pour la configuration d’une lecture Structured Streaming à partir de Pub/Sub :
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.options(authOptions)
.load()
Pour plus d’options de configuration, consultez Configurer les options de lecture de diffusion en continu Pub/Sub.
Configurer l’accès à Pub/Sub
Databricks recommande d’utiliser des secrets lors de la définition des options d’autorisation. Les options suivantes sont requises pour autoriser une connexion :
clientEmail
clientId
privateKey
privateKeyId
Le tableau suivant décrit les rôles requis pour les informations d’identification configurées :
Rôles | Obligatoire ou facultatif | Utilisation |
---|---|---|
roles/pubsub.viewer ou roles/viewer |
Requis | Vérifier si l’abonnement existe et obtenir l’abonnement |
roles/pubsub.subscriber |
Requis | Extraire des données d’un abonnement |
roles/pubsub.editor ou roles/editor |
Facultatif | Active la création d’un abonnement s’il n’en existe pas et permet également l’utilisation de deleteSubscriptionOnStreamStop pour supprimer des abonnements lors de l’arrêt du flux |
Schéma Pub/Sub
Le schéma du flux correspond aux enregistrements extraits de Pub/Sub, comme décrit dans le tableau suivant :
Champ | Type |
---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Configurer les options de diffusion en continu Pub/Sub
Le tableau suivant décrit d’autres options prises en charge pour Pub/Sub. Toutes les options sont configurées dans le cadre d’une lecture Structured Streaming à l’aide de la syntaxe .option("<optionName>", "<optionValue>")
.
Remarque
Certaines options de configuration Pub/Sub utilisent le concept d’extractions au lieu de micro-lots. Cela reflète les détails de l’implémentation interne et les options fonctionnent de la même façon que les corollaires dans d’autres connecteurs Structured Streaming, sauf que les enregistrements sont récupérés, puis traités.
Option | Valeur par défaut | Description |
---|---|---|
numFetchPartitions |
Définissez sur la moitié du nombre d’Exécuteurs présents lors de l’initialisation du flux. | Le nombre de tâches Spark parallèles qui extraient des enregistrements à partir d’un abonnement. |
deleteSubscriptionOnStreamStop |
false |
Si true , l’abonnement passé au flux est supprimé lorsque la tâche de diffusion en continu se termine. |
maxBytesPerTrigger |
Aucune | Une limite réversible de la taille du lot à traiter pendant chaque micro-lot déclenché. |
maxRecordsPerFetch |
1 000 | Le nombre d’enregistrements à extraire par tâche avant de traiter les enregistrements. |
maxFetchPeriod |
10 secondes | La durée pendant laquelle chaque tâche doit extraire avant de traiter les enregistrements. Databricks recommande d’utiliser la valeur par défaut. |
Sémantique de traitement par lots incrémentiel pour Pub/Sub
Vous pouvez utiliser Trigger.AvailableNow
pour consommer des enregistrements disponibles à partir des sources Pub/Sub en tant que lot incrémentiel.
Azure Databricks enregistre le timestamp lorsque vous commencez une lecture avec le paramètre Trigger.AvailableNow
. Les enregistrements traités par le lot incluent toutes les données extraites précédemment et tous les enregistrements récemment publiés avec un timestamp inférieur au timestamp de démarrage du flux enregistré.
Consultez Configuration du traitement par lots incrémentiel.
Surveiller les métriques de diffusion en continu
Les métriques de progression Structured Streaming indiquent le nombre d’enregistrements récupérés et prêts à être traités, la taille des enregistrements extraits et prêts à être traités, ainsi que le nombre de doublons vus depuis le début du flux. Voici un exemple de ces métriques :
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Limites
L’exécution spéculative (spark.speculation
) n’est pas prise en charge avec Pub/Sub.