Here is my MQTT service class:
<?php

namespace App\Services;

use Illuminate\Contracts\Container\BindingResolutionException;
use PhpMqtt\Client\Exceptions\ConfigurationInvalidException;
use PhpMqtt\Client\Exceptions\ConnectingToBrokerFailedException;
use PhpMqtt\Client\Exceptions\ClientNotConnectedToBrokerException;
use PhpMqtt\Client\Exceptions\RepositoryException;
use PhpMqtt\Client\Exceptions\PendingMessageAlreadyExistsException;
use PhpMqtt\Client\Exceptions\DataTransferException;
use PhpMqtt\Client\MqttClient;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Container\ContainerExceptionInterface;
use App\Models\Devices\Device;

class MqttService {
    public function sendCommand(Device $device, string $command, int $qos = 1)
    {
        $host = config('mqtt.host');
        $port = config('mqtt.port');
        $clientId = config('mqtt.client_id');

        $mqtt = new MqttClient($host, $port, $clientId);
        $mqtt->connect();

        $userId = $device->user->id;
        $deviceId = $device->id;
        $uniqueId = $device->unique_id;

        $topic = "v2/$userId/$deviceId/commands";
        if ($device->recognized_by_unique_id) {
            $topic = "v3/$uniqueId/commands";
        }

        $mqtt->publish($topic, $command, $qos);
        $mqtt->disconnect();
    }

    /**
     * Publishes error message for a specific device and user on the Mqtt broker.
     *
     * @param string|null $userId the user id - first subtopic
     * @param string|null $deviceId the device id - last sub topic
     * @param string|null $uniqueId the device unique id - if it is using unique id instead of user id and device id
     * @param string|null $errorAsJson the json error message to publish on the mqtt broker
     *
     * @return void
     *
     * @throws BindingResolutionException
     * @throws NotFoundExceptionInterface
     * @throws ContainerExceptionInterface
     * @throws ConfigurationInvalidException
     * @throws ConnectingToBrokerFailedException
     * @throws ClientNotConnectedToBrokerException
     * @throws RepositoryException
     * @throws PendingMessageAlreadyExistsException
     * @throws DataTransferException
     */
    public static function sendError(?string $userId = null, ?string $deviceId = null, ?string $uniqueId = null, string $errorAsJson)
    {
        $host = config('mqtt.host');
        $port = config('mqtt.port');
        $clientId = config('mqtt.client_id');

        if ($uniqueId) {
            $topic = "errors/$uniqueId";
        } else {
            $topic = "errors/$userId/$deviceId";
        }

        $mqtt = new MqttClient($host, $port, $clientId);
        $mqtt->connect();
        $mqtt->publish($topic, $errorAsJson, 0);
        $mqtt->disconnect();
    }
}
Here is my job:
<?php

namespace App\Jobs;

use App\Services\CommandService;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use App\Models\Devices\Device;

class SendCommand implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     */
    public function __construct(public Device $device, public string $command) {}

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        info("Before: {$this->device->unique_id}; {$this->command}");
        CommandService::sendCommands($this->device, $this->command);
        info("After: {$this->device->unique_id}; {$this->command}");
    }
}
Here is also the relevant part of the CommandService class:
public static function sendCommands(Device $device, string $command)
{
    $mqtt = new MqttService;
    $mqtt->sendCommand($device, $command);
    return true;
}
As per the comments this is likely to be a combination of things:
- Concurrent connections with the same client ID (the broker will drop existing connections with the same ID)
- Using QOS 1 but ignoring the ACK (you just disconnect). You may as well use QOS 0 as messages will never be retransmitted with your current setup.
- Sending multiple messages with the same Packet ID/client ID (the packet ID should not be re-used before an ACK is processed). Whether this is an issue will depend on the broker (client initiated, unacknowledged, QOS 1 messages are not part of the server session state, but brokers may handle this in different ways).
Possible solutions include:
- Using a different client ID for each connection, or no client ID at all.
- Waiting for the PUBACK before disconnecting.
- Using QoS 0; these messages are fire and forget with no delivery guarantee (but since you currently don't wait for a PUBACK, you have no guarantee anyway).
- Establishing a long-running connection with the broker (this does not really fit the normal PHP flow).
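Two of the options above, a distinct client ID per connection and waiting for the PUBACK, can be combined in a small helper. The following is only a sketch under stated assumptions: the function names `uniqueClientId` and `publishAndAwaitAck` and the "prefix-pid-random" ID format are made up for illustration, and it relies on the php-mqtt/client event loop (`loop(true, true)`) returning once all pending QoS 1 messages have been acknowledged.

```php
<?php

use PhpMqtt\Client\MqttClient;

/**
 * Builds a client ID that differs per worker process and per call, so that
 * concurrent queue workers do not disconnect each other at the broker.
 * The format is an assumption for this sketch, not a library convention.
 * Note: some brokers only guarantee support for IDs of up to 23 characters,
 * so keep the prefix short.
 */
function uniqueClientId(string $prefix): string
{
    return sprintf('%s-%d-%s', $prefix, getmypid(), bin2hex(random_bytes(4)));
}

/**
 * Publishes a single QoS 1 message and disconnects only after the event
 * loop has finished processing outstanding acknowledgements.
 */
function publishAndAwaitAck(
    string $host,
    int $port,
    string $clientIdPrefix,
    string $topic,
    string $payload
): void {
    $mqtt = new MqttClient($host, $port, uniqueClientId($clientIdPrefix));
    $mqtt->connect();

    try {
        $mqtt->publish($topic, $payload, MqttClient::QOS_AT_LEAST_ONCE);

        // First argument: allow the loop to sleep between iterations.
        // Second argument: exit as soon as no unacknowledged messages remain.
        $mqtt->loop(true, true);
    } finally {
        // Disconnect even if publishing or the loop throws.
        $mqtt->disconnect();
    }
}
```

In `MqttService::sendCommand()` this would replace the connect/publish/disconnect sequence, for example `publishAndAwaitAck($host, $port, 'cmd', $topic, $command);`. If the job should not block on a slow broker, QoS 0 without the loop remains the simpler choice.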