
Flume high-concurrency optimization (11): eliminating JSON conversion failures and garbled Chinese text

When using Flume to collect data and convert it to JSON, special characters cause frequent trouble. JSON is extremely sensitive to the double-quote character, so take special care whenever you build JSON by hand. Not long ago, an insert into Elasticsearch failed with exactly this kind of JSON conversion error.

Git repository: https://github.com/xvshu/flume-files-source

Cause:

JSON takes the general forms:

    {"key":"value"}

    {"key":{}}

    {"key":[]}

    ["one","two"]

    [{}]

and so on, where the characters { } [ ] " : all belong to JSON's structural syntax.

In {"key":"value"}, an unescaped quote inside the value, as in {"key":"val"u"e"}, causes a parse error.
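To see the failure concretely, here is a minimal sketch that feeds the broken string to a JSON parser. It assumes Jackson (com.fasterxml.jackson.databind) is available, purely for illustration; the project itself uses a different JSON library:

import com.fasterxml.jackson.databind.ObjectMapper;

public class BrokenJsonDemo {
  public static void main(String[] args) {
    ObjectMapper mapper = new ObjectMapper();
    try {
      // The unescaped quote ends the string value early, leaving the dangling
      // token u, so readTree throws a JsonProcessingException.
      mapper.readTree("{\"key\":\"val\"u\"e\"}");
    } catch (Exception e) {
      System.out.println("parse failed: " + e.getMessage());
    }
  }
}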

Solution:

Handle every value field separately: replace each " with ' via a character substitution, and the problem disappears.
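A minimal sketch of the substitution, using nothing beyond the JDK:

String raw = "val\"u\"e";                    // value extracted from a log line
String safe = raw.replaceAll("\"", "'");     // becomes: val'u'e
String json = "{\"key\":\"" + safe + "\"}";  // {"key":"val'u'e"} now parses cleanly

The standard alternative would be to escape each quote as \" instead of replacing it, which preserves the original text; the substitution used here trades a little fidelity for simplicity.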

Summary:

Sometimes patience lets us untangle a problem thread by thread, but it is draining, and in the internet era there are tools that validate these rules for us. Here is a JSON validation website that can help you track down malformed JSON:

http://json.cn/

Key code:

Step 1: after RandomAccessFile reads a line, the text has effectively been decoded as "8859_1" (ISO-8859-1) and must be converted back to the original encoding.

Step 2: replace " with ' to keep JSON values well-formed.

if (line != null) {
  line = new String(line.getBytes(ExecTailSourceConfigurationConstants.CHARSET_RANDOMACCESSFILE), charset);
  line = line.replaceAll("\"", "\'");
}
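For context, here is a self-contained sketch of what the two steps do end to end. The file name demo.log and the UTF-8 charset are assumptions for illustration; the key point is that RandomAccessFile.readLine() decodes every raw byte as ISO-8859-1, so re-encoding with ISO-8859-1 recovers the original bytes before decoding them with the file's real charset:

import java.io.RandomAccessFile;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class ReadLineCharsetDemo {
  public static void main(String[] args) throws Exception {
    Charset charset = Charset.forName("UTF-8"); // the log file's actual encoding (assumed)
    RandomAccessFile raf = new RandomAccessFile("demo.log", "r"); // hypothetical log file
    String line = raf.readLine(); // bytes were decoded one-to-one as ISO-8859-1
    if (line != null) {
      // Step 1: undo the ISO-8859-1 decoding to recover the original bytes.
      line = new String(line.getBytes(StandardCharsets.ISO_8859_1), charset);
      // Step 2: neutralize embedded double quotes before the line goes into a JSON value.
      line = line.replaceAll("\"", "'");
      System.out.println(line);
    }
    raf.close();
  }
}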

flume-source source code:

/*
 * Author: Xu Shu
 * Date: May 3, 2016
 * Purpose: tail every file under a directory whose name matches a regular expression
 * Email: [email protected]
 * To detect all files in a folder
 */

package org.apache.flume.source;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.SystemClock;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.utils.MsgBuildeJson;
import org.mortbay.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 *  step:
 *    1, config one path
 *    2, find all files matching the RegExp
 *    3, tail each child file
 *    4, batch to channel
 *
 *  demo:
 *    demo.sources.s1.type = org.apache.flume.source.ExecTailSource
 *    demo.sources.s1.filepath=/export/home/tomcat/logs/auth.el.net/
 *    demo.sources.s1.filenameRegExp=(.log{1})$
 *    demo.sources.s1.tailing=true
 *    demo.sources.s1.readinterval=300
 *    demo.sources.s1.startAtBeginning=false
 *    demo.sources.s1.restart=true
 */
public class ExecTailSource extends AbstractSource implements EventDrivenSource,
        Configurable {

  private static final Logger logger = LoggerFactory
      .getLogger(ExecTailSource.class);

  private SourceCounter sourceCounter;
  private ExecutorService executor;
  private List<ExecRunnable> listRuners;
  private List<Future<?>> listFuture;
  private long restartThrottle;
  private boolean restart = true;
  private boolean logStderr;
  private Integer bufferCount;
  private long batchTimeout;
  private Charset charset;
  private String filepath;
  private String filenameRegExp;
  private boolean tailing;
  private Integer readinterval;
  private boolean startAtBeginning;
  private boolean contextIsJson;
  private String fileWriteJson;
  private Long flushTime;
  private boolean contextIsFlumeLog;
  private String domain;
  private String msgTypeConfig;

  @Override
  public void start() {
    logger.info("=start=> flume tail source start begin time:"+new Date().toString());
    logger.info("ExecTail source starting with filepath:{}", filepath);

    List<String> listFiles = getFileList(filepath);
    Preconditions.checkState(listFiles != null && !listFiles.isEmpty(),
            "No files under filepath match filenameRegExp");

    Properties prop = null;

    try {
      prop = new Properties(); // holds the persisted file pointers (checkpoints)
      FileInputStream fis = new FileInputStream(fileWriteJson); // checkpoint file
      prop.load(fis);
      fis.close(); // close the stream once the properties are loaded
    } catch (Exception ex) {
      logger.error("==>", ex);
    }

    executor = Executors.newFixedThreadPool(listFiles.size());

    listRuners = new ArrayList<ExecRunnable>();
    listFuture = new ArrayList<Future<?>>();

    logger.info("files size is {} ", listFiles.size());
    // FIXME: Use a callback-like executor / future to signal us upon failure.
    for(String oneFilePath : listFiles){
      ExecRunnable runner = new ExecRunnable(getChannelProcessor(), sourceCounter,
              restart, restartThrottle, logStderr, bufferCount, batchTimeout,
              charset,oneFilePath,tailing,readinterval,startAtBeginning,contextIsJson,
              prop,fileWriteJson,flushTime,contextIsFlumeLog,domain);
      listRuners.add(runner);
      Future<?> runnerFuture = executor.submit(runner);
      listFuture.add(runnerFuture);
      logger.info("{} is begin running",oneFilePath);
    }

    /*
     * NB: This comes at the end rather than the beginning of the method because
     * it sets our state to running. We want to make sure the executor is alive
     * and well first.
     */
    sourceCounter.start();
    super.start();
    logger.info("=start=> flume tail source start end time:"+new Date().toString());
    logger.debug("ExecTail source started");
  }

  @Override
  public void stop() {

    logger.info("=stop=> flume tail source stop begin time:"+new Date().toString());
    if(listRuners !=null && !listRuners.isEmpty()){
      for(ExecRunnable oneRunner : listRuners){
        if(oneRunner != null) {
          oneRunner.setRestart(false);
          oneRunner.kill();
        }
      }
    }


    if(listFuture !=null && !listFuture.isEmpty()){
      for(Future<?> oneFuture : listFuture){
        if (oneFuture != null) {
          logger.debug("Stopping ExecTail runner");
          oneFuture.cancel(true);
          logger.debug("ExecTail runner stopped");
        }
      }
    }

    executor.shutdown();
    while (!executor.isTerminated()) {
      logger.debug("Waiting for ExecTail executor service to stop");
      try {
        executor.awaitTermination(500, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        logger.debug("Interrupted while waiting for ExecTail executor service "
            + "to stop. Just exiting.");
        Thread.currentThread().interrupt();
      }
    }




    sourceCounter.stop();
    super.stop();
    logger.info("=stop=> flume tail source stop end time:"+new Date().toString());

  }

  @Override
  public void configure(Context context) {

    filepath = context.getString("filepath");
    Preconditions.checkState(filepath != null,
        "The parameter filepath must be specified");
    logger.info("The parameter filepath is {}" ,filepath);

    filenameRegExp = context.getString("filenameRegExp");
    Preconditions.checkState(filenameRegExp != null,
            "The parameter filenameRegExp must be specified");
    logger.info("The parameter filenameRegExp is {}" ,filenameRegExp);

    msgTypeConfig=context.getString(ExecTailSourceConfigurationConstants.CONFIG_MSGTYPECONFIG_THROTTLE,
            ExecTailSourceConfigurationConstants.DEFAULT_MSGTYPECONFIG);

    String[] defaultTypes = ExecTailSourceConfigurationConstants.DEFAULT_MSGTYPECONFIG_DEFULT.split("\\,");
    for (String oneType : defaultTypes) {
      String[] oneTypeMap = oneType.split("\\:");
      MsgBuildeJson.MsgTypes.put(oneTypeMap[0], oneTypeMap[1].split("\\|"));
    }

    try {
      if (msgTypeConfig != null && !msgTypeConfig.trim().isEmpty()) {
        // User-configured type mappings override the defaults.
        String[] userTypes = msgTypeConfig.split("\\,");
        for (String oneType : userTypes) {
          String[] oneTypeMap = oneType.split("\\:");
          if (oneTypeMap.length >= 2) {
            MsgBuildeJson.MsgTypes.put(oneTypeMap[0], oneTypeMap[1].split("\\|"));
          }
        }
      }
    } catch (Exception ex) {
      logger.error("Failed to parse msgTypeConfig", ex);
    }

    logger.info("=MsgBuildeJson.MsgTypes is =>"+ JSON.toString(MsgBuildeJson.MsgTypes));


    MsgBuildeJson.MsgIntAtti.addAll(Arrays.asList(ExecTailSourceConfigurationConstants.MAP_INT_ATTRIBUTE.split("\\,")));

    contextIsJson= context.getBoolean(ExecTailSourceConfigurationConstants.CONFIG_CONTEXTISJSON_THROTTLE,
            ExecTailSourceConfigurationConstants.DEFAULT_CONTEXTISJSON);

    contextIsFlumeLog=context.getBoolean(ExecTailSourceConfigurationConstants.CONFIG_CONTEXTISFLUMELOG_THROTTLE,
            ExecTailSourceConfigurationConstants.DEFAULT_CONTEXTISFLUMELOG);

    domain=context.getString(ExecTailSourceConfigurationConstants.CONFIG_DOMIAN_THROTTLE,
            ExecTailSourceConfigurationConstants.DEFAULT_DOMAIN);

    fileWriteJson= context.getString(ExecTailSourceConfigurationConstants.CONFIG_FILEWRITEJSON_THROTTLE,
            ExecTailSourceConfigurationConstants.DEFAULT_FILEWRITEJSON);

    flushTime= context.getLong(ExecTailSourceConfigurationConstants.CONFIG_FLUSHTIME_THROTTLE,
            ExecTailSourceConfigurationConstants.DEFAULT_FLUSHTIME);

    restartThrottle = context.getLong(ExecTailSourceConfigurationConstants.CONFIG_RESTART_THROTTLE,
        ExecTailSourceConfigurationConstants.DEFAULT_RESTART_THROTTLE);

    tailing = context.getBoolean(ExecTailSourceConfigurationConstants.CONFIG_TAILING_THROTTLE,
            ExecTailSourceConfigurationConstants.DEFAULT_ISTAILING_TRUE);

    readinterval=context.getInteger(ExecTailSourceConfigurationConstants.CONFIG_READINTERVAL_THROTTLE,
            ExecTailSourceConfigurationConstants.DEFAULT_READINTERVAL);

    startAtBeginning=context.getBoolean(ExecTailSourceConfigurationConstants.CONFIG_STARTATBEGINNING_THROTTLE,
            ExecTailSourceConfigurationConstants.DEFAULT_STARTATBEGINNING);

    restart = context.getBoolean(ExecTailSourceConfigurationConstants.CONFIG_RESTART,
        ExecTailSourceConfigurationConstants.DEFAULT_RESTART_TRUE);

    logStderr = context.getBoolean(ExecTailSourceConfigurationConstants.CONFIG_LOG_STDERR,
        ExecTailSourceConfigurationConstants.DEFAULT_LOG_STDERR);

    bufferCount = context.getInteger(ExecTailSourceConfigurationConstants.CONFIG_BATCH_SIZE,
        ExecTailSourceConfigurationConstants.DEFAULT_BATCH_SIZE);

    batchTimeout = context.getLong(ExecTailSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT,
        ExecTailSourceConfigurationConstants.DEFAULT_BATCH_TIME_OUT);

    charset = Charset.forName(context.getString(ExecTailSourceConfigurationConstants.CHARSET,
        ExecTailSourceConfigurationConstants.DEFAULT_CHARSET));


    if (sourceCounter == null) {
      sourceCounter = new SourceCounter(getName());
    }
  }

  /**
   * List all files under the given path, recursing into subdirectories.
   *
   * @param dir the directory to scan
   * @return absolute paths of all files whose names match filenameRegExp
   */
  public  List<String> getFileList(String dir) {
    List<String> listFile = new ArrayList<String>();
    File dirFile = new File(dir);
    // If the path is not a directory, skip the scan and return the empty list.
    if (dirFile.isDirectory()) {
      // List the directory's children and handle each entry by type.
      File[] files = dirFile.listFiles();
      if (null != files && files.length > 0) {
        // Sort by last-modified time, oldest first. Long.compare avoids the int
        // overflow that casting the difference of two timestamps can cause.
        Arrays.sort(files, new Comparator<File>() {
          public int compare(File f1, File f2) {
            return Long.compare(f1.lastModified(), f2.lastModified());
          }
        });
        for (File file : files) {
          // Plain file: add it if the name matches the configured regex.
          if (!file.isDirectory()) {
            String oneFileName = file.getName();
            if (match(filenameRegExp, oneFileName)) {
              listFile.add(file.getAbsolutePath());
              logger.info("filename:{} is pass", oneFileName);
            }
          } else {
            // Directory: recurse into it.
            listFile.addAll(getFileList(file.getAbsolutePath()));
          }
        }
      }
    }else{
      logger.info("FilePath:{} is not Directory",dir);
    }
    return listFile;
  }

  /**
   * @param regex the regular expression to apply
   * @param str   the string to test
   * @return true if str matches regex; false otherwise
   */
  private boolean match(String regex, String str) {
    Pattern pattern = Pattern.compile(regex);
    Matcher matcher = pattern.matcher(str);
    return matcher.find();
  }


  private static class ExecRunnable implements Runnable {

    public ExecRunnable(ChannelProcessor channelProcessor,
                        SourceCounter sourceCounter, boolean restart, long restartThrottle,
                        boolean logStderr, int bufferCount, long batchTimeout,
                        Charset charset, String filepath,
                        boolean tailing, Integer readinterval,
                        boolean startAtBeginning, boolean contextIsJson,
                        Properties prop, String fileWriteJson, Long flushTime,
                        boolean contextIsFlumeLog, String domain) {

      this.channelProcessor = channelProcessor;
      this.sourceCounter = sourceCounter;
      this.restartThrottle = restartThrottle;
      this.bufferCount = bufferCount;
      this.batchTimeout = batchTimeout;
      this.restart = restart;
      this.logStderr = logStderr;
      this.charset = charset;
      this.filepath=filepath;
      this.logfile=new File(filepath);
      this.tailing=tailing;
      this.readinterval=readinterval;
      this.startAtBeginning=startAtBeginning;
      this.contextIsJson=contextIsJson;
      this.prop = prop;
      this.fileWriteJson=fileWriteJson;
      this.flushTime=flushTime;
      this.contextIsFlumeLog=contextIsFlumeLog;
      this.domain=domain;
    }



    private final ChannelProcessor channelProcessor;
    private final SourceCounter sourceCounter;
    private volatile boolean restart;
    private final long restartThrottle;
    private final int bufferCount;
    private long batchTimeout;
    private final boolean logStderr;
    private final Charset charset;
    private SystemClock systemClock = new SystemClock();
    private Long lastPushToChannel = systemClock.currentTimeMillis();
    ScheduledExecutorService timedFlushService;
    ScheduledFuture<?> future;
    private String filepath;
    private boolean contextIsJson;
    private Properties prop;
    private long timepoint;
    private String fileWriteJson;
    private Long flushTime;
    private String domain;

    /**
     * How long to pause (ms) after reaching the end of the file.
     */
    private long readinterval = 500;

    /**
     * The log file being tailed.
     */
    private File logfile;

    /**
     * Whether to read the file from the beginning.
     */
    private boolean startAtBeginning = false;

    /**
     * Flag that keeps the tail loop running.
     */
    private boolean tailing = false;

    private boolean contextIsFlumeLog=false;

    private static String getDomain(String filePath) {
      String[] strs = filePath.split("/");
      // Use the parent directory's name as the domain; fall back to the full path.
      String domain = strs.length >= 2 ? strs[strs.length - 2] : null;
      if (domain == null || domain.isEmpty()) {
        domain = filePath;
      }
      return domain;
    }

    @Override
    public void run() {
      do {
        logger.info("=run=> flume tail source run start time:"+new Date().toString());
        timepoint=System.currentTimeMillis();
        Long filePointer = null;
        if (this.startAtBeginning) { // read the file from the very beginning?
          filePointer = 0L;
        } else {
          // Resume from the persisted checkpoint when one exists for this file.
          // (containsKey, not contains: Hashtable.contains() searches values.)
          if (prop != null && prop.containsKey(filepath)) {
            try {
              filePointer = Long.valueOf((String) prop.get(filepath));
              logger.info("=ExecRunnable.run=>filePointer get from Properties");
            } catch (Exception ex) {
              logger.error("=ExecRunnable.run=>", ex);
              logger.info("=ExecRunnable.run=> error filePointer get from file size");
              filePointer = null;
            }
          }
          if (filePointer == null) {
            filePointer = this.logfile.length(); // start from the file's current end
            logger.info("=ExecRunnable.run=>filePointer get from file size");
          }

        }

        final List<Event> eventList = new ArrayList<Event>();

        timedFlushService = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat(
                "timedFlushExecService" +
                Thread.currentThread().getId() + "-%d").build());
        RandomAccessFile randomAccessFile = null;
        try {

          randomAccessFile = new RandomAccessFile(logfile, "r"); // open the log for random-access reads
          future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
                                                              @Override
                                                              public void run() {
                                                                try {
                                                                  synchronized (eventList) {
                                                                    if(!eventList.isEmpty() && timeout()) {
                                                                      flushEventBatch(eventList);
                                                                    }
                                                                  }
                                                                } catch (Exception e) {
                                                                  logger.error("Exception occured when processing event batch", e);
                                                                  if(e instanceof InterruptedException) {
                                                                    Thread.currentThread().interrupt();
                                                                  }
                                                                }
                                                              }
                                                            },
                  batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);

          while (this.tailing) {
            long fileLength = this.logfile.length();
            if (fileLength < filePointer) {
              // The file shrank (truncated or rotated): reopen and restart from the top.
              randomAccessFile.close();
              randomAccessFile = new RandomAccessFile(logfile, "r");
              filePointer = 0L;
            }
            if (fileLength > filePointer) {
              randomAccessFile.seek(filePointer);
              String line = randomAccessFile.readLine();
              if (line != null) {
                // Recover the original bytes (readLine decodes as ISO-8859-1), then strip quotes.
                line = new String(line.getBytes(ExecTailSourceConfigurationConstants.CHARSET_RANDOMACCESSFILE), charset);
                line = line.replaceAll("\"", "\'");
              }

              while (line != null) {

                // Push to the channel.
                synchronized (eventList) {
                  sourceCounter.incrementEventReceivedCount();

                  String bodyjson = "";
                  if (!contextIsJson) {
                    bodyjson = MsgBuildeJson.buildeJson(contextIsFlumeLog, line, filepath, domain);
                    // Drop any prefix before the first '{' so the body is pure JSON.
                    if (bodyjson.indexOf("{") > 0) {
                      bodyjson = bodyjson.substring(bodyjson.indexOf("{"));
                    }
                  } else {
                    bodyjson = MsgBuildeJson.changeDomain(line, domain);
                  }

                  Event oneEvent = EventBuilder.withBody(bodyjson.getBytes(charset));
                  eventList.add(oneEvent);
                  if (eventList.size() >= bufferCount || timeout()) {
                    flushEventBatch(eventList);
                  }
                }

                // Read the next line.
                line = randomAccessFile.readLine();
                if (line != null) {
                  line = new String(line.getBytes(ExecTailSourceConfigurationConstants.CHARSET_RANDOMACCESSFILE), charset);
                  line = line.replaceAll("\"", "\'");
                }

                try {
                  Long nowfilePointer = randomAccessFile.getFilePointer();
                  if (!nowfilePointer.equals(filePointer)) {
                    filePointer = nowfilePointer;
                    // Persist the checkpoint at most once every flushTime ms.
                    if (System.currentTimeMillis() - timepoint > flushTime) {
                      timepoint = System.currentTimeMillis();
                      prop.setProperty(filepath, filePointer.toString());
                      FileOutputStream fos = new FileOutputStream(fileWriteJson);
                      prop.store(fos, "Update '" + filepath + "' value");
                      fos.close();
                    }
                  }
                } catch (Exception ex) {
                  logger.error("Failed to persist file pointer for " + filepath, ex);
                }
              }

            }
            Thread.sleep(this.readinterval);
          }

          synchronized (eventList) {
              if(!eventList.isEmpty()) {
                flushEventBatch(eventList);
              }
          }

        } catch (Exception e) {
          logger.error("Failed while running filepath: " + filepath, e);
          if(e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
          }
        } finally {

          if(randomAccessFile!=null){
            try {
              randomAccessFile.close();
            } catch (IOException ex) {
              logger.error("Failed to close reader for ExecTail source", ex);
            }
          }

        }
        logger.info("=run=> flume tail source run restart:"+restart);
        if(restart) {
          logger.info("=run=> flume tail source run restart time:"+new Date().toString());
          logger.info("Restarting in {}ms", restartThrottle);
          try {
            Thread.sleep(restartThrottle);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        } else {
          logger.info("filepath [" + filepath + "] exited with restart[" + restart+"]");
        }
      } while(restart);
    }

    private void flushEventBatch(List<Event> eventList){
      channelProcessor.processEventBatch(eventList);
      sourceCounter.addToEventAcceptedCount(eventList.size());
      eventList.clear();
      lastPushToChannel = systemClock.currentTimeMillis();
    }

    private HashMap ParseFlumeLog(String log,HashMap logMap){
      String[] strLogs = log.split("\\|");
      logMap.put("className",strLogs[0]);
      logMap.put("methodName",strLogs[1]);
      logMap.put("level",strLogs[2]);
      logMap.put("treeId",strLogs[3]);
      logMap.put("requestId",strLogs[4]);
      logMap.put("transactionId",strLogs[5]);
      return logMap;
    }

    private boolean timeout(){
      return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout;
    }

    private static String[] formulateShellCommand(String shell, String command) {
      String[] shellArgs = shell.split("\\s+");
      String[] result = new String[shellArgs.length + 1];
      System.arraycopy(shellArgs, 0, result, 0, shellArgs.length);
      result[shellArgs.length] = command;
      return result;
    }

    public int kill() {
      logger.info("=kill=> flume tail source kill start time:"+new Date().toString());
      this.tailing=false;
        synchronized (this.getClass()) {
          try {
            // Stop the Thread that flushes periodically
            if (future != null) {
              future.cancel(true);
            }

            if (timedFlushService != null) {
              timedFlushService.shutdown();
              while (!timedFlushService.isTerminated()) {
                try {
                  timedFlushService.awaitTermination(500, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                  logger.debug("Interrupted while waiting for ExecTail executor service "
                          + "to stop. Just exiting.");
                  Thread.currentThread().interrupt();
                }
              }
            }
            logger.info("=kill=> flume tail source kill end time:" + new Date().toString());
            return Integer.MIN_VALUE;
          } catch (Exception ex) {
            logger.error("=kill=>", ex);
            Thread.currentThread().interrupt();
          }
        }
      logger.info("=kill=> flume tail source kill end time:"+new Date().toString());
      return Integer.MIN_VALUE / 2;
    }
    public void setRestart(boolean restart) {
      this.restart = restart;
    }
  }
  private static class StderrReader extends Thread {
    private BufferedReader input;
    private boolean logStderr;

    protected StderrReader(BufferedReader input, boolean logStderr) {
      this.input = input;
      this.logStderr = logStderr;
    }



    @Override
    public void run() {
      try {
        int i = 0;
        String line = null;
        while((line = input.readLine()) != null) {
          if(logStderr) {
            // There is no need to decode 'line' with a specific charset,
            // as we do not propagate it; it is only logged here.
            logger.info("StderrLogger[{}] = '{}'", ++i, line);
          }
        }
      } catch (IOException e) {
        logger.info("StderrLogger exiting", e);
      } finally {
        try {
          if(input != null) {
            input.close();
          }
        } catch (IOException ex) {
          logger.error("Failed to close stderr reader for ExecTail source", ex);
        }
      }
    }
  }
}
