Azure Cosmos DB for PostgreSQL で SQL コマンドを接続して実行するための Java アプリ

適用対象: Azure Cosmos DB for PostgreSQL (PostgreSQL の Citus データベース拡張機能を利用)

このクイックスタートでは、Java コードを使用してクラスターに接続する方法と、SQL ステートメントを使用してテーブルを作成する方法について説明します。 その後で、データベース内のデータの挿入、クエリ、更新、削除を実行します。 この記事内の手順では、Java の開発と JDBC には慣れているものの、Azure Cosmos DB for PostgreSQL を使用するのは初めてというユーザーを対象とします。

Java プロジェクトと接続を設定する

Azure Cosmos DB for PostgreSQL に接続するための新しい Java プロジェクトと構成ファイルを作成します。

新しい Java プロジェクトを作成する

お気に入りの統合開発環境 (IDE) を使用して、groupId testと artifactIdcrud を使用する新しい Java プロジェクトを作成します。 プロジェクトのルート ディレクトリに、次の内容を含む pom.xml ファイルを追加します。 このファイルは、Java 8 と Java 用の最近の PostgreSQL ドライバーを使用するように Apache Maven を構成します。

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>test</groupId>
  <artifactId>crud</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>crud</name>
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-engine</artifactId>
      <version>5.7.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
      <version>42.2.12</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.zaxxer/HikariCP -->
    <dependency>
      <groupId>com.zaxxer</groupId>
      <artifactId>HikariCP</artifactId>
      <version>5.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-params</artifactId>
      <version>5.7.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>3.0.0-M5</version>
      </plugin>
    </plugins>
  </build>
</project>

データベース接続を構成する

src/main/resources/ で、次の内容を含む application.properties ファイルを作成します。 <cluster> をクラスター名に置き換え、<password> を管理パスワードに置き換えます。

driver.class.name=org.postgresql.Driver
db.url=jdbc:postgresql://c-<cluster>.<uniqueID>.postgres.cosmos.azure.com:5432/citus?ssl=true&sslmode=require
db.username=citus
db.password=<password>

プロパティ?ssl=true&sslmode=require内の文字列はdb.url、データベースに接続するときにトランスポート層セキュリティ (TLS) を使用するように JDBC ドライバーに指示します。 Azure Cosmos DB for PostgreSQL で TLS を使用することは必須であり、優れたセキュリティ慣行です。

テーブルの作成

分散テーブルを含むデータベース スキーマを構成します。 データベースに接続して、スキーマとテーブルを作成します。

データベース スキーマを生成する

src/main/resources/ で、次の内容を含む schema.sql ファイルを作成します。

DROP TABLE IF EXISTS public.pharmacy;
CREATE TABLE  public.pharmacy(pharmacy_id integer,pharmacy_name text ,city text ,state text ,zip_code integer);
CREATE INDEX idx_pharmacy_id ON public.pharmacy(pharmacy_id);

テーブルの分散

Azure Cosmos DB for PostgreSQL は、スケーラビリティを確保するために複数のノード全体にテーブルを分散させる優れた能力をユーザーに提供するものです。 テーブルを分散させるには、次のコマンドを使用します。 create_distributed_table および分散列の詳細については、こちらを参照してください。

注意

テーブルを分散させると、クラスターに追加されたすべてのワーカー ノード全体にテーブルを拡張することができます。

テーブルを配布するには、前のセクションで作成した schema.sql ファイルに次の行を追加します。

select create_distributed_table('public.pharmacy','pharmacy_id');

データベースに接続してスキーマを作成する

