1. 程式人生 > >mysql 連結,資料處理

mysql 連結,資料處理

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import scala.io.{BufferedSource, Source}

/**
  * Loads one nginx access-log file into the MySQL table
  * `ntd_db.ngixlog_primeval_infos`, one row per usable log line.
  *
  * For every line the loader extracts the day (`yyyyMMdd`), the client version,
  * and the `appkey` / `guid` path segments of the POST request, and inserts them
  * in JDBC batches. Lines that cannot be parsed are reported on stdout and
  * skipped instead of aborting the whole import.
  *
  * Created by Administrator on 2018/1/28.
  */
object LogsAnalys {
  val driver = "com.mysql.jdbc.Driver"
  val url = "jdbc:mysql://url:3306/ntd_db?useUnicode=true&characterEncoding=utf8"
  val username = "user"
  val password = "password"

  /** Number of rows sent to the server per `executeBatch` / `commit`. */
  private val BatchSize = 600

  private val InsertSql =
    "insert into ntd_db.ngixlog_primeval_infos (date,guid,appkey,version,res01,res02) values (?,?,?,?,?,?);"

  /** The fields of one log line that end up in the database. */
  private final case class LogRecord(day: String, guid: String, appkey: String, version: String)

  /**
    * Formats an epoch-millisecond timestamp as `yyyyMMdd` in the JVM's default time zone.
    *
    * @param timestamp epoch milliseconds as a decimal string
    * @return the calendar day of that instant
    * @throws NumberFormatException if `timestamp` is not a valid long
    */
  def Format(timestamp: String): String =
    new SimpleDateFormat("yyyyMMdd").format(new Date(timestamp.toLong))

  /** Runs `close`, reporting (not propagating) any failure so later clean-up still happens. */
  private def closeQuietly(close: => Unit): Unit =
    try close
    catch {
      case e: Exception => e.printStackTrace()
    }

  /**
    * Extracts the client version that follows an `N-IS V` or `S-IS` marker and
    * runs up to the next double quote. Prints an error and returns `""` when no
    * well-formed marker is present.
    */
  private def extractVersion(line: String): String = {
    // (index of the marker, number of characters to skip before the version starts)
    val (marker, skip) =
      if (line.contains("N-IS")) (line.indexOf("N-IS V"), 6)
      else if (line.contains("S-IS")) (line.indexOf("S-IS"), 5)
      else (-1, 0)
    val quote = if (marker >= 0) line.indexOf('"', marker) else -1
    if (marker >= 0 && quote >= marker + skip) line.substring(marker + skip, quote)
    else {
      println("error!!!!!!!!!! version sign -------------" + line)
      ""
    }
  }

  /**
    * Reads the 4th space-separated field (the access time with one leading
    * character, which is dropped) and converts it to `yyyyMMdd`.
    * Returns `None` when the field is missing or unparseable.
    */
  private def extractDay(line: String, accessTime: SimpleDateFormat): Option[String] = {
    val fields = line.split(" ", -1)
    if (fields.length <= 3 || fields(3).isEmpty) None
    else
      scala.util.Try(accessTime.parse(fields(3).substring(1))).toOption
        .map(parsed => Format(parsed.getTime.toString))
  }

  /**
    * Picks `(guid, appkey)` out of the request text between `POST` and `HTTP`.
    * Segments that do not exist yield `""`.
    */
  private def extractIds(line: String, request: String): (String, String) = {
    val segments = request.split("/", -1)
    def fromEnd(n: Int): String = {
      val i = segments.length - n
      if (i >= 0) segments(i) else ""
    }
    // NOTE(review): the last segment still carries whatever precedes "HTTP"
    // (typically a trailing space); it is stored untrimmed, as before - confirm.
    if (request.contains("service") || (request.contains("upload") && line.contains("S-IS")))
      (fromEnd(1), fromEnd(2))
    else if (request.contains("upload"))
      (fromEnd(2), fromEnd(3))
    else {
      println("error!!!!!!!!!!!!!!!")
      ("", "")
    }
  }

