Queue.php
3.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
<?php
namespace common\components\jqueue\base;
use yii\di\ServiceLocator;
use function json_encode;
use function array_map;
use function is_array;
/**
* Class Queue
* @package common\components\jqueue\base
*/
abstract class Queue extends ServiceLocator
{
/**
* 队列默认名称
* @var string
*/
public $queueName = 'default';
/**
* 队列允许最大任务数量,0代表不限制
* @var int
*/
public $maxJob = 0;
/**
* 任务过期时间(秒),重试的次数达到10次之后,按过期时间自动删除已处理的任务。
* @var int
*/
public $expire = 60;
/**
* @var array 失败配置
*/
public $failed;
/**
* 入队列
* @param string $data
* @param $queueName
*/
abstract protected function _push(array $data = [], $queueName = null);
/**
* 延时入队列
* @param $delay
* @param string $data
* @param $queueName
*/
abstract protected function _later($delay, array $data = [], $queueName = null);
/**
* 出队列
* @param null $queueName
* @return Job
*/
abstract public function pop($queueName = null);
/**
* 将一个任务重新加入队列
* @param string $queueName
* @param string $delay
* @return mixed
*/
abstract public function deleteAndRelease($queueName, $payload, $delay);
/**
* 获取当前队列中等待执行的任务数量
* @param string||null $queueName
* @return mixed
*/
abstract public function getJobCount($queueName = null);
/**
* 入队列
* @param array $data
* @param string||null $queueName
* @return mixed
* @throws Exception
*/
public function push(array $data = [], $queueName = null)
{
if ($this->canPush()) {
return $this->_push($data, $queueName);
} else {
throw new Exception("Max Jobs number exceed! the max Jobs number is {$this->maxJob}");
}
}
/**
* 延时入队列
* @param $delay
* @param string $data
* @param $queueName
* @return mixed
* @throws Exception
*/
public function later($delay, array $data = [], $queue = null)
{
if ($this->canPush()) {
return $this->_later($delay, $data, $queue);
} else {
throw new Exception("Max Jobs number exceed! the max Jobs number is {$this->maxJob}");
}
}
/**
* 将任务及任务相关数据打包成json数据
* @param mixed $data
* @return string
*/
protected function createPayload(array $data = [])
{
return json_encode($data, JSON_UNESCAPED_UNICODE);
}
/**
* 准备任务的数据
* @param $data
* @return array
*/
protected function prepareQueueData(array $data)
{
$data = array_map(function ($d) {
if (is_array($d)) {
return $this->prepareQueueData($d);
}
return $d;
}, $data);
return $data;
}
/**
* 检查队列是否已达最大任务量
* @return bool
*/
protected function canPush()
{
if ($this->maxJob > 0 && $this->getJobCount() >= $this->maxJob) {
return false;
}
return true;
}
/**
* 获取队列名称
* @param $queueName
* @return string
*/
protected function getQueueName($queueName)
{
return $queueName ?: $this->queueName;
}
}