GraphFrames ユーザー ガイド - Scala

この記事では、 Graphframes ユーザー ガイドの例を紹介します。

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._

GraphFrames を作成する

頂点とエッジの DataFrame から GraphFrames を作成できます。

  • 頂点 DataFrame: 頂点 DataFrame は、グラフ内の各頂点に一意の ID を指定する id という名前の特別な列を含む必要があります。
  • エッジ DataFrame: エッジ DataFrame は、src (エッジのソース頂点の ID) と dst (エッジの宛先頂点の ID) という2つの特殊な列を含む必要があります。

どちらの DataFrames も、他の任意の列を持つことができます。 これらの列は、頂点属性とエッジ属性を表すことができます。

頂点とエッジの作成

// Vertex DataFrame
val v = spark.createDataFrame(List(
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
)).toDF("src", "dst", "relationship")

これらの頂点とエッジからグラフを作成してみましょう。

val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends

基本的なグラフ クエリと DataFrame クエリ

GraphFrames には、ノードの次数などの単純なグラフクエリが用意されています。

また、GraphFrames は頂点とエッジの DataFrame のペアとしてグラフを表すため、頂点とエッジの DataFrame に直接強力なクエリを簡単に作成できます。 これらの DataFrame は、GraphFrames の頂点とエッジのフィールドとして使用できます。

display(g.vertices)
display(g.edges)

頂点の入次数:

display(g.inDegrees)

頂点の出次数:

display(g.outDegrees)

頂点の次数:

display(g.degrees)

頂点の DataFrame でクエリを直接実行できます。 たとえば、グラフで最年少の人の年齢を見つけることができます。

val youngest = g.vertices.groupBy().min("age")
display(youngest)

同様に、エッジの DataFrame でもクエリを実行できます。 たとえば、グラフ内の ' フォロー ' リレーションシップの数をカウントします。

val numFollows = g.edges.filter("relationship = 'follow'").count()

モチーフ検索

モチーフを使用して、エッジと頂点を含むより複雑なリレーションシップをビルドします。 次のセルは、両方向のエッジにある頂点のペアを検索します。 結果は DataFrame になります。ここでの列名はモチーフキーです。

API の詳細については、「GraphFrames ユーザー ガイド」を参照してください。

// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)

結果は DataFrame なので、より複雑なクエリをモチーフ上にビルドすることができます。 1 人が 30 歳より年長の相互関係をすべて検索してみましょう。

val filtered = motifs.filter("b.age > 30")
display(filtered)

ステートフル クエリ

上記の例のように、ほとんどのモチーフ クエリはステートレスで、簡単に表現できます。 次の例では、モチーフ内のパスで状態を保持する、より複雑なクエリを示します。 これらのクエリを表現するには、GraphFrame のモチーフ検索を結果のフィルターと組み合わせます。ここでフィルターは、シーケンス操作を使用して一連の DataFrame 列を構築します。

たとえば、一連の関数によって定義されたいくつかのプロパティを持つ 4 つの頂点のチェーンを識別するとします。 つまり、4 つの頂点 a->b->c->d のチェーンの中で、この複雑なフィルターに一致するチェーンのサブセットを特定します。

  • パスで状態を初期化します。
  • 頂点 a に基づいて状態を更新します。
  • 頂点 b に基づいて状態を更新します。
  • c と d でも同様にします。
  • 最終的な状態がいくつかの条件に一致する場合、フィルターはチェーンを受け入れます。

次のコードスニペットは、このプロセスを示しています。ここでは、3 つのエッジのうち 2 つ以上が "friend" リレーションシップだというように、4 つの頂点のチェーンを識別します。 この例では、状態は "friend" エッジの現在の数です。一般的に、どの DataFrame 列でも可能です。

// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

// Query on sequence, with state (cnt)
//  (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
  when(relationship === "friend", cnt + 1).otherwise(cnt)
}
//  (b) Use sequence operation to apply method to sequence of elements in motif.
//      In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
  foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
//  (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)

部分グラフ

GraphFrames は、エッジと頂点をフィルター処理することによって、部分グラフを構築する API を提供します。 これらのフィルターは、まとめて構成できます。 たとえば、次の部分グラフには、友人で 30 歳を超える人だけが含まれています。

// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
  .filterEdges("relationship = 'friend'")
  .filterVertices("age > 30")
  .dropIsolatedVertices()

複合型のトリプレット フィルター

次の例では、エッジとその "src" および "dst" 頂点を操作するトリプレット フィルターに基づいて、部分グラフを選択する方法を示します。 より複雑なモチーフを使用することで、この例をトリプレットを超えて適用することは容易です。

// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
  .filter("e.relationship = 'follow'")
  .filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
//  val e2 = paths.select("e.*")

// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)

標準のグラフ アルゴリズム

このセクションでは、GraphFrames に組み込まれている標準のグラフ アルゴリズムについて説明します。

幅優先の検索 (BFS)

age < 32 のユーザーの "Esther" から検索します。

val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)

検索では、エッジフィルターと最大パス長も制限される場合があります。

val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
  .edgeFilter("relationship != 'friend'")
  .maxPathLength(3)
  .run()
display(filteredPaths)

接続されたコンポーネント

各頂点の接続されたコンポーネントのメンバーシップを計算し、各頂点にコンポーネント ID が割り当てられたグラフを返します。

val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)

強く接続されたコンポーネント

各頂点の強く接続されたコンポーネント (SCC) を計算し、その頂点を含む SCC に割り当てられた各頂点を持つグラフを返します。

val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))

ラベルの伝達

ネットワーク内のコミュニティを検出するための静的なラベル伝達アルゴリズムを実行します。

ネットワーク内の各ノードは、最初に専用のコミュニティに割り当てられます。 すべてのスーパーステップで、ノードはコミュニティの所属をすべての近隣ノードに送信し、その状態を受信メッセージのモードコミュニティの所属に更新します。

LPA は、グラフの標準的なコミュニティ検出アルゴリズムです。 (1) 収束は保証されておらず、(2) 簡易ソリューションになる可能性があります (全ノードが単一のコミュニティとして識別される) が、計算コストは低いです。

val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))

PageRank

接続に基づいてグラフ内の重要な頂点を識別します。

// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)

最短パス

指定された一連のランドマーク頂点への最短パスを計算します。ここで、ランドマークは頂点 ID によって規定されています。

val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)

三角形のカウント

各頂点を通過する三角形の数を計算します。

import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends  // get example graph

val results = g.triangleCount.run()
results.select("id", "count").show()