Coletor de mensagens do Apache Kafka® no Azure Cosmos DB para Apache Cassandra, com o Apache Flink® no Azure HDInsight no AKS

Observação

Desativaremos o Microsoft Azure HDInsight no AKS em 31 de janeiro de 2025. Para evitar o encerramento abrupto das suas cargas de trabalho, você precisará migrá-las para o Microsoft Fabric ou para um produto equivalente do Azure antes de 31 de janeiro de 2025. Os clusters restantes em sua assinatura serão interrompidos e removidos do host.

Somente o suporte básico estará disponível até a data de desativação.

Importante

Esse recurso está atualmente na visualização. Os Termos de uso complementares para versões prévias do Microsoft Azure incluem mais termos legais que se aplicam aos recursos do Azure que estão em versão beta, em versão prévia ou ainda não lançados em disponibilidade geral. Para obter informações sobre essa versão prévia específica, confira Informações sobre a versão prévia do Azure HDInsight no AKS. No caso de perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para ver mais atualizações sobre a Comunidade do Azure HDInsight.

Este exemplo usa o Apache Flink para o coletor de mensagens do Azure HDInsight para Apache Kafka no Azure Cosmos DB for Apache Cassandra.

Esse exemplo é proeminente quando os engenheiros preferem dados agregados em tempo real para análise. Com acesso a dados históricos agregados, você pode criar modelos de aprendizado de máquina (ML) para criar insights ou ações. Você também pode ingerir dados de IoT no Apache Flink para agregar dados em tempo real e armazená-los no Apache Cassandra.

Pré-requisitos

Azure Cosmos DB para Apache Cassandra

O Azure Cosmos DB for Apache Cassandra pode ser usado como armazenamento de dados para aplicativos escritos para o Apache Cassandra. Essa compatibilidade significa que ao usar os drivers do Apache em conformidade com CQLv4 existentes, o aplicativo do Cassandra agora pode se comunicar com a API para Cassandra.

Para obter mais informações, consulte os links a seguir.

Captura de tela mostrando como criar o Azure Cosmos DB para Apache Cassandra no portal do Azure.

“Obter credenciais” a usará no código-fonte do Stream:

Captura de tela mostrando como obter credenciais no código-fonte do fluxo.

Implementação

Em uma VM Ubuntu, prepare o ambiente de desenvolvimento

Clonagem do repositório de amostras do Azure

Confira o leia-me do GitHub para baixar o Maven e clone o repositório usando Azure-Samples/azure-cosmos-db-cassandra-java-getting-started.git de Amostras do Azure

Atualizar o projeto Maven para o Cassandra

Acesse a pasta do projeto Maven azure-cosmos-db-cassandra-java-getting-started-main e atualize as alterações necessárias para este exemplo.

maven pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
 
  <groupId>com.azure.cosmosdb.cassandra</groupId>
  <artifactId>cosmosdb-cassandra-examples</artifactId>
   <version>1.0-SNAPSHOT</version>
  <dependencies>
            <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
           <version>1.17.0</version>
       </dependency>
       <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
       <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java</artifactId>
           <version>1.17.0</version>
       </dependency>
       <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
       <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients</artifactId>
           <version>1.17.0</version>
       </dependency>
       <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-files</artifactId>
           <version>1.17.0</version>
       </dependency>
       <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka</artifactId>
           <version>1.17.0</version>
       </dependency>
       <dependency>
          <groupId>com.datastax.cassandra</groupId>
          <artifactId>cassandra-driver-core</artifactId>
          <version>3.3.0</version>
       </dependency>
       <dependency>
          <groupId>com.datastax.cassandra</groupId>
          <artifactId>cassandra-driver-mapping</artifactId>
          <version>3.1.4</version>
       </dependency>
       <dependency>
          <groupId>com.datastax.cassandra</groupId>
          <artifactId>cassandra-driver-extras</artifactId>
          <version>3.1.4</version>
       </dependency>
       <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
          <version>1.7.5</version>
       </dependency>
       <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
          <version>1.7.5</version>
       </dependency>
   </dependencies>
 
   <build>
       <plugins>
           <plugin>
              <artifactId>maven-assembly-plugin</artifactId>
               <configuration>
                   <descriptorRefs>
                      <descriptorRef>jar-with-dependencies</descriptorRef>
                   </descriptorRefs>
                  <finalName>cosmosdb-cassandra-examples</finalName>
                  <appendAssemblyId>false</appendAssemblyId>
               </configuration>
               <executions>
                   <execution>
                      <id>make-assembly</id>
                      <phase>package</phase>
                       <goals>
                          <goal>single</goal>
                       </goals>
                   </execution>
               </executions>
           </plugin>
           <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
               <configuration>
                  <source>1.8</source>
                  <target>1.8</target>
               </configuration>
           </plugin>
       </plugins>
   </build>
 
