1. 程式人生 > >kafka原理和實踐(四)spring-kafka消費者原始碼

kafka原理和實踐(四)spring-kafka消費者原始碼

正文

系列目錄

==============正文分割線=====================

回到頂部

一、kafkaConsumer消費者模型

如上圖所示,spring-kafka消費者模型主要流程:

1.容器啟動,輪詢執行消費。

2.kafkaConsumer拉取訊息流程:

1)Fetcher請求獲取器獲取請求並存儲在unset中

2)ConsumerNetworkClient網路客戶端執行poll(),呼叫NetWlrikClient的send()方法從unset中獲取ClientRequest請求轉成RequestSend最終塞進Selector的KafkaChannel通道中,Seletcor.send()從kafka叢集拉取待消費資料ConsumerRecords

3. 消費者監聽器MessageListener.onMessage()執行使用者自定義的實際消費業務邏輯。

回到頂部

一、kafkaConsumer構造

複製程式碼
  1 @SuppressWarnings("unchecked")
  2     private KafkaConsumer(ConsumerConfig config,
  3                           Deserializer<K> keyDeserializer,
  4                           Deserializer<V> valueDeserializer) {
5 try { 6 log.debug("Starting the Kafka consumer"); 7 this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); 8 int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG); 9 int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
10 if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs) 11 throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG); 12 this.time = new SystemTime(); 13 14 String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); 15 if (clientId.length() <= 0) 16 clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement(); 17 this.clientId = clientId; 18 Map<String, String> metricsTags = new LinkedHashMap<>(); 19 metricsTags.put("client-id", clientId); 20 MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)) 21 .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) 22 .tags(metricsTags); 23 List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, 24 MetricsReporter.class); 25 reporters.add(new JmxReporter(JMX_PREFIX)); 26 this.metrics = new Metrics(metricConfig, reporters, time); 27 this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); 28 29 // load interceptors and make sure they get clientId 30 Map<String, Object> userProvidedConfigs = config.originals(); 31 userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); 32 List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, 33 ConsumerInterceptor.class); 34 this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList); 35 if (keyDeserializer == null) { 36 this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 37 Deserializer.class); 38 this.keyDeserializer.configure(config.originals(), true); 39 } else { 40 config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); 41 this.keyDeserializer = keyDeserializer; 42 } 43 if (valueDeserializer == null) { 44 this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 45 Deserializer.class); 46 this.valueDeserializer.configure(config.originals(), false); 47 } else { 48 config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); 49 this.valueDeserializer = valueDeserializer; 50 } 51 ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList); 52 this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners); 53 List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); 54 this.metadata.update(Cluster.bootstrap(addresses), 0); 55 String metricGrpPrefix = "consumer"; 56 ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); 57 NetworkClient netClient = new NetworkClient( 58 new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder), 59 this.metadata, 60 clientId, 61 100, // a fixed large enough value will suffice 62 config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG), 63 config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG), 64 config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG), 65 config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time); 66 this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs, 67 config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)); 68 OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT)); 69 this.subscriptions = new SubscriptionState(offsetResetStrategy); 70 List<PartitionAssignor> assignors = config.getConfiguredInstances( 71 ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 72 PartitionAssignor.class); 73 this.coordinator = new ConsumerCoordinator(this.client, 74 config.getString(ConsumerConfig.GROUP_ID_CONFIG), 75 config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 76 config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 77 config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 78 assignors, 79 this.metadata, 80 this.subscriptions, 81 metrics, 82 metricGrpPrefix, 83 this.time, 84 retryBackoffMs, 85 new ConsumerCoordinator.DefaultOffsetCommitCallback(), 86 config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), 87 config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), 88 this.interceptors, 89 config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG)); 90 this.fetcher = new Fetcher<>(this.client, 91 config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG), 92 config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), 93 config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), 94 config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 95 config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 96 config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG), 97 this.keyDeserializer, 98 this.valueDeserializer, 99 this.metadata, 100 this.subscriptions, 101 metrics, 102 metricGrpPrefix, 103 this.time, 104 this.retryBackoffMs); 105 106 config.logUnused(); 107 AppInfoParser.registerAppInfo(JMX_PREFIX, clientId); 108 109 log.debug("Kafka consumer created"); 110 } catch (Throwable t) { 111 // call close methods if internal objects are already constructed 112 // this is to prevent resource leak. see KAFKA-2121 113 close(true); 114 // now propagate the exception 115 throw new KafkaException("Failed to construct kafka consumer", t); 116 } 117 }
複製程式碼

 從KafkaConsumer建構函式來看,核心元件有:

