세상을 이롭게

MQTT - Mosquitto 02 ESP32 와 통신하기 본문

카테고리 없음

MQTT - Mosquitto 02 ESP32 와 통신하기

2025. 2. 23. 20:13

ESP32 에서 Client로 Local의 Broker 에게 접속하는 것을 테스트 해보도록한다.

개발보드는 ESP-WROOM-32D 를 사용하였다.

Broker 실행 전에 포트와 접속관련 설정을 위해 mosquitto.conf 를 아래 내용으로 수정해준다.

listener 1883
allow_anonymous true

아래 명령어를 통해 Local 에서 브로커를 실행한다.

mosquitto.exe -c mosquitto.conf -v

 

아래 명령어를 통해 Log를 볼수있도록 한다. 이때 관찰할 토픽을 설정한다.

mosquitto_sub -t "test/topic" -v

 

아래 명령어를 통해 메세지를 보내본다.

mosquitto_pub -t "test/topic" -m "Hello Mosquitto"

log가 찍히는 것을 볼 수 있다.

Broker 상에서도 잘 받은 것을 볼 수 있다.

이제 ESP32 에 코드를 올려보자.

#include "freertos/queue.h"
#include <Wire.h>
#include <Adafruit_BMP280.h>
#include <Adafruit_AHTX0.h>
#include <Adafruit_GFX.h>
#include <Adafruit_SSD1306.h>

#include <WiFi.h>
#include <NTPClient.h>
#include <PubSubClient.h>  // MQTT 라이브러리 추가

#define SCREEN_WIDTH 128     // OLED display width, in pixels
#define SCREEN_HEIGHT 64     // OLED display height, in pixels
#define OLED_RESET -1        // Reset pin # (or -1 if sharing Arduino reset pin)
#define SCREEN_ADDRESS 0x3C  ///< See datasheet for Address; 0x3D for 128x64, 0x3C for 128x32
Adafruit_SSD1306 display(SCREEN_WIDTH, SCREEN_HEIGHT, &Wire, OLED_RESET);

Adafruit_AHTX0 aht;
Adafruit_BMP280 bmp;
Adafruit_Sensor *bmp_temp = bmp.getTemperatureSensor();
Adafruit_Sensor *bmp_pressure = bmp.getPressureSensor();

const char *ssid = "SSID";            // Change this to your WiFi SSID
const char *password = "PASSWORD";  // Change this to your WiFi password
WiFiUDP ntpUDP;
NTPClient timeClient(ntpUDP, "kr.pool.ntp.org", 3600, 60000);

const char *mqtt_server = "192.168.0.12";  // MQTT 브로커 주소
const int mqtt_port = 1883;                // MQTT 포트 번호
const char *mqtt_topic = "test/topic";     // 구독할 토픽

WiFiClient espClient;
PubSubClient MQTTClient(espClient);

QueueHandle_t mqttDataQueue;
QueueHandle_t sensorDataQueue;

typedef struct
{
  bool bmp280_valid;
  bool aht20_valid;
  float bmp280_temperature;
  float bmp280_pressure;
  float aht20_temperature;
  float aht20_humidity;
} SensorData;

void MQTTCallback(char *topic, byte *payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  for (int i = 0; i < length; i++) {
    Serial.print((char)payload[i]);
  }
  Serial.println();

  // 수신한 메시지를 문자열로 변환
  char message[100];
  if (length < 100) {
    memcpy(message, payload, length);
    message[length] = '\0';  // 문자열 종료 문자 추가

    // 메시지를 큐에 전송
    if (xQueueSend(mqttDataQueue, &message, portMAX_DELAY) != pdPASS) {
      Serial.println("Failed to send message to mqttDataQueue");
    }
  } else {
    Serial.println("Received message is too long for the queue");
  }
}

void setOled() {
  if (!display.begin(SSD1306_SWITCHCAPVCC, SCREEN_ADDRESS)) {
    Serial.println(F("SSD1306 allocation failed"));
    while (1)
      delay(10);
  }
  display.display();
  delay(500);              // Pause for 2 seconds
  display.clearDisplay();  // Clear the buffer
  display.display();
  display.setTextSize(1);    // Normal 1:1 pixel scale
  display.setCursor(0, 16);  // Set the cursor position to (0, 10)
  display.setTextColor(SSD1306_WHITE);
  display.println("Made by Hyunjinwon");  // Print "1234567" on the OLED display
  display.println("2025.02.23. ver");
  // display.invertDisplay(true);
  display.display();
  delay(2000);
}

void setBMP280() {
  while (!Serial) {
    delay(100);  // wait for native usb
  }
  if (!bmp.begin()) {
    Serial.println(F("Could not find a valid BMP280 sensor, check wiring or try a different address!"));
    Serial.print("SensorID was: 0x");
    Serial.println(bmp.sensorID(), 16);
    while (1)
      delay(10);
  } else {
    Serial.println("BMP280 found");
    display.setCursor(0, 26);
    display.println("Load BMP280 Done...");
    display.display();
    delay(500);
  }
}

void setAHT20() {
  if (!aht.begin()) {
    Serial.println("Could not find AHT? Check wiring");
    while (1)
      delay(10);
  } else {
    Serial.println("AHT20 found");
    display.setCursor(0, 36);
    display.println("Load AHT20 Done...");
    display.display();
    delay(500);
  }

  /* Default settings from datasheet. */
  bmp.setSampling(Adafruit_BMP280::MODE_NORMAL,     /* Operating Mode. */
                  Adafruit_BMP280::SAMPLING_X2,     /* Temp. oversampling */
                  Adafruit_BMP280::SAMPLING_X16,    /* Pressure oversampling */
                  Adafruit_BMP280::FILTER_X16,      /* Filtering. */
                  Adafruit_BMP280::STANDBY_MS_500); /* Standby time. */

  bmp_temp->printSensorDetails();
}

void setWifi() {
  Serial.print("Connecting to ");
  Serial.println(ssid);
  WiFi.begin(ssid, password);

  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
  Serial.println("");
  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());
  display.setCursor(0, 0);
  display.print("IP : ");
  display.println(WiFi.localIP());
  display.display();
  delay(1000);
}

void setNTP() {
  timeClient.setTimeOffset(9 * 3600);
  timeClient.begin();
  display.setCursor(0, 50);
  display.println("Load NTP Done...");
  display.display();
  delay(2000);
}

void setMQTT() {
  MQTTClient.setServer(mqtt_server, mqtt_port);
  MQTTClient.setCallback(MQTTCallback);
  delay(2000);
}

void setQueue() {
  sensorDataQueue = xQueueCreate(10, sizeof(SensorData));
  if (sensorDataQueue == NULL) {
    Serial.println("Failed to create sensor data queue");
    while (1)
      delay(10);
  }
  // 큐 생성: 큐의 길이는 10, 각 아이템의 크기는 100바이트로 설정
  mqttDataQueue = xQueueCreate(10, 100);
  if (mqttDataQueue == NULL) {
    Serial.println("Failed to create MQTT data queue");
    while (1)
      delay(10);
  }
}
void setup() {
  Serial.begin(115200);
  setOled();
  setBMP280();
  setAHT20();
  setWifi();
  setNTP();
  setMQTT();
  setQueue();
  // FreeRTOS 태스크 생성 (각 태스크의 스택 크기와 우선순위는 필요에 따라 조정)
  xTaskCreatePinnedToCore(sensorTask, "SensorTask", 4096, NULL, 1, NULL, 1);
  xTaskCreatePinnedToCore(mqttTask, "MQTTTask", 4096, NULL, 1, NULL, 0);
  xTaskCreatePinnedToCore(displayTask, "DisplayTask", 4096, NULL, 1, NULL, 0);
}

// MQTT 연결 및 클라이언트 루프 태스크
void mqttTask(void *parameter) {
  for (;;) {
    if (!MQTTClient.connected()) {
      Serial.print("Attempting MQTT connection...");
      if (MQTTClient.connect("ESP32Client")) {
        Serial.println("connected");
        MQTTClient.subscribe(mqtt_topic);
      } else {
        Serial.print("failed, rc=");
        Serial.print(MQTTClient.state());
        Serial.println(" try again in 5 seconds");
        vTaskDelay(pdMS_TO_TICKS(5000));
        continue;
      }
    }
    MQTTClient.loop();
    vTaskDelay(pdMS_TO_TICKS(10));  // 짧은 딜레이로 CPU 사용률 낮추기
  }
}

