Azure Confidential Ledger の書き込みトランザクション レシートを検証する

Azure Confidential Ledger 書き込みトランザクションレシートは、対応する書き込みトランザクションが CCF ネットワークによってグローバルにコミットされたことを示す暗号化された Merkle 証明を表します。 Azure Confidential Ledger ユーザーは、任意の時点でコミットされた書き込みトランザクションを受け取って、対応する書き込み操作が変更できない台帳に正常に記録されたことを確認できます。

Azure Confidential Ledger 書き込みトランザクション レシートの詳細については、専用の記事を参照してください。

レシートの検証手順

書き込みトランザクション レシートは、次のサブセクションで説明されている特定の一連の手順に従って確認できます。 同じ手順については、CCF ドキュメントを参照してください。

リーフ ノードの計算

最初の手順では、コミットされたトランザクションに対応する Merkle ツリー内のリーフ ノードの SHA-256 ハッシュを計算します。 リーフ ノードは、Azure Confidential Ledger レシートの leafComponents の下にある次のフィールドの順序付き連結で構成されます。

  1. writeSetDigest
  2. SHA-256 ダイジェスト commitEvidence
  3. claimsDigest フィールド

これらの値はバイト配列として連結する必要があります。writeSetDigestclaimsDigest の両方を 16 進数の文字列からバイト配列に変換する必要があります。一方、commitEvidence のハッシュ (バイトの配列として) は、UTF-8 でエンコードされた commitEvidence 文字列に SHA-256 ハッシュ関数を適用することで取得できます。

同様に、リーフ ノード ハッシュ ダイジェストは、結果のバイトの結果連結に SHA-256 ハッシュ関数を適用することで計算できます。

ルート ノードの計算

2 番目の手順では、トランザクションがコミットされた時点で、Merkle ツリーのルートの SHA-256 ハッシュを計算します。 計算は、(前の手順で計算されたリーフ ノード ハッシュから始まる) 前のイテレーションの結果を、レシートの proof フィールドに指定された 順序付きノードのハッシュと繰り返し連結してハッシュすることによって行われます。 proof リストは順序付きリストとして提供され、その要素は指定された順序で反復処理する必要があります。

連結は、proof フィールド (left または right のいずれか) で提供されるオブジェクトで示される相対順序に関して、バイト表現で実行する必要があります。

  • proof の現在の要素のキーが left の場合、前の反復の結果を現在の要素の値に追加する必要があります。
  • proof の現在の要素のキーが right の場合、前の反復の結果を現在の要素値の先頭に追加する必要があります。

各連結の後、次のイテレーションの入力を取得するには、SHA-256 関数を適用する必要があります。 このプロセスは、標準的な手順に従って、計算に必要なノードを指定して、Merkle ツリー データ構造のルート ノードを計算します。

ルート ノード経由で署名を確認する

3 番目の手順では、ルート ノード ハッシュ経由で生成された暗号化署名が、レシートの署名ノード証明書を使用して有効であることを確認します。 検証プロセスは、楕円曲線デジタル署名アルゴリズム (ECDSA) を使用して署名されたメッセージのデジタル署名検証の標準的な手順に従います。 具体的には、次の手順を実行します。

  1. base64 文字列 signature をバイト配列にデコードします。
  2. 署名ノード証明書 cert から ECDSA 公開キーを抽出します。
  3. 前の手順で抽出した公開キーを使用して、(前のサブセクションの手順を使用して計算された) Merkle ツリーのルート上の署名が本物であることを確認します。 この手順は、ECDSA を使用した標準的なデジタル署名検証プロセスに効果的に対応します。 最も一般的なプログラミング言語には、一部のデータに対して公開キー証明書を使用して ECDSA 署名を検証できるライブラリが多数あります (たとえば、Python の暗号化ライブラリ)。

署名ノード証明書の保証を確認する

前のステップに加えて、署名ノード証明書が現在の台帳証明書によって承認 (つまり署名済み) されていることを検証することも必要です。 この手順は、前の他の 3 つの手順に依存せず、他の手順とは別に実行できます。

レシートを発行した現在のサービス ID が、署名ノードを承認したサービス ID と異なる可能性があります (たとえば、証明書の更新が原因)。 この場合、署名ノード証明書 (つまり、レシートの cert フィールド) から、他の以前のサービスID (つまり、レシートの serviceEndorsements リスト フィールド) を経て、信頼できるルート認証局 (CA) (つまり、現在のサービス ID 証明書) までの証明書の信頼のチェーンを検証する必要があります。 serviceEndorsements リストは、最も古いサービス ID から最新のサービス ID への順序付きリストとして提供されます。

証明書の承認は、チェーン全体について検証する必要があり、前のサブセクションで説明したのとまったく同じデジタル署名検証プロセスに従います。 通常、証明書の承認手順を実行するために使用できる、一般的なオープンソースの暗号化ライブラリ (OpenSSL など) があります。

アプリケーション要求ダイジェストを検証する

オプションの手順として、アプリケーション要求がレシートに添付されている場合、この公開されている要求から要求のダイジェストを計算し (特定のアルゴリズムに従う)、このダイジェストがレシートのペイロードに含まれている claimsDigest と一致することを検証できます。 公開の要求オブジェクトからダイジェストを計算するには、リストにある各アプリケーション要求オブジェクトを反復処理し、その kind フィールドを確認する必要があります。

要求オブジェクトの種類が LedgerEntry の場合は、要求の台帳コレクション ID (collectionId) とコンテンツ (contents) を抽出し、要求オブジェクトに記載されている秘密鍵 (secretKey) を使用して HMAC ダイジェストを計算するのに使用します。 これら 2 つのダイジェストを連結し、連結されたダイジェストのSHA-256 ハッシュ値を計算します。 プロトコル (protocol) と計算した要求データのダイジェストを連結し、この連結でもうひとつの SHA-256 ハッシュ値を計算して最終的なダイジェストを取得します。

要求オブジェクトの種類が ClaimDigest の場合は、要求ダイジェスト (value) を抽出してプロトコル (protocol) と連結し、この連結の SHA-256 ハッシュ値を計算して最終的なダイジェストを取得します。

個々の各要求ダイジェストを計算した後、各アプリケーション要求オブジェクトから計算されたすべてのダイジェストを連結する必要があります (レシートに表示されているのと同じ順序で)。 その後、連結の先頭に、処理された要求の数を付加します。 前の連結の SHA-256 ハッシュ値によって最終的な要求ダイジェストが生成されます。これは、レシート オブジェクトにある claimsDigest と一致する必要があります。

その他のリソース

Azure Confidential Ledger 書き込みトランザクション レシートと各フィールドの説明の内容の詳細については、専用の記事を参照してください。 CCF ドキュメントには、次のリンク先にあるレシートの確認とその他の関連リソースに関する詳細情報も含まれています。

トランザクション レシートの書き込みを検証する

レシート検証ユーティリティ

Python 用 Azure Confidential Ledger クライアント ライブラリには、書き込みトランザクションのレシートを検証し、アプリケーション要求のリストから要求ダイジェストを計算するユーティリティ関数が用意されています。 データ プレーン SDK およびレシート固有のユーティリティの使用方法の詳細については、このセクションこのサンプル コードを参照してください。

セットアップと前提条件

参考のために、前のセクションで説明したステップに従って、Azure Confidential Ledger の書き込みトランザクションのレシートを完全に検証するサンプル コードを Python で提供しています。

完全検証アルゴリズムを実行するには、現在のサービス ネットワーク証明書と、実行中の Confidential Ledger リソースからの書き込みトランザクションのレシートが必要です。 Confidential Ledger インスタンスから書き込みトランザクション レシートとサービス証明書を取得する方法の詳細については、この記事を参照してください。

コードのチュートリアル

