PySpark No suitable driver found for jdbc:mysql://dbhost

Tags: apache-spark,apache-spark-sql,pyspark

I am trying to write my DataFrame to a MySQL table, but I am getting "No suitable driver found for jdbc:mysql://dbhost" when I try to write.

As part of the preprocessing I read from other tables in the same DB and have no issues doing that. I can do the full run and save the rows to a Parquet file, so it is definitely reading from the MySQL DB.

I am submitting using:

spark-submit --conf spark.executor.extraClassPath=/home/user/Downloads/mysql-connector-java-5.1.35-bin.jar --driver-class-path /home/user/Downloads/mysql-connector-java-5.1.35-bin.jar --jars /home/user/Downloads/mysql-connector-java-5.1.35-bin.jar main.py

And I am writing using:

df.write.jdbc(url="jdbc:mysql://dbhost/dbname", table="tablename", mode="append", properties={"user":"dbuser", "password": "s3cret"})

Best How To:

This is a bug related to the classloader. The ticket for it is https://issues.apache.org/jira/browse/SPARK-8463 and the pull request for it is https://github.com/apache/spark/pull/6900.

A workaround is to copy mysql-connector-java-5.1.35-bin.jar to every machine at the same location as it is on the driver.
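Another thing that is often worth trying, separate from the ticket above, is naming the JDBC driver class explicitly in the connection properties so Spark does not rely on DriverManager discovering it. Whether this helps depends on the Spark version, so treat the following as a sketch rather than a guaranteed fix (com.mysql.jdbc.Driver is the class shipped with Connector/J 5.1.x; the jar still has to be on the classpath):

# Sketch: same write as above, with the driver class spelled out explicitly.
df.write.jdbc(
    url="jdbc:mysql://dbhost/dbname",
    table="tablename",
    mode="append",
    properties={
        "user": "dbuser",
        "password": "s3cret",
        "driver": "com.mysql.jdbc.Driver",  # assumption: Connector/J 5.1.x class name
    },
)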

How to extract application ID from the PySpark context

apache-spark,yarn,pyspark

You could use the Java SparkContext object through the Py4J RPC gateway: >>> sc._jsc.sc().applicationId() u'application_1433865536131_34483' Please note that sc._jsc is an internal variable and not part of the public API, so there is a (rather small) chance that it may be changed in the future. I'll submit a pull request to add public...
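For reference, the same call laid out as a small sketch (sc._jsc is a private attribute, so this may change between versions):

# Reach the underlying JVM SparkContext through the Py4J gateway (private API).
app_id = sc._jsc.sc().applicationId()
print(app_id)  # e.g. u'application_1433865536131_34483' when running on YARN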

Count of second dimension in two dimension data in Spark

apache-spark

If you just want the count for each pair, you could just use the countByValue API. val data = Array(("apple","laptop"),("apple","laptop"),("dell","laptop"), ("apple","ipad")) val rdd = sc.parallelize(data) scala> rdd.countByValue res0: scala.collection.Map[(String, String),Long] = Map((apple,laptop) -> 2, (apple,ipad) -> 1, (dell,laptop) -> 1) ...

Convert RDD[Map[String,Double]] to RDD[(String,Double)]

scala,apache-spark,rdd

You can call flatMap with the identity function to 'flatten' the structure of your RDD. rdd.flatMap(identity) ...

Pyspark: using filter for feature selection

python,apache-spark,pyspark

Sounds like you need to filter columns, but not records. To do this you need to use Spark's map function to transform every row of your array, represented as an RDD. See my example: # generate 13 x 10 array and creates rdd with 13 records, each record...
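A minimal sketch of that idea with hypothetical data and column indices: selecting columns is a map over every record rather than a filter of records.

import numpy as np

# Hypothetical 13 x 10 array, one RDD record per row.
rdd = sc.parallelize(np.random.rand(13, 10))

# Keep only the chosen column indices from every row.
keep = [0, 3, 7]
selected = rdd.map(lambda row: [row[i] for i in keep])

selected.first()  # a list of 3 values taken from the first row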

Issue with UDF on a column of Vectors in PySpark DataFrame

apache-spark,apache-spark-sql,pyspark,spark-sql

In spark-sql, vectors are treated as a (type, size, indices, value) tuple. You can use a udf on vectors with pyspark. Just modify some code to work with values in the vector type. vector_udf = udf(lambda vector: sum(vector[3]), DoubleType()) df.withColumn('feature_sums', vector_udf(df.features)).first() https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala ...

how to parse a custom log file in scala to extract some key value pairs using patterns

java,regex,scala,apache-spark

The case class expects a variety of things like IP address that your log obviously doesn't have, therefore you would need to modify the case class definition to include just the fields that you want to add. Just to illustrate here, let's make the case class like so: case class...

spark-mllib: Error “reassignment to val” in source code

apache-spark,mllib

I did run the example from the Spark website, which is pretty much identical. It does work, even though IntelliJ shows the same errors. It looks like a bug in the IDEA Scala plugin. Your project should compile and work normally.

Spark on yarn jar upload problems

java,hadoop,mapreduce,apache-spark

The problem was solved by copying spark-assembly.jar into a directory on HDFS reachable by each node and then passing it to spark-submit via the --conf spark.yarn.jar parameter. Commands are listed below: hdfs dfs -copyFromLocal /var/tmp/spark/spark-1.4.0-bin-hadoop2.4/lib/spark-assembly-1.4.0-hadoop2.4.0.jar /user/spark/spark-assembly.jar /var/tmp/spark/spark-1.4.0-bin-hadoop2.4/bin/spark-submit --class MRContainer --master yarn-cluster --conf spark.yarn.jar=hdfs:///user/spark/spark-assembly.jar simplemr.jar ...

Include package in Spark local mode

python,apache-spark,py.test,pyspark

You can use the config property spark.driver.extraClassPath to sort out the problem. Edit spark-defaults.conf and add the property spark.driver.extraClassPath /Volumes/work/bigdata/CHD5.4/spark-1.4.0-bin-hadoop2.6/lib/spark-csv_2.11-1.1.0.jar:/Volumes/work/bigdata/CHD5.4/spark-1.4.0-bin-hadoop2.6/lib/commons-csv-1.1.jar After setting the above you don't even need the packages flag while running from the shell. sqlContext = SQLContext(sc) df = sqlContext.read.format('com.databricks.spark.csv').options(header='false').load(BASE_DATA_PATH + '/ssi.csv')...

“remoteContext object has no attribute”

amazon-s3,apache-spark,pyspark

You've done a great job getting your data mounted into dbfs which is great, and it looks like you just have a small typo. I suspect you want to use sc.textFile rather than sc.textFiles. Best of luck with your adventures with Spark.

Join files using Apache Spark / Spark SQL

java,apache-spark,apache-spark-sql

If you use plain spark you can join two RDDs. let a = RDD<Tuple2<K,T>> let b = RDD<Tuple2<K,S>> RDD<Tuple2<K,Tuple2<S,T>>> c = a.join(b) This produces an RDD of every pair for key K. There are also leftOuterJoin, rightOuterJoin, and fullOuterJoin methods on RDD. So you have to map both datasets to...

Spark streaming on YARN executor's logs not available

logging,apache-spark,yarn,spark-streaming

I found the solution: the proper log4j configuration must be set in the following way during application submit: --files /opt/spark/conf/log4j.properties --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties" where spark.driver.extraJavaOptions sets up the log configuration for the driver and spark.executor.extraJavaOptions sets up the log configuration for the executor(s) ...

Retrieving TriangleCount

scala,apache-spark,spark-graphx

triangleCount counts the number of triangles per vertex and returns Graph[Int,Int], so you have to extract the vertices: scala> graph.triangleCount().vertices.collect() res0: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((1,1), (3,1), (2,1)) ...

Removing duplicates from Spark RDDPair values

python,apache-spark,pyspark

I'd do something like this : streetsGroupedByZipCode.map(x => (x._1, x._2.groupBy(_._2).map(_._2.head))) distinct on a tuple doesn't work as you said, so group your list by tuple, and get only the first element at the end. val data = Seq((1, Seq((1, 1), (2, 2), (2, 2))), (10, Seq((1, 1), (1, 1), (3,...

Graphx EdgeRDD count taking long time to compute

apache-spark,spark-graphx

After going through the log, I figured out that my task size is big and it takes time to schedule it. Spark itself warns about this by saying: 15/06/17 02:32:47 WARN TaskSetManager: Stage 1 contains a task of very large size (140947 KB). The maximum recommended task size is 100 KB. That led me...

How do I flatMap a row of arrays into multiple rows?

apache-spark,apache-spark-sql

You can use DataFrame.explode to achieve what you desire. Below is what I tried in spark-shell with your sample json data. import scala.collection.mutable.ArrayBuffer val jj1 = jj.explode("r", "r1") {list : ArrayBuffer[Long] => list.toList } val jj2 = jj1.select($"r1") jj2.collect You can refer to API documentation to understand more DataFrame.explode...

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.CanSetDropBehind issue in ecllipse

maven,hadoop,apache-spark,word-count

When you run in Eclipse, the referenced jars are the only source for your program to run. So the jar hadoop-core (that's where CanSetDropBehind is present) is not added properly in your Eclipse from the local repository for some reason. You need to identify whether it is a proxy issue, or...

Profiling a scala spark application

scala,apache-spark

I would recommend using the UI that Spark provides directly. It provides a lot of information and metrics regarding time, steps, network usage, etc. You can check more about it here: https://spark.apache.org/docs/latest/monitoring.html Also, in the new Spark version (1.4.0) there is a nice visualizer to understand the steps...

Install SparkR that comes with Spark 1.4

r,apache-spark,sparkr

@DavidArenburg put me on the right track. Following the Windows documentation in C:\spark-1.4.0\R\WINDOWS.md, I installed RTools and added R.exe and RTools to my computer's PATH. Then, I ran install-dev.bat in C:\spark-1.4.0\R. This added the lib\SparkR\ installation that I was missing. Then, from the command prompt, I ran mklink /D...

How to un-nest a spark rdd that has the following type ((String, scala.collection.immutable.Map[String,scala.collection.immutable.Map[String,Int]]))

scala,cassandra,apache-spark

Fairly straightforward using a for-comprehension and some pattern matching to destructure things: val in = List((5, Map ( "ABCD" -> Map("3200" -> 3, "3350.800" -> 4, "200.300" -> 3))), (1, Map ("DEF" -> Map("1200" -> 32, "1320.800" -> 4, "2100" -> 3)))) case class Thing(a:Int, b:String, c:String, d:Int) for {...

In sbt, how can we specify the version of hadoop on which spark depends?

apache-spark,sbt

You can exclude the dependency from Spark to Hadoop, and add an explicit one with the version you need, something along these lines: libraryDependencies ++= Seq( "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5", "com.datastax.cassandra" % "cassandra-driver-mapping" % "2.1.5", "com.datastax.spark" %% "spark-cassandra-connector" % "1.2.1", "org.apache.spark" % "spark-sql_2.10" % "1.2.1" excludeAll( ExclusionRule("org.apache.hadoop") ), "org.apache.hadoop"...

Populate csv with Scala

python,scala,csv,apache-spark

A somewhat functional approach using scanLeft[1]. drop(1) after readAll because it reads the header too. Also drop(1) at the end because scanLeft starts with the initial result. scanLeft lets you work with the previous value of the computation; it takes a function with two params, first the result...

Apache Spark: Error while starting PySpark

python,hadoop,apache-spark,pyspark

From the logs it looks like pyspark is unable to resolve the host localhost. Please check your /etc/hosts file; if localhost is not present, add an entry, which should resolve this issue. e.g.: [IP] [Hostname] localhost In case you are not able to change the host entry of the server, edit...

Pyspark StructType is not defined

python,apache-spark,pyspark

Did you import StructType? If not, from pyspark.sql.types import StructType should solve the problem....

Spark executors with different amounts of memory on Mesos

apache-spark,mesos

Short answer: No. Unfortunately, Spark on Mesos and YARN only allows giving as many resources (cores, memory, etc.) per machine as your worst machine has (discussion). Ideally, the cluster should be homogeneous in order to take full advantage of its resources. However, there might exist a workaround for your problem. According...

Spark streaming transform function

java,apache-spark,spark-streaming

Thanks to @GaborBakos for providing the answer... The following seems to work! Had to use transformToPair instead of transform: JavaPairDStream<Long,Integer> sortedtsStream = tsStream.transformToPair( new Function<JavaPairRDD<Long, Integer>, JavaPairRDD<Long,Integer>>() { @Override public JavaPairRDD<Long, Integer> call(JavaPairRDD<Long, Integer> longIntegerJavaPairRDD) throws Exception { return longIntegerJavaPairRDD.sortByKey(true); } }); ...

SparkR and Packages

r,apache-spark,sparkr

So it looks like by setting SPARKR_SUBMIT_ARGS you are overriding the default value, which is sparkr-shell. You could probably do the same thing and just append sparkr-shell to the end of your SPARKR_SUBMIT_ARGS. This seems unnecessarily complex compared to depending on jars, so I've created a JIRA to track...

Access key from mapValues or flatMapValues?

scala,apache-spark

In this case you can use mapPartitions with the preservesPartitioning attribute. x.mapPartitions(it => it.map { case (k, rr) => (k, someFun(rr, k)) }, preservesPartitioning = true) You just have to make sure you are not changing the partitioning, i.e. don't change the key....

Which spark MLIB algorithm to use?

machine-learning,apache-spark

One thing you can do is calculate the correlation between rows. Take a look at the tutorial about summary statistics on the MLlib website. A more advanced approach would be to use dimensionality reduction. This should discover more complex dependencies....
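A minimal sketch of the summary-statistics approach, following the MLlib statistics guide (the vectors here are made up; note that Statistics.corr correlates the columns of the dataset, so whatever you want correlated should be laid out column-wise):

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.stat import Statistics

# Hypothetical observations, one dense vector per record.
observations = sc.parallelize([
    Vectors.dense([1.0, 10.0, 100.0]),
    Vectors.dense([2.0, 20.0, 200.0]),
    Vectors.dense([5.0, 33.0, 366.0]),
])

# Pairwise Pearson correlation matrix between the columns.
print(Statistics.corr(observations, method="pearson"))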

Does the DStream returned by the updateStateByKey function contain only one RDD?

apache-spark,spark-streaming,apache-spark-sql,pyspark

Yes, the DStream returned by updateStateByKey contains only one RDD.
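A minimal streaming sketch to illustrate the point (the socket source and update function are hypothetical): on every batch interval the state DStream produces exactly one RDD, holding the full key-to-state map.

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 10)
ssc.checkpoint("/tmp/state-checkpoint")  # updateStateByKey requires checkpointing

pairs = ssc.socketTextStream("localhost", 9999).map(lambda line: (line, 1))

def update(new_values, running):
    # Merge this batch's values with the previous state for the key.
    return sum(new_values) + (running or 0)

state = pairs.updateStateByKey(update)

def dump(rdd):
    # Called once per batch, with that batch's single state RDD.
    print(rdd.take(5))

state.foreachRDD(dump)
# ssc.start(); ssc.awaitTermination()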

OutofMemoryErrory creating fat jar with sbt assembly

jar,cassandra,apache-spark,sbt

I was including spark as an unmanaged dependency (putting the jar file in the lib folder) which used a lot of memory because it is a huge jar. Instead, I made a build.sbt file which included spark as a provided, unmanaged dependency. Secondly, I created the environment variable JAVA_OPTS with...

How to get rid of “Spark assembly has been built with Hive, including Datanucleus jars on classpath” message?

cron,apache-spark

Depending on your spark version, this message may or may not appear. Additionally it was previously generated by the shell script compute-classpath.sh, so you could always just comment that line out from compute-classpath.sh (The line you are looking for is echo "Spark assembly has been built with Hive, including Datanucleus...

Call Distinct on 'pyspark.resultiterable.ResultIterable'

python,apache-spark,pyspark

A straightforward approach is to use sets: from numpy.random import choice, seed seed(323) keys = (4, 1, 5, 2) hosts = [ u'in24.inetnebr.com', u'ix-esc-ca2-07.ix.netcom.com', u'uplherc.upl.com', u'slppp6.intermind.net', u'piweba4y.prodigy.com' ] pairs = sc.parallelize(zip(choice(keys, 20), choice(hosts, 20))).groupByKey() pairs.map(lambda (k, v): (k, set(v))).take(3) Result: [(1, {u'ix-esc-ca2-07.ix.netcom.com', u'slppp6.intermind.net'}), (2, {u'in24.inetnebr.com', u'ix-esc-ca2-07.ix.netcom.com', u'slppp6.intermind.net', u'uplherc.upl.com'}),...

File Processing with Spark and Cassandra

cassandra,apache-spark

Originally I was using 'sbt run' to start the application. Once I was able to use spark-submit to launch the application, everything worked fine. So yes, files under 10 MB can be stored as a column of type blob. The Spark MapReduce ran quickly with 200 rows....

Passing a function foreach key of an Array

scala,apache-spark,scala-collections,spark-graphx

You're looking for the groupBy function followed by mapValues to process each group. pairs groupBy {_._1} mapValues { groupOfPairs => doSomething(groupOfPairs) } ...

Subtract an RDD from another RDD dosen't work correctly

scala,apache-spark,spark-graphx

Performing set operations like subtract with mutable types (Array in this example) is usually unsupported, or at least not recommended. Try using an immutable type instead. I believe WrappedArray is the relevant container for storing arrays in sets, but I'm not sure....

Spark-submit class not found exception

scala,apache-spark

The thing is that we have to submit to the Spark cluster the class file we want to execute, or that will be used as a supporting file, so follow these steps - Create a jar file of this class -> In Eclipse you can export this class as a...

Connecting from Spark/pyspark to PostgreSQL

postgresql,jdbc,jar,apache-spark,pyspark

Remove spark-defaults.conf and add the SPARK_CLASSPATH to the system environment in python like this: os.environ["SPARK_CLASSPATH"] = 'PATH\\TO\\postgresql-9.3-1101.jdbc41.jar' ...

Conditionally Combining/Reducing key-pairs

python,apache-spark,pyspark

You can try something like this: seq = sc.parallelize(zip(xrange(16), "ATCGATGCATGCATGC")) (seq .flatMap(lambda (pos, x): ((pos - i, (pos, x)) for i in range(3))) .groupByKey() .mapValues(lambda x: ''.join(v for (pos, v) in sorted(x))) .filter(lambda (pos, codon): len(codon) == 3) .map(lambda (pos, codon): (pos % 3, pos, codon)) .collect()) and result: [(0,...

Spark: use reduceByKey instead of groupByKey and mapByValues

python,apache-spark,pyspark

You can do it like this: data = (sc.parallelize([ {key1: A}, {key1: A}, {key1: B}, {key1: C}, {key2: B}, {key2: B}, {key2: D}, ..])) result = (data .mapValues(lambda x: {x}) .reduceByKey(lambda s1, s2: s1.union(s2))) ...

reduceByKey with two columns in Spark

python,apache-spark,reduce,pyspark

As far as I understand, your goal is to count (column1, input.column2) pairs and your input looks more or less like this: from numpy.random import randint, seed from pyspark.sql import Row seed(323) rdd = sc.parallelize( Row(column1=randint(0, 5), column2=randint(0, 5)) for _ in range(1000)) rdd.take(3) Result: [Row(column1=0, column2=0), Row(column1=4, column2=4), Row(column1=3, column2=2)] First...

Error when running job that queries against Cassandra via Spark SQL through Spark Jobserver

cassandra,apache-spark,apache-spark-sql,spark-jobserver,spark-cassandra-connector

@vicg, first you need spark.cassandra.connection.host -- periods, not dashes. Also note in the error how the IP is "127.0.1.1", not the one in the config. You can also pass the IP when you create a context, like: curl -X POST 'localhost:8090/contexts/my-context?spark.cassandra.connection.host=127.0.0.1' If the above doesn't work, try the following PR:...

Shuffled vs non-shuffled coalesce in Apache Spark

scala,apache-spark,bigdata,distributed-computing

shuffle=true and shuffle=false aren't going to make any practical difference in the resulting output, since they are both going down to a single partition. However, when you set it to true you will do a shuffle, which isn't of any use. With shuffle=true the output is evenly distributed amongst the...
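To make the comparison concrete, a rough PySpark sketch of the two calls (the data is arbitrary); both end up with a single partition holding the same elements, but shuffle=True inserts a full shuffle first.

rdd = sc.parallelize(range(1000), 8)

no_shuffle = rdd.coalesce(1)                  # merges partitions locally, no shuffle
with_shuffle = rdd.coalesce(1, shuffle=True)  # redistributes the data via a shuffle

print(no_shuffle.getNumPartitions(), with_shuffle.getNumPartitions())  # 1 1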