Databricks SDK for Go
この記事では、Databricks SDK for Go を使用して、Azure Databricks アカウント、ワークスペース、および関連リソースでの操作を自動化する方法について説明します。 この記事は、Databricks SDK for Go の README、API リファレンス、例を補足するものです。
Note
この機能はベータ版であり、運用環境で使用しても問題ありません。
ベータ期間中、Databricks では、コードが依存する Databricks SDK for Go の特定のマイナー バージョン (プロジェクトの go.mod
ファイルなど) に依存関係をピン留めすることをお勧めします。 依存関係のピン留めについて詳しくは、「依存関係の管理」を参照してください。
開始する前に
Databricks SDK for Go の使用を開始する前に、開発マシンには次が必要です。
Databricks SDK for Go を使ってみる
Go が既にインストールされ、既存の Go コード プロジェクトが既に作成され、Azure Databricks 認証が構成されている開発マシンで、
go mod init
コマンドを実行して、Go コードの依存関係を追跡するgo.mod
ファイルを作成します。例:go mod init sample
go mod edit -require
コマンドを実行して、Databricks SDK for Go パッケージへの依存関係を設定します。0.8.0
は、CHANGELOG に記載されている Databricks SDK for Go パッケージの最新バージョンに置き換えてください。go mod edit -require github.com/databricks/databricks-sdk-go@v0.8.0
go.mod
ファイルは次のようになります。module sample go 1.18 require github.com/databricks/databricks-sdk-go v0.8.0
プロジェクト内で、Databricks SDK for Go をインポートする Go コード ファイルを作成します。 次の例では、次のコンテンツの
main.go
という名前のファイルで、Azure Databricks ワークスペース内のすべてのクラスターを一覧表示します。package main import ( "context" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/service/compute" ) func main() { w := databricks.Must(databricks.NewWorkspaceClient()) all, err := w.Clusters.ListAll(context.Background(), compute.ListClustersRequest{}) if err != nil { panic(err) } for _, c := range all { println(c.ClusterName) } }
go mod tidy
コマンドを実行して、不足しているモジュール依存関係を追加します。go mod tidy
注意
エラー
go: warning: "all" matched no packages
が発生した場合は、Databricks SDK for Go をインポートする Go コード ファイルを追加するのを忘れています。go mod vendor
コマンドを実行して、main
モジュール内のパッケージのビルドとテストをサポートするために必要なすべてのパッケージのコピーを取得します。go mod vendor
Azure Databricks 認証用に開発マシンを設定します。
go run
コマンドを実行して、main.go
という名前のファイルを想定して Go コード ファイルを実行します。go run main.go
注意
w := databricks.Must(databricks.NewWorkspaceClient())
への前の呼び出しで、引数として*databricks.Config
を設定しないことで、Databricks SDK for Go では、認証の実行を試みるために既定のプロセスが使用されます。 この既定の動作をオーバーライドするには、「Azure Databricks アカウントまたはワークスペースで Databricks SDK for Go を認証する」を参照してください。
Databricks SDK for Go を更新する
CHANGELOG に記載されている Databricks SDK for Go パッケージのいずれかを使用するように Go プロジェクトを更新するには、次の操作を行います。
プロジェクト ルートから、更新実行の
-u
フラグを指定してgo get
コマンドを実行し、Databricks SDK for Go パッケージの名前とターゲット バージョン番号を指定します。 たとえば、バージョン0.12.0
に更新するには、次のコマンドを実行します。go get -u github.com/databricks/databricks-sdk-go@v0.12.0
go mod tidy
コマンドを実行して、不足しているモジュールや古いモジュールの依存関係を追加および更新します。go mod tidy
go mod vendor
コマンドを実行して、main
モジュール内のパッケージのビルドとテストをサポートするために必要なすべての新しい、更新されたパッケージのコピーを取得します。go mod vendor
Azure Databricks アカウントまたはワークスペースで Databricks SDK for Go を認証する
Databricks SDK for Go は、Databricks クライアント統合認証標準を実装しています。これは、統合されていて一貫性がある、アーキテクチャとプログラムによる認証アプローチです。 このアプローチは、Azure Databricks を使用した認証の設定と自動化を、より一元的で予測可能なものにするのに役立ちます。 これにより、Databricks 認証を一度構成すれば、それ以上認証構成を変更しなくても、複数の Databricks ツールおよび SDK でその構成を使用できます。 Go のより完全なコード例を含む詳細については、Databricks クライアント統合認証に関するページを参照してください。
Databricks SDK for Go を使用して Databricks 認証を初期化するために使用できるコーディング パターンには、次のようなものがあります。
次のいずれかを行って、Databricks の既定の認証を使用します。
- ターゲットの Databricks 認証の種類に必要なフィールドを含むカスタムの Databricks 構成プロファイルを作成または識別します。 次に、
DATABRICKS_CONFIG_PROFILE
環境変数をカスタム構成プロファイルの名前に設定します。 - ターゲットの Databricks 認証の種類に必要な環境変数を設定します。
次に、たとえば、以下のように Databricks の既定の認証を使って
WorkspaceClient
オブジェクトのインスタンスを作成します。import ( "github.com/databricks/databricks-sdk-go" ) // ... w := databricks.Must(databricks.NewWorkspaceClient())
- ターゲットの Databricks 認証の種類に必要なフィールドを含むカスタムの Databricks 構成プロファイルを作成または識別します。 次に、
必要なフィールドのハードコーディングはサポートされていますが、Azure Databricks 個人用アクセス トークンなどの機密情報がコードで公開されるリスクがあるため、推奨されません。 次の例では、Databricks トークン認証用に Azure Databricks ホストとアクセス トークンの値をハードコードします。
import ( "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/config" ) // ... w := databricks.Must(databricks.NewWorkspaceClient(&databricks.Config{ Host: "https://...", Token: "...", }))
Databricks SDK for Go の README の「Authentication」(認証) も参照してください。
例
次のコード例は、Databricks SDK for Go を使用して、クラスターを作成および削除し、ジョブを実行し、アカウント ユーザーを一覧表示する方法を示しています。 これらのコード例では、Databricks SDK for Go の既定の Azure Databricks 認証 プロセスを使用しています。
その他のコード例については、GitHub の Databricks SDK for Go リポジトリの examples フォルダーを参照してください。
クラスターの作成
このコード例では、ローカル ディスクで、使用可能な最新の Databricks Runtime Long Term Support (LTS) バージョンと、使用可能な最小のクラスター ノードの種類でクラスターを作成しています。 このクラスターには 1 つのワーカーがあり、クラスターは 15 分のアイドル時間の後に自動的に終了します。 CreateAndWait
メソッド呼び出しにより、新しいクラスターがワークスペースで実行されるまでコードが一時停止されます。
package main
import (
"context"
"fmt"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/compute"
)
func main() {
const clusterName = "my-cluster"
const autoTerminationMinutes = 15
const numWorkers = 1
w := databricks.Must(databricks.NewWorkspaceClient())
ctx := context.Background()
// Get the full list of available Spark versions to choose from.
sparkVersions, err := w.Clusters.SparkVersions(ctx)
if err != nil {
panic(err)
}
// Choose the latest Long Term Support (LTS) version.
latestLTS, err := sparkVersions.Select(compute.SparkVersionRequest{
Latest: true,
LongTermSupport: true,
})
if err != nil {
panic(err)
}
// Get the list of available cluster node types to choose from.
nodeTypes, err := w.Clusters.ListNodeTypes(ctx)
if err != nil {
panic(err)
}
// Choose the smallest available cluster node type.
smallestWithLocalDisk, err := nodeTypes.Smallest(clusters.NodeTypeRequest{
LocalDisk: true,
})
if err != nil {
panic(err)
}
fmt.Println("Now attempting to create the cluster, please wait...")
runningCluster, err := w.Clusters.CreateAndWait(ctx, compute.CreateCluster{
ClusterName: clusterName,
SparkVersion: latestLTS,
NodeTypeId: smallestWithLocalDisk,
AutoterminationMinutes: autoTerminationMinutes,
NumWorkers: numWorkers,
})
if err != nil {
panic(err)
}
switch runningCluster.State {
case compute.StateRunning:
fmt.Printf("The cluster is now ready at %s#setting/clusters/%s/configuration\n",
w.Config.Host,
runningCluster.ClusterId,
)
default:
fmt.Printf("Cluster is not running or failed to create. %s", runningCluster.StateMessage)
}
// Output:
//
// Now attempting to create the cluster, please wait...
// The cluster is now ready at <workspace-host>#setting/clusters/<cluster-id>/configuration
}
クラスターを完全に削除する
このコード例では、指定したクラスター ID のクラスターをワークスペースから完全に削除しています。
package main
import (
"context"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/clusters"
)
func main() {
// Replace with your cluster's ID.
const clusterId = "1234-567890-ab123cd4"
w := databricks.Must(databricks.NewWorkspaceClient())
ctx := context.Background()
err := w.Clusters.PermanentDelete(ctx, compute.PermanentDeleteCluster{
ClusterId: clusterId,
})
if err != nil {
panic(err)
}
}
ジョブの実行
このコード例では、指定したクラスターで指定したノートブックを実行する Azure Databricks ジョブを作成しています。 コードを実行すると、ターミナルでユーザーから既存のノートブックのパス、既存のクラスター ID、および関連するジョブ設定が取得されます。 RunNowAndWait
メソッド呼び出しにより、ワークスペースで新しいジョブの実行が完了するまでコードが一時停止されます。
package main
import (
"bufio"
"context"
"fmt"
"os"
"strings"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/jobs"
)
func main() {
w := databricks.Must(databricks.NewWorkspaceClient())
ctx := context.Background()
nt := jobs.NotebookTask{
NotebookPath: askFor("Workspace path of the notebook to run:"),
}
jobToRun, err := w.Jobs.Create(ctx, jobs.CreateJob{
Name: askFor("Some short name for the job:"),
Tasks: []jobs.JobTaskSettings{
{
Description: askFor("Some short description for the job:"),
TaskKey: askFor("Some key to apply to the job's tasks:"),
ExistingClusterId: askFor("ID of the existing cluster in the workspace to run the job on:"),
NotebookTask: &nt,
},
},
})
if err != nil {
panic(err)
}
fmt.Printf("Now attempting to run the job at %s/#job/%d, please wait...\n",
w.Config.Host,
jobToRun.JobId,
)
runningJob, err := w.Jobs.RunNow(ctx, jobs.RunNow{
JobId: jobToRun.JobId,
})
if err != nil {
panic(err)
}
jobRun, err := runningJob.Get()
if err != nil {
panic(err)
}
fmt.Printf("View the job run results at %s/#job/%d/run/%d\n",
w.Config.Host,
jobRun.JobId,
jobRun.RunId,
)
// Output:
//
// Now attempting to run the job at <workspace-host>/#job/<job-id>, please wait...
// View the job run results at <workspace-host>/#job/<job-id>/run/<run-id>
}
// Get job settings from the user.
func askFor(prompt string) string {
var s string
r := bufio.NewReader(os.Stdin)
for {
fmt.Fprint(os.Stdout, prompt+" ")
s, _ = r.ReadString('\n')
if s != "" {
break
}
}
return strings.TrimSpace(s)
}
Unity Catalog ボリューム内のファイルを管理する
このコード例では、Unity Catalog ボリュームにアクセスするための、WorkspaceClient
内の files
機能へのさまざまな呼び出しを示します。
package main
import (
"context"
"io"
"os"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/files"
)
func main() {
w := databricks.Must(databricks.NewWorkspaceClient())
catalog := "main"
schema := "default"
volume := "my-volume"
volumePath := "/Volumes/" + catalog + "/" + schema + "/" + volume // /Volumes/main/default/my-volume
volumeFolder := "my-folder"
volumeFolderPath := volumePath + "/" + volumeFolder // /Volumes/main/default/my-volume/my-folder
volumeFile := "data.csv"
volumeFilePath := volumeFolderPath + "/" + volumeFile // /Volumes/main/default/my-volume/my-folder/data.csv
uploadFilePath := "./data.csv"
// Create an empty folder in a volume.
err := w.Files.CreateDirectory(
context.Background(),
files.CreateDirectoryRequest{DirectoryPath: volumeFolderPath},
)
if err != nil {
panic(err)
}
// Upload a file to a volume.
fileUpload, err := os.Open(uploadFilePath)
if err != nil {
panic(err)
}
defer fileUpload.Close()
w.Files.Upload(
context.Background(),
files.UploadRequest{
Contents: fileUpload,
FilePath: volumeFilePath,
Overwrite: true,
},
)
// List the contents of a volume.
items := w.Files.ListDirectoryContents(
context.Background(),
files.ListDirectoryContentsRequest{DirectoryPath: volumePath},
)
for {
if items.HasNext(context.Background()) {
item, err := items.Next(context.Background())
if err != nil {
break
}
println(item.Path)
} else {
break
}
}
// List the contents of a folder in a volume.
itemsFolder := w.Files.ListDirectoryContents(
context.Background(),
files.ListDirectoryContentsRequest{DirectoryPath: volumeFolderPath},
)
for {
if itemsFolder.HasNext(context.Background()) {
item, err := itemsFolder.Next(context.Background())
if err != nil {
break
}
println(item.Path)
} else {
break
}
}
// Print the contents of a file in a volume.
file, err := w.Files.DownloadByFilePath(
context.Background(),
volumeFilePath,
)
if err != nil {
panic(err)
}
bufDownload := make([]byte, file.ContentLength)
for {
file, err := file.Contents.Read(bufDownload)
if err != nil && err != io.EOF {
panic(err)
}
if file == 0 {
break
}
println(string(bufDownload[:file]))
}
// Delete a file from a volume.
w.Files.DeleteByFilePath(
context.Background(),
volumeFilePath,
)
// Delete a folder from a volume.
w.Files.DeleteDirectory(
context.Background(),
files.DeleteDirectoryRequest{
DirectoryPath: volumeFolderPath,
},
)
}
アカウント ユーザーの一覧表示
このコード例では、Azure Databricks アカウント内の使用可能なユーザーの一覧を示します。
package main
import (
"context"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/iam"
)
func main() {
a := databricks.Must(databricks.NewAccountClient())
all, err := a.Users.ListAll(context.Background(), iam.ListAccountUsersRequest{})
if err != nil {
panic(err)
}
for _, u := range all {
println(u.UserName)
}
}
その他の技術情報
詳細については、以下を参照してください:
- Databricks SDK for Go の README
- Databricks SDK for Go の API リファレンス
- その他のコード例
- ログ記録
- テスト
- 長時間の操作
- ページ分割された応答