When connecting to all kinds of third-party data sources, we always end up writing slightly different code for each one. So I wrapped the JDBC connections for MaxCompute, Hive, Oracle, MySQL, and so on behind one abstraction: pass in a different set of parameters and you get a connection pool of the corresponding type.
Base connection-parameter class
This class holds the basic JDBC connection parameters. If a data source needs more than these properties, extend the class and add the extra fields.
```java
@Data
public class BaseJdbcConnParam implements Serializable {

    /** driver name */
    private String driverName;

    /** IP */
    private String ip;

    /** db server port */
    private Integer port;

    /** db name */
    private String dbName;

    /** db connection username */
    private String username;

    /** db connection password */
    private String password;
}
```
Abstract connection utility class
It provides the following:
- 1. Constructor: builds a different connection object depending on the connection parameters.
- 2. Builds the concrete connection; subclasses implement buildConnection().
- 3. Exposes the connection through getConnection() once it has been built.
```java
/**
 * @Description Abstract parent class for connection utilities
 * @Author itdl
 * @Date 2022/08/15 09:54
 */
public abstract class AbstractConnUtil<P extends BaseJdbcConnParam> {

    /** connection params */
    protected final P connParam;

    /** jdbc connection object */
    protected final Connection connection;

    /**
     * Constructor: builds the utility object.
     * @param connParam connection parameters
     */
    public AbstractConnUtil(P connParam) {
        this.connParam = connParam;
        this.connection = buildConnection();
    }

    /**
     * Build the connection object.
     * @return the connection
     */
    protected abstract Connection buildConnection();

    /**
     * Get the connection.
     */
    public Connection getConnection() {
        return connection;
    }
}
```
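To support a new data source, a subclass only needs its own parameter class plus an implementation of buildConnection(). Below is a minimal sketch assuming a hypothetical PostgreSQL source; PostgresJdbcConnParam, PostgresConnUtil and the error handling are illustrative and not part of the original project:

```java
// Hypothetical example of plugging a new data source into the abstraction.
// The parameter class can be empty if the inherited fields are enough.
public class PostgresJdbcConnParam extends BaseJdbcConnParam {
}

public class PostgresConnUtil extends AbstractConnUtil<PostgresJdbcConnParam> {

    public PostgresConnUtil(PostgresJdbcConnParam connParam) {
        super(connParam);
    }

    @Override
    protected Connection buildConnection() {
        try {
            // Load the driver named in the parameters, e.g. org.postgresql.Driver
            Class.forName(connParam.getDriverName());
            String jdbcUrl = String.format("jdbc:postgresql://%s:%s/%s",
                    connParam.getIp(), connParam.getPort(), connParam.getDbName());
            return DriverManager.getConnection(jdbcUrl, connParam.getUsername(), connParam.getPassword());
        } catch (ClassNotFoundException | SQLException e) {
            // In the real project this would be wrapped in a BizException with a ResultCode
            throw new RuntimeException(e);
        }
    }
}
```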
Connection pool manager
It provides the following:
- 1. Creates a pool of the matching connection type from the connection parameters and a maximum pool size.
- 2. getConnection(): if the pool is empty, waits for another thread to release a connection (up to ten minutes).
- 3. freeConnection(): puts the connection back into the pool and wakes up waiting threads.
- 4. close(): closes every connection in the pool.
```java
/**
 * @Description Connection pool manager
 * @Author itdl
 * @Date 2022/08/16 09:42
 */
@Slf4j
public class DbConnPool<T extends BaseJdbcConnParam> {

    /** Holds the pooled connections */
    private final LinkedList<Connection> connPool = new LinkedList<>();

    /** Maximum pool size */
    private final Integer maxPoolSize;

    private final T connParam;

    /**
     * Constructor.
     * @param connParam   connection parameters
     * @param maxPoolSize pool size
     */
    public DbConnPool(T connParam, Integer maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
        this.connParam = connParam;
        // Initialize the pool
        for (int i = 0; i < maxPoolSize; i++) {
            connPool.addLast(this.createConnection());
        }
    }

    /**
     * Create a database connection based on the parameter type.
     * @return connection
     */
    private Connection createConnection() {
        if (connParam instanceof OracleJdbcConnParam) {
            final OracleConnUtil util = new OracleConnUtil((OracleJdbcConnParam) connParam);
            return util.getConnection();
        }
        if (connParam instanceof HiveJdbcConnParam) {
            final HiveConnUtil util = new HiveConnUtil((HiveJdbcConnParam) connParam);
            return util.getConnection();
        }
        if (connParam instanceof MysqlJdbcConnParam) {
            final MysqlConnUtil util = new MysqlConnUtil((MysqlJdbcConnParam) connParam);
            return util.getConnection();
        }
        if (connParam instanceof MaxComputeJdbcConnParam) {
            final MaxComputeJdbcUtil util = new MaxComputeJdbcUtil((MaxComputeJdbcConnParam) connParam);
            return util.getConnection();
        }
        throw new BizException(ResultCode.CONN_TYPE_NOT_SUPPORT);
    }

    /**
     * Borrow a connection. If the pool is empty, wait up to ten minutes
     * for another thread to release one.
     * @return connection
     */
    public synchronized Connection getConnection() {
        final long deadline = System.currentTimeMillis() + 10 * 60 * 1000L;
        while (connPool.isEmpty()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                log.info("==========Pool is still empty after waiting 10 minutes, throwing==========");
                throw new BizException(ResultCode.CONN_POOL_EMPTY_ERR);
            }
            try {
                log.info("==========Connection pool is empty, waiting for another thread to release a connection==========");
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new BizException(ResultCode.CONN_POOL_EMPTY_ERR);
            }
        }
        // Take the first connection off the pool
        return connPool.removeFirst();
    }

    /**
     * Return a connection to the pool after use.
     * @param conn the connection to release
     */
    public synchronized void freeConnection(Connection conn) {
        this.connPool.addLast(conn);
        // Wake up threads waiting for a connection
        notifyAll();
    }

    /**
     * Close every connection in the pool.
     */
    public synchronized void close() {
        for (Connection connection : connPool) {
            SqlUtil.close(connection);
        }
    }
}
```
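A borrowed connection must always be handed back with freeConnection(), otherwise the pool drains and other threads block for up to ten minutes. A minimal usage sketch (the MySQL parameter values are placeholders):

```java
// Build the pool from any supported parameter type (MySQL shown here)
MysqlJdbcConnParam param = new MysqlJdbcConnParam();
param.setDriverName("com.mysql.cj.jdbc.Driver");
param.setIp("localhost");
param.setPort(3306);
param.setDbName("test_db");
param.setUsername("root");
param.setPassword("root");

DbConnPool<MysqlJdbcConnParam> pool = new DbConnPool<>(param, 2);
Connection conn = pool.getConnection();
try {
    // ... use the connection, e.g. through SqlUtil ...
} finally {
    // Always give the connection back so waiting threads can proceed
    pool.freeConnection(conn);
}
// Shut the pool down when the application no longer needs it
pool.close();
```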
SQL utility class
This class wraps the core SQL execution logic: given a Connection and a database dialect, it builds and runs the appropriate SQL for each database type.
```java
/**
 * @Description SQL utility class
 * @Author itdl
 * @Date 2022/08/10 17:13
 */
@Slf4j
public class SqlUtil {
    /** Query table names/comments - MySQL */
    public static final String SELECT_TABLES_MYSQL = "select table_name, table_comment from information_schema.tables where TABLE_SCHEMA = '%s'";
    /** Query table names/comments - MaxCompute */
    public static final String SELECT_TABLES_MAX_COMPUTE = "select table_name, table_comment from information_schema.tables where TABLE_SCHEMA = '%s'";
    /** Query table names/comments - Oracle */
    public static final String SELECT_TABLES_ORACLE = "SELECT t2.TABLE_NAME as table_name, t2.COMMENTS as table_comment FROM user_tables t1 inner join user_tab_comments t2 on t1.TABLE_NAME = t2.TABLE_NAME";
    /** Query table comments - Hive: list the tables first, then fetch each table's DDL and extract the comment with a regex */
    public static final String SELECT_TABLES_HIVE = "show tables";
    public static final String SELECT_TABLES_2_HIVE = "describe extended %s";

    /** Pagination count - MySQL */
    private static final String SELECT_COUNT_MYSQL = "select count(1) from (%s) z";
    /** Pagination count - MaxCompute */
    private static final String SELECT_COUNT_MAX_COMPUTE = "select count(1) from (%s) z;";
    /** Pagination count - Oracle */
    private static final String SELECT_COUNT_ORACLE = "select count(1) from (%s) z";
    /** Pagination count - Hive */
    private static final String SELECT_COUNT_HIVE = "select count(1) from (%s) z";

    /** Enable full table scan on MaxCompute */
    private static final String FULL_SCAN_MAX_COMPUTE = "set odps.sql.allow.fullscan=true;";

    /** Pagination query - MySQL */
    private static final String SELECT_PAGE_MYSQL = "select z.* from (%s) z limit %s, %s";
    /** Pagination query - MaxCompute */
    private static final String SELECT_PAGE_MAX_COMPUTE = "select z.* from (%s) z limit %s, %s;";
    /** Pagination query - Hive */
    private static final String SELECT_PAGE_HIVE = "select * from (select row_number() over () as row_num_01,u.* from (%s) u) mm where mm.row_num_01 between %s and %s";
    /** Pagination query - Oracle */
    private static final String SELECT_PAGE_ORACLE = "select * from (SELECT ROWNUM as row_num_01,z.* from (%s) z) h where h.row_num_01 > %s and h.row_num_01 <= %s";

    /** Database connection */
    private final Connection connection;
    /** Database dialect */
    private final Integer dbDialect;
    /** Supported dialects */
    private static final List<Integer> supportDbTypes = Arrays.asList(
            DbDialectEnum.ORACLE.getCode(), DbDialectEnum.HIVE.getCode(),
            DbDialectEnum.MYSQL.getCode(), DbDialectEnum.MAX_COMPUTE.getCode());

    public SqlUtil(Connection connection, Integer dbDialect) {
        if (!supportDbTypes.contains(dbDialect)) {
            throw new BizException(ResultCode.CONN_TYPE_NOT_SUPPORT);
        }
        this.connection = connection;
        this.dbDialect = dbDialect;
    }

    /**
     * List all tables and their comments for the given connection.
     */
    public List<TableMetaInfo> getTables(String schemaName) {
        List<TableMetaInfo> result = new ArrayList<>();
        String sql = "";
        switch (this.dbDialect) {
            case 1:
                sql = SELECT_TABLES_ORACLE;
                break;
            case 2:
                sql = SELECT_TABLES_HIVE;
                break;
            case 3:
                if (StringUtils.isBlank(schemaName)) {
                    throw new BizException(ResultCode.SELECT_TABLES_SCHEMA_NOT_NULL_ERR);
                }
                sql = String.format(SELECT_TABLES_MYSQL, schemaName);
                break;
            case 4:
                if (StringUtils.isBlank(schemaName)) {
                    throw new BizException(ResultCode.SELECT_TABLES_SCHEMA_NOT_NULL_ERR);
                }
                sql = String.format(SELECT_TABLES_MAX_COMPUTE, schemaName);
                break;
            default:
                break;
        }
        if (StringUtils.isBlank(sql)) {
            throw new BizException(ResultCode.CONN_TYPE_NOT_SUPPORT);
        }
        // Run the SQL
        final List<LinkedHashMap<String, Object>> resultMaps = querySql(sql);
        if (ObjectUtils.isEmpty(resultMaps)) {
            return Lists.newArrayList();
        }
        // Hive is handled separately
        List<TableMetaInfo> result1 = getHiveTableMetaInfos(result, resultMaps);
        if (result1 != null) {
            return result1;
        }
        // Convert the result
        return resultMaps.stream().map(m -> {
            final TableMetaInfo info = new TableMetaInfo();
            Object tableNameObj = m.get("table_name");
            String tableName = tableNameObj == null
                    ? (m.get("TABLE_NAME") == null ? "" : String.valueOf(m.get("TABLE_NAME")))
                    : String.valueOf(tableNameObj);
            Object tableCommentObj = m.get("table_comment");
            String tableComment = tableCommentObj == null
                    ? (m.get("TABLE_COMMENT") == null ? "" : String.valueOf(m.get("TABLE_COMMENT")))
                    : String.valueOf(tableCommentObj);
            info.setTableName(tableName);
            info.setComment(tableComment);
            return info;
        }).collect(Collectors.toList());
    }

    /**
     * List the columns of a table.
     * @param tableName table name (for Oracle the schema is the username)
     */
    public List<TableColumnMetaInfo> getColumnsByTableName(String tableName) {
        try {
            List<TableColumnMetaInfo> list = new ArrayList<>();
            final DatabaseMetaData metaData = connection.getMetaData();
            final ResultSet columns = metaData.getColumns(null, null, tableName, null);
            while (columns.next()) {
                String columnName = columns.getString("COLUMN_NAME");
                String remarks = columns.getString("REMARKS");
                remarks = StringUtils.isBlank(remarks) ? "" : remarks;
                final TableColumnMetaInfo metaInfo = new TableColumnMetaInfo(tableName, columnName, remarks);
                list.add(metaInfo);
            }
            return list;
        } catch (SQLException e) {
            e.printStackTrace();
            return Lists.newArrayList();
        }
    }

    /**
     * Run a query.
     * @param querySql query SQL
     * @param fullScan whether to enable full table scan (MaxCompute only)
     * @return List<LinkedHashMap<String, Object>>; LinkedHashMap keeps column order stable when serialized
     */
    public List<LinkedHashMap<String, Object>> queryData(String querySql, boolean... fullScan) {
        Statement statement = null;
        ResultSet resultSet = null;
        try {
            // Create the statement
            statement = this.connection.createStatement();
            // Enable full table scan if requested
            for (boolean b : fullScan) {
                if (b) {
                    statement.execute(FULL_SCAN_MAX_COMPUTE);
                    break;
                }
            }
            // Run the query
            resultSet = statement.executeQuery(querySql);
            // Build the result
            return buildListMap(resultSet);
        } catch (SQLException e) {
            e.printStackTrace();
            throw new BizException(ResultCode.SQL_EXEC_ERR);
        } finally {
            // Close resultSet and statement
            close(resultSet, statement);
        }
    }

    /**
     * Run a paginated query.
     * @param querySql query SQL
     * @return List<LinkedHashMap<String, Object>>
     */
    public List<LinkedHashMap<String, Object>> queryData(String querySql, Integer page, Integer size) {
        Statement statement = null;
        ResultSet resultSet = null;
        try {
            // 1. Strip semicolons
            querySql = querySql.replaceAll(";", "");
            // Create the statement
            statement = this.connection.createStatement();
            // 2. Format the SQL for the dialect
            int offset = (page - 1) * size;
            String execSql = "";
            switch (this.dbDialect) {
                case 1: // oracle
                    execSql = String.format(SELECT_PAGE_ORACLE, querySql, offset, size);
                    break;
                case 2: // hive
                    execSql = String.format(SELECT_PAGE_HIVE, querySql, offset, size);
                    break;
                case 3: // mysql
                    execSql = String.format(SELECT_PAGE_MYSQL, querySql, offset, size);
                    break;
                case 4: // maxCompute
                    execSql = String.format(SELECT_PAGE_MAX_COMPUTE, querySql, offset, size);
                    break;
                default:
                    break;
            }
            // Enable full table scan for MaxCompute
            if (DbDialectEnum.MAX_COMPUTE.getCode().equals(this.dbDialect)) {
                statement.execute(FULL_SCAN_MAX_COMPUTE);
            }
            log.info("=======>>>Pagination SQL: {}", execSql);
            // Run the query
            resultSet = statement.executeQuery(execSql);
            // Build the result
            return buildListMap(resultSet);
        } catch (SQLException e) {
            e.printStackTrace();
            throw new BizException(ResultCode.SQL_EXEC_ERR);
        } finally {
            // Close resultSet and statement
            close(resultSet, statement);
        }
    }

    /**
     * Paginated query returning rows as maps.
     * @param querySql query SQL
     * @param page     page number, starting at 1
     * @param size     page size
     * @return page result
     */
    public PageResult<LinkedHashMap<String, Object>> pageQueryMap(String querySql, Integer page, Integer size) {
        // 1. Strip semicolons
        querySql = querySql.replaceAll(";", "");
        String countSql = "";
        switch (this.dbDialect) {
            case 1: // oracle
                countSql = String.format(SELECT_COUNT_ORACLE, querySql);
                break;
            case 2: // hive
                countSql = String.format(SELECT_COUNT_HIVE, querySql);
                break;
            case 3: // mysql
                countSql = String.format(SELECT_COUNT_MYSQL, querySql);
                break;
            case 4: // maxCompute
                countSql = String.format(SELECT_COUNT_MAX_COMPUTE, querySql);
                break;
            default:
                break;
        }
        log.info("=======>>>Count SQL for pagination: {}", countSql);
        // Query the total count (enable full scan for MaxCompute)
        final List<LinkedHashMap<String, Object>> countMap =
                queryData(countSql, DbDialectEnum.MAX_COMPUTE.getCode().equals(this.dbDialect));
        if (CollectionUtils.isEmpty(countMap)) {
            return new PageResult<>(0L, new ArrayList<>());
        }
        long count = 0L;
        for (Object value : countMap.get(0).values()) {
            count = Long.parseLong(String.valueOf(value));
        }
        if (count == 0) {
            return new PageResult<>(0L, new ArrayList<>());
        }
        // Run the paginated query
        final List<LinkedHashMap<String, Object>> resultList = queryData(querySql, page, size);
        return new PageResult<>(count, resultList);
    }

    /**
     * Paginated query returning typed rows.
     * @param querySql query SQL
     * @param page     page number, starting at 1
     * @param size     page size
     * @return page result
     */
    public <T> PageResult<T> pageQuery(String querySql, Integer page, Integer size, Class<T> clazz) {
        final PageResult<LinkedHashMap<String, Object>> result = pageQueryMap(querySql, page, size);
        List<T> rows = new ArrayList<>();
        for (LinkedHashMap<String, Object> row : result.getRows()) {
            final T t = JSONObject.parseObject(JSONObject.toJSONString(row), clazz);
            rows.add(t);
        }
        return new PageResult<>(result.getTotal(), rows);
    }

    /**
     * Fetch Hive table comments.
     * @param result     accumulator
     * @param resultMaps result of "show tables"
     * @return List<TableMetaInfo>, or null if the dialect is not Hive
     */
    private List<TableMetaInfo> getHiveTableMetaInfos(List<TableMetaInfo> result, List<LinkedHashMap<String, Object>> resultMaps) {
        if (dbDialect.equals(DbDialectEnum.HIVE.getCode())) {
            for (LinkedHashMap<String, Object> resultMap : resultMaps) {
                final String tabName = String.valueOf(resultMap.get("tab_name"));
                final String descTableCommentSql = String.format(SELECT_TABLES_2_HIVE, tabName);
                List<LinkedHashMap<String, Object>> resultMapsComments = querySql(descTableCommentSql);
                // col_name -> Detailed Table Information
                String comments = resultMapsComments.stream()
                        .filter(m -> "Detailed Table Information".equals(m.get("col_name")))
                        .map(m -> String.valueOf(m.get("data_type")))
                        .findFirst()
                        .orElse("");
                comments = ReUtil.get("parameters:\\{(?!.*?\\().*transient_lastDdlTime.*?comment=(.*?)\\}", comments, 1);
                if (StringUtils.isBlank(comments)) {
                    comments = "";
                }
                if (comments.contains(",")) {
                    comments = comments.substring(0, comments.lastIndexOf(","));
                }
                result.add(new TableMetaInfo(tabName, comments));
                log.info("===========>>>Fetched comment for table {}: {}", tabName, comments);
                resultMapsComments.clear();
            }
            return result;
        }
        return null;
    }

    /**
     * Run a SQL query.
     * @param sql SQL statement
     * @return rows as LinkedHashMap to keep the column order from getting scrambled after serialization
     */
    public List<LinkedHashMap<String, Object>> querySql(String sql) {
        Statement statement = null;
        ResultSet resultSet = null;
        try {
            statement = connection.createStatement();
            resultSet = statement.executeQuery(sql);
            return buildListMap(resultSet);
        } catch (SQLException e) {
            e.printStackTrace();
            throw new BizException(ResultCode.SQL_EXEC_ERR);
        } finally {
            // Close resources
            close(resultSet, statement);
        }
    }

    /**
     * Close JDBC objects. When passing several, put the one to close first at the front.
     * @param objs objects to close
     */
    public static void close(Object... objs) {
        if (objs == null || objs.length == 0) {
            return;
        }
        for (Object obj : objs) {
            if (obj instanceof Statement) {
                try {
                    ((Statement) obj).close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (obj instanceof ResultSet) {
                try {
                    ((ResultSet) obj).close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (obj instanceof Connection) {
                try {
                    ((Connection) obj).close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * @Description Convert a ResultSet into a List of LinkedHashMap
     * @Author itdl
     * @Date 2022/4/18 21:13
     * @Param {@link ResultSet} resultSet
     * @Return {@link List<LinkedHashMap<String, Object>>}
     **/
    private List<LinkedHashMap<String, Object>> buildListMap(ResultSet resultSet) throws SQLException {
        if (resultSet == null) {
            return Lists.newArrayList();
        }
        List<LinkedHashMap<String, Object>> resultList = new ArrayList<>();
        // Result set metadata
        ResultSetMetaData metaData = resultSet.getMetaData();
        while (resultSet.next()) {
            // Column count
            int columnCount = metaData.getColumnCount();
            LinkedHashMap<String, Object> map = new LinkedHashMap<>();
            for (int i = 0; i < columnCount; i++) {
                String columnName = metaData.getColumnName(i + 1);
                // Skip the row-number column added by the pagination SQL
                if ("mm.row_num_01".equalsIgnoreCase(columnName) || "row_num_01".equalsIgnoreCase(columnName)) {
                    continue;
                }
                // Strip the "mm." alias prefix added by the Hive pagination SQL
                if (columnName.startsWith("mm.")) {
                    columnName = columnName.substring(columnName.indexOf(".") + 1);
                }
                Object object = resultSet.getObject(columnName);
                // MaxCompute returns NULL values as the literal \N
                if ("\\N".equalsIgnoreCase(String.valueOf(object))) {
                    map.put(columnName, "");
                } else {
                    map.put(columnName, object);
                }
            }
            resultList.add(map);
        }
        return resultList;
    }
}
```
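For callers that prefer typed rows over maps, pageQuery() converts each row through fastjson. A minimal sketch, assuming a connection borrowed from the pool and a hypothetical UserInfo POJO whose fields match the selected column names (both are illustrative):

```java
// Hypothetical POJO: field names must match the selected column names for fastjson to map them
@Data
public class UserInfo {
    private Long id;
    private String username;
}

// Dialect taken from the project's DbDialectEnum; connection comes from DbConnPool
SqlUtil sqlUtil = new SqlUtil(connection, DbDialectEnum.MYSQL.getCode());

// Page 1, 10 rows per page; the count SQL and page SQL are generated per dialect
PageResult<UserInfo> pageResult =
        sqlUtil.pageQuery("select id, username from user_info", 1, 10, UserInfo.class);
System.out.println("total = " + pageResult.getTotal());
pageResult.getRows().forEach(System.out::println);
```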
MaxCompute JDBC connection wrapper
MaxCompute already ships a JDBC driver (odps-jdbc), so in the end we can obtain a standard Connection. Official documentation:
https://help.aliyun.com/document_detail/161246.html
MaxCompute JDBC connection parameters
```java
/**
 * @author itdl
 * @description JDBC connection params for MaxCompute
 * @date 2022/08/08 10:07
 */
@Data
public class MaxComputeJdbcConnParam extends BaseJdbcConnParam {

    /** Aliyun accessId (acts as the username) */
    private String aliyunAccessId;

    /** Aliyun accessKey (acts as the password) */
    private String aliyunAccessKey;

    /** MaxCompute endpoint */
    private String endpoint;

    /** Project name */
    private String projectName;
}
```
MaxCompute JDBC connection implementation
Simply extend the parent class AbstractConnUtil and implement the abstract method buildConnection().
```java
/**
 * @Description MaxCompute JDBC connection implementation
 * @Author itdl
 * @Date 2022/08/08 14:26
 */
@Slf4j
public class MaxComputeJdbcUtil extends AbstractConnUtil<MaxComputeJdbcConnParam> {

    /** JDBC driver class name */
    private static final String DRIVER_NAME = "com.aliyun.odps.jdbc.OdpsDriver";

    /**
     * Constructor.
     * @param connParam connection parameters
     */
    public MaxComputeJdbcUtil(MaxComputeJdbcConnParam connParam) {
        super(connParam);
    }

    @Override
    protected Connection buildConnection() {
        return buildConn();
    }

    /**
     * Create the connection.
     * @return database connection
     */
    private Connection buildConn() {
        try {
            Class.forName(DRIVER_NAME);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            throw new BizException(ResultCode.MAX_COMPUTE_DRIVE_LOAD_ERR);
        }
        try {
            Properties dbProperties = new Properties();
            dbProperties.put("user", connParam.getAliyunAccessId());
            dbProperties.put("password", connParam.getAliyunAccessKey());
            dbProperties.put("remarks", "true");
            // JDBC URL template
            String jdbcUrlTemplate = "jdbc:odps:%s?project=%s&useProjectTimeZone=true";
            // Obtain the connection through the DriverManager
            return DriverManager.getConnection(
                    String.format(jdbcUrlTemplate, connParam.getEndpoint(), connParam.getProjectName()),
                    dbProperties);
        } catch (SQLException e) {
            e.printStackTrace();
            throw new BizException(ResultCode.CONN_USER_PWD_ERR);
        }
    }
}
```
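Used on its own (outside the pool), the util yields a plain java.sql.Connection. A minimal sketch; the endpoint, project and credentials below are placeholders:

```java
MaxComputeJdbcConnParam param = new MaxComputeJdbcConnParam();
param.setAliyunAccessId("<your-access-id>");
param.setAliyunAccessKey("<your-access-key>");
param.setEndpoint("http://service.cn-chengdu.maxcompute.aliyun.com/api");
param.setProjectName("<your-project>");

// buildConnection() runs inside the parent constructor
MaxComputeJdbcUtil util = new MaxComputeJdbcUtil(param);
Connection connection = util.getConnection();
```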
The connection test code is collected at the end. It spins up several threads that borrow connections, fetch table names, table comments, column names and column comments, and then run the pagination wrapper by passing page, size and a plain SQL statement.
Hive JDBC connection wrapper
Hive JDBC connection parameters
Besides the basic JDBC fields, the Hive parameter class also needs Kerberos-related fields: once Kerberos is enabled on Hive, authentication requires a keytab file and a krb5.conf configuration file. This shows up again in both the parameter class and the test code.
```java
/**
 * @Description Hive JDBC connection params
 * @Author itdl
 * @Date 2022/08/10 16:40
 */
@Data
@EqualsAndHashCode(callSuper = false)
public class HiveJdbcConnParam extends BaseJdbcConnParam {

    /** enable kerberos authentication */
    private boolean enableKerberos;

    /** principal */
    private String principal;

    /** krb5.conf file path on disk */
    private String kbr5FilePath;

    /** keytab file path on disk */
    private String keytabFilePath;
}
```
Hive JDBC connection implementation
After getting a JDBC connection to Hive, table comments could in principle be read from the Connection's metadata, but Chinese comments come back garbled that way, even though they display fine in Hue. Since I have not found a fix for that, the fallback is to fetch the table's DDL by table name and extract the comment from it with a regular expression.
```java
/**
 * @Description Hive connection util
 * @Author itdl
 * @Date 2022/08/10 16:52
 */
@Slf4j
public class HiveConnUtil extends AbstractConnUtil<HiveJdbcConnParam> {

    public HiveConnUtil(HiveJdbcConnParam connParam) {
        super(connParam);
    }

    /**
     * Get the connection.
     * @return connection
     */
    public Connection getConnection() {
        return connection;
    }

    @Override
    protected Connection buildConnection() {
        try {
            // Class.forName("org.apache.hive.jdbc.HiveDriver");
            Class.forName(connParam.getDriverName());
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            throw new BizException(ResultCode.HIVE_DRIVE_LOAD_ERR);
        }
        // With Kerberos enabled, a keytab is required
        // Build the jdbcUrl
        String jdbcUrl = "jdbc:hive2://%s:%s/%s";
        String ip = connParam.getIp();
        String port = connParam.getPort() + "";
        String dbName = connParam.getDbName();
        final String username = connParam.getUsername();
        final String password = connParam.getPassword();
        // is kerberos authentication enabled
        final boolean enableKerberos = connParam.isEnableKerberos();
        Connection connection;
        // Obtain the connection
        try {
            Properties dbProperties = new Properties();
            dbProperties.put("user", username);
            dbProperties.put("password", password);
            // With remarks enabled, table comments can be read, but Chinese comments come back garbled
            dbProperties.put("remarks", "true");
            if (!enableKerberos) {
                jdbcUrl = String.format(jdbcUrl, ip, port, dbName);
                connection = DriverManager.getConnection(jdbcUrl, dbProperties);
            } else {
                final String principal = connParam.getPrincipal();
                final String kbr5FilePath = connParam.getKbr5FilePath();
                final String secretFilePath = connParam.getKeytabFilePath();
                String format = "jdbc:hive2://%s:%s/%s;principal=%s";
                jdbcUrl = String.format(format, ip, port, dbName, principal);
                // Hadoop security settings
                System.setProperty("java.security.krb5.conf", kbr5FilePath);
                System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
                // On Windows this may require HADOOP_HOME or hadoop.home.dir to be set
                // Kerberos authentication
                org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
                conf.set("hadoop.security.authentication", "Kerberos");
                conf.set("keytab.file", secretFilePath);
                conf.set("kerberos.principal", principal);
                UserGroupInformation.setConfiguration(conf);
                try {
                    UserGroupInformation.loginUserFromKeytab(username, secretFilePath);
                } catch (IOException e) {
                    e.printStackTrace();
                    throw new BizException(ResultCode.KERBEROS_AUTH_FAIL_ERR);
                }
                try {
                    connection = DriverManager.getConnection(jdbcUrl, dbProperties);
                } catch (SQLException e) {
                    e.printStackTrace();
                    throw new BizException(ResultCode.KERBEROS_AUTH_SUCCESS_GET_CONN_FAIL_ERR);
                }
            }
            log.info("=====>>>Hive connection established: username: {}, jdbcUrl: {}", username, jdbcUrl);
            return connection;
        } catch (SQLException e) {
            e.printStackTrace();
            throw new BizException(ResultCode.HIVE_CONN_USER_PWD_ERR);
        } catch (BizException e) {
            throw e;
        } catch (Exception e) {
            e.printStackTrace();
            throw new BizException(ResultCode.HIVE_CONN_ERR);
        }
    }
}
```
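To make the regex used in SqlUtil#getHiveTableMetaInfos easier to follow, here is a standalone sketch run against a shortened, made-up fragment of "describe extended" output (the real "Detailed Table Information" string is much longer); ReUtil is the same Hutool helper the project already uses:

```java
// Made-up fragment of "describe extended <table>" output, shortened for illustration
String detail = "Table(tableName:user_info, parameters:{transient_lastDdlTime=1660000000, comment=用戶信息表})";

// Same pattern as in SqlUtil: capture the comment= value inside the parameters{...} block
String comment = ReUtil.get(
        "parameters:\\{(?!.*?\\().*transient_lastDdlTime.*?comment=(.*?)\\}",
        detail, 1);

System.out.println(comment); // 用戶信息表
```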
Oracle JDBC connection parameters
Just extend the base class; no extra fields are needed.
```java
/**
 * @Description JDBC connection params for Oracle
 * @Author itdl
 * @Date 2022/08/15 09:50
 */
public class OracleJdbcConnParam extends BaseJdbcConnParam {
}
```
Oracle JDBC connection implementation
Covers both normal-user login and DBA (as sysdba) login.
```java
/**
 * @Description Oracle JDBC connection util
 * @Author itdl
 * @Date 2022/08/15 09:52
 */
@Slf4j
public class OracleConnUtil extends AbstractConnUtil<OracleJdbcConnParam> {

    /**
     * Constructor.
     * @param connParam connection parameters
     */
    public OracleConnUtil(OracleJdbcConnParam connParam) {
        super(connParam);
    }

    @Override
    protected Connection buildConnection() {
        try {
            Class.forName("oracle.jdbc.driver.OracleDriver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            throw new BizException(ResultCode.ORACLE_DRIVE_LOAD_ERR);
        }
        // Build the jdbcUrl
        String jdbcUrl = "jdbc:oracle:thin:@//%s:%s/%s";
        final String ip = connParam.getIp();
        final String port = connParam.getPort() + "";
        final String dbName = connParam.getDbName();
        final String username = connParam.getUsername();
        final String password = connParam.getPassword();
        // Format the URL
        jdbcUrl = String.format(jdbcUrl, ip, port, dbName);
        // Obtain the connection
        Connection connection;
        try {
            Properties dbProperties = new Properties();
            // DBA accounts carry an "as sysdba" suffix on the username
            String dba = "as sysdba";
            dbProperties.put("password", password);
            dbProperties.put("remarks", "true");
            if (username.trim().endsWith(dba)) {
                dbProperties.put("user", username.trim().substring(0, username.trim().indexOf(dba) - 1));
                dbProperties.put("defaultRowPrefetch", "15");
                dbProperties.put("internal_logon", "sysdba");
                connection = DriverManager.getConnection(jdbcUrl, dbProperties);
            } else {
                dbProperties.put("user", username);
                connection = DriverManager.getConnection(jdbcUrl, dbProperties);
            }
            log.info("=====>>>Oracle connection established: username: {}, jdbcUrl: {}", username, jdbcUrl);
            return connection;
        } catch (SQLException e) {
            e.printStackTrace();
            if (e.getMessage().contains("TNS:listener")) {
                throw new BizException(ResultCode.CONN_LISTENER_UNKNOWN_ERR);
            }
            if (e.getMessage().contains("ORA-01017")) {
                throw new BizException(ResultCode.CONN_USER_PWD_ERR);
            }
            if (e.getMessage().contains("IO 錯誤: Got minus one from a read call")) {
                throw new BizException(ResultCode.CONN_CONN_TOO_MANY_ERR);
            }
            throw new BizException(ResultCode.CONN_UNKNOWN_ERR);
        } catch (Exception e) {
            throw new BizException(ResultCode.CONN_UNKNOWN_ERR);
        }
    }
}
```
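The only difference for a DBA login is the username: when it ends with "as sysdba", buildConnection() strips the suffix and sets internal_logon=sysdba. A minimal sketch; host, service name and password are placeholders:

```java
OracleJdbcConnParam param = new OracleJdbcConnParam();
param.setIp("<oracle-host>");
param.setPort(1521);
param.setDbName("<service-name>");
param.setPassword("<password>");
// A normal account would simply be e.g. "scott"; the "as sysdba" suffix
// makes buildConnection() strip it and connect with internal_logon=sysdba
param.setUsername("sys as sysdba");

Connection connection = new OracleConnUtil(param).getConnection();
```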
MySQL JDBC connection wrapper
MySQL JDBC connection parameters
Just extend the base class; no extra fields are needed.
```java
/**
 * @Description JDBC connection params for MySQL
 * @Author itdl
 * @Date 2022/08/15 09:50
 */
public class MysqlJdbcConnParam extends BaseJdbcConnParam {
}
```
MySQL JDBC connection implementation
Note the connection property useInformationSchema=true: with it set, table and column comments can be read directly from the Connection's metadata.
```java
/**
 * @Description MySQL JDBC connection util
 * @Author itdl
 * @Date 2022/08/15 09:52
 */
@Slf4j
public class MysqlConnUtil extends AbstractConnUtil<MysqlJdbcConnParam> {

    /**
     * Constructor.
     * @param connParam connection parameters
     */
    public MysqlConnUtil(MysqlJdbcConnParam connParam) {
        super(connParam);
    }

    @Override
    protected Connection buildConnection() {
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            throw new BizException(ResultCode.MYSQL_DRIVE_LOAD_ERR);
        }
        // Build the jdbcUrl
        String jdbcUrl = "jdbc:mysql://%s:%s/%s?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8";
        final String ip = connParam.getIp();
        final String port = connParam.getPort() + "";
        final String dbName = connParam.getDbName();
        final String username = connParam.getUsername();
        final String password = connParam.getPassword();
        // Format the URL
        jdbcUrl = String.format(jdbcUrl, ip, port, dbName);
        // Obtain the connection
        try {
            Properties dbProperties = new Properties();
            dbProperties.put("user", username);
            dbProperties.put("password", password);
            dbProperties.put("remarks", "true");
            // Allow table/column remarks to be read from information_schema
            dbProperties.setProperty("useInformationSchema", "true");
            Connection connection = DriverManager.getConnection(jdbcUrl, dbProperties);
            log.info("=====>>>MySQL connection established: username: {}, jdbcUrl: {}", username, jdbcUrl);
            return connection;
        } catch (SQLException e) {
            e.printStackTrace();
            if (e.getMessage().contains("Unknown database")) {
                throw new BizException(ResultCode.CONN_UNKNOWN_DB_ERR);
            }
            throw new BizException(ResultCode.CONN_USER_PWD_ERR);
        } catch (Exception e) {
            throw new BizException(ResultCode.CONN_UNKNOWN_ERR);
        }
    }
}
```
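With useInformationSchema=true the MySQL driver reads comments from information_schema, so DatabaseMetaData exposes them in the REMARKS column. A small sketch of reading table comments straight from the metadata; the database name is a placeholder and the Connection is assumed to come from MysqlConnUtil or the pool:

```java
// connection obtained from MysqlConnUtil (or the pool), as in the test code below
DatabaseMetaData metaData = connection.getMetaData();
// For MySQL the catalog is the database name; "%" matches every table
try (ResultSet tables = metaData.getTables("test_db", null, "%", new String[]{"TABLE"})) {
    while (tables.next()) {
        // REMARKS carries the table comment because useInformationSchema=true was set
        System.out.println(tables.getString("TABLE_NAME") + " -> " + tables.getString("REMARKS"));
    }
}
```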
Test code: connecting to each database
```java
@SpringBootTest(classes = DbConnectionDemoApplication.class)
@RunWith(value = SpringRunner.class)
@Slf4j
class DbConnectionDemoApplicationTests {

    private DbConnPool<?> connPool = null;

    @Test
    public void testMysqlConn() throws InterruptedException {
        // Build the connection parameters
        final MysqlJdbcConnParam connParam = new MysqlJdbcConnParam();
        final String ip = "localhost";
        final Integer port = 3306;
        final String username = "root";
        final String password = "root";
        final String dbname = "test_db";
        // Set the parameters
        connParam.setDriverName(Driver.class.getName());
        connParam.setIp(ip);
        connParam.setPort(port);
        connParam.setUsername(username);
        connParam.setPassword(password);
        connParam.setDbName(dbname);
        // Create the pool
        connPool = new DbConnPool<>(connParam, 2);
        handler01(dbname, DbDialectEnum.MYSQL);
        new Thread(() -> handler01(dbname, DbDialectEnum.MYSQL)).start();
        new Thread(() -> handler01(dbname, DbDialectEnum.MYSQL)).start();
        Thread.sleep(60 * 1000);
    }

    @Test
    public void testOracleConn() throws InterruptedException {
        // Build the connection parameters
        final OracleJdbcConnParam connParam = new OracleJdbcConnParam();
        final String ip = "your Oracle host";
        final Integer port = 1521;
        // For an admin account, append " as sysdba" to the username
        final String username = "username";
        final String password = "password";
        final String dbname = "instance/service name";
        // Set the parameters
        connParam.setDriverName(Driver.class.getName());
        connParam.setIp(ip);
        connParam.setPort(port);
        connParam.setUsername(username);
        connParam.setPassword(password);
        connParam.setDbName(dbname);
        // Create the pool
        connPool = new DbConnPool<>(connParam, 2);
        final DbDialectEnum dbDialectEnum = DbDialectEnum.ORACLE;
        // Run the operations (for Oracle the schemaName is the username)
        handler01(username, dbDialectEnum);
        // Two more threads borrowing connections
        new Thread(() -> handler01(username, dbDialectEnum)).start();
        new Thread(() -> handler01(username, dbDialectEnum)).start();
        Thread.sleep(60 * 1000);
    }

    @Test
    public void testHiveConn() throws InterruptedException {
        // Build the connection parameters
        final HiveJdbcConnParam connParam = new HiveJdbcConnParam();
        final String ip = "hive host";
        final Integer port = 10000;
        final String username = "account@REALM";
        final String password = "";
        final String dbname = "database name";
        final String principal = "hive/_HOST@REALM";
        final String kbr5FilePath = "C:\\workspace\\krb5.conf";
        final String keytabFilePath = "C:\\workspace\\zhouyu.keytab";
        // Set the parameters
        connParam.setDriverName(Driver.class.getName());
        connParam.setIp(ip);
        connParam.setPort(port);
        connParam.setUsername(username);
        connParam.setPassword(password);
        connParam.setDbName(dbname);
        connParam.setEnableKerberos(true);
        connParam.setPrincipal(principal);
        connParam.setKbr5FilePath(kbr5FilePath);
        connParam.setKeytabFilePath(keytabFilePath);
        // Create the pool
        connPool = new DbConnPool<>(connParam, 2);
        final DbDialectEnum dbDialectEnum = DbDialectEnum.HIVE;
        // Run the operations
        handler01(username, dbDialectEnum);
        // Two more threads borrowing connections
        new Thread(() -> handler01(username, dbDialectEnum)).start();
        new Thread(() -> handler01(username, dbDialectEnum)).start();
        Thread.sleep(10 * 60 * 1000);
    }

    @Test
    public void testMaxComputeConn() throws InterruptedException {
        // Build the connection parameters
        final MaxComputeJdbcConnParam connParam = new MaxComputeJdbcConnParam();
        String accessId = "your Aliyun accessId";
        String accessKey = "your Aliyun accessKey";
        String endpoint = "http://service.cn-chengdu.maxcompute.aliyun.com/api";
        String projectName = "project name (acts as the database name)";
        // Set the parameters
        connParam.setDriverName(Driver.class.getName());
        connParam.setAliyunAccessId(accessId);
        connParam.setAliyunAccessKey(accessKey);
        connParam.setEndpoint(endpoint);
        connParam.setProjectName(projectName);
        // Create the pool
        connPool = new DbConnPool<>(connParam, 2);
        final DbDialectEnum dbDialectEnum = DbDialectEnum.MAX_COMPUTE;
        // Run the operations
        handler01(projectName, dbDialectEnum);
        // Two more threads borrowing connections
        new Thread(() -> handler01(projectName, dbDialectEnum)).start();
        new Thread(() -> handler01(projectName, dbDialectEnum)).start();
        Thread.sleep(60 * 1000);
    }

    private void handler01(String schemaName, DbDialectEnum dbDialectEnum) {
        final Connection connection = connPool.getConnection();
        // Build the SQL utility
        final SqlUtil sqlUtil = new SqlUtil(connection, dbDialectEnum.getCode());
        // Fetch tables and comments
        final List<TableMetaInfo> tables = sqlUtil.getTables(schemaName);
        log.info("===============Fetch all tables and comments: start===================");
        log.info(tables.toString());
        log.info("===============Fetch all tables and comments: end===================");
        // Fetch columns and comments of the first table
        final String tableName = tables.get(0).getTableName();
        final List<TableColumnMetaInfo> columns = sqlUtil.getColumnsByTableName(tableName);
        log.info("===============Fetch columns and comments of the first table: start===================");
        log.info(columns.toString());
        log.info("===============Fetch columns and comments of the first table: end===================");
        final PageResult<LinkedHashMap<String, Object>> pageResult = sqlUtil.pageQueryMap("select * from " + tableName, 1, 10);
        log.info("===============Paginated SQL query: start===================");
        log.info("Total: {}", pageResult.getTotal());
        log.info("Rows: {}", JSONObject.toJSONString(pageResult.getRows()));
        log.info("===============Paginated SQL query: end===================");
        connPool.freeConnection(connection);
    }

    @After
    public void close() {
        if (connPool != null) {
            connPool.close();
            log.info("==================Connection pool closed================");
        }
    }
}
```
Summary
The point of this wrapper is to make data-source management straightforward when integrating various third-party databases. (If you need the full source code, message me "封裝數據源源碼" to get it.)