歷年滬深A股、香港H股票資料匯入和實時資料更新展示
阿新 • 發佈:2019-01-01
<?php
defined('IN_HEAVEN') or die('Hacking Attempt!');
/**
 * Stock data import
 *
 * PHP version 5
 *
 * @category   4SWeb
 * @package    Admin
 * @subpackage Custom
 * @version    SVN: $Id: StockImport.class.php 67 2014-11-25 15:12:29Z blin.z $
 */
import('COM.GZNC.Import.CSVImport');
import('ORG.Util.SQLKit');
import('ORG.Util.File');
import('ORG.Util.String');

/**
 * Imports Shanghai/Shenzhen (SHSZ) and Hong Kong (HK) stock data files and daily
 * update packages into the database. Each data type ("driver") is implemented by a
 * class loaded from Drivers/ through factory().
 */
class StockImport extends CSVImport{
    const ERR_GUESS_FILE = 1201; // data file name not recognised

    /*
     * Data type (driver) names
     */

    // --- Shanghai / Shenzhen ---

    /** SHSZ daily financial data (before 2008-03-18, 37 items) */
    const SHSZ_FINANCE37 = 'SHSZ_finance37';
    /** SHSZ daily financial data (from 2008-03-19, 51 items) */
    const SHSZ_FINANCE = 'SHSZ_finance';
    /** SHSZ sector (block) classification */
    const SHSZ_BLOCK = 'SHSZ_block';
    /** SHSZ historical financial statements: income statement */
    const SHSZ_INCOME_STATEMENT = 'SHSZ_income_statement';
    /** SHSZ historical financial statements: cash flow statement */
    const SHSZ_CASHFLOW_STATEMENT = 'SHSZ_cashflow_statement';
    /** SHSZ historical financial statements: balance sheet */
    const SHSZ_BALANCE_SHEET = 'SHSZ_balance_sheet';
    /** SHSZ historical financial statements: 91 analysis indicators */
    const SHSZ_FINANCIAL_ANALYSIS_INDEX = 'SHSZ_financial_analysis_index';
    /** SHSZ historical share capital changes */
    const SHSZ_CAPITAL_STOCK = 'SHSZ_capital_stock';
    /** SHSZ historical top-10 shareholders */
    const SHSZ_SHAREHOLDER = 'SHSZ_shareholder';
    /** SHSZ historical top-10 tradable shareholders */
    const SHSZ_TRADABLE_SHAREHOLDER = 'SHSZ_tradable_shareholder';
    /** SHSZ daily bars */
    const SHSZ_DAY = 'SHSZ_day';
    /** SHSZ daily bars (forward-adjusted) */
    const SHSZ_DAY_FORWARD = 'SHSZ_day_forward';
    /** SHSZ 5-minute bars */
    const SHSZ_5MIN = 'SHSZ_5Min';
    /** SHSZ 5-minute bars (forward-adjusted) */
    const SHSZ_5MIN_FORWARD = 'SHSZ_5Min_forward';
    /** SHSZ ex-rights / dividend data */
    const SHSZ_SPLIT = 'SHSZ_split';
    /** SHSZ real-time data */
    const SHSZ_REALTIME = 'SHSZ_realtime';
    /** SHSZ real-time 5-minute statistics */
    const SHSZ_REALTIME_5MIN = 'SHSZ_realtime_5Min';

    // --- Hong Kong ---

    /** HK financial data */
    const HK_FINANCE = 'HK_finance';
    /** HK industry data */
    const HK_INDUSTRY = 'HK_industry';
    /** HK daily bars */
    const HK_DAY = 'HK_day';
    /** HK daily bars (forward-adjusted) */
    const HK_DAY_FORWARD = 'HK_day_forward';
    /** HK 5-minute bars */
    const HK_5MIN = 'HK_5Min';
    /** HK 5-minute bars (forward-adjusted) */
    const HK_5MIN_FORWARD = 'HK_5Min_forward';
    /** HK ex-rights / dividend data */
    const HK_SPLIT = 'HK_split';
    /** HK real-time data */
    const HK_REALTIME = 'HK_realtime';
    /** HK real-time 5-minute statistics */
    const HK_REALTIME_5MIN = 'HK_realtime_5Min';

    /** Listener type: fired after each file import with its result */
    const LISTENER_IMPORT_RESULT = 1;

    /**
     * Registered listeners
     * @var array
     */
    protected static $_listners = array();

    /**
     * Import statistics of the current file (total/succ/fail/...)
     * @var array
     */
    protected $_stats = array();

    /**
     * Current data type (driver) name
     * @var string
     */
    protected $_driver = NULL;

    /**
     * Session SQL mode
     * @var mixed
     */
    protected $_sql_mode = '';

    /**
     * Daily download directory
     * @var string
     */
    protected $_down_path = './WStockDown/';

    /**
     * Daily update (decompressed) directory
     * @var string
     */
    protected $_update_path = './WStockUpdate/';

    /**
     * Real-time update directory
     * @var string
     */
    protected $_realtime_path = './WStockRealtime/';

    /**
     * Log directory
     * @var string
     */
    protected $_log_path = './WStockLog/';

    /**
     * Database log id
     */
    protected $_log_id = NULL;

    /**
     * Whether SQL statements are written to the log
     */
    protected $_log_sql = false;

    /**
     * Whether the current run is a daily-update import
     * @var boolean
     */
    protected $_is_update = false;

    /**
     * Default year for data files named only by month/day
     * @var string
     */
    protected $_default_file_year = NULL;

    /**
     * Table field definitions, lazily loaded by getFields()
     * @var array
     */
    protected static $_fields_maps = NULL;

    /**
     * Last generated SQL statement
     * @var string
     */
    protected $_last_sql = NULL;

    /**
     * @param string $driver data type name; also used as the log file prefix
     */
    public function __construct($driver='stockimport'){
        $this->_driver = $driver;
        // Working directory for all import folders
        $stock_path = ($t=C('STOCK_WORK_PATH'))?$t:DATA_PATH;
        $this->_down_path = $stock_path . 'WStockDown/';
        $this->_update_path = $stock_path . 'WStockUpdate/';
        $this->_realtime_path = $stock_path . 'WStockRealtime/';
        $this->_log_path = $stock_path . 'WStockLog/';
        $log_dir = $this->_get_log_dir(date('Y-m-d'), true);
        $this->_log = $log_dir . $this->_driver. '_' . date('Ymd') . '.log';
        $this->_logLevel = ($l=C('STOCK_IMPORT_LOG_LEVEL'))?$l:Log::INFO;
        // Ignore empty-data, missing-file, HTTP and unrecognised-file-name errors
        $this->_ignoreError = array(self::ERR_EMPTY_DATA, self::ERR_FILE_READ, self::ERR_HTTP_REQUEST, self::ERR_GUESS_FILE);
        // Whether to write SQL statements into the log
        $this->_log_sql = C('STOCK_IMPORT_LOG_SQL');
    }

    /**
     * Import driver factory.
     *
     * The class name is derived from the driver name by removing each "_" and
     * upper-casing the character after it, then upper-casing the first character
     * (e.g. "SHSZ_5Min_forward" => "SHSZ5MinForward").
     *
     * @param string $driver data type name
     * @return object
     */
    public static function factory($driver){
        // preg_replace_callback instead of the /e modifier (deprecated in PHP 5.5, removed in 7.0)
        $class = ucfirst(preg_replace_callback(
            '/_([a-zA-Z0-9])/',
            function($m){ return strtoupper($m[1]); },
            $driver
        ));
        if(!class_exists($class)){
            require 'Drivers/' . $class . '.class.php';
        }
        if(!class_exists($class)){
            return self::throw_exception("Driver {$driver} not exists", SCRIPT_ERR_CONFIG, $driver, __FILE__, __LINE__);
        }
        $obj = new $class($driver);
        return $obj;
    }

    /**
     * Single entry point for data import.
     *
     * @param string $file_or_date        data file/folder, or a daily update date (e.g. 2014-08-23)
     * @param string $driver              data type; when omitted it is detected from the file name(s)
     * @param string $consoleOutput       whether to echo the log to the console
     * @param string $default_file_year   default year for files named only by month
     * @param mixed  $run_specific_driver when a package maps to several drivers, import only this one
     * @return mixed number of imported rows, or the driver method's return value
     */
    public static function import($file_or_date, $driver=NULL, $consoleOutput=true, $default_file_year=NULL, $run_specific_driver=NULL){
        // auto detect driver & date from file name
        if(!$driver){
            if(is_dir($file_or_date)){
                import('ORG.Util.File');
                $aFiles = File::getDirFiles($file_or_date, array('zip', 'rar'));
                sort($aFiles, SORT_NATURAL);
            }else{
                $aFiles = array($file_or_date);
            }
            foreach($aFiles as $file){
                $detect_info = self::guess_driver_from_file_name($file);
                if(!$detect_info){
                    return self::throw_exception("Cann't detect driver for file {$file}", SCRIPT_ERR_CONFIG, $file, __FILE__, __LINE__);
                }
                $driver = $detect_info['driver'];
                $date = $detect_info['date'];
                if(is_array($driver)){
                    // One package feeds several data types
                    foreach($driver as $d){
                        if(!$run_specific_driver || $run_specific_driver==$d){
                            self::import(array('date'=>$date, 'down'=>$file), $d, $consoleOutput, $default_file_year);
                        }
                    }
                }else{
                    self::import(array('date'=>$date, 'down'=>$file), $driver, $consoleOutput, $default_file_year);
                }
            }
            return;
        }

        $obj = self::factory($driver);
        if(!$obj){
            return self::throw_exception("Driver {$driver} not exists", SCRIPT_ERR_CONFIG, $driver, __FILE__, __LINE__);
        }

        if(is_array($file_or_date) || self::filter_date($file_or_date)){
            $func = 'importUpdate';
        }elseif(is_file($file_or_date)){
            $func = 'importFile';
        }elseif(is_dir($file_or_date)){
            $func = 'importFolder';
        }else{
            return self::throw_exception("File or folder {$file_or_date} not exists", self::ERR_FILE_READ, $file_or_date, __FILE__, __LINE__);
        }

        $obj->consoleOutput($consoleOutput);
        if($default_file_year){
            $obj->default_file_year($default_file_year);
        }
        return $obj->$func($file_or_date);
    }

