Guidance on how to use Service Principal with Certificate to Authorize for EventHub Stream Read
I found this documentation https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/use-aad-authentication-to-connect-eventhubs.md on using a service principal with a certificate for Spark stream reads from EventHubs. I want to do this in PySpark, running in a Synapse notebook on a Spark pool, and would like some guidance on how to do it properly in PySpark.
Azure Event Hubs
Azure Synapse Analytics
Azure Databricks
Microsoft Entra ID
-
Smaran Thoomu 12,100 Reputation points • Microsoft Vendor
2024-07-02T06:59:05.1766667+00:00 Hi @BEPV
Thanks for the question and for using the MS Q&A platform.
It appears that you want to use a service principal with a certificate in a Synapse notebook to authorize a PySpark stream read from EventHubs on a Spark pool. To do so, you can follow the steps below:
- In your Azure Active Directory tenant, create a service principal and note its tenant ID and client ID.
- Create a self-signed certificate for the service principal, and keep the certificate file and its password.
- Upload the certificate file to a secure location, such as Azure Key Vault or Azure Blob Storage.
- Use the com.microsoft.aad.msal4j library (a Java library) to authenticate as the service principal with the certificate. Use the ClientCredentialFactory.createFromCertificate method to create the client credential from the certificate, and the ConfidentialClientApplication class to create the authentication application from that credential.
- Use the com.microsoft.azure.eventhubs.spark library to read from EventHubs through the EventHubsConf class. In EventHubsConf, set the connection string of your EventHubs namespace and configure AAD authentication with an implementation of the AadAuthenticationCallback class. The AuthByCertCallBack class is such an implementation: its authority supplies the tenant ID of your service principal, and its acquireToken method returns the token obtained with the certificate.
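On the PySpark side, the steps above might be wired together roughly as follows. This is only a sketch under several assumptions: the option keys (eventhubs.useAadAuth, eventhubs.aadAuthCallback, eventhubs.AadAuthCallbackParams) are the ones I recall from the connector documentation linked in the question and should be checked against the installed connector version; the callback class must already be compiled and attached to the Spark pool as a JAR; and the parameter names certPath and certPassword, like both function names, are hypothetical and must match whatever your callback class actually parses.

```python
import json


def build_aad_eventhubs_options(namespace, event_hub, tenant_id, client_id,
                                cert_path, cert_password, callback_class):
    """Build a key-less connection string and the AAD-related connector options.

    Pure Python, so it can be checked without a Spark session.
    """
    # No SAS key here: authentication is delegated to the JVM callback class.
    connection_string = (
        f"Endpoint=sb://{namespace}.servicebus.windows.net/;EntityPath={event_hub}"
    )
    # Parameters handed to the callback class as JSON.
    # ASSUMPTION: "certPath" and "certPassword" are hypothetical names.
    callback_params = {
        "authority": tenant_id,
        "clientId": client_id,
        "certPath": cert_path,
        "certPassword": cert_password,  # fetch from Key Vault, do not hard-code
    }
    # ASSUMPTION: option keys as recalled from the connector documentation.
    options = {
        "eventhubs.useAadAuth": "true",
        "eventhubs.aadAuthCallback": callback_class,
        "eventhubs.AadAuthCallbackParams": json.dumps(callback_params),
    }
    return connection_string, options


def read_eventhubs_stream(spark, connection_string, options):
    """Start a streaming read; needs the connector JAR on the Spark pool."""
    # The connector expects the connection string in encrypted form.
    encrypt = spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt
    conf = dict(options)
    conf["eventhubs.connectionString"] = encrypt(connection_string)
    return spark.readStream.format("eventhubs").options(**conf).load()
```

In a notebook, you would call build_aad_eventhubs_options with your own values and pass the result, together with the notebook's spark session, to read_eventhubs_stream. Keeping the dictionary construction separate from the Spark call makes it easy to inspect the options before starting the stream.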
This sample code shows how to use a service principal with a certificate to authorize a PySpark stream read from EventHubs:
import java.io.{ByteArrayInputStream, File}
import java.util.Collections
import java.util.concurrent.CompletableFuture
import com.microsoft.aad.msal4j.{ClientCredentialFactory, ClientCredentialParameters, ConfidentialClientApplication, IAuthenticationResult}
import org.apache.commons.io.FileUtils
import org
I hope this helps. If you have any further questions or concerns, please let me know.
-
BEPV 0 Reputation points
2024-07-02T16:13:53.34+00:00 The sample code appears to be incomplete.
-
Sina Salam 6,581 Reputation points
2024-07-02T20:47:59.34+00:00 Hello @BEPV
Thank you for your time and the question.
Regarding the incomplete code, could you please try the code below:
import java.io.ByteArrayInputStream;
import java.io.File;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import com.microsoft.aad.msal4j.*;
import org.apache.commons.io.FileUtils;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.eventhubs.ConnectionStringBuilder;
import org.apache.spark.eventhubs.EventHubsConf;
import org.apache.spark.eventhubs.EventPosition;
import org.apache.spark.sql.streaming.Trigger;

public class EventHubsStreaming {
    public static void main(String[] args) throws Exception {
        // Define your EventHubs and AAD configurations
        String eventHubsNamespace = "YOUR_EVENT_HUBS_NAMESPACE";
        String eventHubName = "YOUR_EVENT_HUB_NAME";
        String tenantId = "YOUR_TENANT_ID";
        String clientId = "YOUR_CLIENT_ID";
        String certificatePath = "PATH_TO_YOUR_CERTIFICATE.pfx";
        String certificatePassword = "YOUR_CERTIFICATE_PASSWORD";

        // Initialize Spark session
        SparkSession spark = SparkSession.builder()
            .appName("EventHubsSample")
            .getOrCreate();

        // Load certificate
        byte[] certificateBytes = FileUtils.readFileToByteArray(new File(certificatePath));

        // Create confidential client application
        // (createFromCertificate takes the PKCS12 stream and the password as a String)
        ConfidentialClientApplication app = ConfidentialClientApplication.builder(
                clientId,
                ClientCredentialFactory.createFromCertificate(
                    new ByteArrayInputStream(certificateBytes),
                    certificatePassword))
            .authority("https://login.microsoftonline.com/" + tenantId)
            .build();

        // Get the access token
        ClientCredentialParameters parameters = ClientCredentialParameters.builder(
                Collections.singleton("https://eventhubs.azure.net/.default"))
            .build();
        CompletableFuture<IAuthenticationResult> future = app.acquireToken(parameters);
        IAuthenticationResult result = future.get();
        String token = result.accessToken();

        // Create EventHubs connection string
        // Caution: an AAD access token is not a SAS key, so a connection string
        // built this way is unlikely to authenticate. The connector documentation
        // linked in the question uses an AadAuthenticationCallback instead.
        ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder()
            .setNamespaceName(eventHubsNamespace)
            .setEventHubName(eventHubName)
            .setSasKeyName("RootManageSharedAccessKey")
            .setSasKey(token);

        // Create EventHubs configuration
        // (EventHubsConf is a Scala class; its companion apply is called explicitly from Java)
        EventHubsConf eventHubsConf = EventHubsConf.apply(connectionStringBuilder.toString())
            .setConsumerGroup("$Default")
            .setStartingPosition(EventPosition.fromEndOfStream());

        // Read from EventHubs
        spark.readStream()
            .format("eventhubs")
            .options(eventHubsConf.toMap())
            .load()
            .writeStream()
            .format("console")
            .trigger(Trigger.ProcessingTime("5 seconds"))
            .start()
            .awaitTermination();
    }
}
This code covers:
- Initialization
- Authentication
- EventHubs configuration
- Stream reading
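The authentication step can also be tried directly from Python, for instance to check that the service principal and certificate work before wiring up the stream. The following is a minimal sketch, assuming the MSAL for Python package (msal) is installed on the Spark pool and that the certificate's private key is available in PEM form together with its thumbprint (the Java code above reads a .pfx file instead). Both function names are hypothetical. The token obtained this way is only useful as a credential check; per the connector documentation linked in the question, the stream itself authenticates through a JVM callback class.

```python
def build_msal_inputs(tenant_id, private_key_pem, thumbprint):
    """Assemble the authority URL, certificate credential and scope for MSAL."""
    authority = f"https://login.microsoftonline.com/{tenant_id}"
    # MSAL for Python accepts a certificate credential as a dict with the
    # PEM private key and the certificate thumbprint.
    credential = {"private_key": private_key_pem, "thumbprint": thumbprint}
    # Same scope as in the Java code above.
    scopes = ["https://eventhubs.azure.net/.default"]
    return authority, credential, scopes


def acquire_eventhubs_token(client_id, tenant_id, private_key_pem, thumbprint):
    """Request an app-only token for Event Hubs; raises if the request fails."""
    import msal  # third-party package, imported lazily: pip install msal

    authority, credential, scopes = build_msal_inputs(
        tenant_id, private_key_pem, thumbprint
    )
    app = msal.ConfidentialClientApplication(
        client_id, authority=authority, client_credential=credential
    )
    result = app.acquire_token_for_client(scopes=scopes)
    if "access_token" not in result:
        raise RuntimeError(
            f"Token request failed: {result.get('error')}: "
            f"{result.get('error_description')}"
        )
    return result["access_token"]
```

If acquire_eventhubs_token raises, the error and error_description fields in the message usually indicate whether the problem is the certificate, the client ID, or the tenant.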
Please let me know whether this solves the issue. I also have another approach to suggest.
Best Regards,
Sina
-
Smaran Thoomu 12,100 Reputation points • Microsoft Vendor
2024-07-04T09:41:49.15+00:00 @BEPV We haven't heard from you since the last response and are checking back to see whether you have a resolution yet. If you do, please share it with the community, as it can be helpful to others. Otherwise, please respond with more details and we will try to help.