1. 程式人生 > >【daily】文件分割限速下載,及合並分割文件

【daily】文件分割限速下載,及合並分割文件

但我 redist lar for @override files fse exception 調度

說明

  主要功能:
    1) 分割文件, 生成下載任務;
    2) 定時任務: 檢索需要下載的任務, 利用多線程下載限制下載速度;
    3) 定時任務: 檢索可合並的文件, 把n個文件合並為完整的文件.
  GitHub: https://github.com/vergilyn/SpringBootDemo
  代碼結構:
    技術分享圖片
  

一、獲取遠程資源ContentLength、FileName

  本來以為很容易, 但如果想較好的得到contentLength、fileName其實很麻煩,主要要看download-url是怎麽樣的. 大致有3種:
  1) download-url: www.xxx.com/xxxx.exe,這種是最簡單的.直接通過HttpURLConnection.getContentLength()就可以獲取到, FileName則直接解析download-url(或從Content-Disposition中解析得到fileName).

  2) download-url: www.xxx.com/download.html?fileId=xxx, 這個實際響應的和1)一樣, 只是無法直接解析download-url得到fileName, 只能從Content-Disposition中解析得到fileName.
  3) download-url跟2)類似, 但會"重定向"或"響應"一個真實下載地址, 那麽就需要具體分析.
  

