AKS 上の HDInsight での Apache Flink® クラスターの Hive 言語


この記事では、AKS 上の HDInsight での Apache Flink クラスターで Hive 言語を使用する方法について説明します。


AKS クラスター上の HDInsight での使用のために、ユーザーが既定の flink 方言を Hive 言語に変更することはできません。 どの SQL 操作も、Hive 方言に変更されると次のエラーにより失敗します。

*java.lang.ClassCastException: class jdk.internal.loader.ClassLoaders$AppClassLoader can't be cast to class java.net.URLClassLoader*

この問題は、Hive Jira を開いているために発生します。 現在、Hive では、システム クラス ローダーが URLClassLoader のインスタンスであると想定しています。 Java 11 では、この前提が当てはまりません。

  • webssh で次の手順を実行します。

    1. lib の場所にある既存の flink-sql-connector-hive*jar を削除します
      rm /opt/flink-webssh/lib/flink-sql-connector-hive*jar
    2. webssh ポッドで次の jar をダウンロードし、/opt/flink-webssh/lib wget https://aka.ms/hdiflinkhivejdk11jar の下に追加します。 (上記の hive jar には修正プログラム https://issues.apache.org/jira/browse/HIVE-27508 があります)
    mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
    mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
    1. core-site.xml セクションの下の flink 構成管理に次のキーを追加します。
      fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY>
      flink.hadoop.fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY>
  • hive-dialect クエリの概要を次に示します。

    • パーティション分割なしで Flink で Hive 方言を実行する
      root [ ~ ]# ./bin/sql-client.sh
      Flink SQL>
      Flink SQL> create catalog myhive with ('type' = 'hive', 'hive-conf-dir' = '/opt/hive-conf');
      [INFO] Execute statement succeed.
      Flink SQL> use catalog myhive;
      [INFO] Execute statement succeed.
      Flink SQL> load module hive;
      [INFO] Execute statement succeed.
      Flink SQL> use modules hive,core;
      [INFO] Execute statement succeed.
      Flink SQL> set table.sql-dialect=hive;
      [INFO] Session property has been set.
      Flink SQL> set sql-client.execution.result-mode=tableau;
      [INFO] Session property has been set.
      Flink SQL> select explode(array(1,2,3));Hive Session ID = 6ba45be2-360e-4bee-8842-2765c91581c8
    > [!WARNING]
    > An illegal reflective access operation has occurred
    > [!WARNING]
    > Illegal reflective access by org.apache.hadoop.hive.common.StringInternUtils (file:/opt/flink-webssh/lib/flink-sql-connector-hive-3.1.2_2.12-1.16-SNAPSHOT.jar) to field java.net.URI.string
    > [!WARNING]
    > Please consider reporting this to the maintainers of org.apache.hadoop.hive.common.StringInternUtils
    > [!WARNING]
    > `Use --illegal-access=warn` to enable warnings of further illegal reflective access operations
    > [!WARNING]
    >  All illegal access operations will be denied in a future release
    select explode(array(1,2,3));
    | op |         col |
    | +I |           1 |
    | +I |           2 |
    | +I |           3 |
    Received a total of 3 rows
    Flink SQL> create table tttestHive Session ID = fb8b652a-8dad-4781-8384-0694dc16e837
    [INFO] Execute statement succeed.
    Flink SQL> insert into table tttestHive Session ID = f239dc6f-4b58-49f9-ad02-4c73673737d8),(3,'c'),(4,'d');
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: d0542da4c4252f9494298666ff4e9f8e
    Flink SQL> set execution.runtime-mode=batch;
    [INFO] Session property has been set.
    Flink SQL> select * from tttestHive Session ID = 61b6eb3b-90a6-499c-aced-0598366c5b31
    | key | value |
    |   1 |     a |
    |   1 |     a |
    |   2 |     b |
    |   3 |     c |
    |   3 |     c |
    |   3 |     c |
    |   4 |     d |
    |   5 |     e |
    8 rows in set
    Flink SQL> QUIT;Hive Session ID = 2dadad92-436e-426e-a88c-66eafd740d98
    [INFO] Exiting Flink SQL CLI Client...
    Shutting down the session...
    root [ ~ ]# exit

    データは、hive/warehouse ディレクトリで構成されたのと同じコンテナーに書き込まれます。

    コンテナー テーブル 1 を示すスクリーンショット。

    • パーティション分割ありで Flink で Hive 方言を実行する
  create table tblpart2 (key int, value string) PARTITIONED by ( part string ) tblproperties ('sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');

  insert into table tblpart2 Hive Session ID = 78fae85f-a451-4110-bea6-4aa1c172e282),(2,'b','d'),(3,'c','d'),(3,'c','a'),(4,'d','e');

コンテナー テーブル 2 を示すスクリーンショット。

コンテナー テーブル 3 を示すスクリーンショット。
