Integrar o suporte do Apache Kafka Connect nos Hubs de Eventos do Azure

Apache Kafka Connect é uma estrutura para conectar e importar / exportar dados de / para qualquer sistema externo, como MySQL, HDFS e sistema de arquivos através de um cluster Kafka. Este artigo orienta você sobre o uso da estrutura Kafka Connect com Hubs de Eventos.

Este artigo orienta você na integração do Kafka Connect com um hub de eventos e na implantação de conectores básicos FileStreamSource e FileStreamSink de conectores. Embora esses conectores não se destinem ao uso em produção, eles demonstram um cenário Kafka Connect de ponta a ponta em que os Hubs de Eventos do Azure atuam como um broker Kafka.

Nota

Este exemplo está disponível no GitHub.

Pré-requisitos

Para concluir esta orientação, confirme que tem os seguintes pré-requisitos:

Criar um espaço de nomes dos Hubs de Eventos

É necessário um espaço de nomes dos Hubs de Eventos para enviar e receber a partir de qualquer serviços dos Hubs de Eventos. Consulte Criando um hub de eventos para obter instruções sobre como criar um namespace e um hub de eventos. Obtenha a cadeia de ligação dos Hubs de Eventos e o nome de domínio completamente qualificado (FQDN), para utilizar mais tarde. Para obter instruções, veja Get an Event Hubs connection string (Obter uma cadeia de ligação dos Hubs de Eventos).

Clonar o projeto de exemplo

Clone o repositório dos Hubs de Eventos do Azure e navegue para a subpasta tutorials/connect:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

Configurar o Kafka Connect para os Hubs de Eventos

Ao redirecionar o débito do Kafka Connect do Kafka para os Hubs de Eventos, é necessário fazer uma reconfiguração mínima. O exemplo seguinte, connect-distributed.properties, ilustra como configurar o Connect para se autenticar e comunicar com o ponto final do Kafka nos Hubs de Eventos:

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs 

Importante

Substitua {YOUR.EVENTHUBS.CONNECTION.STRING} pela cadeia de conexão do namespace Hubs de Eventos. Para obter instruções sobre como obter a cadeia de conexão, consulte Obter uma cadeia de conexão de Hubs de Eventos. Aqui está um exemplo de configuração: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Executar o Kafka Connect

Neste passo, é iniciada uma função de trabalho do Kafka Connect localmente no modo distribuído e são utilizados os Hubs de Eventos para manter o estado do cluster.

  1. Salve o connect-distributed.properties arquivo localmente. Confirme que substitui todos os valores dentro das chavetas.
  2. Navegue para a localização da versão do Kafka no computador.
  3. Execute o ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties. Quando vir 'INFO Finished starting connectors and tasks', significa que a API REST da função de trabalho do Connect está pronta para interação.

Nota

O Kafka Connect usa a API Kafka AdminClient para criar automaticamente tópicos com configurações recomendadas, incluindo compactação. Uma rápida observação ao espaço de nomes no portal do Azure mostra que os tópicos internos da função de trabalho do Connect foram criados de forma automática.

Os tópicos internos do Kafka Connect devem usar compactação. A equipe de Hubs de Eventos não é responsável por corrigir configurações inadequadas se os tópicos internos do Connect estiverem configurados incorretamente.

Criar conectores

Esta seção orienta FileStreamSource você através da rotação e FileStreamSink conectores.

  1. Crie um diretório para os ficheiros de dados de entrada e de saída.

    mkdir ~/connect-quickstart
    
  2. Crie dois arquivos: um arquivo com dados de propagação a partir do qual o FileStreamSource conector lê e outro no qual nosso FileStreamSink conector grava.

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. Crie um FileStreamSource conector. Confirme que substitui as chavetas pelo caminho do seu diretório de raiz.

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    Você deve ver o hub connect-quickstart de eventos na instância dos Hubs de Eventos depois de executar o comando.

  4. Verifique o estado do conector de origem.

    curl -s http://localhost:8083/connectors/file-source/status
    

    Opcionalmente, você pode usar o Service Bus Explorer para verificar se os eventos chegaram ao connect-quickstart tópico.

  5. Crie um conector FileStreamSink. Mais uma vez, confirme que substitui as chavetas pelo caminho do seu diretório de raiz.

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. Verifique o estado do conector sink.

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. Confirme que os dados foram replicados entre os ficheiros e que são idênticos em ambos os ficheiros.

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

Limpeza

O Kafka Connect cria tópicos de Hubs de Eventos para armazenar configurações, deslocamentos e status que persistem mesmo após o cluster Connect ter sido removido. A menos que essa persistência seja desejada, recomendamos que você exclua esses tópicos. Você também pode querer excluir os connect-quickstart Hubs de Eventos que foram criados durante esta explicação passo a passo.

Para saber mais sobre os Hubs de Eventos para Kafka, consulte os seguintes artigos: