Потоки изменений в API Azure Cosmos DB для MongoDB
Область применения: MongoDB
Поддержка канала изменений в API Azure Cosmos DB для MongoDB доступна с помощью API потоков изменений. С помощью API потоков изменений приложения могут получать изменения, внесенные в коллекцию или элементы в пределах одного сегмента. На основе полученных результатов можно будет выполнять дальнейшие действия. Изменения элементов в коллекции фиксируются в порядке времени их изменения, и в каждом ключе сегмента обязательно соблюдается порядок сортировки.
Примечание.
Чтобы использовать потоки изменений, создайте API Azure Cosmos DB для учетной записи MongoDB с версией сервера 3.6 и выше. При выполнении примеров потока изменений в более ранней версии может появиться следующее сообщение об ошибке: Unrecognized pipeline stage name: $changeStream (Неопознанное имя этапа конвейера: $changeStream).
Примеры
В следующем примере показано, как получить потоки изменений для всех элементов в коллекции. В этом примере создается курсор для просмотра элементов при их вставке, обновлении или замене. Для получения потоков изменений требуются этап $match
, этап $project
и параметр fullDocument
. Наблюдение за операциями удаления с помощью потоков изменений сейчас не поддерживается. В качестве обходного решения в удаляемые элементы можно добавить программный маркер. Например, вы можете добавить атрибут в элемент под названием "deleted". Когда вы захотите удалить элемент, вы можете установить "deleted" (удален) в true
и установить TTL для элемента. Так как обновление "deleted" до true
является изменением, оно будет отображено в потоке изменений.
var cursor = db.coll.watch(
[
{ $match: { "operationType": { $in: ["insert", "update", "replace"] } } },
{ $project: { "_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1 } }
],
{ fullDocument: "updateLookup" });
while (!cursor.isExhausted()) {
if (cursor.hasNext()) {
printjson(cursor.next());
}
}
Изменения в пределах одного сегмента
В следующем примере показано, как получить изменения элементов в пределах одного сегмента. В этом примере показано получение изменений элементов, ключ сегмента которых равен "a", а значение ключа сегмента равно "1". Допускается ситуация, когда разные клиенты могут одновременно считывать изменения из разных сегментов.
var cursor = db.coll.watch(
[
{
$match: {
$and: [
{ "fullDocument.a": 1 },
{ "operationType": { $in: ["insert", "update", "replace"] } }
]
}
},
{ $project: { "_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1} }
],
{ fullDocument: "updateLookup" });
Масштабирование потоков изменений
При использовании потоков изменений в большом масштабе рекомендуется равномерно распределить нагрузку. Используйте пользовательскую команду GetChangeStreamTokens, чтобы распределить нагрузку между физическими сегментами и секциями.
Текущие ограничения
При использовании потоков изменений действуют следующие ограничения.
- Свойства
operationType
иupdateDescription
пока не поддерживаются в выходном документе. - В настоящее время поддерживаются типы операций
insert
,update
иreplace
. При этом операция удаления или другие события пока не поддерживаются.
В связи с этими ограничениями этап $match, этап $project и параметры fullDocument являются обязательными, как показано в предыдущих примерах.
В отличие от канала изменений в API Azure Cosmos DB для NoSQL, не существует отдельной библиотеки обработчика канала изменений для использования потоков изменений или необходимости в контейнере аренды. В настоящее время поддержка триггеров Функций Azure для обработки потоков изменений отсутствует.
Обработка ошибок
При использовании потоков изменений поддерживаются следующие коды ошибок и сообщения.
Код ошибки HTTP: 16500 — когда поток изменений регулируется, он возвращает пустую страницу.
NamespaceNotFound (OperationType Invalidate) — при запуске потока изменений в несуществующей коллекции или при удалении коллекции возвращается ошибка
NamespaceNotFound
. Так как свойствоoperationType
не может быть возвращено в выходном документе, вместо ошибкиoperationType Invalidate
возвращается ошибкаNamespaceNotFound
.