次のコードを使用して、必要なオブジェクトを初期化し、レシート検証アルゴリズムを実行できます。 別のユーティリティ (verify_receipt) を完全な検証アルゴリズムの実行に使用し、GET_RECEIPT 応答の receipt フィールドの内容を辞書として、サービス証明書を単純な文字列として受け取ります。 この関数は、レシートが有効でない場合、または処理中にエラーが発生した場合に例外をスローします。

レシートとサービス証明書の両方をファイルから読み込むことができることを前提としています。 service_certificate_file_name 定数と receipt_file_name 定数の両方を、確認するサービス証明書と領収書のそれぞれのファイル名で更新してください。

import json 

# Constants
service_certificate_file_name = "<your-service-certificate-file>"
receipt_file_name = "<your-receipt-file>"

# Use the receipt and the service identity to verify the receipt content 
with open(service_certificate_file_name, "r") as service_certificate_file, open( 
    receipt_file_name, "r" 
) as receipt_file: 

    # Load relevant files content 
    receipt = json.loads(receipt_file.read())["receipt"] 
    service_certificate_cert = service_certificate_file.read() 

    try: 
        verify_receipt(receipt, service_certificate_cert) 
        print("Receipt verification succeeded") 

    except Exception as e: 
        print("Receipt verification failed") 

        # Raise caught exception to look at the error stack
        raise e 

検証プロセスにはいくつかの暗号およびハッシュ プリミティブが必要なため、計算を容易にするために次のライブラリが使用されます。

  • CCF Python ライブラリ: このモジュールには、レシート確認用の一連のツールが用意されています。
  • Python 暗号化ライブラリ: さまざまな暗号化アルゴリズムとプリミティブを含む広く使用されているライブラリです。
  • Python 標準ライブラリの一部である hashlib モジュール: 一般的なハッシュ アルゴリズム用の共通インターフェイスを提供するモジュール。
from ccf.receipt import verify, check_endorsements, root 
from cryptography.x509 import load_pem_x509_certificate, Certificate 
from hashlib import sha256 
from typing import Dict, List, Any 

verify_receipt 関数内で、指定されたレシートが有効であり、すべての必須フィールドが含まれていることを確認します。

# Check that all the fields are present in the receipt 
assert "cert" in receipt 
assert "leafComponents" in receipt 
assert "claimsDigest" in receipt["leafComponents"] 
assert "commitEvidence" in receipt["leafComponents"] 
assert "writeSetDigest" in receipt["leafComponents"] 
assert "proof" in receipt 
assert "signature" in receipt 

プログラムの残りの部分で使用される変数を初期化します。

# Set the variables 
node_cert_pem = receipt["cert"] 
claims_digest_hex = receipt["leafComponents"]["claimsDigest"] 
commit_evidence_str = receipt["leafComponents"]["commitEvidence"] 
write_set_digest_hex = receipt["leafComponents"]["writeSetDigest"] 
proof_list = receipt["proof"] 
service_endorsements_certs_pem = receipt.get("serviceEndorsements", [])
root_node_signature = receipt["signature"] 

暗号化ライブラリを使用して、以前のサービス ID からサービス ID、署名ノード、および承認証明書の PEM 証明書を読み込むことができます。

# Load service and node PEM certificates 
service_cert = load_pem_x509_certificate(service_cert_pem.encode()) 
node_cert = load_pem_x509_certificate(node_cert_pem.encode()) 

# Load service endorsements PEM certificates 
service_endorsements_certs = [ 
    load_pem_x509_certificate(pem.encode()) 
    for pem in service_endorsements_certs_pem 
] 

検証プロセスの最初の手順は、リーフ ノードのダイジェストを計算することです。

# Compute leaf of the Merkle Tree corresponding to our transaction 
leaf_node_hex = compute_leaf_node( 
    claims_digest_hex, commit_evidence_str, write_set_digest_hex 
)

