国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

jdbc操作 數據庫做同步,全量+增量,線程控制,批處理

2019-11-08 20:59:32
字體:
來源:轉載
供稿:網友

此次更新在前邊文章基礎上做了優化 http://blog.csdn.net/gooooa/article/details/54615455


頁面展示: 這里寫圖片描述

表現層代碼:

package com.zntz.web.admin.controller.jdbc;import java.sql.Connection;import java.sql.DriverManager;import java.sql.SQLException;import java.util.Date;import java.util.List;import java.util.Map;import org.sPRingframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestMethod;import org.springframework.web.bind.annotation.ResponseBody;import com.zntz.web.admin.model.base.ResultModel;import com.zntz.web.admin.model.vo.Operate.SyncDataVo;import com.zntz.web.admin.service.operate.SyncDataService;import com.zntz.web.admin.shiro.LoginUserUtils;import com.zntz.web.admin.utils.common.page.Pagination;import com.zntz.web.admin.utils.tuple.BeanTransMap;@Controller@RequestMapping("/MySQL/syn")public class JdbcController { @Autowired SyncDataService syncDataService; /** * 進入列表頁面 * @return */ @RequestMapping(value = "/list", method = RequestMethod.GET) public String list() { return "mysql/syn/syn_list"; } @ResponseBody @RequestMapping(value = "/list", method = RequestMethod.POST) public Pagination<Map<String, Object>> list(Map<String, Object> modelMap, SyncDataVo vo) { Pagination<Map<String, Object>> pager = new Pagination<Map<String, Object>>(); pager.setStartIndex(vo.getStart()); pager.setPageSize(vo.getLength()); Map<String, Object> map = BeanTransMap.transBean2Map(vo); pager.setCondition(map); pager = syncDataService.getSyncDataVoListByConditions(pager); return pager; } /**新建 * @param rolevo * @param errors * @return */ @RequestMapping(value = "/add", method = RequestMethod.GET) public String create(Map<String, Object> modelMap) { return "mysql/syn/syn_create"; } /**保存 * @param rolevo * @return */ @ResponseBody @RequestMapping(value = "/save", method = RequestMethod.POST) public ResultModel save(SyncDataVo syncDataVo) { ResultModel result = new ResultModel(); try{ if (syncDataVo.getId() == null) { syncDataVo.setIsAll(true); syncDataVo.setIncDate(new Date()); syncDataVo.setIsTimer(false); syncDataVo.setTimerContent(null); syncDataVo.setCreateTime(new Date()); syncDataVo.setModifyTime(new Date()); syncDataVo.setCreateUser(LoginUserUtils.getLoginUser().getUserCode()); syncDataVo.setModifyUser(LoginUserUtils.getLoginUser().getUserCode()); syncDataService.addSyncDataVo(syncDataVo); } else { syncDataVo.setModifyTime(new Date()); syncDataVo.setModifyUser(LoginUserUtils.getLoginUser().getUserCode()); syncDataService.updateSyncDataVo(syncDataVo); } }catch(Exception e){ result.setCode("1"); } return result; } /** 修改 * @param id * @param modelMap * @return */ @RequestMapping(value = "/edit/{id}", method = RequestMethod.GET) public String edit(@PathVariable Long id, Map<String, Object> modelMap) { SyncDataVo vo = syncDataService.getSyncDataVoById(id); modelMap.put("syn", vo); return "mysql/syn/syn_edit"; } /**刪除 * @param allroleids * @return */ @ResponseBody @RequestMapping(value = "/delItem", method = RequestMethod.POST) public ResultModel delItem(Long id) { ResultModel result = new ResultModel(); Integer delresult = syncDataService.deleteSyncDataVo(id); if (delresult == null || delresult < 1) result.setSuccess(false); return result; } @ResponseBody @RequestMapping("/syncData") public ResultModel syncData(Long id){ ResultModel model=new ResultModel(); SyncDataVo syncData = syncDataService.getSyncDataVoById(id); try { String dbsource=syncData.getDbSource(); String dbdestination=syncData.getDbDestination(); String source="jdbc:mysql://localhost:3306/"+dbsource+"?user=root&passWord=123456&useUnicode=true&characterEncoding=UTF8"; String destination="jdbc:mysql://localhost:3306/"+dbdestination+"?user=root&password=123456&useUnicode=true&characterEncoding=UTF8"; SynchronizationController.init(source, destination,syncData.getIsAll(),syncData.getIncDate()); } catch (Exception e) { model.setMessage("系統異常"); model.setData(e); model.setCode("1"); } return model; } @ResponseBody @RequestMapping("/testConnection") public ResultModel testConnection(String url){ ResultModel model=new ResultModel(); String url_source=url; try { Connection con=DriverManager.getConnection(url_source); con.close(); } catch (SQLException e) { model.setCode("1"); model.setMessage("數據庫連接異常,請確認連接是否正確"); } return model; }}