    /**
     * Return the field definition for a data type.
     *
     * @param string $driver   data type
     * @param bool   $isUpdate daily update; history and update files may have different layouts
     * @return array
     */
    public static function getFields($driver, $isUpdate=false){
        if(empty(self::$_fields_maps)){
            self::$_fields_maps = require APP_ROOT_PATH . 'app/Web/Inc/stock_fields.inc.php';
        }
        if($isUpdate && isset(self::$_fields_maps[$driver]["UPDATE"])){
            return self::$_fields_maps[$driver]["UPDATE"];
        }else{
            return isset(self::$_fields_maps[$driver]["IMPORT"])?self::$_fields_maps[$driver]["IMPORT"]:self::$_fields_maps[$driver];
        }
    }

    /**
     * Return the table name for a data type.
     *
     * @param string $driver data type
     * @return string|null NULL for data types without a table (e.g. the real-time types)
     */
    public static function getTable($driver){
        static $_SHSZ_TABLES = array(
            self::SHSZ_5MIN => 'shsz_5min',
            self::SHSZ_5MIN_FORWARD => 'shsz_5min_forward',
            self::SHSZ_BALANCE_SHEET => 'shsz_balance_sheet',
            self::SHSZ_BLOCK => 'shsz_block',
            self::SHSZ_CAPITAL_STOCK => 'shsz_capital_stock',
            self::SHSZ_CASHFLOW_STATEMENT => 'shsz_cashflow_statement',
            self::SHSZ_DAY => 'shsz_day',
            self::SHSZ_DAY_FORWARD => 'shsz_day_forward',
            self::SHSZ_FINANCE => 'shsz_finance',
            self::SHSZ_FINANCE37 => 'shsz_finance37',
            self::SHSZ_FINANCIAL_ANALYSIS_INDEX => 'shsz_financial_analysis_index',
            self::SHSZ_INCOME_STATEMENT => 'shsz_income_statement',
            self::SHSZ_SHAREHOLDER => 'shsz_shareholder',
            self::SHSZ_SPLIT => 'shsz_split',
            self::SHSZ_TRADABLE_SHAREHOLDER => 'shsz_tradable_shareholder',
            self::HK_FINANCE => 'hk_finance',
            self::HK_INDUSTRY => 'hk_industry',
            self::HK_SPLIT => 'hk_split',
            self::HK_5MIN => 'hk_5min',
            self::HK_5MIN_FORWARD => 'hk_5min_forward',
            self::HK_DAY => 'hk_day',
            self::HK_DAY_FORWARD => 'hk_day_forward',
        );
        // isset() avoids an undefined-index notice; the value for unknown drivers is still NULL
        return isset($_SHSZ_TABLES[$driver])?$_SHSZ_TABLES[$driver]:NULL;
    }

    /**
     * Return the archive period (in days) configured for a data type.
     *
     * @param string $driver
     * @return number|false false for data types that are not archived
     */
    public static function getArchiveDays($driver){
        switch($driver){
            case self::SHSZ_DAY:
            case self::SHSZ_DAY_FORWARD:
            case self::HK_DAY:
            case self::HK_DAY_FORWARD:
                return C('STOCK_ARCHIVE_DAY');
            case self::SHSZ_5MIN:
            case self::SHSZ_5MIN_FORWARD:
            case self::HK_5MIN:
            case self::HK_5MIN_FORWARD:
                return C('STOCK_ARCHIVE_5MIN');
            default:
                return false;
        }
    }

    /**
     * Import a daily update.
     *
     * @param string $date      date, e.g. 2014-08-23
     * @param string $extension extension of the update data file
     * @return boolean
     */
    public function importUpdate($date, $extension='csv'){
        return $this->_importUpdate($date, $extension);
    }

    /**
     * Import all data files in a folder.
     *
     * @param string $folder    folder
     * @param string $extension data file extension
     */
    public function importFolder($folder, $extension='csv'){
        return $this->_importFolder($folder, $extension);
    }

    /**
     * Get (no argument) or set the default file year.
     *
     * @param string $year
     * @return string|null the current year when called without an argument
     */
    public function default_file_year($year=NULL){
        if(is_null($year)){
            return $this->_default_file_year;
        }else{
            $this->_default_file_year = $year;
        }
    }

    /**
     * Set the session SQL mode.
     *
     * @param mixed $strict true => TRADITIONAL, false => '' (non-strict), or an explicit mode name
     */
    public function sql_mode($strict=true){
        if($strict){
            $this->_sql_mode = $strict===true?'TRADITIONAL':$strict;
        }else{
            $this->_sql_mode = '';
        }
        $this->_VM()->query('SET sql_mode = "' . $this->_sql_mode . '"');
    }

    /**
     * Extract an 11-digit mobile number starting with 1 (an optional leading 0 is dropped).
     *
     * @param string $v
     * @return string '' when no number is found
     */
    public static function filter_mobile($v){
        if(preg_match('/0?(1[0-9]{10})/', $v, $m)){
            return $m[1];
        }else{
            return '';
        }
    }

    /**
     * Convert a value expressed in ten-thousands to units (integer, via bcmath).
     *
     * @param number $v
     * @return number
     */
    public static function filter_tenthousands($v){
        return $v?bcmul($v, 10000, 0):0;
    }

    /**
     * Convert a value expressed in thousands to units (integer, via bcmath).
     *
     * @param number $v
     * @return number
     */
    public static function filter_thousand($v){
        return $v?bcmul($v, 1000, 0):0;
    }

    /**
     * Truncate to an integer string (via bcmath, so large values are not lost to float).
     *
     * @param number $v
     * @return number
     */
    public static function filter_intval($v){
        return bcadd($v, 0, 0);
    }

    /**
     * Normalise a date to "Y-m-d" (month/day are not zero-padded for separated input).
     *
     * Accepts YYYYMMDD, YYYY-M-D / YYYY/M/D, optionally followed by a time. Other numeric
     * values are passed to filter_convert_datetime().
     *
     * @param string $v
     * @return string|false
     */
    public static function filter_date($v){
        if(!$v){
            return false;
        }
        // Separator class is [\/\-]: the former [\/|\-] also accepted a literal "|".
        if(preg_match('/^([0-9]{4})([0-9]{2})([0-9]{2})$/', $v, $m)
            || preg_match('/^([0-9]{4})[\/\-]([0-9]{1,2})[\/\-]([0-9]{1,2})$/', $v, $m)
            || preg_match('/^([0-9]{4})[\/\-]([0-9]{1,2})[\/\-]([0-9]{1,2})\s+[0-9]{1,2}:[0-9]{1,2}:[0-9]{1,2}$/', $v, $m)
        ){
            return $m[1] . '-' . $m[2] . '-' . $m[3];
        }else{
            // NOTE(review): presumably a serial/Julian day number; see filter_convert_datetime()
            return (is_numeric ($v) && ($v=self::filter_convert_datetime($v)))?$v:false;
        }
    }

    /**
     * Normalise a date-time "YYYY-M-D H:M:S" (or with "/") to "Y-m-d H:M:S".
     *
     * @param string $v
     * @return string|false
     */
    public static function filter_datetime($v){
        if(preg_match('/^([0-9]{4})[\/\-]([0-9]{1,2})[\/\-]([0-9]{1,2})\s+([0-9]{1,2}:[0-9]{1,2}:[0-9]{1,2})$/', $v, $m) ){
            return $m[1] . '-' . $m[2] . '-' . $m[3] . ' ' . $m[4];
        }else{
            return false;
        }
    }

    /**
     * Normalise a stock code: SH/SZ + 6 digits or HK + 5 digits (HK + 4 digits is
     * zero-padded to 5). Output is upper-case.
     *
     * @param string $v
     * @return string|false
     */
    public static function filter_stock_code($v){
        if(preg_match('/^(SH|SZ)[0-9]{6}$/i', $v) || preg_match('/^HK[0-9]{5}$/i', $v)){
            return strtoupper($v);
        }elseif(preg_match('/^HK([0-9]{4})$/i', $v, $m)){
            return strtoupper('HK0' . $m[1]);
        }else{
            return false;
        }
    }