二、分割下載文件

  原意: 把一個大文件分割成n個小文件, 分別下載這n個小文件. 盡可能減少需要重新下載的大小. 其實就是想要"斷點下載"(或稱"斷點續傳");

  但是, 後面想了下這種"分塊"感覺好蠢.更理想的實現思路可能是:
  直接往完整文件file.exe.tmp寫,每次啟動下載的時候讀取這個file.exe.tmp的size,請求下載的Range就是bytes={size}-{contentLength}.
  代碼說明: 生成n個下載任務, 保存每個下載任務的Range: bytes={beginOffse}-{endOffset}

 private void createSplitFile(CompleteFileBean fileBean){
        String key = ConstantUtils.keyBlock
(fileBean.getId()); String fileId = fileBean.getId(); String fileName = fileBean.getFileName(); String url = fileBean.getDownloadUrl(); long contentLength = fileBean.getContentLength(); BlockFileBean block; List<String> blocks = new ArrayList<>(); if(contentLength <= ConstantUtils.UNIT_SIZE){ block = new BlockFileBean(fileId, getBlockName(fileName, 1), url, 0, contentLength ); blocks.add(JSON.toJSONString(block)); }else{ long begin = 0; int index = 1; while(begin < contentLength){ long end = begin + ConstantUtils.UNIT_SIZE <= contentLength ? begin + ConstantUtils.UNIT_SIZE : contentLength; block = new BlockFileBean(fileId, getBlockName(fileName, index++), url, begin, end ); blocks.add(JSON.toJSONString(block)); begin += ConstantUtils.UNIT_SIZE; } } if(blocks.size() > 0){ // 模擬保存數據庫: 生成每個小塊的下載任務, 待定時器讀取任務下載 redisTemplate.opsForList().rightPushAll(key, blocks); // 保存需要執行下載的任務, 實際應用中是通過sql得到. redisTemplate.opsForList().rightPushAll(ConstantUtils.keyDownloadTask(), key); } }

三、多線程下載

  線程池、線程的知識請自行baidu/google;(我也不是很了解啊 >.<!)
  實際中我只特別去了解了下:ArrayBlockingQueueCallerRunsPolicy, 根據我的理解(不一定對): 只有CallerRunsPolicy比較適用, 但當ArrayBlockingQueue等待隊列達到滿值時並且有新任務A-TASK進來時,CallerRunsPolicy會強制中斷當前主線程去執行這個新任務A-TASK, 見:https://www.cnblogs.com/lic309/p/4564507.html.
  這是否意味著我可能有"某塊"下到一半被強制中斷了?雖然這下載任務並未被標記成已下載完, 但如果有大量這種中斷操作, 意味著會重新去下載這部分數據.(這也反映出另外中"斷點下載"思路可能更好)
  所以, 實際中我把任務等待隊列設置成一定比總任務數大. 因為實際中我每天只執行一次下載定時任務, 每次只下載700個小塊(即700條下載任務), 所以ArrayBlockingQueue我設置的800. 並且我沒有保留核心線程

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                6,
                30,
                TimeUnit.MINUTES,
                new ArrayBlockingQueue<Runnable>(100),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        executor.allowCoreThreadTimeOut(true);

  分塊下載, 只需用到Http請求的Range: bytes={beginOffse}-{endOffset}.
  至於哪種"下載"寫法更好, 並未有太多的深究, 所以不知道具體那種"下載"的寫法會更好, 但看到很多都是RandomAccessFile實現的:

  @Override
    public void run() {
        byte[] buffer = new byte[1024]; // 緩沖區大小
        long totalSize = block.getEndOffset() - block.getBeginOffset();
        long begin = System.currentTimeMillis();
        InputStream is = null;
        RandomAccessFile os = null;
        try {
            URLConnection conn = new URL(block.getDownloadUrl()).openConnection();
            // -1: 因為bytes=0-499, 表示contentLength=500.
            conn.setRequestProperty(HttpHeaders.RANGE, "bytes=" + block.getBeginOffset() + "-" + (block.getEndOffset() - 1));
            conn.setDoOutput(true);

            is = conn.getInputStream();

            File file = new File(tempPath + File.separator + block.getBlockFileName());
            os = new RandomAccessFile(file, "rw");

            int len;
            while((len = is.read(buffer)) != -1) {
                os.write(buffer, 0, len);
            }

            os.close();

        } catch (IOException e) {
            e.printStackTrace();
            System.out.println(block.getBlockFileName() + " download error: " + e.getMessage());
            return; // 註意要return
        } finally {
            IOUtils.closeQuietly(is);
            IOUtils.closeQuietly(os);
        }
        long end = System.currentTimeMillis() ;
        // 簡單計算下載速度, 我把連接時間也算在內了
        double speed = totalSize / 1024D / (end - begin + 1) * 1000D; // +1: 避免0
        System.out.println(block.getBlockFileName() + " aver-speed: " + speed + " kb/s");

        // FIXME: 實際中需要更新表BlockFileBean的信息, 標記分塊已下載完成, 記錄平均下載速度、下載完成時間等需要的信息
        // (省略)更新表BlockFileBean
    }

四、限制下載速度

  看了下網上說的如何現在下載速度, 思路:
  假設下載速度上限是m(kb/s), 發送n個字節的理論耗時: n / 1024 / m (kb/s); 然而實際耗時 t(s), 那麽則線程需要休眠 n / 1024 / m - t;  
  我也只是看到都是用這種方式來限速, 但我怎麽覺得"很蠢", (個人理解)這種實現其實實際下載速度還是滿速, 而且會頻繁的存在線程的調度.

public class SpeedLimit {
    private final Long speed;
    // 已下載大小
    private Long writeSize = 0L;
    private long beginTime;
    private long endTime;


    public SpeedLimit(Long speed, long beginTime) {
        this.speed = speed;
        this.beginTime = beginTime;
        this.endTime = beginTime;
    }

    public void updateWrite(int size){
        this.writeSize += size;
    }

    public void updateEndTime(long endTime) {
        this.endTime = endTime;
    }

    public Long getTotalSize() {
        return totalSize;
    }

    public Long getSpeed() {
        return speed;
    }

    public Long getWriteSize() {
        return writeSize;
    }

    public long getBeginTime() {
        return beginTime;
    }

    public long getEndTime() {
        return endTime;
    }
}
    @Override
    public void run() {
        byte[] buffer = new byte[1024]; // 緩沖區大小
        long totalSize = block.getEndOffset() - block.getBeginOffset();
        long begin = System.currentTimeMillis();
        InputStream is = null;
        RandomAccessFile os = null;
        try {
            // FIXME: 對下載(對文件操作)並沒有太多了解, 所以不知道具體那種"下載"的寫法會更好, 但看到很多都是RandomAccessFile實現的.
            URLConnection conn = new URL(block.getDownloadUrl()).openConnection();
            // -1: 因為bytes=0-499, 表示contentLength=500.
            conn.setRequestProperty(HttpHeaders.RANGE, "bytes=" + block.getBeginOffset() + "-" + (block.getEndOffset() - 1));
            conn.setDoOutput(true);

            is = conn.getInputStream();

            File file = new File(tempPath + File.separator + block.getBlockFileName());
            os = new RandomAccessFile(file, "rw");

            int len;
            // 是否限制下載速度
            if(ConstantUtils.IS_LIMIT_SPEED){ // 限制下載速度

                /* 思路:
                 *  假設下載速度上限是m(kb/s), 發送n個字節的理論耗時: n / 1024 / m; 然而實際耗時 t(s), 那麽則需要休眠 n / 1024 / m - t;
                 */
                // 需要註意: System.currentTimeMillis(), 可能多次得到的時間相同, 詳見其API說明.
                SpeedLimit sl = new SpeedLimit(ConstantUtils.DOWNLOAD_SPEED, System.currentTimeMillis());

                while((len = is.read(buffer)) != -1) {
                    os.write(buffer, 0, len);

                    sl.updateWrite(len);
                    sl.updateEndTime(System.currentTimeMillis());

                    long timeConsuming = sl.getEndTime() - sl.getBeginTime() + 1; // +1: 避免0

                    // 當前平均下載速度: kb/s, 實際中可以直接把 b/ms 約等於 kb/ms (減少單位轉換邏輯)
                    double currSpeed = sl.getWriteSize() / 1024D / timeConsuming * 1000D;
                    if(currSpeed > sl.getSpeed()){ // 當前下載速度超過限制速度
                        // 休眠時長 = 理論限速時常 - 實耗時常;
                        double sleep = sl.getWriteSize() / 1024D / sl.getSpeed() * 1000D - timeConsuming;
                        if(sleep > 0){
                            try {
                                Thread.sleep((long) sleep);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }

                }
            }else{
                while((len = is.read(buffer)) != -1) {
                    os.write(buffer, 0, len);
                }
            }

            os.close();

        } catch (IOException e) {
            e.printStackTrace();
            System.out.println(block.getBlockFileName() + " download error: " + e.getMessage());
            return; // 註意要return
        } finally {
            IOUtils.closeQuietly(is);
            IOUtils.closeQuietly(os);
        }
        long end = System.currentTimeMillis() ;
        // 簡單計算下載速度, 我把連接時間也算在內了
        double speed = totalSize / 1024D / (end - begin + 1) * 1000D; // +1: 避免0
        System.out.println(block.getBlockFileName() + " aver-speed: " + speed + " kb/s");

        // FIXME: 實際中需要更新表BlockFileBean的信息, 標記分塊已下載完成, 記錄平均下載速度、下載完成時間等需要的信息
        // (省略)更新表BlockFileBean
    }

五、合並文件

  需要註意:
  1) 合並文件的順序;
  2) stream一定要關閉;
  3) 不要把一個大文件讀取到內存中.
  我亂七八糟寫了(或看到)以下4種寫法,並沒去深究哪種更理想.可能比較推薦的RandomAccessFile或者channelTransfer的形式.
  (以下代碼中的stream並不一定都關閉了, 可以檢查一遍)

public class FileMergeUtil {

    /**
     * 利用FileChannel.write()合並文件
     *
     * @param dest 最終文件保存完整路徑
     * @param files 註意排序
     * @param capacity {@link ByteBuffer#allocate(int)}
     * @see <a href="http://blog.csdn.net/skiof007/article/details/51072885">http://blog.csdn.net/skiof007/article/details/51072885<a/>
     * @see <a href="http://blog.csdn.net/seebetpro/article/details/49184305">ByteBuffer.allocate()與ByteBuffer.allocateDirect()方法的區別<a/>
     */
    public static void channelWrite(String dest, File[] files, int capacity) {
        capacity = capacity <= 0 ? 1024 : capacity;
        FileChannel outChannel = null;
        FileChannel inChannel = null;
        FileOutputStream os = null;
        FileInputStream is = null;
        try {
            os = new FileOutputStream(dest);
            outChannel = os.getChannel();
            for (File file : files) {
                is = new FileInputStream(file);
                inChannel = is.getChannel();
                ByteBuffer bb = ByteBuffer.allocate(capacity);
                while (inChannel.read(bb) != -1) {
                    bb.flip();
                    outChannel.write(bb);
                    bb.clear();
                }
                inChannel.close();
                is.close();
            }
        } catch (IOException ioe) {
            ioe.printStackTrace();
        } finally {
            try {
                if (outChannel != null) {
                    outChannel.close();
                }
                if (inChannel != null) {
                    inChannel.close();
                }
                if (os != null) {
                    os.close();
                }
                if (is != null) {
                    is.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 利用FileChannel.transferFrom()合並文件
     * @param dest 最終文件保存完整路徑
     * @param files 註意排序
     * @see <a href="http://blog.csdn.net/tobacco5648/article/details/52958046">http://blog.csdn.net/tobacco5648/article/details/52958046</a>
     */
    public static void channelTransfer(String dest, File[] files) {
        FileChannel outChannel = null;
        FileChannel inChannel = null;
        FileOutputStream os = null;
        FileInputStream is = null;
        try {
            os = new FileOutputStream(dest);
            outChannel = os.getChannel();
            for (File file : files) {
                is = new FileInputStream(file);
                inChannel = is.getChannel();
                outChannel.transferFrom(inChannel, outChannel.size(), inChannel.size());

                inChannel.close();
                is.close();
            }
        } catch (IOException ioe) {
            ioe.printStackTrace();
        } finally {
            try {
                if (outChannel != null) {
                    outChannel.close();
                }
                if (inChannel != null) {
                    inChannel.close();
                }
                if (os != null) {
                    os.close();
                }
                if (is != null) {
                    is.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

        }
    }

    /**
     * 利用apache common-IO, {@link IOUtils#copyLarge(Reader, Writer, char[])}.
     * <p>看實現代碼, 不就是普通write()? 沒發現又什麽特別的優化, 所以感覺此方式性能/效率可能並不好.</p>
     * @param dest
     * @param files
     * @param buffer
     */
    public static void apache(String dest, File[] files, int buffer){
        OutputStream os = null;
        try {
            byte[] buf = new byte[buffer];
            os = new FileOutputStream(dest);
            for (File file : files) {
                InputStream is = new FileInputStream(file);
                IOUtils.copyLarge(is, os, buf);
                is.close();
            }
        } catch (IOException ioe) {
            ioe.printStackTrace();
        } finally {
            if (os != null) {
                try {
                    os.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 利用randomAccessFile合並文件.
     * <pre>雖然用了RandomAccessFile, 但還是普通的write(), 未了解其性能....<pre/>
     * @param dest
     * @param files
     * @param buffer
     */
    public static void randomAccessFile(String dest, List<File> files, int buffer){
        RandomAccessFile in = null;
        try {
            in = new RandomAccessFile(dest, "rw");
            in.setLength(0);
            in.seek(0);

            byte[] bytes = new byte[buffer];

            int len = -1;
            for (File file : files) {
                RandomAccessFile out = new RandomAccessFile(file, "r");
                while((len = out.read(bytes)) != -1) {
                    in.write(bytes, 0, len);
                }
                out.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if(in != null){
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

【daily】文件分割限速下載,及合並分割文件