You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

82 lines
2.1 KiB

  1. <?php
  2. /**
  3. * 计划任务调度器
  4. *
  5. * - 异常失败,会轮循重试
  6. * - 彩蛋式的抽象方法名
  7. *
  8. * @author dogstar <chanzonghuang@gmail.com> 20150516
  9. */
  10. abstract class Task_Runner {
  11. /**
  12. * @var MQ队列实例
  13. */
  14. protected $mq;
  15. /**
  16. * @var int $step 批次的数据,步长
  17. */
  18. protected $step;
  19. /**
  20. * @param Task_MQ $mq MQ队列实例
  21. * @param int $step 批次的数据,步长
  22. */
  23. public function __construct(Task_MQ $mq, $step = 10) {
  24. $this->mq = $mq;
  25. $this->step = max(1, intval($step));
  26. }
  27. /**
  28. * 执行任务
  29. * @param string $service MQ中的接口服务名称,如:Default.Index
  30. * @return array('total' => 总数量, 'fail' => 失败数量)
  31. */
  32. public function go($service) {
  33. $rs = array('total' => 0, 'fail' => 0);
  34. $todoList = $this->mq->pop($service, $this->step);
  35. $failList = array();
  36. while (!empty($todoList)) {
  37. $rs['total'] += count($todoList);
  38. foreach ($todoList as $params) {
  39. try {
  40. $isFinish = $this->youGo($service, $params);
  41. if (!$isFinish) {
  42. $rs['fail'] ++;
  43. }
  44. } catch (PhalApi_Exception_InternalServerError $ex) {
  45. $rs['fail'] ++;
  46. $failList[] = $params;
  47. DI()->logger->error('task occur exception to go',
  48. array('service' => $service, 'params' => $params, 'error' => $ex->getMessage()));
  49. }
  50. }
  51. $todoList = $this->mq->pop($service, $this->step);
  52. }
  53. foreach ($failList as $params) {
  54. $this->mq->add($service, $params);
  55. }
  56. return $rs;
  57. }
  58. /**
  59. * 具体的执行,这里使用了一个彩蛋的命名
  60. * @param string $service MQ中的接口服务名称,如:Default.Index
  61. * @param array $params 参数
  62. * @return boolean 成功返回TRUE,失败返回FALSE
  63. */
  64. abstract protected function youGo($service, $params);
  65. }