I'm having some problems applying the mapReduceTriplets to my graph network in spark using graphx.

I've been following the tutorials and read in my own data which is put together as [Array[String],Int], so for example my vertices are:

`org.apache.spark.graphx.VertexRDD[Array[String]]`

e.g. (3999,Array(17, Low, 9))

And my edges are:

`org.apache.spark.graphx.EdgeRDD[Int]`

e.g. Edge(3999,4500,1)

I'm trying to apply an aggregate type function using mapReduceTriplets which counts how many of the last integer in the array of a vertices (in the above example 9) is the same or different to the first integer (in the above example 17) of all connected vertices.

So you would end up with a list of counts for the number of matches or non-matches.

The problem I am having is applying any function using mapReduceTriplets, I am quite new to scala so this may be really obvious, but in the graphx tutorials it has an example which is using a graph with the format Graph[Double, Int], however my graph is in the format of Graph[Array[String],Int], so i'm just trying as a first step to figure out how I can use my graph in the example and then work from there.

The example on the graphx website is as follows:

```
val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)](
triplet => { // Map Function
if (triplet.srcAttr > triplet.dstAttr) {
// Send message to destination vertex containing counter and age
Iterator((triplet.dstId, (1, triplet.srcAttr)))
} else {
// Don't send a message for this triplet
Iterator.empty
}
},
// Add counter and age
(a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
```

Any advice would be most appreciated, or if you think there is a better way than using mapreducetriplets I would be happy to hear it.

Edited new code

```
val nodes = (sc.textFile("C~nodeData.csv")
.map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
val edges = GraphLoader.edgeListFile(sc, "C:~edges.txt")
val graph = edges.outerJoinVertices(nodes) {
case (uid, deg, Some(attrList)) => attrList
case (uid, deg, None) => Array.empty[String]
}
val countsRdd = graph.collectNeighbors(EdgeDirection.Either).leftOuterJoin(graph.vertices).map {
case (id, t) => {
val neighbors: Array[(VertexId, Array[String])] = t._1
val nodeAttr = (t._2)
neighbors.map(_._2).count( x => x.apply(x.size - 1) == nodeAttr(0))
}
}
```