次に、JDBC を使用して クラスターにデータを格納および取得する Java コードを追加します。 コードは application.properties ファイルと schema.sql ファイルを使用して クラスターに接続し、スキーマを作成します。

  1. DButilクラスを含む次のコードを使用して DButil.java ファイルを作成します。 DBUtil クラスは、HikariCP を使用して PostgreSQL への接続プールを設定します。 このクラスを使用して PostgreSQL に接続し、クエリを開始します。

    ヒント

    下のサンプル コードでは、接続プールを使用して PostgreSQL への接続を作成および管理します。 次の理由から、アプリケーション側の接続プールを強くお勧めします。

    • アプリケーションがデータベースへの接続を生成しすぎないようにするため、接続制限の超過を回避できます。
    • 待機時間とスループットの両方のパフォーマンスを大幅に向上させるのに役立ちます。 PostgreSQL サーバー プロセスでは、新しい各接続を処理するためにフォークする必要があり、接続を再利用すると、そのオーバーヘッドが回避されます。
    //DButil.java
    package test.crud;
    
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.sql.SQLException;
    import java.util.Properties;
    
    import javax.sql.DataSource;
    
    import com.zaxxer.hikari.HikariDataSource;
    
    public class DButil {
        private static final String DB_USERNAME = "db.username";
        private static final String DB_PASSWORD = "db.password";
        private static final String DB_URL = "db.url";
        private static final String DB_DRIVER_CLASS = "driver.class.name";
        private static Properties properties =  null;
        private static HikariDataSource datasource;
    
        static {
            try {
                properties = new Properties();
                properties.load(new FileInputStream("src/main/java/application.properties"));
    
                datasource = new HikariDataSource();
                datasource.setDriverClassName(properties.getProperty(DB_DRIVER_CLASS ));
                datasource.setJdbcUrl(properties.getProperty(DB_URL));
                datasource.setUsername(properties.getProperty(DB_USERNAME));
                datasource.setPassword(properties.getProperty(DB_PASSWORD));
                datasource.setMinimumIdle(100);
                datasource.setMaximumPoolSize(1000000000);
                datasource.setAutoCommit(true);
                datasource.setLoginTimeout(3);
            } catch (IOException | SQLException  e) {
                e.printStackTrace();
            }
        }
        public static DataSource getDataSource() {
            return datasource;
        }
    }
    
  2. src/main/java/ で、次のコードを含む DemoApplication.java ファイルを作成します。

    package test.crud;
    import java.io.IOException;
    import java.sql.*;
    import java.util.*;
    import java.util.logging.Logger;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import org.postgresql.copy.CopyManager;
    import org.postgresql.core.BaseConnection;
    import java.io.IOException;
    import java.io.Reader;
    import java.io.StringReader;
    
    public class DemoApplication {
    
        private static final Logger log;
    
        static {
            System.setProperty("java.util.logging.SimpleFormatter.format", "[%4$-7s] %5$s %n");
            log =Logger.getLogger(DemoApplication.class.getName());
        }
        public static void main(String[] args)throws Exception
        {
            log.info("Connecting to the database");
            Connection connection = DButil.getDataSource().getConnection();
            System.out.println("The Connection Object is of Class: " + connection.getClass());
            log.info("Database connection test: " + connection.getCatalog());
            log.info("Creating table");
            log.info("Creating index");
            log.info("distributing table");
            Scanner scanner = new Scanner(DemoApplication.class.getClassLoader().getResourceAsStream("schema.sql"));
            Statement statement = connection.createStatement();
            while (scanner.hasNextLine()) {
                statement.execute(scanner.nextLine());
            }
            log.info("Closing database connection");
            connection.close();
        }
    
    }
    

    注意

    データベース userpassword 資格情報は、DriverManager.getConnection(properties.getProperty("url"), properties);の実行時 に使用されます。 資格情報は、引数として渡される application.properties ファイルに格納されます。

  3. 以後、このメイン クラスは、次の任意のツールを使用して実行することができます。

    • IDE を使用する場合: DemoApplication クラスを右クリックして実行します。
    • Maven を使用して、次を実行することによってアプリケーションを実行できます。
      [https://login.microsoftonline.com/consumers/](mvn exec:java -Dexec.mainClass="com.example.demo.DemoApplication")

次のコンソール ログが示すように、このアプリケーションは、Azure Cosmos DB for PostgreSQL に接続してデータベース スキーマを作成した後、接続を閉じます。

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Create database schema
[INFO   ] Closing database connection

ドメイン クラスを作成する

DemoApplication クラスの横に新しい Pharmacy Java クラスを作成し、以下のコードを追加します。

public class Pharmacy {
    private Integer pharmacy_id;
    private String pharmacy_name;
    private String city;
    private String state;
    private Integer zip_code;
    public Pharmacy() { }
    public Pharmacy(Integer pharmacy_id, String pharmacy_name, String city,String state,Integer zip_code)
    {
        this.pharmacy_id = pharmacy_id;
        this.pharmacy_name = pharmacy_name;
        this.city = city;
        this.state = state;
        this.zip_code = zip_code;
    }

    public Integer getpharmacy_id() {
        return pharmacy_id;
    }

    public void setpharmacy_id(Integer pharmacy_id) {
        this.pharmacy_id = pharmacy_id;
    }

    public String getpharmacy_name() {
        return pharmacy_name;
    }

    public void setpharmacy_name(String pharmacy_name) {
        this.pharmacy_name = pharmacy_name;
    }

    public String getcity() {
        return city;
    }

    public void setcity(String city) {
        this.city = city;
    }

    public String getstate() {
        return state;
    }

    public void setstate(String state) {
        this.state = state;
    }

    public Integer getzip_code() {
        return zip_code;
    }

    public void setzip_code(Integer zip_code) {
        this.zip_code = zip_code;
    }
    @Override
    public String toString() {
        return "TPharmacy{" +
               "pharmacy_id=" + pharmacy_id +
               ", pharmacy_name='" + pharmacy_name + '\'' +
               ", city='" + city + '\'' +
               ", state='" + state + '\'' +
               ", zip_code='" + zip_code + '\'' +
               '}';
    }
}

このクラスは、schema.sql スクリプトを実行する際に作成した Pharmacy テーブルにマップされるドメイン モデルです。

データの挿入

DemoApplication.java ファイルのmainメソッドの後に、INSERT INTO SQL ステートメントを使用してデータベースにデータを挿入する次のメソッドを追加します。

private static void insertData(Pharmacy todo, Connection connection) throws SQLException {
    log.info("Insert data");
    PreparedStatement insertStatement = connection
        .prepareStatement("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code)  VALUES (?, ?, ?, ?, ?);");

    insertStatement.setInt(1, todo.getpharmacy_id());
    insertStatement.setString(2, todo.getpharmacy_name());
    insertStatement.setString(3, todo.getcity());
    insertStatement.setString(4, todo.getstate());
    insertStatement.setInt(5, todo.getzip_code());

    insertStatement.executeUpdate();
}

main メソッド内に、次の 2 行を追加します。

Pharmacy todo = new Pharmacy(0,"Target","Sunnyvale","California",94001);
insertData(todo, connection);

メイン クラスを実行すると、次の出力が生成されるはずです。

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Closing database connection

データの読み取り

先ほど挿入したデータを読み取って、コードが正しく動作することを確認します。

insertDataメソッドの後で、DemoApplication.java ファイルに、SELECT SQL ステートメントを使用してデータベースからデータを読み取る次のメソッドを追加します。

private static Pharmacy readData(Connection connection) throws SQLException {
    log.info("Read data");
    PreparedStatement readStatement = connection.prepareStatement("SELECT * FROM Pharmacy;");
    ResultSet resultSet = readStatement.executeQuery();
    if (!resultSet.next()) {
        log.info("There is no data in the database!");
        return null;
    }
    Pharmacy todo = new Pharmacy();
    todo.setpharmacy_id(resultSet.getInt("pharmacy_id"));
    todo.setpharmacy_name(resultSet.getString("pharmacy_name"));
    todo.setcity(resultSet.getString("city"));
    todo.setstate(resultSet.getString("state"));
    todo.setzip_code(resultSet.getInt("zip_code"));
    log.info("Data read from the database: " + todo.toString());
    return todo;
}

main メソッドに次の行を追加します。

todo = readData(connection);

メイン クラスを実行すると、次の出力が生成されるはずです。

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Sunnyvale', state='California', zip_code='94001'}
[INFO   ] Closing database connection

データの更新

前に挿入したデータを更新します。

引き続き DemoApplication.java ファイルの readData メソッドの後に、次のメソッドを追加して、UPDATE SQL ステートメントを使用したデータベース内のデータを更新します。

private static void updateData(Pharmacy todo, Connection connection) throws SQLException {
    log.info("Update data");
    PreparedStatement updateStatement = connection
        .prepareStatement("UPDATE pharmacy SET city = ? WHERE pharmacy_id = ?;");

    updateStatement.setString(1, todo.getcity());

    updateStatement.setInt(2, todo.getpharmacy_id());
    updateStatement.executeUpdate();
    readData(connection);
}

main メソッド内に、次の 2 行を追加します。

todo.setcity("Guntur");
updateData(todo, connection);

メイン クラスを実行すると、次の出力が生成されるはずです。

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Sunnyvale', state='California', zip_code='94001'}
[INFO   ] Update data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Guntur', state='California', zip_code='94001'}
[INFO   ] Closing database connection

データの削除

最後に、前に挿入したデータを削除します。 引き続き DemoApplication.java ファイルの updateData メソッドの後に、次のメソッドを追加して、UPDATE SQL ステートメントを使用したデータベース内のデータを削除します。

private static void deleteData(Pharmacy todo, Connection connection) throws SQLException {
    log.info("Delete data");
    PreparedStatement deleteStatement = connection.prepareStatement("DELETE FROM pharmacy WHERE pharmacy_id = ?;");
    deleteStatement.setLong(1, todo.getpharmacy_id());
    deleteStatement.executeUpdate();
    readData(connection);
}

これで main メソッドに次の行を追加できます。

deleteData(todo, connection);

メイン クラスを実行すると、次の出力が生成されるはずです。

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Sunnyvale', state='California', zip_code='94001'}
[INFO   ] Update data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Guntur', state='California', zip_code='94001'}
[INFO   ] Delete data
[INFO   ] Read data
[INFO   ] There is no data in the database!
[INFO   ] Closing database connection

高速取り込み用の COPY コマンド

COPY コマンドでは、Azure Cosmos DB for PostgreSQL にデータを取り込みながら、驚異的なスループットを達成できます。 COPY コマンドでは、ファイル内のデータを取り込んだり、メモリ内データのマイクロバッチからデータを取り込んだりすることで、リアルタイム インジェストを実現できます。

ファイルからデータを読み込む COPY コマンド

次のコードは、CSV ファイルからデータベース テーブルにデータをコピーします。 コード サンプルには pharmacies.csv というファイルが必要です。

public static long
copyFromFile(Connection connection, String filePath, String tableName)
throws SQLException, IOException {
    long count = 0;
    FileInputStream fileInputStream = null;

    try {
        Connection unwrap = connection.unwrap(Connection.class);
        BaseConnection  connSec = (BaseConnection) unwrap;

        CopyManager copyManager = new CopyManager((BaseConnection) connSec);
        fileInputStream = new FileInputStream(filePath);
        count = copyManager.copyIn("COPY " + tableName + " FROM STDIN delimiter ',' csv", fileInputStream);
    } finally {
        if (fileInputStream != null) {
            try {
                fileInputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    return count;
}

これで main メソッドに次の行を追加できます。

int c = (int) copyFromFile(connection,"C:\\Users\\pharmacies.csv", "pharmacy");
log.info("Copied "+ c +" rows using COPY command");

main クラスを実行すると、次の出力が生成されるはずです。

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Sunnyvale', state='California', zip_code='94001'}
[INFO   ] Update data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Guntur', state='California', zip_code='94001'}
[INFO   ] Delete data
[INFO   ] Read data
[INFO   ] There is no data in the database!
[INFO ] Copied 5000 rows using COPY command
[INFO   ] Closing database connection

メモリ内データを読み込むための COPY コマンド

次のコードは、メモリ内データをテーブルにコピーします。

private static void inMemory(Connection connection) throws SQLException,IOException
    {
    log.info("Copying inmemory data into table");
            
    final List<String> rows = new ArrayList<>();
    rows.add("0,Target,Sunnyvale,California,94001");
    rows.add("1,Apollo,Guntur,Andhra,94003");
        
    final BaseConnection baseConnection = (BaseConnection) connection.unwrap(Connection.class);
    final CopyManager copyManager = new CopyManager(baseConnection);

    // COPY command can change based on the format of rows. This COPY command is for above rows.
    final String copyCommand = "COPY pharmacy FROM STDIN with csv";        
       
    try (final Reader reader = new StringReader(String.join("\n", rows))) {
        copyManager.copyIn(copyCommand, reader);
    }
}

これで main メソッドに次の行を追加できます。

inMemory(connection);

メイン クラスを実行すると、次の出力が生成されるはずです。

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Sunnyvale', state='California', zip_code='94001'}
[INFO   ] Update data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Guntur', state='California', zip_code='94001'}
[INFO   ] Delete data
[INFO   ] Read data
[INFO   ] There is no data in the database!
5000
[INFO   ] Copying in-memory data into table
[INFO   ] Closing database connection

データベース要求の失敗によるアプリの再試行

アプリケーションからのデータベース要求が失敗する場合があります。 このような問題は、アプリとデータベース間のネットワーク障害や、パスワードの誤りなど、さまざまなシナリオで発生することがあります。一部の問題は一時的なもので、数秒から数分で自然に解決されます。 一時的なエラーを克服するために、アプリで再試行ロジックを構成することができます。

アプリで再試行ロジックを構成すると、エンド ユーザー エクスペリエンスの向上に役立ちます。 障害シナリオでは、ユーザーはエラーを経験するかわりに、アプリケーションで要求が処理されるまで少し長く待つことになります。

次の例は、アプリに再試行ロジックを実装する方法を示しています。 サンプル コード スニペットでは、成功するまで 60 秒ごとに (最大 5 回) データベース要求を試行します。 再試行の回数と頻度はアプリケーションのニーズに基づいて構成できます。

このコードでは、<cluster> をクラスター名に置き換え、<password> を管理者パスワードに置き換えます。

package test.crud;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.logging.Logger;
import com.zaxxer.hikari.HikariDataSource;

public class DemoApplication
{
    private static final Logger log;

    static
    {
        System.setProperty("java.util.logging.SimpleFormatter.format", "[%4$-7s] %5$s %n");
        log = Logger.getLogger(DemoApplication.class.getName());
    }
    private static final String DB_USERNAME = "citus";
    private static final String DB_PASSWORD = "<password>";
    private static final String DB_URL = "jdbc:postgresql://c-<cluster>.<uniqueID>.postgres.cosmos.azure.com:5432/citus?sslmode=require";
    private static final String DB_DRIVER_CLASS = "org.postgresql.Driver";
    private static HikariDataSource datasource;

    private static String executeRetry(String sql, int retryCount) throws InterruptedException
    {
        Connection con = null;
        PreparedStatement pst = null;
        ResultSet rs = null;
        for (int i = 1; i <= retryCount; i++)
        {
            try
            {
                datasource = new HikariDataSource();
                datasource.setDriverClassName(DB_DRIVER_CLASS);
                datasource.setJdbcUrl(DB_URL);
                datasource.setUsername(DB_USERNAME);
                datasource.setPassword(DB_PASSWORD);
                datasource.setMinimumIdle(10);
                datasource.setMaximumPoolSize(1000);
                datasource.setAutoCommit(true);
                datasource.setLoginTimeout(3);
                log.info("Connecting to the database");
                con = datasource.getConnection();
                log.info("Connection established");
                log.info("Read data");
                pst = con.prepareStatement(sql);
                rs = pst.executeQuery();
                StringBuilder builder = new StringBuilder();
                int columnCount = rs.getMetaData().getColumnCount();
                while (rs.next())
                {
                    for (int j = 0; j < columnCount;)
                    {
                        builder.append(rs.getString(j + 1));
                        if (++j < columnCount)
                            builder.append(",");
                    }
                    builder.append("\r\n");
                }
                return builder.toString();
            }
            catch (Exception e)
            {
                Thread.sleep(60000);
                System.out.println(e.getMessage());
            }
        }
        return null;
    }

    public static void main(String[] args) throws Exception
    {
        String result = executeRetry("select 1", 5);
        System.out.print(result);
    }
}

次のステップ