到数据表取一堆未处理的数据用接口处理,然后根据结果在表标志已处理,用计划任务,如何实现并发? | 马犇-技术博客

到数据表取一堆未处理的数据用接口处理,然后根据结果在表标志已处理,用计划任务,如何实现并发?

来源:本站原创 微信技术群总结 超过2,107 views围观 0条评论

【今日话题】

到数据表取一堆未处理的数据用接口处理,然后根据结果在表标志已处理,用计划任务,如何实现并发? – 泉-June

1. shell 配合php, 执行一个php任务时, 在shell中使用wait,- Meow

2. 加锁就行了,或者把数据块处理 – 黑夜路人

3. swoole可以帮到你. 多线程,异步. 如果用Erlang,golang就个简单就可以实现 – 陈周瑜

4. 其实我是想用原生php实现. 因为想简单的同时自己多点了解. 如果是加锁怎么做呢 – 泉-June

5. 用pcntl. php原生的多进程. 只能在linux的cli模式下运行 – 陈周瑜

6. 跟多线程无关. 它关键是数据共享,必须对数据不会重复处理. 多线程多进程都能解决并发处理问题 – 黑夜路人

7. 是啊,之前讨论过, 你要么就加锁,遇到锁的数据跳过,要么是读的数据不一样 – twin

8. 你是数据库还是文本文件?如果是数据库,有个最简单的方法. 比如说你先统计出总共有多少要处理的数据. 然后按照你每个进程要处理数据的数量分段. 比如说你1000条记录,每个进程处理100个,就分10段,起10个进程. 然后每个段你获得起始处理id,然后每个进程从自己开始数据段位置开始处理,依次执行。 – 黑夜路人

9. 分段呗. 一个进程处理一段. 最简单了. 在一个文件里面记录你这次处理了多少. 下次处理后面的 – 花生

10. 不借缓存,队列. 表的数据会不断增加. 其实是一个进程,五分钟一次,担心处理不完,第二次任务起来 – 泉-June

回: 这也简单,加锁,文件锁. 用文件锁保证单进程运行. 我去年搞百度经验时都用过 – 花生

11. 如果你数据库是一直在动态增长的话,就需要对这些进程进行调度了. 很简单,就是用动态生成处理进程的方式来处理。比如说你发现五分钟内一个进程最多可以处理100条记录,而你启动的时候最近五分钟已经生成了880条记录。那么你就fork8个处理进程就行了。这个每个五分钟处理多少数据量,需要你自己观察记录统计出来设定一个合理的值。 – 黑夜路人

12. 关于加锁的问题. 比如你担心两个五分钟中间会处理不完,一般做法是处理进程启动的时候生成一个lock文件,进程启动判断如果存在lock文件。进程就认为上个进程处理没完成,就不会启动自己。进程处理完就把这个lock文件删除,就是解锁的操作。 – 黑夜路人

13. 但是问题又来了,如果上一个进程持续处理不完,那么下一个crontab启动新进程来运行,发现上个进程没有结束,自己就不启动,这个时候数据库的数据会不断累计,数据更多,你必须等到下个五分钟等crontab调用,才能进行下一轮的执行,后面就数据量很多,依次这么下去,变成恶性循环,就悲剧了。 – 黑夜路人

14. 简单解决方案就是我上面说的,按照当前数据量动态生成处理进程来处理,每个处理进程处理数据量按照你统计是在五分钟内可以处理完的最好。 – 黑夜路人

15. 还有个细节,你需要在启动执行的时候记录你处理数据是哪个时间范围内的,所以,你需要记录你扫描数据库那个时间范围的数据,就需要记录一个时间点。 – 黑夜路人

16. 我现在的做法是,一个进程,五分钟一次,代码循环一千条一次,每次写最大id到文件,循环里条件就是读最大id,做下次循环,不管多少进程 – 泉-June

17. 存储id或者时间都行,id精确些 – 黑夜路人

18. 其实这里设计关键. 就是进程管理. 所以,进程管理需要设计成类似nginx的 master/woeker 的模型。 – 黑夜路人

19. 集中管理. 分散处理. 按照这个思路就应该是对的 – Jason Bourne

20. 生成多少个进程,生成锁文件,检查是否启动等等,都需要master进程来负责,具体处理逻辑和工作的就是worker进程负责。- 黑夜路人

21. 可以按生成时间加一个取数据限制避免重复操作 – 亢

22. 如果想通用解决方案,可以把上面这个思路设计成框架,就是把进程调度加锁等作为基础,但是具体表结构和处理逻辑就是变成回调接口,这样就通用化了,就是一个能够处理所有类似的问题的框架了。这就是架构师思维了。 – 黑夜路人

23. 问: 那刚才说到的重复数据怎么处理 – Anonymous、z

回: 上面这个机制不会重复数据 – 黑夜路人

24. 问: 那失败要重发的怎么处理呢? – twin

回: 如果逻辑处理失败。建议记录到错误日志。或者记录到临时表或者redis. 下个master启动的时候,单独fork一个进程去扫描日志文件或redis去逐个处理,处理好了删除。或者单独启动一个php程序去处理也行,都可以。 – 黑夜路人

回: 应该用时间分段吧, 那些明确要重发的,只是改个时间标志, 下次进程就会读到这个记录了, 这样不用写多一份失败处理逻辑. 用id的话,就插入一条新记录到最前面了 – twin

回: 呵呵,也可以,不依赖外部存储。看各个业务了,有些不想更改原始数据。如果你错误数据比正确的还多,那你这个后段服务就有问题了。哈哈. 你得查查问题在哪儿。 – 黑夜路人

25. 上面相同问题用c/golang解决思路类似,不同是程序不用依赖于crontab来唤醒,自己设置定时器搞定就行了。多进程/多线程/多协程 都能搞定多任务 – 黑夜路人

26. 我倒没考虑过并发,最近也在搞后台脚本,跑cron,每次取100或1000条,处理完,再按这个数量依次处理 – 酸酸哥

27. 先获取长度,然后按照长度切分 分别跑,抓出待处理的记录. 然后根据新的队列长度,重新切分 跑完按照一定数量级一次query再alert回去?遇到类似要修改纪录的场景,由于长度不算太长,都直接sql改了。。 – 萝莉控夫斯基

28. 切割+线程池 – 马犇

29. 数据从表里取出放队列里,由队列去控制并发 – 徐刚

30. //伪代码

$queryWhere = ‘date > ‘.$startDate.’ and date <= ‘.$endDate.’order by date desc’;//通用限制条件

$countSql = ‘select count(0) form tableName where ‘.$queryWhere;//获取总数sql

$perNum = 500;

$count = getCount($countSql);//获取总数

$processNum = ceil($count / $perNum);//计算开多少线程执行

for($i=0; $i < $processNum; $i++){

//开线程执行下面代码

{

$startPos = $i * $perNum;

$selectSql = ‘select * from tableName ‘.$queryWhere. ‘limit ‘.$startPos.’,’.$perNum;

$data = getData($selectSql);

doSomeThing()

}

}

分段就可以避免并发 – 亢

31. 今天的话题以前有讨论过吧?我记得我有回复过,数据库的解决方案很成熟,最好的办法是按计划划分好数据段交给不同的线程/进程,不能划分且处理线程不太多时就用update改状态,类似乐观锁原理,我对付脑残的招行接口就是这么干的. 数据量超大就最好是处理完再入库,kafka真心不错 – 水浸街

32. 把未处理的数据有序话,每个进程处理固定的几个,进程共享处理的序号,感觉这样可以 – xx人