PHP高并发Redis MySQL重复插入测试笔记
发布时间:2018-08-31, 19:59:41 分类:PHP | 编辑 off 网址 | 辅助
图集1/9
正文 13717字数 1,359,762阅读
<?php
error_reporting(E_ALL); //E_ALL
function cache_shutdown_error() {
$_error = error_get_last();
if ($_error && in_array($_error['type'], array(1, 4, 16, 64, 256, 4096, E_ALL))) {
echo '<font color=red>你的代码出错了:</font></br>';
echo '致命错误:' . $_error['message'] . '</br>';
echo '文件:' . $_error['file'] . '</br>';
echo '在第' . $_error['line'] . '行</br>';
}
}
register_shutdown_function("cache_shutdown_error");
function dump($arr){
echo '';
var_dump($arr);
echo '
';
}
function glog($file,$content,$file_type='.txt'){
//return false;
$file=$file?$file:date('Y-m-d');
$content=$content?$content:date('Y-m-d h:m:s');
$f=file_put_contents($file.$file_type,$content.PHP_EOL,FILE_APPEND);
return $f;
}
//写入数据
function tin($login){
global $dbh;
/*添加*/
//$sql = "INSERT INTO `user` SET `login`=:login AND `password`=:password";
$sql = "INSERT INTO `user` (`login` ,`password`)VALUES (:login, :password)";
$stmt = $dbh->prepare($sql);
$stmt->execute(array(':login'=>$login,':password'=>'123'));
$c_id=$stmt->rowCount();
return $c_id;
try {
$dbh->beginTransaction(); // 开启一个事务
$sql = "SELECT * FROM `user`";
$stmt = $dbh->prepare($sql);
$stmt->execute(array(':login'=>$slogin));
$row = $stmt->fetchAll(PDO::FETCH_ASSOC);
if(empty($row)){
/*添加*/
//$sql = "INSERT INTO `user` SET `login`=:login AND `password`=:password";
$sql = "INSERT INTO `user` (`login` ,`password`)VALUES (:login, :password)";
$stmt = $dbh->prepare($sql);
$stmt->execute(array(':login'=>$slogin,':password'=>'123'));
$c_id=$dbh->lastinsertid();
}else{
foreach($row as $k=>$v){
if($v['login']!=$slogin){
/*添加*/
//$sql = "INSERT INTO `user` SET `login`=:login AND `password`=:password";
$sql = "INSERT INTO `user` (`login` ,`password`)VALUES (:login, :password)";
$stmt = $dbh->prepare($sql);
$stmt->execute(array(':login'=>$slogin,':password'=>'123'));
$c_id=$dbh->lastinsertid();
}
}
}
$dbh->commit();
} catch (PDOException $e) {
$dbh->rollback(); // 执行失败,事务回滚
glog('',$e->getMessage());
}
return $c_id;
}
//ab -n 50000 -c 100 http://oc.com/r.php
//select login,count(*) as count from user group by login having count>1;
Global $redis,$dbh;
$redis = new redis();
$redis->connect('127.0.0.1', 6379);
/*$redis->delete('test');
$redis->lpush("test","111");
$redis->lpush("test","222");
print_r($redis->lgetrange("test",0,-1)); //结果:Array ( [0] => 222 [1] => 111 )*/
$dbh = new PDO('mysql:host=localhost;dbname=test', 'root', 'DRsXT5ZJ6Oi55LPQ');
$dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
//$dbh->setAttribute(PDO::ATTR_AUTOCOMMIT, false);//关闭自动提交
$dbh->exec('set names utf8');
try {
$now = time();
//限流
$qps_num=$redis->incr("qps_num");
$redis->expireAt('qps_num', $now + 1);
if($qps_num>100){
//echo '限流'.$qps_num;
//glog('','限流'.$qps_num);
return false;
}
//dump($redis->lgetrange("test",0,-1));exit;
//$login=date('s',time());
//$redis->delete('login_set');exit;
//$redis->expireAt('x', $now + 3); // x will disappear in 3 seconds.
$s_x=$redis->setnx('x',1); //加一个标志位
$redis->expireAt('x', $now + 60); //后于集合过期
if(!$s_x){
//glog('','请排队');
return false;
}
//事务 写在前面 意外退出 ?没有提交 commit 不回滚?
$dbh->beginTransaction(); // 开启一个事务
//$set = $redis->smembers('login_set');
//dump($set);exit;
//echo $redis->ttl('login_set');exit;
$login_set_num=$redis->sCard('login_set'); //返回SET容器的成员数
//echo $login_set_num;exit;
if(!$login_set_num){
glog('','查询');
//查询 缓存数据 假设字段login唯一
$sql = 'SELECT login FROM user';
$stmt = $dbh->prepare($sql);
$row = $stmt->fetchAll(PDO::FETCH_ASSOC);
//多个成员元素加入到集合中 注意:在Redis2.4版本以前, SADD 只接受单个成员值
foreach($row as $k=>$v){
$redis->sAdd('login_set',$v);
}
unset($row,$k,$v);
}
//模拟生成插入值 login
static $login;
$login=rand(100,9999);
//检查是否成员 是已经存在数据库 否录入
//$login_set_is=$redis->sIsMember('login_set',$login);
$login_set_is=$redis->sAdd('login_set',$login); //设计修改操作 才是原子性?
if($login_set_is){
$in_id=tin($login);
//if($in_id) $redis->sAdd('login_set',$login); //插入成功 加入集合缓存
}else glog('','已经存在-'.$login);
$redis->expireAt('login_set', $now + 30);
//glog('','ttl-'.$redis->ttl('login_set'));
$dbh->commit();
} catch (Exception $e) {
$dbh->rollBack();
glog('',"Failed: " . $e->getMessage());
$redis->sRem('login_set', $login);
}finally{
}
$redis->delete('x');
exit;
//防止重复写入
$hnum=$redis->incr('num');
//glog('',$hnum);
$oldstr=$redis->get('login');
if($hnum>50 || ($oldstr && $oldstr==$login)){
$redis->set('num',-99);
exit;//已经存在
}else if($hnum>0){
$redis->set('login',$login);
if(tin($login)) glog('','incr22-'.$login.'-'.$oldstr);
else glog('','EE-incr22-'.$login.'-'.$oldstr);
}
exit;
$tadd=$redis->rpush("test",$login);
while(true){
try{
$slogin = $redis->BLPOP('test',1);
if(!$slogin){
break;
}
//var_dump($value)."\n";
if(tin($slogin)) glog('','incr22-'.$login.'-'.$slogin);
else glog('','EE-incr22-'.$login.'-'.$slogin);
/*
* 利用$value进行逻辑和数据处理
*/
}catch(Exception $e){
glog('',$e->getMessage());
}
}
exit;
//$tadd=$redis->rpush("test",$login);
//exit;
//$gnum=$redis->flushall();
$gnum=$redis->get('gnum');
if(!$gnum){
$redis->set('gnum','1');
if(tin($login)) glog('','rob_result_ok-'.$login);
else glog('','rob_result_ok-E-'.$login);
$redis->delete('gnum');
}else glog('','rob_result_EE-'.time());
exit;
$redis->incr('gnum');
$gnum=$redis->get('gnum');
$redis->watch("gnum");
$redis->multi();
if(tin($login)) glog('','rob_result_ok-'.$login);
else glog('','rob_result_ok-E-'.$login);
//$redis->set('sdf',1);
$rob_result=$redis->exec();
//dump($rob_result);
if($rob_result=='nil'){
$tadd=$redis->rpush("test",$login);
if($tadd) glog('','rob_result_E-'.$login);
else glog('','rob_result_E-_EL'.$login);
$redis->unwatch("gnum");
}
exit;
//$redis->delete('gnum test');exit;
$tadd=$redis->rpush("test",$login);
//$redis->watch("gnum");
//$redis->multi();
while(true){
try{
$slogin = $redis->LPOP('test');
if(!$slogin){
break;
}
//var_dump($value)."\n";
if(tin($slogin)) glog('','incr22-'.$login.'-'.$slogin);
else glog('','E-incr22-'.$login);
/*
* 利用$value进行逻辑和数据处理
*/
}catch(Exception $e){
glog('',$e->getMessage());
}
}
//$rob_result = $redis->exec();
//if(!$rob_result) $redis->unwatch("gnum");
exit;
//$redis->delete('test');
$tadd=$redis->rpush("test",$login);
//dump($redis->lpop("test"));
//dump($redis->lgetrange("test",0,-1));
//$list_arr=$redis->lgetrange("test",0,-1);
while(true){
try{
$slogin = $redis->LPOP('test');
if(!$slogin){
break;
}
//var_dump($value)."\n";
/*
* 利用$value进行逻辑和数据处理
*/
}catch(Exception $e){
glog('',$e->getMessage());
}
}
exit;
//$redis->expire('test',1);
//glog('',$tadd);exit;
$redis->watch("test");
$redis->multi();
if($tadd<2){
glog('',$tadd);exit;
/*查询*/
//$login = 'kevin2';
$sql = "SELECT * FROM `user`";
$stmt = $dbh->prepare($sql);
$stmt->execute(array(':login'=>$login));
$row = $stmt->fetchAll(PDO::FETCH_ASSOC);
if(empty($row)){
/*添加*/
//$sql = "INSERT INTO `user` SET `login`=:login AND `password`=:password";
$sql = "INSERT INTO `user` (`login` ,`password`)VALUES (:login, :password)";
$stmt = $dbh->prepare($sql);
$stmt->execute(array(':login'=>$login,':password'=>'123'));
if($dbh->lastinsertid()){
$redis->delete('test');
}
}else{
foreach($row as $k=>$v){
if($v['login']!=$login){
/*添加*/
//$sql = "INSERT INTO `user` SET `login`=:login AND `password`=:password";
$sql = "INSERT INTO `user` (`login` ,`password`)VALUES (:login, :password)";
$stmt = $dbh->prepare($sql);
$stmt->execute(array(':login'=>$login,':password'=>'123'));
if($dbh->lastinsertid()){
$redis->delete('test');
}
}
}
}
}else if($tadd<50){
glog('','并发五百内');
}else{
glog('','高并发大于五百,队列完毕,重置');
$redis->delete('test');
}
$rob_result = $redis->exec();
exit;
//$redis->lpush("test",$login);
$redis->sadd('isdata',$login);
$rcnum=$redis->scard('isdata');
if($rcnum>3){
echo '抢完了';
$set = $redis->smembers('isdata');
dump($set);
exit;
}
exit;
//$redis->sadd('isdata',$login);exit;
/*$redis->sadd('isdata',$login);//$redis->delete('isdata');
var_dump($redis->sort('isdata'));exit;*/
//dump($redis->sort('isdata'));exit;
$inarr=$redis->sort('isdata');
$redis->delete('isdata');
if(!count($inarr)){
$result=json_encode(array("errcode" => 2004, "errmsg" => "no data", 'data' => 'dataLength:' . $dataLength . 'liveKey:' . $listKey));
//glog('',$result);
exit($result);
};
$sql = "INSERT INTO user(login,password) values ";
$data=array();
foreach($inarr as $k=>$v){
$sql.='(?,?),';
array_push($data,$v,'test'.$k);
//$data[$k]=$v;
//$data[($k+1)]='test'.$k;
//$stmt->execute(array(':login'=>$v,':password'=>'123'));
}
$inarr=null;
$sql = substr($sql,0,strlen($sql)-1);
//echo $sql;exit;
//dump($data);exit;
try {
$stmt = $dbh->prepare($sql);
$stmt->execute($data);
} catch (PDOException $e) {
glog('',"Error!: " . $e->getMessage());
return false;
}
/*if($dbh->lastinsertid()){
//$redis->sremove('isdata',$v);
//unset($inarr[$k]);
$redis->delete('isdata');
}*/
exit;
/*查询*/
//$login = 'kevin2';
$sql = "SELECT * FROM `user`";
$stmt = $dbh->prepare($sql);
$stmt->execute(array(':login'=>$login));
while($row = $stmt->fetch(PDO::FETCH_ASSOC)){
/*print_r($row);
exit; */
if($row['login']!=$login){
/*添加*/
//$sql = "INSERT INTO `user` SET `login`=:login AND `password`=:password";
$sql = "INSERT INTO `user` (`login` ,`password`)VALUES (:login, :password)";
$stmt = $dbh->prepare($sql);
$stmt->execute(array(':login'=>$login,':password'=>'123'));
echo $dbh->lastinsertid();
}
}
exit;
/*修改*/
$sql = "UPDATE `user` SET `password`=:password WHERE `user_id`=:userId";
$stmt = $dbh->prepare($sql);
$stmt->execute(array(':userId'=>'7', ':password'=>'4607e782c4d86fd5364d7e4508bb10d9'));
echo $stmt->rowCount();
/*删除*/
$sql = "DELETE FROM `user` WHERE `login` LIKE 'kevin_'"; //kevin%
$stmt = $dbh->prepare($sql);
$stmt->execute();
echo $stmt->rowCount();
/*查询*/
$login = 'kevin%';
$sql = "SELECT * FROM `user` WHERE `login` LIKE :login";
$stmt = $dbh->prepare($sql);
$stmt->execute(array(':login'=>$login));
while($row = $stmt->fetch(PDO::FETCH_ASSOC)){
print_r($row);
}
print_r( $stmt->fetchAll(PDO::FETCH_ASSOC));
/*添加*/
/*//$sql = "INSERT INTO `user` SET `login`=:login AND `password`=:password";
$sql = "INSERT INTO `user` (`login` ,`password`)VALUES (:login, :password)";
$stmt = $dbh->prepare($sql);
$stmt->execute(array(':login'=>$login,':password'=>'123'));
$dbh->lastinsertid();
$row=null;
$login=null;
exit;*/
/*查询*/
//$login = 'kevin2';
$sql = "SELECT * FROM `user`";
$stmt = $dbh->prepare($sql);
$stmt->execute(array(':login'=>$login));
while($row = $stmt->fetch(PDO::FETCH_ASSOC)){
/*print_r($row);
exit; */
if($row['login']!=$login){
/*添加*/
//$sql = "INSERT INTO `user` SET `login`=:login AND `password`=:password";
$sql = "INSERT INTO `user` (`login` ,`password`)VALUES (:login, :password)";
$stmt = $dbh->prepare($sql);
$stmt->execute(array(':login'=>$login,':password'=>'123'));
$dbh->lastinsertid();
}
}
$row=null;
$login=null;
exit;
Run code
Cut to clipboard
(支付宝)给作者钱财以资鼓励 (微信)→
有过 12 条评论 »
查询user表中,user_name字段值重复的数据及重复次数
select user_name,count(*) as count from user group by user_name having count>1;
事务处理具有四个特性:原子性、一致性、独立性、持久性。
并不是所有的数据库都支持事务处理的,PDO 为能够执行事务处理的数据库提供事务支持。
配置事务处理需注意:
1、关闭 PDO 的自动提交;
$pdo->setAttribute(PDO::ATTR_AUTOCOMMIT, false);
2、开启一个事务需要的方法;
$pdo->beginTransaction(); // 开启一个事务 $pdo->commit(); // 提交事务 $pdo->rollback(); // 回滚事务
3、一般事务处理是运行在 try...catch...语句中,当事务失败时执行 catch 代码段。
<?php try { $pdo->beginTransaction(); // 开启一个事务 $row = null; $row = $pdo->exec("xxx"); // 执行第一个 SQL if (!$row) throw new PDOException('提示信息或执行动作'); // 如出现异常提示信息或执行动作 $row = $pdo->exec("xxx"); // 执行第二个 SQL if (!$row) throw new PDOException('提示信息或执行动作'); $pdo->commit(); } catch (PDOException $e) { $pdo->rollback(); // 执行失败,事务回滚 exit($e->getMessage()); } ?>
在事务中的 SQL 语句,如果出现错误,那么所有的 SQL 都不执行。当所有 SQL 有无误的时候,才提交执行。
在处理锁的问题上,经常听到:共享锁、排它锁、悲观锁、乐观锁、行级锁、表级锁。
共享锁: 就是在读取数据的时候,给数据添加一个共享锁。共享和共享直接是不冲突的,但是和排他锁是冲突的。
排他锁: 更新数据的时候,安装排他锁,禁止其他一切行为。
场 景:老公去在 ATM 上取钱,老婆在柜台存钱,假设这个账户中有 1000 元。老公首先执行查询操作,查询到账户余额为 1000 此时程序 将 1000 拿到内存中,老公取了200 元,程序就执行了更新操作将账户余额改为 800,但是当老公的程序没有 commit 的时候,老婆查询账户,此时账户余额还是 1000 元,老婆存入 200 元,程序执行了更新操作将账户余额改为 1200,然后老公将更新语句提交,接着老婆也将更新语句提交。最后导致的结果就是该账户的余额为 1200,这就是更新丢失的问题。引发更新丢失的根源就是查询上,因为双方都是根据从数据库查询到的数据再对数据库中的数据进行更新的。
解决更新丢失有三个方案:
(1) 将事务隔离级别设置为最高,采用死锁策略。
(2) 采用悲观锁,悲观锁不是数据库中真正的锁,是人们看待事务的态度。
(3) 采用乐观锁,乐观锁也不是数据库中真正的锁。
如 果我们采用的是第一个方案时,老公进行查询操作,数据库为表增加了共享锁,老婆进行查询操作时数据库也增加了一个共享锁。但是当老公进行更新数据库操作 时,由于老婆拿着共享锁,导致老公不能增加排它锁,老婆进行更新操作时,因为老公拿着共享锁,导致老婆也拿不到排它锁,这就发生了死锁现象,你等我,我等你。在 mysql 中,处理死锁的方案是释放掉一方的锁。这样就保证了一方更新成功,但是这种性能极低,因为数据库频繁在解决死锁问题。
悲观锁(更新多,查询少时用)
如果我们采用的是第二个方案时,即采用悲观锁。就是我们在操作数据库时采用悲观的态度,认为别人会在此时并发访问数据库。
我们在查询语句中 select * from account where name='aaa' for update; 等于加了排它锁。
当老公查询余额的时候,select money from account where name='aaa' for update; 增加了排它锁,
老婆查询账户余额的时候, select money from account where name='aaa' for update; 也要求对数据库加排它锁,
因为老公已经拿到了排它锁,导致老婆不能加锁,所以老婆只有等待老公执行完毕,释放掉锁以后才能继续操作。
乐观锁(更新少,查询多时用)
如 果我们采用的是第三个方案时,即采用乐观锁,就是我们在操作数据库的时候会认为没有其它用户并发访问,但是乐观锁也不是完全乐观的,乐观锁是采用版本号的 方式进行控制的。在数据库表中有一列版本号。从数据库中查询的时候,将版本号也查询过来,在进行更新操作的时候,将版本号加1,查询条件的版本号还是查询过来的版本号。
比如:
老公执行查询操作
select money,version from account where name='aaa';
假设此时查询到的版本号为 0,
老公在进行更新操作
update account set money=money+100,version=version+1 where name='aaa' and version=0;
未提交时老婆来查询,查询到的版本号依然是 0,
老婆也执行更新操作
update account set money=money+100,version=version+1 where name='aaa' and version=0;
现在老公提交了事务,老婆再提交事务的时候发现版本号为 0 的记录没有了,所以就避免了数据丢失的问题。不过这种情况也导致了多个用户更新操作时,只有一个用户的更新被执行。
行级别的锁:
select * from employee where employeeID=9857 for update; where 后边是索引列 不是索引列那么就为表级别的锁
1.计数器
2.滑动窗口
3.漏桶算法
4.令牌桶算法
时间复杂度是一个函数,它定量描述了该算法的运行时间。常见的时间复杂度有以下几种。
1,log(2)n,n,n log(2)n ,n的平方,n的三次方,2的n次方,n!
1指的是常数。即,无论算法的输入n是多大,都不会影响到算法的运行时间。这种是最优的算法。而n!(阶乘)是非常差的算法。当n变大时,算法所需的时间是不可接受的。
用通俗的话来描述,我们假设n=1所需的时间为1秒。那么当n = 10,000时。
O(1)的算法需要1秒执行完毕。
O(n)的算法需要10,000秒 ≈ 2.7小时 执行完毕。
O(n2)的算法需要100,000,000秒 ≈ 3.17年 执行完毕。
O(n!)的算法需要XXXXXXXX(系统的计算器已经算不出来了)。
可见算法的时间复杂度影响有多大。
所以O(1)和O(n)差了2.7小时,区别显而易见。
create table desc 报错
create table `desc` 成功
一般我们建表时都会将表名,库名都加上反引号来保证语句的执行度。
10线程同时操作,频繁出现插入同样数据的问题。虽然在插入数据的时候使用了:
insert inti tablename(fields....) select @t1,@t2,@t3 from tablename where not exists (select id from tablename where t1=@t1,t2=@t2,t3=@t3)
当时还是在高并发的情况下无效。此语句也包含在存储过程中。(之前也尝试线判断有无记录再看是否写入,无效)。
因此,对于此类情况还是需要从数据库的根本来解决,就是约束。否则数据库的原子操作细不到我所需要的层面。
添加约束的命令行用得人不多,网上每次找SQL语句都累死,还是写下来好了。
需要的关键就叫做 字段组合约束唯一性
alter table tablename add CONSTRAINT NewUniqueName Unique(t1,t2,t3)
这样可以保证三个字段组合不重复
在生产系统数据库的调整真是锱铢必较。。。。。。
对于数据库读操作的重复暂时没有好的解决方法,就是读数据库某些条目同时将这些条目某个字段修改为1,然后其他进程读的时候就不会重复读取。但是在多线程情况下即使我使用了SQL SERVER 2005最新的特性,就是类似update...output into到临时表的方法:
update tablename set OnCheck=1,LastLockTime=getdate(),LastChecktime=getdate()
output deleted.ID into @newtb
where ID in
(select id from tablename where Oncheck=0)
还是会造成重复读。难道没有更好的办法了吗?
回滚由 PDO::beginTransaction() 发起的当前事务。如果没有事务激活,将抛出一个 PDOException 异常。
如果数据库被设置成自动提交模式,此函数(方法)在回滚事务之后将恢复自动提交模式。
包括 MySQL 在内的一些数据库, 当在一个事务内有类似删除或创建数据表等 DLL 语句时,会自动导致一个隐式地提交。隐式地提交将无法回滚此事务范围内的任何更改。
<?php /* 开始一个事务,关闭自动提交 */ $dbh->beginTransaction(); /* 更改数据库架构和数据 */ $sth = $dbh->exec("DROP TABLE fruit"); $sth = $dbh->exec("UPDATE dessert SET name = 'hamburger'"); /* 识别错误且回滚更改 */ $dbh->rollBack(); /* 此时数据库连接恢复到自动提交模式 */
execute函数是用于执行已经预处理过的语句,只是返回执行结果成功或失败。也就是说execute需要配合prepare函数使用,这个的确是麻烦了一点,每次都要先prepare,然后才能exec
INSERT INTO t2(age,name) values (?, ?), (?, ?), (?, ?);
$data = array($age1, $name1, $age2, $name2, $age3, $name3); $stmt->execute($data);
则需要配置下MYSQL,在mysql 命令行中运行 :set global max_allowed_packet = 2*1024*1024*10;消耗时间为:11:24:06 11:25:06;
插入200W条测试数据仅仅用了1分钟!代码如下:
$sql= “insert into twenty_million (value) values”; for($i=0;$i<2000000;$i++){ $sql.=”('50′),”; }; $sql = substr($sql,0,strlen($sql)-1); $connect_mysql->query($sql);
2、使用sql count函数 $q = $db->query("SELECT count(*) from db;"); $rows = $q->fetch(); $rowCount = $rows[0];
显然第二种方法更有效率