    /**
     * Execute several SQL statements, logging each result and a summary.
     *
     * @param string|array $sql       SQL text (split by SQLKit::parse_sqls) or a list of statements
     * @param string       $delimiter unused; kept for interface compatibility
     */
    protected function _batch_exec_sqls($sql, $delimiter=';'){
        $oVirtualModel = $this->_VM();
        if(is_string($sql)){
            $aSQLs = SQLKit::parse_sqls($sql);
        }else{
            $aSQLs = $sql;
        }
        $total = 0;
        $succs = 0;
        $fails = 0;
        $this->_log("Start batch execute SQLs");
        foreach ($aSQLs as $sql){
            if(!$sql) continue;
            $this->_log("Query: " . $sql);
            $result = $oVirtualModel->execute($sql);
            // execute() may return 0 affected rows on success; only false means failure
            // (same rule as _cb_row_save()).
            $ok = (false !== $result);
            // Parenthesised: "." binds tighter than "?:", so the ternary must be grouped.
            $this->_log("Result is " . ($ok?'succ':'fail: ' . $oVirtualModel->getDbError()));
            $total++;
            if($ok){
                $succs++;
            }else{
                $this->_setError("匯入失敗:" . $oVirtualModel->getDbError());
                $fails++;
            }
        }
        $this->_log("Finished!");
        $this->_log("Total executed: " . $total . ", succ: " . $succs . ", fail: " . $fails);
    }

    /**
     * Lazily create the VirtualModel used for queries.
     *
     * @return VirtualModel
     */
    protected function _VM(){
        if(!$this->_oVirtualModel){
            import('COM.GZNC.VirtualModel');
            $this->_oVirtualModel = new VirtualModel();
        }
        return $this->_oVirtualModel;
    }

    /**
     * Open (once) a persistent mysqli connection with utf8 charset.
     *
     * The connection is cached only after it succeeds, so a failed attempt is retried
     * on the next call instead of handing back a broken handle.
     *
     * @return object mysqli instance, or the result of _trigger_error() on failure
     */
    protected function _mysqli(){
        static $mysqli = NULL;
        if($mysqli){
            return $mysqli;
        }
        // "p:" prefix requests a persistent connection
        $conn = new mysqli('p:' . C('DB_HOST'), C('DB_USER'), C('DB_PWD'), C('DB_NAME'), C('DB_PORT'));
        /*
         * This is the "official" OO way to do it,
         * BUT $connect_error was broken until PHP 5.2.9 and 5.3.0.
         */
        if ($conn->connect_error) {
            return $this->_trigger_error('Connect DB Error (' . $conn->connect_errno . ') ' . $conn->connect_error
                , self::ERR_DB_CONNECT
                , NULL
                , __FILE__
                , __LINE__
            );
        }
        /* change character set to utf8 */
        if (!$conn->set_charset("utf8")) {
            return $this->_trigger_error(sprintf("Error loading character set utf8: %s\n", $conn->error)
                , self::ERR_DB_QUERY
                , NULL
                , __FILE__
                , __LINE__
            );
        }
        $mysqli = $conn;
        return $mysqli;
    }

    /**
     * Save one parsed row to the database (insert, update or replace).
     *
     * @param string       $table       table name
     * @param string|array $key         primary key column, or an array of where-conditions
     * @param string       $file        data file name
     * @param int          $line        current line number (1, 2, 3, ...)
     * @param array        $fieldValues column values after field callbacks
     * @param array        $rowValues   raw column values
     * @param array        $options     options: checkUpdateDate, _addFieldValues,
     *                                  _updateFieldValues, instantWrite
     * @param bool         $replace     write with REPLACE instead of insert/update
     * @return mixed true when written (or skipped as older), the SQL string when not
     *               instantWrite, or the result of _trigger_error() on SQL failure
     */
    public function _cb_row_save($table, $key, $file, $line, $fieldValues, $rowValues, $options=NULL, $replace=false){
        $oVirtualModel = $this->_VM();
        if(!$replace){
            $exist_row = false;
            if($key){
                $where = is_array($key)?$key:array($key => $fieldValues[$key]);
                // NOTE (2014-10-12 zhongyw): in CLI mode the mysql_escape_string call inside
                // db->escapeString() leaked memory on large imports, so the existence query is
                // built by hand instead of $oVirtualModel->table($table)->where($where)->find().
                $exist_sql = "SELECT";
                if(is_array($key)){
                    $exist_sql .= " " . implode(', ', array_keys($key));
                }else{
                    $exist_sql .= " `" . $key . "`";
                }
                if(!empty($options['checkUpdateDate'])){
                    $exist_sql .= ", update_date";
                }
                $exist_sql .= " FROM `" . $table . "`";
                $exist_sql .= " WHERE " . SQLKit::where_expr($where);
                $exist_sql .= " LIMIT 1";
                $exist_result = $oVirtualModel->query($exist_sql);
                $exist_row = !empty($exist_result) && is_array($exist_result)?$exist_result[0]:false;
            }

            if(empty($exist_row)){
                if(!empty($options['_addFieldValues']) && is_array($options['_addFieldValues'])){
                    $fieldValues = array_merge($fieldValues, $options['_addFieldValues']);
                }
                $sql = SQLKit::insert_stmt($table, $fieldValues);
                $operation = "Insert";
            }else{
                if(!empty($options['_updateFieldValues']) && is_array($options['_updateFieldValues'])){
                    $fieldValues = array_merge($fieldValues, $options['_updateFieldValues']);
                }
                // Do not overwrite a stored row with older data
                if(!empty($options['checkUpdateDate'])
                    && !empty($exist_row['update_date'])
                    && !empty($fieldValues['update_date'])
                    && str_replace(array('-', '/'), '', $exist_row['update_date']) > str_replace(array('-', '/'), '', $fieldValues['update_date'])
                ){
                    $this->_log("Exist record date(" . $exist_row['update_date'] . ") is newer than update line:" . $line . " date (" . $fieldValues['update_date'] . ")", LOG::WARN);
                    $this->_log("Skip Update");
                    return true;
                }
                $sql = SQLKit::update_stmt($table, $fieldValues, $where);
                $operation = "Update";
            }
        }else{
            if(!empty($options['_addFieldValues']) && is_array($options['_addFieldValues'])){
                $fieldValues = array_merge($fieldValues, $options['_addFieldValues']);
            }
            if(!empty($options['_updateFieldValues']) && is_array($options['_updateFieldValues'])){
                $fieldValues = array_merge($fieldValues, $options['_updateFieldValues']);
            }
            $sql = SQLKit::replace_stmt($table, $fieldValues);
            $operation = "Replace";
        }

        $this->_last_sql = $sql;

        if (! empty ( $options ['instantWrite'] ) && !empty($sql)) {
            $log_content = "Line {$line}" . ($this->_log_sql?": $sql":"");
            $dbError = NULL;
            try {
                $result = $oVirtualModel->execute ( $sql );
            } catch ( Exception $e ) {
                $result = false;
                $dbError = $e->getMessage ();
            }
            // execute() can return 0 when an UPDATE writes identical values, so only
            // a strict false counts as failure.
            if (false === $result) {
                if (! $dbError) {
                    $dbError = $oVirtualModel->getDbError ();
                }
                $this->_log( $log_content . ": $sql");
                $this->_log($operation . " FAIL: $dbError" );
                return $this->_trigger_error ( "Line {$line} importing SQL execute Failed: " . $dbError
                    , self::ERR_SQL_EXECUTE
                    , array ( 'line' => $line, 'sql' => $sql )
                    , __FILE__, __LINE__ );
            } else {
                $this->_log ( $log_content . ": " . $operation . " SUCC" );
                return true;
            }
        } else {
            return $sql;
        }
    }

    /**
     * Write a log message.
     *
     * When $this->_log is callable, messages above the configured level are dropped and the
     * rest go to the callback; otherwise the parent implementation handles them.
     *
     * @param string $message log message
     * @param string $level   log level
     * @return boolean
     */
    protected function _log($message, $level=LOG::INFO){
        if($this->_log){
            if (is_callable ( $this->_log )) {
                static $aLogLevelMaps = array (
                    LOG::EMERG => 0,
                    LOG::ALERT => 1,
                    LOG::CRIT => 2,
                    LOG::ERR => 3,
                    LOG::WARN => 4,
                    LOG::NOTICE => 5,
                    LOG::INFO => 6,
                    LOG::DEBUG => 7
                );
                if ($this->_log && $aLogLevelMaps [$level] > $aLogLevelMaps [$this->_logLevel]) {
                    return false;
                }
                return Runder::run_callback ( $this->_log, array ( $message, $level ) );
            }else{
                return parent::_log($message, $level, $this->_logLevel);
            }
        }else{
            return false;
        }
    }

    /**
     * Convert an xls file to csv using the external libxls2csv or xls2csv command.
     *
     * @param string $xls_file  source xls file
     * @param string $csv_file  target csv file; defaults to the xls name with a .csv extension
     * @param string $overwrite overwrite an existing csv file
     * @return string target csv file name, or the result of _trigger_error() on failure
     */
    protected function _convertXls2Csv($xls_file, $csv_file=NULL, $overwrite=false){
        $this->_log("Try convert xls to csv: $xls_file ");
        if(!$csv_file){
            $csv_file = (!strcasecmp('xls', File::getExtension($xls_file))?substr($xls_file, 0, strrpos($xls_file, '.')):$xls_file) . '.csv';
        }
        if(!$overwrite && self::file_exists($csv_file)){
            $this->_log("CSV file already exists");
            $this->_log("Skip converting");
            return $csv_file;
        }
        $bin_maps = C('BIN_MAPS');
        $oCommand = new Command($bin_maps);
        $convert_xls_file = self::convert2filesystemname($xls_file);
        $to_csv_file = self::convert2filesystemname($csv_file);
        if(!empty($bin_maps['libxls2csv'])){
            $cmd_status = $oCommand->libxls2csv($convert_xls_file, $to_csv_file, 'GBK', "", ",");
        }else{
            $cmd_status = $oCommand->xls2csv($convert_xls_file, $to_csv_file, 'cp936', 'cp936');
        }
        if(1==$cmd_status){
            return $this->_trigger_error("Cann't convert xls to csv file: " . $oCommand->cmdOutput("\n")
                , self::ERR_OTHER_MISC
                , array('driver'=>$this->_driver, 'xls_file' => $xls_file, 'csv_file'=> $csv_file)
                , __FILE__
                , __LINE__
            );
        }else{
            $this->_log($oCommand->cmdOutput("\n"));
            $this->_log("Convert Succ!");
            $this->_log("CSV file is " . $csv_file);
            return $csv_file;
        }
    }