  /** Parses one log line; `None` (after printing the reason) means the line must be skipped. */
  private def parseLine(line: String, accessTime: SimpleDateFormat): Option[LogRecord] = {
    val postAt = line.indexOf("POST")
    val httpAt = line.indexOf("HTTP")
    if (postAt < 0 || httpAt < postAt) {
      // Also covers a missing "HTTP" (httpAt == -1) and "HTTP" appearing before "POST".
      println("error !!!!!!!!!! no post or http sign ..............." + line)
      None
    } else {
      extractDay(line, accessTime) match {
        case None =>
          println("error!!!!!!!!!! date sign -------------" + line)
          None
        case Some(day) =>
          val version = extractVersion(line)
          val (guid, appkey) = extractIds(line, line.substring(postAt, httpAt))
          Some(LogRecord(day, guid, appkey, version))
      }
    }
  }

  /**
    * Entry point: reads the hard-coded log file and batch-inserts its rows.
    *
    * @param args unused
    */
  def main(args: Array[String]): Unit = {
    println("logs.....................")
    var connection: Connection = null
    var prepareStatement: PreparedStatement = null
    var jsonFile: BufferedSource = null
    var counts = 0
    // Lower-case "ss" = seconds; the previous "SS" parsed the seconds as milliseconds.
    // Built once: the formatter is only used from this thread.
    val accessTime = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH)
    try {
      val jsonCitys = "D:\\2018-01-28.log"
      // val jsonCitys = "E:\\navinfo_workspace\\x_data\\log\\log\\2018-01-23.log"
      // val jsonCitys = "E:\\navinfo_workspace\\x_data\\log\\log\\2018-01-24.log"
      // val jsonCitys = "E:\\navinfo_workspace\\x_data\\log\\log\\2018-01-25.log"
      jsonFile = Source.fromFile(jsonCitys)
      connection = DriverManager.getConnection(url, username, password)
      prepareStatement = connection.prepareStatement(InsertSql)
      connection.setAutoCommit(false)
      for (line <- jsonFile.getLines()) {
        counts += 1
        parseLine(line, accessTime).foreach { record =>
          prepareStatement.setString(1, record.day)
          prepareStatement.setString(2, record.guid)
          prepareStatement.setString(3, record.appkey)
          prepareStatement.setString(4, record.version)
          prepareStatement.setString(5, "") // res01: reserved, always empty
          prepareStatement.setInt(6, 0)     // res02: reserved, always 0
          prepareStatement.addBatch()
        }
        if (counts % BatchSize == 0) {
          prepareStatement.executeBatch()
          connection.commit()
        }
      }
      // Flush the final, partially filled batch.
      prepareStatement.executeBatch()
      connection.commit()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      // Any of these may still be null if an earlier step failed.
      Option(prepareStatement).foreach(s => closeQuietly(s.close()))
      Option(connection).foreach(c => closeQuietly(c.close()))
      Option(jsonFile).foreach(f => closeQuietly(f.close()))
    }
    println("一共------------------" + counts)
  }
}

import java.sql.{Connection, DriverManager, PreparedStatement}
import com.alibaba.fastjson.{JSON, JSONObject}
import scala.io.{BufferedSource, Source}
import java.text.SimpleDateFormat
import java.util.Date

/**
  * Loads a file of JSON records (one object per line) into the MySQL table
  * `ntd_db.yixin_primeval_infos`.
  *
  * From each record's `header` object it reads `version`, `appKey`, `guid` and
  * `timestampUTC_ms`; the timestamp is split into a day and a time-of-day column.
  * A record that cannot be parsed aborts the import (the error is printed).
  *
  * Created by Administrator on 2018/1/28.
  */
object YixinAnalys {
  val driver = "com.mysql.jdbc.Driver"
  val url = "jdbc:mysql://url:3306/db?useUnicode=true&characterEncoding=utf8"
  val username = "root"
  val password = "password"