1.Metadata:封裝了元資料的一些邏輯的類。元資料僅保留一個主題的子集,隨著時間的推移可以新增。當我們請求一個主題的元資料時,我們沒有任何元資料會觸發元資料更新。如果對元資料啟用了主題過期,那麼在更新之後,在過期時間間隔內未使用的任何主題都將從元資料重新整理集中刪除。

2.ConsumerNetworkClient:高等級消費者訪問網路層,為請求Future任務提供基本支援。這個類是執行緒安全的,但是不提供響應回撥的同步。這保證在呼叫它們時不會持有鎖。

3.SubscriptionState:訂閱的TopicPartition的offset狀態維護

4.ConsumerCoordinator:消費者的協調者,負責partitiion的分配,reblance

5.Fetcher:從brokers上按照配置獲取訊息。

回到頂部

二、消費者容器啟動流程

kafka消費者有兩種常見的實現方式:

1.xml配置檔案

2.基於註解實現

其實,不管哪種方式,本質只是生成Spring Bean的方式不同而已。我們就以xml的實現方式來追蹤原始碼。

基於xml的總體配置如下:

複製程式碼
 1 <!-- 1.定義consumer的引數 -->
 2     <bean id="consumerProperties" class="java.util.HashMap">
 3         <constructor-arg>
 4             <map>
 5                 <entry key="bootstrap.servers" value="${bootstrap.servers}" />
 6                 <entry key="group.id" value="${group.id}" />
 7                 <entry key="enable.auto.commit" value="${enable.auto.commit}" />
 8                 <entry key="session.timeout.ms" value="${session.timeout.ms}" />
 9                 <entry key="key.deserializer"
10                     value="org.apache.kafka.common.serialization.StringDeserializer" />
11                 <entry key="value.deserializer"
12                     value="org.apache.kafka.common.serialization.StringDeserializer" />
13             </map>
14         </constructor-arg>
15     </bean>
16 
17     <!-- 2.建立consumerFactory bean -->
18     <bean id="consumerFactory"
19         class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
20         <constructor-arg>
21             <ref bean="consumerProperties" />
22         </constructor-arg>
23     </bean>
24 
25     <!-- 3.定義消費實現類 -->
26     <bean id="kafkaConsumerService" class="xxx.service.impl.KafkaConsumerSerivceImpl" />
27 
28     <!-- 4.消費者容器配置資訊 -->
29     <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
30         <!-- topic -->
31         <constructor-arg name="topics">
32             <list>
33                 <value>${kafka.consumer.topic.credit.for.lease}</value>
34                 <value>${loan.application.feedback.topic}</value>
35                 <value>${templar.agreement.feedback.topic}</value>
36                 <value>${templar.aggrement.active.feedback.topic}</value>
37                 <value>${templar.aggrement.agreementRepaid.topic}</value>
38                 <value>${templar.aggrement.agreementWithhold.topic}</value>
39                 <value>${templar.aggrement.agreementRepayRemind.topic}</value>
40             </list>
41         </constructor-arg>
42         <property name="messageListener" ref="kafkaConsumerService" />
43     </bean>
44     <!-- 5.消費者併發訊息監聽容器,執行doStart()方法 -->
45     <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" >
46         <constructor-arg ref="consumerFactory" />
47         <constructor-arg ref="containerProperties" />
48         <property name="concurrency" value="${concurrency}" />
49     </bean>
複製程式碼

 分為5個步驟:

 2.1.定義消費引數bean

consumerProperties ,就是個map<key,value>

2.2.建立consumerFactory bean

