python - How to use JDBC source to write and read data in (Py)Spark?
The goal of this document:

- The steps required to read and write data using JDBC connections in PySpark
- Possible issues with JDBC sources and known solutions

With small changes these methods should work with other supported languages, including Scala and R.
Writing data
1. Include the applicable JDBC driver when you submit the application or start the shell. You can use, for example, --packages:

       bin/pyspark --packages group:name:version

   or combine driver-class-path and jars:

       bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR

   These properties can also be set using the PYSPARK_SUBMIT_ARGS environment variable before the JVM instance has been started, or by using conf/spark-defaults.conf to set spark.jars.packages or spark.jars / spark.driver.extraClassPath.
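   As a minimal sketch of the PYSPARK_SUBMIT_ARGS approach: the driver coordinates (and version) below are assumptions, and the trailing pyspark-shell token is needed when launching from a plain Python interpreter. The variable must be set before the SparkContext (and therefore the JVM) is created:

       import os

       # Driver coordinates are an example; substitute the group:name:version you need.
       os.environ["PYSPARK_SUBMIT_ARGS"] = (
           "--packages org.postgresql:postgresql:42.2.5 pyspark-shell"
       )

       from pyspark import SparkContext
       from pyspark.sql import SQLContext

       sc = SparkContext()
       sqlContext = SQLContext(sc)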
2. Choose the desired mode. The Spark JDBC writer supports the following modes:

   - append: Append contents of this DataFrame to existing data.
   - overwrite: Overwrite existing data.
   - ignore: Silently ignore this operation if data already exists.
   - error (default case): Throw an exception if data already exists.

   Upserts or other fine-grained modifications are not supported.
       mode = ...

3. Prepare the JDBC URI, for example:

       # You can encode credentials in the URI or pass them
       # separately using the properties argument
       # of the jdbc method or options
       url = "jdbc:postgresql://localhost/foobar"

4. (Optional) Create a dictionary of JDBC arguments:

       properties = {
           "user": "foo",
           "password": "bar"
       }

5. Use DataFrame.write.jdbc

       df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)

   to save the data (see pyspark.sql.DataFrameWriter for details).
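Putting the steps together, a minimal end-to-end write sketch might look like this; the sqlContext, the local PostgreSQL instance and the table name baz are placeholders for your own setup:

    # Assumes the JDBC driver is already on the classpath (step 1).
    mode = "append"
    url = "jdbc:postgresql://localhost/foobar"
    properties = {"user": "foo", "password": "bar"}

    # Build a small example DataFrame and save it over JDBC.
    df = sqlContext.createDataFrame(
        [(1, "a"), (2, "b")],
        ["id", "label"]
    )
    df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)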
Known issues:
- A suitable driver cannot be found when the driver has been included using --packages (java.sql.SQLException: No suitable driver found for jdbc: ...)

  Assuming there is no driver version mismatch, you can solve this by adding the driver class to the properties. For example:

      properties = {
          ...
          "driver": "org.postgresql.Driver"
      }

- Using df.write.format("jdbc").options(...).save() may result in:

      java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select.

  Solution unknown.
- In PySpark 1.3 you can try calling the Java method directly:

      df._jdf.insertIntoJDBC(url, "baz", True)
Reading data
1. Follow steps 1-4 from Writing data.

2. Use sqlContext.read.jdbc:

       sqlContext.read.jdbc(url=url, table="baz", properties=properties)

   or sqlContext.read.format("jdbc"):

       (sqlContext.read.format("jdbc")
           .options(url=url, dbtable="baz", **properties)
           .load())
Known issues and gotchas:
- A suitable driver cannot be found - see: Writing data.
- Spark SQL supports predicate pushdown with JDBC sources, although not all predicates can be pushed down. It also doesn't delegate limits or aggregations. A possible workaround is to replace the dbtable / table argument with a valid subquery (see the subquery sketch after this list).

- By default JDBC data sources load data sequentially using a single executor thread. To ensure distributed data loading you can:
  - Provide a partitioning column (must be IntegerType), lowerBound, upperBound and numPartitions.
  - Provide a list of mutually exclusive predicates (the predicates argument), one for each desired partition.

  See the partitioned read sketch after this list.
- In distributed mode (with a partitioning column or predicates) each executor operates in its own transaction. If the source database is modified at the same time there is no guarantee that the final view will be consistent.
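A sketch of the subquery workaround mentioned above; the table baz, the columns, the filter and the tmp alias are assumptions, the point being that anything valid in a FROM clause can be passed as dbtable:

    # Push part of the work into the database by using a subquery as dbtable.
    query = "(SELECT id, label FROM baz WHERE id < 1000) AS tmp"

    df = (sqlContext.read.format("jdbc")
        .options(url=url, dbtable=query, **properties)
        .load())

And a sketch of a distributed read; the integer column id and the bounds are assumptions about the source table:

    # Partitioned read: Spark issues one query per partition, splitting the
    # "id" column into numPartitions ranges between lowerBound and upperBound.
    df_partitioned = sqlContext.read.jdbc(
        url=url,
        table="baz",
        column="id",
        lowerBound=1,
        upperBound=100000,
        numPartitions=10,
        properties=properties
    )

    # Alternatively, provide mutually exclusive predicates, one per partition.
    predicates = ["id % 4 = 0", "id % 4 = 1", "id % 4 = 2", "id % 4 = 3"]
    df_predicates = sqlContext.read.jdbc(
        url=url,
        table="baz",
        predicates=predicates,
        properties=properties
    )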
Where to find suitable drivers:
- Maven Repository (to obtain the required coordinates for --packages, select the desired version and copy the data from the Gradle tab, which is in the form compile group:name:version, substituting the respective fields) or Maven Central Repository.
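For example, for the PostgreSQL JDBC driver the Gradle entry maps to --packages coordinates roughly as follows; the version shown is an assumption, so substitute the release you actually need:

    # Gradle tab: compile group: 'org.postgresql', name: 'postgresql', version: '42.2.5'
    # becomes the group:name:version coordinate:
    bin/pyspark --packages org.postgresql:postgresql:42.2.5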