Hive Warehouse Connector 2.0 APIs in Azure HDInsight

In diesem Artikel sind alle APIs aufgeführt, die von Hive Warehouse Connector 2.0 unterstützt werden. Alle gezeigten Beispiele veranschaulichen die Ausführung mit spark-shell und Hive Warehouse Connector-Sitzungen.

Gehen Sie wie folgt vor, um eine Hive Warehouse Connector-Sitzung zu erstellen:

import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()

Voraussetzung

Schließen Sie die Schritte für das Hive Warehouse Connector-Setup ab.

Unterstützte APIs

  • Festlegen der Datenbank:

    hive.setDatabase("<database-name>")
    
  • Auflisten aller Datenbanken:

    hive.showDatabases()
    
  • Auflisten aller Tabellen in der aktuellen Datenbank

    hive.showTables()
    
  • Beschreiben einer Tabelle

    // Describes the table <table-name> in the current database
    hive.describeTable("<table-name>")
    
    // Describes the table <table-name> in <database-name>
    hive.describeTable("<database-name>.<table-name>")
    
  • Löschen einer Datenbank

    // ifExists and cascade are boolean variables
    hive.dropDatabase("<database-name>", ifExists, cascade)
    
  • Auflisten einer Tabelle in der aktuellen Datenbank

    // ifExists and purge are boolean variables
    hive.dropTable("<table-name>", ifExists, purge)
    
  • Erstellen einer Datenbank

    // ifNotExists is boolean variable
    hive.createDatabase("<database-name>", ifNotExists)
    
  • Erstellen einer Tabelle in der aktuellen Datenbank

    // Returns a builder to create table
    val createTableBuilder = hive.createTable("<table-name>")
    

    Für den create-table-Generator werden nur die folgenden Vorgänge unterstützt:

    // Create only if table does not exists already
    createTableBuilder = createTableBuilder.ifNotExists()
    
    // Add columns
    createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
    
    // Add partition column
    createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
    
    // Add table properties
    createTableBuilder = createTableBuilder.prop("<key>", "<value>")
    
    // Creates a bucketed table,
    // Parameters are numOfBuckets (integer) followed by column names for bucketing
    createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
    
    // Creates the table
    createTableBuilder.create()
    

    Hinweis

    Bei dieser API wird eine Tabelle mit ORC-Formatierung an einem Standardspeicherort erstellt. Verwenden Sie die executeUpdate-API für andere Features bzw. Optionen oder zum Erstellen einer Tabelle mit Hive-Abfragen.

  • Lesen einer Tabelle

    // Returns a Dataset<Row> that contains data of <table-name> in the current database
    hive.table("<table-name>")
    
  • Ausführen von DDL-Befehlen auf HiveServer2

    // Executes the <hive-query> against HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>")
    
    // Executes the <hive-query> against HiveServer2
    // Throws exception, if propagateException is true and query threw excpetion in HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
    
  • Ausführen einer Hive-Abfrage und Laden des Ergebnisses in das Dataset

    • Ausführen einer Abfrage über LLAP-Daemons [Empfohlen]

      // <hive-query> should be a hive query 
      hive.executeQuery("<hive-query>")
      
    • Ausführen einer Abfrage über HiveServer2 per JDBC

      Legen Sie spark.datasource.hive.warehouse.smartExecution in der Spark-Konfiguration auf false fest, bevor Sie die Spark-Sitzung für die Verwendung dieser API starten.

      hive.execute("<hive-query>")
      
  • Schließen der Hive Warehouse Connector-Sitzung

    // Closes all the open connections and
    // release resources/locks from HiveServer2
    hive.close()
    
  • Ausführen einer Hive-MERGE-Abfrage

    Diese API erstellt eine Hive-MERGE-Abfrage im folgenden Format:

    MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias>
    ON <onExpr>
    WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN>
    WHEN MATCHED [AND <deleteExpr>] THEN DELETE
    WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
    
    val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
    

    Der Generator unterstützt die folgenden Vorgänge:

    mergeBuilder.mergeInto("<taget-table>", "<targetAlias>")
    
    mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
    
    mergeBuilder.on("<onExpr>")
    
    mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
    
    mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
    
    mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
    
    // Executes the merge query
    mergeBuilder.merge()
    
  • Schreiben eines Datasets in eine Hive-Tabelle per Batchvorgang

    df.write.format("com.microsoft.hwc.v2")
       .option("table", tableName)
       .mode(SaveMode.Type)
       .save()
    
    • „TableName“ sollte das Format <db>.<table> oder <table> haben. Wenn kein Datenbankname angegeben wird, wird die Suche bzw. Erstellung der Tabelle in der aktuellen Datenbank durchgeführt.

    • Die SaveMode-Typen lauten:

      • Append: Fügt das Dataset an die jeweilige Tabelle an.

      • Overwrite: Überschreibt die Daten in der jeweiligen Tabelle durch das Dataset.

      • Ignore: Überspringt den Schreibvorgang ohne Fehlerauslösung, falls die Tabelle bereits vorhanden ist.

      • ErrorIfExists: Löst einen Fehler aus, falls die Tabelle bereits vorhanden ist.

  • Schreiben eines Datasets in eine Hive-Tabelle per HiveStreaming

    df.write.format("com.microsoft.hwc.v2.batch.stream.write")
       .option("database", databaseName)
       .option("table", tableName)
       .option("metastoreUri", "<HMS_URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    
     // To write to static partition
     df.write.format("com.microsoft.hwc.v2.batch.stream.write")
       .option("database", databaseName)
       .option("table", tableName)
       .option("partition", partition)
       .option("metastoreUri", "<HMS URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    

    Hinweis

    Bei Streamschreibvorgängen werden immer Daten angefügt.

  • Schreiben eines Spark-Streams in eine Hive-Tabelle

    stream.writeStream
        .format("com.microsoft.hwc.v2")
        .option("metastoreUri", "<HMS_URI>")
        .option("database", databaseName)
        .option("table", tableName)
      //.option("partition", partition) , add if inserting data in partition
      //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
        .start()
    

Nächste Schritte