    /**
     * Copy (csv/xls) or decompress (zip/rar) a daily download file into the update folder.
     *
     * @param string $down_file   downloaded file
     * @param string $update_file target file; for archives only its directory is used
     * @return boolean true on success, otherwise the result of _trigger_error()
     */
    protected function _decompressDownFile($down_file, $update_file=null){
        $this->_log("Down file is {$down_file}");
        if(!self::file_exists($down_file)){
            return $this->_trigger_error("Down file not exists!"
                , self::ERR_FILE_READ
                , array('driver'=>$this->_driver, 'down_file' => $down_file)
                , __FILE__
                , __LINE__
            );
        }
        $down_fext = File::getExtension($down_file);
        switch(strtolower($down_fext)){
            case 'csv':
            case 'xls':
                // Copy to update folder
                $this->_log("Try copy to update folder as file: " . $update_file);
                if(!copy($down_file, $update_file)){
                    return $this->_trigger_error("Cann't copy down file to update file!"
                        , self::ERR_OTHER_MISC
                        , array('driver'=>$this->_driver, 'down_file' => $down_file, 'update_file'=> $update_file)
                        , __FILE__
                        , __LINE__
                    );
                }
                $this->_log("Copy Succ!");
                break;
            case 'zip':
            case 'rar':
                $this->_log("Try decompress down file");
                $update_dir = dirname($update_file);
                $oCommand = new Command(C('BIN_MAPS'));
                $cmd_status = $oCommand->decompress($down_file, $update_dir);
                if(1==$cmd_status){
                    return $this->_trigger_error("Cann't decompress down file: " . $oCommand->cmdOutput("\n")
                        , self::ERR_OTHER_MISC
                        , array('driver'=>$this->_driver, 'down_file' => $down_file, 'update_file'=> $update_file)
                        , __FILE__
                        , __LINE__
                    );
                }
                $this->_log($oCommand->cmdOutput("\n"));
                $this->_log("Decompress Succ!");
                $this->_log("Saved into update directory: {$update_dir}");
                break;
            default:
                return $this->_trigger_error("Unknown file type: " . $down_fext
                    , self::ERR_OTHER_MISC
                    , array('driver'=>$this->_driver, 'down_file' => $down_file, 'update_file'=> $update_file)
                    , __FILE__
                    , __LINE__
                );
        }
        return true;
    }

    /**
     * Import every matching file of a folder (in natural sort order).
     *
     * @param string $folder      folder, full path
     * @param string $extension   file extension
     * @param string $namepattern optional regex the file name must match
     */
    protected function _importFolder($folder, $extension='csv', $namepattern=NULL){
        import('ORG.Util.Debug');
        $debug_start_mark = 'start_import_folder';
        $debug_end_mark = 'end_import_folder';
        Debug::mark($debug_start_mark);
        $this->_log("+++++++++++++++++++++++++++++++++++++");
        $this->_log("START IMPORT FOLDER");
        $this->_log("Folder {$folder}");
        import('ORG.Util.File');
        $aFiles = File::getDirFiles($folder, $extension);
        sort($aFiles, SORT_NATURAL);
        foreach($aFiles as $file){
            if(!$namepattern || preg_match($namepattern, $file)){
                $this->importFile($file);
            }
        }
        Debug::mark($debug_end_mark);
        $performance = "Cost time " . Debug::useTime($debug_start_mark,$debug_end_mark).'s';
        $performance .= ", used memory " . Debug::useMemory($debug_start_mark,$debug_end_mark).'kb';
        $performance .= ", peak memory " . Debug::getMemPeak($debug_start_mark, $debug_end_mark).'kb';
        $this->_log( $performance );
        $this->_log("END IMPORT FOLDER.[$folder]");
        $this->_log("+++++++++++++++++++++++++++++++++++++");
    }

    /**
     * Import one data file, recording statistics, a DB log entry and notifying listeners.
     *
     * @param string $file         data file
     * @param array  $fields       field definitions
     * @param array  $options      import options
     * @param string $file_content data content; when given it is used instead of reading
     *                             $file (used by real-time updates)
     * @throws Exception re-thrown from the underlying import
     * @return mixed result of _import(), or false on failure
     */
    protected function _importFile($file, $fields, $options=array(
            'titleRow' => 1,          // number of title rows
            'dataRow' => 2,           // first data row
            'instantWrite' => true,   // write each row immediately
            'rowCallback' => '_cb_row',
            'fieldCallback' => '_cb_field',
            'fromEncoding' => 'GBK',
            'toEncoding' => 'UTF-8',
        ), $file_content=NULL){
        $this->_begin_dblog($file);
        import('ORG.Util.Debug');
        $debug_start_mark = 'start_import_file';
        $debug_end_mark = 'end_import_file';
        Debug::mark($debug_start_mark);
        $this->_log("----------------------------");
        $this->_log("START IMPORT FILE");
        $this->_log("File {$file}");
        $this->_stats['total'] = 0;
        $this->_stats['succ'] = 0;
        $this->_stats['fail'] = array();
        $this->sql_mode(true);
        try {
            $sqls = $this->_import ( self::convert2filesystemname ( $file ), $fields, $options, $file_content);
            if (! $sqls) {
                $error = $this->getError ();
                $this->_log ( "Import Failed: " . $error, LOG::ERR );
                $this->_end_dblog ( false, $error );
                // notify listeners
                $this->_callListener(self::LISTENER_IMPORT_RESULT, array($file, $error, false));
                return false;
            }
        } catch ( Exception $e ) {
            $this->_end_dblog ( false, $e);
            // notify listeners
            $this->_callListener(self::LISTENER_IMPORT_RESULT, array($file, $e, false));
            throw $e;
        }
        $message = "Total found " . $this->_stats['total'] . ' records';
        $message .= ", Succ " . ($this->_stats['succ']?$this->_stats['succ']:"<B COLOR=RED>" . $this->_stats['succ'] . "</B>");
        $message .= ", Fail " . count($this->_stats['fail']);
        $message .= $this->_stats['fail']?" Lines: " . implode(',', $this->_stats['fail']):'';
        $message .= ".";
        $this->_log($message);
        Debug::mark($debug_end_mark);
        $this->_stats['costsecs'] = Debug::useTime($debug_start_mark,$debug_end_mark, 3);
        $this->_stats['usedmemory'] = str_replace(',', '', Debug::useMemory($debug_start_mark,$debug_end_mark));
        $this->_stats['peakmemory'] = str_replace(',', '', Debug::getMemPeak($debug_start_mark,$debug_end_mark));
        $performance = "Cost time " . Debug::useTime($debug_start_mark,$debug_end_mark).'s';
        $performance .= ", used memory " . Debug::useMemory($debug_start_mark,$debug_end_mark).'kb';
        $performance .= ", peak memory " . Debug::getMemPeak($debug_start_mark, $debug_end_mark).'kb';
        $this->_log( $performance );
        $this->_log("END IMPORT FILE.[$file]");
        $this->_log("----------------------------");
        $this->_end_dblog (true, $message . "\n" . $performance);
        // notify listeners
        $this->_callListener(self::LISTENER_IMPORT_RESULT, array($file, $message, $this->_stats));
        return $sqls;
    }

