Here are our steps to run GraphX:
1. Get Spark from the official site. We are using Spark 0.9.1.
2. Configure the Spark cluster. The configuration step is very simple: the conf/slaves file has the same format as Hadoop's conf/slaves, and conf/spark-env.sh is shown below.
    SPARK_WORKER_DIR=/scratch/jianfenj/spark
    SPARK_MASTER_IP=sensorium-1

The other settings are left empty, so all resources are used by default. Start the Spark cluster with sbin/start-all.sh.
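For completeness, conf/slaves simply lists one worker hostname per line; the hostnames below are placeholders rather than the actual cluster nodes:

    sensorium-2
    sensorium-3
    sensorium-4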
3. Write the GraphX code. All of the code reuses existing GraphX functions and can be found here. For example, the PageRank task loads the graph and calls the existing graph.staticPageRank function.
    def loadWebmap[VD: ClassTag, ED: ClassTag](sc: SparkContext, path: String,
        defaultEdgeAttr: ED, defaultVetexAttr: VD): Graph[VD, ED] = {
      val startTime = System.currentTimeMillis
      val textRDD = sc.textFile(path)
      // Each line starts with the source vertex ID; the second field is skipped
      // and the remaining fields are destination vertex IDs.
      val edge = textRDD.flatMap { line =>
        val numbers = line.split(" ")
        val srcId: VertexId = numbers(0).trim.toLong
        numbers.slice(2, numbers.size).map(num => Edge(srcId, num.trim.toLong, defaultEdgeAttr))
      }
      Graph.fromEdges[VD, ED](edge, defaultVetexAttr)
    }

    def pageRank(sc: SparkContext, inputPath: String, maxIterations: Int): Graph[Double, Double] = {
      val graph = loadWebmap(sc, inputPath, 1.0, 1.0)
      graph.staticPageRank(maxIterations)
    }
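As a rough illustration (not part of the original code), the functions above could be invoked like this, assuming an existing SparkContext sc; the input path and iteration count are placeholders. The PageRank score of each page ends up as the vertex attribute of the returned graph.

    import org.apache.spark.graphx._

    // Hypothetical usage; `sc` is an existing SparkContext, and the path
    // and iteration count are placeholders.
    val ranks: Graph[Double, Double] = pageRank(sc, "hdfs:///data/webmap", 10)

    // Print the ten highest-ranked vertices (vertex attribute == PageRank score).
    ranks.vertices
      .top(10)(Ordering.by[(VertexId, Double), Double](_._2))
      .foreach { case (id, rank) => println(s"$id\t$rank") }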
4. Compile. We use SBT (version 0.13.1) with Scala 2.10.4 to build and run the Scala code. The build config file is here.
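For reference, a minimal build.sbt along these lines would pull in Spark and GraphX; the actual file is linked above, so treat the project name and dependency list below as an approximation.

    name := "graphx-benchmark"   // placeholder project name

    version := "0.1"

    scalaVersion := "2.10.4"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"   % "0.9.1",
      "org.apache.spark" %% "spark-graphx" % "0.9.1"
    )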
5. Run. We also use SBT to run the jar file; the script is here. Taking the PageRank task as an example, the command looks like this:
fullcmd="$sbt_path/sbt 'run $sparkserver jar $jar -c $core -m $mem cmd PageRank $input ${output_folder}' " eval $fullcmd 2>&1 | tee $logfile core=$(( $slaves * 4 )) # each machine have 4 cores. mem="6800m" # for one worker!These arguments will be process by SparkContext inside the Driver.scala:
    conf.setJars(jars)
    conf.set("spark.executor.memory", memory)   // == $mem
    conf.set("spark.cores.max", cores.toString) // == $core
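For context, here is a sketch of how a driver might assemble the SparkConf around those calls; the object name, hard-coded values, and master URL are placeholders rather than the actual Driver.scala.

    import org.apache.spark.{SparkConf, SparkContext}

    object Driver {
      def main(args: Array[String]) {
        // Placeholder values standing in for the parsed command-line arguments.
        val jars   = Seq("target/scala-2.10/graphx-benchmark_2.10-0.1.jar")
        val memory = "6800m"   // -m / $mem
        val cores  = 32        // -c / $core

        val conf = new SparkConf()
          .setMaster("spark://sensorium-1:7077")   // SPARK_MASTER_IP from step 2
          .setAppName("PageRank")
        conf.setJars(jars)
        conf.set("spark.executor.memory", memory)
        conf.set("spark.cores.max", cores.toString)

        val sc = new SparkContext(conf)
        // ... dispatch to the requested task, e.g. pageRank(sc, inputPath, maxIterations)
        sc.stop()
      }
    }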