</project>

Configuração de conexão do Cosmos DB for Apache Cassandra

É necessário atualizar o nome de host, o nome de usuário e as chaves no snippet abaixo.

root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/src/main/resources# cat config.properties
###Cassandra endpoint details on cosmosdb
cassandra_host=<update-host-name>.cassandra.cosmos.azure.com
cassandra_port = 10350
cassandra_username=<update-user-name>
cassandra_password=mxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
#ssl_keystore_file_path=<SSL key store file location>
#ssl_keystore_password=<SSL key store password>

estrutura de origem

root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/src/main/java/com/azure/cosmosdb/cassandra# ll
total 24
drwxr-xr-x 5 root root 4096 May 12 12:46 ./
drwxr-xr-x 3 root root 4096 Apr  9  2020 ../
-rw-r--r-- 1 root root 1105 Apr  9  2020 User.java
drwxr-xr-x 2 root root 4096 May 15 03:53 examples/
drwxr-xr-x 2 root root 4096 Apr  9  2020 repository/
drwxr-xr-x 2 root root 4096 May 15 02:43 util/

pasta util
CassandraUtils.java

Observação

A alteração de ssl_keystore_file_path depende da localização do certificado Java. Cluster do Apache Flink no Azure HDInsight no AKS, o caminho é /usr/lib/jvm/msopenjdk-11-jre/lib/security

package com.azure.cosmosdb.cassandra.util;

import com.datastax.driver.core.*;

import javax.net.ssl.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.*;

/**
 * Cassandra utility class to handle the Cassandra Sessions
 */
public class CassandraUtils {

    private Cluster cluster;
    private Configurations config = new Configurations();
    private String cassandraHost = "<cassandra-host-ip>";
    private int cassandraPort = 10350;
    private String cassandraUsername = "localhost";
    private String cassandraPassword = "<cassandra-password>";
    private File sslKeyStoreFile = null;
    private String sslKeyStorePassword = "<keystore-password>";


    /**
     * This method creates a Cassandra Session based on the end-point details given in config.properties.
     * This method validates the SSL certificate based on ssl_keystore_file_path & ssl_keystore_password properties.
     * If ssl_keystore_file_path & ssl_keystore_password are not given then it uses 'cacerts' from JDK.
     * @return Session Cassandra Session
     */
    public Session getSession() {

        try {
            //Load cassandra endpoint details from config.properties
            loadCassandraConnectionDetails();

            final KeyStore keyStore = KeyStore.getInstance("JKS");
            try (final InputStream is = new FileInputStream(sslKeyStoreFile)) {
                keyStore.load(is, sslKeyStorePassword.toCharArray());
            }

            final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
                    .getDefaultAlgorithm());
            kmf.init(keyStore, sslKeyStorePassword.toCharArray());
            final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory
                    .getDefaultAlgorithm());
            tmf.init(keyStore);

