How to use JDBC source to write and read data in (Py)Spark?


The goal of this question is to document:

  • the steps required to read and write data using JDBC connections in PySpark

  • possible issues with JDBC sources and known solutions

With small changes these methods should work with other supported languages including Scala and R.

Writing data

  1. Include the applicable JDBC driver when you submit the application or start the shell. You can use, for example, --packages:

    bin/pyspark --packages group:name:version

    or combine driver-class-path and jars:

    bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR

    These properties can also be set using the PYSPARK_SUBMIT_ARGS environment variable before the JVM instance has been started, or by using conf/spark-defaults.conf to set spark.jars.packages or spark.jars / spark.driver.extraClassPath.

  2. Choose the desired mode. The Spark JDBC writer supports the following modes:

    • append: Append contents of this DataFrame to existing data.
    • overwrite: Overwrite existing data.
    • ignore: Silently ignore this operation if data already exists.
    • error (default case): Throw an exception if data already exists.

    Upserts or other fine-grained modifications are not supported.

    mode = ...
  3. Prepare the JDBC URI, for example:

    # You can encode credentials in the URI or pass
    # them separately using the properties argument
    # of the jdbc method or options
    url = "jdbc:postgresql://localhost/foobar"
  4. (Optional) Create a dictionary of JDBC arguments.

    properties = {
        "user": "foo",
        "password": "bar"
    }
  5. Use DataFrame.write.jdbc

    df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)

    to save the data (see pyspark.sql.DataFrameWriter for details). A complete end-to-end sketch follows this list.
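
Below is a minimal end-to-end sketch of the steps above. The driver coordinates (org.postgresql:postgresql:42.2.5), the connection URL, the credentials and the example rows are assumptions for illustration only; substitute the values for your own database and Spark version.

    import os

    # Make the driver available before the JVM starts (step 1). The trailing
    # "pyspark-shell" token is expected by PYSPARK_SUBMIT_ARGS when Spark is
    # launched from a plain Python session.
    os.environ["PYSPARK_SUBMIT_ARGS"] = (
        "--packages org.postgresql:postgresql:42.2.5 pyspark-shell"
    )

    from pyspark import SparkContext
    from pyspark.sql import SQLContext, Row

    sc = SparkContext(master="local[*]", appName="jdbc-sketch")
    sqlContext = SQLContext(sc)

    # Hypothetical data to write
    df = sqlContext.createDataFrame([Row(id=1, name="foo"), Row(id=2, name="bar")])

    mode = "overwrite"                               # step 2
    url = "jdbc:postgresql://localhost/foobar"       # step 3
    properties = {"user": "foo", "password": "bar"}  # step 4

    df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)  # step 5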

Known issues:

  • A suitable driver cannot be found even though the driver has been included using --packages (java.sql.SQLException: No suitable driver found for jdbc: ...)

    Assuming there is no driver version mismatch, to solve this you can add the driver class to the properties. For example:

    properties = {
        ...
        "driver": "org.postgresql.Driver"
    }
  • Using df.write.format("jdbc").options(...).save() (illustrated after this list) may result in:

    java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select.

    Solution unknown.

  • In PySpark 1.3 you can try calling the Java method directly:

    df._jdf.insertIntoJDBC(url, "baz", True)
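
For reference, a sketch of the format("jdbc") writer form referred to above, reusing the url, mode and properties defined earlier (the option keys url and dbtable match those used in the reading section):

    (df.write
        .format("jdbc")
        .options(url=url, dbtable="baz", **properties)
        .mode(mode)
        .save())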

Reading data

  1. Follow steps 1-4 from Writing data.
  2. Use sqlContext.read.jdbc:

    sqlContext.read.jdbc(url=url, table="baz", properties=properties)

    or sqlContext.read.format("jdbc"):

    (sqlContext.read.format("jdbc")
        .options(url=url, dbtable="baz", **properties)
        .load())

Known issues and gotchas:

  • A suitable driver cannot be found - see: Writing data.
  • Spark SQL supports predicate pushdown with JDBC sources, although not all predicates can be pushed down. It also doesn't delegate limits or aggregations. A possible workaround is to replace the dbtable / table argument with a valid subquery (see the sketch after this list).
  • By default JDBC data sources load data sequentially using a single executor thread. To ensure distributed data loading (see the sketch after this list) you can:

    • Provide a partitioning column (must be IntegerType), lowerBound, upperBound, and numPartitions.
    • Provide a list of mutually exclusive predicates (predicates), one for each desired partition.
  • In distributed mode (with a partitioning column or with predicates) each executor operates in its own transaction. If the source database is modified at the same time, there is no guarantee that the final view will be consistent.
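
The following sketch illustrates both workarounds; the subquery, the partitioning column and the bounds are hypothetical and reuse the url and properties defined earlier:

    # Push an aggregation into the database by passing an aliased subquery
    # instead of a table name
    aggregated = sqlContext.read.jdbc(
        url=url,
        table="(SELECT name, COUNT(*) AS cnt FROM baz GROUP BY name) AS tmp",
        properties=properties,
    )

    # Distribute the load across partitions using an integer column...
    partitioned = sqlContext.read.jdbc(
        url=url,
        table="baz",
        column="id", lowerBound=1, upperBound=100000, numPartitions=10,
        properties=properties,
    )

    # ...or using a list of mutually exclusive predicates, one per partition
    by_predicates = sqlContext.read.jdbc(
        url=url,
        table="baz",
        predicates=["name = 'foo'", "name = 'bar'"],
        properties=properties,
    )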

Where to find suitable drivers: JDBC drivers are distributed by the respective database vendors; drivers published to Maven repositories can be referenced directly with --packages using their group:name:version coordinates.