DefaultKafkaConsumerFactory 實現了ConsumerFactory介面,提供建立消費者判斷是否自動提交2個方法。通過consumerProperties作為引數構造。
複製程式碼
1 public interface ConsumerFactory<K, V> {
2 
3     Consumer<K, V> createConsumer();
4 
5     boolean isAutoCommit();
6 
7 
8 }
複製程式碼

2.3.定義消費實現類

自定義一個類實現MessageListener介面,介面設計如下:

實現onMessage方法,去消費接收到的訊息。兩種方案:

1)MessageListener 消費完訊息後自動提交offset(enable.auto.commit=true時),可提高效率,存在消費失敗但移動了偏移量的風險。

2)AcknowledgingMessageListener 消費完訊息後手動提交offset(enable.auto.commit=false時)效率降低,無消費失敗但移動偏移量的風險。

2.4.監聽容器配置資訊

ContainerProperties:包含了一個監聽容器的執行時配置資訊,主要定義了監聽的主題、分割槽、初始化偏移量,還有訊息監聽器。
複製程式碼
  1 public class ContainerProperties {
  2 
  3     private static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;
  4 
  5     private static final int DEFAULT_QUEUE_DEPTH = 1;
  6 
  7     private static final int DEFAULT_PAUSE_AFTER = 10000;
  8 
  9     /**
 10      * Topic names.監聽的主題字串陣列
 11      */
 12     private final String[] topics;
 13 
 14     /**
 15      * Topic pattern.監聽的主題模板
 16      */
 17     private final Pattern topicPattern;
 18 
 19     /**
 20      * Topics/partitions/initial offsets.
 21      */
 22     private final TopicPartitionInitialOffset[] topicPartitions;
 23 
 24     /**
 25      * 確認模式(自動確認屬性為false時使用)
 26      * <ul>
 27      * <li>1.RECORD逐條確認: 每條訊息被髮送給監聽者後確認</li>
 28      * <li>2.BATCH批量確認: 當批量訊息記錄被消費者接收到並傳送給監聽器時確認</li>
 30      * <li>3.TIME超時確認:當超過設定的超時時間毫秒數時確認(should be greater than
 31      * {@code #setPollTimeout(long) pollTimeout}.</li>
 32      * <li>4.COUNT計數確認: 當接收到指定數量之後確認</li>
 33      * <li>5.MANUAL手動確認:由監聽器負責確認(AcknowledgingMessageListener</ul>
 36      */
 37     private AbstractMessageListenerContainer.AckMode ackMode = AckMode.BATCH;
 38 
 39     /**
 40      * The number of outstanding record count after which offsets should be
 41      * committed when {@link AckMode#COUNT} or {@link AckMode#COUNT_TIME} is being
 42      * used.
 43      */
 44     private int ackCount;
 45 
 46     /**
 47      * The time (ms) after which outstanding offsets should be committed when
 48      * {@link AckMode#TIME} or {@link AckMode#COUNT_TIME} is being used. Should be
 49      * larger than
 50      */
 51     private long ackTime;
 52 
 53     /**
 54      * 訊息監聽器,必須是 MessageListener或者AcknowledgingMessageListener兩者中的一個 55      * 56      */
 57     private Object messageListener;
 58 
 59     /**
 60      * The max time to block in the consumer waiting for records.
 61      */
 62     private volatile long pollTimeout = 1000;
 63 
 64     /**
 65      * 執行緒執行器:輪詢消費者
 66      */
 67     private AsyncListenableTaskExecutor consumerTaskExecutor;
 68 
 69     /**
 70      * 執行緒執行器:呼叫監聽器
 71      */
 72     private AsyncListenableTaskExecutor listenerTaskExecutor;
 73 
 74     /**
 75      * 錯誤回撥,當監聽器丟擲異常時
 76      */
 77     private GenericErrorHandler<?> errorHandler;
 78 
 79     /**
 80      * When using Kafka group management and {@link #setPauseEnabled(boolean)} is
 81      * true, the delay after which the consumer should be paused. Default 10000.
 82      */
 83     private long pauseAfter = DEFAULT_PAUSE_AFTER;
 84 
 85     /**
 86      * When true, avoids rebalancing when this consumer is slow or throws a
 87      * qualifying exception - pauses the consumer. Default: true.
 88      * @see #pauseAfter
 89      */
 90     private boolean pauseEnabled = true;
 91 
 92     /**
 93      * Set the queue depth for handoffs from the consumer thread to the listener
 94      * thread. Default 1 (up to 2 in process).
 95      */
 96     private int queueDepth = DEFAULT_QUEUE_DEPTH;
 97 
 98     /**
 99      * 停止容器超時時間    */
103     private long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
104 
105     /**
106      * 使用者定義的消費者再平衡監聽器實現類     */
108     private ConsumerRebalanceListener consumerRebalanceListener;
109 
110     /**
111      * 提交回調,預設記錄日誌。      */
114     private OffsetCommitCallback commitCallback;
115 
116     /**
117      * Whether or not to call consumer.commitSync() or commitAsync() when the
118      * container is responsible for commits. Default true. See
119      * https://github.com/spring-projects/spring-kafka/issues/62 At the time of
120      * writing, async commits are not entirely reliable.
121      */
122     private boolean syncCommits = true;
123 
124     private boolean ackOnError = true;
125 
126     private Long idleEventInterval;
127 
128     public ContainerProperties(String... topics) {
129         Assert.notEmpty(topics, "An array of topicPartitions must be provided");
130         this.topics = Arrays.asList(topics).toArray(new String[topics.length]);
131         this.topicPattern = null;
132         this.topicPartitions = null;
133     }
134 
135     public ContainerProperties(Pattern topicPattern) {
136         this.topics = null;
137         this.topicPattern = topicPattern;
138         this.topicPartitions = null;
139     }
140 
141     public ContainerProperties(TopicPartitionInitialOffset... topicPartitions) {
142         this.topics = null;
143         this.topicPattern = null;
144         Assert.notEmpty(topicPartitions, "An array of topicPartitions must be provided");
145         this.topicPartitions = new LinkedHashSet<>(Arrays.asList(topicPartitions))
146                 .toArray(new TopicPartitionInitialOffset[topicPartitions.length]);
147     }
148 ...省略各種set、get
149 
150 }
複製程式碼