    /**
     * Import a daily update.
     *
     * @param string|array $date      date string, a downloaded file path, or
     *                                array('date'=>..., 'down'=>..., ['update'=>...])
     * @param string       $extension expected data file extension
     * @return boolean
     */
    protected function _importUpdate($date, $extension='csv'){
        import('ORG.Util.Debug');
        $debug_start_mark = 'start_import_update';
        $debug_end_mark = 'end_import_update';
        Debug::mark($debug_start_mark);
        $this->_log("=======================================");
        $this->_log("START IMPORT UPDATE");
        $update_dir = $this->_get_update_dir($date, true);

        // Initialised up front: the is_file() branch never sets $update_file, and import()
        // passes arrays without an 'update' key.
        $down_file = NULL;
        $update_file = NULL;
        if(is_array($date)){
            $update_file = isset($date['update'])?$date['update']:NULL;
            $down_file = isset($date['down'])?$date['down']:NULL;
            $date = isset($date['date'])?$date['date']:NULL;
        }elseif(self::filter_date($date)){
            $down_file = $this->_get_down_file($date);
            // The four SHSZ financial statement files are decompressed first, then their
            // (quarter-dependent) names are matched in the update directory.
            switch ($this->_driver){
                case self::SHSZ_INCOME_STATEMENT:
                case self::SHSZ_BALANCE_SHEET:
                case self::SHSZ_CASHFLOW_STATEMENT:
                case self::SHSZ_FINANCIAL_ANALYSIS_INDEX:
                    $this->_decompressDownFile($down_file, $update_dir . '/update_file');
                    break;
            }
            $update_file = $this->_get_update_file($date);
        }elseif(is_file($date)){
            $down_file = $date;
            $guess_info = self::guess_driver_from_file_name($down_file);
            $date = $guess_info?$guess_info['date']:NULL;
        }else{
            $this->_log("Import failed: Unknown date or file for driver " . $this->_driver . ": {$date}");
            return false;
        }

        if(!$date || !self::filter_date($date)){
            $this->_log("Import failed: Unknown date for driver " . $this->_driver . ": {$date}");
            return false;
        }
        if(!$down_file){
            $down_file = $this->_get_down_file($date);
        }
        if(!$update_file){
            $update_file = $this->_get_update_file($date);
        }
        if(!$update_file){
            $this->_log("Import failed: Cann't get update file for driver " . $this->_driver . " on date {$date}");
            return false;
        }elseif(is_string($update_file)){
            $update_files = array($update_file);
        }else{
            $update_files = $update_file;
        }

        // set update importing
        $this->_is_update = true;
        $this->_update_date = $date;
        $this->_log("Date {$date}");
        $this->_log("Down File: " . $down_file);
        foreach($update_files as $update_file){
            $this->_log("Update File: " . $update_file);
            $this->_log("Check if Update file exists");
            $this->_update_file = $update_file;
            // Check update file existence, and try to decompress the down file
            if(!self::file_exists($update_file)){
                if((!$this->_decompressDownFile($down_file, $update_file) || !self::file_exists($update_file))){
                    $this->_log("Import Failed: Update file not exists, Cann't decompress from down file!");
                    return false;
                }
            }else{
                $this->_log("Update file already exists");
            }
            $update_fext = File::getExtension($update_file);
            if(!strcasecmp('xls', $update_fext) && !strcasecmp('csv', $extension)
                && (!($update_file = $this->_convertXls2Csv($update_file)) || !self::file_exists($update_file))){
                $this->_log("Import Failed: Update file not exists, Cann't convert file type from xls to csv!");
                return false;
            }
            $this->importFile($update_file);
        }

        Debug::mark($debug_end_mark);
        $performance = "Cost time " . Debug::useTime($debug_start_mark,$debug_end_mark).'s';
        $performance .= ", used memory " . Debug::useMemory($debug_start_mark,$debug_end_mark).'kb';
        $performance .= ", peak memory " . Debug::getMemPeak($debug_start_mark, $debug_end_mark).'kb';
        $this->_log( $performance );
        $this->_log("END IMPORT UPDATE.");
        $this->_log("=======================================");
        return true;
    }

    /**
     * Return the name of the automatically downloaded file for a data type and date.
     *
     * @param string     $driver data type
     * @param string|int $date   date string or Unix timestamp
     * @return false|string
     */
    public static function guess_down_file_name($driver, $date){
        if(!is_numeric($date)){
            $timestamp = strtotime($date);
        }else{
            $timestamp = $date;
        }
        if(!$timestamp){
            return false;
        }
        $shortdate = date('md', $timestamp);
        $date = date('Ymd', $timestamp);
        switch ($driver) {
            case self::SHSZ_FINANCE :
                // wss0929fin.zip
                $name = 'wss' . $shortdate . 'fin.zip';
                break;
            case self::SHSZ_BLOCK :
                // wss0929blk.zip
                $name = 'wss' . $shortdate . 'blk.zip';
                break;
            case self::SHSZ_BALANCE_SHEET :
            case self::SHSZ_CASHFLOW_STATEMENT :
            case self::SHSZ_INCOME_STATEMENT :
            case self::SHSZ_FINANCIAL_ANALYSIS_INDEX :
                // ws0929rpt.zip
                $name = 'ws' . $shortdate . 'rpt.zip';
                break;
            case self::SHSZ_SHAREHOLDER :
            case self::SHSZ_TRADABLE_SHAREHOLDER:
                // wsSHSZ_Shareholders_20140929.rar
                $name = 'wsSHSZ_Shareholders_' . $date . '.rar';
                break;
            case self::SHSZ_CAPITAL_STOCK :
                // wsSHSZ_CapitalStocks_20140930_csv.zip
                $name = 'wsSHSZ_CapitalStocks_' . $date . '_csv.zip';
                break;
            case self::SHSZ_SPLIT:
            case self::SHSZ_DAY:
            case self::SHSZ_DAY_FORWARD:
            case self::SHSZ_5MIN:
            case self::SHSZ_5MIN_FORWARD:
                // wstock_SHSZ_Day_5Min_SPLIT_20140930.zip
                $name = "wstock_SHSZ_Day_5Min_SPLIT_{$date}.zip";
                break;
            case self::HK_FINANCE :
                // wsHK_finance_20141114.zip
                $name = 'wsHK_finance_' . $date . '.zip';
                break;
            case self::HK_INDUSTRY :
                // wsHK_Industry_20141114.zip
                $name = 'wsHK_Industry_' . $date . '.zip';
                break;
            case self::HK_SPLIT:
            case self::HK_DAY:
            case self::HK_DAY_FORWARD:
            case self::HK_5MIN:
            case self::HK_5MIN_FORWARD:
                // wstock_HK_Day_5Min_SPLIT_20140930.zip
                $name = "wstock_HK_Day_5Min_SPLIT_{$date}.zip";
                break;
            default:
                return false;
        }
        return $name;
    }

    /**
     * Return the name(s) of the decompressed update file(s) for a data type and date.
     *
     * @param string     $driver    data type
     * @param string|int $date      date string or Unix timestamp
     * @param string     $updatedir update directory, searched for the quarter-named SHSZ statements
     * @return false|string|array
     */
    public static function guess_update_file_name($driver, $date, $updatedir=NULL){
        if(!is_numeric($date)){
            $timestamp = strtotime($date);
        }else{
            $timestamp = $date;
        }
        if(!$timestamp){
            return false;
        }
        $shortdate = date('md', $timestamp);
        $date = date('Ymd', $timestamp);
        switch ($driver) {
            case self::SHSZ_FINANCE :
                // wss0930finance.csv
                $name = 'wss' . $shortdate . 'finance.csv';
                break;
            case self::SHSZ_BLOCK :
                // wsSHSZ_Block_20140930.csv
                $name = 'wsSHSZ_Block_' . $date . '.csv';
                break;
            // @todo: derive the quarter-dependent statement file names by rule instead of matching
            case self::SHSZ_BALANCE_SHEET :
                // e.g. SHSZ_2013Q2-2014Q2_資產負債表.xls
                $namepattern = '^SHSZ_[0-9A-Z\-]+_資產負債表\.xls$';
                $name = self::_match_update_file_name($updatedir, $namepattern);
                break;
            case self::SHSZ_CASHFLOW_STATEMENT :
                // e.g. SHSZ_2013Q2-2014Q2_現金流量表.xls
                $namepattern = '^SHSZ_[0-9A-Z\-]+_現金流量表\.xls$';
                $name = self::_match_update_file_name($updatedir, $namepattern);
                break;
            case self::SHSZ_INCOME_STATEMENT :
                // e.g. SHSZ_2013Q2-2014Q2_利潤表.xls
                $namepattern = '^SHSZ_[0-9A-Z\-]+_利潤表\.xls$';
                $name = self::_match_update_file_name($updatedir, $namepattern);
                break;
            case self::SHSZ_FINANCIAL_ANALYSIS_INDEX :
                // e.g. SHSZ_2013Q2-2014Q2_相關91項分析指標.xls
                $namepattern = '^SHSZ_[0-9A-Z\-]+_相關91項分析指標\.xls$';
                $name = self::_match_update_file_name($updatedir, $namepattern);
                break;
            case self::SHSZ_SHAREHOLDER :
                // SHSZ_Shareholders_2010Q3-20140929.xls
                $name = "SHSZ_Shareholders_2010Q3-{$date}.xls";
                break;
            case self::SHSZ_TRADABLE_SHAREHOLDER :
                // SHSZ_TradableShareholders_2010Q3-20140929.xls
                $name = "SHSZ_TradableShareholders_2010Q3-{$date}.xls";
                break;
            case self::SHSZ_CAPITAL_STOCK :
                // SZ_CapitalStocks_1991-20140929.csv
                // SH_CapitalStocks_1990-20140929.csv
                $name = array(
                    "SZ_CapitalStocks_1991-{$date}.csv",
                    "SH_CapitalStocks_1990-{$date}.csv",
                );
                break;
            case self::SHSZ_SPLIT:
                // wstock_SHSZ_20141019_SPLITs.csv
                $name = "wstock_SHSZ_{$date}_SPLITs.csv";
                break;
            case self::SHSZ_DAY :
                // wstock_SHSZ_20140930_Day.csv
                $name = "wstock_SHSZ_{$date}_Day.csv";
                break;
            case self::SHSZ_DAY_FORWARD :
                // wstock_SHSZ_20140930_Day_Forward.csv
                $name = "wstock_SHSZ_{$date}_Day_Forward.csv";
                break;
            case self::SHSZ_5MIN :
                // wstock_SHSZ_20140930_5Min.csv
                $name = "wstock_SHSZ_{$date}_5Min.csv";
                break;
            case self::SHSZ_5MIN_FORWARD :
                // wstock_SHSZ_20140930_5Min_Forward.csv
                $name = "wstock_SHSZ_{$date}_5Min_Forward.csv";
                break;
            case self::HK_FINANCE :
                // wsHK_finance_20141114.csv
                $name = 'wsHK_finance_' . $date . '.csv';
                break;
            case self::HK_INDUSTRY :
                // wsHK_Industry_20141114.xls
                $name = 'wsHK_Industry_' . $date . '.xls';
                break;
            case self::HK_SPLIT:
                // wstock_HK_20141019_SPLITs.csv
                $name = "wstock_HK_{$date}_SPLITs.csv";
                break;
            case self::HK_DAY :
                // wstock_HK_20140930_Day.csv
                $name = "wstock_HK_{$date}_Day.csv";
                break;
            case self::HK_DAY_FORWARD :
                // wstock_HK_20140930_Day_Forward.csv
                $name = "wstock_HK_{$date}_Day_Forward.csv";
                break;
            case self::HK_5MIN :
                // wstock_HK_20140930_5Min.csv
                $name = "wstock_HK_{$date}_5Min.csv";
                break;
            case self::HK_5MIN_FORWARD :
                // wstock_HK_20140930_5Min_Forward.csv
                $name = "wstock_HK_{$date}_5Min_Forward.csv";
                break;
            default:
                return false;
        }
        return $name;
    }