數據庫同步的業務處理:

package com.zntz.web.admin.controller.jdbc;import java.sql.Connection;import java.sql.DatabaseMetaData;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.ResultSetMetaData;import java.sql.SQLException;import java.sql.Statement;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Date;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.Map;import java.util.Set;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.zntz.web.admin.model.constant.Constant;public class SynchronizationController{ //數據源庫// static String url_source="jdbc:mysql://localhost:3306/zntz?user=root&password=123456&useUnicode=true&characterEncoding=UTF8"; //目標庫// static String url_destination="jdbc:mysql://localhost:3306/xx02?user=root&password=123456&useUnicode=true&characterEncoding=UTF8"; //同步源// static String url_source="";//"jdbc:mysql://localhost:3306/zntz?user=root&password=123456&useUnicode=true&characterEncoding=UTF8"; //目標庫// static String url_destination="";//"jdbc:mysql://localhost:3306/oil?user=root&password=123456&useUnicode=true&characterEncoding=UTF8"; private static Logger logger=LoggerFactory.getLogger(SynchronizationController.class); private static Connection conn_source = null; private static Connection conn_destination = null; private static String url_source = ""; private static String url_destination = ""; static{ try { Class.forName("com.mysql.jdbc.Driver"); } catch (ClassNotFoundException e) { e.printStackTrace(); } } /** * * @param url_source 源數據庫連接 * @param url_destination 目標數據庫連接 * @param is_all 是否全量更新 * @throws InterruptedException * @throws SQLException */ public static void init(String source,String destination,boolean is_all,Date date) throws InterruptedException, SQLException{ url_source= source; url_destination=destination; try { conn_source = DriverManager.getConnection(url_source); conn_destination= DriverManager.getConnection(url_destination); synchronizationTables(); addData(is_all,date); logger.info("*******本次導入結束********"); } catch (SQLException e) { e.printStackTrace(); } finally { while(true){ if(InsertThread.getThreadCounts()>0){ Thread.sleep(1000); }else{ break; } } conn_source.close(); conn_destination.close(); } } //本地獲取表名獲取表名 public static Set<String> getTableName() { Set<String> set = new HashSet<String>(); try { DatabaseMetaData meta = conn_source.getMetaData(); ResultSet rs = meta.getTables(null, null, null,new String[] { "TABLE" }); while (rs.next()) { set.add(rs.getString("TABLE_NAME"));// String s = rs.getString("TABLE_NAME");// String type = rs.getString("TABLE_TYPE");// System.out.println(s+"======"+type);// getTableDDL(rs.getString("TABLE_NAME"), con); } } catch (Exception e) { logger.error(""+e); } return set; } //目標數據庫 public static Map<String,String> getTableNameToMap() { Map<String,String> map=new HashMap<String,String>(); try { DatabaseMetaData meta = conn_destination.getMetaData(); ResultSet rs = meta.getTables(null, null, null,new String[] { "TABLE" }); while (rs.next()) { map.put(rs.getString("TABLE_NAME"),"1"); } } catch (Exception e) { logger.error(""+e); } return map; } //創建表 public static void createTable(String sql_ddl) throws SQLException { Statement stmt = conn_destination.createStatement(); int result = stmt.executeUpdate(sql_ddl);// executeUpdate語句會返回一個受影響的行數,如果返回-1就沒有成功 if (result != -1) { logger.info("表創建成功"); }else{ logger.error("表創建失敗:"+sql_ddl); } } //創建sql public static String getTableField(String tableName) throws SQLException{ String sql = "select * from "+tableName +" limit 1,2"; Statement state = conn_source.createStatement(); ResultSet rs = state.executeQuery(sql); ResultSetMetaData rsd = rs.getMetaData() ; StringBuffer sql_model=new StringBuffer("insert into "+ tableName +" ("); StringBuffer sql_param=new StringBuffer(" VALUES("); for(int i = 1; i <= rsd.getColumnCount(); i++) { sql_model.append(rsd.getColumnName(i)); sql_param.append("?"); if (i < rsd.getColumnCount()) { sql_model.append(","); sql_param.append(","); } } sql_model.append(") ");sql_param.append(") "); logger.info(sql_model.toString()+sql_param.toString()); return sql_model.toString()+sql_param.toString(); } //創建增量同步的sql public static String getTableField2(String tableName,Date date) throws SQLException, Exception{ SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String dateString = formatter.format(date); String sql = "select * from "+tableName +" where modify_time > '"+dateString+"'"; System.out.println("----------------------------"+sql); Statement state = conn_source.createStatement(); ResultSet rs = state.executeQuery(sql); ResultSetMetaData rsd = rs.getMetaData() ; StringBuffer sql_model=new StringBuffer("insert into "+ tableName +" ("); StringBuffer sql_param=new StringBuffer(" VALUES("); for(int i = 1; i <= rsd.getColumnCount(); i++) { sql_model.append(rsd.getColumnName(i)); sql_param.append("?"); if (i < rsd.getColumnCount()) { sql_model.append(","); sql_param.append(","); } } sql_model.append(") ");sql_param.append(") "); logger.info(sql_model.toString()+sql_param.toString()); return sql_model.toString()+sql_param.toString(); } //獲取表結構ddl public static String getTableDDL(String tableName) throws SQLException{ ResultSet rs = null; PreparedStatement ps = null; ps = conn_source.prepareStatement("show create table "+tableName); rs = ps.executeQuery(); StringBuffer ddl=new StringBuffer(); while (rs.next()) { ddl.append(rs.getString(rs.getMetaData().getColumnName(2))); } return ddl.toString(); } /** * 檢查本地庫所有表在B庫里是否存在,是否一致 * A本地庫 B目標庫 */ public static void synchronizationTables(){ Set<String> a_set=getTableName(); Map<String,String> b_map=getTableNameToMap(); Iterator<String> it=a_set.iterator(); while(it.hasNext()){ String n=it.next(); if(b_map.get(n)==null){ logger.info("表名:"+n+" 不在目標庫里"); String create_table_ddl=""; try { create_table_ddl = getTableDDL(n); createTable(create_table_ddl); } catch (SQLException e) { logger.error(create_table_ddl+"--------"+e); } }else { clearTableData(n); } } } //清除表數據 public static boolean clearTableData(String tableName){ try { Statement stmt = conn_destination.createStatement(); String sql = "TRUNCATE TABLE "+tableName; stmt.executeUpdate(sql); logger.info(tableName+":表數據已被清空"); } catch (SQLException e) { logger.error("異常表:"+tableName+"----數據清空失敗"); logger.error(""+e); return false; } return true; } /** * * @param conn_source * @param conn_destination * @param is_all 是否全量 * @throws SQLException * @throws InterruptedException */ public static void addData(boolean is_all,Date date) throws SQLException{ SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String dateString = formatter.format(date); Statement stmt_source = conn_source.createStatement(); Set<String> tableNameSet=getTableName(); Iterator<String> it = tableNameSet.iterator(); //遍歷表 while (it.hasNext()) { long start = System.currentTimeMillis(); String str = it.next(); if(is_all){ try{ while(true){ if(InsertThread.getThreadCounts()>0){ Thread.sleep(1000); }else{ break; } } String sql_insert=getTableField(str); //獲取總條數 分頁查詢 long count_start=System.currentTimeMillis(); logger.info("-------------------------------------------------------------------"); String sql_count="select count(*) from "+ str; ResultSet rs = stmt_source.executeQuery(sql_count); rs.next(); int totalCount=rs.getInt(1); long count_end=System.currentTimeMillis(); logger.info("查詢記錄數耗時:"+(count_end-count_start)/1000); if(totalCount>500000) logger.info("xxxxxxxxxxxxxxx start implement table:"+str+",共"+totalCount+"條 xxxxxxxxxxxxxxxxxxxxxxxxxxx"); if(totalCount> Constant.pageSize){ int max=totalCount%Constant.pageSize==0 ? totalCount/Constant.pageSize : totalCount/Constant.pageSize+1; for(int i=0;i<max;i++){ synchronized (InsertThread.class) { String sql_data="select * from "+str+" limit "+ i*Constant.pageSize + " , "+Constant.pageSize; System.out.println("==================="+sql_data); int tCount = InsertThread.getThreadCounts(); while (tCount >= Constant.max_thread_size) { logger.info("系統當前線程數為:" + tCount+ ",已達到最大線程數 "+Constant.max_thread_size+",請等待其他線程執行完畢并釋放系統資源"); InsertThread.class.wait(); tCount = InsertThread.getThreadCounts(); } // 重新啟動一個子線程 Thread td = new InsertThread(sql_data, sql_insert, url_destination, url_source); td.start(); logger.info("已創建新的子線程: " + td.getName()); } } }else{ String sql_data="select * from "+str; System.out.println("=================="+sql_data); Thread td = new InsertThread(sql_data, sql_insert, url_destination, url_source); td.start(); logger.info("已創建新的子線程: " + td.getName()); } long end = System.currentTimeMillis(); logger.warn("表"+str+" ===================<<<全量>>>數據導入完成,耗時:"+(end-start)/1000+"秒,"+(end-start)/60000+"分鐘 ========================="); }catch(Exception e){ logger.error(e+""); } }else { try{ while(true){ if(InsertThread.getThreadCounts()>0){ Thread.sleep(1000); }else{ break; } } String sql_insert=getTableField(str); //獲取總條數 分頁查詢 long count_start=System.currentTimeMillis(); logger.info("-------------------------------------------------------------------"); String sql_count="select count(*) from "+ str; ResultSet rs = stmt_source.executeQuery(sql_count); rs.next(); int totalCount=rs.getInt(1); long count_end=System.currentTimeMillis(); logger.info("查詢記錄數耗時:"+(count_end-count_start)/1000); if(totalCount>500000) logger.info("xxxxxxxxxxxxxxxxx start implement table:"+str+",共"+totalCount+"條 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"); if(totalCount> Constant.pageSize){ int max=totalCount%Constant.pageSize==0 ? totalCount/Constant.pageSize : totalCount/Constant.pageSize+1; for(int i=0;i<max;i++){ synchronized (InsertThread.class) { String sql_data="select * from "+str+" where modify_time > '"+dateString+"' "+" limit "+ i*Constant.pageSize + " , "+Constant.pageSize; System.out.println("==================="+sql_data); int tCount = InsertThread.getThreadCounts(); while (tCount >= Constant.max_thread_size) { logger.info("系統當前線程數為:" + tCount+ ",已達到最大線程數 "+Constant.max_thread_size+",請等待其他線程執行完畢并釋放系統資源"); InsertThread.class.wait(); tCount = InsertThread.getThreadCounts(); } // 重新啟動一個子線程 Thread td = new InsertThread(sql_data, sql_insert, url_destination, url_source); td.start(); logger.info("已創建新的子線程: " + td.getName()); } } }else{ String sql_data="select * from "+str+" where modify_time > '"+dateString+"'"; System.out.println("=================="+sql_data); Thread td = new InsertThread(sql_data, sql_insert, url_destination, url_source); td.start(); logger.info("已創建新的子線程: " + td.getName()); } long end = System.currentTimeMillis(); logger.warn("表"+str+" ===================<<<增量>>>數據導入完成,耗時:"+(end-start)/1000+"秒,"+(end-start)/60000+"分鐘 ========================="); }catch(Exception e){ logger.error(e+""); } } } }}

線程處理:

package com.zntz.web.admin.controller.jdbc;import java.sql.BatchUpdateException;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.ResultSetMetaData;import java.sql.SQLException;import java.sql.Statement;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.zntz.web.admin.model.constant.Constant;public class InsertThread extends Thread { private Logger logger=LoggerFactory.getLogger(InsertThread.class); private String sql_data; private String sql_insert; private String url_destination; private String url_source; private Connection conA; private Connection conB; // 線程計數器 static private int threadCounts; // 線程名稱池 static private String threadNames[]; static { // 假設這里允許系統同時運行最大線程數為10個 threadNames = new String[Constant.max_thread_size]; // 初始化線程名稱池 for (int i = 1; i <= Constant.max_thread_size; i++) { threadNames[i - 1] = "子線程_" + i; } } public InsertThread() { // 臨界資源鎖定 synchronized (InsertThread.class) { // 線程總數加1 threadCounts++; // 從線程名稱池中取出一個未使用的線程名 for (int i = 0; i < threadNames.length; i++) { if (threadNames[i] != null) { String temp = threadNames[i]; // 名被占用后清空 threadNames[i] = null; // 初始化線程名稱 this.setName(temp); break; } } } } public void run() { try { conA=DriverManager.getConnection(url_source); conB= DriverManager.getConnection(url_destination); conB.setAutoCommit(false); Long start = System.currentTimeMillis(); Statement stmt_source = conA.createStatement(); ResultSet rs_sql_data = stmt_source.executeQuery(sql_data); ResultSetMetaData rsmd = rs_sql_data.getMetaData(); PreparedStatement ps = conB.prepareStatement(sql_insert); int columnCount=rsmd.getColumnCount(); int count=1; while (rs_sql_data.next()) { count++; for(int k=1;k<=columnCount;k++){ ps.setString(k, rs_sql_data.getString(k)); } ps.addBatch(); if(count % Constant.batchSize == 0) { myBatchUpdate(ps); } } myBatchUpdate(ps); Long end = System.currentTimeMillis(); logger.info(this.getName()+",耗時:"+(end-start)/1000 + "秒"); stmt_source.close(); rs_sql_data.close(); ps.close(); } catch (Exception e) { logger.error(""+e); } finally { synchronized (InsertThread.class) { // 釋放線程名稱 String[] threadName = this.getName().split("_"); // 線程名使用完后放入名稱池 threadNames[Integer.parseInt(threadName[1]) - 1] = this.getName(); // 線程運行完畢后減1 threadCounts--; /* * 通知其他被阻塞的線程,但如果其他線程要執行,則該同步塊一定要運行結束(即直 * 到釋放占的鎖),其他線程才有機會執行,所以這里的只是喚醒在此對象監視器上等待 * 的所有線程,讓他們從等待池中進入對象鎖池隊列中,而這些線程重新運行時它們一定 * 要先要得該鎖后才可能執行,這里的notifyAll是不會釋放鎖的,試著把下面的睡眠語 * 句注釋去掉,即使你已調用了notify方法,發現CreateThread中的同步塊還是好 * 像一直處于對象等待狀態,其實調用notify方法后,CreateThread線程進入了對象鎖 * 池隊列中了,只要它一獲取到鎖,CreateThread所在線程就會真真的被喚醒并運行。 */ InsertThread.class.notifyAll(); logger.info("----" + this.getName() + " 所占用資源釋放完畢,當前系統正在運行的子線程數:"+ threadCounts); try { conA.close(); conB.close(); } catch (SQLException e) { logger.error("關閉連接異常"); } } } } static public int getThreadCounts() { synchronized (InsertThread.class) { return threadCounts; } } public InsertThread(String sql_data, String sql_insert, String url_destination, String url_source) { super(); this.sql_data = sql_data; this.sql_insert = sql_insert; this.url_destination = url_destination; this.url_source = url_source; // 臨界資源鎖定 synchronized (InsertThread.class) { // 線程總數加1 threadCounts++; // 從線程名稱池中取出一個未使用的線程名 for (int i = 0; i < threadNames.length; i++) { if (threadNames[i] != null) { String temp = threadNames[i]; // 名被占用后清空 threadNames[i] = null; // 初始化線程名稱 this.setName(temp); break; } } } } public void myBatchUpdate(PreparedStatement ps){ try { ps.executeBatch(); conB.commit(); }catch (BatchUpdateException e) { for (int i = 0; i < e.getUpdateCounts().length; i++) { if(e.getUpdateCounts()[i]<0){ logger.error(sql_insert+"*********"+e.getUpdateCounts()[i] + "*********" +e.getMessage()+ "*********"+e.getErrorCode()+ "*********"+e.getSQLState()); } } } catch (SQLException e) { logger.error(""+e); } }}
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 肇东市| 苗栗县| 延庆县| 集贤县| 民勤县| 济南市| 黄骅市| 鄂托克旗| 石城县| 靖州| 武清区| 临漳县| 会泽县| 绥芬河市| 广德县| 松潘县| 吉首市| 杂多县| 建瓯市| 南岸区| 武威市| 青川县| 锡林浩特市| 竹北市| 安塞县| 夏河县| 昌吉市| 株洲县| 泗水县| 黄浦区| 云南省| 外汇| 天长市| 谢通门县| 淮阳县| 高台县| 汉中市| 河源市| 青浦区| 望谟县| 观塘区|