1. 程式人生 > >spark讀取kafka資料(兩種方式比較及flume配置檔案)

spark讀取kafka資料(兩種方式比較及flume配置檔案)

a1.sources = r1

a1.channels = c1

a1.sinks = k1

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 150000

a1.channels.c1.transactionCapacity = 1500

a1.sources.r1.type = TaildirSourceSelf

a1.sources.r1.batchSize = 500

a1.sources.r1.skipToEnd = true

a1.sources.r1.ignorePathChange = true

a1.sources.r1.positionFile = test/logConf/xxx_taildir_position.json

a1.sources.r1.writePosInterval = 5000

a1.sources.r1.filegroups = f0

a1.sources.r1.filegroups.f0 = xxx/xxx/*.*

a1.sources.r1.file.header.enabled = true

a1.sources.r1.file.name.key = tag.default.filename

a1.sources.r1.dir.header.enabled = true

a1.sources.r1.dir.name.key = tag.default.dirname

a1.sources.r1.inode.header.enabled = false

a1.sources.r1.inode.key = tag.default.inode

# For Kafka Sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.brokerList = mn01:9092

a1.sinks.k1.kafka.topic = test-topic

a1.sinks.k1.defaultPartitionId = 0

a1.sinks.k1.flumeBatchSize = 500

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

a1.sinks.k1.kafka.producer.max.request.size = 5120000

相關推薦

spark讀取kafka資料方式比較flume配置檔案

a1.sources = r1 a1.channels = c1 a1.sinks = k1 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 a1.channels.c1.type = memory a1.channels.c1.capacity

Spark獲取Kafka資料方式原始碼

在sparkstreaming接受資料的時候有兩種方式 (1)DirectKafkaInputDStream 使用的是 Direct 方式獲取資料;DirectKafkaInputDStream 繼承自 InputDStream (2)Socket

Spark-streaming kafka資料接收方式

@Author : Spinach | GHB @Link : http://blog.csdn.net/bocai8058 1 Receiver-based Approach import org.apache.spark.streaming.k

Spark-Streaming獲取kafka資料方式:Receiver與Direct的方

 簡單理解為:Receiver方式是通過zookeeper來連線kafka佇列,Direct方式是直接連線到kafka的節點上獲取資料 回到頂部 使用Kafka的高層次Consumer API來實現。receiver從Kafka中獲取的資料都儲存在Spark Exec

sparkStreaming讀取kafka資料的2方式

方式一  Receiver           使用Kafka的高層次Consumer API來實現。receiver從Kafka中獲取的資料都儲存在Spark Executor的記憶體中,然後Spa

SpringBoot讀取配置檔案方式以及自定義配置檔案讀取

1.讀取預設配置檔案中的資料 application.properties 直接使用@Value註解獲取資料 2.使用Environment獲取資料 防止亂碼統一編碼格式 注入Environment 使用getPro

FileInputStream讀取位元組流。讀取檔案資料方式(寫的好)

總結:    //1讀取檔案的資料到位元組流inputStream     InputStream inputStream = new FileInputStream("D:\\demo.txt");//讀取檔案的資料到位元組流inputStream。

sparkstreaming和kafka整合的方式最全

-1,基於接收者的方法 運算元:KafkaUtils.createStream 方法:PUSH,從topic中去推送資料,將資料推送過來 API:呼叫的Kafka高階API 效果:SparkStreaming中的Receivers,恰好Kafka有釋出/

Spark RDD/DataFrame map儲存資料方式

使用Spark RDD或DataFrame,有時需要在foreachPartition或foreachWith裡面儲存資料到本地或HDFS。 直接儲存資料 當然如果不需要在map裡面儲存資料,那麼針對RDD可以有如下方式 val rdd = // targ

手把手教你用MFC和OpenCV,製作mfc讀取並顯示影象方式

MFC OpenCV 讀取並顯示影象 *************************************************完成的效果如圖******************************************** 前言:我用的是VS2013 + O

讀取格式的配置檔案xml方式,txt,excel,csv,json

using Mono.Xml; using System.Security; using LitJson; using System.Collections.Generic; using System.IO; using Excel; using S

springMVC --全局異常處理方式

nal font method mil -h util 一個 fail space 首先看springMVC的配置文件: <!-- 全局異常配置 start --> <bean id="exceptionResolv

Android------Button 添加聲音效果方式

div abs 一次 播放 pool 當前 傳送門 ide col 我在先前的案例《Android 的底部導航欄 BottomNavigationBar》中添加以底部 的4個按鈕切換添加聲音 下來看看案例效果圖 使用添加依賴 comp

【C語言】實現一個計算器方式

1.使用switch…case…語句實現 #define _CRT_SECURE_NO_DEPRECATE 1 #include <stdio.h> #include <stdlib.h> int add(int x, int y) {   &nb

(轉)Activity和Fragment傳遞資料方式

1、第一種方式,也是最常用的方式,就是使用Bundle來傳遞引數 MyFragment myFragment = new MyFragment(); Bundle bundle = new Bundle(); bundle.putString("DATA",values);//這裡的va

MongoDB的使用學習之MongoDB的聚合查詢方式附專案原始碼

@Testpublic void save() { News n = null;for (int i = 0; i < 10000; i++) { n = new News(); n.setTitle("title_" + i);

Hadoop的安裝方式

Hadoop的安裝(偽分散式和完全分散式) (一)偽分散式的安裝 {生產中不用 自己測試的時候,有時會用} 假的分散式 所有的程序全部執行在一個節點上 linux操作 普通使用者下 1.安裝準備 1)設定ip(安裝Linux時應該已經設定完畢了)

Eigen庫安裝方式

                  Eigen庫安裝指南(兩種方式)1、apt-get方式(假設預設安裝到/usr/local/include裡(可在終端中輸入locate eigen3檢視位置),若實際中預設安裝到了/usr/include的話,可以對應替換下面命令的相應部分)執行命令: sudo ap

屬性動畫方式

1.程式碼形式 package com.bawei.mymvp.animation; import android.animation.Animator; import android.animation.AnimatorInflater; import android.animati

Java + 原生MongoDB驅動 API 使用案例詳說方式

前不久,博主利用spring-boot結合spring-data-mongodb包,搞了一把mongodb的整合  -- 增刪改查這種方式,比較固執,使用起來雖然方便,但是太依賴spring,我想建立自己的dbname,都無從下手(可能我還沒探究到),只能使用配置檔案裡面一開