2.5.啟動併發訊息監聽容器

核心類ConcurrentMessageListenerContainer,繼承自抽象類AbstractMessageListenerContainer,類圖如下:

看上圖可知AbstractMessageListenerContainer有2個實現類分別對應單執行緒和多執行緒,建議採用多執行緒消費。下面分析一下主要ConcurrentMessageListenerContainer類,注意2個方法:

1.建構函式,入參:消費者工廠ConsumerFactory+容器配置ContainerProperties

2.doStart():核心方法KafkaMessageListenerContainer的start()方法。原始碼如下:

複製程式碼
  1 public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
  2 
  3     private final ConsumerFactory<K, V> consumerFactory;
  4 
  5     private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();
  6 
  7     private int concurrency = 1;
  8 
  9     /**
 10      * Construct an instance with the supplied configuration properties.
 11      * The topic partitions are distributed evenly across the delegate
 12      * {@link KafkaMessageListenerContainer}s.
 13      * @param consumerFactory the consumer factory.
 14      * @param containerProperties the container properties.
 15      */
 16     public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
 17             ContainerProperties containerProperties) {
 18         super(containerProperties);
 19         Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
 20         this.consumerFactory = consumerFactory;
 21     }
 22 
 23     public int getConcurrency() {
 24         return this.concurrency;
 25     }
 26 
 27     /**
 28      * The maximum number of concurrent {@link KafkaMessageListenerContainer}s running.
 29      * Messages from within the same partition will be processed sequentially.
 30      * @param concurrency the concurrency.
 31      */
 32     public void setConcurrency(int concurrency) {
 33         Assert.isTrue(concurrency > 0, "concurrency must be greater than 0");
 34         this.concurrency = concurrency;
 35     }
 36 
 37     /**
 38      * Return the list of {@link KafkaMessageListenerContainer}s created by
 39      * this container.
 40      * @return the list of {@link KafkaMessageListenerContainer}s created by
 41      * this container.
 42      */
 43     public List<KafkaMessageListenerContainer<K, V>> getContainers() {
 44         return Collections.unmodifiableList(this.containers);
 45     }
 46 
 47     /*
 48      * Under lifecycle lock.
 49      */
 50     @Override
 51     protected void doStart() {
 52         if (!isRunning()) {
 53             ContainerProperties containerProperties = getContainerProperties();
 54             TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
 55             if (topicPartitions != null//校驗併發數>分割槽數,報錯。
 56                     && this.concurrency > topicPartitions.length) {
 57                 this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
 58                         + "equal to the number of partitions; reduced from " + this.concurrency + " to "
 59                         + topicPartitions.length);
 60                 this.concurrency = topicPartitions.length;//併發數最大隻能=分割槽數
 61             }
 62             setRunning(true);
 63             //遍歷建立監聽器容器
 64             for (int i = 0; i < this.concurrency; i++) {
 65                 KafkaMessageListenerContainer<K, V> container;
 66                 if (topicPartitions == null) {
 67                     container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties);
 68                 }
 69                 else {
 70                     container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties,
 71                             partitionSubset(containerProperties, i));
 72                 }
 73                 if (getBeanName() != null) {
 74                     container.setBeanName(getBeanName() + "-" + i);
 75                 }
 76                 if (getApplicationEventPublisher() != null) {
 77                     container.setApplicationEventPublisher(getApplicationEventPublisher());
 78                 }
 79                 container.setClientIdSuffix("-" + i);
 80                 container.start();//核心方法,啟動容器
 81                 this.containers.add(container);
 82             }
 83         }
 84     }146 ...省略
