Azure HDInsight의 Hive Warehouse Connector에서 지원하는 Apache Spark 작업

이 문서에서는 HWC(Hive Warehouse Connector)에서 지원하는 Spark 기반 작업을 보여 줍니다. 표시된 모든 예제는 Apache Spark 셸을 통해 실행됩니다.

전제 조건

Hive Warehouse Connector 설정 단계를 완료합니다.

시작하기

Spark 셸 세션을 시작하려면 다음 단계를 수행합니다.

  1. ssh command 명령을 사용하여 Apache Spark 클러스터에 연결합니다. CLUSTERNAME을 클러스터 이름으로 바꿔서 명령을 편집한 다음, 다음 명령을 입력합니다.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. ssh 세션에서 다음 명령을 실행하여 hive-warehouse-connector-assembly 버전을 확인합니다.

    ls /usr/hdp/current/hive_warehouse_connector
    
  3. 위에서 식별한 버전으로 hive-warehouse-connector-assembly 코드를 편집합니다. 그런 다음 명령을 실행하여 Spark 셸을 시작합니다.

    spark-shell --master yarn \
    --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \
    --conf spark.security.credentials.hiveserver2.enabled=false
    
  4. spark-shell을 시작한 후 다음 명령을 사용하여 Hive Warehouse Connector 인스턴스를 시작할 수 있습니다.

    import com.hortonworks.hwc.HiveWarehouseSession
    val hive = HiveWarehouseSession.session(spark).build()
    

Hive 쿼리를 사용하여 Spark DataFrame 만들기

HWC 라이브러리를 사용하는 모든 쿼리 결과가 DataFrame으로 반환됩니다. 다음 예에서는 기본 hive 쿼리를 만드는 방법을 보여 줍니다.

hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()

쿼리 결과는 Spark DataFrame이며 이는 MLIB 및 SparkSQL과 같은 Spark 라이브러리와 함께 사용할 수 있습니다.

Hive 테이블에 Spark DataFrame 작성

Spark는 기본적으로 Hive의 관리되는 ACID 테이블에 쓰기를 지원하지 않습니다. 그러나 HWC를 사용하여 Hive 테이블에 모든 DataFrame을 작성할 수 있습니다. 다음 예의 작업에서 이 기능을 확인할 수 있습니다.

  1. 다음 명령을 사용하여 sampletable_colorado라는 테이블을 만들고 해당 열을 지정합니다.

    hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
    
  2. stateColorado와 일치하는 테이블 hivesampletable을 필터링합니다. 이 하이브 쿼리는 Spark DataFrame을 반환하고 결과는 write 함수를 사용하여 Hive 테이블 sampletable_colorado에 저장됩니다.

    hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
    
  3. 다음 명령을 사용하여 결과를 확인합니다.

    hive.table("sampletable_colorado").show()
    

    hive 웨어하우스 커넥터는 hive 테이블을 표시합니다.

구조적 스트림 작성

Hive Warehouse Connector를 사용하여 Spark 스트리밍을 사용해 데이터를 Hive 테이블에 쓸 수 있습니다.

Important

ESP 지원 Spark 4.0 클러스터에서 구조적 스트리밍 쓰기가 지원되지 않습니다.

단계에 따라 localhost 포트 9999의 Spark 스트림에서 Hive 테이블로 데이터를 수집합니다. 수집합니다

  1. 열려 있는 Spark 셸에서 다음 명령을 사용하여 Spark 스트림을 시작합니다.

    val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
    
  2. 다음 단계를 통해 만든 Spark 스트림에 대한 데이터를 생성합니다.

    1. 동일한 Spark 클러스터에서 두 번째 SSH 세션을 엽니다.
    2. 명령 프롬프트에서 nc -lk 9999를 입력합니다. 이 명령은 netcat 유틸리티를 사용하여 데이터를 명령줄에서 지정된 포트로 보냅니다.
  3. 첫 번째 SSH 세션으로 돌아가서 스트리밍 데이터를 보관할 새 Hive 테이블을 만듭니다. Spark 셸에서 다음 명령을 입력합니다.

    hive.createTable("stream_table").column("value","string").create()
    
  4. 그런 후에 다음 명령을 사용하여 새로 만든 테이블에 스트리밍 데이터를 작성합니다.

    lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
    

    Important

    metastoreUridatabase 옵션은 현재 Apache Spark의 알려진 문제로 인해 수동으로 설정해야 합니다. 이 문제에 대한 자세한 내용은 SPARK-25460을 참조하세요.

  5. 두 번째 SSH 세션으로 돌아가서 다음 값을 입력합니다.

    foo
    HiveSpark
    bar
    
  6. 첫 번째 SSH 세션으로 돌아가서 간단한 작업을 기록합니다. 데이터를 보려면 다음 명령을 사용합니다.

    hive.table("stream_table").show()
    

Ctrl + C를 사용하여 두 번째 SSH 세션에서 netcat을 중지합니다. :q를 사용하여 첫 번째 SSH 세션에서 Spark 셸을 종료합니다.

다음 단계