PHP导入CSV文件到MYSQL速度问题


使用PHP(YII框架的YIIC写的),导入数据,数据源是一个5W行的CSV文件,代码如下,我执行导入5W行数据竟然要2小时的时间,怎么优化能控制在5分钟内呢?

   
  <?php
  
class UserImportCommand extends CConsoleCommand {

public $filePath;
public $fileHandle;
public $error_message;
public $succes_count = 0;
public $error_count = array();
public $db;
public $count = 0;
public $connectLimit = 2000;

public $start_time;
public $end_time;
public $runtime;

public function getHelp() {
parent::getHelp();
}

/**
* 文件使用CSV格式,并且文件编码要是UTF8编码
**/
public function setFilePath() {
$this->filePath = dirname(dirname(__FILE__)) . DIRECTORY_SEPARATOR . "data" . DIRECTORY_SEPARATOR . "m-(1_50000).csv";
}

public function readFile() {
if(!$this->fileHandle = @fopen($this->filePath, "r")){
$this->trace($this->filePath."File Note Found", CLogger::LEVEL_ERROR);
Yii::app()->end();
}

if( mb_detect_encoding(file_get_contents($this->filePath)) != 'UTF-8'){
$this->trace($this->filePath."File type is not [UTF8]", CLogger::LEVEL_ERROR);
Yii::app()->end();
}
}

public function run($limit) {

$this->start();
$limit = intval($limit) ? intval($limit) : $this->connectLimit;
$this->setFilePath();
$this->readFile();
$this->connectDb();
$emailValidator = new CEmailValidator();

while ($data = fgetcsv($this->fileHandle, 2000, ",")) {
$this->count++;
if ($this->count == 1) {
$column = $data;
continue;
} else {
if (count($column) != count($data)) {
continue;
}
$data = array_combine($column, $data);
}

/* import data start */
/*
* USER_NAME oly_user.username
* EMAIL oly_user.email
* NAME oly_profiles.real_name
* SEX oly_profiles.gender(1-男 2-女)
* REGION oly_profiles.area
* BIRTHDAY oly_profiles.birth_y && oly_profiles.birth_m
* MOBI_TELE oly_profiles.mobile
* MARRY oly_profiles.marital_status
* ADDRESS oly_profiles.address
*/

$username = $data['USER_NAME'];
$email = $data['EMAIL'];
$real_name = $data['NAME'];
$gender = !empty($data['SEX']) ? $data['SEX'] : '';
$area = $data['REGION'];
$birth_y = !empty($data['BIRTHDAY']) ? date('Y', strtotime($data['BIRTHDAY'])) : '';
$birth_m = !empty($data['BIRTHDAY']) ? date('j', strtotime($data['BIRTHDAY'])) : '';
$mobile = $data['MOBI_TELE'];
$marital_status = $data['MARRY'];
$address = $data['ADDRESS'];
$created_at = $lastvisit_at = date('Y-m-d H:i:s');
$point = $data['UNUSE_POINT'] != 0 ? floor($data['UNUSE_POINT'] / 15) : 0;

if (empty($email)) {
$this->trace("[" . ($this->count - 1) . "] Import [{$email}] Continue, Email is Empty", CLogger::LEVEL_ERROR);
$this->error_count[] = $username;
continue;
}

if (!$emailValidator->validateValue($email)) {
$this->trace("[" . ($this->count - 1) . "] Import [{$email}] Continue, Email is not validator", CLogger::LEVEL_ERROR);
$this->error_count[] = $username;
continue;
}

if ($this->db->createCommand("SELECT email FROM oly_users WHERE email = '{$email}'")->queryScalar()) {
$this->trace("[" . ($this->count - 1) . "] Import [{$email}] Continue, Email is exist", CLogger::LEVEL_ERROR);
$this->error_count[] = $username;
continue;
}

$insert_user_sql = "INSERT INTO oly_users(
username,
email,
create_at,
lastvisit_at,
status,
shot_status,
old_user,
superuser
)VALUES(
:username,
:email,
:created_at,
:lastvisit_at,
0,
0,
1,
0
);";

$command_user = $this->db->createCommand($insert_user_sql);
$command_user->bindParam(":username", $username, PDO::PARAM_STR);
$command_user->bindParam(":email", $email, PDO::PARAM_STR);
$command_user->bindParam(":created_at", $created_at, PDO::PARAM_STR);
$command_user->bindParam(":lastvisit_at", $lastvisit_at, PDO::PARAM_STR);
$command_user->execute();
$user_id = $this->db->getLastInsertId();

$insert_profiles_sql = "INSERT INTO oly_profiles(
user_id,
mobile,
gender,
marital_status,
area,
birth_y,
birth_m,
real_name,
address,
created_date,
lastmodified_date,
type,
point
)VALUES(
:user_id,
:mobile,
:gender,
:marital_status,
:area,
:birth_y,
:birth_m,
:real_name,
:address,
:created_at,
:lastmodified_date,
0,
:point
);";

$profiles_command = $this->db->createCommand($insert_profiles_sql);
$profiles_command->bindParam(":user_id", $user_id, PDO::PARAM_STR);
$profiles_command->bindParam(":mobile", $mobile, PDO::PARAM_STR);
$profiles_command->bindParam(":gender", $gender, PDO::PARAM_STR);
$profiles_command->bindParam(":marital_status", $marital_status, PDO::PARAM_STR);
$profiles_command->bindParam(":area", $area, PDO::PARAM_STR);
$profiles_command->bindParam(":birth_y", $birth_y, PDO::PARAM_STR);
$profiles_command->bindParam(":birth_m", $birth_m, PDO::PARAM_STR);
$profiles_command->bindParam(":real_name", $real_name, PDO::PARAM_STR);
$profiles_command->bindParam(":address", $address, PDO::PARAM_STR);
$profiles_command->bindParam(":created_at", $created_at, PDO::PARAM_STR);
$profiles_command->bindParam(":lastmodified_date", $lastvisit_at, PDO::PARAM_STR);
$profiles_command->bindParam(":point", $point, PDO::PARAM_INT);
$profiles_command->execute();

/* import data end */

$this->trace("[" . ($this->count - 1) . "] Import [{$email}] Success");
$this->succes_count++;

if ($this->count % $limit == 0 && $this->count >= $limit) {
$this->reConnectDb();
}

}

