IROWSETFASTLOAD と ISEQUENTIALSTREAM を使用した SQL SERVER への BLOB データの送信 (OLE DB)

このサンプルでは、IRowsetFastLoad を使用して可変長の BLOB データを行ごとにストリーミングする方法について説明します。

このサンプルの既定では、IRowsetFastLoad を使用して、インライン バインドによって可変長の BLOB データを行ごとに送信する方法が示されます。 インライン BLOB データは、使用できるメモリ容量に合わせる必要があります。 BLOB データが数 MB 未満の場合はストリームのオーバーヘッドが追加されないため、このメソッドを使用するとパフォーマンスが最も高くなります。 数 MB を超えるデータ (特にブロックで利用できないデータ) の場合は、ストリーミングした方がパフォーマンスが向上します。

ソース コードで #define USE_ISEQSTREAM のコメント化を解除した場合、このサンプルでは ISequentialStream が使用されます。 サンプルにストリームの実装を定義すると、MAX_BLOB を変更するだけであらゆるサイズの BLOB データを送信できます。 ストリーム データの場合、メモリ容量に合わせる必要も、1 ブロックにする必要もありません。 このプロバイダーを呼び出すには、IRowsetFastLoad::InsertRow を使用します。 IRowsetFastLoad::InsertRow を使用して、ストリームから読み出すことができるデータ量と一緒に、ポインターをデータ バッファー (rgBinding.obValue オフセット) のストリームの実装に渡します。 プロバイダーによっては、バインドするときにデータの長さを渡す必要がない場合があります。 その場合は、バインドから長さを省略できます。

このサンプルでは、プロバイダーにデータを書き込む際に、プロバイダーのストリーム インターフェイスを使用しません。 代わりに、プロバイダーがデータを読み出すために使用するストリーム オブジェクトにポインターを渡します。 通常、Microsoft のプロバイダー (SQLOLEDB および SQLNCLI) では、すべてのデータを処理するまでオブジェクトからデータを 1024 バイト単位で読み出します。 SQLOLEDB と SQLNCLI のどちらも、コンシューマーからプロバイダーのストリーム オブジェクトへのデータの書き込みができる完全な実装は備えていません。 プロバイダーのストリーム オブジェクトを使用して送信できるのは、長さ 0 のデータだけです。

コンシューマーに実装された ISequentialStream オブジェクトは、行セット データ (IRowsetChange::InsertRow、IRowsetChange::SetData) と共に使用できます。DBTYPE_IUNKNOWN としてパラメーターをバインドすることで、パラメーターとも共に使用できます。

DBTYPE_IUNKNOWN はバインドのデータ型として指定されているため、列または対象パラメーターの型と一致している必要があります。 データを行セット インターフェイスから ISequentialStream を使用して送信する場合は、変換できません。 パラメーターの場合は、ICommandWithParameters::SetParameterInfo を使用しないで、別の型を指定して強制的に変換させます。そのためには、BLOB データをすべてローカルにキャッシュするプロバイダーを使用して、SQL Server に送信する前にそのデータを変換する必要があります。 大きな BLOB をキャッシュし、ローカルに変換すると、パフォーマンスが低下します。

詳細については、「BLOB と OLE オブジェクト」を参照してください。

セキュリティに関する注意セキュリティに関する注意

可能な場合は、Windows 認証を使用します。 Windows 認証が使用できない場合は、実行時に資格情報を入力するようユーザーに求めます。 資格情報をファイルに保存するのは避けてください。 資格情報を保存する必要がある場合は、Win32 Crypto API を使用して暗号化してください。

使用例

1 つ目の (Transact-SQL) コード リストを実行して、アプリケーションで使用するテーブルを作成します。

ole32.lib と oleaut32.lib を使用して次の C++ コード リストをコンパイルし、実行します。 このアプリケーションは、コンピューターの既定の SQL Server インスタンスに接続します。 一部の Windows オペレーティング システムでは、(localhost) または (local) を実際の SQL Server インスタンスの名前に変更する必要があります。 名前付きインスタンスに接続するには、接続文字列を L"(local)" から L"(local)\\name" に変更します。name は名前付きインスタンスです。 SQL Server Express は、既定で名前付きインスタンスとしてインストールされます。 INCLUDE 環境変数に、sqlncli.h が保存されているディレクトリが含まれていることを確認します。

3 つ目の (Transact-SQL) コード リストを実行して、アプリケーションで使用したテーブルを削除します。

use master
create table fltest(col1 int, col2 int, col3 image)

// compile with: ole32.lib oleaut32.lib
#include <windows.h>

