Watermark keeps increasing and events are not processed in Stream Analytics
I have IoT package data flowing from Event Hub into Stream Analytics and then written to Delta Lake.
Before switching to Delta Lake, the output was CSV and it worked fine: no watermark delays, and resource utilization was stable and low.
After switching the output type to Delta Lake (preview), data stopped being processed and the watermark kept increasing. All the sharp drops in the graph are manual restarts of the Stream Analytics job - that is the only time it actually processes data, and then the watermark starts rising again. After a restart it catches up on the past day's data almost instantly, so I don't think the problem is streaming units.
Other metrics show no indication of a problem (or at least I cannot see one): there are no errors, CPU utilization is low, and there are no early, late, or out-of-order events.
I'm using 1 SU (V1 pricing SKU, the lowest option), but I did try scaling up to 3 SUs and the result was the same. Autoscaling is not enabled.
Environment: standard (multi-tenant)
Compatibility level: 1.2
Error policy: Retry
Input:
Event Hub, JSON format, no compression.
Data usually comes in in batches of up to 1k events (Event Hub still receives each event individually). The data is also not real time - timestamps can be from a few days ago.
Query:
WITH NewInput AS
(
    SELECT udf.transformInput([eventhub-input]) AS UpdatedJson
    FROM [eventhub-input]
)

SELECT UpdatedJson.*
INTO [output-deltalake-type1]
FROM NewInput
WHERE UpdatedJson.DocumentType = 'type1'

SELECT UpdatedJson.*
INTO [output-deltalake-type2]
FROM NewInput
WHERE UpdatedJson.DocumentType = 'type2'

SELECT UpdatedJson.*
INTO [output-deltalake-type3]
FROM NewInput
WHERE UpdatedJson.DocumentType = 'type3'
All 3 types have different schemas, so they are written to different paths within the same storage account container.
Job simulation says that this job is parallel.
udf.transformInput removes some fields from the input and makes some other small updates.
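For reference, the UDF is roughly of this shape (a simplified sketch; the field names below are placeholders, not the actual ones):

```javascript
// Simplified sketch of udf.transformInput. ASA JavaScript UDFs receive
// the event as a plain object and return the transformed object.
// 'debugInfo' and 'rawPayload' stand in for the fields being removed.
function main(input) {
    var output = {};
    for (var key in input) {
        // Drop fields we don't want in the output
        if (key === 'rawPayload' || key === 'debugInfo') {
            continue;
        }
        output[key] = input[key];
    }
    // One of the "small updates": normalize the document type so the
    // WHERE clauses in the query match it case-insensitively
    if (output.DocumentType) {
        output.DocumentType = String(output.DocumentType).toLowerCase();
    }
    return output;
}
```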
Output:
All output configuration is identical except for the Delta table path:
type: Blob storage/ADLS Gen2
serialization format: Delta Lake (preview)
delta table path: data/{type1-3}
Partition column: userId
minimum rows: 10000
maximum time: 1 hour
Am I missing something in my configuration? Any help would be appreciated.