Error calling `JValue.extract` from distributed operations in spark-shell

Tag: apache-spark,json4s

I am trying to use the case class extraction feature of json4s in Spark, i.e. calling jvalue.extract[MyCaseClass]. It works fine if I bring the JValue objects into the master and do the extraction there, but the same calls fail in the workers:

import org.json4s._
import org.json4s.jackson.JsonMethods._
import scala.util.{Try, Success, Failure}

val sqx = sqlContext

val data = sc.textFile(inpath).coalesce(2000)

case class PageView(
 client:  Option[String]
)

def extract(json: JValue) = {
  implicit def formats = org.json4s.DefaultFormats
  Try(json.extract[PageView]).toOption
}

val json = data.map(parse(_)).sample(false, 1e-6).cache()

// count initial inputs
val raw = json.count 


// count successful extractions locally -- same value as above
val loc = json.toLocalIterator.flatMap(extract).size

// distributed count -- always zero
val dist = json.flatMap(extract).count // always returns zero

// this throws  "org.json4s.package$MappingException: Parsed JSON values do not match with class constructor"
json.map(x => {implicit def formats = org.json4s.DefaultFormats; x.extract[PageView]}).count

The implicit Formats is defined locally inside the extract function because DefaultFormats is not serializable; defining it at the top level caused it to be serialized for transmission to the workers rather than constructed there. I think the problem still has something to do with the remote initialization of DefaultFormats, but I am not sure what it is.
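
For reference, here is a hedged sketch of the same idea packaged as a serializable helper object with a transient lazy Formats, so each JVM builds its own instance on first use (the object name is illustrative and this is not from the original post):

// Hedged sketch: Formats is constructed lazily on whichever JVM (driver or executor)
// first uses it, so nothing non-serializable is shipped in the closure.
object PageViewExtractor extends Serializable {
  @transient implicit lazy val formats: org.json4s.Formats = org.json4s.DefaultFormats

  def extract(json: org.json4s.JValue): Option[PageView] =
    scala.util.Try(json.extract[PageView]).toOption
}

// usage: json.flatMap(PageViewExtractor.extract).count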

When I call the extract method directly, instead of my extract function, as in the last example, it no longer complains about serialization but just throws an error that the JSON does not match the expected structure.

How can I get the extraction to work when distributed to the workers?


Edit

@WesleyMiao has reproduced the problem and found that it is specific to spark-shell. He reports that this code works as a standalone application.

Best How To :

I got the same exception as yours when running your code in spark-shell. However, when I turned your code into a real Spark app and submitted it to a standalone Spark cluster, I got the expected results with no exceptions.

Below is the code I put in a simple spark app.

val data = sc.parallelize(Seq("""{"client":"Michael"}""", """{"client":"Wesley"}"""))

val json = data.map(parse(_))

val dist = json.mapPartitions { jsons =>
  implicit val formats = org.json4s.DefaultFormats
  jsons.map(_.extract[PageView])
}

dist.collect() foreach println

And when I ran it using spark-submit, I got the following result.

PageView(Some(Michael))
PageView(Some(Wesley))

And I am also sure that it is not running in "local[*]" mode.

Now I suspect the reason we got exceptions while running in spark-shell has something to do with how the case class PageView is defined in spark-shell and how spark-shell serializes / distributes it to the executors.
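
For reference, a minimal standalone-app sketch along those lines, assuming spark-submit (object and app names are illustrative). The key difference from the shell is that PageView is an ordinary top-level class rather than one synthesized inside a REPL line object:

import org.apache.spark.{SparkConf, SparkContext}
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Top-level case class: visible to executors as a plain class, no REPL wrapper involved.
case class PageView(client: Option[String])

object PageViewApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PageViewApp"))

    val data = sc.parallelize(Seq("""{"client":"Michael"}""", """{"client":"Wesley"}"""))

    val dist = data.map(parse(_)).mapPartitions { jsons =>
      implicit val formats = DefaultFormats   // constructed on the executor
      jsons.map(_.extract[PageView])
    }

    dist.collect() foreach println
    sc.stop()
  }
}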

Join files using Apache Spark / Spark SQL

java,apache-spark,apache-spark-sql

If you use plain spark you can join two RDDs. let a = RDD<Tuple2<K,T>> let b = RDD<Tuple2<K,S>> RDD<Tuple2<K,Tuple2<S,T>>> c = a.join(b) This produces an RDD of every pair for key K. There are also leftOuterJoin, rightOuterJoin, and fullOuterJoin methods on RDD. So you have to map both datasets to...
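
For illustration, a hedged Scala sketch of the pair-RDD joins described above (keys and values are made up):

val a = sc.parallelize(Seq((1, "t1"), (2, "t2")))   // plays the role of RDD<Tuple2<K,T>>
val b = sc.parallelize(Seq((1, "s1"), (3, "s3")))   // plays the role of RDD<Tuple2<K,S>>

val joined = a.join(b)            // RDD of (K, (T, S)): one pair per key present in both
val left   = a.leftOuterJoin(b)   // RDD of (K, (T, Option[S]))
val full   = a.fullOuterJoin(b)   // RDD of (K, (Option[T], Option[S]))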

Spark-submit class not found exception

scala,apache-spark

The thing is that we have to submit to the Spark cluster the class file that we want to execute or will use as a supporting file, so follow these steps - Create a jar file of this class -> In Eclipse you can export this class as a...

Spark saving RDD[(Int, Array[Double])] to text file got strange result

apache-spark,mllib

The behavior of saveAsTextFile is to use the toString method. So, for an Array, this is merely the hashcode. You have two options if you stick with saveAsTextFile: .mapValues(x=>/*TURN ARRAY DATA INTO A STRING*/).saveAsTextFile... or you can use map to wrap the data in a custom object with a custom...
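
A hedged sketch of the first option, mapValues plus mkString (the separator and output path are illustrative):

val rdd = sc.parallelize(Seq((1, Array(1.0, 2.0)), (2, Array(3.0))))   // RDD[(Int, Array[Double])]

// Turn the array into readable text before saveAsTextFile calls toString on each record.
rdd.mapValues(_.mkString(",")).saveAsTextFile("out")   // lines like (1,1.0,2.0)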

Pyspark: using filter for feature selection

python,apache-spark,pyspark

Sounds like you need to filter columns, but not records. For doing this you need to use Spark's map function to transform every row of your array, represented as an RDD. See my example: # generate 13 x 10 array and creates rdd with 13 records, each record...

Spark streaming transform function

java,apache-spark,spark-streaming

Thanks to @GaborBakos for providing the answer... The following seems to work! Had to use transformToPair instead of transform: JavaPairDStream<Long,Integer> sortedtsStream = tsStream.transformToPair( new Function<JavaPairRDD<Long, Integer>, JavaPairRDD<Long,Integer>>() { @Override public JavaPairRDD<Long, Integer> call(JavaPairRDD<Long, Integer> longIntegerJavaPairRDD) throws Exception { return longIntegerJavaPairRDD.sortByKey(true); } }); ...

In Spark, does the filter function turn the data into tuples?

mapreduce,apache-spark,cloud

No, all filter does is take a predicate function and use it so that any data points in the set that return false when passed through that predicate are not passed back out to the resultant set. So, the data remains the same: filesLines //RDD[String] (lines...
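
A small hedged sketch of that point (the file path and predicate are illustrative):

val filesLines = sc.textFile("logs.txt")              // RDD[String]
val kept = filesLines.filter(_.contains("ERROR"))     // still RDD[String]: filter never changes the element type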

How to extract application ID from the PySpark context

apache-spark,yarn,pyspark

You could use the Java SparkContext object through the Py4J RPC gateway: >>> sc._jsc.sc().applicationId() u'application_1433865536131_34483' Please note that sc._jsc is an internal variable and not part of the public API, so there is a (rather small) chance that it may be changed in the future. I'll submit a pull request to add a public...

how to parse a custom log file in scala to extract some key value pairs using patterns

java,regex,scala,apache-spark

The case class expects a variety of things like IP address that your log obviously doesn't have, therefore you would need to modify the case class definition to include just the fields that you want to add. Just to illustrate here, let's make the case class like so: case class...

PySpark No suitable driver found for jdbc:mysql://dbhost

apache-spark,apache-spark-sql,pyspark

This is a bug related to the classloader. This is the ticket for it: https://issues.apache.org/jira/browse/SPARK-8463 and this is the pull request for it: https://github.com/apache/spark/pull/6900. A workaround is to copy mysql-connector-java-5.1.35-bin.jar to every machine at the same location as it is on the driver....

Is the DStream return by updateStateByKey function only contains one RDD?

apache-spark,spark-streaming,apache-spark-sql,pyspark

Yes, the DStream returned by updateStateByKey only has one RDD

How to un-nest a spark rdd that has the following type ((String, scala.collection.immutable.Map[String,scala.collection.immutable.Map[String,Int]]))

scala,cassandra,apache-spark

Fairly straightforward using a for-comprehension and some pattern matching to destructure things: val in = List((5, Map ( "ABCD" -> Map("3200" -> 3, "3350.800" -> 4, "200.300" -> 3))), (1, Map ("DEF" -> Map("1200" -> 32, "1320.800" -> 4, "2100" -> 3)))) case class Thing(a:Int, b:String, c:String, d:Int) for {...

Shuffled vs non-shuffled coalesce in Apache Spark

scala,apache-spark,bigdata,distributed-computing

shuffle=true and shuffle=false aren't going to have any practical differences in the resulting output since they are both going down to a single partition. However, when you set it to true you will do a shuffle which isn't of any use. With shuffle=true the output is evenly distributed amongst the...
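
A hedged sketch of the two variants (partition counts are illustrative):

val rdd = sc.parallelize(1 to 1000, 100)

val noShuffle   = rdd.coalesce(1)                   // shuffle = false: merges partitions without a shuffle
val withShuffle = rdd.coalesce(1, shuffle = true)   // equivalent to repartition(1): shuffles, then one partition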

Error when running job that queries against Cassandra via Spark SQL through Spark Jobserver

cassandra,apache-spark,apache-spark-sql,spark-jobserver,spark-cassandra-connector

@vicg, first you need spark.cassandra.connection.host -- periods not dashes. Also note in the error how the IP is "127.0.1.1", not the one in the config. You can also pass the IP when you create a context, like: curl -X POST 'localhost:8090/contexts/my-context?spark.cassandra.connection.host=127.0.0.1' If the above don't work, try the following PR:...

Pyspark StructType is not defined

python,apache-spark,pyspark

Did you import StructType? If not, from pyspark.sql.types import StructType should solve the problem....

reduceByKey with two columns in Spark

python,apache-spark,reduce,pyspark

As far as I understand, your goal is to count (column1, input.column2) pairs and your input looks more or less like this: from numpy.random import randint, seed from pyspark.sql import Row seed(323) rdd = sc.parallelize( Row(column1=randint(0, 5), column2=randint(0, 5)) for _ in range(1000)) rdd.take(3) Result: [Row(column1=0, column2=0), Row(column1=4, column2=4), Row(column1=3, column2=2)] First...

In sbt, how can we specify the version of hadoop on which spark depends?

apache-spark,sbt

You can exclude the dependency from Spark to hadoop, and add an explicit one with the version you need, something along those lines: libraryDependencies ++= Seq( "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5", "com.datastax.cassandra" % "cassandra-driver-mapping" % "2.1.5", "com.datastax.spark" % "spark-cassandra-connector" %% "1.2.1", "org.apache.spark" % "spark-sql_2.10" % "1.2.1" excludeAll( ExclusionRule("org.apache.hadoop") ), "org.apache.hadoop"...

Install SparkR that comes with Spark 1.4

r,apache-spark,sparkr

@DavidArenburg put me on the right track. Following the Windows documentation in C:\spark-1.4.0\R\WINDOWS.md, I installed RTools and added R.exe and RTools to my computer's PATH. Then, I ran install-dev.bat in C:\spark-1.4.0\R. This added the lib\SparkR\ installation that I was missing. Then, from the command prompt, I ran mklink /D...

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.CanSetDropBehind issue in ecllipse

maven,hadoop,apache-spark,word-count

When you run in Eclipse, the referenced jars are the only source for your program to run. So the jar hadoop-core (that's where CanSetDropBehind is present) is not added properly in your Eclipse from the local repository for some reason. You need to identify whether it is a proxy issue, or...

Retrieving TriangleCount

scala,apache-spark,spark-graphx

triangleCount counts the number of triangles per vertex and returns Graph[Int,Int], so you have to extract the vertices: scala> graph.triangleCount().vertices.collect() res0: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((1,1), (3,1), (2,1)) ...

Call Distinct on 'pyspark.resultiterable.ResultIterable'

python,apache-spark,pyspark

Straightforward approach is to use sets: from numpy.random import choice, seed seed(323) keys = (4, 1, 5, 2) hosts = [ u'in24.inetnebr.com', u'ix-esc-ca2-07.ix.netcom.com', u'uplherc.upl.com', u'slppp6.intermind.net', u'piweba4y.prodigy.com' ] pairs = sc.parallelize(zip(choice(keys, 20), choice(hosts, 20))).groupByKey() pairs.map(lambda (k, v): (k, set(v))).take(3) Result: [(1, {u'ix-esc-ca2-07.ix.netcom.com', u'slppp6.intermind.net'}), (2, {u'in24.inetnebr.com', u'ix-esc-ca2-07.ix.netcom.com', u'slppp6.intermind.net', u'uplherc.upl.com'}),...

OutofMemoryErrory creating fat jar with sbt assembly

jar,cassandra,apache-spark,sbt

I was including spark as an unmanaged dependency (putting the jar file in the lib folder) which used a lot of memory because it is a huge jar. Instead, I made a build.sbt file which included spark as a provided, unmanaged dependency. Secondly, I created the environment variable JAVA_OPTS with...

spark-mllib: Error “reassignment to val” in source code

apache-spark,mllib

I did run the example from the Spark website, which is pretty much identical. It does work even though IntelliJ shows the same errors. It looks like some bug in the IDEA Scala plugin. Your project should compile and work normally.

Subtract an RDD from another RDD doesn't work correctly

scala,apache-spark,spark-graphx

Performing set operations like subtract with mutable types (Array in this example) is usually unsupported, or at least not recommended. Try using an immutable type instead. I believe WrappedArray is the relevant container for storing arrays in sets, but I'm not sure....

Removing duplicates from Spark RDDPair values

python,apache-spark,pyspark

I'd do something like this : streetsGroupedByZipCode.map(x => (x._1, x._2.groupBy(_._2).map(_._2.head))) distinct on a tuple doesn't work as you said, so group your list by tuple, and get only the first element at the end. val data = Seq((1, Seq((1, 1), (2, 2), (2, 2))), (10, Seq((1, 1), (1, 1), (3,...

Convert RDD[Map[String,Double]] to RDD[(String,Double)]

scala,apache-spark,rdd

You can call flatMap with the identity function to 'flatten' the structure of your RDD. rdd.flatMap(identity) ...

Access key from mapValues or flatMapValues?

scala,apache-spark

In this case you can use mapPartitions with the preservesPartitioning attribute. x.mapPartitions(it => it.map { case (k, rr) => (k, someFun(rr, k)) }, preservesPartitioning = true) You just have to make sure you are not changing the partitioning, i.e. don't change the key....

Connecting from Spark/pyspark to PostgreSQL

postgresql,jdbc,jar,apache-spark,pyspark

Remove spark-defaults.conf and add the SPARK_CLASSPATH to the system environment in python like this: os.environ["SPARK_CLASSPATH"] = 'PATH\\TO\\postgresql-9.3-1101.jdbc41.jar' ...

SparkR and Packages

r,apache-spark,sparkr

So it looks like by setting SPARKR_SUBMIT_ARGS you are overriding the default value, which is sparkr-shell. You could probably do the same thing and just append sparkr-shell to the end of your SPARKR_SUBMIT_ARGS. This seems unnecessarily complex compared to depending on jars, so I've created a JIRA to track...

Count of second dimension in two dimension data in Spark

apache-spark

If you just want the count for each data then you could just use countByValue API. val data = Array(("apple","laptop"),("apple","laptop"),("dell","laptop"), ("apple","ipad")) val rdd = sc.parallelize(data) scala> rdd.countByValue res0: scala.collection.Map[(String, String),Long] = Map((apple,laptop) -> 2, (apple,ipad) -> 1, (dell,laptop) -> 1) ...

How to get rid of “Spark assembly has been built with Hive, including Datanucleus jars on classpath” message?

cron,apache-spark

Depending on your spark version, this message may or may not appear. Additionally it was previously generated by the shell script compute-classpath.sh, so you could always just comment that line out from compute-classpath.sh (The line you are looking for is echo "Spark assembly has been built with Hive, including Datanucleus...

How to transform a tabular data into transactions in spark(scala)?

scala,apache-spark

Something like this: val xs = input.map(_.split(",")) //List(Array(1, John, iPhone Cover, 9.99), // Array(2, Jack, iPhone Cover, 9.99), // Array(4, Jill, Samsung Galaxy Cover, 9.95), // Array(3, John, Headphones, 5.49), // Array(5, Bob, iPad Cover, 5.45)) xs.tail.foldLeft((xs.head(3), List(List(xs.head(0))))) { case ((cur, acc), e) => if (Math.abs(cur.toDouble - e(3).toDouble) < 1.0)...

convert scala object to json using json4s

json,scala,json4s

You can make a custom serialiser for your class as described here: https://github.com/json4s/json4s#serializing-non-supported-types Unless you really need it, I wouldn't advise making it part of your toString though; I'd rather advise having some other object perform the transformation, as it will make things a bit...

Graphx EdgeRDD count taking long time to compute

apache-spark,spark-graphx

After going through the log, I figured out that my task size is big and it takes time to schedule it. Spark itself warns about this by saying: 15/06/17 02:32:47 WARN TaskSetManager: Stage 1 contains a task of very large size (140947 KB). The maximum recommended task size is 100 KB. That led me...

How do I flatMap a row of arrays into multiple rows?

apache-spark,apache-spark-sql

You can use DataFrame.explode to achieve what you desire. Below is what I tried in spark-shell with your sample json data. import scala.collection.mutable.ArrayBuffer val jj1 = jj.explode("r", "r1") {list : ArrayBuffer[Long] => list.toList } val jj2 = jj1.select($"r1") jj2.collect You can refer to API documentation to understand more DataFrame.explode...

File Processing with Spark and Cassandra

cassandra,apache-spark

Originally I was using 'sbt run' to start the application. Once I was able to use spark-submit to launch the application, everything worked fine. So yes, files under 10 MB can be stored as a column of type blob. The Spark MapReduce ran quickly with 200 rows....

Passing a function foreach key of an Array

scala,apache-spark,scala-collections,spark-graphx

You're looking for the groupBy function followed by mapValues to process each group. pairs groupBy {_._1} mapValues { groupOfPairs => doSomething(groupOfPairs) } ...

Spark on yarn jar upload problems

java,hadoop,mapreduce,apache-spark

The problem was solved by copying spark-assembly.jar into a directory on the hdfs for each node and then passing it to spark-submit --conf spark.yarn.jar as a parameter. Commands are listed below: hdfs dfs -copyFromLocal /var/tmp/spark/spark-1.4.0-bin-hadoop2.4/lib/spark-assembly-1.4.0-hadoop2.4.0.jar /user/spark/spark-assembly.jar /var/tmp/spark/spark-1.4.0-bin-hadoop2.4/bin/spark-submit --class MRContainer --master yarn-cluster --conf spark.yarn.jar=hdfs:///user/spark/spark-assembly.jar simplemr.jar ...

Populate csv with Scala

python,scala,csv,apache-spark

A somewhat functional approach using scanLeft[1]. drop(1) after readAll because it reads the header too. Also drop(1) at the end because scanLeft starts with the initial result. scanLeft lets you work with the previous value of the computation; it takes a function with two params, first the result...

“remoteContext object has no attribute”

amazon-s3,apache-spark,pyspark

You've done a great job getting your data mounted into dbfs, and it looks like you just have a small typo. I suspect you want to use sc.textFile rather than sc.textFiles. Best of luck with your adventures with Spark.

when should groupByKey API used in spark programming?

apache-spark

According to the link below, GroupByKey should be avoided. Avoid GroupByKey...
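
A hedged sketch of the usual alternative for aggregations (the data is illustrative):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

val viaGroup  = pairs.groupByKey().mapValues(_.sum)   // ships every value across the network
val viaReduce = pairs.reduceByKey(_ + _)              // combines map-side first, usually preferred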

Include package in Spark local mode

python,apache-spark,py.test,pyspark

You can use the spark.driver.extraClassPath property in your config file to sort out the problem. Edit spark-defaults.conf and add the property spark.driver.extraClassPath /Volumes/work/bigdata/CHD5.4/spark-1.4.0-bin-hadoop2.6/lib/spark-csv_2.11-1.1.0.jar:/Volumes/work/bigdata/CHD5.4/spark-1.4.0-bin-hadoop2.6/lib/commons-csv-1.1.jar After setting the above you don't even need the packages flag while running from the shell. sqlContext = SQLContext(sc) df = sqlContext.read.format('com.databricks.spark.csv').options(header='false').load(BASE_DATA_PATH + '/ssi.csv')...

Profiling a scala spark application

scala,apache-spark

I would recommend you to use directly the UI that spark provides. It provides a lot of information and metrics regarding time, steps, network usage, etc... You can check more about it here: https://spark.apache.org/docs/latest/monitoring.html Also, in the new Spark version (1.4.0) there is a nice visualizer to understand the steps...

Spark executors with different amounts of memory on Mesos

apache-spark,mesos

Short answer: No. Unfortunately, Spark Mesos and YARN only allow giving as much resources (cores, memory, etc.) per machine as your worst machine has (discussion). Ideally, the cluster should be homogeneous in order to take full advantage of its resources. However, there might exist a workaround for your problem. According...

Spark streaming on YARN executor's logs not available

logging,apache-spark,yarn,spark-streaming

I found the solution: the proper log4j configuration must be set in the following way during application submit: --files /opt/spark/conf/log4j.properties --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties" where spark.driver.extraJavaOptions sets up the log configuration for the driver and spark.executor.extraJavaOptions sets up the log configuration for the executor(s) ...

What to set `SPARK_HOME` to?

python,apache-spark,pythonpath,pyspark,apache-zeppelin

Two environment variables are required: SPARK_HOME=/spark PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH...

Which spark MLIB algorithm to use?

machine-learning,apache-spark

One thing you can do is calculate correlation between rows. Take a look at the tutorial about summary statistics on the MLlib website. A more advanced approach would be to use dimensionality reduction. This should discover more complex dependencies....
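
A hedged sketch of the MLlib correlation API from the summary-statistics tutorial (the vectors are illustrative; Statistics.corr correlates the columns of the input, so lay the data out accordingly):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics

val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(2.0, 4.0, 5.0),
  Vectors.dense(3.0, 6.0, 8.0)))

val corr = Statistics.corr(rows, "pearson")   // pairwise Pearson correlation matrix
println(corr)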

Conditionally Combining/Reducing key-pairs

python,apache-spark,pyspark

You can try something like this: seq = sc.parallelize(zip(xrange(16), "ATCGATGCATGCATGC")) (seq .flatMap(lambda (pos, x): ((pos - i, (pos, x)) for i in range(3))) .groupByKey() .mapValues(lambda x: ''.join(v for (pos, v) in sorted(x))) .filter(lambda (pos, codon): len(codon) == 3) .map(lambda (pos, codon): (pos % 3, pos, codon)) .collect()) and result: [(0,...

Apache Spark: Error while starting PySpark

python,hadoop,apache-spark,pyspark

From the logs it looks like pyspark is unable to resolve the host localhost. Please check your /etc/hosts file; if localhost is not present, add an entry, which should resolve this issue. e.g.: [Ip] [Hostname] localhost In case you are not able to change the host entry of the server, edit...

Spark: use reduceByKey instead of groupByKey and mapByValues

python,apache-spark,pyspark

You can do it like this: data = (sc.parallelize([ {key1: A}, {key1: A}, {key1: B}, {key1: C}, {key2: B}, {key2: B}, {key2: D}, ..])) result = (data .mapValues(lambda x: {x}) .reduceByKey(lambda s1, s2: s1.union(s2))) ...

Issue with UDF on a column of Vectors in PySpark DataFrame

apache-spark,apache-spark-sql,pyspark,spark-sql

In spark-sql, vectors are treated as a (type, size, indices, value) tuple. You can use a udf on vectors with pyspark. Just modify some code to work with values of vector type. vector_udf = udf(lambda vector: sum(vector[3]), DoubleType()) df.withColumn('feature_sums', vector_udf(df.features)).first() https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala ...