Queue.php
5.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
<?php
namespace common\components\jqueue\db;
use Yii;
use yii\db\Query;
use common\components\jqueue\base\Exception;
use common\components\jqueue\base\Queue as BaseQueue;
use function is_array;
use function is_null;
use function time;
/**
* Class Queue
* @package common\components\jqueue\db
*/
class Queue extends BaseQueue
{
    /**
     * @var \yii\db\Connection|array the database connection, or a configuration array
     * from which the connection object is created in init().
     */
    public $db;

    /**
     * @var string name of the table that stores the queued jobs
     */
    public $table = 'jobs';

    /**
     * Initializes the component.
     *
     * When $db is given as a configuration array, the connection object is created
     * from it and its driver is checked. The driver check is only performed on this
     * path; a connection that was assigned as a ready object is used as-is.
     *
     * @throws Exception when the connection created from the array is not a MySQL one
     */
    public function init()
    {
        parent::init();
        if (is_array($this->db)) {
            $this->db = Yii::createObject($this->db);
            if ($this->db->driverName !== 'mysql') {
                throw new Exception("sorry, only mysql db supported!");
            }
        }
    }

    /**
     * Stores a job that is available immediately.
     *
     * @param array $data job data
     * @param string|null $queueName queue name, resolved through getQueueName()
     * @return mixed result of pushToDatabase()
     */
    protected function _push(array $data = [], $queueName = null)
    {
        $queueName = $this->getQueueName($queueName);

        return $this->pushToDatabase(0, $queueName, $this->createPayload($data));
    }

    /**
     * Stores a job that becomes available after a delay.
     *
     * @param int $delay number of seconds to wait before the job may run
     * @param array $data job data
     * @param string|null $queueName queue name, resolved through getQueueName()
     * @return mixed result of pushToDatabase()
     */
    protected function _later($delay, array $data = [], $queueName = null)
    {
        $queueName = $this->getQueueName($queueName);

        return $this->pushToDatabase($delay, $queueName, $this->createPayload($data));
    }

    /**
     * Takes the next available job off the queue and marks it as reserved.
     *
     * Selecting the row (with a row lock) and flagging it as reserved happen inside
     * one transaction. If either statement fails the transaction is rolled back and
     * the error is re-thrown, so the connection is never left inside an open
     * transaction.
     *
     * @param string|null $queueName queue name, resolved through getQueueName()
     * @return mixed the job object created for the reserved row, or false when no job is available
     * @throws \Exception|\Throwable any error raised while reserving the job
     */
    public function pop($queueName = null)
    {
        $queueName = $this->getQueueName($queueName);
        if (!is_null($this->expire)) {
            $this->releaseJobsThatHaveBeenReservedTooLong($queueName);
        }

        $tr = $this->db->beginTransaction();
        try {
            $model = $this->getNextAvailableJob($queueName);
            if ($model) {
                $this->markJobAsReserved($model->id);
            }
            $tr->commit();
        } catch (\Exception $e) {
            $tr->rollBack();
            throw $e;
        } catch (\Throwable $e) {
            // PHP 7+ engine errors do not extend \Exception; roll back for them too.
            $tr->rollBack();
            throw $e;
        }

        if (!$model) {
            return false;
        }

        return Yii::createObject([
            'class' => 'common\components\jqueue\db\Job',
            'queueName' => $queueName,
            'model' => $model,
            'queueInstance' => $this,
        ]);
    }

    /**
     * Deletes a job row from the table.
     *
     * @param string $queueName queue name (not used by this implementation; rows are addressed by id)
     * @param int $id primary key of the job row
     * @return mixed number of rows deleted
     */
    public function deleteReserved($queueName, $id)
    {
        // Hash-format condition lets the query builder bind the id instead of
        // concatenating it into the SQL text.
        return $this->db->createCommand()
            ->delete($this->table, ['id' => $id])
            ->execute();
    }

