python - How to use JDBC source to write and read data in (Py)Spark?
The goal of this question is to document:
- steps required to read and write data using JDBC connections in PySpark
- possible issues with JDBC sources and known solutions
With small changes these methods should work with other supported languages, including Scala and R.
Writing data
1. Include the applicable JDBC driver when you submit the application or start the shell. You can use, for example, --packages:

   bin/pyspark --packages group:name:version

   or combine driver-class-path and jars:

   bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR

   These properties can also be set using the PYSPARK_SUBMIT_ARGS environment variable before the JVM instance has been started, or using conf/spark-defaults.conf to set spark.jars.packages or spark.jars / spark.driver.extraClassPath.
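   For illustration, a minimal sketch of setting PYSPARK_SUBMIT_ARGS from a plain Python script before the JVM starts; the PostgreSQL coordinate and the app name are examples, not requirements:

   import os

   # PYSPARK_SUBMIT_ARGS must be set before the first SparkContext is
   # created, because the JVM reads it only once at startup. The trailing
   # "pyspark-shell" is required when starting from a plain Python
   # interpreter. The PostgreSQL coordinate below is only an example.
   os.environ["PYSPARK_SUBMIT_ARGS"] = (
       "--packages org.postgresql:postgresql:42.2.5 pyspark-shell"
   )

   from pyspark import SparkContext
   from pyspark.sql import SQLContext

   sc = SparkContext(appName="jdbc-example")
   sqlContext = SQLContext(sc)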
2. Choose the desired mode. The Spark JDBC writer supports the following modes:
   - append: append contents of this DataFrame to existing data.
   - overwrite: overwrite existing data.
   - ignore: silently ignore this operation if data already exists.
   - error (default case): throw an exception if data already exists.

   Upserts or other fine-grained modifications are not supported.

   mode = ...
3. Prepare the JDBC URI, for example:

   # You can encode credentials in the URI or pass them
   # separately using the properties argument
   # of the jdbc method or options
   url = "jdbc:postgresql://localhost/foobar"
4. (Optional) Create a dictionary of JDBC arguments.

   properties = {
       "user": "foo",
       "password": "bar"
   }
5. Use DataFrame.write.jdbc

   df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)

   to save the data (see pyspark.sql.DataFrameWriter for details).
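Putting steps 2-5 together, a minimal end-to-end write sketch; the table name, credentials and sample DataFrame are placeholders, and sqlContext is assumed to exist already:

# Example values only - adjust the URL, table name and credentials.
mode = "overwrite"
url = "jdbc:postgresql://localhost/foobar"
properties = {"user": "foo", "password": "bar"}

# A small sample DataFrame to write (placeholder data).
df = sqlContext.createDataFrame([(1, "spam"), (2, "eggs")], ["id", "value"])

df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)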
Known issues:
- A suitable driver cannot be found when the driver has been included using --packages (java.sql.SQLException: No suitable driver found for jdbc: ...). Assuming there is no driver version mismatch, to solve this you can add a driver class to properties. For example:

  properties = {
      ...
      "driver": "org.postgresql.Driver"
  }
- Using df.write.format("jdbc").options(...).save() may result in:

  java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select.

  Solution unknown.
- In PySpark 1.3 you can try calling the Java method directly:

  df._jdf.insertIntoJDBC(url, "baz", True)
Reading data
1. Follow steps 1-4 from Writing data.
2. Use sqlContext.read.jdbc:

   sqlContext.read.jdbc(url=url, table="baz", properties=properties)

   or sqlContext.read.format("jdbc"):

   (sqlContext.read.format("jdbc")
       .options(url=url, dbtable="baz", **properties)
       .load())
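   For completeness, the same read expressed with per-key option calls; the driver class shown is the PostgreSQL one and only serves as an example:

   df = (sqlContext.read
       .format("jdbc")
       .option("url", url)
       .option("dbtable", "baz")
       .option("user", "foo")
       .option("password", "bar")
       .option("driver", "org.postgresql.Driver")  # example driver class
       .load())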
Known issues and gotchas:
- A suitable driver cannot be found - see: Writing data.
- Spark SQL supports predicate pushdown with JDBC sources, although not all predicates can be pushed down. It also doesn't delegate limits or aggregations. A possible workaround is to replace the dbtable / table argument with a valid subquery (see the sketch after this list).
- By default, JDBC data sources load data sequentially using a single executor thread. To ensure distributed data loading you can (see the sketch after this list):
  - provide a partitioning column (must be IntegerType), lowerBound, upperBound and numPartitions;
  - provide a list of mutually exclusive predicates, one for each desired partition.
- In distributed mode (with a partitioning column or predicates) each executor operates in its own transaction. If the source database is modified at the same time, there is no guarantee that the final view will be consistent.
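A sketch of the two workarounds mentioned above: pushing work into the database through a subquery passed as the table argument, and requesting a partitioned read. The table, column and bound values are illustrative assumptions:

# Subquery as the "table" value - the alias (tmp) is required by most databases.
aggregated = sqlContext.read.jdbc(
    url=url,
    table="(SELECT category, COUNT(*) AS cnt FROM baz GROUP BY category) AS tmp",
    properties=properties,
)

# Partitioned read on an integer column: Spark issues numPartitions range
# queries between lowerBound and upperBound (rows outside the bounds still
# land in the first / last partition).
partitioned = sqlContext.read.jdbc(
    url=url,
    table="baz",
    column="id",            # assumed integer column
    lowerBound=1,
    upperBound=100000,
    numPartitions=10,
    properties=properties,
)

# Alternatively, one predicate per desired partition.
predicates = ["id BETWEEN 1 AND 50000", "id > 50000"]
by_predicates = sqlContext.read.jdbc(
    url=url,
    table="baz",
    predicates=predicates,
    properties=properties,
)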
Where to find suitable drivers:
Maven Repository (to obtain the required coordinates for --packages, select the desired version and copy the data from the Gradle tab in the form compile-group:name:version, substituting the respective fields) or Maven Central Repository.