Въведение в обработката на графики Spark с GraphFrames

1. Въведение

Обработката на графики е полезна за много приложения от социални мрежи до реклами. Вътре в сценарий за големи данни се нуждаем от инструмент за разпределяне на това натоварване от обработката.

В този урок ще заредим и изследваме възможностите за графики, използвайки Apache Spark в Java. За да избегнем сложни структури, ще използваме лесен и висококачествен Apache Spark графичен API: GraphFrames API.

2. Графики

Първо, нека дефинираме графика и нейните компоненти. Графиката е структура от данни с ръбове и върхове. В краищата носят информация , която представлява взаимоотношения между върховете.

Върховете са точки в n -мерно пространство и ръбовете свързват върховете според техните взаимоотношения:

На изображението по-горе имаме пример за социална мрежа. Можем да видим върховете, представени с букви, и ръбовете, носещи вида на връзката между върховете.

3. Настройка на Maven

Сега, нека започнем проекта, като настроим конфигурацията на Maven.

Нека добавим spark-graphx 2.11, graphframes и spark-sql 2.11 :

 org.apache.spark spark-graphx_2.11 2.4.4   graphframes graphframes 0.7.0-spark2.4-s_2.11   org.apache.spark spark-sql_2.11 2.4.4 

Тези версии на артефакти поддържат Scala 2.11.

Също така се случва GraphFrames да не е в Maven Central. И така, нека добавим и необходимото хранилище на Maven:

  SparkPackagesRepo //dl.bintray.com/spark-packages/maven  

4. Конфигурация на искри

За да работим с GraphFrames, ще трябва да изтеглим Hadoop и да дефинираме променливата на средата HADOOP_HOME .

В случай на Windows като операционна система, ние също ще изтеглим съответния winutils.exe в папката HADOOP_HOME / bin .

След това нека започнем нашия код, като създадем основната конфигурация:

SparkConf sparkConf = new SparkConf() .setAppName("SparkGraphFrames") .setMaster("local[*]"); JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

Също така ще трябва да създадем SparkSession :

SparkSession session = SparkSession.builder() .appName("SparkGraphFrameSample") .config("spark.sql.warehouse.dir", "/file:C:/temp") .sparkContext(javaSparkContext.sc()) .master("local[*]") .getOrCreate();

5. Изграждане на графика

Сега всички сме готови да започнем с нашия основен код. И така, нека дефинираме обектите за нашите върхове и ребра и създадем екземпляр GraphFrame .

Ще работим върху връзките между потребители от хипотетична социална мрежа.

5.1. Данни

Първо, за този пример, нека дефинираме двата обекта като Потребител и Връзка :