#define DBINITCONSTANTS   // Must be defined to initialize constants in oledb.h
#define INITGUID              

#include <sqloledb.h>
#include <oledb.h>
#include <msdasc.h>
#include <stdio.h>
#include <stdlib.h>
#include <conio.h>

#define MAX_BLOB  200   // For stream binding this can be any size, but for inline it must fit in memory
#define MAX_ROWS  100

#define SAFE_RELEASE(p) { \
   if (p) { \
      (p)->Release(); \
      (p)=NULL; \
   } \
}

#ifdef USE_ISEQSTREAM
// ISequentialStream implementation for streaming data
class MySequentialStream : public ISequentialStream {

private:
   ULONG m_ulRefCount;
   ULONG m_ulBufSize;
   ULONG m_ulReadSize;
   ULONG m_ulBytesLeft;
   ULONG m_ulReadPos;
   BYTE * m_pSrcData;
   BYTE * m_pReadPtr;
   BOOL m_fWasRead;

public:

   MySequentialStream() {
      m_ulRefCount = 1;
      m_ulBufSize = 0;
      m_ulReadSize = 0;
      m_ulBytesLeft = 0;
      m_ulReadPos = 0;
      m_pSrcData = NULL;
      m_pReadPtr = NULL;
      m_fWasRead = FALSE;
   }

   ~MySequentialStream() {}

   virtual ULONG STDMETHODCALLTYPE AddRef() {
      return ++m_ulRefCount;
   }

   virtual ULONG STDMETHODCALLTYPE Release() {
      --m_ulRefCount;
      if (m_ulRefCount == 0) {
         delete this;
         return 0;
      }
      return m_ulRefCount;
   }

   virtual HRESULT STDMETHODCALLTYPE QueryInterface(REFIID riid, void ** ppvObj) {
      if (!ppvObj)
         return E_INVALIDARG;
      else
         *ppvObj = NULL;

      if (riid != IID_ISequentialStream && riid != IID_IUnknown)
         return E_NOINTERFACE;

      AddRef();
      *ppvObj = this;
      return S_OK;
   }

   HRESULT Init(const void * pSrcData, const ULONG ulBufSize, const ULONG ulReadSize) {
      if (NULL == pSrcData)
         return E_INVALIDARG;

      // Data length must be non-zero
      if (0 == ulBufSize)
         return E_INVALIDARG;

      m_ulBufSize = ulBufSize;
      m_ulReadSize = ulReadSize;
      m_pSrcData = (BYTE *)pSrcData;
      m_pReadPtr = m_pSrcData;
      m_ulBytesLeft = m_ulReadSize;
      m_ulReadPos = 0;
      m_fWasRead = FALSE;

      return S_OK;
   }

   // Can't write data to SQL Server providers (SQLOLEDB/SQLNCLI).  Instead, they read from our object.
   virtual HRESULT STDMETHODCALLTYPE Write(const void *, ULONG, ULONG * ) {
      return E_NOTIMPL;
   }

   // This implementation simply copies data from the source buffer in whatever size requested.
   // But you can do anything here such as reading from a file, reading from a different rowset, stream, etc.
   virtual HRESULT STDMETHODCALLTYPE Read(void * pv, ULONG cb, ULONG * pcbRead) {
      ULONG ulBytesWritten = 0;
      ULONG ulCBToWrite = cb;
      ULONG ulCBToCopy;
      BYTE * pvb = (BYTE *)pv;

      m_fWasRead = TRUE;

      if (NULL == m_pSrcData)
         return E_FAIL;

      if (NULL == pv)
         return STG_E_INVALIDPOINTER;

      while (ulBytesWritten < ulCBToWrite && m_ulBytesLeft) {
         // Make sure we don't write more than our max read size or the size they asked for
         ulCBToCopy = min(m_ulBytesLeft, cb);

         // Make sure we don't read past the end of the internal buffer
         ulCBToCopy = min(m_ulBufSize - m_ulReadPos, ulCBToCopy);

         memcpy(pvb, m_pReadPtr + m_ulReadPos, ulCBToCopy);
         pvb += ulCBToCopy;
         ulBytesWritten += ulCBToCopy;
         m_ulBytesLeft -= ulCBToCopy;
         cb -= ulCBToCopy;

         // Wrap reads around the src buffer
         m_ulReadPos += ulCBToCopy;
         if (m_ulReadPos >= m_ulBufSize)
            m_ulReadPos = 0;
      }

      if (pcbRead)
         *pcbRead = ulBytesWritten;

      return S_OK;
   }
};

#endif // USE_ISEQSTREAM

