Usar Python para conectar e executar comandos SQL no Azure Cosmos DB para PostgreSQL
APLICA-SE A: Azure Cosmos DB para PostgreSQL (alimentado pela extensão de banco de dados Citus para PostgreSQL)
Este guia de início rápido mostra como usar o código Python para se conectar a um cluster e usar instruções SQL para criar uma tabela. Em seguida, você inserirá, consultará, atualizará e excluirá dados no banco de dados. As etapas neste artigo pressupõem que você esteja familiarizado com o desenvolvimento Python e seja novo no trabalho com o Azure Cosmos DB para PostgreSQL.
Instalar a biblioteca PostgreSQL
Os exemplos de código neste artigo requerem a biblioteca psycopg2 . Você precisará instalar o psycopg2 com seu gerenciador de pacotes de idiomas (como pip).
Conectar, criar uma tabela e inserir dados
O exemplo de código a seguir cria um pool de conexões para seu banco de dados Postgres. Em seguida, ele usa funções cursor.execute com instruções SQL, CREATE TABLE e INSERT INTO para criar uma tabela e inserir dados.
Gorjeta
O código de exemplo abaixo usa um pool de conexões para criar e gerenciar conexões com o PostgreSQL. O pool de conexões do lado do aplicativo é altamente recomendado porque:
- Ele garante que o aplicativo não gere muitas conexões com o banco de dados e, portanto, evita exceder os limites de conexão.
- Ele pode ajudar a melhorar drasticamente o desempenho, tanto a latência quanto a taxa de transferência. O processo do servidor PostgreSQL deve ser bifurcado para lidar com cada nova conexão, e a reutilização de uma conexão evita essa sobrecarga.
No código a seguir, substitua <o cluster> pelo nome e <senha> do cluster pela senha de administrador ou token de ID do Microsoft Entra.
Nota
Este exemplo fecha a conexão no final, portanto, se você quiser executar os outros exemplos no artigo na mesma sessão, não inclua a # Clean up
seção quando executar este exemplo.
import psycopg2
from psycopg2 import pool
# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
print("Connection pool created successfully")
# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()
cursor = conn.cursor()
# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")
# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")
# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")
# Insert some data into the table
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")
# Clean up
conn.commit()
cursor.close()
conn.close()
Quando o código é executado com êxito, ele produz a seguinte saída:
Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data
Distribuir tabelas
O Azure Cosmos DB para PostgreSQL oferece o super poder de distribuir tabelas entre vários nós para escalabilidade. O comando abaixo permite distribuir uma tabela. Você pode saber mais sobre create_distributed_table
e a coluna de distribuição aqui.
Nota
A distribuição de tabelas permite que elas cresçam em qualquer nó de trabalho adicionado ao cluster.
# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")
Ler dados
O exemplo de código a seguir usa as seguintes APIs para ler dados do banco de dados:
- cursor.execute com a instrução SQL SELECT para ler dados.
- cursor.fetchall() para aceitar uma consulta e retornar um conjunto de resultados para iterar.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()
# Print all rows
for row in rows:
print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))
Atualizar dados
O exemplo de código a seguir usa cursor.execute
com a instrução SQL UPDATE para atualizar dados.
# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")
Eliminar dados
O exemplo de código a seguir é executado cursor.execute
com a instrução SQL DELETE para excluir os dados.
# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")
Comando COPY para ingestão rápida
O comando COPY pode gerar uma taxa de transferência tremenda ao ingerir dados no Azure Cosmos DB para PostgreSQL. O comando COPY pode ingerir dados em arquivos ou de microlotes de dados na memória para ingestão em tempo real.
Comando COPY para carregar dados de um ficheiro
O código a seguir copia dados de um arquivo CSV para uma tabela de banco de dados. O código requer o arquivo pharmacies.csv.
with open('pharmacies.csv', 'r') as f:
# Notice that we don't need the `csv` module.
next(f) # Skip the header row.
cursor.copy_from(f, 'pharmacy', sep=',')
print("copying data completed")
Comando COPY para carregar dados na memória
O código a seguir copia dados na memória para uma tabela.
data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)
buf.seek(0)
with conn.cursor() as cur:
cur.copy_from(buf, "pharmacy", sep=",")
conn.commit()
conn.close()
Repetição de aplicativo para falhas de solicitação de banco de dados
Às vezes, é possível que as solicitações de banco de dados do seu aplicativo falhem. Tais problemas podem acontecer em diferentes cenários, como falha de rede entre o aplicativo e o banco de dados, senha incorreta, etc. Alguns problemas podem ser transitórios e resolver-se em poucos segundos a minutos. Você pode configurar a lógica de repetição em seu aplicativo para superar os erros transitórios.
Configurar a lógica de repetição em seu aplicativo ajuda a melhorar a experiência do usuário final. Em cenários de falha, os usuários apenas esperarão um pouco mais para que o aplicativo atenda às solicitações, em vez de experimentar erros.
O exemplo abaixo mostra como implementar a lógica de repetição em seu aplicativo. O trecho de código de exemplo tenta uma solicitação de banco de dados a cada 60 segundos (até cinco vezes) até que seja bem-sucedido. O número e a frequência de novas tentativas podem ser configurados com base nas necessidades do seu aplicativo.
Neste código, substitua <o cluster> pelo nome e <senha> do cluster pela senha de administrador.
import psycopg2
import time
from psycopg2 import pool
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)
def executeRetry(query, retryCount):
for x in range(retryCount):
try:
if (postgreSQL_pool):
# Use getconn() to Get Connection from connection pool
conn = postgreSQL_pool.getconn()
cursor = conn.cursor()
cursor.execute(query)
return cursor.fetchall()
break
except Exception as err:
print(err)
postgreSQL_pool.putconn(conn)
time.sleep(60)
return None
print(executeRetry("select 1", 5))
Próximos passos
- Veja como a API do Azure Cosmos DB para PostgreSQL estende o PostgreSQL e tente consultas de diagnóstico úteis
- Escolha o melhor tamanho de cluster para sua carga de trabalho
- Monitorizar o desempenho do cluster