Before studying any Spark technique, make sure you understand Spark itself correctly; see the article "Understanding Spark Correctly" (正確理解spark).

The following example uses the Spark RDD Java API to read data from a relational database. It uses a local Derby database, but MySQL, Oracle, or any other relational database with a JDBC driver would work as well:
package com.twq.javaapi.java7;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;
import java.io.Serializable;
import java.sql.*;
public class JavaJdbcRDDSuite implements Serializable {

    public static void prepareData() throws ClassNotFoundException, SQLException {
        // Use a local Derby database; MySQL or another relational database works too
        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
        Connection connection =
            DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true");
        try {
            // Create table FOO: ID is an auto-generated identity column, DATA is an INTEGER column
            Statement create = connection.createStatement();
            create.execute(
                "CREATE TABLE FOO(" +
                "ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +
                "DATA INTEGER)");
            create.close();
            // Insert five rows: DATA = 2, 4, 6, 8, 10
            PreparedStatement insert = connection.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)");
            for (int i = 1; i <= 5; i++) {
                insert.setInt(1, i * 2);
                insert.executeUpdate();
            }
            insert.close();
        } catch (SQLException e) {
            // X0Y32 means the table already exists (for example, from an earlier run);
            // ignore that case and rethrow anything else
            if (!"X0Y32".equals(e.getSQLState())) {
                throw e;
            }
        } finally {
            connection.close();
        }
    }

    public static void shutdownDB() throws SQLException {
        try {
            DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;shutdown=true");
        } catch (SQLException e) {
            // Derby reports a normal single-database shutdown as SQLState 08006;
            // rethrow anything else
            // https://db.apache.org/derby/docs/10.2/ref/rrefexcept71493.html
            if (!"08006".equals(e.getSQLState())) {
                throw e;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        JavaSparkContext sc = new JavaSparkContext("local", "JavaAPISuite");
        // Prepare the data
        prepareData();
        // Build the JdbcRDD
        JavaRDD<Integer> rdd = JdbcRDD.create(
            sc,
            // Opens a new connection for each partition
            new JdbcRDD.ConnectionFactory() {
                @Override
                public Connection getConnection() throws SQLException {
                    return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
                }
            },
            // The two ? placeholders receive each partition's lower and upper ID bound
            "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
            // lowerBound, upperBound, numPartitions
            1, 5, 1,
            // Maps each row of the ResultSet to one RDD element
            new Function<ResultSet, Integer>() {
                @Override
                public Integer call(ResultSet r) throws Exception {
                    return r.getInt(1);
                }
            }
        );
        // Result: [2, 4, 6, 8, 10]
        System.out.println(rdd.collect());
        shutdownDB();
        sc.stop();
    }
}

For a more detailed look at the RDD API, see the article "Spark Core RDD API Internals Explained" (spark core RDD api原理詳解).
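
The three numbers 1, 5, 1 passed to JdbcRDD.create in the listing above are the lower bound, the upper bound, and the number of partitions. Each partition runs the query with its own pair of values bound to the two ? placeholders, so the inclusive ID range has to be divided among the partitions. The following is a minimal sketch of that division in plain Java, with no Spark dependency. The class name JdbcRangeSketch and the method splitRange are hypothetical, and the arithmetic is an assumption about how JdbcRDD splits the range, not a copy of the library code.

```java
import java.math.BigInteger;
import java.util.Arrays;

public class JdbcRangeSketch {

    // Splits the inclusive range [lowerBound, upperBound] into numPartitions
    // contiguous inclusive sub-ranges. BigInteger keeps i * length from
    // overflowing when the bounds are large.
    static long[][] splitRange(long lowerBound, long upperBound, int numPartitions) {
        BigInteger lower = BigInteger.valueOf(lowerBound);
        BigInteger length = BigInteger.ONE
                .add(BigInteger.valueOf(upperBound))
                .subtract(lower);
        BigInteger n = BigInteger.valueOf(numPartitions);
        long[][] ranges = new long[numPartitions][2];
        for (int i = 0; i < numPartitions; i++) {
            // First ID handled by partition i
            BigInteger start = lower.add(BigInteger.valueOf(i).multiply(length).divide(n));
            // Last ID handled by partition i (one before the next partition's start)
            BigInteger end = lower
                    .add(BigInteger.valueOf(i + 1L).multiply(length).divide(n))
                    .subtract(BigInteger.ONE);
            ranges[i][0] = start.longValueExact();
            ranges[i][1] = end.longValueExact();
        }
        return ranges;
    }

    public static void main(String[] args) {
        // One partition, as in the article: prints [[1, 5]]
        System.out.println(Arrays.deepToString(splitRange(1, 5, 1)));
        // Two partitions: prints [[1, 2], [3, 5]]
        System.out.println(Arrays.deepToString(splitRange(1, 5, 2)));
    }
}
```

With a single partition the whole range 1 to 5 goes into one query, which matches the article's example. With more partitions, each sub-range becomes a separate query, so the bounded column should be distributed fairly evenly for the work to balance.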
Article title: Spark 2.x From Basics to Depth, Part 6: Reading a Relational Database with JdbcRDD via the RDD Java API