$this->traceError();
$this->stop();
$this->spent();

$this->trace("Import User Success, Total Import Count{$this->succes_count}, Error Import Count" . count($this->error_count));
$this->trace("Import User Complete, Total cost {$this->runtime} seconds");

}

public function connectDb() {
$this->db = new CDbConnection(Yii::app()->db->connectionString, Yii::app()->db->username, Yii::app()->db->password);
$this->db->charset = 'utf8';
$this->trace("Connecting DB Success ... ...");
}

public function reConnectDb() {
$this->trace("Unset DB Connecting ... ...");
$this->db = null;
$this->db = new CDbConnection(Yii::app()->db->connectionString, Yii::app()->db->username, Yii::app()->db->password);
$this->db->charset = 'utf8';
$this->trace("Reconnecting Db Success... ...");
}

public function trace($message, $level = CLogger::LEVEL_INFO) {
echo "[{$$level}]:" . $message . "\n";
}

public function traceError() {
foreach ($this->error_count as $k => $v) {
Yii::log("Olympus Import Error User :" . $v, CLogger::LEVEL_ERROR, 'olympus');
}
}

/*
* 程序执行计时器
*/
public function get_microtime()
{
list($usec, $sec) = explode(' ', microtime());
return ((float)$usec + (float)$sec);
}
public function start()
{
$this->start_time = $this->get_microtime();
}

public function stop()
{
$this->end_time = $this->get_microtime();
}

public function spent()
{
$this->runtime = round(($this->start_time - $this->end_time) * 1000, 1);
}

}

yii mysql

evens 10 years, 11 months ago

生成大的insert sql文件(注意编码),然后用mysql的 source命令导入 (如果有权限的话)

如果权限没有,就生成一个大的insert语句形如 insert values (),(),.....

可以每5000条生成一次导入一次,停1秒

cccp127 answered 10 years, 11 months ago

Your Answer