            // Creates a socket factory for HttpsURLConnection using JKS contents.
            final SSLContext sc = SSLContext.getInstance("TLSv1.2");
            sc.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new java.security.SecureRandom());

            JdkSSLOptions sslOptions = RemoteEndpointAwareJdkSSLOptions.builder()
                    .withSSLContext(sc)
                    .build();
            cluster = Cluster.builder()
                    .addContactPoint(cassandraHost)
                    .withPort(cassandraPort)
                    .withCredentials(cassandraUsername, cassandraPassword)
                    .withSSL(sslOptions)
                    .build();

            return cluster.connect();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return null;
    }

    public Cluster getCluster() {
        return cluster;
    }

    /**
     * Closes the cluster and Cassandra session
     */
    public void close() {
        cluster.close();
    }

    /**
     * Loads Cassandra end-point details from config.properties.
     * @throws Exception
     */
    private void loadCassandraConnectionDetails() throws Exception {
        cassandraHost = config.getProperty("cassandra_host");
        cassandraPort = Integer.parseInt(config.getProperty("cassandra_port"));
        cassandraUsername = config.getProperty("cassandra_username");
        cassandraPassword = config.getProperty("cassandra_password");
        String ssl_keystore_file_path = config.getProperty("ssl_keystore_file_path");
        String ssl_keystore_password = config.getProperty("ssl_keystore_password");

        // If ssl_keystore_file_path, build the path using JAVA_HOME directory.
        if (ssl_keystore_file_path == null || ssl_keystore_file_path.isEmpty()) {
            String javaHomeDirectory = System.getenv("JAVA_HOME");
            if (javaHomeDirectory == null || javaHomeDirectory.isEmpty()) {
                throw new Exception("JAVA_HOME not set");
            }
            ssl_keystore_file_path = new StringBuilder(javaHomeDirectory).append("/lib/security/cacerts").toString();
        }

        sslKeyStorePassword = (ssl_keystore_password != null && !ssl_keystore_password.isEmpty()) ?
                ssl_keystore_password : sslKeyStorePassword;

        sslKeyStoreFile = new File(ssl_keystore_file_path);

        if (!sslKeyStoreFile.exists() || !sslKeyStoreFile.canRead()) {
            throw new Exception(String.format("Unable to access the SSL Key Store file from %s", ssl_keystore_file_path));
        }
    }
}

Configurations.java

package com.azure.cosmosdb.cassandra.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Configuration utility to read the configurations from properties file
 */
public class Configurations {
    private static final Logger LOGGER = LoggerFactory.getLogger(Configurations.class);
    private static String PROPERTY_FILE = "config.properties";
    private static Properties prop = null;

    private void loadProperties() throws IOException {
        InputStream input = getClass().getClassLoader().getResourceAsStream(PROPERTY_FILE);
        if (input == null) {
            LOGGER.error("Sorry, unable to find {}", PROPERTY_FILE);
            return;
        }
        prop = new Properties();
        prop.load(input);
    }

    public String getProperty(String propertyName) throws IOException {
        if (prop == null) {
            loadProperties();
        }
        return prop.getProperty(propertyName);

    }
}

Pasta de exemplos

CassandraSink.java

package com.azure.cosmosdb.cassandra.examples;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import com.azure.cosmosdb.cassandra.repository.UserRepository;
import com.azure.cosmosdb.cassandra.util.CassandraUtils;

public class CassandraSink implements SinkFunction<Tuple3<Integer, String, String>> {

    @Override
    public void invoke(Tuple3<Integer, String, String> value, Context context) throws Exception {

            CassandraUtils utils = new CassandraUtils();
            Session cassandraSession = utils.getSession();
            try {
                UserRepository repository = new UserRepository(cassandraSession);

                //Insert rows into user table
                PreparedStatement preparedStatement = repository.prepareInsertStatement();
                repository.insertUser(preparedStatement, value.f0, value.f1, value.f2);

                } finally {
                        if (null != utils) utils.close();
                        if (null != cassandraSession) cassandraSession.close();
                        }
            }
}

Classe principal: CassandraDemo.java

Observação

  • Substitua os IPs do agente do Kafka pelos IPs do agente do cluster do Kafka
  • Prepare o tópico
    • usuário /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic user --bootstrap-server wn0-flinkd:9092
package com.azure.cosmosdb.cassandra.examples;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CassandraDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        // 1. read kafka message as stream input, update the broker IPs from your Kafka setup
        String brokers = "<update-broker-ips>:9092,<update-broker-ips>:9092,<update-broker-ips>:9092";

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("user")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        kafka.print();

        DataStream<Tuple3<Integer,String,String>> dataStream = kafka.map(line-> {
            String[] fields = line.split(",");
            int v1 = Integer.parseInt(fields[0]);
            Tuple3<Integer,String,String> tuple3 = Tuple3.of(v1,fields[1],fields[2]);
            return tuple3;
        }).returns(Types.TUPLE(Types.INT,Types.STRING,Types.STRING));


        dataStream.addSink(new CassandraSink());

        // 4. run stream
        env.execute("sink Kafka to Cosmos DB for Apache Cassandra");
    }
}

