API Hive Warehouse Connector 2.0 in Azure HDInsight

Questo articolo elenca tutte le API supportate da Hive Warehouse Connector 2.0. Tutti gli esempi illustrati sono come eseguire usando spark-shell e la sessione del connettore hive warehouse.

Come creare una sessione del connettore Hive warehouse:

import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()


Completare la procedura di configurazione di Hive Warehouse Connector.

API supportate

  • Impostare il database:

  • Elencare tutti i database:

  • Elencare tutte le tabelle nel database corrente

  • Descrivere una tabella

    // Describes the table <table-name> in the current database
    // Describes the table <table-name> in <database-name>
  • Eliminare un database

    // ifExists and cascade are boolean variables
    hive.dropDatabase("<database-name>", ifExists, cascade)
  • Eliminare una tabella nel database corrente

    // ifExists and purge are boolean variables
    hive.dropTable("<table-name>", ifExists, purge)
  • Creazione di un database

    // ifNotExists is boolean variable
    hive.createDatabase("<database-name>", ifNotExists)
  • Creare una tabella nel database corrente

    // Returns a builder to create table
    val createTableBuilder = hive.createTable("<table-name>")

    Il generatore per create-table supporta solo le operazioni seguenti:

    // Create only if table does not exists already
    createTableBuilder = createTableBuilder.ifNotExists()
    // Add columns
    createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
    // Add partition column
    createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
    // Add table properties
    createTableBuilder = createTableBuilder.prop("<key>", "<value>")
    // Creates a bucketed table,
    // Parameters are numOfBuckets (integer) followed by column names for bucketing
    createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
    // Creates the table


    Questa API crea una tabella formattata ORC nel percorso predefinito. Per altre funzionalità/opzioni o per creare una tabella usando query Hive, usare l'API executeUpdate .

  • Leggere una tabella

    // Returns a Dataset<Row> that contains data of <table-name> in the current database
  • Eseguire comandi DDL in HiveServer2

    // Executes the <hive-query> against HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    // Executes the <hive-query> against HiveServer2
    // Throws exception, if propagateException is true and query threw excpetion in HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
  • Eseguire query Hive e caricare i risultati nel set di dati

    • Esecuzione di query tramite daemon LLAP. [Consigliato]

      // <hive-query> should be a hive query 
    • Esecuzione di query tramite HiveServer2 tramite JDBC.

      Impostare su spark.datasource.hive.warehouse.smartExecutionfalse in configurazioni Spark prima di avviare la sessione Spark per usare questa API

  • Chiudere la sessione del connettore Hive Warehouse

    // Closes all the open connections and
    // release resources/locks from HiveServer2
  • Eseguire una query hive merge

    Questa API crea una query di merge Hive nel formato

    MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias>
    ON <onExpr>
    WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN>
    WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
    val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query

    Builder supporta le operazioni seguenti:

    mergeBuilder.mergeInto("<taget-table>", "<targetAlias>")
    mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
    mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
    mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
    // Executes the merge query
  • Scrivere un set di dati in una tabella Hive in batch

       .option("table", tableName)
    • TableName deve essere di tipo form <db>.<table> o <table>. Se non viene specificato alcun nome di database, verrà eseguita la ricerca/creazione della tabella nel database corrente

    • I tipi SaveMode sono:

      • Accoda: aggiunge il set di dati alla tabella specificata

      • Sovrascrittura: sovrascrive i dati nella tabella specificata con il set di dati

      • Ignora: ignora la scrittura se la tabella esiste già, non viene generato alcun errore

      • ErrorIfExists: genera un errore se la tabella esiste già

  • Scrivere un set di dati in una tabella Hive usando HiveStreaming

       .option("database", databaseName)
       .option("table", tableName)
       .option("metastoreUri", "<HMS_URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
     // To write to static partition
       .option("database", databaseName)
       .option("table", tableName)
       .option("partition", partition)
       .option("metastoreUri", "<HMS URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster


    Stream scrive sempre i dati di accodamento.

  • Scrittura di un flusso spark in una tabella Hive

        .option("metastoreUri", "<HMS_URI>")
        .option("database", databaseName)
        .option("table", tableName)
      //.option("partition", partition) , add if inserting data in partition
      //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster

