package com.linapex.code;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

/**
 * Project: TempCode <br><br>
 *
 * Class: Main <br><br>
 *
 * Author: LinApex@163.com <br><br>
 *
 * Created: 2014-3-1 22:58:33 <br><br>
 *
 * Version: 1.0 <br><br>
 *
 * Description: reads {@link #csvFile} line by line, turns every row that has enough columns
 * into an INSERT statement built from {@link #sqlStrTemplate}, and appends the statements to
 * {@link #sqlFile} in batches of {@link #DATACACHENUM} through a fixed-size thread pool.
 */
public class Main {

    /** Size of the chunk read from the CSV file in one go. */
    final static int BSIZE = 1024 * 1024;

    /** Number of SQL statements cached before a batch is handed to a worker. */
    final static int DATACACHENUM = 10000;

    /** Number of writer tasks submitted and not yet waited for; only touched by the reading thread. */
    static int currThreadCount = 0;

    /** Pool size, and the number of outstanding tasks at which the reader waits for them. */
    static int maxThreadCount = 9;

    static File roomFilterLogFile = new File("roomFilter.log");

    static File sqlFile = new File("roomSql.sql");

    static File csvFile = new File("D:\\baiduyundownload\\如家汉庭等酒店2000W开房数据\\2000W\\1-200W.csv");

    final static String sqlStrTemplate = "INSERT INTO `t_room_record`(id ,name, card, gender, birthday, address, zip, mobile, email, version) VALUES (null,':0', ':1', ':2', ':3', ':4', ':5', ':6', ':7',':8');";

    /** Highest column index read from a row is 31, so a usable row needs at least 32 columns. */
    private static final int MIN_COLUMN_COUNT = 32;

    /**
     * Opens the SQL output file for appending, creating it first when it does not exist.
     *
     * @param sqlFile the file to append to
     * @return a UTF-8 writer positioned at the end of {@code sqlFile}
     * @throws Exception if the file cannot be created or opened
     */
    public static BufferedWriter initSQLWrite(File sqlFile) throws Exception {
        if (!sqlFile.exists()) {
            if (!sqlFile.createNewFile()) {
                System.err.println("创建文件失败,已存在:" + sqlFile.getAbsolutePath());
            }
        }
        return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(sqlFile, true), "UTF-8"));
    }

    /**
     * Reads {@link #csvFile} and invokes the callback once per line.
     *
     * <p>Lines are split on {@code '\n'} only, so a {@code '\r'} that precedes it stays part of
     * the line. Line numbers passed to the callback start at 1. The file is decoded with the
     * platform default charset through a {@link Reader}, which means only the characters that
     * were actually read are processed and multi-byte characters spanning two chunks are
     * decoded correctly. A non-empty final line without a terminating newline is delivered too.
     *
     * @param callBack receives the 1-based line number and the line text
     * @throws Exception if the file cannot be opened or read
     */
    public static void loadCSV(CallBack3<Void> callBack) throws Exception {
        final String lineSeparator = "\n";
        Reader reader = new InputStreamReader(new FileInputStream(csvFile), Charset.defaultCharset());
        try {
            char[] chunk = new char[BSIZE];
            // Holds the tail of the previous chunk that has not been terminated by a newline yet.
            StringBuilder pending = new StringBuilder();
            int num = 0;
            int charsRead;
            while ((charsRead = reader.read(chunk)) != -1) {
                // Append only what was read; the rest of the array is stale.
                pending.append(chunk, 0, charsRead);

                int lineStart = 0;
                int lineEnd;
                while ((lineEnd = pending.indexOf(lineSeparator, lineStart)) > -1) {
                    num++;
                    callBack.call(num, pending.substring(lineStart, lineEnd));
                    lineStart = lineEnd + 1;
                }
                pending.delete(0, lineStart);
            }
            // The last record may not be newline-terminated.
            if (pending.length() > 0) {
                num++;
                callBack.call(num, pending.toString());
            }
        } finally {
            reader.close();
        }
    }

    /**
     * Converts the CSV file into INSERT statements appended to the SQL file and prints the
     * elapsed time.
     *
     * @param args unused
     * @throws Exception if reading the CSV or writing the SQL file fails
     */
    public static void main(String[] args) throws Exception {
        final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreadCount); // worker pool
        final List<Future<String>> threadResultList = new ArrayList<Future<String>>(); // outstanding tasks
        final BufferedWriter bw = initSQLWrite(sqlFile);
        // Shared statement cache.
        final WriteSqlHandle2 writeSqlFile = new WriteSqlHandle2(DATACACHENUM);

        StopWatch2 stopWatch = new StopWatch2();
        stopWatch.start();

        try {
            loadCSV(new CallBack3<Void>() {
                @Override
                public Void call(int num, String str) {
                    String[] strs = str.split(",");
                    if (strs.length < MIN_COLUMN_COUNT) {
                        writeLog("此条数据不录入::0", Arrays.toString(strs));
                        return null;
                    }

                    String tempSql;
                    try {
                        tempSql = buildInsertSql(strs);
                    } catch (RuntimeException e) {
                        // Best effort: a row that cannot be converted is skipped, not fatal.
                        writeLog("录入错误的数据::0", Arrays.toString(strs));
                        writeLog("错误的原因::0", e.getMessage());
                        return null;
                    }

                    // add() reports true once the cache has reached its threshold.
                    if (writeSqlFile.add(tempSql)) {
                        // Too many tasks in flight: wait for them before submitting another.
                        if (threadResultList.size() >= maxThreadCount) {
                            awaitPendingTasks(threadResultList);
                        }
                        threadResultList.add(threadPool.submit(new TaskWithResult(writeSqlFile, bw)));
                        currThreadCount = threadResultList.size();
                    }
                    return null;
                }
            });

            // Every worker must be done before the remainder is written and the writer closed.
            awaitPendingTasks(threadResultList);
            currThreadCount = 0;
            writeSqlFile.flush(bw);
        } finally {
            threadPool.shutdown();
            try {
                // No-op after a successful flush(); releases the file handle on the failure path.
                bw.close();
            } catch (IOException ignored) {
                // A failure is already propagating (or flush() closed the writer); nothing to add.
            }
        }

        stopWatch.stop();
        System.out.println(String.format("任务完成时间:%s ms", stopWatch.getTime()));
    }

    /** Builds one INSERT statement from the columns of a CSV row. */
    private static String buildInsertSql(String[] columns) {
        String name = escapeSql(columns[0].trim());
        String card = escapeSql(columns[4]);
        String gender = escapeSql(columns[5]);
        String birthday = escapeSql(columns[6]);
        String address = escapeSql(columns[7]);
        String zip = escapeSql(columns[8]);
        String mobile = escapeSql(columns[20]);
        String email = escapeSql(columns[22]);
        String version = escapeSql(columns[31]);
        return tm(sqlStrTemplate, name, card, gender, birthday, address, zip, mobile, email, version);
    }

    /**
     * Escapes a value for use inside a single-quoted SQL string literal.
     * NOTE(review): backslashes are doubled on the assumption that the target is MySQL with
     * backslash escapes enabled (the template uses backtick-quoted identifiers) - confirm.
     */
    private static String escapeSql(String value) {
        return value.replace("\\", "\\\\").replace("'", "''");
    }

    /** Waits for every task in the list, then empties the list; a task failure is rethrown unchecked. */
    private static void awaitPendingTasks(List<Future<String>> pending) {
        try {
            for (Future<String> task : pending) {
                task.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for SQL writer tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("SQL writer task failed", e);
        } finally {
            pending.clear();
        }
    }

    /**
     * Writes a log entry. Currently a no-op: the original implementation depended on a
     * {@code FileUtils} helper that is not part of this file and was disabled by the author.
     *
     * @param str    message template in the format understood by {@link #tm(String, Object...)}
     * @param values template arguments
     */
    public static void writeLog(String str, Object... values) {
        // Disabled: FileUtils.doWriteFile(roomFilterLogFile.getAbsolutePath(), tm(str, values) + "\r\n", null, false);
    }

    /**
     * Fills a template whose placeholders have the form {@code :N}, where {@code N} is the
     * zero-based position of the argument in {@code values}.
     *
     * <p>The template is scanned once from left to right. Every occurrence of a placeholder
     * is replaced, and inserted values are never scanned again, so a value that happens to
     * contain {@code :N} is copied verbatim. The longest index that is written without a
     * leading zero and is smaller than {@code values.length} is used, so {@code :10} refers
     * to the eleventh argument when there is one and to {@code :1} followed by a literal
     * {@code 0} otherwise. Placeholders whose argument is {@code null} or missing are left
     * unchanged.
     *
     * @param strSource the template, may be {@code null}
     * @param values    the arguments
     * @return the filled template, or {@code null} when {@code strSource} is {@code null}
     */
    public static String tm(String strSource, Object... values) {
        if (strSource == null) {
            return null;
        }
        if (values == null || values.length == 0) {
            return strSource;
        }

        final int length = strSource.length();
        StringBuilder builder = new StringBuilder(length + 16 * values.length);
        int pos = 0;
        while (pos < length) {
            char current = strSource.charAt(pos);
            if (current != ':') {
                builder.append(current);
                pos++;
                continue;
            }

            // Parse the longest usable index following the colon.
            int index = -1;
            int digitsEnd = pos + 1;
            while (digitsEnd < length) {
                int digit = strSource.charAt(digitsEnd) - '0';
                if (digit < 0 || digit > 9) {
                    break;
                }
                if (index == 0) {
                    break; // indices are written without leading zeros
                }
                long candidate = index < 0 ? digit : index * 10L + digit;
                if (candidate >= values.length) {
                    break;
                }
                index = (int) candidate;
                digitsEnd++;
            }

            if (index >= 0 && values[index] != null) {
                builder.append(values[index]);
                pos = digitsEnd;
            } else {
                // Not a usable placeholder: keep the colon, the following characters are copied as text.
                builder.append(current);
                pos++;
            }
        }
        return builder.toString();
    }
}

