変更をバッチで配信する方法 (SQL Server)
このトピックでは、SqlSyncProvider、SqlCeSyncProvider、または DbSyncProvider を使用する Sync Framework でのデータベース同期で、変更をバッチで配信する方法について説明します。このトピックのコードでは、次の Sync Framework クラスを中心に説明します。
バッチ処理について
Sync Framework の既定では、各ノードに対して、1 つの DataSet オブジェクトとして変更が配信されます。変更がノードに適用されるとき、このオブジェクトはメモリ内に格納されています。変更が適用されるコンピューターに十分なメモリがあり、その接続の信頼性が高い場合、この既定の動作で対応できます。ただし、一部のアプリケーションでは、変更を複数のバッチ処理に分割した方が良い場合があります。同期アプリケーションに関して、次のシナリオを考えてみます。
SqlCeSyncProvider を使用する多数のクライアントと、SqlSyncProvider を使用するサーバーとの間で定期的に同期を行う。
いずれのクライアントもメモリ量とディスク領域が限られている。
サーバーとクライアント間の接続は帯域幅が狭く不安定で、しばしば同期に時間がかかったり、接続が中断されたりする場合がある。
通常の同期セッションで変更のサイズ (KB) が大きい。
この種のシナリオでは、変更をバッチに分割した方がよい結果が得られます。バッチ分割することには次のような利点があります。
変更をクライアントに格納するための使用メモリ量 (メモリ データ キャッシュ サイズ) を開発者が制御できる。これによって、クライアント側のメモリ不足エラーを回避できます。
Sync Framework は、同期処理に失敗しても現在のバッチから再開すればよく、一連の変更全体をやり直す必要がない。
エラーの発生時にサーバーで変更のダウンロードや列挙をやり直す必要性を軽減または排除できる。
2 層および N 層アプリケーションの場合、バッチ分割のための構成は簡単です。初回同期セッションから利用できます。
バッチ処理の構成と使用
Sync Framework でのバッチ処理は次のように機能します。
アプリケーションが、同期セッションに参加する各プロバイダーのメモリ データ キャッシュ サイズを指定します。
両方のプロバイダーのキャッシュ サイズが指定された場合、Sync Framework は小さい方の値を両方のプロバイダーに使用します。実際のキャッシュ サイズは、指定された小さい方のサイズの 110% 以下になります。同期セッション中、いずれかの行が指定サイズの 110% を超えた場合、例外が発生してセッションが強制的に終了されます。
値を 0 (既定値) にすると、バッチ処理が無効になります。片方のプロバイダーでのみバッチ処理が有効になっている場合は、アップロードとダウンロードの両方に対してバッチ処理が有効になります。
アプリケーションが各プロバイダーのスプール ファイルの場所を指定します。既定では、同期プロセスが実行されているアカウントの temp ディレクトリにスプール ファイルが書き込まれます。
アプリケーションが Synchronize を呼び出します。
Sync Framework が 1 行ずつ変更を列挙します。同期元プロバイダーのメモリ データ キャッシュ サイズに達した場合、変更がローカル スプール ファイルに反映され、メモリ内データがフラッシュされます。すべての変更が列挙されるまで、このプロセスが続けられます。
N 層シナリオの場合、アプリケーション内のサービスおよびプロキシ コードによって、スプール ファイルが同期先にストリーミングされます。詳細については、このトピックの「N 層に固有のコード」を参照してください。2 層シナリオの場合、同期コードはすべて同期先で実行されるため、ローカル ファイルは既に同期先に存在します。
Sync Framework がスプール ファイルから変更を逆シリアル化し、変更を適用します。すべての変更が同期先に適用されるまで、このプロセスが続けられます。
すべてのバッチは 1 つのトランザクションとして適用されます。同期先プロバイダーが最後のバッチを受け取って初めてトランザクションが成立します。
2 層シナリオの場合、Sync Framework がスプール ファイルをクリーンアップします。N 層シナリオの場合、同期を開始したコンピューター上のスプール ファイルは Sync Framework がクリーンアップし、中間層のファイルはプロキシがクリーンアップする必要があります (この点については、この後の
Cleanup()
メソッドのサンプルで取り上げます)。セッションが中止されるケースも想定し、中間層には、特定の日数を経過したファイルをクリーンアップするプロセスも必要となります。
注意
ノードに適用されるデータ変更は、DbChangesSelectedEventArgs オブジェクトの Context プロパティから使用可能です。データがバッチ分割されていない場合は、ChangesSelected イベントが一度だけ発生し、すべての変更が Context プロパティから使用可能になります。データがバッチ分割されている場合は、各バッチに対して ChangesSelected が発生し、そのバッチからの変更だけが使用可能になります。すべてのバッチからの変更が必要な場合は、各 ChangesSelected イベントに応答して、返されるデータを保存します。
次の表は、バッチ処理に関係する型とメンバーを説明しています。バッチ処理に必須のプロパティは MemoryDataCacheSize だけですが、BatchingDirectory も設定することをお勧めします。
型またはメンバー | 説明 |
---|---|
BatchingDirectory |
ディスクにスプールされるバッチ ファイルが格納されているディレクトリを取得または設定します。実行中のプロバイダーまたはプロキシのローカル ディレクトリのパスを指定する必要があります。UNC ファイル パスやファイル以外の URI パスは指定できません。 注意 スプール ファイルには生のデータベース データが格納されます。ファイルの書き込み先ディレクトリは、適切なアクセス制御を使って保護する必要があります。 |
バッチ処理ファイル内の変更が同期先に適用された後にそのファイルをクリーンアップするかどうかを示す値を取得または設定します。既定では、ファイルがクリーンアップされます。 |
|
MemoryDataCacheSize |
変更をディスクにスプールする前に Sync Framework でそれらの変更をキャッシュするために使用するメモリの最大容量 (KB 単位) を取得または設定します。 注意 この設定が影響するのは、あくまで、同期先に送信される変更のデータおよびメタデータがメモリ内に維持されるサイズです。他の Sync Framework コンポーネントやユーザー アプリケーション コンポーネントによって使用されるメモリを制限するものではありません。 |
変更バッチが同期先に適用されるたびに発生するイベント。 |
|
変更バッチがディスクに書き込まれるたびに発生するイベント。 |
|
DbBatchAppliedEventArgs |
適用するバッチ数の合計や現在のバッチ番号など、BatchApplied イベントのデータを提供します。 |
DbBatchSpooledEventArgs |
現在のバッチ番号やバッチ サイズなど、BatchSpooled イベントのデータを提供します。 |
スプールされた変更の書き込み先のファイルの名前を取得または設定します。 |
|
データが複数のバッチで送信されるか、単一の DataSet オブジェクトで送信されるかを表す値を取得または設定します。 |
|
現在のバッチが最後の変更バッチかどうかを示す値を取得または設定します。 |
|
変更がバッチ処理された同期セッション中に再試行された削除操作の数を取得または設定します。 バッチで削除が再試行されるのは、主キーの削除と外部キーの削除の順番が原因です。現在のバッチ、またはその前のバッチに外部キーの削除がない場合、対応する主キーの削除は失敗します。すべてのバッチが適用された後に、失敗した削除を再試行します。 |
|
SelectIncrementalChangesCommand (DbSyncProvider のみに関係) |
ローカル データベースの増分変更を選択するために使用されるクエリまたはストアド プロシージャを取得または設定します。 注意 指定するクエリには、 |
同期される変更を格納する DataTable オブジェクトを取得または設定します。バッチ処理が有効な場合、このプロパティにアクセスすると、ディスクからスプール ファイルが逆シリアル化されます。その後、テーブルに対して行われたすべての変更がスプール ファイルに反映されます。 |
|
ピア データベースから選択した行を格納する DataSet オブジェクトを取得または設定します。IsDataBatched が true の場合は null を返します。 |
2 層および N 層に共通のコード
このセクションのコード例では、2 層および N 層のシナリオにおけるバッチ処理の方法を紹介します。このコードは Sync Framework SDK の SharingAppDemo-CEProviderEndToEnd
と WebSharingAppDemo-CEProviderEndToEnd
に含まれている 2 つのサンプルから抜粋したものです。それぞれの例には、コードが使用されている場所 (SharingAppDemo/CESharingForm
など) が明記されています。バッチ処理の観点から見て、2 つのアプリケーションの主な相違は、N 層の場合、スプール ファイルをアップロード/ダウンロードし、変更を列挙するノードごとにディレクトリを作成するためのコードが別途必要になる点です。
次のコード例は、SharingAppDemo/CESharingForm
の synchronizeBtn_Click
イベント ハンドラーから抜粋したもので、メモリ データ キャッシュ サイズと、スプール ファイルの書き込み先ディレクトリを設定します。BatchingDirectory
には、実行中のプロバイダーまたはプロキシのローカル ディレクトリのパスを指定する必要があります。UNC ファイル パスやファイル以外の URI パスは指定できません。BatchingDirectory
に指定するのはルート ディレクトリのパスです。Sync Framework は、同期セッションごとに、そのセッションのスプール ファイルを格納するための専用のサブディレクトリを作成します。このディレクトリを現在の同期元/同期先の組み合わせに固有とすることで、別のセッションのファイルを分離します。
次のコード例は、WebSharingAppDemo/CESharingForm
の synchronizeBtn_Click
イベント ハンドラーから抜粋したもので、設定するプロパティは同じですが、同期先のバッチ ディレクトリは、2 層シナリオのようにプロバイダーに対して直接設定するのではなく、プロキシに対して設定しています。
//Set memory data cache size property. 0 represents non batched mode.
//No need to set memory cache size for Proxy, because the source is
//enabled for batching: both upload and download will be batched.
srcProvider.MemoryDataCacheSize = this._batchSize;
//Set batch spool location. Default value if not set is %Temp% directory.
if (!string.IsNullOrEmpty(this.batchSpoolLocation.Text))
{
srcProvider.BatchingDirectory = this.batchSpoolLocation.Text;
destinationProxy.BatchingDirectory = this.batchSpoolLocation.Text;
}
次のコード例は、両方のアプリケーションの SynchronizationHelper
ファイルから抜粋したもので、変更の列挙中および変更の適用中にプロバイダーによって生成される BatchSpooled
と BatchAppliedEvents
を処理するメソッドです。
void provider_BatchSpooled(object sender, DbBatchSpooledEventArgs e)
{
this.progressForm.listSyncProgress.Items.Add("BatchSpooled event fired: Details");
this.progressForm.listSyncProgress.Items.Add("\tSource Database :" + ((RelationalSyncProvider)sender).Connection.Database);
this.progressForm.listSyncProgress.Items.Add("\tBatch Name :" + e.BatchFileName);
this.progressForm.listSyncProgress.Items.Add("\tBatch Size :" + e.DataCacheSize);
this.progressForm.listSyncProgress.Items.Add("\tBatch Number :" + e.CurrentBatchNumber);
this.progressForm.listSyncProgress.Items.Add("\tTotal Batches :" + e.TotalBatchesSpooled);
this.progressForm.listSyncProgress.Items.Add("\tBatch Watermark :" + ReadTableWatermarks(e.CurrentBatchTableWatermarks));
}
void provider_BatchApplied(object sender, DbBatchAppliedEventArgs e)
{
this.progressForm.listSyncProgress.Items.Add("BatchApplied event fired: Details");
this.progressForm.listSyncProgress.Items.Add("\tDestination Database :" + ((RelationalSyncProvider)sender).Connection.Database);
this.progressForm.listSyncProgress.Items.Add("\tBatch Number :" + e.CurrentBatchNumber);
this.progressForm.listSyncProgress.Items.Add("\tTotal Batches To Apply :" + e.TotalBatchesToApply);
}
//Reads the watermarks for each table from the batch spooled event. //The watermark denotes the max tickcount for each table in each batch.
private string ReadTableWatermarks(Dictionary<string, ulong> dictionary)
{
StringBuilder builder = new StringBuilder();
Dictionary<string, ulong> dictionaryClone = new Dictionary<string, ulong>(dictionary);
foreach (KeyValuePair<string, ulong> kvp in dictionaryClone)
{
builder.Append(kvp.Key).Append(":").Append(kvp.Value).Append(",");
}
return builder.ToString();
}
N 層に固有のコード
以降のコード例は、WebSharingAppDemo
の N 層シナリオのみを対象としています。関連する N 層のコードは次の 3 つのファイルに存在します。
サービス コントラクト :
IRelationalSyncContract
Web サービス :
RelationalWebSyncService
プロキシ :
RelationalProviderProxy
SqlSyncProvider と SqlCeSyncProvider の 2 つのプロバイダーはどちらも RelationalSyncProvider を継承しているので、このコードは両方のプロバイダーが対象になります。その他、ストアに固有の機能は、プロバイダーの種類ごとにプロキシとサービス ファイルに分離されます。
N 層シナリオにおけるバッチ処理の動作を理解するために、サーバーを同期元、クライアントを同期先とする同期セッションを考えます。サーバー上のローカル ディレクトリに変更が書き込まれると、ダウンロードされた変更に対して次の処理が発生します。
クライアント プロキシ上の
GetChangeBatch
メソッドが呼び出されます。この後のサンプル コードで示すように、このメソッドには、バッチを処理するためのコードが含まれている必要があります。サービスはバッチ ファイルを
SqlSyncProvider
から取得します。完全なパス情報が削除され、ファイル名のみがネットワーク経由で送信されます。そのため、サーバーのディレクトリ構造がクライアントに公開されることはありません。プロキシによって呼び出された
GetChangeBatch
の処理が戻ります。プロキシは変更がバッチ分割されていることを検出すると、引数にバッチ ファイル名を渡して
DownloadBatchFile
を呼び出します。プロキシはバッチ ファイルをローカルで保管するために、
RelationalProviderProxy.BatchingDirectory
に固有のディレクトリ (そのセッションでまだ作成されていない場合) を作成します。ディレクトリ名は、変更を列挙しているピアのレプリカ ID です。これにより、プロキシとサービスに固有のディレクトリが、列挙中のピアごとに 1 つ作成されることになります。
プロキシがファイルをダウンロードし、ローカルに保存します。プロキシは、現在のコンテキストにおけるファイル名を新しい (ローカル ディスク上のバッチ ファイルの) 完全パスに置き換えます。
プロキシがコンテキストをオーケストレータに戻します。
プロキシが最後のバッチを受け取るまで、手順 1. ~ 6. が繰り返されます。
アップロードされた変更に対しては、次の処理が行われます。
オーケストレータがプロキシ上の
ProcessChangeBatch
を呼び出します。プロキシはそれをバッチ ファイルであると判断し、次の処理を実行します。
完全なパス情報を削除し、ファイル名のみをネットワーク経由で送信します。
HasUploadedBatchFile
を呼び出してそのファイルが既にアップロードされているかどうかを調べます。既にアップロード済みである場合、手順 C は不要です。HasUploadedBatchFile
が false を返す場合、サービスのUploadBatchFile
を呼び出し、バッチ ファイルの内容をアップロードします。サービスは、
UploadBatchFile
の呼び出しを受け、バッチをローカルに保存します。ディレクトリの作成は、前の手順 4. と同様です。サービスの
ApplyChanges
を呼び出します。
サーバーが
ApplyChanges
の呼び出しを受け、それがバッチ ファイルであると判断します。現在のコンテキストにおけるファイル名を新しいローカル ディスク上のバッチ ファイルの完全パスに置き換えます。サーバーは
DbSyncContext
をローカルのSqlSyncProvider
に渡します。最後のバッチが送信されるまで、手順 1. ~ 6. が繰り返されます。
次のコード例は IRelationalSyncContract
から抜粋したもので、中間層との間でスプール ファイルを転送するためのアップロード メソッドとダウンロード メソッドを指定しています。
[OperationContract(IsOneWay = true)]
void UploadBatchFile(string batchFileid, byte[] batchFile);
[OperationContract]
byte[] DownloadBatchFile(string batchFileId);
次のコード例は RelationalWebSyncService
から抜粋したもので、コントラクトに定義されている UploadBatchFile
メソッドおよび DownloadBatchFile
メソッドを公開し、バッチ処理に関連した追加のロジックを次のメソッドに含めています。
Cleanup
: 指定されたディレクトリ (指定されなかった場合は temp ディレクトリ) からスプール ファイルをクリーンアップします。GetChanges
: データがバッチ分割されているかどうかをチェックし、バッチ分割されている場合は、スプール ファイルのディレクトリ パスがネットワークを介して送信されないように削除します。N 層シナリオの場合、ネットワーク接続を介してディレクトリの完全パスを送信することにはセキュリティ上のリスクが伴います。ファイル名は GUID です。HasUploadedBatchFile
: 特定のバッチ ファイルがサービスにアップロード済みかどうかを返します。ApplyChanges
: データがバッチ分割されているかどうかをチェックし、バッチ分割されている場合は、必要なバッチ ファイルが既にアップロードされているかどうかをチェックします。ファイルがアップロードされていなかった場合は、例外がスローされます。クライアントはApplyChanges
を呼び出す前に、スプール ファイルをアップロードしておく必要があります。
public abstract class RelationalWebSyncService: IRelationalSyncContract
{
protected bool isProxyToCompactDatabase;
protected RelationalSyncProvider peerProvider;
protected DirectoryInfo sessionBatchingDirectory = null;
protected Dictionary<string, string> batchIdToFileMapper;
int batchCount = 0;
public void Initialize(string scopeName, string hostName)
{
this.peerProvider = this.ConfigureProvider(scopeName, hostName);
this.batchIdToFileMapper = new Dictionary<string, string>();
}
public void Cleanup()
{
this.peerProvider = null;
//Delete all file in the temp session directory
if (sessionBatchingDirectory != null && sessionBatchingDirectory.Exists)
{
try
{
sessionBatchingDirectory.Delete(true);
}
catch
{
//Ignore
}
}
}
public void BeginSession(SyncProviderPosition position)
{
Log("*****************************************************************");
Log("******************** New Sync Session ***************************");
Log("*****************************************************************");
Log("BeginSession: ScopeName: {0}, Position: {1}", this.peerProvider.ScopeName, position);
//Clean the mapper for each session.
this.batchIdToFileMapper = new Dictionary<string, string>();
this.peerProvider.BeginSession(position, null/*SyncSessionContext*/);
this.batchCount = 0;
}
public SyncBatchParameters GetKnowledge()
{
Log("GetSyncBatchParameters: {0}", this.peerProvider.Connection.ConnectionString);
SyncBatchParameters destParameters = new SyncBatchParameters();
this.peerProvider.GetSyncBatchParameters(out destParameters.BatchSize, out destParameters.DestinationKnowledge);
return destParameters;
}
public GetChangesParameters GetChanges(uint batchSize, SyncKnowledge destinationKnowledge)
{
Log("GetChangeBatch: {0}", this.peerProvider.Connection.ConnectionString);
GetChangesParameters changesWrapper = new GetChangesParameters();
changesWrapper.ChangeBatch = this.peerProvider.GetChangeBatch(batchSize, destinationKnowledge, out changesWrapper.DataRetriever);
DbSyncContext context = changesWrapper.DataRetriever as DbSyncContext;
//Check to see if data is batched
if (context != null && context.IsDataBatched)
{
Log("GetChangeBatch: Data Batched. Current Batch #:{0}", ++this.batchCount);
//Dont send the file location info. Just send the file name
string fileName = new FileInfo(context.BatchFileName).Name;
this.batchIdToFileMapper[fileName] = context.BatchFileName;
context.BatchFileName = fileName;
}
return changesWrapper;
}
public SyncSessionStatistics ApplyChanges(ConflictResolutionPolicy resolutionPolicy, ChangeBatch sourceChanges, object changeData)
{
Log("ProcessChangeBatch: {0}", this.peerProvider.Connection.ConnectionString);
DbSyncContext dataRetriever = changeData as DbSyncContext;
if (dataRetriever != null && dataRetriever.IsDataBatched)
{
string remotePeerId = dataRetriever.MadeWithKnowledge.ReplicaId.ToString();
//Data is batched. The client should have uploaded this file to us prior to calling ApplyChanges.
//So look for it.
//The Id would be the DbSyncContext.BatchFileName which is just the batch file name without the complete path
string localBatchFileName = null;
if (!this.batchIdToFileMapper.TryGetValue(dataRetriever.BatchFileName, out localBatchFileName))
{
//Service has not received this file. Throw exception
throw new FaultException<WebSyncFaultException>(new WebSyncFaultException("No batch file uploaded for id " + dataRetriever.BatchFileName, null));
}
dataRetriever.BatchFileName = localBatchFileName;
}
SyncSessionStatistics sessionStatistics = new SyncSessionStatistics();
this.peerProvider.ProcessChangeBatch(resolutionPolicy, sourceChanges, changeData, new SyncCallbacks(), sessionStatistics);
return sessionStatistics;
}
public void EndSession()
{
Log("EndSession: {0}", this.peerProvider.Connection.ConnectionString);
Log("*****************************************************************");
Log("******************** End Sync Session ***************************");
Log("*****************************************************************");
this.peerProvider.EndSession(null);
Log("");
}
/// <summary>
/// Used by proxy to see if the batch file has already been uploaded. Optimizes by not resending batch files.
/// NOTE: This method takes in a file name as an input parameter and hence is suseptible for name canonicalization
/// attacks. This sample is meant to be a starting point in demonstrating how to transfer sync batch files and is
/// not intended to be a secure way of doing the same. This SHOULD NOT be used as such in production environment
/// without doing proper security analysis.
///
/// Please refer to the following two MSDN whitepapers for more information on guidelines for securing Web servies.
///
/// Design Guidelines for Secure Web Applications - https://msdn.microsoft.com/en-us/library/aa302420.aspx (Refer InputValidation section)
/// Architecture and Design Review for Security - https://msdn.microsoft.com/en-us/library/aa302421.aspx (Refer InputValidation section)
/// </summary>
/// <param name="batchFileId"></param>
/// <returns>bool</returns>
public bool HasUploadedBatchFile(String batchFileId, string remotePeerId)
{
this.CheckAndCreateBatchingDirectory(remotePeerId);
//The batchFileId is the fileName without the path information in it.
FileInfo fileInfo = new FileInfo(Path.Combine(this.sessionBatchingDirectory.FullName, batchFileId));
if (fileInfo.Exists && !this.batchIdToFileMapper.ContainsKey(batchFileId))
{
//If file exists but is not in the memory id to location mapper then add it to the mapping
this.batchIdToFileMapper.Add(batchFileId, fileInfo.FullName);
}
//Check to see if the proxy has already uploaded this file to the service
return fileInfo.Exists;
}
/// <summary>
/// NOTE: This method takes in a file name as an input parameter and hence is suseptible for name canonicalization
/// attacks. This sample is meant to be a starting point in demonstrating how to transfer sync batch files and is
/// not intended to be a secure way of doing the same. This SHOULD NOT be used as such in production environment
/// without doing proper security analysis.
///
/// Please refer to the following two MSDN whitepapers for more information on guidelines for securing Web servies.
///
/// Design Guidelines for Secure Web Applications - https://msdn.microsoft.com/en-us/library/aa302420.aspx (Refer InputValidation section)
/// Architecture and Design Review for Security - https://msdn.microsoft.com/en-us/library/aa302421.aspx (Refer InputValidation section)
/// </summary>
/// <param name="batchFileId"></param>
/// <param name="batchContents"></param>
/// <param name="remotePeerId"></param>
public void UploadBatchFile(string batchFileId, byte[] batchContents, string remotePeerId)
{
Log("UploadBatchFile: {0}", this.peerProvider.Connection.ConnectionString);
try
{
if (HasUploadedBatchFile(batchFileId, remotePeerId))
{
//Service has already received this file. So dont save it again.
return;
}
//Service hasnt seen the file yet so save it.
String localFileLocation = Path.Combine(sessionBatchingDirectory.FullName, batchFileId);
FileStream fs = new FileStream(localFileLocation, FileMode.Create, FileAccess.Write);
using (fs)
{
fs.Write(batchContents, 0, batchContents.Length);
}
//Save this Id to file location mapping in the mapper object
this.batchIdToFileMapper[batchFileId] = localFileLocation;
}
catch (Exception e)
{
throw new FaultException<WebSyncFaultException>(new WebSyncFaultException("Unable to save batch file.", e));
}
}
/// <summary>
/// NOTE: This method takes in a file name as an input parameter and hence is suseptible for name canonicalization
/// attacks. This sample is meant to be a starting point in demonstrating how to transfer sync batch files and is
/// not intended to be a secure way of doing the same. This SHOULD NOT be used as such in production environment
/// without doing proper security analysis.
///
/// Please refer to the following two MSDN whitepapers for more information on guidelines for securing Web servies.
///
/// Design Guidelines for Secure Web Applications - https://msdn.microsoft.com/en-us/library/aa302420.aspx (Refer InputValidation section)
/// Architecture and Design Review for Security - https://msdn.microsoft.com/en-us/library/aa302421.aspx (Refer InputValidation section)
/// </summary>
/// <param name="batchFileId"></param>
/// <returns></returns>
public byte[] DownloadBatchFile(string batchFileId)
{
try
{
Log("DownloadBatchFile: {0}", this.peerProvider.Connection.ConnectionString);
Stream localFileStream = null;
string localBatchFileName = null;
if (!this.batchIdToFileMapper.TryGetValue(batchFileId, out localBatchFileName))
{
throw new FaultException<WebSyncFaultException>(new WebSyncFaultException("Unable to retrieve batch file for id." + batchFileId, null));
}
localFileStream = new FileStream(localBatchFileName, FileMode.Open, FileAccess.Read);
byte[] contents = new byte[localFileStream.Length];
localFileStream.Read(contents, 0, contents.Length);
return contents;
}
catch (Exception e)
{
throw new FaultException<WebSyncFaultException>(new WebSyncFaultException("Unable to read batch file for id " + batchFileId, e));
}
}
protected void Log(string p, params object[] paramArgs)
{
Console.WriteLine(p, paramArgs);
}
//Utility functions that the sub classes need to implement.
protected abstract RelationalSyncProvider ConfigureProvider(string scopeName, string hostName);
private void CheckAndCreateBatchingDirectory(string remotePeerId)
{
//Check to see if we have temp directory for this session.
if (sessionBatchingDirectory == null)
{
//Generate a unique Id for the directory
//We use the peer id of the store enumerating the changes so that the local temp directory is same for a given source
//across sync sessions. This enables us to restart a failed sync by not downloading already received files.
string sessionDir = Path.Combine(this.peerProvider.BatchingDirectory, "WebSync_" + remotePeerId);
sessionBatchingDirectory = new DirectoryInfo(sessionDir);
//Create the directory if it doesnt exist.
if (!sessionBatchingDirectory.Exists)
{
sessionBatchingDirectory.Create();
}
}
}
}
次のコード例は RelationalProviderProxy
から抜粋したもので、Web サービスのプロパティを設定し、次のメソッドを呼び出しています。
BatchingDirectory
: アプリケーションから中間層のバッチ ディレクトリを設定できるようにします。EndSession
: 指定されたディレクトリからスプール ファイルをクリーンアップします。GetChangeBatch
:DownloadBatchFile
メソッドを呼び出して、変更バッチをダウンロードします。ProcessChangeBatch
:UploadBatchFile
メソッドを呼び出して、変更バッチをアップロードします。
public abstract class RelationalProviderProxy : KnowledgeSyncProvider, IDisposable
{
protected IRelationalSyncContract proxy;
protected SyncIdFormatGroup idFormatGroup;
protected string scopeName;
protected DirectoryInfo localBatchingDirectory;
//Represents either the SQL server host name or the CE database file name. Sql database name
//is always peer1
//For this sample scopeName is always Sales
protected string hostName;
private string batchingDirectory = Environment.ExpandEnvironmentVariables("%TEMP%");
public string BatchingDirectory
{
get { return batchingDirectory; }
set
{
if (string.IsNullOrEmpty(value))
{
throw new ArgumentException("value cannot be null or empty");
}
try
{
Uri uri = new Uri(value);
if (!uri.IsFile || uri.IsUnc)
{
throw new ArgumentException("value must be a local directory");
}
batchingDirectory = value;
}
catch (Exception e)
{
throw new ArgumentException("Invalid batching directory.", e);
}
}
}
public RelationalProviderProxy(string scopeName, string hostName)
{
this.scopeName = scopeName;
this.hostName = hostName;
this.CreateProxy();
this.proxy.Initialize(scopeName, hostName);
}
public override void BeginSession(SyncProviderPosition position, SyncSessionContext syncSessionContext)
{
this.proxy.BeginSession(position);
}
public override void EndSession(SyncSessionContext syncSessionContext)
{
proxy.EndSession();
if (this.localBatchingDirectory != null && this.localBatchingDirectory.Exists)
{
//Cleanup batching releated files from this session
this.localBatchingDirectory.Delete(true);
}
}
public override ChangeBatch GetChangeBatch(uint batchSize, SyncKnowledge destinationKnowledge, out object changeDataRetriever)
{
GetChangesParameters changesWrapper = proxy.GetChanges(batchSize, destinationKnowledge);
//Retrieve the ChangeDataRetriever and the ChangeBatch
changeDataRetriever = changesWrapper.DataRetriever;
DbSyncContext context = changeDataRetriever as DbSyncContext;
//Check to see if the data is batched.
if (context != null && context.IsDataBatched)
{
if (this.localBatchingDirectory == null)
{
//Retrieve the remote peer id from the MadeWithKnowledge.ReplicaId. MadeWithKnowledge is the local knowledge of the peer
//that is enumerating the changes.
string remotePeerId = context.MadeWithKnowledge.ReplicaId.ToString();
//Generate a unique Id for the directory.
//We use the peer id of the store enumerating the changes so that the local temp directory is same for a given source
//across sync sessions. This enables us to restart a failed sync by not downloading already received files.
string sessionDir = Path.Combine(this.batchingDirectory, "WebSync_" + remotePeerId);
this.localBatchingDirectory = new DirectoryInfo(sessionDir);
//Create the directory if it doesnt exist.
if (!this.localBatchingDirectory.Exists)
{
this.localBatchingDirectory.Create();
}
}
string localFileName = Path.Combine(this.localBatchingDirectory.FullName, context.BatchFileName);
FileInfo localFileInfo = new FileInfo(localFileName);
//Download the file only if doesnt exist
FileStream localFileStream = new FileStream(localFileName, FileMode.Create, FileAccess.Write);
if (!localFileInfo.Exists)
{
byte[] remoteFileContents = this.proxy.DownloadBatchFile(context.BatchFileName);
using (localFileStream)
{
localFileStream.Write(remoteFileContents, 0, remoteFileContents.Length);
}
}
//Set DbSyncContext.Batchfile name to the new local file name
context.BatchFileName = localFileName;
}
return changesWrapper.ChangeBatch;
}
public override FullEnumerationChangeBatch GetFullEnumerationChangeBatch(uint batchSize, SyncId lowerEnumerationBound, SyncKnowledge knowledgeForDataRetrieval, out object changeDataRetriever)
{
throw new NotImplementedException();
}
public override void GetSyncBatchParameters(out uint batchSize, out SyncKnowledge knowledge)
{
SyncBatchParameters wrapper = proxy.GetKnowledge();
batchSize = wrapper.BatchSize;
knowledge = wrapper.DestinationKnowledge;
}
public override SyncIdFormatGroup IdFormats
{
get
{
if (idFormatGroup == null)
{
idFormatGroup = new SyncIdFormatGroup();
//
// 1 byte change unit id (Harmonica default before flexible ids)
//
idFormatGroup.ChangeUnitIdFormat.IsVariableLength = false;
idFormatGroup.ChangeUnitIdFormat.Length = 1;
//
// Guid replica id
//
idFormatGroup.ReplicaIdFormat.IsVariableLength = false;
idFormatGroup.ReplicaIdFormat.Length = 16;
//
// Sync global id for item ids
//
idFormatGroup.ItemIdFormat.IsVariableLength = true;
idFormatGroup.ItemIdFormat.Length = 10 * 1024;
}
return idFormatGroup;
}
}
public override void ProcessChangeBatch(ConflictResolutionPolicy resolutionPolicy, ChangeBatch sourceChanges, object changeDataRetriever, SyncCallbacks syncCallbacks, SyncSessionStatistics sessionStatistics)
{
DbSyncContext context = changeDataRetriever as DbSyncContext;
if (context != null && context.IsDataBatched)
{
string fileName = new FileInfo(context.BatchFileName).Name;
//Retrieve the remote peer id from the MadeWithKnowledge.ReplicaId. MadeWithKnowledge is the local knowledge of the peer
//that is enumerating the changes.
string peerId = context.MadeWithKnowledge.ReplicaId.ToString();
//Check to see if service already has this file
if (!this.proxy.HasUploadedBatchFile(fileName, peerId))
{
//Upload this file to remote service
FileStream stream = new FileStream(context.BatchFileName, FileMode.Open, FileAccess.Read);
byte[] contents = new byte[stream.Length];
using (stream)
{
stream.Read(contents, 0, contents.Length);
}
this.proxy.UploadBatchFile(fileName, contents, peerId);
}
context.BatchFileName = fileName;
}
this.proxy.ApplyChanges(resolutionPolicy, sourceChanges, changeDataRetriever);
}
public override void ProcessFullEnumerationChangeBatch(ConflictResolutionPolicy resolutionPolicy, FullEnumerationChangeBatch sourceChanges, object changeDataRetriever, SyncCallbacks syncCallbacks, SyncSessionStatistics sessionStatistics)
{
throw new NotImplementedException();
}
protected abstract void CreateProxy();
#region IDisposable Members
public void Dispose()
{
this.proxy.Cleanup();
this.proxy = null;
GC.SuppressFinalize(this);
}
#endregion
}