【daily】文件分割限速下載,及合並分割文件
說明
主要功能:
1) 分割文件, 生成下載任務;
2) 定時任務: 檢索需要下載的任務, 利用多線程下載
並限制下載速度
;
3) 定時任務: 檢索可合並的文件, 把n個文件合並為完整的文件.
GitHub: https://github.com/vergilyn/SpringBootDemo
代碼結構:
一、獲取遠程資源ContentLength、FileName
本來以為很容易, 但如果想較好的得到contentLength、fileName其實很麻煩,主要要看download-url是怎麽樣的. 大致有3種:
1) download-url: www.xxx.com/xxxx.exe,這種是最簡單的.直接通過HttpURLConnection.getContentLength()就可以獲取到, FileName則直接解析download-url(或從Content-Disposition中解析得到fileName).
1)
一樣, 只是無法直接解析download-url得到fileName, 只能從Content-Disposition中解析得到fileName.3) download-url跟
2)
類似, 但會"重定向"或"響應"一個真實下載地址, 那麽就需要具體分析.二、分割下載文件
原意: 把一個大文件分割成n個小文件, 分別下載這n個小文件. 盡可能減少需要重新下載的大小. 其實就是想要"斷點下載"(或稱"斷點續傳");
直接往完整文件file.exe.tmp寫,每次啟動下載的時候讀取這個file.exe.tmp的size,請求下載的Range就是bytes={size}-{contentLength}.
代碼說明: 生成n個下載任務, 保存每個下載任務的
Range: bytes={beginOffse}-{endOffset}
private void createSplitFile(CompleteFileBean fileBean){
String key = ConstantUtils.keyBlock (fileBean.getId());
String fileId = fileBean.getId();
String fileName = fileBean.getFileName();
String url = fileBean.getDownloadUrl();
long contentLength = fileBean.getContentLength();
BlockFileBean block;
List<String> blocks = new ArrayList<>();
if(contentLength <= ConstantUtils.UNIT_SIZE){
block = new BlockFileBean(fileId, getBlockName(fileName, 1), url, 0, contentLength );
blocks.add(JSON.toJSONString(block));
}else{
long begin = 0;
int index = 1;
while(begin < contentLength){
long end = begin + ConstantUtils.UNIT_SIZE <= contentLength ? begin + ConstantUtils.UNIT_SIZE : contentLength;
block = new BlockFileBean(fileId, getBlockName(fileName, index++), url, begin, end );
blocks.add(JSON.toJSONString(block));
begin += ConstantUtils.UNIT_SIZE;
}
}
if(blocks.size() > 0){
// 模擬保存數據庫: 生成每個小塊的下載任務, 待定時器讀取任務下載
redisTemplate.opsForList().rightPushAll(key, blocks);
// 保存需要執行下載的任務, 實際應用中是通過sql得到.
redisTemplate.opsForList().rightPushAll(ConstantUtils.keyDownloadTask(), key);
}
}
三、多線程下載
線程池、線程的知識請自行baidu/google;(我也不是很了解啊 >.<!)
實際中我只特別去了解了下:ArrayBlockingQueue
、CallerRunsPolicy
, 根據我的理解(不一定對): 只有CallerRunsPolicy
比較適用, 但當ArrayBlockingQueue
等待隊列達到滿值時並且有新任務A-TASK
進來時,CallerRunsPolicy
會強制中斷當前主線程去執行這個新任務A-TASK
, 見:https://www.cnblogs.com/lic309/p/4564507.html.
這是否意味著我可能有"某塊"下到一半被強制中斷了?雖然這下載任務並未被標記成已下載完
, 但如果有大量這種中斷操作, 意味著會重新去下載這部分數據.(這也反映出另外中"斷點下載"思路可能更好)
所以, 實際中我把任務等待隊列設置成一定比總任務數大. 因為實際中我每天只執行一次下載定時任務, 每次只下載700個小塊(即700條下載任務), 所以ArrayBlockingQueue我設置的800. 並且我沒有保留核心線程
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
6,
30,
TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(100),
new ThreadPoolExecutor.CallerRunsPolicy()
);
executor.allowCoreThreadTimeOut(true);
分塊下載, 只需用到Http請求的Range: bytes={beginOffse}-{endOffset}
.
至於哪種"下載"寫法更好, 並未有太多的深究, 所以不知道具體那種"下載"的寫法會更好, 但看到很多都是RandomAccessFile實現的:
@Override
public void run() {
byte[] buffer = new byte[1024]; // 緩沖區大小
long totalSize = block.getEndOffset() - block.getBeginOffset();
long begin = System.currentTimeMillis();
InputStream is = null;
RandomAccessFile os = null;
try {
URLConnection conn = new URL(block.getDownloadUrl()).openConnection();
// -1: 因為bytes=0-499, 表示contentLength=500.
conn.setRequestProperty(HttpHeaders.RANGE, "bytes=" + block.getBeginOffset() + "-" + (block.getEndOffset() - 1));
conn.setDoOutput(true);
is = conn.getInputStream();
File file = new File(tempPath + File.separator + block.getBlockFileName());
os = new RandomAccessFile(file, "rw");
int len;
while((len = is.read(buffer)) != -1) {
os.write(buffer, 0, len);
}
os.close();
} catch (IOException e) {
e.printStackTrace();
System.out.println(block.getBlockFileName() + " download error: " + e.getMessage());
return; // 註意要return
} finally {
IOUtils.closeQuietly(is);
IOUtils.closeQuietly(os);
}
long end = System.currentTimeMillis() ;
// 簡單計算下載速度, 我把連接時間也算在內了
double speed = totalSize / 1024D / (end - begin + 1) * 1000D; // +1: 避免0
System.out.println(block.getBlockFileName() + " aver-speed: " + speed + " kb/s");
// FIXME: 實際中需要更新表BlockFileBean的信息, 標記分塊已下載完成, 記錄平均下載速度、下載完成時間等需要的信息
// (省略)更新表BlockFileBean
}
四、限制下載速度
看了下網上說的如何現在下載速度, 思路:
假設下載速度上限是m(kb/s), 發送n個字節的理論耗時: n / 1024 / m (kb/s); 然而實際耗時 t(s), 那麽則線程需要休眠 n / 1024 / m - t;
我也只是看到都是用這種方式來限速, 但我怎麽覺得"很蠢", (個人理解)這種實現其實實際下載速度還是滿速
, 而且會頻繁的存在線程的調度.
public class SpeedLimit {
private final Long speed;
// 已下載大小
private Long writeSize = 0L;
private long beginTime;
private long endTime;
public SpeedLimit(Long speed, long beginTime) {
this.speed = speed;
this.beginTime = beginTime;
this.endTime = beginTime;
}
public void updateWrite(int size){
this.writeSize += size;
}
public void updateEndTime(long endTime) {
this.endTime = endTime;
}
public Long getTotalSize() {
return totalSize;
}
public Long getSpeed() {
return speed;
}
public Long getWriteSize() {
return writeSize;
}
public long getBeginTime() {
return beginTime;
}
public long getEndTime() {
return endTime;
}
}
@Override
public void run() {
byte[] buffer = new byte[1024]; // 緩沖區大小
long totalSize = block.getEndOffset() - block.getBeginOffset();
long begin = System.currentTimeMillis();
InputStream is = null;
RandomAccessFile os = null;
try {
// FIXME: 對下載(對文件操作)並沒有太多了解, 所以不知道具體那種"下載"的寫法會更好, 但看到很多都是RandomAccessFile實現的.
URLConnection conn = new URL(block.getDownloadUrl()).openConnection();
// -1: 因為bytes=0-499, 表示contentLength=500.
conn.setRequestProperty(HttpHeaders.RANGE, "bytes=" + block.getBeginOffset() + "-" + (block.getEndOffset() - 1));
conn.setDoOutput(true);
is = conn.getInputStream();
File file = new File(tempPath + File.separator + block.getBlockFileName());
os = new RandomAccessFile(file, "rw");
int len;
// 是否限制下載速度
if(ConstantUtils.IS_LIMIT_SPEED){ // 限制下載速度
/* 思路:
* 假設下載速度上限是m(kb/s), 發送n個字節的理論耗時: n / 1024 / m; 然而實際耗時 t(s), 那麽則需要休眠 n / 1024 / m - t;
*/
// 需要註意: System.currentTimeMillis(), 可能多次得到的時間相同, 詳見其API說明.
SpeedLimit sl = new SpeedLimit(ConstantUtils.DOWNLOAD_SPEED, System.currentTimeMillis());
while((len = is.read(buffer)) != -1) {
os.write(buffer, 0, len);
sl.updateWrite(len);
sl.updateEndTime(System.currentTimeMillis());
long timeConsuming = sl.getEndTime() - sl.getBeginTime() + 1; // +1: 避免0
// 當前平均下載速度: kb/s, 實際中可以直接把 b/ms 約等於 kb/ms (減少單位轉換邏輯)
double currSpeed = sl.getWriteSize() / 1024D / timeConsuming * 1000D;
if(currSpeed > sl.getSpeed()){ // 當前下載速度超過限制速度
// 休眠時長 = 理論限速時常 - 實耗時常;
double sleep = sl.getWriteSize() / 1024D / sl.getSpeed() * 1000D - timeConsuming;
if(sleep > 0){
try {
Thread.sleep((long) sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}else{
while((len = is.read(buffer)) != -1) {
os.write(buffer, 0, len);
}
}
os.close();
} catch (IOException e) {
e.printStackTrace();
System.out.println(block.getBlockFileName() + " download error: " + e.getMessage());
return; // 註意要return
} finally {
IOUtils.closeQuietly(is);
IOUtils.closeQuietly(os);
}
long end = System.currentTimeMillis() ;
// 簡單計算下載速度, 我把連接時間也算在內了
double speed = totalSize / 1024D / (end - begin + 1) * 1000D; // +1: 避免0
System.out.println(block.getBlockFileName() + " aver-speed: " + speed + " kb/s");
// FIXME: 實際中需要更新表BlockFileBean的信息, 標記分塊已下載完成, 記錄平均下載速度、下載完成時間等需要的信息
// (省略)更新表BlockFileBean
}
五、合並文件
需要註意:
1) 合並文件的順序;
2) stream一定要關閉;
3) 不要把一個大文件讀取到內存中.
我亂七八糟寫了(或看到)以下4種寫法,並沒去深究哪種更理想.可能比較推薦的RandomAccessFile或者channelTransfer的形式.
(以下代碼中的stream並不一定都關閉了, 可以檢查一遍)
public class FileMergeUtil {
/**
* 利用FileChannel.write()合並文件
*
* @param dest 最終文件保存完整路徑
* @param files 註意排序
* @param capacity {@link ByteBuffer#allocate(int)}
* @see <a href="http://blog.csdn.net/skiof007/article/details/51072885">http://blog.csdn.net/skiof007/article/details/51072885<a/>
* @see <a href="http://blog.csdn.net/seebetpro/article/details/49184305">ByteBuffer.allocate()與ByteBuffer.allocateDirect()方法的區別<a/>
*/
public static void channelWrite(String dest, File[] files, int capacity) {
capacity = capacity <= 0 ? 1024 : capacity;
FileChannel outChannel = null;
FileChannel inChannel = null;
FileOutputStream os = null;
FileInputStream is = null;
try {
os = new FileOutputStream(dest);
outChannel = os.getChannel();
for (File file : files) {
is = new FileInputStream(file);
inChannel = is.getChannel();
ByteBuffer bb = ByteBuffer.allocate(capacity);
while (inChannel.read(bb) != -1) {
bb.flip();
outChannel.write(bb);
bb.clear();
}
inChannel.close();
is.close();
}
} catch (IOException ioe) {
ioe.printStackTrace();
} finally {
try {
if (outChannel != null) {
outChannel.close();
}
if (inChannel != null) {
inChannel.close();
}
if (os != null) {
os.close();
}
if (is != null) {
is.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 利用FileChannel.transferFrom()合並文件
* @param dest 最終文件保存完整路徑
* @param files 註意排序
* @see <a href="http://blog.csdn.net/tobacco5648/article/details/52958046">http://blog.csdn.net/tobacco5648/article/details/52958046</a>
*/
public static void channelTransfer(String dest, File[] files) {
FileChannel outChannel = null;
FileChannel inChannel = null;
FileOutputStream os = null;
FileInputStream is = null;
try {
os = new FileOutputStream(dest);
outChannel = os.getChannel();
for (File file : files) {
is = new FileInputStream(file);
inChannel = is.getChannel();
outChannel.transferFrom(inChannel, outChannel.size(), inChannel.size());
inChannel.close();
is.close();
}
} catch (IOException ioe) {
ioe.printStackTrace();
} finally {
try {
if (outChannel != null) {
outChannel.close();
}
if (inChannel != null) {
inChannel.close();
}
if (os != null) {
os.close();
}
if (is != null) {
is.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 利用apache common-IO, {@link IOUtils#copyLarge(Reader, Writer, char[])}.
* <p>看實現代碼, 不就是普通write()? 沒發現又什麽特別的優化, 所以感覺此方式性能/效率可能並不好.</p>
* @param dest
* @param files
* @param buffer
*/
public static void apache(String dest, File[] files, int buffer){
OutputStream os = null;
try {
byte[] buf = new byte[buffer];
os = new FileOutputStream(dest);
for (File file : files) {
InputStream is = new FileInputStream(file);
IOUtils.copyLarge(is, os, buf);
is.close();
}
} catch (IOException ioe) {
ioe.printStackTrace();
} finally {
if (os != null) {
try {
os.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 利用randomAccessFile合並文件.
* <pre>雖然用了RandomAccessFile, 但還是普通的write(), 未了解其性能....<pre/>
* @param dest
* @param files
* @param buffer
*/
public static void randomAccessFile(String dest, List<File> files, int buffer){
RandomAccessFile in = null;
try {
in = new RandomAccessFile(dest, "rw");
in.setLength(0);
in.seek(0);
byte[] bytes = new byte[buffer];
int len = -1;
for (File file : files) {
RandomAccessFile out = new RandomAccessFile(file, "r");
while((len = out.read(bytes)) != -1) {
in.write(bytes, 0, len);
}
out.close();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if(in != null){
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
【daily】文件分割限速下載,及合並分割文件