Diffuser en continu à partir d’Apache Pulsar

Important

Cette fonctionnalité est disponible en préversion publique.

Dans Databricks Runtime 14.1 et versions ultérieures, vous pouvez utiliser Structured Streaming pour diffuser en continu des données à partir d’Apache Pulsar sur Azure Databricks.

Structured Streaming fournit une sémantique de traitement exactement une fois pour les données lues à partir de sources Pulsar.

Exemple de syntaxe

Voici un exemple de base d’utilisation de Structured Streaming pour lire à partir de Pulsar :

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

Vous devez toujours fournir un service.url et l’une des options suivantes pour spécifier des rubriques :

  • topic
  • topics
  • topicsPattern

Pour obtenir la liste complète des options, consultez Configurer les options de lecture en continu Pulsar.

S’authentifier auprès de Pulsar

Azure Databricks prend en charge l’authentification truststore et keystore sur Pulsar. Databricks recommande d’utiliser des secrets lors du stockage des détails de configuration.

Vous pouvez définir les options suivantes lors de la configuration du flux :

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

Si le flux utilise un PulsarAdmin, définissez également les éléments suivants :

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

L’exemple suivant illustre la configuration des options d’authentification :

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Schéma Pulsar

Le schéma des enregistrements lus à partir de Pulsar dépend de la façon dont les rubriques ont leurs schémas encodés.

  • Pour les rubriques avec un schéma Avro ou JSON, les noms de champs et les types de champs sont conservés dans le DataFrame Spark résultant.
  • Pour les rubriques sans schéma ou avec un type de données simple dans Pulsar, la charge utile est chargée dans une colonne value.
  • Si le lecteur est configuré pour lire plusieurs rubriques avec différents schémas, définissez allowDifferentTopicSchemas pour charger le contenu brut dans une colonne value.

Les enregistrements Pulsar ont les champs de métadonnées suivants :

Colonne Type
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Configurer les options de lecture en continu Pulsar

Toutes les options sont configurées dans le cadre d’une lecture Structured Streaming à l’aide de la syntaxe .option("<optionName>", "<optionValue>"). Vous pouvez également configurer l’authentification à l’aide d’options. Consultez S’authentifier auprès de Pulsar.

Le tableau suivant décrit les configurations requises pour Pulsar. Vous ne devez spécifier qu’une des options topic, topics ou topicsPattern.

Option Valeur par défaut Description
service.url aucune Configuration serviceUrl de Pulsar pour le service Pulsar.
topic aucune Chaîne de nom de rubrique pour la rubrique à consommer.
topics aucune Liste séparée par des virgules des rubriques à consommer.
topicsPattern aucune Chaîne d’expression régulière Java à mettre en correspondance sur les rubriques à consommer.

Le tableau suivant décrit d’autres options prises en charge pour Pulsar :

Option Valeur par défaut Description
predefinedSubscription aucune Nom d’abonnement prédéfini utilisé par le connecteur pour suivre la progression de l’application Spark.
subscriptionPrefix aucune Préfixe utilisé par le connecteur pour générer un abonnement aléatoire pour suivre la progression de l’application Spark.
pollTimeoutMs 120 000 Délai d’attente de lecture des messages à partir de Pulsar en millisecondes.
waitingForNonExistedTopic false Indique si le connecteur doit attendre que les rubriques souhaitées soient créées.
failOnDataLoss true Contrôle l’échec d’une requête lorsque les données sont perdues (par exemple, les rubriques sont supprimées ou les messages sont supprimés en raison d’une stratégie de rétention).
allowDifferentTopicSchemas false Si plusieurs rubriques avec différents schémas sont lues, utilisez ce paramètre pour désactiver la désérialisation automatique des valeurs de rubrique basée sur des schémas. Seules les valeurs brutes sont retournées quand il s’agit de true.
startingOffsets latest S’il s’agit de latest, le lecteur lit les enregistrements les plus récents après son exécution. S’il s’agit de earliest, le lecteur lit à partir du décalage le plus ancien. L’utilisateur peut également spécifier une chaîne JSON qui spécifie un décalage spécifique.
maxBytesPerTrigger aucune Limite logicielle du nombre maximal d’octets que nous voulons traiter par microbatch. Si cette valeur est spécifiée, admin.url doit également être spécifié.
admin.url aucune Configuration serviceHttpUrl de Pulsar. Nécessaire uniquement quand maxBytesPerTrigger est spécifié.

Vous pouvez également spécifier n’importe quelle configuration client, administrateur et lecteur Pulsar à l’aide des modèles suivants :

Modèle Lien vers les options de configuration
pulsar.client.* Configuration du client Pulsar
pulsar.admin.* Configuration de l’administrateur Pulsar
pulsar.reader.* Configuration du lecteur Pulsar

Construction de décalages de démarrage JSON

Vous pouvez construire manuellement un identifiant de message pour spécifier un décalage spécifique et le transmettre en tant que JSON à l’option startingOffsets. L’exemple de code suivant illustre cette syntaxe :

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()