Node.js を使用して Azure Cosmos DB for PostgreSQL に接続して SQL ステートメントを実行する

適用対象: Azure Cosmos DB for PostgreSQL (PostgreSQL の Citus データベース拡張機能を利用)

このクイックスタートでは、Node.js コードを使用してクラスターに接続する方法と、SQL ステートメントを使用してテーブルを作成する方法について説明します。 その後で、データベース内のデータの挿入、クエリ、更新、削除を実行します。 この記事内の手順では、Node.js の開発には精通しているが、Azure Cosmos DB for PostgreSQL を使用するのは初めてというユーザーを対象とします。

PostgreSQL ライブラリをインストールする

この記事のコード例では、pg ライブラリを PostgreSQL サーバーとインターフェイス接続する必要があります。 言語パッケージ マネージャー (npm など) を使用して pg をインストールする必要があります。

接続、テーブルの作成、およびデータの挿入

共通接続モジュールを作成する

ヒント

下のサンプル コードでは、接続プールを使用して PostgreSQL への接続を作成および管理します。 次の理由から、アプリケーション側の接続プールを強くお勧めします。

  • アプリケーションがデータベースへの接続を生成しすぎないようにするため、接続制限の超過を回避できます。
  • 待機時間とスループットの両方のパフォーマンスを大幅に向上させるのに役立ちます。 PostgreSQL サーバー プロセスでは、新しい各接続を処理するためにフォークする必要があり、接続を再利用すると、そのオーバーヘッドが回避されます。

db という名前のフォルダーを作成し、このフォルダー内に、次の共通接続コードを含む citus.js ファイルを作成します。 このコードでは、<cluster> をクラスター名に置き換え、<password> を管理者パスワードに置き換えます。

/**
* file: db/citus.js
*/

const { Pool } = require('pg');

const pool = new Pool({
  max: 300,
  connectionTimeoutMillis: 5000,

  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
});

module.exports = {
  pool,
};

テーブルを作成する

接続し、SQL ステートメント CREATE TABLE および INSERT INTO を使用してデータを読み込むには、次のコードを使用します。 このコードは、新しい pharmacy テーブルを作成し、いくつかのサンプル データを挿入します。

