
Several ways to connect Spark to a relational database


1. Using the JdbcRDD API: build the RDD either by calling the JdbcRDD constructor directly (which takes Scala function wrappers) or through the JdbcRDD.create helper, which accepts a JdbcRDD.ConnectionFactory.

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;

import scala.reflect.ClassManifestFactory$;
import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;

SparkConf conf = new SparkConf();
conf.setAppName("Simple Application").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(conf);

// Approach A: call the JdbcRDD constructor directly.

// Supplies the JDBC Connection. The constructor expects a Scala Function0,
// so we extend AbstractFunction0 and implement Serializable so it can be shipped to executors.
class DbConnection extends AbstractFunction0<Connection> implements Serializable {
    private static final long serialVersionUID = 1L;
    private String driverClassName;
    private String connectionUrl;
    private String userName;
    private String password;

    public DbConnection(String driverClassName, String connectionUrl,
            String userName, String password) {
        this.driverClassName = driverClassName;
        this.connectionUrl = connectionUrl;
        this.userName = userName;
        this.password = password;
    }

    @Override
    public Connection apply() {
        Properties properties = new Properties();
        properties.setProperty("user", userName);
        properties.setProperty("password", password);
        try {
            Class.forName(driverClassName);
            return DriverManager.getConnection(connectionUrl, properties);
        } catch (ClassNotFoundException | SQLException e) {
            // Fail fast instead of returning a null connection.
            throw new RuntimeException("Could not open JDBC connection", e);
        }
    }
}

// Maps each ResultSet row to an Object[]. The constructor expects a Scala Function1.
class MapResult extends AbstractFunction1<ResultSet, Object[]> implements Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public Object[] apply(ResultSet row) {
        return JdbcRDD.resultSetToObjectArray(row);
    }
}

String Connection_url = "jdbc:mysql://ip:port/dbname?useUnicode=true&characterEncoding=utf8";
String Driver = "com.mysql.jdbc.Driver";
String UserName = "root";
String password = "pd";
String sql = "select * from tablename";
DbConnection dbConnection = new DbConnection(Driver, Connection_url, UserName, password);

// JdbcRDD requires a query with two '?' placeholders, so wrap the original query.
// Trick: with lowerBound = upperBound = 0 the where clause "0=? and 0=?" is always true,
// so the whole result set is read into a single partition.
sql = "select * from (" + sql + ") as tmp where 0=? and 0=?";
JdbcRDD<Object[]> jdbcRDD = new JdbcRDD<>(jsc.sc(), dbConnection,
        sql, 0, 0, 1, new MapResult(),
        ClassManifestFactory$.MODULE$.fromClass(Object[].class));
JavaRDD<Object[]> javaRDD = JavaRDD.fromRDD(jdbcRDD,
        ClassManifestFactory$.MODULE$.fromClass(Object[].class));

// Approach B: use JdbcRDD.create with a JdbcRDD.ConnectionFactory.
class DbConnectionFactory implements JdbcRDD.ConnectionFactory {
    private static final long serialVersionUID = 1L;
    private String driverClassName;
    private String connectionUrl;
    private String userName;
    private String password;

    public DbConnectionFactory(String driverClassName, String connectionUrl,
            String userName, String password) {
        this.driverClassName = driverClassName;
        this.connectionUrl = connectionUrl;
        this.userName = userName;
        this.password = password;
    }

    @Override
    public Connection getConnection() throws Exception {
        Class.forName(driverClassName);
        Properties properties = new Properties();
        properties.setProperty("user", userName);
        properties.setProperty("password", password);
        return DriverManager.getConnection(connectionUrl, properties);
    }
}

DbConnectionFactory ConnectFactory = new DbConnectionFactory(Driver,
        Connection_url, UserName, password);
javaRDD = JdbcRDD.create(jsc, ConnectFactory, sql, 0, 0, 1,
        new Function<ResultSet, Object[]>() {
            private static final long serialVersionUID = 1L;
            public Object[] call(ResultSet resultSet) {
                return JdbcRDD.resultSetToObjectArray(resultSet);
            }
        });
// This returns a JavaRDD<Object[]> directly; under the hood it calls
// JdbcRDD(SparkContext sc, Function0<Connection> getConnection, String sql,
//         long lowerBound, long upperBound, int numPartitions,
//         Function1<ResultSet, T> mapRow, ClassTag<T> evidence$1).

// Even more concise: the overload without a row mapper uses resultSetToObjectArray by default
// and delegates to create(JavaSparkContext, ConnectionFactory, String, long, long, int, Function).
// javaRDD = JdbcRDD.create(jsc, ConnectFactory, sql, 0, 0, 1);
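The snippet above deliberately defeats partitioning by setting lowerBound = upperBound = 0. For completeness, below is a minimal sketch (not from the original post) of how the same JdbcRDD.create call is normally partitioned: Spark substitutes each partition's share of the [lowerBound, upperBound] range into the two '?' placeholders, so several range queries run in parallel. The table name "person", the key column "id", and the bounds 1..1000000 are illustrative assumptions.

// Sketch only: "person", "id", and the bounds 1..1000000 are assumed values for illustration.
String partitionedSql = "select * from person where id >= ? and id <= ?";
JavaRDD<Object[]> partitionedRDD = JdbcRDD.create(
        jsc,
        ConnectFactory,          // reuse the factory defined above
        partitionedSql,
        1,                       // lowerBound: smallest id covered
        1000000,                 // upperBound: largest id covered
        4,                       // numPartitions: 4 concurrent range queries
        new Function<ResultSet, Object[]>() {
            private static final long serialVersionUID = 1L;
            public Object[] call(ResultSet rs) {
                return JdbcRDD.resultSetToObjectArray(rs);
            }
        });
System.out.println("rows read: " + partitionedRDD.count());

Note that rows whose id falls outside [1, 1000000] are silently skipped, which is why the "0=? and 0=?" wrapper above is a handy trick when the table has no suitable numeric column.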
