Hadoop Graph Processing and Algorithms

CecilGalot,United Kingdom,Teacher
Published Date:06-08-2017
Chapter 5: Graph Processing on Hadoop

In Chapter 3, we talked about tools for processing data with Hadoop, but there's another class of data processing being done on Hadoop that we did not cover there: graph processing. We'll discuss graph processing with Hadoop separately in this chapter, since it's at the heart of many of the applications we use every day, and provides significant opportunities for implementing specific types of applications. Some common uses of graph processing are ranking pages in search engines, finding new friends on social networks, determining the underlying equities in our investment funds, planning routes, and much more.

What Is a Graph?

Before we go into the tools for graph processing, let's step back and first define what a graph is, in case you are new to this topic. Then we will go over what graph processing is and distinguish it from graph querying.

Needless to say, a graph can mean multiple things. It can mean a line, pie, or bar graph in tools like Excel. One author's second grader has to chart different-colored M&Ms on a bar graph, but that isn't the type of graph we're talking about in this chapter. The graphs we're talking about here only contain two types of objects called vertices and edges.

As you might have been taught in grade school, the points of a triangle are called vertices, and the lines that connect those points are edges, as shown in Figure 5-1. To provide some context, this is a good place to start.

Figure 5-1. Edges and vertices

Now let's change the image a little and give each vertex some information. Let's say each vertex represents a person, and we want it to have some information about that person like his or her name and type (which in this case would be Person). Now for the edges, let's give them some information also describing the relationship of the two vertices they connect. We can use information like movies viewed, or relationships like brother, father, mother, and wife (see Figure 5-2).

Figure 5-2. Adding information to vertices and edges

But this isn't enough information. We know that Karen is TJ's mother, but TJ can't also be Karen's mother, and we know that Andrew watched Iron Man, but Iron Man is not watching Andrew. So, we can fix this problem by giving our edges directions, which gives us an image, or graph, that looks like Figure 5-3.

Figure 5-3. Edges showing directional relationships between vertices

This is more like it. We can show this graph to even a nontechnical person and he or she will still be able to figure out what we're trying to express.

What Is Graph Processing?

When we talk about graph processing, we are talking about doing processing at a global level that may touch every vertex in the graph. This is in contrast to the idea of graph querying. Graph querying is when you ask a question of the graph, like "Who's connected to Karen?" This query will execute the following steps:

1. Look up Karen and her edges.
2. Follow each edge and get those vertices.
3. Return the resulting list to the user.

Now, by graph processing, we mean something a little different that would be asking something like "What are the top five connections with a separation of five degrees?" This question is much larger in scale than the query and requires much more horsepower to process because it involves looking through a lot of people and all their connections. In contrast, the query focused on a single user could have been executed by a single client with a couple of hops to a data store such as HBase.

These concepts manifest in many real-world examples:

• The Web can be seen as a very large graph, where web pages are the vertices, and links are the edges. This leads to a number of opportunities for analysis, including algorithms such as PageRank, famously used by Google in ranking search results.
• As should be evident from the preceding discussion, social networks are a natural application of graph processing; for example, determining degrees of connection between users of a networking site.

It's these types of applications that this chapter will focus on: how do we ask these questions with Hadoop data in an effective way that is maintainable and performant?

How Do You Process a Graph in a Distributed System?

In order to perform this processing on a system like Hadoop, we can start with MapReduce. The problem with MapReduce is that it can only give us a one-layer join, which means we have to tackle a graph like peeling an onion. For those of you who don't peel onions, it is very different from peeling an apple. An onion has many layers to get through before you reach the core. In addition, that property of onions that makes your eyes water and makes the peeling experience less than joyful is similar to how processing a graph with MapReduce might reduce you to tears at a certain point.

The graph in Figure 5-4 is an example of what we mean by "like peeling an onion." The center dot is our starting person, and every growing circle is yet another MapReduce job to figure out who is connected for each level.

Figure 5-4. In MapReduce, tackling a graph is like peeling an onion

This hurts even more when we realize that with every pass we are rereading and rewriting the whole graph to disk.

Thankfully, once again some very smart people at Google decided to break the rules. In this case, it was the MapReduce rule that mappers are not allowed to talk to other mappers. This shared-nothing concept is very important to a distributed system like Hadoop that needs sync points and strategies to recover from failure. So how did these very smart people solve this problem? Well, in short they found another way to get the same sync points and recovery strategies without the limitations of siloed mappers.
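To make the onion analogy a little more concrete, here is a minimal single-machine sketch in plain Java. It is not MapReduce code and does not come from the chapter: the class name OnionPeel, the method levels(), and the sample edges are all made up for illustration. Each pass of the while loop stands in for one MapReduce job, and the point to notice is that every pass rescans the entire edge list just to find the next ring of vertices.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only: one loop pass plays the role of one MapReduce job.
public class OnionPeel {

    /** Returns the set of vertices first reached at each level, starting from 'start'. */
    public static List<Set<Long>> levels(long[][] edges, long start) {
        List<Set<Long>> rings = new ArrayList<>();
        Set<Long> visited = new HashSet<>();
        Set<Long> frontier = new HashSet<>();
        frontier.add(start);
        visited.add(start);

        while (!frontier.isEmpty()) {
            rings.add(frontier);
            Set<Long> next = new HashSet<>();
            // One "job": scan every edge in the graph, even though only the
            // edges leaving the current ring can contribute anything new.
            for (long[] e : edges) {
                if (frontier.contains(e[0]) && visited.add(e[1])) {
                    next.add(e[1]);
                }
            }
            frontier = next;
        }
        return rings;
    }

    public static void main(String[] args) {
        // Directed edges as {source, target} pairs (sample data, not from the chapter).
        long[][] edges = { {1, 2}, {1, 3}, {2, 4}, {3, 4}, {4, 5} };
        List<Set<Long>> rings = levels(edges, 1L);
        for (int i = 0; i < rings.size(); i++) {
            System.out.println("level " + i + ": " + rings.get(i));
        }
    }
}
```

In a real MapReduce implementation each of those passes would also write its output to disk and read it back, which is the cost the chapter is complaining about.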
The Bulk Synchronous Parallel Model

So how do we maintain synchronous processing and still break the "no talking between mappers" rule? The answer was provided by a British computer scientist named Leslie Valiant of Harvard, who developed the Bulk Synchronous Parallel (BSP) model during the 1980s and published the definitive paper on it in 1990. This BSP model is at the core of the Google graph processing solution called Pregel.

The idea of BSP is pretty complex, yet simple at the same time. In short, it is the idea of distributed processes doing work within a superstep. These distributed processes can send messages to each other, but they cannot act upon those messages until the next superstep. These supersteps will act as the boundaries for our needed sync points. We can only reach the next superstep when all distributed processes have finished processing and sending messages for the current superstep. There's then normally a single-threaded process that will decide if the overall process needs to continue with a new superstep. It's acceptable for this process to run in a single thread, since it does very little work in comparison to the worker threads and thus isn't a bottleneck.

BSP by Example

That was admittedly a short definition of a distributed processing model that took years of research. We'll help clarify with a short example of BSP. Scientists can use graph processing as a way to model the spread of a disease through a community. In this example, we illustrate this with zombies, who began as humans but changed after being bitten by another zombie.

Let's make a new graph and call it zombie bites. As you can see in Figure 5-5, in the start state we have one zombie and a bunch of people. The rule when the processing starts is that the zombie can bite every human it shares an edge with, and then when a vertex is bitten, it must turn itself into a zombie and continue by biting all of its own neighbors.
Once a zombie bites, it will not bite again because everyone around it will already have become a zombie, and we know from watching countless zombie movies that zombies never bite other zombies.

Figure 5-5 shows what the graph looks like in the different supersteps of our BSP execution model.

Figure 5-5. Supersteps for the zombie bites graph

We'll be introducing two graph processing tools, Giraph and GraphX, in this chapter to show implementations of this example. But before we do so, it is important to note that BSP is not the only solution. As we are learning as we dig deeper into Spark, the penalty for the onion approach has been hugely reduced from the days of MapReduce. The penalty of the I/O writes and reads in between onion layers has been largely mitigated by Spark, at least in the cases where the data can fit in memory. But with that said, the BSP model is very different in that it only has to send the messages between the distributed processes, whereas the onion joining will have to resend everything.

In the next two subsections we will dive into the two most popular graph processing frameworks for Hadoop today. First will be Giraph, which originated at Yahoo! and is used by Facebook as part of its graph search. Giraph is the more mature and stable system, with the claim of handling up to a trillion edges.

The second tool is the newer GraphX, which is part of the Apache Spark project. Spark GraphX gets a lot of its roots from GraphLab, an earlier open source graph processing project, and is an extension built on Spark's generic DAG execution engine. Although still young and not as tuned and stable as Giraph, GraphX still holds a lot of promise because of its ease of use and integration with all other components of Spark.

Giraph

Giraph is an open source implementation of Google's Pregel. From the ground up, Giraph is built for graph processing.
This differs from Spark's GraphX, which, as noted, contains an implementation of the Pregel API built on the Spark DAG engine.

To get a simple view of Giraph, let's remove a lot of its details and focus on the three main stages of a Giraph program (see Figure 5-6):

1. Read and partition the data.
2. Batch-process the graph with BSP.
3. Write the graph back to disk.

Figure 5-6. The three main stages of a Giraph program

There are many other details to Giraph that we can't cover here. Our intent is to provide enough detail for you to decide which tools belong in your architecture.

Let's dig into these stages in more detail and look at the code we will have to implement to customize these stages to our data and the zombie biting problem.

Read and Partition the Data

Just as MapReduce and Spark have input formats, Giraph has VertexInputFormat. In both cases, an input format takes care of providing the splits and the record or vertex reader. In our implementation we will stick with the default split logic and only override the reader. So our ZombieTextVertexInputFormat is as simple as the following:

    public class ZombieTextVertexInputFormat
        extends TextVertexInputFormat<LongWritable, Text, LongWritable> {

      // Keep the default split logic; only the reader is customized.
      @Override
      public TextVertexReader createVertexReader(InputSplit split,
          TaskAttemptContext context) throws IOException {
        return new ZombieTextReader();
      }
    }

The next thing we need is a VertexReader. The main difference from a normal MapReduce RecordReader is that a RecordReader is returning a key and value Writable, whereas the VertexReader is returning a Vertex object.

So, what is a Vertex object? It is made up of three parts:

Vertex ID
    This is an ID that uniquely identifies a vertex in our graph.

Vertex value
    This is an object that contains information about our vertex. In our example, it will store the state of our human or zombie and at which step he or she turned into a zombie. For simplicity we will use a string that looks like "Human" or "Zombie.2" for a zombie that was bitten on the second superstep.

Edge
    This is made up of two parts: the ID of the target vertex (the vertex the edge points to) and a value object that can describe the edge; for example, is the edge a relationship, a distance, or a weight?

So now that we know what a vertex is, let's see what a vertex looks like in our source file:

    {vertexId}|{Type}|{comma-separated vertexId of "bitable" people}
    2|Human|4,6

This is a vertex with the ID of 2 that is currently a human and is connected to vertices 4 and 6 with a directional edge. So, let's look at the code that will take this line and turn it into a Vertex object:

    public class ZombieTextReader extends TextVertexReader {

      @Override
      public boolean nextVertex() throws IOException, InterruptedException {
        return getRecordReader().nextKeyValue();
      }

      @Override
      public Vertex<LongWritable, Text, LongWritable> getCurrentVertex()
          throws IOException, InterruptedException {
        Text line = getRecordReader().getCurrentValue();
        // Line format: id|type|comma-separated target ids
        String[] majorParts = line.toString().split("\\|");
        LongWritable id = new LongWritable(Long.parseLong(majorParts[0]));
        Text value = new Text(majorParts[1]);

        ArrayList<Edge<LongWritable, LongWritable>> edgeIdList =
            new ArrayList<Edge<LongWritable, LongWritable>>();

        // The third field is optional: a vertex may have no outgoing edges.
        if (majorParts.length > 2) {
          String[] edgeIds = majorParts[2].split(",");
          for (String edgeId : edgeIds) {
            DefaultEdge<LongWritable, LongWritable> edge =
                new DefaultEdge<LongWritable, LongWritable>();
            LongWritable longEdgeId = new LongWritable(Long.parseLong(edgeId));
            edge.setTargetVertexId(longEdgeId);
            edge.setValue(longEdgeId); // dummy value
            edgeIdList.add(edge);
          }
        }

        Vertex<LongWritable, Text, LongWritable> vertex = getConf().createVertex();
        vertex.initialize(id, value, edgeIdList);
        return vertex;
      }
    }

There's a lot going on in this code, so let's break it down:

• Our VertexReader extends TextVertexReader so we are reading text files line-by-line. Note that we'd have to change our parent reader if we intend to read any other Hadoop file type.
• nextVertex() is an interesting method. If you drill down into the parent class, you'll see that it is using the normal RecordReader to try to read the next line and returns whether there is something left.
• The getCurrentVertex() method is where we parse the line and create and populate a Vertex object.

So as this method is firing, the resulting Vertex objects are being partitioned to the different distributed workers across the cluster. The default partitioning logic is a basic hash partition, but it can be modified. This is out of scope for this example, but just note you have control over the partitioning. If you can identify patterns that will force clumps of the graph to fewer distributed tasks, then the result may be less network usage and a corresponding improvement in speed.

Once the data is loaded in memory (or disk with the new spill-to-disk functionality in Giraph), we can move to processing with BSP in the next sub-section.

Before we move on, note that this is just an example of the VertexInputFormat. There are more advanced options in Giraph like reading in vertices and edges through different readers and advanced partitioning strategies, but that is out of scope for this book.

Batch Process the Graph with BSP

Of all the parts of Giraph, the BSP execution pattern is the hardest to understand for newcomers. To make it easier, let's focus on three computation stages: vertex, master, and worker. We will go through the code for these three stages soon, but check out Figure 5-7 first.

Figure 5-7. Three computation stages of the BSP execution pattern: vertex, master, and worker

Hopefully, from the image you can see that each BSP pass will start with a master computation stage.
Then it will follow with a worker computation stage on each distributed JVM, followed by a vertex computation for every vertex in that JVM's local memory or local disk. These vertex computations may produce messages that will be sent to other vertices, but the receiving vertices will not get those messages until the next BSP pass.

Let's start with the simplest of the computation stages, the master compute:

    public class ZombieMasterCompute extends DefaultMasterCompute {

      // Runs once at the start of every superstep.
      @Override
      public void compute() {
        LongWritable zombies = getAggregatedValue("zombie.count");

        System.out.println("Superstep " + String.valueOf(getSuperstep()) +
            " - zombies:" + zombies);
        System.out.println("Superstep " + String.valueOf(getSuperstep()) +
            " - getTotalNumEdges():" + getTotalNumEdges());
        System.out.println("Superstep " + String.valueOf(getSuperstep()) +
            " - getTotalNumVertices():" + getTotalNumVertices());
      }

      // Runs once before the first superstep.
      @Override
      public void initialize()
          throws InstantiationException, IllegalAccessException {
        registerAggregator("zombie.count", LongSumAggregator.class);
      }
    }

Let's dig into the two methods in the ZombieMasterCompute class. First, we'll look at the initialize() method. This is called before we really get started. The important thing we are doing here is registering an Aggregator class. An Aggregator class is like an advanced counter in MapReduce but more like the accumulators in Spark. There are many aggregators to select from in Giraph, as shown in the following list, but there is nothing stopping you from creating your own custom one.

Here are some examples of Giraph aggregators:

• Sum
• Avg
• Max
• Min
• TextAppend
• BooleanAnd/Or

The second method in the ZombieMasterCompute class is compute(), and this will fire at the start of every BSP superstep. In this case we are just printing out some information that will help us debug our process.

On to the next bit of code, which is the ZombieWorkerContext class for the worker computation stage.
This is what will execute before and after the application and each superstep. It can be used for advanced purposes like putting aggregated values at the start of a superstep so that they are accessible to a vertex compute step. But, for this simple example, we are doing nothing more than using System.out.println() so that we can see when these different methods are being called during processing:

    public class ZombieWorkerContext extends WorkerContext {

      @Override
      public void preApplication() {
        System.out.println("PreApplication of Zombies: " +
            getAggregatedValue("zombie.count"));
      }

      @Override
      public void postApplication() {
        System.out.println("PostApplication of Zombies: " +
            getAggregatedValue("zombie.count"));
      }

      @Override
      public void preSuperstep() {
        System.out.println("PreSuperstep of Zombies: " +
            getAggregatedValue("zombie.count"));
      }

      @Override
      public void postSuperstep() {
        System.out.println("PostSuperstep of Zombies: " +
            getAggregatedValue("zombie.count"));
      }
    }

Last and most complex is the vertex computation stage:

    public class ZombieComputation
        extends BasicComputation<LongWritable, Text, LongWritable, LongWritable> {

      private static final Logger LOG = Logger.getLogger(ZombieComputation.class);

      Text zombieText = new Text("Zombie");
      LongWritable longIncrement = new LongWritable(1);

      @Override
      public void compute(Vertex<LongWritable, Text, LongWritable> vertex,
          Iterable<LongWritable> messages) throws IOException {

        Context context = getContext();
        long superstep = getSuperstep();

        if (superstep == 0) {
          // First superstep: the original zombies bite all of their neighbors.
          if (vertex.getValue().toString().equals("Zombie")) {
            zombieText.set("Zombie." + superstep);
            vertex.setValue(zombieText);

            LongWritable newMessage = new LongWritable();
            newMessage.set(superstep + 1);

            aggregate("zombie.count", longIncrement);

            for (Edge<LongWritable, LongWritable> edge : vertex.getEdges()) {
              this.sendMessage(edge.getTargetVertexId(), newMessage);
            }
          }
        } else {
          if (vertex.getValue().toString().equals("Human")) {
            Iterator<LongWritable> it = messages.iterator();
            if (it.hasNext()) {
              // A human that received a bite turns and bites its neighbors.
              zombieText.set("Zombie." + superstep);
              vertex.setValue(zombieText);
              aggregate("zombie.count", longIncrement);

              LongWritable newMessage = new LongWritable();
              newMessage.set(superstep + 1);

              for (Edge<LongWritable, LongWritable> edge : vertex.getEdges()) {
                this.sendMessage(edge.getTargetVertexId(), newMessage);
              }
            } else {
              // A human that was not bitten has nothing to do.
              vertex.voteToHalt();
            }
          } else {
            // Already a zombie: nothing left to do.
            vertex.voteToHalt();
          }
        }
      }
    }

There is a lot of logic in this code, which we'll drill down into in a minute, but first let's clarify how the method is being called. The compute() method is called for each vertex and is given an iterator of all the messages sent to that vertex by the end of the last superstep.

The logic goes like this:

• If it is the first superstep and I'm a zombie, bite everyone around me.
• If it is after the first superstep and I'm a human that receives a bite message, turn myself into a zombie and bite everyone next to me.
• If I'm a zombie that gets bitten, do nothing.

Also note that we vote to halt in two places: if we are a zombie that gets bitten, or if we are a human that doesn't get bitten. We do this because these are the two possible end conditions:

• We could have everyone bitten and turned into a zombie, and at that point we need to stop processing.
• We could have some zombies but also some humans that don't have direct edges to them. So those humans will never become zombies.

Write the Graph Back to Disk

Now that we have infected our graph with zombies, it is time to write the results back to disk. We do that with the VertexOutputFormat.
We won't go into detail here, but just notice it is the opposite of the input format:

    public class ZombieTextVertexOutputFormat
        extends TextVertexOutputFormat<LongWritable, Text, LongWritable> {

      @Override
      public TextVertexWriter createVertexWriter(TaskAttemptContext context)
          throws IOException, InterruptedException {
        return new ZombieRecordTextWriter();
      }

      public class ZombieRecordTextWriter extends TextVertexWriter {
        Text newKey = new Text();
        Text newValue = new Text();

        public void writeVertex(Vertex<LongWritable, Text, LongWritable> vertex)
            throws IOException, InterruptedException {
          Iterable<Edge<LongWritable, LongWritable>> edges = vertex.getEdges();

          // Rebuild the comma-separated edge list.
          StringBuilder strBuilder = new StringBuilder();
          boolean isFirst = true;
          for (Edge<LongWritable, LongWritable> edge : edges) {
            if (isFirst) {
              isFirst = false;
            } else {
              strBuilder.append(",");
            }
            strBuilder.append(edge.getValue());
          }

          // Same id|value|edges layout as the input file.
          newKey.set(vertex.getId().get() + "|" + vertex.getValue() + "|" +
              strBuilder.toString());

          getRecordWriter().write(newKey, newValue);
        }
      }
    }

Putting It All Together

Now, just as with MapReduce, we need to set everything up and configure it in the main method.
Here's the code for that:

    public class ZombieBiteJob implements Tool {

      private static final Logger LOG = Logger.getLogger(ZombieBiteJob.class);
      private Configuration conf;

      @Override
      public void setConf(Configuration conf) {
        this.conf = conf;
      }

      @Override
      public Configuration getConf() {
        return conf;
      }

      @Override
      public int run(String[] args) throws Exception {
        if (args.length != 3) {
          throw new IllegalArgumentException(
              "Syntax error: Must have 3 arguments " +
              " numbersOfWorkers inputLocation outputLocation");
        }

        int numberOfWorkers = Integer.parseInt(args[0]);
        String inputLocation = args[1];
        String outputLocation = args[2];

        GiraphJob job = new GiraphJob(getConf(), getClass().getName());
        GiraphConfiguration gconf = job.getConfiguration();
        gconf.setWorkerConfiguration(numberOfWorkers, numberOfWorkers, 100.0f);

        GiraphFileInputFormat.addVertexInputPath(gconf, new Path(inputLocation));
        FileOutputFormat.setOutputPath(job.getInternalJob(),
            new Path(outputLocation));

        gconf.setComputationClass(ZombieComputation.class);
        gconf.setMasterComputeClass(ZombieMasterCompute.class);
        gconf.setVertexInputFormatClass(ZombieTextVertexInputFormat.class);
        gconf.setVertexOutputFormatClass(ZombieTextVertexOutputFormat.class);
        gconf.setWorkerContextClass(ZombieWorkerContext.class);

        boolean verbose = true;
        if (job.run(verbose)) {
          return 0;
        } else {
          return -1;
        }
      }

      public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new ZombieBiteJob(), args);
        if (ret == 0) {
          System.out.println("Ended Good");
        } else {
          System.out.println("Ended with Failure");
        }
        System.exit(ret);
      }
    }

When Should You Use Giraph?

Giraph is very powerful, but as you can see from the concepts and the code required, it's not for the faint of heart. If your use case is graph processing, you have hard service-level agreements (SLAs), and you need a mature solution, Giraph may be your tool. However, note that not all of the major Hadoop vendors are currently supporting Giraph in their distribution.
That does not mean it won't work with your chosen distribution, but you may need to talk with your vendor to see if you can get support. At the very least you will likely have to build your own Giraph JARs for your distribution.

GraphX

We talked a lot about Spark in the last chapter for ETL as a replacement for MapReduce, but how did Spark get to be a contender in the graph processing world? Well, consider that Spark's GraphX removed the two main issues with MapReduce that Pregel and Giraph were designed to address: speed for iterative processing and data that's as close to in-memory as possible. Spark's GraphX also provides another advantage as a graph processing framework, which is that graphs are just another type of RDD. This means that GraphX is a familiar programming model for developers who already know Spark.

To be fair, graph processing on Spark is still new, and we will see that reflected in our code example, which you'll note is implemented in Scala. This is because the Java API wasn't available at the time of this writing.

Just Another RDD

There are a couple of root RDDs in Spark. The most common are RDD and PairRDD. As you might guess, to get to graph processing from normal Spark we just need to make an EdgeRDD and a VertexRDD, which are simply extensions of the normal RDD object. We can get from any RDD to an EdgeRDD or VertexRDD with a simple map transformation function, no magic required. Then the last step from normal Spark to GraphX is putting these two new RDDs in a Graph object. The Graph object just contains a reference to these two RDDs and provides methods for doing graph processing on them.

As easy as it is to get into graph mode with GraphX, it's also easy to get out of graph mode. At any time we can grab the VertexRDD and EdgeRDD back out of our Graph object and start doing normal RDD functions on them.

There's really no easier way to explain this than just jumping into the code.
The following code gets a Spark context and creates the two RDDs we need to get a graph:

    val sc = new SparkContext(args(0), args(1), args(2), Seq("GraphXExample.jar"))

    // Create an RDD for the vertices
    val users: RDD[(VertexId, (String))] = sc.parallelize(Array(
      (1L, ("Human")), (2L, ("Human")), (3L, ("Human")), (4L, ("Human")),
      (5L, ("Human")), (6L, ("Zombie")), (7L, ("Human")), (8L, ("Human")),
      (9L, ("Human")), (10L, ("Human")), (11L, ("Human")), (12L, ("Human")),
      (13L, ("Human")), (14L, ("Human")), (15L, ("Human")), (16L, ("Zombie")),
      (17L, ("Human")), (18L, ("Human")), (19L, ("Human")), (20L, ("Human")),
      (21L, ("Human")), (22L, ("Human")), (23L, ("Human")), (24L, ("Human")),
      (25L, ("Human"))
    ))

    // Create an RDD for edges
    val relationships: RDD[Edge[String]] = sc.parallelize(Array(
      Edge(10L, 9L, "X"), Edge(10L, 8L, "X"), Edge(8L, 7L, "X"),
      Edge(8L, 5L, "X"), Edge(5L, 4L, "X"), Edge(5L, 3L, "X"),
      Edge(5L, 2L, "X"), Edge(9L, 6L, "X"), Edge(6L, 1L, "X"),
      Edge(20L, 11L, "X"), Edge(20L, 12L, "X"), Edge(20L, 13L, "X"),
      Edge(5L, 13L, "X"), Edge(20L, 14L, "X"), Edge(11L, 15L, "X"),
      Edge(20L, 16L, "X"), Edge(14L, 17L, "X"), Edge(1L, 17L, "X"),
      Edge(20L, 18L, "X"), Edge(21L, 18L, "X"), Edge(21L, 22L, "X"),
      Edge(4L, 23L, "X"), Edge(25L, 15L, "X"), Edge(24L, 3L, "X"),
      Edge(21L, 19L, "X")
    ))

    // Define a default user in case there are relationships with a missing user
    val defaultUser = ("Rock")

    // Build the initial Graph
    val graph = Graph(users, relationships, defaultUser)
    graph.triangleCount

Again, there's no magic here. The Spark context method parallelize() would be the same method we would use to create normal RDDs. Note that we're populating the sample data in the code to make the example easier to understand, but we could have loaded this data from disk just as easily.

Before going on to the processing code, let's take a minute to talk about the defaultUser variable.
In the GraphX world we get an option for a catch-all vertex called default: any vertex that an edge refers to but that is missing from the vertex RDD is created with this value. So, if a message were sent to a nonexisting vertex, it would go to the default. In our example, if a zombie tries to bite someone who does not exist, our unfortunate zombie will trip and bite a rock. But don't worry; he's already dead, so it won't hurt.

GraphX Pregel Interface

Before we get into the code we need to discuss two considerations. First, the GraphX Pregel API is not like Giraph. Second, the code is in Scala, making it very compact, so we are going to have to take some time breaking it down. With that being said, however, it is important to note that the 300-plus lines we wrote in Giraph are now replaced with about 20 lines in Scala with GraphX:

    //All the biting logic is right here
    val graphBites = graph.pregel(0L)(
      (id, dist, message) => {
        if (dist.equals("Zombie")) {
          (dist + "_" + message)
        } else if (message != 0) {
          "Zombie" + "_" + message
        } else {
          dist + "|" + message
        }
      }, triplet => {
        if (triplet.srcAttr.startsWith("Zombie") &&
            triplet.dstAttr.startsWith("Human")) {
          var stringBitStep =
            triplet.srcAttr.substring(triplet.srcAttr.indexOf("_") + 1)
          var lastBitStep = stringBitStep.toLong
          Iterator((triplet.dstId, lastBitStep + 1))
        } else if (triplet.srcAttr.startsWith("Human") &&
            triplet.dstAttr.startsWith("Zombie")) {
          var stringBitStep =
            triplet.dstAttr.substring(triplet.dstAttr.indexOf("_") + 1)
          var lastBitStep = stringBitStep.toLong
          Iterator((triplet.srcId, lastBitStep + 1))
        } else {
          Iterator.empty
        }
      }, (a, b) => math.min(b, a))

    graphBites.vertices.take(30)

If you can untangle that code with no help, then you probably already know more than the authors. Let's start breaking it down by first defining the graph.pregel() method.

graph.pregel() has two sets of parameters: the first set is values, and the second set is functions that will be executed in parallel.
Of the first set of parameters, we used only the first one, but here are the definitions:

• First message
• Max iterations (we used the default value and didn't use this parameter)
• Edge direction (we used the default value and didn't use this parameter)

So the first message we send is 0. Note that unlike with Giraph, we won't have access to a superstep number, so for our example we will keep track of the superstep in the message. This isn't the only way to do a superstep; we'll discuss more on this shortly.

We didn't set max iterations, because GraphX will stop when no more messages are sent and we want to go until we reach that point. We also didn't set edge direction because the default was fine for our processing.

The second set of parameters is a set of methods for processing:

vprog()
    This is a user-defined vertex program. In other words, it specifies what to do when a message reaches a vertex.

sendMessage()
    This contains logic for a vertex that may or may not want to send a message (the parameter is named sendMsg in the GraphX API).

mergeMessage()
    This is a function that determines how messages going to the same vertex will be merged (mergeMsg in the GraphX API). This helps in cases where millions of vertices send a message to the same vertex.

vprog()

Now, let's dig into each of these methods in our GraphX code, starting with the vprog() method:

    (id, dist, message) => {
      if (dist.equals("Zombie")) {
        (dist + "_" + message)
      } else if (message != 0) {
        "Zombie" + "_" + message
      } else {
        dist + "|" + message
      }
    }
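To see how the three functions fit together, it can help to strip away Spark entirely. The following is a rough single-machine sketch in plain Java of a Pregel-style loop for the zombie problem. It is not the GraphX implementation and uses no Spark classes: PregelSketch, run(), and sendBite() are hypothetical names, the sample graph is invented, and two simplifying assumptions are made. Every round scans all edges (GraphX restricts later rounds to edges next to vertices that received a message), and every edge endpoint must already exist in the vertex map (there is no default vertex).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Spark-free illustration of the vprog / sendMsg / mergeMsg loop.
public class PregelSketch {

    // vprog: compute a vertex's new value from its old value and the merged message.
    static String vprog(String value, long message) {
        if (value.equals("Zombie")) {
            return "Zombie_" + message;            // an original zombie, stamped with step 0
        }
        if (value.startsWith("Human") && message != 0) {
            return "Zombie_" + message;            // a human that has just been bitten
        }
        return value;                              // nothing changes
    }

    // mergeMsg: combine two messages headed for the same vertex (keep the earliest bite).
    static long mergeMsg(long a, long b) {
        return Math.min(a, b);
    }

    // sendMsg for one direction of an edge: a zombie bites a human neighbor.
    static void sendBite(Map<Long, String> values, long from, long to,
                         Map<Long, Long> inbox) {
        String biter = values.get(from);
        String victim = values.get(to);
        if (biter.startsWith("Zombie") && victim.startsWith("Human")) {
            long step = Long.parseLong(biter.substring(biter.indexOf('_') + 1)) + 1;
            inbox.merge(to, step, PregelSketch::mergeMsg);
        }
    }

    public static Map<Long, String> run(Map<Long, String> start, long[][] edges) {
        Map<Long, String> values = new TreeMap<>();
        // Every vertex first receives the initial message, 0.
        for (Map.Entry<Long, String> v : start.entrySet()) {
            values.put(v.getKey(), vprog(v.getValue(), 0L));
        }
        while (true) {
            // Messages produced in this round are only applied after the scan,
            // which is the superstep barrier.
            Map<Long, Long> inbox = new HashMap<>();
            for (long[] e : edges) {
                sendBite(values, e[0], e[1], inbox);
                sendBite(values, e[1], e[0], inbox);
            }
            if (inbox.isEmpty()) {
                return values;                     // no messages sent: stop
            }
            for (Map.Entry<Long, Long> m : inbox.entrySet()) {
                values.put(m.getKey(), vprog(values.get(m.getKey()), m.getValue()));
            }
        }
    }

    public static void main(String[] args) {
        Map<Long, String> start = new HashMap<>();
        start.put(1L, "Zombie");
        for (long id = 2; id <= 5; id++) {
            start.put(id, "Human");
        }
        long[][] edges = { {1, 2}, {2, 3}, {3, 4} };   // vertex 5 is isolated
        System.out.println(run(start, edges));
    }
}
```

As in the Scala version, the superstep number travels inside the message rather than coming from the framework, and the loop ends on its own once a round produces no messages.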
