package com.linapex.code; import java.io.BufferedWriter; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.OutputStreamWriter; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; /** * 项目名称:TempCode <br><br> * * 类名称:Main <br><br> * * 创建人:LinApex@163.com <br><br> * * 创建时间:2014-3-1 下午10:58:33 <br><br> * * 版本:1.0 <br><br> * * 功能描述: */ public class Main { final static int BSIZE = 1024 * 1024; final static int DATACACHENUM = 10000; static int currThreadCount = 0; static int maxThreadCount = 9; static File roomFilterLogFile = new File("roomFilter.log"); static File sqlFile = new File("roomSql.sql"); static File csvFile = new File("D:\\baiduyundownload\\如家汉庭等酒店2000W开房数据\\2000W\\1-200W.csv"); final static String sqlStrTemplate = "INSERT INTO `t_room_record`(id ,name, card, gender, birthday, address, zip, mobile, email, version) VALUES (null,':0', ':1', ':2', ':3', ':4', ':5', ':6', ':7',':8');"; /** * * 功能描述:初始化SQL写入 * <br><br> * @param sqlFile * @return * @throws Exception BufferedWriter * <br><br> * 版本:1.0 <br><br> * 创建人:LinApex@163.com <br><br> * 创建时间:2014-3-1 下午11:01:02 */ public static BufferedWriter initSQLWrite(File sqlFile) throws Exception { if (!sqlFile.exists()) { if (!sqlFile.createNewFile()) { System.err.println("创建文件失败,已存在:" + sqlFile.getAbsolutePath()); } } return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(sqlFile, true), "UTF-8")); } /** * * 功能描述:加载CSV * <br><br> * @param callBack * @throws Exception void * <br><br> * 版本:1.0 <br><br> * 创建人:LinApex@163.com <br><br> * 创建时间:2014-3-1 下午11:01:14 */ public static void loadCSV(CallBack3<Void> callBack) throws Exception { FileChannel inChannel = null; try { String enterStr = "\n"; inChannel = new FileInputStream(csvFile).getChannel(); ByteBuffer buffer = ByteBuffer.allocate(BSIZE); StringBuilder newlinesBui = new StringBuilder(); int num = 0; while (inChannel.read(buffer) != -1) { buffer.flip(); //数据组合. String content = new String(buffer.array()); newlinesBui.append(content).toString(); int fromIndex = 0; int endIndex = -1; //循环找到 \n while ((endIndex = newlinesBui.indexOf(enterStr, fromIndex)) > -1) { //得到一行 String line = newlinesBui.substring(fromIndex, endIndex); num++; callBack.call(num, line); fromIndex = endIndex + 1; } newlinesBui.delete(0, fromIndex); buffer.clear(); } } finally { if (inChannel != null) { inChannel.close(); } } } public static void main(String[] args) throws Exception { final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreadCount); //线程池 final List<Future<String>> threadResultList = new ArrayList<Future<String>>(); //线程返回集合 final BufferedWriter bw = initSQLWrite(sqlFile); //主要的buffer对象. final WriteSqlHandle2 writeSqlFile = new WriteSqlHandle2(DATACACHENUM); StopWatch2 stopWatch = new StopWatch2(); //计时器 stopWatch.start(); loadCSV(new CallBack3<Void>() { @Override public Void call(int num, String str) { String[] strs = str.split(","); if (strs.length < 8) { writeLog("此条数据不录入::0", Arrays.toString(strs)); return null; } try { String name = strs[0].trim(); String card = strs[4]; String gender = strs[5]; String birthday = strs[6]; String address = strs[7]; String zip = strs[8]; String mobile = strs[20]; String email = strs[22]; String version = strs[31]; //生成sql语句 final String tempSql = tm(sqlStrTemplate, name, card, gender, birthday, address, zip, mobile, email, version); //添加数据,如果超出了缓存数据,则 开始写入文件系统 if (writeSqlFile.add(tempSql)) { currThreadCount++; //如果提交的线程过多,则取回之后再提交. if (currThreadCount >= maxThreadCount) { // System.out.println(String.format("当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount)); for (Future<String> fs : threadResultList) { String tempSqlName = fs.get(); currThreadCount--; // System.out.println("已回调线程数:" + (maxThreadCount - currThreadCount) + " 线程返回的值:" + tempSqlName); } threadResultList.clear(); //清空 currThreadCount = threadResultList.size(); // System.out.println(String.format("重新开始提交线程 当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount)); } Future<String> future = threadPool.submit(new TaskWithResult(writeSqlFile, bw)); threadResultList.add(future); // System.out.println(String.format("开启了%s条线程(保存了%s条数据)", curr_thread_count, num)); } } catch (Exception e) { writeLog("录入错误的数据::0", Arrays.toString(strs)); writeLog("错误的原因::0", e.getMessage()); } return null; } }); writeSqlFile.flush(bw); //刷新缓存数据 threadPool.shutdown(); //关闭线程池 stopWatch.stop(); //停止计时 System.out.println(String.format("任务完成时间:%s ms", stopWatch.getTime())); } /** * * 功能描述:写日志 * <br><br> * @param str * @param values void * <br><br> * 版本:1.0 <br><br> * 创建人:LinApex@163.com <br><br> * 创建时间:2014-3-1 下午11:04:00 */ public static void writeLog(String str, Object... values) { //FileUtils.doWriteFile(roomFilterLogFile.getAbsolutePath(), tm(str, values) + "\r\n", null, false); } /** * * 功能描述:非常高效率的模版方法 * <br><br> * @param strSource * @param values * @return String * <br><br> * 版本:1.0 <br><br> * 创建人:LinApex@163.com <br><br> * 创建时间:2014-3-1 下午11:04:09 */ public static String tm(String strSource, Object... values) { if (strSource == null) { return null; } StringBuilder builder = new StringBuilder(strSource); final String prefix = ":"; for (int index = 0; index < values.length; index++) { Object value = values[index]; if (value == null) { continue; } String key = new StringBuilder(prefix).append(index).toString(); int i = -1; if ((i = builder.indexOf(key, i)) > -1) { int len = key.length(); builder.replace(i, i + len, value.toString()); } } return builder.toString(); } } /** * * 项目名称:TempCode <br><br> * * 类名称:TaskWithResult <br><br> * * 创建人:LinApex@163.com <br><br> * * 创建时间:2014-3-1 下午11:04:25 <br><br> * * 版本:1.0 <br><br> * * 功能描述:任务与结果 */ class TaskWithResult implements Callable<String> { WriteSqlHandle2 handle2; BufferedWriter bufferedWriter; public TaskWithResult(WriteSqlHandle2 handle2, BufferedWriter bufferedWriter) { this.handle2 = handle2; this.bufferedWriter = bufferedWriter; } @Override public String call() throws Exception { String fileName = Thread.currentThread().getName(); handle2.save(bufferedWriter); return fileName; } } /** * * 项目名称:TempCode <br><br> * * 类名称:WriteSqlHandle2 <br><br> * * 创建人:LinApex@163.com <br><br> * * 创建时间:2014-3-1 下午11:05:37 <br><br> * * 版本:1.0 <br><br> * * 功能描述:写SQL处理器 */ class WriteSqlHandle2 { ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); WriteLock writeLock = readWriteLock.writeLock(); List<String> cacheList; int currItemCount = 0; int dataCacheNum; public WriteSqlHandle2() { cacheList = new ArrayList<String>(); } public WriteSqlHandle2(int dataCacheNum) { this.dataCacheNum = dataCacheNum; cacheList = new ArrayList<String>(dataCacheNum); } public boolean isCacheExpires() { return currItemCount >= dataCacheNum; } public boolean add(String sqlStr) { try { writeLock.lock(); cacheList.add(sqlStr); currItemCount++; return isCacheExpires(); } finally { writeLock.unlock(); } } public void save(BufferedWriter bw) throws Exception { try { writeLock.lock(); //如果数据没有超出缓存.则返回. if (!isCacheExpires()) { return; } StopWatch2 stopWatch = new StopWatch2(); stopWatch.start(); // System.out.println(String.format("%s,准备消费 需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size())); for (String str : cacheList) { bw.write(str + "\r\n"); currItemCount--; } stopWatch.stop(); System.out.println(String.format("%s,消费完成,耗费时间:%s ms,消费数据长度:%s", Thread.currentThread().getName(), stopWatch.getTime(), cacheList.size())); cacheList.clear(); //清空数据. } finally { writeLock.unlock(); } } public void flush(BufferedWriter bw) throws Exception { System.out.println(String.format("flush线程:%s, 需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size())); for (String str : cacheList) { bw.write(str + "\r\n"); } System.out.println(String.format("flush线程:%s, 消费完成,消费数据长度:%s", Thread.currentThread().getName(), cacheList.size())); cacheList.clear(); //清空数据 closeWrite(bw); } private void closeWrite(BufferedWriter bw) throws Exception { bw.flush(); bw.close(); } } /** * * 项目名称:TempCode <br><br> * * 类名称:StopWatch2 <br><br> * * 创建人:LinApex@163.com <br><br> * * 创建时间:2014-3-1 下午11:06:01 <br><br> * * 版本:1.0 <br><br> * * 功能描述:类似Apache的计时器类 */ class StopWatch2 { long begin; long end; public void start() { begin = System.currentTimeMillis(); } public void stop() { end = System.currentTimeMillis(); } public long getTime() { return end - begin; } } /** * * 项目名称:TempCode <br><br> * * 类名称:CallBack3 <br><br> * * 创建人:LinApex@163.com <br><br> * * 创建时间:2014-3-1 下午11:06:19 <br><br> * * 版本:1.0 <br><br> * * 功能描述:回调接口,类似Hibernate中的 HibernateCallback<T> 这个类一样. */ interface CallBack3<T> { T call(int num, String str); }
最近下载更多
lyn520 LV3
2023年2月21日
mylzdy LV12
2022年5月21日
2469095052 LV8
2021年12月30日
newhaijun LV15
2021年1月4日
Coincidance LV8
2020年12月3日
gao123qq LV21
2020年8月28日
wkc LV21
2020年7月26日
耳朵的巧克力 LV1
2020年5月19日
chalet LV1
2020年4月14日
yinyun1985 LV14
2020年4月8日