    /**
     * Return the base name of the first file in $updatedir matching $namepattern
     * (an mb_ereg pattern, evaluated as UTF-8).
     *
     * @param string $updatedir
     * @param string $namepattern
     * @return string|false
     */
    protected static function _match_update_file_name($updatedir, $namepattern){
        import('ORG.Util.File');
        $aFiles = File::getDirFiles($updatedir);
        import('ORG.Util.String');
        $aFiles = String::autoCharset($aFiles);
        // force set regular match encoding as UTF-8
        mb_regex_encoding('UTF-8');
        foreach($aFiles as $file){
            $filename = basename($file);
            if(mb_ereg($namepattern, $filename)){
                return $filename;
            }
        }
        return false;
    }

    /**
     * Detect the data type(s) and date from a downloaded file name.
     *
     * File names that carry only month/day (MMDD) are assumed to be in the current year.
     *
     * @param string $file file name
     * @return false|array array('driver' => string|array, 'date' => 'Y-m-d')
     */
    public static function guess_driver_from_file_name($file){
        $pathinfo = pathinfo($file);
        if(!$pathinfo || !($basename=$pathinfo['basename'])){
            return false;
        }
        // wss0929fin.zip
        if(preg_match('/^wss([0-9]{4})fin\.zip$/', $basename, $m)){
            $return = array(
                'driver' => self::SHSZ_FINANCE,
                'date' => date('Y') . $m[1],
            );
        }
        // wss0929blk.zip
        elseif(preg_match('/^wss([0-9]{4})blk\.zip$/', $basename, $m)){
            $return = array(
                'driver' => self::SHSZ_BLOCK,
                'date' => date('Y') . $m[1],
            );
        }
        // ws0929rpt.zip
        elseif(preg_match('/^ws([0-9]{4})rpt\.zip$/', $basename, $m)){
            $return = array(
                'driver' => array(
                    self::SHSZ_BALANCE_SHEET,
                    self::SHSZ_CASHFLOW_STATEMENT,
                    self::SHSZ_INCOME_STATEMENT,
                    self::SHSZ_FINANCIAL_ANALYSIS_INDEX
                ),
                'date' => date('Y') . $m[1],
            );
        }
        // wsSHSZ_Shareholders_20140929.rar
        elseif(preg_match('/^wsSHSZ_Shareholders_([0-9]{8})\.rar$/', $basename, $m)){
            $return = array(
                'driver' => array(
                    self::SHSZ_SHAREHOLDER,
                    self::SHSZ_TRADABLE_SHAREHOLDER,
                ),
                'date' => $m[1],
            );
        }
        // wsSHSZ_CapitalStocks_20140929_csv.zip
        elseif(preg_match('/^wsSHSZ_CapitalStocks_([0-9]{8})_csv\.zip$/', $basename, $m)){
            $return = array(
                'driver' => self::SHSZ_CAPITAL_STOCK,
                'date' => $m[1],
            );
        }
        // wss0929e.zip
        elseif(preg_match('/^wss([0-9]{4})e\.zip$/', $basename, $m)){
            $return = array(
                'driver' => self::SHSZ_DAY,
                'date' => date('Y') . $m[1],
            );
        }
        // wss0929e5.zip
        elseif(preg_match('/^wss([0-9]{4})e5\.zip$/', $basename, $m)){
            $return = array(
                'driver' => self::SHSZ_5MIN,
                'date' => date('Y') . $m[1],
            );
        }
        // wstock_SHSZ_Day_5Min_SPLIT_20140930.zip (as built by guess_down_file_name)
        // or the older wstock_SHSZ_Day_5Min_20140930.zip
        elseif(preg_match('/^wstock_SHSZ_Day_5Min_(?:SPLIT_)?([0-9]{8})\.zip$/', $basename, $m)){
            $return = array(
                'driver' => array(
                    self::SHSZ_SPLIT,
                    self::SHSZ_DAY,
                    self::SHSZ_DAY_FORWARD,
                    self::SHSZ_5MIN,
                    self::SHSZ_5MIN_FORWARD,
                ),
                'date' => $m[1],
            );
        }
        // wsHK_finance_20141114.zip
        elseif(preg_match('/^wsHK_finance_([0-9]{8})\.zip$/', $basename, $m)){
            $return = array(
                'driver' => self::HK_FINANCE,
                'date' => $m[1],
            );
        }
        // wsHK_Industry_20141114.zip
        elseif(preg_match('/^wsHK_Industry_([0-9]{8})\.zip$/', $basename, $m)){
            $return = array(
                'driver' => self::HK_INDUSTRY,
                'date' => $m[1],
            );
        }
        // wstock_HK_Day_5Min_SPLIT_20140930.zip (as built by guess_down_file_name)
        // or the older wstock_HK_Day_5Min_20140930.zip
        elseif(preg_match('/^wstock_HK_Day_5Min_(?:SPLIT_)?([0-9]{8})\.zip$/', $basename, $m)){
            $return = array(
                'driver' => array(
                    self::HK_SPLIT,
                    self::HK_DAY,
                    self::HK_DAY_FORWARD,
                    self::HK_5MIN,
                    self::HK_5MIN_FORWARD,
                ),
                'date' => $m[1],
            );
        }
        else{
            return false;
        }
        $return['date'] = self::filter_date($return['date']);
        if(!$return['date']){
            return false;
        }else{
            return $return;
        }
    }

    /**
     * Convert a file name from the application charset to the file-system charset.
     *
     * @param string $file
     * @return string
     */
    public static function convert2filesystemname($file){
        return String::autoCharset($file, C('DEFAULT_CHARSET'), C('FILE_SYSTEM_ENCODING'));
    }

    /**
     * Check whether a file exists on the local file system
     * @param
string $file Path in the application charset; it is passed through
     *                     convert2filesystemname() before the check
     * @return boolean True if the path exists
     */
    public static function file_exists($file){
        return file_exists(self::convert2filesystemname($file));
    }

    /**
     * Return the full path(s) of the extracted daily update file(s) for the
     * current driver.
     *
     * @param string $date Update date
     * @return string|array|mixed Path, list of paths (drivers with several
     *         files per day), or whatever _trigger_error() returns when the
     *         file name cannot be determined
     */
    protected function _get_update_file($date){
        $update_dir = $this->_get_update_dir($date);
        $update_fname = self::guess_update_file_name($this->_driver, $date, $update_dir);
        if(!$update_fname){
            return $this->_trigger_error(
                "Cann't guess update file name!",
                self::ERR_GUESS_FILE,
                array('driver' => $this->_driver, 'date' => $date),
                __FILE__,
                __LINE__
            );
        }
        if(!is_array($update_fname)){
            return $update_dir . $update_fname;
        }
        $return = array();
        foreach($update_fname as $f){
            $return[] = $update_dir . $f;
        }
        return $return;
    }

    /**
     * Make sure a directory exists, creating it if necessary.
     *
     * A failed mkdir() only counts as an error if the directory still does not
     * exist afterwards. This tolerates another process creating it at the
     * same moment.
     *
     * @param string  $dir       Directory path
     * @param boolean $recursive Create missing parent directories too
     * @return boolean True if the directory exists on return
     */
    protected static function _mkdir_unless_exists($dir, $recursive = false){
        return is_dir($dir) || mkdir($dir, 0777, $recursive) || is_dir($dir);
    }

    /**
     * Return the extracted-update directory for a date (<update_path>/Ymd/).
     *
     * @param string  $date       Date string accepted by strtotime()
     * @param boolean $autocreate Create the directory if it is missing
     * @return string|mixed Directory path with trailing slash, or whatever
     *         _trigger_error() returns when it cannot be created
     */
    protected function _get_update_dir($date, $autocreate = false){
        $update_dir = $this->_update_path . date('Ymd', strtotime($date)) . '/';
        if($autocreate && !self::_mkdir_unless_exists($this->_update_path)){
            return $this->_trigger_error(
                "Cann't create update path!",
                self::ERR_FILE_CREATE,
                array('driver' => $this->_driver, 'date' => $date, 'update_path' => $this->_update_path),
                __FILE__,
                __LINE__
            );
        }
        if($autocreate && !self::_mkdir_unless_exists($update_dir, true)){
            return $this->_trigger_error(
                "Cann't create update dir!",
                self::ERR_FILE_CREATE,
                array('driver' => $this->_driver, 'date' => $date, 'update_dir' => $update_dir),
                __FILE__,
                __LINE__
            );
        }
        return $update_dir;
    }

