學習任何的spark技術(shù)之前,請先正確理解spark,可以參考:正確理解spark
建甌網(wǎng)站制作公司哪家好,找成都創(chuàng)新互聯(lián)!從網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、成都響應(yīng)式網(wǎng)站建設(shè)等網(wǎng)站項目制作,到程序開發(fā),運營維護。成都創(chuàng)新互聯(lián)從2013年成立到現(xiàn)在10年的時間,我們擁有了豐富的建站經(jīng)驗和運維經(jīng)驗,來保證我們的工作的順利進行。專注于網(wǎng)站建設(shè)就選成都創(chuàng)新互聯(lián)。
以下是用spark RDD java api實現(xiàn)從關(guān)系型數(shù)據(jù)庫中讀取數(shù)據(jù),這里使用的是derby本地數(shù)據(jù)庫,當然可以是MySQL或者oracle等關(guān)系型數(shù)據(jù)庫:
package com.twq.javaapi.java7; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.rdd.JdbcRDD; import java.io.Serializable; import java.sql.*; public class JavaJdbcRDDSuite implements Serializable { public static void prepareData() throws ClassNotFoundException, SQLException { //使用本地數(shù)據(jù)庫derby,當然可以使用mysql等關(guān)系型數(shù)據(jù)庫 Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); Connection connection = DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true"); try { //創(chuàng)建一張表FOO,ID是一個自增的主鍵,DATA是一個INTEGER列 Statement create = connection.createStatement(); create.execute( "CREATE TABLE FOO(" + "ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," + "DATA INTEGER)"); create.close(); //插入數(shù)據(jù) PreparedStatement insert = connection.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)"); for (int i = 1; i <= 5; i++) { insert.setInt(1, i * 2); insert.executeUpdate(); } insert.close(); } catch (SQLException e) { // If table doesn't exist... if (e.getSQLState().compareTo("X0Y32") != 0) { throw e; } } finally { connection.close(); } } public static void shutdownDB() throws SQLException { try { DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;shutdown=true"); } catch (SQLException e) { // Throw if not normal single database shutdown // https://db.apache.org/derby/docs/10.2/ref/rrefexcept71493.html if (e.getSQLState().compareTo("08006") != 0) { throw e; } } } public static void main(String[] args) throws Exception { JavaSparkContext sc = new JavaSparkContext("local", "JavaAPISuite"); //準備數(shù)據(jù) prepareData(); //構(gòu)建JdbcRDD JavaRDD<Integer> rdd = JdbcRDD.create( sc, new JdbcRDD.ConnectionFactory() { @Override public Connection getConnection() throws SQLException { return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb"); } }, "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?", 1, 5, 1, new Function<ResultSet, Integer>() { @Override public Integer call(ResultSet r) throws Exception { return r.getInt(1); } } ); //結(jié)果: [2, 4, 6, 8, 10] System.out.println(rdd.collect()); shutdownDB(); sc.stop(); } }
詳細了解RDD的api的話,可以參考: spark core RDD api原理詳解
本文題目:spark2.x由淺入深深到底系列六之RDDjavaapi用JdbcRDD讀取關(guān)系型數(shù)據(jù)庫
文章位置:http://chinadenli.net/article40/jgjceo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供響應(yīng)式網(wǎng)站、云服務(wù)器、網(wǎng)站建設(shè)、微信小程序、面包屑導航、商城網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)