/**
 * Project: TempCode <br><br>
 *
 * Class: TaskWithResult <br><br>
 *
 * Author: LinApex@163.com <br><br>
 *
 * Created: 2014-3-1 23:04:25 <br><br>
 *
 * Version: 1.0 <br><br>
 *
 * Description: task that asks the shared cache to save itself and returns the name of the
 * thread that ran it.
 */
class TaskWithResult implements Callable<String> {

    WriteSqlHandle2 handle2;

    BufferedWriter bufferedWriter;

    public TaskWithResult(WriteSqlHandle2 handle2, BufferedWriter bufferedWriter) {
        this.handle2 = handle2;
        this.bufferedWriter = bufferedWriter;
    }

    /**
     * @return the name of the executing thread
     * @throws Exception if writing the cached statements fails
     */
    @Override
    public String call() throws Exception {
        String threadName = Thread.currentThread().getName();
        handle2.save(bufferedWriter);
        return threadName;
    }
}

/**
 * Project: TempCode <br><br>
 *
 * Class: WriteSqlHandle2 <br><br>
 *
 * Author: LinApex@163.com <br><br>
 *
 * Created: 2014-3-1 23:05:37 <br><br>
 *
 * Version: 1.0 <br><br>
 *
 * Description: cache of SQL statements that is written out in batches.
 * {@link #add(String)}, {@link #save(BufferedWriter)} and {@link #flush(BufferedWriter)}
 * all run under the same write lock.
 */
class WriteSqlHandle2 {

    ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    WriteLock writeLock = readWriteLock.writeLock();

    List<String> cacheList;

    int currItemCount = 0;

    int dataCacheNum;

    /** Creates a cache with a threshold of 0, for which {@link #isCacheExpires()} is always true. */
    public WriteSqlHandle2() {
        cacheList = new ArrayList<String>();
    }

    /**
     * @param dataCacheNum number of cached statements at which the cache counts as full
     */
    public WriteSqlHandle2(int dataCacheNum) {
        this.dataCacheNum = dataCacheNum;
        cacheList = new ArrayList<String>(dataCacheNum);
    }