HRESULT SetFastLoadProperty(IDBInitialize * pIDBInitialize) {
   HRESULT hr = S_OK;
   IDBProperties * pIDBProps = NULL;
   DBPROP rgProps[1];
   DBPROPSET PropSet;

   VariantInit(&rgProps[0].vValue);

   rgProps[0].dwOptions = DBPROPOPTIONS_REQUIRED;
   rgProps[0].colid = DB_NULLID;
   rgProps[0].vValue.vt = VT_BOOL;
   rgProps[0].dwPropertyID = SSPROP_ENABLEFASTLOAD;

   rgProps[0].vValue.boolVal = VARIANT_TRUE;

   PropSet.rgProperties = rgProps;
   PropSet.cProperties = 1;
   PropSet.guidPropertySet = DBPROPSET_SQLSERVERDATASOURCE;

   if (SUCCEEDED(hr = pIDBInitialize->QueryInterface(IID_IDBProperties, (LPVOID *)&pIDBProps))) {
      hr = pIDBProps->SetProperties(1, &PropSet);
   }

   VariantClear(&rgProps[0].vValue); 

   if (pIDBProps)
      pIDBProps->Release();

   return hr;
}

void wmain() {
   // Setup the initialization options
   ULONG cProperties = 0;
   DBPROP rgProperties[10];
   ULONG cPropSets = 0;
   DBPROPSET rgPropSets[1];
   LPWSTR pwszProgID = L"SQLOLEDB";
   LPWSTR pwszDataSource = NULL;
   LPWSTR pwszUserID = NULL;
   LPWSTR pwszPassword = NULL;
   LPWSTR pwszProviderString = L"server=(local);trusted_connection=yes;";

   IDBInitialize * pIDBInitialize = NULL;
   IDBCreateSession * pIDBCrtSess = NULL;
   IOpenRowset * pIOpenRowset = NULL;
   IDBCreateCommand * pIDBCrtCmd = NULL;
   ICommandText * pICmdText = NULL;
   IAccessor * pIAccessor = NULL;
   IRowsetFastLoad * pIRowsetFastLoad = NULL;
   IDBProperties * pIDBProperties = NULL;
   DBBINDING rgBinding[3];
   DBBINDSTATUS rgStatus[3];
   ULONG ulOffset = 0;
   HACCESSOR hAcc = DB_NULL_HACCESSOR;
   BYTE * pData = NULL;
   ULONG iRow = 0;
   LPWSTR pwszTableName = L"fltest";
   DBID TableID;

   HRESULT hr;

#ifdef USE_ISEQSTREAM
   BYTE bSrcBuf[1024];   // A buffer to hold our data for streaming
   memset((void *)&bSrcBuf, 0xAB, sizeof(bSrcBuf));   // Stream data value 0xAB
   MySequentialStream * pMySeqStream = new MySequentialStream();
   DBOBJECT MyObject = {STGM_READ, IID_ISequentialStream};   // NULL pObject implies STGM_READ and IID_IUnknown, but not recommended
#endif

   memset(rgBinding, 0, ( sizeof(rgBinding) / sizeof(rgBinding[0])) * sizeof(DBBINDING) );
   TableID.eKind = DBKIND_NAME;
   TableID.uName.pwszName = pwszTableName;

   // Col1
   rgBinding[0].iOrdinal = 1;
   rgBinding[0].wType = DBTYPE_I4;
   rgBinding[0].obStatus = ulOffset;
   ulOffset+=sizeof(DBSTATUS);
   rgBinding[0].obLength = ulOffset;
   ulOffset+=sizeof(DBLENGTH);
   rgBinding[0].obValue = ulOffset;
   ulOffset += sizeof(LONG);
   rgBinding[0].cbMaxLen = sizeof(LONG);
   rgBinding[0].dwPart = DBPART_VALUE | DBPART_STATUS | DBPART_LENGTH;
   rgBinding[0].eParamIO = DBPARAMIO_NOTPARAM;
   rgBinding[0].dwMemOwner = DBMEMOWNER_CLIENTOWNED;

   //Col2
   rgBinding[1].iOrdinal = 2;
   rgBinding[1].wType = DBTYPE_I4;
   rgBinding[1].obStatus = ulOffset;
   ulOffset+=sizeof(DBSTATUS);
   rgBinding[1].obLength = ulOffset;
   ulOffset+=sizeof(DBLENGTH);
   rgBinding[1].obValue = ulOffset;
   ulOffset += sizeof(LONG);
   rgBinding[1].cbMaxLen = sizeof(LONG);
   rgBinding[1].dwPart = DBPART_VALUE | DBPART_STATUS | DBPART_LENGTH;
   rgBinding[1].eParamIO = DBPARAMIO_NOTPARAM;
   rgBinding[1].dwMemOwner = DBMEMOWNER_CLIENTOWNED;

   //Col3
   rgBinding[2].iOrdinal = 3;
   rgBinding[2].obStatus = ulOffset;
   ulOffset+=sizeof(DBSTATUS);
   rgBinding[2].obLength = ulOffset;
   ulOffset+=sizeof(DBLENGTH);
   rgBinding[2].obValue = ulOffset;
   rgBinding[2].dwPart = DBPART_VALUE | DBPART_STATUS | DBPART_LENGTH;   // DBPART_LENGTH not needed for providers that don't require length
   rgBinding[2].eParamIO = DBPARAMIO_NOTPARAM;
   rgBinding[2].dwMemOwner = DBMEMOWNER_CLIENTOWNED;

#ifdef USE_ISEQSTREAM
   rgBinding[2].wType = DBTYPE_IUNKNOWN;
   ulOffset += sizeof(ISequentialStream *);   // Technically should be sizeof(MySequentialStream *), but who's counting?
   rgBinding[2].cbMaxLen = sizeof(ISequentialStream *);
   rgBinding[2].pObject = &MyObject;
#else
   rgBinding[2].wType = DBTYPE_BYTES;
   ulOffset += MAX_BLOB;
   rgBinding[2].cbMaxLen = MAX_BLOB;
#endif

   // Set init props
   for ( ULONG i = 0 ; i < sizeof(rgProperties) / sizeof(rgProperties[0]) ; i++ )
      VariantInit(&rgProperties[i].vValue);

   // Obtain the provider's clsid
   CLSID clsidProv;
   hr = CLSIDFromProgID(pwszProgID, &clsidProv);

   // Get our initial connection
   CoInitialize(NULL);

   if (SUCCEEDED(hr))
      hr = CoCreateInstance(clsidProv, NULL, CLSCTX_ALL, IID_IDBInitialize,(void **)&pIDBInitialize);

   if (SUCCEEDED(hr))
      hr = pIDBInitialize->QueryInterface(IID_IDBProperties, (void **)&pIDBProperties);

   // DBPROP_INIT_DATASOURCE
   if (pwszDataSource) {
      rgProperties[cProperties].dwPropertyID    = DBPROP_INIT_DATASOURCE;
      rgProperties[cProperties].dwOptions       = DBPROPOPTIONS_REQUIRED;
      rgProperties[cProperties].dwStatus        = DBPROPSTATUS_OK;
      rgProperties[cProperties].colid           = DB_NULLID;
      rgProperties[cProperties].vValue.vt       = VT_BSTR;
      V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszDataSource);               
      cProperties++;
   }

   // DBPROP_AUTH_USERID
   if (pwszUserID) {
      rgProperties[cProperties].dwPropertyID    = DBPROP_AUTH_USERID;
      rgProperties[cProperties].dwOptions       = DBPROPOPTIONS_REQUIRED;
      rgProperties[cProperties].dwStatus        = DBPROPSTATUS_OK;
      rgProperties[cProperties].colid           = DB_NULLID;
      rgProperties[cProperties].vValue.vt       = VT_BSTR;
      V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszUserID);
      cProperties++;
   }

   // DBPROP_AUTH_PASSWORD
   if (pwszPassword) {
      rgProperties[cProperties].dwPropertyID    = DBPROP_AUTH_PASSWORD;
      rgProperties[cProperties].dwOptions       = DBPROPOPTIONS_REQUIRED;
      rgProperties[cProperties].dwStatus        = DBPROPSTATUS_OK;
      rgProperties[cProperties].colid           = DB_NULLID;
      rgProperties[cProperties].vValue.vt       = VT_BSTR;
      V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszPassword);
      cProperties++;
   }

   // DBPROP_INIT_PROVIDERSTRING
   if (pwszProviderString) {
      rgProperties[cProperties].dwPropertyID    = DBPROP_INIT_PROVIDERSTRING;
      rgProperties[cProperties].dwOptions       = DBPROPOPTIONS_REQUIRED;
      rgProperties[cProperties].dwStatus        = DBPROPSTATUS_OK;
      rgProperties[cProperties].colid           = DB_NULLID;
      rgProperties[cProperties].vValue.vt       = VT_BSTR;
      V_BSTR(&rgProperties[cProperties].vValue) = SysAllocString(pwszProviderString);
      cProperties++;
   }

   if (cProperties) {
      rgPropSets[cPropSets].cProperties = cProperties;
      rgPropSets[cPropSets].rgProperties = rgProperties;
      rgPropSets[cPropSets].guidPropertySet = DBPROPSET_DBINIT;
      cPropSets++;
   }

   // Initialize
   if (SUCCEEDED(hr))
      hr = pIDBProperties->SetProperties(cPropSets, rgPropSets);

   if (SUCCEEDED(hr))
      hr = pIDBInitialize->Initialize();

   if (SUCCEEDED(hr)) {
      printf("\tConnected!\r\n");
   }
   else
      printf("Unable to connect\r\n");

   // Set fastload prop
   if (SUCCEEDED(hr))
      hr = SetFastLoadProperty(pIDBInitialize);

   if (SUCCEEDED(hr))
      hr = pIDBInitialize->QueryInterface(IID_IDBCreateSession, (void **)&pIDBCrtSess);

   if (SUCCEEDED(hr))
      hr = pIDBCrtSess->CreateSession(NULL, IID_IOpenRowset, (IUnknown **)&pIOpenRowset);

   if (SUCCEEDED(hr))
      hr = pIOpenRowset->OpenRowset(NULL, &TableID, NULL, IID_IRowsetFastLoad, 0, NULL, (IUnknown **)&pIRowsetFastLoad);

   if (SUCCEEDED(hr))
      hr = pIRowsetFastLoad->QueryInterface(IID_IAccessor, (void **)&pIAccessor);

   if (SUCCEEDED(hr))
      hr = pIAccessor->CreateAccessor(DBACCESSOR_ROWDATA, 3, rgBinding, ulOffset, &hAcc, (DBBINDSTATUS *)&rgStatus);

   if (SUCCEEDED(hr)) {
      pData = (BYTE *)malloc(ulOffset);

      for (iRow = 0 ; iRow < MAX_ROWS ; iRow++) {
         // Column 1 data        
         *(DBSTATUS *)(pData + rgBinding[0].obStatus) = DBSTATUS_S_OK;
         *(DBLENGTH *)(pData + rgBinding[0].obLength) = 1234567;   // Ignored for I4 data
         *(LONG *)(pData + rgBinding[0].obValue) = iRow;

         // Column 2 data        
         *(DBSTATUS *)(pData + rgBinding[1].obStatus) = DBSTATUS_S_OK;
         *(DBLENGTH *)(pData + rgBinding[1].obLength) = 1234567;   // Ignored for I4 data
         *(LONG *)(pData + rgBinding[1].obValue) = iRow + 1;

         // Column 3 data        
         *(DBSTATUS *)(pData + rgBinding[2].obStatus) = DBSTATUS_S_OK;
         *(DBLENGTH *)(pData + rgBinding[2].obLength) = MAX_BLOB/(iRow + 1);   // Not needed for providers that don't require length
#ifdef USE_ISEQSTREAM
         // DBLENGTH is used to tell the provider how much BLOB data to expect from the stream, not required
         // if provider supports sending data without length
         *(ISequentialStream **)(pData+rgBinding[2].obValue) = (ISequentialStream *)pMySeqStream; 
         pMySeqStream->Init((void *)&bSrcBuf, sizeof(bSrcBuf), MAX_BLOB / (iRow + 1));   // Here we set the size we will let the provider read
         pMySeqStream->AddRef();   // The provider releases the object, so we addref it so it doesn't get destructed
#else
         memset(pData + rgBinding[2].obValue, 0, MAX_BLOB);   // Not strictly necessary
         memset(pData + rgBinding[2].obValue, 0x23, MAX_BLOB / (iRow + 1)); 
#endif
         if (SUCCEEDED(hr))
            hr = pIRowsetFastLoad->InsertRow(hAcc, pData);
      }
   }

   if (SUCCEEDED(hr))
      hr = pIRowsetFastLoad->Commit(TRUE);

   if (hAcc)
      pIAccessor->ReleaseAccessor(hAcc, NULL);

   SAFE_RELEASE(pIDBInitialize);
   SAFE_RELEASE(pIDBCrtSess);
   SAFE_RELEASE(pIOpenRowset);
   SAFE_RELEASE(pIDBCrtCmd);
   SAFE_RELEASE(pICmdText);
   SAFE_RELEASE(pIAccessor);
   SAFE_RELEASE(pIRowsetFastLoad);
   SAFE_RELEASE(pIDBProperties);
#ifdef USE_ISEQSTREAM
   SAFE_RELEASE(pMySeqStream);
#endif

   if (pData)
      free(pData);

   CoUninitialize();
}

use master
drop table fltest