
Spark on YARN: Submitting and Reading Configuration Files

From the official Spark on YARN documentation (Spark 2.1.1):

To use a custom log4j configuration for the application master or executors, here are the options:

  • upload a custom log4j.properties using spark-submit, by adding it to the --files list of files to be uploaded with the application.
  • add -Dlog4j.configuration=<location of configuration file> to spark.driver.extraJavaOptions (for the driver) or spark.executor.extraJavaOptions (for executors). Note that if using a file, the file: protocol should be explicitly provided, and the file needs to exist locally on all the nodes.
  • update the $SPARK_CONF_DIR/log4j.properties file and it will be automatically uploaded along with the other configurations. Note that the other two options have higher priority than this option if multiple options are specified.

Note that for the first option, both executors and the application master will share the same log4j configuration, which may cause issues when they run on the same node (e.g. trying to write to the same log file).

Single file

From the description above, we can see that configuration files can be passed in through --files.

For example:

--files log4j.config

Multiple files

What if we need to submit multiple files? In that case we separate the paths with a comma ( , ):

--files redis.config,mysql.config

Here the multi-file submission is written as a script:

ROOT_PATH=$(dirname "$(readlink -f "$0")")

## Build a comma-separated list of every file under ./config for --files.
## Note: the loop leaves a trailing comma, visible in the test output below.
config=""
for file in "${ROOT_PATH}"/config/*
do
        config="${file},${config}"
done


## ${config} is passed twice: to --files so the files are uploaded with the
## application, and as a program argument so main() receives their paths.
nohup /usr/bin/spark2-submit \
    --class ${class_name} \
    --name ${JOB_NAME} \
    --files ${config} \
    --master yarn \
    --driver-memory 2G \
    --driver-cores 1 \
    --num-executors 3 \
    --executor-cores 2 \
    --executor-memory 2G \
    --jars ${classpath} \
    ${ROOT_PATH}/libs/${APP_NAME}-${libVersion}-SNAPSHOT.jar online ${config} \
    > ${ROOT_PATH}/logs/start.log 2> ${ROOT_PATH}/logs/start.error &

The real paths can be seen in the test output below:

--- Test ---
config files : /data-hdd/00/project/cloudera-scm/spark-workspace/onlineJob/TD-clickImp-blacklist-redis/0.0.6/config/redis_cluster.conf,/data-hdd/00/project/cloudera-scm/spark-workspace/onlineJob/TD-clickImp-blacklist-redis/0.0.6/config/LAN_ip,/data-hdd/00/project/cloudera-scm/spark-workspace/onlineJob/TD-clickImp-blacklist-redis/0.0.6/config/kafka_cluster.conf,

which splits into these three files (note the trailing comma):

/data-hdd/00/project/cloudera-scm/spark-workspace/onlineJob/TD-clickImp-blacklist-redis/0.0.6/config/redis_cluster.conf,

/data-hdd/00/project/cloudera-scm/spark-workspace/onlineJob/TD-clickImp-blacklist-redis/0.0.6/config/LAN_ip,

/data-hdd/00/project/cloudera-scm/spark-workspace/onlineJob/TD-clickImp-blacklist-redis/0.0.6/config/kafka_cluster.conf,
 

So how does the job get hold of these files? There are a few approaches online; here is a fairly general approach I built for Java.

First, note that we pass these paths into the main method as a program argument.

// Get the full paths of the uploaded files
Set<String> fileFullPathContainer = null;

if (args.length < 2) {
    // Outside the IDE the comma-separated file list (args[1]) is mandatory.
    if (GlobalVariable.env != EnvType.IDE) {
        return;
    }
} else {
    fileFullPathContainer = FileUtil.filesSplit(args[1], ",");
}

The filesSplit method:

public static Set<String> filesSplit(String filesString, String separator) {

	// String.split treats the separator as a regex and discards trailing
	// empty tokens, so the trailing comma from the submit script is harmless.
	Set<String> resultSet = new HashSet<>();
	String[] tmpArr = filesString.split(separator);
	resultSet.addAll(Arrays.asList(tmpArr));

	return resultSet;
}
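
As a quick sanity check of that trailing-comma behavior, here is a tiny hypothetical driver (the paths are placeholders, not the real ones from the test output above) using the FileUtil from this post:

public class FilesSplitDemo {
    public static void main(String[] args) {
        // Simulates the value built by the shell loop, trailing comma included.
        String filesString = "/tmp/config/redis_cluster.conf,/tmp/config/LAN_ip,/tmp/config/kafka_cluster.conf,";
        // Prints 3 -- the empty token after the last comma is dropped by split().
        System.out.println(FileUtil.filesSplit(filesString, ",").size());
    }
}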

With that, the configuration files can be read through their absolute paths. Here is an example:

if (GlobalVariable.env == EnvType.IDE) {
	ConfigCenter.init(GlobalVariable.env);
	configCenter = ConfigCenter.getInstance();
} else {
	ConfigCenter.init(null);
	configCenter = ConfigCenter.getInstance();

	// Load the Kafka configuration (try-with-resources closes the stream)
	Properties kafkaProps = new Properties();
	try (FileInputStream in = new FileInputStream(
			FileUtil.findFileFullPath(fileFullPathContainer, "kafka_cluster.conf"))) {
		kafkaProps.load(in);
	}
	configCenter.setKafkaConfig(kafkaProps);

	// Load the Redis configuration
	Properties redisProps = new Properties();
	try (FileInputStream in = new FileInputStream(
			FileUtil.findFileFullPath(fileFullPathContainer, "redis_cluster.conf"))) {
		redisProps.load(in);
	}
	configCenter.setRedisConfig(redisProps);
}
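
One caveat worth noting: in yarn-cluster mode the driver does not run on the submitting host, so the absolute paths from the submit machine may not exist there. YARN localizes everything passed with --files into each container's working directory, so a fallback is to open the files by bare name. A minimal sketch, assuming the same file names as above:

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class LocalizedConfigReader {

    // Loads a properties file shipped via --files from the YARN container's
    // current working directory, where localization places it.
    static Properties load(String fileName) throws IOException {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(fileName)) {
            props.load(in);
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Bare file names only -- localization flattens the directory structure.
        Properties kafkaProps = load("kafka_cluster.conf");
        Properties redisProps = load("redis_cluster.conf");
        System.out.println(kafkaProps.size() + " Kafka keys, " + redisProps.size() + " Redis keys");
    }
}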

The findFileFullPath method:

// Returns the first uploaded path containing shortName, or null if none matches.
public static String findFileFullPath(Set<String> container, String shortName) {

	for (String tmpString : container) {
		if (tmpString.contains(shortName)) {
			return tmpString;
		}
	}

	return null;
}
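
Because this uses contains(), a short name can accidentally match a longer path (for example, redis_cluster.conf would also match a hypothetical path ending in redis_cluster.conf.bak). If that is a concern, a stricter variant (my sketch, not part of the original post) compares the base name exactly:

import java.nio.file.Paths;
import java.util.Set;

public class FileLookup {

    // Returns the path whose file name equals shortName exactly, or null.
    public static String findByExactName(Set<String> container, String shortName) {
        for (String path : container) {
            if (Paths.get(path).getFileName().toString().equals(shortName)) {
                return path;
            }
        }
        return null;
    }
}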