    /**
     * Puts a job back onto the queue by inserting a new row with the same payload.
     *
     * @param string $queueName queue name
     * @param \stdClass $model job row; its payload and attempts values are copied to the new row
     * @param int $delay number of seconds before the job becomes available again
     * @param int $attempt not used by this implementation; the attempts count is taken from $model
     * @return mixed result of pushToDatabase()
     */
    public function release($queueName, $model, $delay, $attempt = 0)
    {
        return $this->pushToDatabase($delay, $queueName, $model->payload, $model->attempts);
    }

    /**
     * Inserts a job row into the table.
     *
     * @param int $delay number of seconds before the job becomes available
     * @param string|null $queueName queue name, resolved through getQueueName()
     * @param mixed $payload job payload as stored in the payload column
     * @param int $attempts attempts count stored with the job
     * @return int number of rows inserted
     */
    protected function pushToDatabase($delay, $queueName, $payload, $attempts = 0)
    {
        $queueName = $this->getQueueName($queueName);
        $createdAt = time();
        $availableAt = $this->getAvailableAt($delay, $createdAt);

        return $this->db->createCommand()->insert($this->table, [
            'queue' => $queueName,
            'payload' => $payload,
            'attempts' => $attempts,
            'reserved' => 0,
            'reserved_at' => null,
            'available_at' => $availableAt,
            'created_at' => $createdAt,
        ])->execute();
    }

    /**
     * Counts the jobs of a queue that are not reserved.
     *
     * @param string|null $queueName queue name, resolved through getQueueName()
     * @return int|string number of unreserved jobs
     */
    public function getJobCount($queueName = null)
    {
        $queueName = $this->getQueueName($queueName);

        return (new Query())
            ->select(['id'])
            ->from($this->table)
            ->where(['reserved' => 0])
            ->andWhere(['queue' => $queueName])
            ->count("*", $this->db);
    }

    /**
     * Computes the timestamp from which a job may be run.
     *
     * @param int $delay delay in seconds
     * @param int $createdAt creation timestamp
     * @return mixed $createdAt plus $delay
     */
    protected function getAvailableAt($delay, $createdAt)
    {
        return $createdAt + $delay;
    }

    /**
     * Releases jobs whose reservation has expired so they can be picked up again.
     *
     * A reservation is expired when it was made at least $this->expire seconds ago,
     * i.e. reserved_at <= now - expire. Each released job gets its attempts counter
     * incremented.
     *
     * @param string $queueName queue name, resolved through getQueueName()
     * @return int number of rows released
     */
    protected function releaseJobsThatHaveBeenReservedTooLong($queueName)
    {
        // The cutoff lies in the past: adding the expiry to "now" would match every
        // reserved row, including jobs that are still being processed.
        $expired = time() - $this->expire;
        $sql = "update {$this->table} set reserved=0,reserved_at=null,attempts=attempts+1"
            . " where queue=:queue and reserved=1 and reserved_at<=:expired";

        return $this->db->createCommand($sql, [
            ':queue' => $this->getQueueName($queueName),
            ':expired' => $expired,
        ])->execute();
    }

    /**
     * Fetches the oldest unreserved job that is already available, locking its row.
     *
     * The statement ends with "for update", so it is meant to run inside the
     * transaction opened by pop().
     *
     * @param string|null $queueName queue name, resolved through getQueueName()
     * @return \stdClass|null the job row as an object, or null when none is available
     */
    protected function getNextAvailableJob($queueName)
    {
        $sql = "select * from {$this->table}"
            . " where queue=:queue and reserved=0 and available_at<=:now"
            . " ORDER BY id asc limit 1 for update";
        $job = $this->db->createCommand($sql, [
            ':queue' => $this->getQueueName($queueName),
            ':now' => time(),
        ])->queryOne();

        return $job ? (object)$job : null;
    }

    /**
     * Marks a job row as reserved and records the reservation time.
     *
     * @param string $id primary key of the job row
     * @return int number of rows updated
     */
    protected function markJobAsReserved($id)
    {
        $sql = "update {$this->table} set reserved=1,reserved_at=:now where id=:id";

        return $this->db->createCommand($sql, [
            ':now' => time(),
            ':id' => $id,
        ])->execute();
    }
}