    /**
     * Return the full path of the daily download archive for the current driver.
     *
     * @param string $date Download date
     * @return string|mixed Path, or whatever _trigger_error() returns when
     *         the archive name cannot be determined
     */
    protected function _get_down_file($date){
        $down_fname = self::guess_down_file_name($this->_driver, $date);
        if(!$down_fname){
            return $this->_trigger_error(
                "Cann't guess down file name!",
                self::ERR_GUESS_FILE,
                array('driver' => $this->_driver, 'date' => $date),
                __FILE__,
                __LINE__
            );
        }
        return $this->_get_down_dir($date) . $down_fname;
    }

    /**
     * Return the daily download directory for a date (<down_path>/Ymd/).
     *
     * @param string $date Date string accepted by strtotime()
     * @return string Directory path with trailing slash
     */
    protected function _get_down_dir($date){
        return $this->_down_path . date('Ymd', strtotime($date)) . '/';
    }

    /**
     * Return the real-time data directory for a date (<realtime_path>/Ymd/).
     *
     * @param string  $date       Date string accepted by strtotime()
     * @param boolean $autocreate Create the directory if it is missing
     * @return string|mixed Directory path with trailing slash, or whatever
     *         _trigger_error() returns when it cannot be created
     */
    protected function _get_realtime_dir($date, $autocreate = false){
        $realtime_dir = $this->_realtime_path . date('Ymd', strtotime($date)) . '/';
        if($autocreate && !self::_mkdir_unless_exists($this->_realtime_path)){
            return $this->_trigger_error(
                "Cann't create realtime path!",
                self::ERR_FILE_CREATE,
                array('driver' => $this->_driver, 'date' => $date, 'realtime_path' => $this->_realtime_path),
                __FILE__,
                __LINE__
            );
        }
        if($autocreate && !self::_mkdir_unless_exists($realtime_dir, true)){
            return $this->_trigger_error(
                "Cann't create realtime dir!",
                self::ERR_FILE_CREATE,
                array('driver' => $this->_driver, 'date' => $date, 'realtime_dir' => $realtime_dir),
                __FILE__,
                __LINE__
            );
        }
        return $realtime_dir;
    }

    /**
     * Return the log directory for a date (<log_path>/Ymd/).
     *
     * @param string  $date       Date string accepted by strtotime()
     * @param boolean $autocreate Create the directory if it is missing
     * @return string|mixed Directory path with trailing slash, or whatever
     *         _trigger_error() returns when it cannot be created
     */
    protected function _get_log_dir($date, $autocreate = false){
        $dir = $this->_log_path . date('Ymd', strtotime($date)) . '/';
        if($autocreate && !self::_mkdir_unless_exists($this->_log_path)){
            return $this->_trigger_error(
                "Cann't create path!",
                self::ERR_FILE_CREATE,
                array('driver' => $this->_driver, 'date' => $date, 'path' => $this->_log_path),
                __FILE__,
                __LINE__
            );
        }
        if($autocreate && !self::_mkdir_unless_exists($dir, true)){
            return $this->_trigger_error(
                "Cann't create dir!",
                self::ERR_FILE_CREATE,
                array('driver' => $this->_driver, 'date' => $date, 'dir' => $dir),
                __FILE__,
                __LINE__
            );
        }
        return $dir;
    }

    /**
     * Send an HTTP GET request, lazily creating the Curl client.
     *
     * @param string       $url  URL
     * @param string|array $data Query parameters
     * @return string|mixed Response body, or whatever _trigger_error() returns
     *         when the request produced no body and the client reports an error
     */
    public function get($url, $data = null){
        if(!$this->Http){
            import('ORG.Net.curl');
            $this->Http = new Curl();
        }
        $return = $this->Http->get($url, $data);
        if(!$return && ($error = $this->Http->getError())){
            return $this->_trigger_error(
                $error,
                self::ERR_HTTP_REQUEST,
                array('url' => $url, 'data' => $data, 'method' => 'get', 'response' => $return),
                __FILE__,
                __LINE__
            );
        }
        return $return;
    }

    /**
     * Insert a 'running' row into stock_update_log for this import.
     *
     * @param string $file Data file being imported
     * @return mixed The new log id, also stored in $this->_log_id
     */
    protected function _begin_dblog($file){
        $this->_log_id = $this->_VM()->table('stock_update_log')->add(array(
            'begintime'    => date('Y-m-d H:i:s'),
            'driver'       => $this->_driver,
            'datafile'     => $file,
            'logfile'      => $this->_log,
            'updatestatus' => 'running',
        ));
        return $this->_log_id;
    }

    /**
     * Finish the stock_update_log row started by _begin_dblog() with the
     * run statistics and final status.
     *
     * @param boolean          $status  Whether the import succeeded
     * @param string|Exception $message Optional message; exceptions are stored
     *                                  as their string form
     * @return mixed false if no log row was started, else the save() result
     */
    protected function _end_dblog($status, $message = NULL){
        if(!$this->_log_id){
            return false;
        }
        // Row callbacks (e.g. _cb_row_update_stock_name) record failures as a
        // list of line numbers; the log column stores a count.
        $fails = $this->_stats['fail'];
        if(is_array($fails)){
            $fails = count($fails);
        }
        return $this->_VM()->table('stock_update_log')
            ->where(array('id' => $this->_log_id))
            ->save(array(
                'records'      => $this->_stats['total'],
                'succs'        => $this->_stats['succ'],
                'fails'        => $fails,
                'costsecs'     => $this->_stats['costsecs'],
                'usedmemory'   => $this->_stats['usedmemory'],
                'peakmemory'   => $this->_stats['peakmemory'],
                'endtime'      => date('Y-m-d H:i:s'),
                'updatestatus' => $status ? 'succ' : 'fail',
                'updatemsg'    => ($message instanceof Exception) ? $message->__toString() : $message,
            ));
    }

    /**
     * Extract the data date from a data file's base name.
     *
     * @param string $file         Data file name or path
     * @param string $pattern      PCRE pattern; capture group 1 holds the date
     * @param string $default_year Year prefixed to a 4-digit (MMDD) capture
     * @return mixed The filter_date() result, cached per (file, pattern, year),
     *         or whatever _trigger_error() returns when no date can be parsed
     */
    protected function _parse_file_date($file, $pattern, $default_year = NULL){
        static $_file_dates = array();
        // The result depends on every argument, so all of them form the key.
        $fidx = md5($file . "\0" . $pattern . "\0" . $default_year);
        if(empty($_file_dates[$fidx])){
            $date = NULL;
            if(preg_match($pattern, basename($file), $m)){
                $date = $m[1];
                if(strlen($date) == 4 && $default_year){
                    $date = $default_year . $date;
                }
                $date = self::filter_date($date);
            }
            if(!$date){
                return $this->_trigger_error(
                    "Cann't parse date from file name",
                    self::ERR_OTHER_MISC,
                    array('file' => $file),
                    __FILE__,
                    __LINE__
                );
            }
            $_file_dates[$fidx] = $date;
        }
        return $_file_dates[$fidx];
    }

    /**
     * Extract the stock code from a data file's base name.
     *
     * @param string $file    Data file name or path
     * @param string $pattern PCRE pattern; capture group 1 holds the code
     * @return mixed The filter_stock_code() result, cached per (file, pattern),
     *         or whatever _trigger_error() returns when no code can be parsed
     */
    protected function _parse_file_stock_code($file, $pattern){
        static $_file_stocks = array();
        $fidx = md5($file . "\0" . $pattern);
        if(empty($_file_stocks[$fidx])){
            // Initialised so a non-matching name reports an error instead of
            // reading an undefined variable.
            $stock = NULL;
            if(preg_match($pattern, basename($file), $m)){
                $stock = self::filter_stock_code($m[1]);
            }
            if(!$stock){
                return $this->_trigger_error(
                    "Cann't parse stock code from file name",
                    self::ERR_OTHER_MISC,
                    array('file' => $file),
                    __FILE__,
                    __LINE__
                );
            }
            $_file_stocks[$fidx] = $stock;
        }
        return $_file_stocks[$fidx];
    }

    /**
     * Return the last $limit path components of a data file path, used as a
     * short "source" label. Example: 'a/b/c/d.csv' with $limit 3 gives
     * 'b/c/d.csv'. Both '/' and '\' count as separators, and a path without
     * separators is returned unchanged.
     *
     * @param string $file  Data file path
     * @param int    $limit Number of trailing path components to keep
     * @return string
     */
    protected function _parse_file_source($file, $limit = 3){
        static $_file_sources = array();
        $fidx = md5($file . "\0" . $limit);
        if(empty($_file_sources[$fidx])){
            if(preg_match('~(((\/|\\\)[^\/\\\]+){1,' . $limit . '})$~', $file, $m)){
                // Drop the leading separator of the matched tail.
                $source = substr($m[1], 1);
            }else{
                $source = $file;
            }
            $_file_sources[$fidx] = $source;
        }
        return $_file_sources[$fidx];
    }