Como compilar o projeto

Execute mvn clean install na pasta azure-cosmos-db-cassandra-java-getting-started-main para compilar o projeto. Este comando gera cosmosdb-cassandra-examples.jar na pasta de destino.

root@flinkvm:/home/flinkvm/azure-cosmos-db-cassandra-java-getting-started-main/target# ll
total 91156
drwxr-xr-x 7 root root     4096 May 15 03:54 ./
drwxr-xr-x 7 root root     4096 May 15 03:54 ../
drwxr-xr-x 2 root root     4096 May 15 03:54 archive-tmp/
drwxr-xr-x 3 root root     4096 May 15 03:54 classes/
-rw-r--r-- 1 root root    15542 May 15 03:54 cosmosdb-cassandra-examples-1.0-SNAPSHOT.jar
-rw-r--r-- 1 root root 93290819 May 15 03:54 cosmosdb-cassandra-examples.jar
drwxr-xr-x 3 root root     4096 May 15 03:54 generated-sources/
drwxr-xr-x 2 root root     4096 May 15 03:54 maven-archiver/
drwxr-xr-x 3 root root     4096 May 15 03:54 maven-status/

Carregue o JAR no Armazenamento do Azure e o wget no webssh

msdata@pod-0 [ ~ ]$ ls -l cosmosdb-cassandra-examples.jar
-rw-r----- 1 msdata msdata 93290819 May 15 04:02 cosmosdb-cassandra-examples.jar

Como preparar o repositório de chaves e a tabela do Cosmos DB

Execute a classe UserProfile em /azure-cosmos-db-cassandra-java-getting-started-main/src/main/java/com/azure/cosmosdb/cassandra/examples para criar o repositório de chaves e a tabela do Azure Cosmos DB.

bin/flink run -c com.azure.cosmosdb.cassandra.examples.UserProfile -j cosmosdb-cassandra-examples.jar

Coletor de tópicos do Kafka no Cosmos DB for Apache Cassandra

Execute a classe CassandraDemo para coletar o tópico do Kafka no Cosmos DB for Apache Cassandra.

bin/flink run -c com.azure.cosmosdb.cassandra.examples.CassandraDemo -j cosmosdb-cassandra-examples.jar

Captura de tela mostrando como executar a Demonstração do Cassandra.

Verificar o trabalho na interface do usuário da Web do Flink no Azure HDInsight no Cluster do AKS.

Captura de tela mostrando como verificar o trabalho no HDInsight na interface do usuário do AKS Flink.

Produzir mensagens no Kafka

Produza uma mensagem no tópico do Kafka.

sshuser@hn0-flinkd:~$ cat user.py
import time
from datetime import datetime
import random

user_set = [
        'John',
        'Mike',
        'Lucy',
        'Tom',
        'Machael',
        'Lily',
        'Zark',
        'Tim',
        'Andrew',
        'Pick',
        'Sean',
        'Luke',
        'Chunck'
]

city_set = [
        'Atmore',
        'Auburn',
        'Bessemer',
        'Birmingham',
        'Chickasaw',
        'Clanton',
        'Decatur',
        'Florence',
        'Greenville',
        'Jasper',
        'Huntsville',
        'Homer',
        'Homer'
]

def main():
        while True:
                unique_id = str(int(time.time()))
                if random.randrange(10) < 4:
                        city = random.choice(city_set[:3])
                else:
                        city = random.choice(city_set)
                user = random.choice(user_set)
                print(unique_id + "," + user + "," + city )
                time.sleep(1)

if __name__ == "__main__":
    main()
sshuser@hn0-flinkd:~$ python user.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-flinkd:9092 --topic user &
[2] 11516

Verificar no portal do Azure a tabela do Cosmos DB for Apache Cassandra

Captura de tela mostrando o Cosmos DB para Apache Cassandra no portal do Azure.

Preferências