Interagir com clusters do Apache Kafka no Azure HDInsight através de um proxy REST
O Kafka REST Proxy permite que você interaja com seu cluster Kafka por meio de uma API REST sobre HTTPS. Esta ação significa que os seus clientes Kafka podem estar fora da sua rede virtual. Os clientes podem fazer chamadas HTTPS simples e seguras para o cluster Kafka, em vez de depender das bibliotecas Kafka. Este artigo mostra como criar um cluster Kafka habilitado para proxy REST. Também fornece um código de exemplo que mostra como fazer chamadas para proxy REST.
Referência da API REST
Para operações suportadas pela API REST do Kafka, consulte Referência da API de proxy REST do HDInsight Kafka.
Fundo
Para obter a especificação completa das operações suportadas pela API, consulte Apache Kafka REST Proxy API.
Ponto de extremidade do proxy REST
A criação de um cluster Kafka do HDInsight com proxy REST cria um novo ponto de extremidade público para o cluster, que você pode encontrar nas Propriedades do cluster HDInsight no portal do Azure.
Segurança
Acesso ao proxy Kafka REST gerenciado com grupos de segurança do Microsoft Entra. Ao criar o cluster Kafka, forneça ao grupo de segurança Microsoft Entra acesso ao ponto de extremidade REST. Os clientes Kafka que precisam de acesso ao proxy REST devem ser registrados neste grupo pelo proprietário do grupo. O proprietário do grupo pode se registrar por meio do Portal ou do PowerShell.
Para solicitações de ponto de extremidade de proxy REST, os aplicativos cliente devem obter um token OAuth. O token é usado para verificar a associação ao grupo de segurança. Encontrar um exemplo de aplicativo cliente mostra como obter um token OAuth. O aplicativo cliente passa o token OAuth na solicitação HTTPS para o proxy REST.
Nota
Consulte Gerenciar o acesso a aplicativos e recursos usando grupos do Microsoft Entra para saber mais sobre os grupos de segurança do Microsoft Entra. Para obter mais informações sobre como os tokens OAuth funcionam, consulte Autorizar o acesso a aplicativos Web do Microsoft Entra usando o fluxo de concessão de código OAuth 2.0.
Proxy Kafka REST com grupos de segurança de rede
Se você trouxer sua própria rede virtual e controlar o tráfego de rede com grupos de segurança de rede, permita o tráfego de entrada na porta 9400 , além da porta 443. Isso garante que o servidor proxy Kafka REST esteja acessível.
Pré-requisitos
Registe uma aplicação com o Microsoft Entra ID. Os aplicativos cliente que você escreve para interagir com o proxy REST Kafka usam a ID e o segredo desse aplicativo para autenticar no Azure.
Crie um grupo de segurança do Microsoft Entra. Adicione o aplicativo que você registrou com a ID do Microsoft Entra ao grupo de segurança como membro do grupo. Esse security group será usado para controlar quais aplicativos permitem interagir com o proxy REST. Para obter mais informações sobre como criar grupos do Microsoft Entra, consulte Criar um grupo básico e adicionar membros usando o Microsoft Entra ID.
Valide se o grupo é do tipo Segurança.
Valide se o aplicativo é membro do Grupo.
Criar um cluster Kafka com proxy REST ativado
As etapas usam o portal do Azure. Para obter um exemplo usando a CLI do Azure, consulte Criar cluster de proxy REST Apache Kafka usando a CLI do Azure.
Durante o fluxo de trabalho de criação do cluster Kafka, na guia Segurança + rede , marque a opção Habilitar proxy REST Kafka.
Clique em Selecionar Grupo de Segurança. Na lista de grupos de segurança, selecione o grupo de segurança que você deseja ter acesso ao proxy REST. Você pode usar a caixa de pesquisa para encontrar o grupo de segurança apropriado. Clique no botão Selecionar na parte inferior.
Conclua as etapas restantes para criar seu cluster conforme descrito em Criar cluster Apache Kafka no Azure HDInsight usando o portal do Azure.
Depois que o cluster for criado, vá para as propriedades do cluster para registrar a URL do proxy REST Kafka.
Exemplo de aplicativo cliente
Você pode usar o código Python para interagir com o proxy REST em seu cluster Kafka. Para utilizar o exemplo de código, siga estes passos:
Salve o código de exemplo em uma máquina com Python instalado.
Instale as dependências Python necessárias executando
pip3 install msal
.Modifique a seção de código Configure essas propriedades e atualize as seguintes propriedades para seu ambiente:
Property Description ID de Inquilino do O locatário do Azure onde sua assinatura está. ID de Cliente A ID do aplicativo que você registrou no grupo de segurança. Segredo do Cliente O segredo para o aplicativo que você registrou no grupo de segurança. Kafkarest_endpoint Obtenha esse valor na guia Propriedades na visão geral do cluster, conforme descrito na seção de implantação. Deve estar no seguinte formato: https://<clustername>-kafkarest.azurehdinsight.net
A partir da linha de comando, execute o arquivo Python executando
sudo python3 <filename.py>
Este código executa a seguinte ação:
- Busca um token OAuth do Microsoft Entra ID.
- Mostra como fazer uma solicitação para o proxy Kafka REST.
Para obter mais informações sobre como obter tokens OAuth em Python, consulte Classe Python AuthenticationContext. Você pode ver um atraso enquanto topics
isso não é criado ou excluído através do proxy Kafka REST são refletidos lá. Esse atraso é devido à atualização do cache. O campo de valor da API do produtor foi aprimorado. Agora, ele aceita objetos JSON e qualquer formulário serializado.
#Required Python packages
#pip3 install msal
import json
import msal
import random
import requests
import string
import sys
import time
def get_random_string():
letters = string.ascii_letters
random_string = ''.join(random.choice(letters) for i in range(7))
return random_string
#--------------------------Configure these properties-------------------------------#
# Tenant ID for your Azure Subscription
tenant_id = 'aaaabbbb-0000-cccc-1111-dddd2222eeee'
# Your Client Application Id
client_id = '00001111-aaaa-2222-bbbb-3333cccc4444'
# Your Client Credentials
client_secret = 'password'
# kafka rest proxy -endpoint
kafkarest_endpoint = "https://<clustername>-kafkarest.azurehdinsight.net"
#--------------------------Configure these properties-------------------------------#
# Get access token
# Scope
scope = 'https://hib.azurehdinsight.net/.default'
#Authority
authority = 'https://login.microsoftonline.com/' + tenant_id
app = msal.ConfidentialClientApplication(
client_id , client_secret, authority,
#cache - For details on how look at this example: https://github.com/Azure-Samples/ms-identity-python-webapp/blob/master/app.py
)
# The pattern to acquire a token looks like this.
result = None
result = app.acquire_token_for_client(scopes=[scope])
accessToken = result['access_token']
verify_https = True
request_timeout = 10
# Print access token
print("Access token: " + accessToken)
# API format
api_version = 'v1'
api_format = kafkarest_endpoint + '/{api_version}/{rest_api}'
get_topic_api = 'metadata/topics'
topic_api_format = 'topics/{topic_name}'
producer_api_format = 'producer/topics/{topic_name}'
consumer_api_format = 'consumer/topics/{topic_name}/partitions/{partition_id}/offsets/{offset}?count={count}' # by default count = 1
partitions_api_format = 'metadata/topics/{topic_name}/partitions'
partition_api_format = 'metadata/topics/{topic_name}/partitions/{partition_id}'
# Request header
headers = {
'Authorization': 'Bearer ' + accessToken,
'Content-type': 'application/json' # set Content-type to 'application/json'
}
# New topic
new_topic = 'hello_topic_' + get_random_string()
print("Topic " + new_topic + " is going to be used for demo.")
topics = []
# Create a new topic
# Example of topic config
topic_config = {
"partition_count": 1,
"replication_factor": 1,
"topic_properties": {
"retention.ms": 604800000,
"min.insync.replicas": "1"
}
}
create_topic_url = api_format.format(api_version=api_version, rest_api=topic_api_format.format(topic_name=new_topic))
response = requests.put(create_topic_url, headers=headers, json=topic_config, timeout=request_timeout, verify=verify_https)
print(response.content)
if response.ok:
while new_topic not in topics:
print("The new topic " + new_topic + " is not visible yet. sleep 30 seconds...")
time.sleep(30)
# List Topic
get_topic_url = api_format.format(api_version=api_version, rest_api=get_topic_api)
response = requests.get(get_topic_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
topic_list = response.json()
topics = topic_list.get("topics", [])
else:
print("Topic " + new_topic + " was created. Exit.")
sys.exit(1)
# Produce messages to new_topic
# Example payload of Producer REST API
payload_json = {
"records": [
{
"key": "key1",
"value": "**********" # A string
},
{
"partition": 0,
"value": 5 # An integer
},
{
"value": 3.14 # A floating number
},
{
"value": { # A JSON object
"id": 1,
"name": "HDInsight Kafka REST proxy"
}
},
{
"value": [ # A list of JSON objects
{
"id": 1,
"name": "HDInsight Kafka REST proxy 1"
},
{
"id": 2,
"name": "HDInsight Kafka REST proxy 2"
},
{
"id": 3,
"name": "HDInsight Kafka REST proxy 3"
}
]
},
{
"value": { # A nested JSON object
"group id": 1,
"HDI Kafka REST": {
"id": 1,
"name": "HDInsight Kafka REST proxy 1"
},
"HDI Kafka REST server info": {
"id": 1,
"name": "HDInsight Kafka REST proxy 1",
"servers": [
{
"server id": 1,
"server name": "HDInsight Kafka REST proxy server 1"
},
{
"server id": 2,
"server name": "HDInsight Kafka REST proxy server 2"
},
{
"server id": 3,
"server name": "HDInsight Kafka REST proxy server 3"
}
]
}
}
}
]
}
print("Payloads in a Producer request: \n", payload_json)
producer_url = api_format.format(api_version=api_version, rest_api=producer_api_format.format(topic_name=new_topic))
response = requests.post(producer_url, headers=headers, json=payload_json, timeout=request_timeout, verify=verify_https)
print(response.content)
# Consume messages from the topic
partition_id = 0
offset = 0
count = 2
while True:
consumer_url = api_format.format(api_version=api_version, rest_api=consumer_api_format.format(topic_name=new_topic, partition_id=partition_id, offset=offset, count=count))
print("Consuming " + str(count) + " messages from offset " + str(offset))
response = requests.get(consumer_url, headers=headers, timeout=request_timeout, verify=verify_https)
if response.ok:
messages = response.json()
print("Consumed messages: \n" + json.dumps(messages, indent=2))
next_offset = response.headers.get("NextOffset")
if offset == next_offset or not messages.get("records", []):
print("Consumer caught up with producer. Exit for now...")
break
offset = next_offset
else:
print("Error " + str(response.status_code))
break
# List partitions
get_partitions_url = api_format.format(api_version=api_version, rest_api=partitions_api_format.format(topic_name=new_topic))
print("Fetching partitions from " + get_partitions_url)
response = requests.get(get_partitions_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition_list = response.json()
print("Partition list: \n" + json.dumps(partition_list, indent=2))
# List a partition
get_partition_url = api_format.format(api_version=api_version, rest_api=partition_api_format.format(topic_name=new_topic, partition_id=partition_id))
print("Fetching metadata of a partition from " + get_partition_url)
response = requests.get(get_partition_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition = response.json()
print("Partition metadata: \n" + json.dumps(partition, indent=2))
Encontre abaixo outro exemplo sobre como obter um token do Azure para proxy REST usando um comando curl. Observe que precisamos do scope=https://hib.azurehdinsight.net/.default
especificado ao obter um token.
curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d 'client_id=<clientid>&client_secret=<clientsecret>&grant_type=client_credentials&scope=https://hib.azurehdinsight.net/.default' 'https://login.microsoftonline.com/<tenantid>/oauth2/v2.0/token'