147 }
複製程式碼

 繼續追蹤,呼叫AbstractMessageListenerContainer的doStart(),值得注意的是start()和stop方法加了同一把鎖,用於鎖住生命週期。

複製程式碼
 1 private 
            
           

相關推薦

kafka原理實踐spring-kafka消費者原始碼

正文系列目錄 ==============正文分割線===================== 回到頂部一、kafkaConsumer消費者模型 如上圖所示,spring-kafka消費者模型主要流程: 1.容器啟動,輪詢執行消費。 2.kafkaConsumer拉取訊息流程: 1)Fetc

kafka原理實踐spring-kafka生產者原始碼

正文系列目錄 本文目錄 1.kafkaProducer傳送模型2.KafkaTemplate傳送模板3.KafkaProducer  3.1KafkaProducer構造過程  3.2 KafkaProducer傳送資料 ==============正文分割線==================

struts原理實踐

本篇我們來討論一下struts的國際化程式設計問題,即所謂的i18n程式設計問題,這一篇我們討論其基礎部分。與這個問題緊密相關的是在各java論壇中被頻繁提及的中文亂碼問題,因為,英、美程式設計人員較少涉及到中文亂碼問題,因此,這方面的英文資料也是非常奇缺的,同時也很少找到這方面比較完整的中文資料,本文也嘗試

Spring學習之旅Spring工作原理再探

容器 mxml 實現 span ssp express 16px 部分 做了 上篇博文對Spring的工作原理做了個大概的介紹,想看的同學請出門左轉。今天詳細說幾點。 (一)Spring IoC容器及其實例化與使用 Spring IoC容器負責Bean的實例化、配置和組裝工

Spring Boot 最佳實踐模板引擎Thymeleaf集成

data 圖層 int app 創建模板 原因 xmlns make 使用場景 一、Thymeleaf介紹 Thymeleaf是一種Java XML / XHTML / HTML5模板引擎,可以在Web和非Web環境中使用。它更適合在基於MVC的Web應用程序的視圖層提供X

Linux下的socket程式設計實踐TCP服務端優化常見函式

