read_kinesis
streamovaná funkce s hodnotou tabulky
Platí pro: Databricks SQL Databricks Runtime 13.3 LTS a vyšší
Vrátí tabulku se záznamy načtenými z Kinesis z jednoho nebo více datových proudů.
Syntaxe
read_kinesis ( { parameter => value } [, ...] )
Argumenty
read_kinesis
vyžaduje vyvolání pojmenovaného parametru.
Jediným povinným argumentem je streamName
. Všechny ostatní argumenty jsou volitelné.
Popisy argumentů jsou zde stručné. Další podrobnosti najdete v dokumentaci k Amazon Kinesis .
Existují různé možnosti připojení pro připojení a ověření pomocí AWS.
awsAccessKey
a awsSecretKey
buď lze zadat v argumentech funkce pomocí tajné funkce, ručně nastavit v argumentech nebo nakonfigurovat jako proměnné prostředí, jak je uvedeno níže.
roleArn
, roleExternalID
roleSessionName
lze také použít k ověřování pomocí AWS pomocí profilů instancí.
Pokud žádnou z těchto možností nezadáte, použije se výchozí řetězec zprostředkovatele AWS.
Parametr | Typ | Popis |
---|---|---|
streamName |
STRING |
Povinný, čárkami oddělený seznam jednoho nebo více kinezních proudů. |
awsAccessKey |
STRING |
Přístupový klíč AWS( pokud existuje). Je také možné zadat prostřednictvím různých možností podporovaných prostřednictvím výchozího řetězce zprostředkovatele přihlašovacích údajů AWS, včetně proměnných prostředí (AWS_ACCESS_KEY_ID ) a souboru profilů přihlašovacích údajů. |
awsSecretKey |
STRING |
Tajný klíč, který odpovídá přístupovém klíči. Je možné zadat buď v argumentech, nebo prostřednictvím různých možností podporovaných prostřednictvím výchozího řetězce zprostředkovatele přihlašovacích údajů AWS, včetně proměnných prostředí (AWS_SECRET_KEY nebo AWS_SECRET_ACCESS_KEY ) a souboru profilů přihlašovacích údajů. |
roleArn |
STRING |
Název prostředku Amazonu, který se má při přístupu k Kinesis předpokládat. |
roleExternalId |
STRING |
Používá se při delegování přístupu k účtu AWS. |
roleSessionName |
STRING |
Název relace role AWS. |
stsEndpoint |
STRING |
Koncový bod pro vyžádání dočasných přihlašovacích údajů pro přístup. |
region |
STRING |
Oblast pro zadání datových proudů Výchozí hodnotou je místně vyřešená oblast. |
endpoint |
STRING |
regionální koncový bod datových proudů Kinesis. Výchozí hodnotou je místně vyřešená oblast. |
initialPosition |
STRING |
Počáteční pozice pro čtení z datového proudu Jedna z těchto možností: "latest" (výchozí), "trim_horizon", "earliest", "at_timestamp". |
consumerMode |
STRING |
Jedna z těchto možností: dotazování (výchozí) nebo EFO (enhanced-fan-out). |
consumerName |
STRING |
Název příjemce. Všichni příjemci mají předponu databricks_. Výchozí hodnota je prázdný řetězec. |
registerConsumerTimeoutInterval |
STRING |
Maximální časový limit čekání na registraci příjemce EFO Kinesis u datového proudu Kinesis před vyvolání chyby. Výchozí hodnota je 300s. |
requireConsumerDeregistration |
BOOLEAN |
true pro zrušení registrace příjemce EFO při ukončení dotazu. Výchozí hodnota je false . |
deregisterConsumerTimeoutInterval |
STRING |
Maximální časový limit čekání na zrušení registrace uživatele EFO Kinesis u datového proudu Kinesis před vyvolání chyby. Výchozí hodnota je 300s. |
consumerRefreshInterval |
STRING |
Interval, ve kterém je příjemce kontrolován a aktualizován. Výchozí hodnota je 300s. |
Následující argumenty slouží k řízení propustnosti čtení a latence kinesis:
Parametr | Typ | Popis |
---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
Volitelné, s výchozím nastavením 10 000 záznamů, které se mají načíst na požadavek rozhraní API na Kinesis. |
maxFetchRate |
STRING |
Jak rychle se mají předem načítat data na horizontální oddíly. Hodnota mezi "1,0" a "2,0", která se měří v MB/s. Výchozí hodnota je 1.0. |
minFetchPeriod |
STRING |
Maximální doba čekání mezi po sobě jdoucími pokusy o předběžné načtení. Výchozí hodnota je 400 ms. |
maxFetchDuration |
STRING |
Maximální doba trvání pro uložení předem načtených nových dat do vyrovnávací paměti. Výchozí hodnota je 10s. |
fetchBufferSize |
STRING |
Množství dat pro další aktivační událost. Výchozí hodnota je 20 gb. |
shardsPerTask |
INTEGER (>0) |
Počet horizontálních oddílů Kinesis, ze které se mají předčítat paralelně na úlohu Sparku. Výchozí volba je 5. |
shardFetchinterval |
STRING |
Jak často se má dotazovat na horizontální dělení. Výchozí hodnota je 1s. |
coalesceThresholdBlockSize |
INTEGER (>0) |
Prahová hodnota, při které dochází k automatickému sluhování. Výchozí hodnota je 10 000 000. |
coalesce |
BOOLEAN |
true a zkompilovat předem načtené požadavky. Výchozí hodnota je true . |
coalesceBinSize |
INTEGER (>0) |
Přibližná velikost bloku po zvětšování. Výchozí hodnota je 128 000 000. |
reuseKinesisClient |
BOOLEAN |
true pro opakované použití klienta Kinesis uloženého v mezipaměti. Výchozí hodnota je true s výjimkou clusteru PE. |
clientRetries |
INTEGER (>0) |
Počet opakování ve scénáři opakování Výchozí volba je 5. |
Návraty
Tabulka záznamů Kinesis s následujícím schématem:
Name | Datový typ | Vynulovatelné | Standard | Popis |
---|---|---|---|---|
partitionKey |
STRING |
No | Klíč, který se používá k distribuci dat mezi horizontální oddíly datového proudu. Všechny datové záznamy se stejným klíčem oddílu se načtou ze stejného horizontálního oddílu. | |
data |
BINARY |
No | Datová část kinesis s kódováním base-64. | |
stream |
STRING |
No | Název datového proudu, ze kterého byla data načtena. | |
shardId |
STRING |
No | Jedinečný identifikátor horizontálního oddílu, ze kterého byla data načtena. | |
sequenceNumber |
BIGINT |
No | Jedinečný identifikátor záznamu v rámci jeho horizontálního oddílu. | |
approximateArrivalTimestamp |
TIMESTAMP |
No | Přibližný čas vložení záznamu do datového proudu |
Sloupce (stream, shardId, sequenceNumber)
představují primární klíč.
Příklady
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');