  /** Number of rows sent to the server per `executeBatch` / `commit`. */
  private val BatchSize = 1000

  /**
    * Formats an epoch-millisecond timestamp as `yyyyMMdd HH:mm:ss` in the JVM's
    * default time zone.
    *
    * @param timestamp epoch milliseconds as a decimal string
    * @throws NumberFormatException if `timestamp` is not a valid long
    */
  def yiXinTimeFormat(timestamp: String): String =
    new SimpleDateFormat("yyyyMMdd HH:mm:ss").format(new Date(timestamp.toLong))

  /** Runs `close`, reporting (not propagating) any failure so later clean-up still happens. */
  private def closeQuietly(close: => Unit): Unit =
    try close
    catch {
      case e: Exception => e.printStackTrace()
    }

  /**
    * Entry point: reads the hard-coded JSON file and batch-inserts its rows.
    *
    * @param args unused
    */
  def main(args: Array[String]): Unit = {
    println("logs.....................")
    var connection: Connection = null
    var prepareStatement: PreparedStatement = null
    var jsonFile: BufferedSource = null
    var counts = 0
    try {
      //Class.forName(driver)
      connection = DriverManager.getConnection(url, username, password)
      prepareStatement = connection.prepareStatement(
        "insert into ntd_db.yixin_primeval_infos (date,guid,appkey,version,hour) values (?,?,?,?,?);")
      connection.setAutoCommit(false)
      // val jsonCitys = "E:\\navinfo_workspace\\x_data\\yin\\in\\1516606537778_json.txt"
      // val jsonCitys = "E:\\navinfo_workspace\\xu_data\\in\\in\\1516861216718_json.txt"
      val jsonCitys = "D:\\caIJIBAO\\20180129data\\data20180129\\yi\\1515634349929_json.txt"
      jsonFile = Source.fromFile(jsonCitys)
      for (line <- jsonFile.getLines()) {
        counts += 1
        val header: JSONObject = JSON.parseObject(line).getJSONObject("header")
        val version = header.getString("version")
        val appkey = header.getString("appKey")
        val guid = header.getString("guid")
        // "yyyyMMdd HH:mm:ss" -> day part and time-of-day part.
        val dayAndTime = yiXinTimeFormat(header.getString("timestampUTC_ms")).split(" ")
        prepareStatement.setString(1, dayAndTime(0))
        prepareStatement.setString(2, guid)
        prepareStatement.setString(3, appkey)
        prepareStatement.setString(4, version)
        prepareStatement.setString(5, dayAndTime(1))
        prepareStatement.addBatch()
        if (counts % BatchSize == 0) {
          prepareStatement.executeBatch()
          connection.commit()
        }
      }
      // Flush the final, partially filled batch.
      prepareStatement.executeBatch()
      connection.commit()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      // Any of these may still be null if an earlier step failed; the file is
      // closed here too so it is not leaked when the import aborts.
      Option(prepareStatement).foreach(s => closeQuietly(s.close()))
      Option(connection).foreach(c => closeQuietly(c.close()))
      Option(jsonFile).foreach(f => closeQuietly(f.close()))
    }
    println("counts....." + counts)
  }
}
-- Connection details
--   MySQL host : 172.22.52.97
--   Port       : 3306
--   Database   : ntd_db
--   User       : ntd_user
--   Password   : ntd123

-- ---------------------------------------------------------------------------
-- Yixin raw data
-- ---------------------------------------------------------------------------
create table ntd_db.yixin_primeval_infos(
  ID INT primary key auto_increment,  -- auto-increment primary key
  date VARCHAR(32),                   -- day, yyyyMMdd
  guid VARCHAR(256),                  -- guid
  appkey VARCHAR(64),                 -- appkey
  version VARCHAR(32),                -- version
  hour VARCHAR(256),                  -- reserved field 1
  res02 bigint                        -- reserved field 2
);