併發下的殭屍程序處理 只有一個程序連線的時候,我們可以使用以下兩種方法處理殭屍程序: 1)通過忽略SIGCHLD訊號,避免殭屍程序     在server端程式碼中新增     signal(

CDH5實踐Cloudera Manager 5安裝中碰到的一些問題解決辦法

問題一:安裝過程中會出現失敗,檢視detail資訊中,提示 host 無法連線 解決:前面的文章中漏了對主機host的一個修改, 除了修改 /etc/hosts 檔案外,我們還需要修改修改  /et

Linux下的socket程式設計實踐TCP的粘包問題常用解決方案

TCP粘包問題的產生 由於TCP協議是基於位元組流並且無邊界的傳輸協議, 因此很有可能產生粘包問題。此外,傳送方引起的粘包是由TCP協議本身造成的,TCP為提高傳輸效率,傳送方往往要收集到足夠多的資料

Spring Boot自動配置原理實踐

前言   Spring Boot眾所周知是為了簡化Spring的配置,省去XML的複雜化配置(雖然Spring官方推薦也使用Java配置)採用Java+Annotation方式配置。如下幾個問題是我剛開始接觸Spring Boot的時候經常遇到的一些疑問,現在總結出來希望能幫助到更多的人理解Spring B

API開發實踐 返回HTML

acea 指定 win filename static box 拖動地圖 ive let 分為兩個部分:生成HTML和返回HTML 生成HTML: 最終想要的時顯示地圖,不可避免的使用高德地圖的API。 【地圖API】地址錄入時如何獲得準確的經緯度?淘寶收貨地址詳解 改變幾

基於rhel7.2的Zabbix平臺搭建部署

linux 監控軟件 zabbix nginx mysql php lnmp基於rhel7.2的Zabbix平臺搭建和部署(四)一、實現zabbix添加監測項,添加對Linux主機的監控、說明:先在“配置”-“主機”裏添加主機監控,監控os資源:內存,cpu,io,負載,帶寬等.(1)登錄zabbix,先在

MVC項目實踐——EDM實現

開發 ron key com sum lldb 實體類 資源管理器 space 實體數據模型 (EDM) 是一個規範,用於定義由在 實體框架 基礎上生成的應用程序使用的數據。使用 EDM 的應用程序在設計架構中定義應用程序域中的實體和關系。設計架構用於生成由應用程序代碼使用

三維渲染引擎設計與實踐

方式 lora 扇面 多個 幀緩存 binding osg smo tco 五、繪制幾何對象和文字 幀緩存(Frame Buffer)為用戶與顯示設備交互的一個接口,將顯示的畫面抽象成一塊可以進行讀寫操作的內存區域。 幀緩存的每一個存儲單元都對應顯示屏上的一個像素。整個緩存

菜鳥調錯——Spring與DWR集成,配置文件報錯

microsoft his www data editors bing ces myeclipse java 背景簡單介紹:該項目是市信用辦的一個系統,之前好像是一個石家莊的公司負責的。我屬於是半路接手。拿到源代碼後。依據他們給的簡(shao)單(de)明(ke)了(l

Spring學習之路spring對數據庫操作

date val mapr text core 導入 sed package assert 1、導入jdbc.jar、tx. jar包 2、測試 package com.junit; import static org.junit.Assert.*;

Spring Boot實戰筆記-- Spring常用配置事件Application Event

ans can string code text extends autowired dem plc 一、事件(Application Event)   Spring的事件為Bean和Bean之間的消息通信提供了支持。當一個Bean處理完一個任務之後,希望另一個Bean知道

spring cloud雲服務架構 - particle雲架構代碼結構詳細講解

springcloud 雲服務 架構 代碼 結構 上一篇我們介紹了spring cloud雲服務架構 - particle雲架構代碼結構,簡單的按照幾個大的部分去構建代碼模塊,讓我們來回顧一下:第一部分: 針對於普通服務的基礎框架封裝(entity、dao、service、controll

Spring Boot參考教程Spring Boot配置使用之配置文件用法

point rop 推薦書 endpoint size int == 需要 相同 4.1 Spring Boot配置使用之配置文件用法 Spring Boot旨在簡化配置,但依然需要進行少量配置來滿足應用的特定需要。 配置方式拋棄了XML文件的配置方式,主要使用配置文件

慕課網 星級評分原理實現

方法 click down cti 原理 als row rep ava 源碼下載 <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8">

Spring實戰Spring高級裝配中的bean profile

優先 contex 如何 文件中 web.xml 多個 定義 blog 軟件   profile的原意為輪廓、剖面等,軟件開發中可以譯為“配置”。   在3.1版本中,Spring引入了bean profile的功能。要使用profile,首先要將所有不同的bean定義整理