Diffuser en continu à partir d’Apache Pulsar
Important
Cette fonctionnalité est disponible en préversion publique.
Dans Databricks Runtime 14.1 et versions ultérieures, vous pouvez utiliser Structured Streaming pour diffuser en continu des données à partir d’Apache Pulsar sur Azure Databricks.
Structured Streaming fournit une sémantique de traitement exactement une fois pour les données lues à partir de sources Pulsar.
Exemple de syntaxe
Voici un exemple de base d’utilisation de Structured Streaming pour lire à partir de Pulsar :
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
Vous devez toujours fournir un service.url
et l’une des options suivantes pour spécifier des rubriques :
topic
topics
topicsPattern
Pour obtenir la liste complète des options, consultez Configurer les options de lecture en continu Pulsar.
S’authentifier auprès de Pulsar
Azure Databricks prend en charge l’authentification truststore et keystore sur Pulsar. Databricks recommande d’utiliser des secrets lors du stockage des détails de configuration.
Vous pouvez définir les options suivantes lors de la configuration du flux :
pulsar.client.authPluginClassName
pulsar.client.authParams
pulsar.client.useKeyStoreTls
pulsar.client.tlsTrustStoreType
pulsar.client.tlsTrustStorePath
pulsar.client.tlsTrustStorePassword
Si le flux utilise un PulsarAdmin
, définissez également les éléments suivants :
pulsar.admin.authPluginClassName
pulsar.admin.authParams
L’exemple suivant illustre la configuration des options d’authentification :
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Schéma Pulsar
Le schéma des enregistrements lus à partir de Pulsar dépend de la façon dont les rubriques ont leurs schémas encodés.
- Pour les rubriques avec un schéma Avro ou JSON, les noms de champs et les types de champs sont conservés dans le DataFrame Spark résultant.
- Pour les rubriques sans schéma ou avec un type de données simple dans Pulsar, la charge utile est chargée dans une colonne
value
. - Si le lecteur est configuré pour lire plusieurs rubriques avec différents schémas, définissez
allowDifferentTopicSchemas
pour charger le contenu brut dans une colonnevalue
.
Les enregistrements Pulsar ont les champs de métadonnées suivants :
Colonne | Type |
---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
Configurer les options de lecture en continu Pulsar
Toutes les options sont configurées dans le cadre d’une lecture Structured Streaming à l’aide de la syntaxe .option("<optionName>", "<optionValue>")
. Vous pouvez également configurer l’authentification à l’aide d’options. Consultez S’authentifier auprès de Pulsar.
Le tableau suivant décrit les configurations requises pour Pulsar. Vous ne devez spécifier qu’une des options topic
, topics
ou topicsPattern
.
Option | Valeur par défaut | Description |
---|---|---|
service.url |
aucune | Configuration serviceUrl de Pulsar pour le service Pulsar. |
topic |
aucune | Chaîne de nom de rubrique pour la rubrique à consommer. |
topics |
aucune | Liste séparée par des virgules des rubriques à consommer. |
topicsPattern |
aucune | Chaîne d’expression régulière Java à mettre en correspondance sur les rubriques à consommer. |
Le tableau suivant décrit d’autres options prises en charge pour Pulsar :
Option | Valeur par défaut | Description |
---|---|---|
predefinedSubscription |
aucune | Nom d’abonnement prédéfini utilisé par le connecteur pour suivre la progression de l’application Spark. |
subscriptionPrefix |
aucune | Préfixe utilisé par le connecteur pour générer un abonnement aléatoire pour suivre la progression de l’application Spark. |
pollTimeoutMs |
120 000 | Délai d’attente de lecture des messages à partir de Pulsar en millisecondes. |
waitingForNonExistedTopic |
false |
Indique si le connecteur doit attendre que les rubriques souhaitées soient créées. |
failOnDataLoss |
true |
Contrôle l’échec d’une requête lorsque les données sont perdues (par exemple, les rubriques sont supprimées ou les messages sont supprimés en raison d’une stratégie de rétention). |
allowDifferentTopicSchemas |
false |
Si plusieurs rubriques avec différents schémas sont lues, utilisez ce paramètre pour désactiver la désérialisation automatique des valeurs de rubrique basée sur des schémas. Seules les valeurs brutes sont retournées quand il s’agit de true . |
startingOffsets |
latest |
S’il s’agit de latest , le lecteur lit les enregistrements les plus récents après son exécution. S’il s’agit de earliest , le lecteur lit à partir du décalage le plus ancien. L’utilisateur peut également spécifier une chaîne JSON qui spécifie un décalage spécifique. |
maxBytesPerTrigger |
aucune | Limite logicielle du nombre maximal d’octets que nous voulons traiter par microbatch. Si cette valeur est spécifiée, admin.url doit également être spécifié. |
admin.url |
aucune | Configuration serviceHttpUrl de Pulsar. Nécessaire uniquement quand maxBytesPerTrigger est spécifié. |
Vous pouvez également spécifier n’importe quelle configuration client, administrateur et lecteur Pulsar à l’aide des modèles suivants :
Modèle | Lien vers les options de configuration |
---|---|
pulsar.client.* |
Configuration du client Pulsar |
pulsar.admin.* |
Configuration de l’administrateur Pulsar |
pulsar.reader.* |
Configuration du lecteur Pulsar |
Construction de décalages de démarrage JSON
Vous pouvez construire manuellement un identifiant de message pour spécifier un décalage spécifique et le transmettre en tant que JSON à l’option startingOffsets
. L’exemple de code suivant illustre cette syntaxe :
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()