void sensorTask(void *parameter) {
  TickType_t xLastWakeTime = xTaskGetTickCount();
  for (;;) {
    SensorData data;
    data.bmp280_valid = false;
    data.aht20_valid = false;

    sensors_event_t bmp_temp_event, bmp_pressure_event, aht_temp_event, aht_humidity_event;

    bmp_temp->getEvent(&bmp_temp_event);
    bmp_pressure->getEvent(&bmp_pressure_event);
    if (!isnan(bmp_temp_event.temperature) && !isnan(bmp_pressure_event.pressure)) {
      data.bmp280_valid = true;
      data.bmp280_temperature = bmp_temp_event.temperature;
      data.bmp280_pressure = bmp_pressure_event.pressure;
    }

    aht.getEvent(&aht_humidity_event, &aht_temp_event);
    if (!isnan(aht_temp_event.temperature) && !isnan(aht_humidity_event.relative_humidity)) {
      data.aht20_valid = true;
      data.aht20_temperature = aht_temp_event.temperature;
      data.aht20_humidity = aht_humidity_event.relative_humidity;
    }

    if (xQueueSend(sensorDataQueue, &data, portMAX_DELAY) != pdPASS) {
      Serial.println("Failed to send data to queue");
    }

    vTaskDelayUntil(&xLastWakeTime, pdMS_TO_TICKS(1000));
  }
}

void displayTask(void *parameter) {
  SensorData data;
  char mqttMessage[100];
  bool mqttMessageReceived = false;  // MQTT 메시지 수신 여부를 추적하는 변수

  for (;;) {
    if (xQueueReceive(sensorDataQueue, &data, portMAX_DELAY) == pdPASS) {
      // display.clearDisplay();
      display.fillRect(0, 16, 128, 50, SSD1306_BLACK);
      display.setCursor(0, 16);
      display.print(timeClient.getFormattedTime());

      if (data.aht20_valid) {
        display.setCursor(0, 26);
        display.print("A_Temp: ");
        display.print(data.aht20_temperature);
        display.println(" C");

        display.setCursor(0, 36);
        display.print("A_Humid: ");
        display.print(data.aht20_humidity);
        display.println(" %");
      } else {
        display.setCursor(0, 26);
        display.println("AHT20 Error");
      }

      if (data.bmp280_valid) {
        display.setCursor(0, 46);
        display.print("B_Temp: ");
        display.print(data.bmp280_temperature);
        display.println(" C");

        display.setCursor(0, 56);
        display.print("B_Press: ");
        display.print(data.bmp280_pressure);
        display.println(" hPa");
      }
      else {
        display.setCursor(0, 46);
        display.println("BMP280 Error");
      }
    }

    // MQTT 메시지 수신
    if (xQueueReceive(mqttDataQueue, &mqttMessage, 0) == pdPASS) {
      // 새로운 MQTT 메시지를 수신했으므로 표시 플래그를 설정
      mqttMessageReceived = true;
    }

    // MQTT 메시지가 수신되었으면 디스플레이에 추가
    if (mqttMessageReceived) {
      display.fillRect(0, 0, 128, 16, SSD1306_BLACK);
      display.setCursor(0, 0);
      display.print("MQTT: ");
      display.println(mqttMessage);
      mqttMessageReceived = false;  // 메시지 표시 후 플래그 초기화
    }
    display.display();
    vTaskDelay(pdMS_TO_TICKS(100));  // 100ms 주기로 태스크 실행
  }
}

void loop() {
  vTaskDelay(pdMS_TO_TICKS(1000));
}

 

ESP32 에서 ROS 를 사용하여 Task를 구현하였다.
사용자가 수정해줘야하는 부분은 아래와 같다.

const char *ssid = "SSID";            // Change this to your WiFi SSID
const char *password = "PASSWORD";  // Change this to your WiFi password
const char *mqtt_server = "192.168.0.12";  // MQTT 브로커 주소

const int mqtt_port = 1883;                // MQTT 포트 번호
const char *mqtt_topic = "test/topic";     // 구독할 토픽

만약 "test/topic" 을 수정한다면 위에 log 보는 토픽도 변경해서 확인해야한다.
MQTT 브로커 주소는 local 의 주소를 넣으면 된다.

ipconfig 명령어를 통해 확인하자.

Broker와 ESP32 가 연결되면 

명령어를 통해 날려보면 위와 같이 Ping을 날리는 것을 볼 수 있다.

토픽을 보내보면 

log 로 찍히는 것도 볼 수 있고,

ESP32 에서도 토픽을 받는 것도 확인 할 수있다.