compute_leaf_node 関数は、レシートのリーフ コンポーネント (claimsDigestcommitEvidencewriteSetDigest) をパラメーターとして受け取り、リーフ ノード ハッシュを 16 進数形式で返します。

前述したように、commitEvidence のダイジェストを計算します (SHA-256 hashlib 関数を使用)。 次に、writeSetDigestclaimsDigest の両方をバイト配列に変換します。 最後に、3 つの配列を連結し、SHA256 関数を使用して結果をダイジェストします。

def compute_leaf_node( 
    claims_digest_hex: str, commit_evidence_str: str, write_set_digest_hex: str 
) -> str: 
    """Function to compute the leaf node associated to a transaction 
    given its claims digest, commit evidence, and write set digest.""" 

    # Digest commit evidence string 
    commit_evidence_digest = sha256(commit_evidence_str.encode()).digest() 

    # Convert write set digest to bytes 
    write_set_digest = bytes.fromhex(write_set_digest_hex) 

    # Convert claims digest to bytes 
    claims_digest = bytes.fromhex(claims_digest_hex) 

    # Create leaf node by hashing the concatenation of its three components 
    # as bytes objects in the following order: 
    # 1. write_set_digest 
    # 2. commit_evidence_digest 
    # 3. claims_digest 
    leaf_node_digest = sha256( 
        write_set_digest + commit_evidence_digest + claims_digest 
    ).digest() 

    # Convert the result into a string of hexadecimal digits 
    return leaf_node_digest.hex() 

リーフを計算した後、Merkle ツリーのルートを計算できます。

# Compute root of the Merkle Tree 
root_node = root(leaf_node_hex, proof_list) 

CCF Python ライブラリの一部として提供されている関数 root を使用します。 この関数は、前の反復の結果を proof からの新しい要素と連続的に連結し、連結をダイジェストしてから、proof 内のすべての要素に対して以前に計算されたダイジェストを使用してステップを繰り返します。 連結では、ルートが正しく再計算されるように、Merkle ツリー内のノードの順序を考慮する必要があります。

def root(leaf: str, proof: List[dict]): 
    """ 
    Recompute root of Merkle tree from a leaf and a proof of the form: 
    [{"left": digest}, {"right": digest}, ...] 
    """ 

    current = bytes.fromhex(leaf) 

    for n in proof: 
        if "left" in n: 
            current = sha256(bytes.fromhex(n["left"]) + current).digest() 
        else: 
            current = sha256(current + bytes.fromhex(n["right"])).digest() 
    return current.hex() 

レシートに含まれる署名をルート上で検証して、署名が正しいことを検証できます。

# Verify signature of the signing node over the root of the tree 
verify(root_node, root_node_signature, node_cert) 

同様に、CCF ライブラリには、この検証を実行する関数 verify が用意されています。 署名ノード証明書の ECDSA 公開キーを使用して、ツリーのルート上の署名を確認します。

def verify(root: str, signature: str, cert: Certificate):
    """ 
    Verify signature over root of Merkle Tree 
    """ 

    sig = base64.b64decode(signature) 
    pk = cert.public_key() 
    assert isinstance(pk, ec.EllipticCurvePublicKey) 
    pk.verify( 
        sig, 
        bytes.fromhex(root), 
        ec.ECDSA(utils.Prehashed(hashes.SHA256())), 
    )

レシート確認の最後の手順は、Merkle ツリーのルートへの署名に使用された証明書を検証することです。

# Verify node certificate is endorsed by the service certificates through endorsements 
check_endorsements(node_cert, service_cert, service_endorsements_certs) 

同様に、CCF ユーティリティ check_endorsements を使用して、サービス ID が署名ノードを承認していることを検証できます。 証明書チェーンは以前のサービス証明書で構成されている可能性があるため、serviceEndorsements が空のリストでない場合、保証が推移的に適用されることを検証する必要があります。

