Início Rápido: Receber eventos dos Hubs de Eventos com o Apache Storm

O Apache Storm é um sistema de computação distribuído em tempo real que simplifica o processamento fiável de fluxos de dados não vinculados. Esta secção mostra como utilizar um Hubs de Eventos do Azure spout do Storm para receber eventos dos Hubs de Eventos. Com o Apache Storm, pode dividir eventos em vários processos alojados em nós diferentes. A integração dos Hubs de Eventos com o Storm simplifica o consumo de eventos ao fazer um ponto de verificação transparente do seu progresso com a instalação do Zookeeper do Storm, ao gerir pontos de verificação persistentes e receções paralelas dos Hubs de Eventos.

Para obter mais informações sobre os padrões de receção dos Hubs de Eventos, veja a Descrição geral dos Hubs de Eventos.

Pré-requisitos

Antes de começar com o início rápido, crie um espaço de nomes dos Hubs de Eventos e um hub de eventos. Utilize o portal do Azure para criar um espaço de nomes do tipo Hubs de Eventos e obter as credenciais de gestão de que a sua aplicação precisa para comunicar com o hub de eventos. Para criar um espaço de nomes e um hub de eventos, siga o procedimento neste artigo.

Criar projeto e adicionar código

  1. Utilize o seguinte comando para instalar o pacote no arquivo maven local. Isto permite-lhe adicioná-lo como referência no projeto Storm num passo posterior.

    mvn install:install-file -Dfile=target\eventhubs-storm-spout-0.9-jar-with-dependencies.jar -DgroupId=com.microsoft.eventhubs -DartifactId=eventhubs-storm-spout -Dversion=0.9 -Dpackaging=jar
    
  2. No Eclipse, crie um novo projeto Maven (clique em Ficheiro e, em seguida, Em Novo e, em seguida, em Projeto).

    Ficheiro -> Novo -> Projeto

  3. Selecione Utilizar localização predefinida da Área de Trabalho e, em seguida, clique em Seguinte

  4. Selecione o arquétipo maven-archetype-quickstart e, em seguida, clique em Seguinte

  5. Insira um GroupId e ArtifactId e, em seguida, clique em Concluir

  6. No pom.xml, adicione as seguintes dependências no <dependency> nó.

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.2-incubating</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.microsoft.eventhubs</groupId>
        <artifactId>eventhubs-storm-spout</artifactId>
        <version>0.9</version>
    </dependency>
    <dependency>
        <groupId>com.netflix.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>1.3.3</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
        <scope>provided</scope>
    </dependency>
    
  7. Na pasta src , crie um ficheiro chamado Config.properties e copie o seguinte conteúdo, substituindo os receive rule key valores e event hub name :

    eventhubspout.username = ReceiveRule
    eventhubspout.password = {receive rule key}
    eventhubspout.namespace = ioteventhub-ns
    eventhubspout.entitypath = {event hub name}
    eventhubspout.partitions.count = 16
    
    # if not provided, will use storm's zookeeper settings
    # zookeeper.connectionstring=localhost:2181
    
    eventhubspout.checkpoint.interval = 10
    eventhub.receiver.credits = 10
    

    O valor de eventhub.receiver.credits determina quantos eventos são colocados em lotes antes de os lançar no pipeline do Storm. Por uma questão de simplicidade, este exemplo define este valor como 10. Na produção, deve normalmente ser definido para valores mais elevados; por exemplo, 1024. 1 . Crie uma nova classe denominada LoggerBolt com o seguinte código:

    import java.util.Map;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;
    
    public class LoggerBolt extends BaseRichBolt {
        private OutputCollector collector;
        private static final Logger logger = LoggerFactory
                  .getLogger(LoggerBolt.class);
    
        @Override
        public void execute(Tuple tuple) {
            String value = tuple.getString(0);
            logger.info("Tuple value: " + value);
    
            collector.ack(tuple);
        }
    
        @Override
        public void prepare(Map map, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.count = 0;
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // no output fields
        }
    
    }
    

    Este bolt do Storm regista o conteúdo dos eventos recebidos. Isto pode ser facilmente expandido para armazenar cadeias de identificação num serviço de armazenamento. O exemplo do HdInsight Storm com o Hub de Eventos utiliza esta mesma abordagem para armazenar dados no Armazenamento do Azure e no Power BI.

  8. Crie uma classe denominada LogTopology com o seguinte código:

    import java.io.FileReader;
    import java.util.Properties;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.TopologyBuilder;
    import com.microsoft.eventhubs.samples.EventCount;
    import com.microsoft.eventhubs.spout.EventHubSpout;
    import com.microsoft.eventhubs.spout.EventHubSpoutConfig;
    
    public class LogTopology {
        protected EventHubSpoutConfig spoutConfig;
        protected int numWorkers;
    
        protected void readEHConfig(String[] args) throws Exception {
            Properties properties = new Properties();
            if (args.length > 1) {
                properties.load(new FileReader(args[1]));
            } else {
                properties.load(EventCount.class.getClassLoader()
                        .getResourceAsStream("Config.properties"));
            }
    
            String username = properties.getProperty("eventhubspout.username");
            String password = properties.getProperty("eventhubspout.password");
            String namespaceName = properties
                    .getProperty("eventhubspout.namespace");
            String entityPath = properties.getProperty("eventhubspout.entitypath");
            String zkEndpointAddress = properties
                    .getProperty("zookeeper.connectionstring"); // opt
            int partitionCount = Integer.parseInt(properties
                    .getProperty("eventhubspout.partitions.count"));
            int checkpointIntervalInSeconds = Integer.parseInt(properties
                    .getProperty("eventhubspout.checkpoint.interval"));
            int receiverCredits = Integer.parseInt(properties
                    .getProperty("eventhub.receiver.credits")); // prefetch count
                                                                // (opt)
            System.out.println("Eventhub spout config: ");
            System.out.println("  partition count: " + partitionCount);
            System.out.println("  checkpoint interval: "
                    + checkpointIntervalInSeconds);
            System.out.println("  receiver credits: " + receiverCredits);
    
            spoutConfig = new EventHubSpoutConfig(username, password,
                    namespaceName, entityPath, partitionCount, zkEndpointAddress,
                    checkpointIntervalInSeconds, receiverCredits);
    
            // set the number of workers to be the same as partition number.
            // the idea is to have a spout and a logger bolt co-exist in one
            // worker to avoid shuffling messages across workers in storm cluster.
            numWorkers = spoutConfig.getPartitionCount();
    
            if (args.length > 0) {
                // set topology name so that sample Trident topology can use it as
                // stream name.
                spoutConfig.setTopologyName(args[0]);
            }
        }
    
        protected StormTopology buildTopology() {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
    
            EventHubSpout eventHubSpout = new EventHubSpout(spoutConfig);
            topologyBuilder.setSpout("EventHubsSpout", eventHubSpout,
                    spoutConfig.getPartitionCount()).setNumTasks(
                    spoutConfig.getPartitionCount());
            topologyBuilder
                    .setBolt("LoggerBolt", new LoggerBolt(),
                            spoutConfig.getPartitionCount())
                    .localOrShuffleGrouping("EventHubsSpout")
                    .setNumTasks(spoutConfig.getPartitionCount());
            return topologyBuilder.createTopology();
        }
    
        protected void runScenario(String[] args) throws Exception {
            boolean runLocal = true;
            readEHConfig(args);
            StormTopology topology = buildTopology();
            Config config = new Config();
            config.setDebug(false);
    
            if (runLocal) {
                config.setMaxTaskParallelism(2);
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("test", config, topology);
                Thread.sleep(5000000);
                localCluster.shutdown();
            } else {
                config.setNumWorkers(numWorkers);
                StormSubmitter.submitTopology(args[0], config, topology);
            }
        }
    
        public static void main(String[] args) throws Exception {
            LogTopology topology = new LogTopology();
            topology.runScenario(args);
        }
    }
    

    Esta classe cria um novo spout dos Hubs de Eventos, utilizando as propriedades no ficheiro de configuração para instanciá-lo. É importante ter em atenção que este exemplo cria tantas tarefas de spouts como o número de partições no hub de eventos, de modo a utilizar o paralelismo máximo permitido por esse hub de eventos.

Passos seguintes

Pode saber mais sobre os Hubs de Eventos ao aceder às seguintes ligações: