Package Data | |
---|---|
Maintainer Username: | per3evere |
Maintainer Contact: | wu543065657@163.com (Persevere Von) |
Package Create Date: | 2019-02-25 |
Package Last Update: | 2020-01-07 |
Home Page: | |
Language: | PHP |
License: | MIT |
Last Refreshed: | 2025-01-14 15:15:11 |
Package Statistics | |
---|---|
Total Downloads: | 2,017 |
Monthly Downloads: | 0 |
Daily Downloads: | 0 |
Total Stars: | 4 |
Total Watchers: | 2 |
Total Forks: | 1 |
Total Open Issues: | 0 |
PHP client for NSQ.
This client also adds some Laravel-specific integration.
You can read all about NSQ via the readme on Github, or via the Bitly blog post describing it. More details on nsqd, nsqlookupd are provided within each folder within the project.
Here are some things I have learned:
- To subscribe, you query nsqlookupd to find out where to find messages and then connect to every nsqd it tells you to (this is one of the things that makes nsq good).
- Each nsqd instance is isolated; you can simply connect to any of them and publish your message (I have built this into the client).
- For resilience, you can publish to more than one nsqd and then de-duplicate on subscribe (I have built this into the client).
- The default msg-timeout is 60,000ms (60 seconds). This is the time before nsq will automatically consider a message to have failed and hence requeue it. Our "work" should take much less time than this. Additionally, PHP is a blocking language, and although we are using a non-blocking IO event loop, any work you do to process a message will block the client from being able to reply to any heartbeats etc.

nsqphp is available to add to your project via composer. Simply add the following to your composer.json.
{
...
"require": {
...
"per3evere/nsqphp": "dev-master"
}
...
}
You can also simply clone it into your project:
git clone git://github.com/persevereVon/nsqphp.git
To use nsqphp in your projects, just set up autoloading via composer. The design lends itself to a dependency injection container (all dependencies are constructor injected), although you can just set up the dependencies manually when you use it.
Follow the getting started guide to install nsq on localhost.
Publish some events:
php cruft/test-pub.php 10
Fire up a subscriber in one shell:
php cruft/test-sub.php mychannel > /tmp/processed-messages
Then tail the redirected STDOUT in another shell, so you can see the messages received and processed:
tail -f /tmp/processed-messages
In these tests I'm publishing first since I haven't yet got the client to automatically rediscover which nodes have messages for a given topic; hence if you sub first, there are no nodes found with messages for the topic.
The blog post describes a channel:
> Each channel receives a copy of all the messages for a topic. In practice, a channel maps to a downstream service consuming a topic.
So each message in a topic will be delivered to each channel.
Fire up two subscribers with different channels (one in each shell):
php cruft/test-sub.php mychannel
php cruft/test-sub.php otherchannel
Publish some messages:
php cruft/test-pub.php 10
Each message will be delivered to each channel. It's also worth noting that the API allows you to subscribe to multiple topics/channels within the same process.
Set up a bunch of servers running nsqd and nsqlookupd with hostnames nsq1, nsq2 ... Now publish a bunch of messages to both:
php cruft/test-pub.php 10 nsq1
php cruft/test-pub.php 10 nsq2
Now subscribe:
php cruft/test-sub.php mychannel > /tmp/processed-messages
You will receive 20 messages.
Same test as before, but this time we deliver the same message to two nsqd instances and then de-duplicate on subscribe.
php cruft/test-pub.php 10 nsq1,nsq2
php cruft/test-sub.php mychannel > /tmp/processed-messages
This time you should receive only 10 messages.
Messages are encapsulated by the Per3evere\Nsq\Message\Message class and are referred to by interface within the code (so you could implement your own).
Interface:
public function getPayload();
public function getId();
public function getAttempts();
public function getTimestamp();
The client supports publishing to N nsqd servers, which must be specified explicitly by hostname. Unlike with subscription, there is no facility to look up the hostnames via nsqlookupd (and we probably wouldn't want to anyway, for speed).
Minimal approach:
// Plain PHP usage
$nsq = new Per3evere\Nsq\nsqphp;
$nsq->publishTo('localhost')
->publish('mytopic', new Per3evere\Nsq\Message\Message('some message payload'));
// Laravel usage
app('nsq')->publish('mytopic', new Per3evere\Nsq\Message\Message('some message payload'));
It's up to you to decide if/how to encode your payload (eg: JSON).
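As one way to handle encoding, here is a minimal sketch that uses JSON on both sides. The event fields are made up for illustration, and `JSON_THROW_ON_ERROR` assumes PHP 7.3 or later. The resulting `$payload` string is what you would pass to `new Per3evere\Nsq\Message\Message(...)` as in the publishing example above.

```php
<?php
// Hypothetical event data; the field names are illustrative only.
$event = ['type' => 'user.registered', 'userId' => 42];

// Publisher side: encode to a string payload before publishing.
// JSON_THROW_ON_ERROR (PHP 7.3+) raises an exception instead of returning false.
$payload = json_encode($event, JSON_THROW_ON_ERROR);

// Subscriber side: decode the string returned by getPayload() back into an array.
$decoded = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
```

Keeping the encoding explicit like this means publisher and subscriber only need to agree on the JSON shape, not on any PHP-specific serialisation.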
HA publishing:
$nsq = new Per3evere\Nsq\nsqphp;
$nsq->publishTo(array('nsq1', 'nsq2', 'nsq3'), Per3evere\Nsq\nsqphp::PUB_QUORUM)
->publish('mytopic', new Per3evere\Nsq\Message\Message('some message payload'));
We will require a quorum of the publishTo nsqd daemons to respond to consider this operation a success (currently that happens in series). This assumes I have 3 nsqds running on three hosts which are contactable via nsq1 etc.
This technique is going to log messages twice, which will require de-duplication on subscribe.
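To make the quorum idea concrete, here is a rough sketch of publishing in series until enough hosts have accepted the message. It is not the library's implementation: `publishWithQuorum` and the `$send` callable are hypothetical, and it assumes "quorum" means a strict majority, which may differ from the threshold the client actually uses.

```php
<?php
/**
 * Illustrative only: try each host in series and report success once a
 * strict majority has accepted the message. $send is a hypothetical callable
 * that returns true when a host acknowledges the publish.
 */
function publishWithQuorum(array $hosts, callable $send): bool
{
    $needed = intdiv(count($hosts), 2) + 1; // strict majority (assumption)
    $ok = 0;
    foreach ($hosts as $host) {
        try {
            if ($send($host)) {
                $ok++;
            }
        } catch (\Exception $e) {
            // Treat an unreachable host as a failed attempt and carry on.
        }
        if ($ok >= $needed) {
            return true; // quorum reached; remaining hosts are not contacted
        }
    }
    return false;
}

// Simulate nsq2 being down: two of three hosts still form a quorum.
$result = publishWithQuorum(['nsq1', 'nsq2', 'nsq3'], function ($host) {
    return $host !== 'nsq2';
});
```

In this sketch the message ends up on two hosts, which is why the subscriber side needs de-duplication.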
The client supports subscribing from N nsqd servers, each of which will be auto-discovered from one or more nsqlookupd servers. The way this works is that nsqlookupd is able to provide a list of auto-discovered nodes hosting messages for a given topic. This feature decouples our clients from having to know where to find messages.
So when subscribing, the first thing we need to do is initialise our lookup service object:
$lookup = new Per3evere\Nsq\Lookup\Nsqlookupd;
Or alternatively:
$lookup = new Per3evere\Nsq\Lookup\Nsqlookupd('nsq1,nsq2');
We can then use this to subscribe:
$lookup = new Per3evere\Nsq\Lookup\Nsqlookupd;
$nsq = new Per3evere\Nsq\nsqphp($lookup);
$nsq->subscribe('mytopic', 'somechannel', function($msg) {
echo $msg->getId() . "\n";
})->run();
Warning: if our callback were to throw any Exceptions, the messages would not be retried using these settings - read on to find out more.
Or a bit more in the style of PHP (?):
$lookup = new Per3evere\Nsq\Lookup\Nsqlookupd;
$nsq = new Per3evere\Nsq\nsqphp($lookup);
$nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
->run();
function msgCallback($msg)
{
echo $msg->getId() . "\n";
}
We can also subscribe to more than one channel/stream:
$lookup = new Per3evere\Nsq\Lookup\Nsqlookupd;
$nsq = new Per3evere\Nsq\nsqphp($lookup);
$nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
->subscribe('othertopic', 'somechannel', 'msgCallback')
->run();
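There may be many subscription handlers, and putting them all in one file is not practical. Instead, we can create a directory of subscription classes, each extending Per3evere\Nsq\Subscribe. The topic property is the topic to subscribe to, the channel property is the channel, and the callback method handles each message.
For example, suppose we need two subscriptions, SubscribeA and SubscribeB. First create the two files:
app/Api/V1/Subscribes/SubscribeA.php: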
<?php
namespace App\Api\V1\Subscribes;
use Per3evere\Nsq\Subscribe;
use Per3evere\Nsq\Message\Message;
class SubscribeA extends Subscribe
{
/**
* The topic to subscribe to.
*
* @var string
*/
protected $topic = 'test';
/**
* The channel to subscribe to.
*
* @var string
*/
protected $channel = 'ch';
/**
* Callback that handles each received message.
*
* @return void
*/
public function callback(Message $msg)
{
var_dump($msg);
}
}
app/Api/V1/Subscribes/SubscribeB.php:
<?php
namespace App\Api\V1\Subscribes;
use Per3evere\Nsq\Subscribe;
use Per3evere\Nsq\Message\Message;
class SubscribeB extends Subscribe
{
/**
* The topic to subscribe to.
*
* @var string
*/
protected $topic = 'test';
/**
* The channel to subscribe to.
*
* @var string
*/
protected $channel = 'ch';
/**
* Callback that handles each received message.
*
* @return void
*/
public function callback(Message $msg)
{
var_dump($msg);
}
}
Then list the classes in the nsq.php configuration file:
/*
|--------------------------------------------------------------------------
| Subscriber class list
|--------------------------------------------------------------------------
|
| All subscriber classes to start; each must extend the Per3evere\Nsq\Subscribe abstract class
|
*/
'subscribes' => [
App\Api\V1\Subscribes\SubscribeA::class,
App\Api\V1\Subscribes\SubscribeB::class,
],
Finally, run php artisan nsq and the listener starts running.
By default the Per3evere\Nsq\RequeueStrategy\FixedDelay strategy is used: at most 5 attempts, with a 2-second delay before each retry.
The PHP client will catch any thrown Exceptions that happen within the callback and then either (a) retry, or (b) discard the messages. Usually you won't want to discard the messages.
To fix this, we need a requeue strategy - this is in the form of any object that implements Per3evere\Nsq\RequeueStrategy\RequeueStrategyInterface:
public function shouldRequeue(MessageInterface $msg);
The client currently ships with one; a fixed delay strategy:
$requeueStrategy = new Per3evere\Nsq\RequeueStrategy\FixedDelay;
$lookup = new Per3evere\Nsq\Lookup\Nsqlookupd;
$nsq = new Per3evere\Nsq\nsqphp($lookup, NULL, $requeueStrategy);
$nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
->run();
function msgCallback($msg)
{
if (rand(1,3) == 1) {
throw new \Exception('Argh, something bad happened');
}
echo $msg->getId() . "\n";
}
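To show the kind of decision a requeue strategy makes, here is a self-contained sketch of a fixed-delay rule using the defaults mentioned earlier (5 attempts, 2 seconds). `FakeMessage` and `SketchFixedDelay` are hypothetical stand-ins, not the package's classes. The return convention (delay in milliseconds to retry, `null` to give up) and the exact attempt comparison are assumptions, so check the interface's docblock before relying on them.

```php
<?php
/**
 * Stand-in for a message; only getAttempts() matters for this sketch.
 */
class FakeMessage
{
    private $attempts;

    public function __construct(int $attempts)
    {
        $this->attempts = $attempts;
    }

    public function getAttempts(): int
    {
        return $this->attempts;
    }
}

/**
 * Hypothetical fixed-delay strategy. Assumed convention: return the delay in
 * milliseconds to requeue the message, or null to stop retrying.
 */
class SketchFixedDelay
{
    private $maxAttempts;
    private $delayMs;

    public function __construct(int $maxAttempts = 5, int $delayMs = 2000)
    {
        $this->maxAttempts = $maxAttempts;
        $this->delayMs = $delayMs;
    }

    public function shouldRequeue($msg)
    {
        // Retry with the same delay every time until the attempts run out.
        return $msg->getAttempts() < $this->maxAttempts ? $this->delayMs : null;
    }
}

$strategy = new SketchFixedDelay();
$retry = $strategy->shouldRequeue(new FakeMessage(1));  // attempts remain
$giveUp = $strategy->shouldRequeue(new FakeMessage(5)); // attempts used up
```

A real strategy would implement RequeueStrategyInterface so it can be passed to the nsqphp constructor as in the example above.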
Recall that to achieve HA we simply duplicate on publish into two different nsqd servers. To perform de-duplication we simply need to supply an object that implements Per3evere\Nsq\Dedupe\DedupeInterface.
public function containsAndAdd($topic, $channel, MessageInterface $msg);
The PHP client ships with two mechanisms for de-duplicating messages on subscribe. Both are based around the opposite of a bloom filter. One maintains a hash map as a PHP array (and hence bound to a single process); the other calls out to Memcached and hence can share the data structure between many processes.
We can use this thus:
$requeueStrategy = new Per3evere\Nsq\RequeueStrategy\FixedDelay;
$dedupe = new Per3evere\Nsq\Dedupe\OppositeOfBloomFilterMemcached;
$lookup = new Per3evere\Nsq\Lookup\Nsqlookupd;
$nsq = new Per3evere\Nsq\nsqphp($lookup, $dedupe, $requeueStrategy);
$nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
->run();
function msgCallback($msg)
{
if (rand(1,3) == 1) {
throw new \Exception('Argh, something bad happened');
}
echo $msg->getId() . "\n";
}
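For intuition, the "opposite of a bloom filter" idea can be sketched as a fixed-size table in which each slot remembers the last key hashed into it. The class below is an illustrative, in-process version only: `SketchOppositeOfBloomFilter` is a made-up name, it takes the payload as a string rather than a message object, and it assumes duplicates are detected by payload content (each nsqd assigns its own message ID, so IDs would differ between copies).

```php
<?php
/**
 * Sketch of an in-process "opposite of a bloom filter". It can miss a
 * duplicate (false negative) when another key has overwritten the slot, but
 * it never reports a message as seen when it was not (no false positives).
 */
class SketchOppositeOfBloomFilter
{
    private $size;
    private $slots = [];

    public function __construct(int $size = 1000000)
    {
        $this->size = $size;
    }

    // Returns true if this exact key already occupies its slot; records it either way.
    public function containsAndAdd(string $topic, string $channel, string $payload): bool
    {
        $key = $topic . ':' . $channel . ':' . md5($payload);
        $index = abs(crc32($key)) % $this->size;
        $seen = isset($this->slots[$index]) && $this->slots[$index] === $key;
        $this->slots[$index] = $key;
        return $seen;
    }
}

$dedupe = new SketchOppositeOfBloomFilter(1024);
$first = $dedupe->containsAndAdd('mytopic', 'somechannel', 'hello');  // not seen yet
$second = $dedupe->containsAndAdd('mytopic', 'somechannel', 'hello'); // duplicate
```

Because memory is bounded by the table size, a smaller table trades memory for a higher chance of letting a duplicate through.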
You can read more about de-duplication on my blog, however it's worth keeping the following in mind:
The final optional dependency is a logger, in the form of some object that implements Per3evere\Nsq\Logger\LoggerInterface (there is no standard logger interface shipped with PHP to the best of my knowledge):
public function error($msg);
public function warn($msg);
public function info($msg);
public function debug($msg);
The PHP client ships with a logger that dumps all logging information to STDERR.
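If you want to send log output somewhere else, a custom logger only needs the four methods listed above. The following is a minimal sketch, not the shipped logger: `SketchStreamLogger` is a hypothetical name, and the `implements` clause is left out so the example runs on its own. In a real project the class would implement Per3evere\Nsq\Logger\LoggerInterface.

```php
<?php
/**
 * Sketch of a logger that writes to any stream resource
 * (defaults to STDERR).
 */
class SketchStreamLogger
{
    private $stream;

    public function __construct($stream = null)
    {
        $this->stream = $stream ?: fopen('php://stderr', 'w');
    }

    public function error($msg) { $this->write('ERROR', $msg); }
    public function warn($msg)  { $this->write('WARN', $msg); }
    public function info($msg)  { $this->write('INFO', $msg); }
    public function debug($msg) { $this->write('DEBUG', $msg); }

    private function write(string $level, $msg): void
    {
        // $msg may be a string or an exception; cast either to text.
        fwrite($this->stream, sprintf("[%s] %s\n", $level, (string) $msg));
    }
}

// Write to an in-memory stream so the output can be inspected.
$buffer = fopen('php://memory', 'w+');
$logger = new SketchStreamLogger($buffer);
$logger->warn('connection lost');
rewind($buffer);
$logged = stream_get_contents($buffer);
```

Accepting a stream in the constructor keeps the logger easy to test and lets you point it at a file instead of STDERR.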
Putting all of this together we'd have something similar to the test-sub.php file:
$requeueStrategy = new Per3evere\Nsq\RequeueStrategy\FixedDelay;
$dedupe = new Per3evere\Nsq\Dedupe\OppositeOfBloomFilterMemcached;
$lookup = new Per3evere\Nsq\Lookup\Nsqlookupd;
$logger = new Per3evere\Nsq\Logger\Stderr;
$nsq = new Per3evere\Nsq\nsqphp($lookup, $dedupe, $requeueStrategy, $logger);
$nsq->subscribe('mytopic', 'somechannel', 'msgCallback')
->run();
function msgCallback($msg)
{
if (rand(1,3) == 1) {
throw new \Exception('Argh, something bad happened');
}
echo $msg->getId() . "\n";
}