-- NOTE(review): the combined key length of these VARCHAR columns may exceed the
-- InnoDB index-prefix limit on some server configurations - confirm.
ALTER TABLE ntd_db.yixin_primeval_infos ADD INDEX index_name (date,guid,appkey,version);

create table ntd_db.yixin_20180129_infos(
  ID INT primary key auto_increment,
  date VARCHAR(32),
  guid VARCHAR(256),
  appkey VARCHAR(64),
  version VARCHAR(32),
  hour VARCHAR(256),
  res02 bigint
);

-- Commented out in the original notes; run it to fill the per-day table.
-- `date` is a VARCHAR, so it is compared with a string literal.
-- insert into yixin_20180129_infos(date,guid,appkey,version,hour)
--   select date,guid,appkey,version,hour
--   from yixin_primeval_infos
--   where date = '20180129' and appkey = '8cce364eaf4c3e26bedc9b0e23a41859';

-- ---------------------------------------------------------------------------
-- nginx log raw data
-- The table is named ngixlog_primeval_infos (sic) because that is the name the
-- loader inserts into and every query below reads from.
-- ---------------------------------------------------------------------------
create table ntd_db.ngixlog_primeval_infos(
  ID INT primary key auto_increment,
  date VARCHAR(32),
  guid VARCHAR(256),
  appkey VARCHAR(64),
  version VARCHAR(32),
  res01 VARCHAR(256),
  res02 bigint
);

ALTER TABLE ntd_db.ngixlog_primeval_infos ADD INDEX index_name (date,guid,appkey,version);

-- nginx log data for 20180129
create table ntd_db.ngnixlog_20180129_infos(
  ID INT primary key auto_increment,
  date VARCHAR(32),
  guid VARCHAR(256),
  appkey VARCHAR(64),
  version VARCHAR(32),
  res01 VARCHAR(256),
  res02 bigint
);

insert into ngnixlog_20180129_infos(date,guid,appkey,version)
  select date,guid,appkey,version
  from ngixlog_primeval_infos
  where date = '20180129' and appkey = '8cce364eaf4c3e26bedc9b0e23a41859';

-- ---------------------------------------------------------------------------
-- New / active / retained counts
-- A LEFT JOIN is required: with an inner join b.guid is never null, so new_num
-- would always be 0. The history side is limited to earlier days because the
-- per-day table is itself copied from the history table.
-- NOTE(review): the "date < '20180129'" cut-off is inferred from the insert
-- above - confirm it matches the intended definition of "new" and "retained".
-- NOTE(review): act_num and liucun_num use the same expression, as in the
-- original notes - confirm whether act_num should count every a.guid instead.
-- ---------------------------------------------------------------------------
select
  count(case when a.guid is not null and b.guid is null then a.guid end) as new_num,
  count(case when a.guid is not null and b.guid is not null then a.guid end) as act_num,
  count(case when a.guid is not null and b.guid is not null then a.guid end) as liucun_num
from (select guid from ngnixlog_20180129_infos) as a
left join (select distinct guid from ngixlog_primeval_infos where date < '20180129') as b
  on a.guid = b.guid;

-- Appendix: the same report counting each user once.
-- "distinct" cannot appear on its own in the middle of a select list, so the
-- active-user column is written as count(distinct ...).
select
  count(distinct case when b.guid is null then a.guid end) as new_num,
  count(distinct a.guid) as act_num,
  count(distinct case when b.guid is not null then a.guid end) as liucun_num
from (select guid from ngnixlog_20180129_infos) as a
left join (select distinct guid from ngixlog_primeval_infos where date < '20180129') as b
  on a.guid = b.guid;

-- Active users of the day
select distinct guid,date,appkey
from yixin_primeval_infos
where date = '20180129';