def check_endorsement(endorsee: Certificate, endorser: Certificate): 
    """ 
    Check endorser has endorsed endorsee 
    """ 

    digest_algo = endorsee.signature_hash_algorithm 
    assert digest_algo 
    digester = hashes.Hash(digest_algo) 
    digester.update(endorsee.tbs_certificate_bytes) 
    digest = digester.finalize() 
    endorser_pk = endorser.public_key() 
    assert isinstance(endorser_pk, ec.EllipticCurvePublicKey) 
    endorser_pk.verify( 
        endorsee.signature, digest, ec.ECDSA(utils.Prehashed(digest_algo)) 
    ) 

def check_endorsements( 
    node_cert: Certificate, service_cert: Certificate, endorsements: List[Certificate] 
): 
    """ 
    Check a node certificate is endorsed by a service certificate, transitively through a list of endorsements. 
    """ 

    cert_i = node_cert 
    for endorsement in endorsements: 
        check_endorsement(cert_i, endorsement) 
        cert_i = endorsement 
    check_endorsement(cert_i, service_cert) 

別の方法として、同様のメソッドを使用して OpenSSL ライブラリを使用して証明書を検証することもできます。

from OpenSSL.crypto import ( 
    X509, 
    X509Store, 
    X509StoreContext, 
)

def verify_openssl_certificate( 
    node_cert: Certificate, 
    service_cert: Certificate, 
    service_endorsements_certs: List[Certificate], 
) -> None: 
    """Verify that the given node certificate is a valid OpenSSL certificate through 
    the service certificate and a list of endorsements certificates.""" 

    store = X509Store() 

    # pyopenssl does not support X509_V_FLAG_NO_CHECK_TIME. For recovery of expired 
    # services and historical receipts, we want to ignore the validity time. 0x200000 
    # is the bitmask for this option in more recent versions of OpenSSL. 
    X509_V_FLAG_NO_CHECK_TIME = 0x200000 
    store.set_flags(X509_V_FLAG_NO_CHECK_TIME) 

    # Add service certificate to the X.509 store 
    store.add_cert(X509.from_cryptography(service_cert)) 

    # Prepare X.509 endorsement certificates 
    certs_chain = [X509.from_cryptography(cert) for cert in service_endorsements_certs] 

    # Prepare X.509 node certificate 
    node_cert_pem = X509.from_cryptography(node_cert) 

    # Create X.509 store context and verify its certificate 
    ctx = X509StoreContext(store, node_cert_pem, certs_chain) 
    ctx.verify_certificate() 

サンプル コード

コードのチュートリアルで使用したサンプル コードの全体を示します。

メイン プログラム

import json 

# Use the receipt and the service identity to verify the receipt content 
with open("network_certificate.pem", "r") as service_certificate_file, open( 
    "receipt.json", "r" 
) as receipt_file: 

    # Load relevant files content 
    receipt = json.loads(receipt_file.read())["receipt"]
    service_certificate_cert = service_certificate_file.read()

    try: 
        verify_receipt(receipt, service_certificate_cert) 
        print("Receipt verification succeeded") 

    except Exception as e: 
        print("Receipt verification failed") 

        # Raise caught exception to look at the error stack 
        raise e 

レシートの検証

from cryptography.x509 import load_pem_x509_certificate, Certificate 
from hashlib import sha256 
from typing import Dict, List, Any 

from OpenSSL.crypto import ( 
    X509, 
    X509Store, 
    X509StoreContext, 
) 

from ccf.receipt import root, verify, check_endorsements 

