Sign Up Form

Sign Up

Running MQTT publish command in Laravel queue with database option fails but in sync it works ?

1024 615 point-admin
  • 0

Here is my MQTT service class:

<?php

namespace App\Services;

use Illuminate\Contracts\Container\BindingResolutionException;
use PhpMqtt\Client\Exceptions\ConfigurationInvalidException;
use PhpMqtt\Client\Exceptions\ConnectingToBrokerFailedException;
use PhpMqtt\Client\Exceptions\ClientNotConnectedToBrokerException;
use PhpMqtt\Client\Exceptions\RepositoryException;
use PhpMqtt\Client\Exceptions\PendingMessageAlreadyExistsException;
use PhpMqtt\Client\Exceptions\DataTransferException;
use PhpMqtt\Client\MqttClient;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Container\ContainerExceptionInterface;
use App\Models\Devices\Device;

class MqttService {

    public function sendCommand(Device $device, string $command, int $qos = 1)
    {
        $host = config('mqtt.host');
        $port = config('mqtt.port');
        $clientId = config('mqtt.client_id');


        $mqtt = new MqttClient($host, $port, $clientId);
        $mqtt->connect();

        $userId = $device->user->id;
        $deviceId = $device->id;
        $uniqueId = $device->unique_id;
        $topic = "v2/$userId/$deviceId/commands";

        if ($device->recognized_by_unique_id) {
            $topic = "v3/$uniqueId/commands";
        }

        $mqtt->publish($topic, $command, $qos);
        $mqtt->disconnect();
    }

    /**
     * Publishes error message for a specific device and user on the Mqtt broker.
     * 
     * @param string|null $userId the user id - first subtopic
     * @param string|null $deviceId the device id - last sub topic
     * @param string|null $uniqueId the device unique id - if it is using unique id instead of user id and device id
     * @param string|null $errorAsJson the json error message to publish on the mqtt broker
     * 
     * @return void 
     * 
     * @throws BindingResolutionException 
     * @throws NotFoundExceptionInterface 
     * @throws ContainerExceptionInterface 
     * @throws ConfigurationInvalidException 
     * @throws ConnectingToBrokerFailedException 
     * @throws ClientNotConnectedToBrokerException 
     * @throws RepositoryException 
     * @throws PendingMessageAlreadyExistsException 
     * @throws DataTransferException 
     */
    public static function sendError(?string $userId = null, ?string $deviceId = null, ?string $uniqueId = null, string $errorAsJson) 
    {
        $host = config('mqtt.host');
        $port = config('mqtt.port');
        $clientId = config('mqtt.client_id');

        if ($uniqueId) {
            $topic = "errors/$uniqueId";
        } else {
            $topic = "errors/$userId/$deviceId";
        }

        $mqtt = new MqttClient($host, $port, $clientId);
        $mqtt->connect();
        $mqtt->publish($topic, $errorAsJson, 0);
        $mqtt->disconnect();
    }

}

Here is my job:

?php

namespace App\Jobs;

use App\Services\CommandService;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use App\Models\Devices\Device;

class SendCommand implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     */
    public function __construct(public Device $device, public string $command) {}

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        info("Before: {$this->device->unique_id}; {$this->command}");
        CommandService::sendCommands($this->device, $this->command);
        info("After: {$this->device->unique_id}; {$this->command}");
    }
}

Here is also the relevant part of the CommandService class:

public static function sendCommands(Device $device, string $command)
{
    $mqtt = new MqttService;

    $mqtt->sendCommand($device, $command);

    return true;
}

As per the comments this is likely to be a combination of things:

  • Concurrent connections with the same client ID (the broker will drop existing connections with the same ID)
  • Using QOS 1 but ignoring the ACK (you just disconnect). You may as well use QOS 0 as messages will never be retransmitted with your current setup.
  • Sending multiple messages with the same Packet ID/client ID (the packet ID should not be re-used before an ACK is processed). Whether this is an issue will depend on the broker (client initiated, unacknowledged, QOS 1 messages are not part of the server session state, but brokers may handle this in different ways).
  • Using different, or no, Client ID.
  • Waiting for the PUBACK
  • Using QOS 0; these are fire and forget with no delivery guarantee (but as you don’t wait for a PUBACK currently so have no guarantees).
  • stablishing a long running connection with the broker (does not really fit with normal PHP flow; questions like this may help).

Leave a Reply

Your email address will not be published.