-- New users: seen on the day, never seen on an earlier day
select distinct guid
from ngnixlog_20180129_infos t
where not exists (
  select * from ngixlog_primeval_infos m
  where t.guid = m.guid and m.date < '20180129'
);

-- Retained users: seen on the day and on an earlier day
select distinct guid
from ngnixlog_20180129_infos t
where exists (
  select * from ngixlog_primeval_infos m
  where t.guid = m.guid and m.date < '20180129'
);

-- Cumulative users
-- guid is a VARCHAR identifier, so it is counted rather than summed.
-- NOTE(review): the filter is the same as the "new users" query, as in the
-- original notes; a cumulative total probably needs a different filter - confirm.
select count(distinct guid)
from ngnixlog_20180129_infos t
where not exists (
  select * from ngixlog_primeval_infos m
  where t.guid = m.guid and m.date < '20180129'
);
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the "jsonparse" Scala project: compiles src/main/scala with
     scala-maven-plugin and produces a shaded jar. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>jsonparse</groupId>
    <artifactId>jsonparse</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <!-- Not referenced anywhere else in this POM. -->
        <scala.version>2.10.6</scala.version>
        <encoding>UTF-8</encoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-exec</artifactId>
            <version>1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.4</version>
            <!-- Every javax.servlet artifact pulled in by hadoop-client is excluded. -->
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>commons-configuration</groupId>
            <artifactId>commons-configuration</artifactId>
            <version>1.10</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.4</version>
        </dependency>
        <!-- JDBC driver used by the MySQL loaders. -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <!-- JSON parser used by the loaders and the Spark job. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.28</version>
        </dependency>
        <dependency>
            <groupId>org.apache.any23</groupId>
            <artifactId>apache-any23-csvutils</artifactId>
            <version>2.1</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.10</version>
        </dependency>
        <dependency>
            <groupId>org.ahocorasick</groupId>
            <artifactId>ahocorasick</artifactId>
            <version>0.4.0</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <!-- Shaded jar built in the package phase; the include list names only
                 com.alibaba:fastjson. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <id>build-exec-bundle</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>false</minimizeJar>
                            <artifactSet>
                                <includes>
                                    <include>com.alibaba:fastjson</include>
                                </includes>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <!-- see http://davidb.github.com/scala-maven-plugin -->
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.13</version>
                <configuration>
                    <!-- NOTE(review): with this flag set, failing tests do not fail the build; confirm that is intended. -->
                    <testFailureIgnore>true</testFailureIgnore>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <!-- If you have classpath issue like NoDefClassError,... -->
                    <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- Custom zip packaging -->
        </plugins>
    </build>
</project>
package hourdis

import java.text.SimpleDateFormat
import java.util.Date

import com.alibaba.fastjson.JSON
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

import scala.collection.mutable

/**
  * Daily active-user duration statistics.
  *
  * Two local Spark jobs:
  *  - [[analyfile]] flattens the position track of each JSON record into
  *    space-separated text lines under `output1`;
  *  - [[houranalysis]] reads such text lines from `input` and keeps one sample
  *    per (guid, time) key, sorted by guid, under `output`.
  *
  * Created by xuejiao on 2018/1/27.
  */
object HourAnys {
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  val conf = new SparkConf()
  conf.setMaster("local[*]")
  // The application name is this object's simple class name.
  conf.setAppName(s"${this.getClass.getSimpleName}")
  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  val input = "E:\\yixin\\*"
  val output = "E:\\guidtime\\6"
  val input1 = "D:\\caIJIBAO\\20180129data\\data20180129\\yixin\\*"
  val output1 = "E:\\yixin\\"

  /**
    * Entry point; runs [[houranalysis]].
    *
    * @param args unused
    */
  def main(args: Array[String]): Unit = {
    //analyfile()
    houranalysis
  }

  /**
    * Formats an epoch-millisecond timestamp as `yyyyMMdd HH` (day and hour) in
    * the JVM's default time zone.
    *
    * @param time epoch milliseconds as a decimal string
    * @throws NumberFormatException if `time` is not a valid long
    */
  def timeFormat(time: String): String =
    new SimpleDateFormat("yyyyMMdd HH").format(new Date(time.toLong))