def verify_receipt(receipt: Dict[str, Any], service_cert_pem: str) -> None: 
    """Function to verify that a given write transaction receipt is valid based 
    on its content and the service certificate. 
    Throws an exception if the verification fails.""" 

    # Check that all the fields are present in the receipt 
    assert "cert" in receipt 
    assert "leafComponents" in receipt 
    assert "claimsDigest" in receipt["leafComponents"] 
    assert "commitEvidence" in receipt["leafComponents"] 
    assert "writeSetDigest" in receipt["leafComponents"] 
    assert "proof" in receipt 
    assert "signature" in receipt 

    # Set the variables 
    node_cert_pem = receipt["cert"] 
    claims_digest_hex = receipt["leafComponents"]["claimsDigest"] 
    commit_evidence_str = receipt["leafComponents"]["commitEvidence"] 

    write_set_digest_hex = receipt["leafComponents"]["writeSetDigest"] 
    proof_list = receipt["proof"] 
    service_endorsements_certs_pem = receipt.get("serviceEndorsements", [])
    root_node_signature = receipt["signature"] 

    # Load service and node PEM certificates
    service_cert = load_pem_x509_certificate(service_cert_pem.encode()) 
    node_cert = load_pem_x509_certificate(node_cert_pem.encode()) 

    # Load service endorsements PEM certificates
    service_endorsements_certs = [ 
        load_pem_x509_certificate(pem.encode()) 
        for pem in service_endorsements_certs_pem 
    ] 

    # Compute leaf of the Merkle Tree 
    leaf_node_hex = compute_leaf_node( 
        claims_digest_hex, commit_evidence_str, write_set_digest_hex 
    ) 

    # Compute root of the Merkle Tree
    root_node = root(leaf_node_hex, proof_list) 

    # Verify signature of the signing node over the root of the tree
    verify(root_node, root_node_signature, node_cert) 

    # Verify node certificate is endorsed by the service certificates through endorsements
    check_endorsements(node_cert, service_cert, service_endorsements_certs) 

    # Alternative: Verify node certificate is endorsed by the service certificates through endorsements 
    verify_openssl_certificate(node_cert, service_cert, service_endorsements_certs) 

def compute_leaf_node( 
    claims_digest_hex: str, commit_evidence_str: str, write_set_digest_hex: str 
) -> str: 
    """Function to compute the leaf node associated to a transaction 
    given its claims digest, commit evidence, and write set digest.""" 

    # Digest commit evidence string
    commit_evidence_digest = sha256(commit_evidence_str.encode()).digest() 

    # Convert write set digest to bytes
    write_set_digest = bytes.fromhex(write_set_digest_hex) 

    # Convert claims digest to bytes
    claims_digest = bytes.fromhex(claims_digest_hex) 

    # Create leaf node by hashing the concatenation of its three components 
    # as bytes objects in the following order: 
    # 1. write_set_digest 
    # 2. commit_evidence_digest 
    # 3. claims_digest 
    leaf_node_digest = sha256( 
        write_set_digest + commit_evidence_digest + claims_digest 
    ).digest() 

    # Convert the result into a string of hexadecimal digits 
    return leaf_node_digest.hex() 

def verify_openssl_certificate( 
    node_cert: Certificate, 
    service_cert: Certificate, 
    service_endorsements_certs: List[Certificate], 
) -> None: 
    """Verify that the given node certificate is a valid OpenSSL certificate through 
    the service certificate and a list of endorsements certificates.""" 

    store = X509Store() 

    # pyopenssl does not support X509_V_FLAG_NO_CHECK_TIME. For recovery of expired 
    # services and historical receipts, we want to ignore the validity time. 0x200000 
    # is the bitmask for this option in more recent versions of OpenSSL. 
    X509_V_FLAG_NO_CHECK_TIME = 0x200000 
    store.set_flags(X509_V_FLAG_NO_CHECK_TIME) 

    # Add service certificate to the X.509 store
    store.add_cert(X509.from_cryptography(service_cert)) 

    # Prepare X.509 endorsement certificates
    certs_chain = [X509.from_cryptography(cert) for cert in service_endorsements_certs] 

    # Prepare X.509 node certificate
    node_cert_pem = X509.from_cryptography(node_cert) 

    # Create X.509 store context and verify its certificate
    ctx = X509StoreContext(store, node_cert_pem, certs_chain) 
    ctx.verify_certificate() 

次のステップ