    /**
     * Import Shanghai/Shenzhen stock names (columns: code, Chinese name,
     * English short name) into stock_symbol.
     *
     * @param string $file Stock-name file
     * @return mixed Result of _importFile()
     */
    public function updateSHSZStockName($file){
        $fields = array(
            'Symbol' => array('column' => 'A', 'title' => '程式碼'),
            'CName'  => array('column' => 'B', 'title' => '中文'),
            'EName'  => array('column' => 'C', 'title' => '英文簡寫'),
        );
        return $this->_importFile($file, $fields, $this->_stock_name_import_options());
    }

    /**
     * Import Hong Kong stock names (columns: code, English name) into
     * stock_symbol.
     *
     * @param string $file Stock-name file
     * @return mixed Result of _importFile()
     */
    public function updateHKStockName($file){
        $fields = array(
            'Symbol' => array('column' => 'A', 'title' => 'Code'),
            'EName'  => array('column' => 'B', 'title' => 'English Name'),
        );
        return $this->_importFile($file, $fields, $this->_stock_name_import_options());
    }

    /**
     * Import options shared by the stock-name imports.
     *
     * @return array
     */
    protected function _stock_name_import_options(){
        return array(
            'titleRow'      => 1,     // number of title rows
            'dataRow'       => 2,     // first data row
            'instantWrite'  => true,  // real-time write: the row callback returns its SQL directly
            'fieldCallback' => array($this, '_cb_field_update_stock_name'),
            'rowCallback'   => array($this, '_cb_row_update_stock_name'),
            'fromEncoding'  => 'GBK',
            'toEncoding'    => 'UTF-8',
        );
    }

    /**
     * Field callback for the stock-name imports: trims every value.
     *
     * @return mixed The cleaned value; true for an invalid 'stock' code
     *         (presumably tells the importer to skip the field — confirm in CSVImport)
     */
    public function _cb_field_update_stock_name($file, $line, $column, $field, $value, $options = NULL){
        $value = self::filter_trim($value);
        // NOTE(review): the stock-name imports declare the fields 'Symbol',
        // 'CName' and 'EName'. If $field carries those keys, this 'stock'
        // branch never runs and symbols are stored unfiltered. Confirm against
        // CSVImport before renaming the case to 'Symbol'.
        if($field == 'stock'){
            $value = self::filter_stock_code($value);
            if(!$value){
                return true;
            }
        }
        return $value;
    }

    /**
     * Row callback for the stock-name imports: saves one row into
     * stock_symbol keyed by Symbol and updates the run statistics.
     *
     * @return mixed With instantWrite, the _cb_row_save() result; otherwise
     *         that result as a string, or '' when the save failed
     */
    public function _cb_row_update_stock_name($file, $line, $fieldValues, $rowValues, $options = NULL){
        $this->_stats['total']++;
        $table = 'stock_symbol';
        $key = 'Symbol';
        // Extra columns for newly added rows. Prefer the Chinese name; the HK
        // import has no CName field, so fall back to the English name.
        $options['_addFieldValues'] = array(
            'Name'         => !empty($fieldValues['CName']) ? $fieldValues['CName'] : $fieldValues['EName'],
            'created_time' => date('Y-m-d H:i:s'),
        );
        $c_sql = self::_cb_row_save($table, $key, $file, $line, $fieldValues, $rowValues, $options);
        if($c_sql){
            $this->_stats['succ']++;
        }else{
            // Failures are recorded as a list of line numbers.
            $this->_stats['fail'][] = $line;
        }
        if(!empty($options['instantWrite'])){
            return $c_sql;
        }
        return $c_sql ? (string)$c_sql : '';
    }

    /**
     * Register a listener for an event type.
     *
     * @param mixed  $listener Callback run via _run_callback()
     * @param string $type     Event type
     */
    public static function addListener($listener, $type){
        if(empty(self::$_listners)){
            self::$_listners = array();
        }
        self::$_listners[$type][] = $listener;
    }

    /**
     * Run every listener registered for an event type with the given data.
     *
     * @param string $type Event type
     * @param mixed  $data Payload handed to each listener
     * @return false|null false when no listener is registered for $type
     */
    protected function _callListener($type, $data){
        if(empty(self::$_listners[$type])){
            return false;
        }
        foreach(self::$_listners[$type] as $listener){
            $this->_run_callback($listener, $data);
        }
    }

    /**
     * Run one archive-maintenance query and log any error.
     *
     * @param mysqli $mysqli Connection
     * @param string $sql    Statement to run
     * @return boolean True on success
     */
    protected function _archive_query($mysqli, $sql){
        $result = $mysqli->query($sql);
        if($result === false){
            $this->_log(sprintf('Query Error(%s) %s', $mysqli->errno, $mysqli->error), LOG::ERR);
            return false;
        }
        // REPAIR/OPTIMIZE TABLE return a result set; release it.
        if($result instanceof mysqli_result){
            $result->free();
        }
        return true;
    }

    /**
     * Move rows older than the driver's archive window into <table>_archive.
     *
     * The rows are copied, then deleted from the live table, and finally the
     * live table is repaired and optimized. Only the day and 5-minute drivers
     * are supported.
     *
     * @return false|string false on any failure, otherwise a summary of the
     *         inserted and deleted row counts
     */
    public function archive_data(){
        $this->_log("=======================================");
        $this->_log("START Archive DATA");
        $driver = $this->_driver;
        if(!$driver){
            $this->_log("Driver cann't be empty", LOG::ERR);
            $this->_log("Stop Archive.");
            return false;
        }
        $this->_log("Driver: " . $driver);
        $table = self::getTable($driver);
        if(!$table){
            $this->_log("Cann't find table", LOG::ERR);
            $this->_log("Stop Archive.");
            return false;
        }
        $table_archive = $table . '_archive';
        $this->_log("Table: " . $table);
        $this->_log("Archive Table: " . $table_archive);
        $archiveDays = self::getArchiveDays($driver);
        if(!$archiveDays || $archiveDays <= 0){
            $this->_log("Archive day less than 1 day", LOG::ERR);
            $this->_log("Stop Archive.");
            return false;
        }
        $this->_log("Archive Days: " . $archiveDays);

        import('ORG.Util.Debug');
        $debug_start_mark = 'start_archive_' . $driver;
        $debug_end_mark = 'end_archive_' . $driver;
        Debug::mark($debug_start_mark);

        $archiveDate = date('Y-m-d', strtotime('-' . $archiveDays . 'days'));
        $this->_log('Archive data before ' . $archiveDate);
        $mysqli = $this->_mysqli();

        switch($driver){
            case self::SHSZ_DAY:
            case self::SHSZ_DAY_FORWARD:
            case self::HK_DAY:
            case self::HK_DAY_FORWARD:
                $insert_sql = "INSERT INTO {$table_archive} SELECT * FROM {$table} WHERE date<'{$archiveDate}'";
                $delete_sql = "DELETE FROM {$table} WHERE date<'{$archiveDate}'";
                break;
            case self::SHSZ_5MIN:
            case self::SHSZ_5MIN_FORWARD:
            case self::HK_5MIN:
            case self::HK_5MIN_FORWARD:
                $archiveDateTime = $archiveDate . ' 00:00:00';
                $insert_sql = "INSERT INTO {$table_archive} SELECT * FROM {$table} WHERE datetime<'{$archiveDateTime}'";
                $delete_sql = "DELETE FROM {$table} WHERE datetime<'{$archiveDateTime}'";
                break;
            default:
                $this->_log("Not supported driver");
                $this->_log("Stop Archive.");
                return false;
        }
        $repair_sql = "REPAIR TABLE {$table}";
        $optimize_sql = "OPTIMIZE TABLE {$table}";
        $this->_log("Insert SQL: " . $insert_sql, LOG::DEBUG);
        $this->_log("Delete SQL: " . $delete_sql, LOG::DEBUG);

        $this->_log("Try to insert into archive table");
        if(!$this->_archive_query($mysqli, $insert_sql)){
            return false;
        }
        $inserted_rows = $mysqli->affected_rows;
        $this->_log("Insert Succ!");
        $this->_log("Inserted Rows: " . $inserted_rows);

        $this->_log("Try to delete from table");
        if(!$this->_archive_query($mysqli, $delete_sql)){
            return false;
        }
        $deleted_rows = $mysqli->affected_rows;
        $this->_log("Delete Succ!");
        $this->_log("Deleted Rows: " . $deleted_rows);

        $this->_log("Try to repair table");
        if(!$this->_archive_query($mysqli, $repair_sql)){
            return false;
        }
        $this->_log("Repair Succ!");

        $this->_log("Try to optimize table");
        if(!$this->_archive_query($mysqli, $optimize_sql)){
            return false;
        }
        $this->_log("Optimize Succ!");

        Debug::mark($debug_end_mark);
        $performance = "Cost time " . Debug::useTime($debug_start_mark, $debug_end_mark) . 's';
        $performance .= ", used memory " . Debug::useMemory($debug_start_mark, $debug_end_mark) . 'kb';
        $performance .= ", peak memory " . Debug::getMemPeak($debug_start_mark, $debug_end_mark) . 'kb';
        $this->_log($performance);
        $this->_log("END ARCHIVE DATA");
        return "Inserted Rows: {$inserted_rows}, Deleted Rows: {$deleted_rows}";
    }
}