Package Data | |
---|---|
Maintainer Username: | songyongzhan |
Maintainer Contact: | 574482856@qq.com (SongYongZhan) |
Package Create Date: | 2021-05-01 |
Package Last Update: | 2022-01-24 |
Language: | PHP |
License: | MIT |
Last Refreshed: | 2025-01-13 15:09:32 |
Package Statistics | |
---|---|
Total Downloads: | 2,587 |
Monthly Downloads: | 0 |
Daily Downloads: | 0 |
Total Stars: | 0 |
Total Watchers: | 1 |
Total Forks: | 0 |
Total Open Issues: | 0 |
位置app/Providers/RabbitMqProvider.php
<?php
namespace App\Providers;
use App\Services\RabbitConfig;
use Illuminate\Support\ServiceProvider;
use Songyz\Rabbit\RabbitManager;
class RabbitMqProvider extends ServiceProvider
{
public function boot()
{
$this->app->singleton(RabbitManager::class, function () {
return RabbitManager::getInstance(new RabbitConfig(), app('log'));
});
}
/**
* 获取提供者提供的服务
*
* @return array
*/
public function provides()
{
return [RabbitManager::class];
}
}
在 config/app.php
中,将 App\Providers\RabbitMqProvider::class
添加进来,这里面缺少一个rabbitConfig
配置,如下:
可以新建一个目录,将 config
exchange
统一放在一个目录下即可。
<?php
namespace App\Services;
use Songyz\Rabbit\Conf\Config;
class RabbitConfig extends Config
{
public function __construct()
{
$this->hosts = $this->initEnvironment();
}
/**
* 根据环境变量 设置不同的配置
* 此配置本身需要放在env中去维护,但考虑到env不支持数组,需要特定的格式去配置 较为繁琐
* 因此放在这里去做配置
* initEnvironment
* @return array|array[]
*
*/
private function initEnvironment()
{
$environment = app()->environment();
switch ($environment) {
case "development":
return [
[
'host' => "192.168.1.196",
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/'
],
];
break;
case "test":
return [];
break;
case "pre-production":
return [
[
'host' => "192.168.1.198",
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/'
],
];
break;
case "production":
return [
[
'host' => "192.168.1.199",
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/'
],
];
break;
default:
return [];
break;
}
}
}
initEnvironment
根据实际环境改写。
<?php
namespace App\Services;
use Songyz\Rabbit\Exchange\Exchange;
class PhpFirstExchange extends Exchange
{
/** @var string 交换机名称 */
protected $name = "php-f-exchange";
/** @var string
* 交换机路由key 可以为空 直连模式
* 交换机路由key topic 及 fanout 可指定对对应的key
*/
protected $routingKey = "";
}
交换机定义 一定要继承 Exchange
$rabbitManager = app(RabbitManager::class);
$rabbit = $rabbitManager->rabbit();
$exchange = new PhpFirstExchange();
$message = new Message("test-php 2021-05-03 14:19:03");
$rabbit1 = $rabbit->publish($message, $exchange, '');
$rabbitManager = app(RabbitManager::class);
$rabbit = $rabbitManager->rabbit();
$exchange = new PhpFirstExchange();
$message = new Message("test-php 2021-05-03 14:19:03");
$rabbit1 = $rabbit->publish($message, $exchange, '', true);
<?php
namespace App\Services;
use Songyz\Rabbit\Message\Message;
use Songyz\Rabbit\RabbitManager;
class MqService
{
public function mq()
{
$rabbitManager = app(RabbitManager::class);
$rabbit = $rabbitManager->rabbit();
$exchange = new PhpFirstExchange();
$result = [];
for ($i = 0; $i <= 30; $i++) {
array_push($result, new Message(json_encode(['content' => "test-php" . microtime(true)])));
}
$rabbit->publishMulti($result, $exchange, '');
}
}
<?php
namespace App\Services;
use Songyz\Rabbit\Message\Message;
use Songyz\Rabbit\RabbitManager;
class MqService
{
public function mq()
{
$rabbitManager = app(RabbitManager::class);
$rabbit = $rabbitManager->rabbit();
$exchange = new PhpFirstExchange();
$result = [];
for ($i = 0; $i <= 30; $i++) {
array_push($result, new Message(json_encode(['content' => "test-php" . microtime(true)])));
}
$rabbit->publishMulti($result, $exchange, '', true);
}
}
消费者只提供一种模式,每次发送 1
条消息,消费完毕,并重试3
次, 这两个参数都是包里定制好的,不能修改。
消费者带有消息确认,ack
或 nack
。
<?php
namespace App\Console\Commands;
use App\Models\HealthRecord;
use Songyz\Rabbit\Command\RabbitCommand;
use Songyz\Rabbit\Message\ResponseMessage;
class MqConsumer extends RabbitCommand
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'mq:xf';
/**
* The console command description.
*
* @var string
*/
protected $description = '脚本消费';
/** @var string 要消费的队列 */
protected $queue = 'php-f-queue3';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}
/**
* Execute the console command.
*
* @return int
*/
public function handle()
{
parent::handle();
return 0;
}
public function consume(ResponseMessage $message): bool
{
$this->line("消费者标签:{$message->getDeliveryTag()} | 信息:" . $message->getBody());
$body = json_decode($message->getBody(), true);
// if (! empty($body) && is_array($body)) {
return false;
// }
//处理你的业务逻辑
//如果成功 则返回 true 失败就返回 false 默认会重试三次
return true;
}
}
$queue
这个属性一定要定义,这是消费的队列
启动脚本 php artisan mq:xf