  /**
    * Formats an epoch-millisecond timestamp as `yyyyMMdd:HH:mm:ss` in the JVM's
    * default time zone.
    *
    * @param time epoch milliseconds as a decimal string
    * @throws NumberFormatException if `time` is not a valid long
    */
  def timeFormat1(time: String): String =
    new SimpleDateFormat("yyyyMMdd:HH:mm:ss").format(new Date(time.toLong))

  /** Absolute value of `x`. */
  def abs(x: Double): Double =
    if (x < 0) -x else x

  /**
    * Reads the space-separated lines under `input`, keeps those containing
    * `20180129`, and writes one `guid,time,lon,lat,timedate` line per
    * (guid, time) key to `output`, sorted by guid, as a single part file.
    *
    * Each input line must have at least 7 fields; field 4 gets a decimal point
    * after its 3rd character (longitude) and field 5 after its 2nd (latitude).
    */
  def houranalysis: Unit = {
    sc.textFile(input)
      .filter(_.contains("20180129"))
      .map { line =>
        val fields = line.split(" ")
        val guid = fields(0)
        val time = fields(2)
        val lon = fields(4).substring(0, 3) + "." + fields(4).substring(3)
        val lat = fields(5).substring(0, 2) + "." + fields(5).substring(2)
        val timedate = fields(6)
        (guid + "," + time, lon + "," + lat + "," + timedate)
      }
      // Keep only the first sample of each key. This selects the same value as
      // concatenating every sample and taking the first segment, without
      // building the concatenation.
      .reduceByKey((first, _) => first)
      .map { case (key, sample) => key + "," + sample }
      // Sort straight into one partition: a repartition after the sort would
      // shuffle the rows again and lose the ordering in the written file.
      .sortBy(line => line.split(",")(0), ascending = true, numPartitions = 1)
      .saveAsTextFile(output)
  }

  /**
    * Parses the JSON records under `input1` that contain the fixed app key,
    * and writes every point of the first path element's `position` array as
    * `guid dayHour gpsMillis lon lat dayTime` lines to `output1`.
    *
    * Records that fail to parse are dropped, as are results of 20 characters
    * or fewer.
    */
  def analyfile(): Unit = {
    sc.textFile(input1)
      .filter(_.contains("8cce364eaf4c3e26bedc9b0e23a41859"))
      .map { line =>
        try {
          val json = JSON.parseObject(line)
          val guid = JSON.parseObject(json.getString("header")).getString("guid")
          val firstPath = JSON.parseObject(JSON.parseArray(json.getString("path")).getString(0))
          val points = JSON.parseArray(firstPath.getString("position"))
          val rows = new mutable.StringBuilder()
          // Running totals over the points.
          // NOTE(review): summing suggests the values are delta-encoded - confirm.
          var lonSum = 0L
          var latSum = 0L
          var timegps = 0L
          for (i <- 0 until points.size()) {
            val point = JSON.parseObject(points.getString(i))
            // Not used in the output, but a record without a numeric UTC
            // timestamp is still rejected, as before.
            point.getString("timestampUTC_ms").toLong
            timegps += point.getString("timestampGPS_ms").toLong
            lonSum += point.getString("longitude_deg").toLong
            latSum += point.getString("latitude_deg").toLong
            rows.append(guid + " " + timeFormat(timegps.toString) + " " + timegps + " " +
              lonSum + " " + latSum + " " + timeFormat1(timegps.toString) + "\n")
          }
          rows.toString
        } catch {
          // Best effort: an unparseable record becomes an empty string and is
          // removed by the length filter below.
          case e: Exception => ""
        }
      }
      .filter(_.trim.length > 20)
      .repartition(1)
      .saveAsTextFile(output1)
  }
}