注意:這個是 MixPHP V1 的范例
郵件發(fā)送是很常見的需求,由于發(fā)送郵件的操作一般是比較耗時的,所以我們一般采用異步處理來提升用戶體驗,而異步通常我們使用消息隊列來實現(xiàn)。

傳統(tǒng) MVC 框架由于缺少多進(jìn)程開發(fā)能力,通常是采用同一個腳本執(zhí)行多次,產(chǎn)生多個進(jìn)程的方式,mixphp 封裝了 TaskExecutor 專用于多進(jìn)程開發(fā),用戶能非常簡單的開發(fā)出功能完善的高可用多進(jìn)程應(yīng)用。
推薦:《PHP視頻教程》
下面演示一個異步郵件發(fā)送系統(tǒng)的開發(fā)過程,涉及知識點:
異步消息隊列多進(jìn)程守護(hù)進(jìn)程如何使用消息隊列實現(xiàn)異步PHP 使用消息隊列通常是使用中間件來實現(xiàn),常用的消息中間件有:
redisrabbitmqkafka本次我們選用 redis 來實現(xiàn)異步郵件發(fā)送,redis 的數(shù)據(jù)類型中有一個 list 類型,可實現(xiàn)消息隊列,使用以下命令:
// 入列 $redis->lpush($key, $data); // 出列 $data = $redis->rpop($key); // 阻塞出列 $data = $redis->brpop($key, 10);
架構(gòu)設(shè)計
本實例由傳統(tǒng) MVC 框架投遞郵件發(fā)送需求,MixPHP 多進(jìn)程執(zhí)行發(fā)送任務(wù)。
郵件發(fā)送庫選型
以往我們通常使用框架提供的郵件發(fā)送庫,或者網(wǎng)上下載別的用戶分享的庫,composer 出現(xiàn)后,https://packagist.org/ 上有大量優(yōu)質(zhì)的庫,我們只需選擇一個最好的即可,本例選擇 swiftmailer。
由于發(fā)送任務(wù)是由 MixPHP 執(zhí)行,所以 swiftmailer 是安裝在 MixPHP 項目中,在項目根目錄中執(zhí)行以下命令安裝:
composer require swiftmailer/swiftmailer
生產(chǎn)者開發(fā)
在郵件發(fā)送這個需求中生產(chǎn)者是指投遞發(fā)送任務(wù)的一方,這一方通常是一個接口或網(wǎng)頁,這個部分并不一定需 mixphp 開發(fā),TP、CI、YII 這些都可以,只需在接口或網(wǎng)頁中把任務(wù)信息投遞到消息隊列中即可。
在傳統(tǒng) MVC 框架的控制器中增加如下代碼:
通常框架中使用 redis 會安裝一個類庫來使用,本例使用原生代碼,便于理解。
// 連接
$redis = new \\Redis();
if (!$redis->connect('127.0.0.1', 6379)) {
throw new \\Exception('Redis Connect Failure');
}
$redis->auth('');
$redis->select(0);
// 投遞任務(wù)
$data = [
'to' => ['***@qq.com' => 'A name'],
'body' => 'Here is the message itself',
'subject' => 'The title content',
];
$redis->lpush('queue:email', serialize($data));通常異步開發(fā)中,投遞完成后就會立即響應(yīng)一個消息給用戶,當(dāng)然此時該任務(wù)并沒有執(zhí)行。
消費者開發(fā)
本例我們使用 MixPHP 的多進(jìn)程開發(fā)工具 TaskExecutor 來完成這個需求,通常使用常駐進(jìn)程來處理隊列的消費,所以我們使用 TaskExecutor 的 TYPE_DAEMON 類型,MODE_PUSH 模式。
TaskExecutor 的 MODE_PUSH 模式有二種進(jìn)程:
左進(jìn)程:負(fù)責(zé)從消息隊列取出任務(wù)數(shù)據(jù),投放給中進(jìn)程。
中進(jìn)程:負(fù)責(zé)執(zhí)行郵件發(fā)送任務(wù)。
PushCommand.php 代碼如下:
<?php
namespace apps\\daemon\\commands;
use mix\\console\\ExitCode;
use mix\\facades\\Input;
use mix\\facades\\Redis;
use mix\\task\\CenterProcess;
use mix\\task\\LeftProcess;
use mix\\task\\TaskExecutor;
/**
* 推送模式范例
* @author 劉健 <coder.liu@qq.com>
*/
class PushCommand extends BaseCommand
{
// 配置信息
const HOST = 'smtpdm.aliyun.com';
const PORT = 465;
const SECURITY = 'ssl';
const USERNAME = '****@email.***.com';
const PASSWORD = '****';
// 初始化事件
public function onInitialize()
{
parent::onInitialize(); // TODO: Change the autogenerated stub
// 獲取程序名稱
$this->programName = Input::getCommandName();
// 設(shè)置pidfile
$this->pidFile = "/var/run/{$this->programName}.pid";
}
/**
* 獲取服務(wù)
* @return TaskExecutor
*/
public function getTaskService()
{
return create_object(
[
// 類路徑
'class' => 'mix\\task\\TaskExecutor',
// 服務(wù)名稱
'name' => "mix-daemon: {$this->programName}",
// 執(zhí)行類型
'type' => \\mix\\task\\TaskExecutor::TYPE_DAEMON,
// 執(zhí)行模式
'mode' => \\mix\\task\\TaskExecutor::MODE_PUSH,
// 左進(jìn)程數(shù)
'leftProcess' => 1,
// 中進(jìn)程數(shù)
'centerProcess' => 5,
// 任務(wù)超時時間 (秒)
'timeout' => 5,
]
);
}
// 啟動
public function actionStart()
{
// 預(yù)處理
if (!parent::actionStart()) {
return ExitCode::UNSPECIFIED_ERROR;
}
// 啟動服務(wù)
$service = $this->getTaskService();
$service->on('LeftStart', [$this, 'onLeftStart']);
$service->on('CenterStart', [$this, 'onCenterStart']);
$service->start();
// 返回退出碼
return ExitCode::OK;
}
// 左進(jìn)程啟動事件回調(diào)函數(shù)
public function onLeftStart(LeftProcess $worker)
{
try {
// 模型內(nèi)使用長連接版本的數(shù)據(jù)庫組件,這樣組件會自動幫你維護(hù)連接不斷線
$queueModel = Redis::getInstance();
// 保持任務(wù)執(zhí)行狀態(tài),循環(huán)結(jié)束后當(dāng)前進(jìn)程會退出,主進(jìn)程會重啟一個新進(jìn)程繼續(xù)執(zhí)行任務(wù),這樣做是為了避免長時間執(zhí)行內(nèi)存溢出
for ($j = 0; $j < 16000; $j++) {
// 從消息隊列中間件阻塞獲取一條消息
$data = $queueModel->brpop('queue:email', 10);
if (empty($data)) {
continue;
}
list(, $data) = $data;
// 將消息推送給中進(jìn)程去處理,push有長度限制 (https://wiki.swoole.com/wiki/page/290.html)
$worker->push($data, false);
}
} catch (\\Exception $e) {
// 休息一會,避免 CPU 出現(xiàn) 100%
sleep(1);
// 拋出錯誤
throw $e;
}
}
// 中進(jìn)程啟動事件回調(diào)函數(shù)
public function onCenterStart(CenterProcess $worker)
{
// 保持任務(wù)執(zhí)行狀態(tài),循環(huán)結(jié)束后當(dāng)前進(jìn)程會退出,主進(jìn)程會重啟一個新進(jìn)程繼續(xù)執(zhí)行任務(wù),這樣做是為了避免長時間執(zhí)行內(nèi)存溢出
for ($j = 0; $j < 16000; $j++) {
// 從進(jìn)程消息隊列中搶占一條消息
$data = $worker->pop();
if (empty($data)) {
continue;
}
// 處理消息
try {
// 處理消息,比如:發(fā)送短信、發(fā)送郵件、微信推送
var_dump($data);
$ret = self::sendEmail($data);
var_dump($ret);
} catch (\\Exception $e) {
// 回退數(shù)據(jù)到消息隊列
$worker->rollback($data);
// 休息一會,避免 CPU 出現(xiàn) 100%
sleep(1);
// 拋出錯誤
throw $e;
}
}
}
// 發(fā)送郵件
public static function sendEmail($data)
{
// Create the Transport
$transport = (new \\Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY))
->setUsername(self::USERNAME)
->setPassword(self::PASSWORD);
// Create the Mailer using your created Transport
$mailer = new \\Swift_Mailer($transport);
// Create a message
$message = (new \\Swift_Message($data['subject']))
->setFrom([self::USERNAME => '**網(wǎng)'])
->setTo($data['to'])
->setBody($data['body']);
// Send the message
$result = $mailer->send($message);
return $result;
}
}測試
1.在 shell 中啟動 push 常駐程序。[root@localhost bin]# ./mix-daemon push start mix-daemon 'push' start successed.1.調(diào)用接口往消息隊列投放任務(wù)。
此時 shell 終端將打印:
成功收到測試郵件:
MixPHP
GitHub: https://github.com/mix-php/mix
官網(wǎng):http://www.mixphp.cn/
標(biāo)題名稱:教你使用mixphp打造多進(jìn)程異步郵件發(fā)送
文章分享:http://chinadenli.net/article40/cgpheo.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供ChatGPT、外貿(mào)網(wǎng)站建設(shè)、微信小程序、服務(wù)器托管、網(wǎng)站設(shè)計公司、全網(wǎng)營銷推廣
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)