    /** @return {@code true} when the number of cached statements has reached the threshold */
    public boolean isCacheExpires() {
        return currItemCount >= dataCacheNum;
    }

    /**
     * Adds a statement to the cache.
     *
     * @param sqlStr the statement
     * @return {@code true} when the cache has reached its threshold after the addition
     */
    public boolean add(String sqlStr) {
        // Acquire outside the try block so a failed lock() is never followed by unlock().
        writeLock.lock();
        try {
            cacheList.add(sqlStr);
            currItemCount++;
            return isCacheExpires();
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Writes the cached statements, each followed by CRLF, and empties the cache. Does
     * nothing when the cache has not reached its threshold.
     *
     * @param bw the writer to append to
     * @throws Exception if writing fails
     */
    public void save(BufferedWriter bw) throws Exception {
        writeLock.lock();
        try {
            // Another task may already have drained the cache.
            if (!isCacheExpires()) {
                return;
            }

            StopWatch2 stopWatch = new StopWatch2();
            stopWatch.start();
            writeCached(bw);
            stopWatch.stop();

            System.out.println(String.format("%s,消费完成,耗费时间:%s ms,消费数据长度:%s", Thread.currentThread().getName(), stopWatch.getTime(), cacheList.size()));

            cacheList.clear();
            currItemCount = 0;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Writes whatever is left in the cache regardless of the threshold, empties the cache
     * and closes the writer.
     *
     * @param bw the writer to append to; closed on return
     * @throws Exception if writing or closing fails
     */
    public void flush(BufferedWriter bw) throws Exception {
        writeLock.lock();
        try {
            System.out.println(String.format("flush线程:%s, 需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size()));

            writeCached(bw);

            System.out.println(String.format("flush线程:%s, 消费完成,消费数据长度:%s", Thread.currentThread().getName(), cacheList.size()));

            cacheList.clear();
            // Keep the counter in step with the list.
            currItemCount = 0;

            closeWrite(bw);
        } finally {
            writeLock.unlock();
        }
    }

    /** Writes every cached statement followed by CRLF; the caller holds the write lock. */
    private void writeCached(BufferedWriter bw) throws IOException {
        for (String statement : cacheList) {
            bw.write(statement);
            bw.write("\r\n");
        }
    }

    private void closeWrite(BufferedWriter bw) throws Exception {
        bw.flush();
        bw.close();
    }
}

/**
 * Project: TempCode <br><br>
 *
 * Class: StopWatch2 <br><br>
 *
 * Author: LinApex@163.com <br><br>
 *
 * Created: 2014-3-1 23:06:01 <br><br>
 *
 * Version: 1.0 <br><br>
 *
 * Description: minimal stopwatch, similar to the Apache one.
 */
class StopWatch2 {

    long begin;

    long end;

    /** Records the start time. */
    public void start() {
        begin = System.currentTimeMillis();
    }

    /** Records the stop time. */
    public void stop() {
        end = System.currentTimeMillis();
    }

    /** @return milliseconds between the recorded start and stop times */
    public long getTime() {
        return end - begin;
    }
}

/**
 * Project: TempCode <br><br>
 *
 * Class: CallBack3 <br><br>
 *
 * Author: LinApex@163.com <br><br>
 *
 * Created: 2014-3-1 23:06:19 <br><br>
 *
 * Version: 1.0 <br><br>
 *
 * Description: callback interface, in the spirit of Hibernate's HibernateCallback&lt;T&gt;.
 */
interface CallBack3<T> {

    /**
     * @param num 1-based line number
     * @param str line content
     * @return implementation-defined result
     */
    T call(int num, String str);
}

lyn520 LV3
2023年2月21日
mylzdy LV12
2022年5月21日
2469095052 LV8
2021年12月30日
newhaijun LV15
2021年1月4日
Coincidance LV8
2020年12月3日
gao123qq LV21
2020年8月28日
wkc LV21
2020年7月26日
耳朵的巧克力 LV1
2020年5月19日
chalet LV1
2020年4月14日
yinyun1985 LV14
2020年4月8日