public class User { private Long id; private String name; // constructor, getters and setters } public class Relationship implements Serializable { private String type; private String src; private String dst; private UUID id; public Relationship(String type, String src, String dst) { this.type = type; this.src = src; this.dst = dst; this.id = UUID.randomUUID(); } // getters and setters }

След това нека дефинираме някои потребителски и връзки :

List users = new ArrayList(); users.add(new User(1L, "John")); users.add(new User(2L, "Martin")); users.add(new User(3L, "Peter")); users.add(new User(4L, "Alicia")); List relationships = new ArrayList(); relationships.add(new Relationship("Friend", "1", "2")); relationships.add(new Relationship("Following", "1", "4")); relationships.add(new Relationship("Friend", "2", "4")); relationships.add(new Relationship("Relative", "3", "1")); relationships.add(new Relationship("Relative", "3", "4"));

5.2. GraphFrame съд

Сега, за да създадем и манипулираме нашата графика на връзките, ще създадем екземпляр на GraphFrame . Конструкторът GraphFrame очаква два екземпляра от набор от данни , първият представляващ върховете, а вторият - ръбовете:

Dataset userDataset = session.createDataFrame(users, User.class); Dataset relationshipDataset = session.createDataFrame(relationships, Relation.class); GraphFrame graph = new GraphFrame(userDataframe, relationshipDataframe);

Най-накрая ще регистрираме върховете и ръбовете си в конзолата, за да видим как изглежда:

graph.vertices().show(); graph.edges().show();
+---+------+ | id| name| +---+------+ | 1| John| | 2|Martin| | 3| Peter| | 4|Alicia| +---+------+ +---+--------------------+---+---------+ |dst| id|src| type| +---+--------------------+---+---------+ | 2|622da83f-fb18-484...| 1| Friend| | 4|c6dde409-c89d-490...| 1|Following| | 4|360d06e1-4e9b-4ec...| 2| Friend| | 1|de5e738e-c958-4e0...| 3| Relative| | 4|d96b045a-6320-4a6...| 3| Relative| +---+--------------------+---+---------+

6. Графични оператори

Сега, когато имаме екземпляр GraphFrame , нека видим какво можем да направим с него.

6.1. Филтър

GraphFrames ни позволява да филтрираме ръбове и върхове чрез заявка.

След това нека да филтрираме върховете по свойството name на User :

graph.vertices().filter("name = 'Martin'").show();

На конзолата можем да видим резултата:

+---+------+ | id| name| +---+------+ | 2|Martin| +---+------+

Също така можем директно да филтрираме върху графиката, като извикаме filterEdges или filterVertices :

graph.filterEdges("type = 'Friend'") .dropIsolatedVertices().vertices().show();

Сега, след като филтрирахме ръбовете, може да имаме все още изолирани върхове. Така че, ще извикаме dropIsolatedVertices ().

В резултат на това имаме подграф, все още екземпляр на GraphFrame , само с връзките, които имат статус „Приятел“:

+---+------+ | id| name| +---+------+ | 1| John| | 2|Martin| | 4|Alicia| +---+------+

6.2. Градуси

Друг интересен набор от функции е степенният набор от операции. Тези операции връщат броя на ръбовете, падащи на всеки връх.

The degrees operation just returns the count of all edges of each vertex. On the other hand, inDegrees counts only incoming edges, and outDegrees counts only outgoing edges.

Let's count the incoming degrees of all vertices in our graph:

graph.inDegrees().show();

As a result, we have a GraphFrame that shows the number of incoming edges to each vertex, excluding those with none:

+---+--------+ | id|inDegree| +---+--------+ | 1| 1| | 4| 3| | 2| 1| +---+--------+

7. Graph Algorithms

GraphFrames also provides popular algorithms ready to use — let's take a look at some of them.

7.1. Page Rank

The Page Rank algorithm weighs the incoming edges to a vertex and transforms it into a score.

The idea is that each incoming edge represents an endorsement and makes the vertex more relevant in the given graph.

For example, in a social network, if a person is followed by various people, he or she will be ranked highly.

Running the page rank algorithm is quite straightforward:

graph.pageRank() .maxIter(20) .resetProbability(0.15) .run() .vertices() .show();

To configure this algorithm, we just need to provide:

  • maxIter – the number of iterations of page rank to run – 20 is recommended, too few will decrease the quality, and too many will degrade the performance
  • resetProbability – the random reset probability (alpha) – the lower it is, the bigger the score spread between the winners and losers will be – valid ranges are from 0 to 1. Usually, 0.15 is a good score

The response is a similar GraphFrame, though this time we see an additional column giving the page rank of each vertex:

+---+------+------------------+ | id| name| pagerank| +---+------+------------------+ | 4|Alicia|1.9393230468864597| | 3| Peter|0.4848822786454427| | 1| John|0.7272991738542318| | 2|Martin| 0.848495500613866| +---+------+------------------+

In our graph, Alicia is the most relevant vertex, followed by Martin and John.

7.2. Connected Components

The connected components algorithm finds isolated clusters or isolated sub-graphs. These clusters are sets of connected vertices in a graph where each vertex is reachable from any other vertex in the same set.

We can call the algorithm without any parameters via the connectedComponents() method:

graph.connectedComponents().run().show();

The algorithm returns a GraphFrame containing each vertex and the component to which each is connected:

+---+------+------------+ | id| name| component| +---+------+------------+ | 1| John|154618822656| | 2|Martin|154618822656| | 3| Peter|154618822656| | 4|Alicia|154618822656| +---+------+------------+

Our graph has only one component — this means that we do not have isolated sub-graphs. The component has an auto-generated id, which is 154618822656, in our case.

Although we have one more column here – the component id – our graph is still the same.

7.3. Triangle Counting

Triangle counting is commonly used as community detection and counting in a social network graph. A triangle is a set of three vertices, where each vertex has a relationship to the other two vertices in the triangle.

In a social network community, it's easy to find a considerable number of triangles connected to each other.

We can easily perform a triangle counting directly from our GraphFrame instance:

graph.triangleCount().run().show();

The algorithm also returns a GraphFrame with the number of triangles passing through each vertex.

+-----+---+------+ |count| id| name| +-----+---+------+ | 1| 3| Peter| | 2| 1| John| | 2| 4|Alicia| | 1| 2|Martin| +-----+---+------+

8. Conclusion

Apache Spark is a great tool for computing a relevant amount of data in an optimized and distributed way. And, the GraphFrames library allows us to easily distribute graph operations over Spark.

Както винаги, пълният изходен код за примера е достъпен в GitHub.