Package Data | |
---|---|
Maintainer Username: | s008nyx |
Maintainer Contact: | sonyxvlg@gmail.com (Alexey Mitroshkin) |
Package Create Date: | 2021-10-13 |
Package Last Update: | 2021-10-15 |
Language: | PHP |
License: | MIT |
Last Refreshed: | 2024-11-25 15:00:40 |
Package Statistics | |
---|---|
Total Downloads: | 86 |
Monthly Downloads: | 1 |
Daily Downloads: | 0 |
Total Stars: | 1 |
Total Watchers: | 2 |
Total Forks: | 0 |
Total Open Issues: | 0 |
composer require s008nyx/kafka-bus
Open your `bootstrap/app.php` file and:
- in the `Register Container Bindings` section, add: $app->configure('kafka-bus');
- in the `Register Service Providers` section, add: $app->register(\KafkaBus\KafkaBusServiceProvider::class);
KAFKA_BROKERS="kafka-node01:9093,kafka-node02:9093"
KAFKA_AUTOCOMMIT=true
KAFKA_GROUP_ID="myGroup"
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISMS=SCRAM-SHA-512
KAFKA_SASL_PASSWORD=password
KAFKA_SASL_USERNAME=username
KAFKA_SSL_CA_LOCATION=/path/to/ca.crt
KAFKA_SSL_CERTIFICATE_LOCATION=/path/to/chain.crt
MyHandler.php
<?php
use KafkaBus\Error;
use KafkaBus\KafkaHandler;
use RdKafka\Message;
/**
 * Example Kafka handler: declares the topics it is interested in and provides
 * one callback for received messages and one for errors.
 */
class MyHandler implements KafkaHandler
{
    /**
     * Topics list
     *
     * @return array Topic names for this handler; the example uses a single topic, 'myTopic'.
     */
    public function getTopics(): array
    {
        return ['myTopic'];
    }

    /**
     * Processing success message
     *
     * @param Message $message
     * @return bool
     */
    public function process(Message $message): bool
    {
        // Do something

        // A method declared `: bool` must return a value explicitly; falling off
        // the end of the body makes PHP throw a TypeError at call time.
        // NOTE(review): how the consumer interprets this flag is not visible
        // here — confirm against the KafkaHandler contract before relying on it.
        return true;
    }

    /**
     * Processing fail message
     *
     * @param Error $error
     * @return bool
     */
    public function error(Error $error): bool
    {
        // Do something

        // Explicit return required by the `: bool` declaration (see process()).
        // NOTE(review): the meaning of true/false for the error callback is an
        // assumption to be confirmed against the KafkaHandler contract.
        return true;
    }
}
KafkaConsumer.php
<?php
namespace App\Console\Commands;
use KafkaBus\Consumer;
use Illuminate\Console\Command;
/**
 * Console command that hands a MyHandler instance to the injected Kafka
 * consumer and reports any exception raised while consuming.
 */
class KafkaConsumer extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'kafka:consume';

    /**
     * Start consuming with MyHandler.
     *
     * @param Consumer $consumer
     * @return int 0 when consume() returns normally, 1 when an exception was caught
     */
    public function handle(Consumer $consumer)
    {
        try {
            // Leading backslash: the example MyHandler is declared without a
            // namespace, so an unqualified name here would resolve to
            // App\Console\Commands\MyHandler and fail with "class not found".
            $consumer->consume(new \MyHandler());
        } catch (\Exception $e) {
            $this->error($e->getMessage());

            // Report the failure to the shell instead of exiting with status 0.
            return 1;
        }

        return 0;
    }
}
php artisan kafka:consume