/**
* file: create.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DROP TABLE IF EXISTS pharmacy;
    CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
    CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
  `;

  try {
    /* Real application code would probably request a dedicated client with
       pool.connect() and run multiple queries with the client. In this
       example, you're running only one query, so you use the pool.query()
       helper method to run it on the first available idle client.
    */

    await pool.query(queryString);
    console.log('Created the Pharmacy table and inserted rows.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

テーブルの分散

Azure Cosmos DB for PostgreSQL は、スケーラビリティを確保するために複数のノード全体にテーブルを分散させる優れた能力をユーザーに提供するものです。 テーブルを分散させるには、次のコマンドを使用します。 create_distributed_table および分散列の詳細については、こちらを参照してください。

注意

テーブルを分散させると、クラスターに追加されたすべてのワーカー ノード全体にテーブルを拡張することができます。

データベースに接続してテーブルを分散させるには、次のコードを使用します。

/**
* file: distribute-table.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT create_distributed_table('pharmacy', 'pharmacy_id');
  `;

  try {
    await pool.query(queryString);
    console.log('Distributed pharmacy table.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

データの読み取り

接続し、SELECT SQL ステートメントを使用してデータを読み取るには、次のコードを使用します。

/**
* file: read.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT * FROM pharmacy;
  `;

  try {
    const res = await pool.query(queryString);
    console.log(res.rows);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

データの更新

接続し、UPDATE SQL ステートメントを使用してデータを更新するには、次のコードを使用します。

/**
* file: update.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    UPDATE pharmacy SET city = 'Long Beach'
    WHERE pharmacy_id = 1;
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Update completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

データの削除

接続し、DELETE SQL ステートメントを使用してデータを削除するには、次のコードを使用します。

/**
* file: delete.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DELETE FROM pharmacy
    WHERE pharmacy_name = 'Target';
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Delete completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

高速取り込み用の COPY コマンド

COPY コマンドでは、Azure Cosmos DB for PostgreSQL にデータを取り込みながら、驚異的なスループットを達成できます。 COPY コマンドでは、ファイル内のデータを取り込んだり、メモリ内データのマイクロバッチからデータを取り込んだりすることで、リアルタイム インジェストを実現できます。

ファイルからデータを読み込む COPY コマンド

次のコードは、CSV ファイルからデータベース テーブルにデータをコピーします。 このコードには、pg-copy-streams パッケージと pharmacies.csv ファイルが必要です。

/**
* file: copycsv.js
*/

const inputFile = require('path').join(__dirname, '/pharmacies.csv');
const fileStream = require('fs').createReadStream(inputFile);
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importCsvDatabase() {
  return new Promise((resolve, reject) => {
    const queryString = `
      COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
    `;

    fileStream.on('error', reject);

    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom(queryString))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        fileStream.pipe(stream);
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}

(async () => {
  console.log('Copying from CSV...');
  await importCsvDatabase();
  await pool.end();
  console.log('Inserted csv successfully');
})();

メモリ内データを読み込むための COPY コマンド

次のコードは、メモリ内データをテーブルにコピーします。 このコードには、パイプ チェーンを可能にする through2 パッケージが必要です。

/**
 * file: copyinmemory.js
 */

const through2 = require('through2');
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importInMemoryDatabase() {
  return new Promise((resolve, reject) => {
    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom('COPY pharmacy FROM STDIN'))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        const internDataset = [
          ['100', 'Target', 'Sunnyvale', 'California', '94001'],
          ['101', 'CVS', 'San Francisco', 'California', '94002'],
        ];

        let started = false;
        const internStream = through2.obj((arr, _enc, cb) => {
          const rowText = (started ? '\n' : '') + arr.join('\t');
          started = true;
          cb(null, rowText);
        });

        internStream.on('error', reject).pipe(stream);

        internDataset.forEach((record) => {
          internStream.write(record);
        });

        internStream.end();
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}
(async () => {
  await importInMemoryDatabase();
  await pool.end();
  console.log('Inserted inmemory data successfully.');
})();

データベース要求の失敗によるアプリの再試行

アプリケーションからのデータベース要求が失敗する場合があります。 このような問題は、アプリとデータベース間のネットワーク障害や、パスワードの誤りなど、さまざまなシナリオで発生することがあります。一部の問題は一時的なもので、数秒から数分で自然に解決されます。 一時的なエラーを克服するために、アプリで再試行ロジックを構成することができます。

アプリで再試行ロジックを構成すると、エンド ユーザー エクスペリエンスの向上に役立ちます。 障害シナリオでは、ユーザーはエラーを経験するかわりに、アプリケーションで要求が処理されるまで少し長く待つことになります。

次の例は、アプリに再試行ロジックを実装する方法を示しています。 サンプル コード スニペットでは、成功するまで 60 秒ごとに (最大 5 回) データベース要求を試行します。 再試行の回数と頻度はアプリケーションのニーズに基づいて構成できます。

このコードでは、<cluster> をクラスター名に置き換え、<password> を管理者パスワードに置き換えます。

const { Pool } = require('pg');
const { sleep } = require('sleep');

const pool = new Pool({
  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
  connectionTimeoutMillis: 0,
  idleTimeoutMillis: 0,
  min: 10,
  max: 20,
});

(async function() {
  res = await executeRetry('select nonexistent_thing;',5);
  console.log(res);
  process.exit(res ? 0 : 1);
})();

async function executeRetry(sql,retryCount)
{
  for (let i = 0; i < retryCount; i++) {
    try {
      result = await pool.query(sql)
      return result;
    } catch (err) {
      console.log(err.message);
      sleep(60);
    }
  }

  // didn't succeed after all the tries
  return null;
}

次のステップ