ESP32使用队列实现并发控制外设

ESP32使用队列实现并发控制外设

ESP32开发板可以消费MQTT消息来驱动外设,例如抖音整蛊设备,如电磁铁等,但无法实现多个通道并发,通过结合队列和FreeRTOS设置任务,可以实现并发。

以下是C代码。

#include <WiFi.h>
#include <PubSubClient.h>

#include <stdio.h>
#include <string.h>

const char* ssid = "FY-2.4G";
const char* password = "12345678";
const char* mqtt_server = "192.168.29.39"; // MQTT服务器地址,例如HiveMQ的公共测试服务器
const int mqtt_port = 1883; // MQTT端口,通常为1883或8883(SSL)
const char* user = "lyh" ;
const char* mqttpassword = "12345678";
const char* mqsid = "ESP32Client";
const char* mqsid2 = "ESP32Client2";
TaskHandle_t ledTaskHandle = NULL;

WiFiClient espClient;

WiFiClient espClient2;
PubSubClient client(espClient);
PubSubClient client2(espClient2);

// 定义一个结构体来存储多个参数
typedef struct {
   int pin;
   int cnt;
} TaskParams;

// 定义一个结构体来存储多个参数
typedef struct {
   int channel;
   char taskname[100];
} TaskParams2;


#define MAX_LINE_LENGTH (64)

// Define two tasks for reading and writing from and to the serial port.
void TaskReadFromMQ(void *pvParameters);
void sendMQ(char * msg, int channel);

// Define Queue handle
QueueHandle_t QueueHandle[6];
const int QueueElementSize = 100;
typedef struct {
 int pin;
 int cnt;
 int channel;
 char line[MAX_LINE_LENGTH];
 uint8_t line_length;
} message_t;


void callback(char* topic, byte* payload, unsigned int length) {
 Serial.print("Message arrived in topic: ");
 Serial.println(topic);
 Serial.print("Message:");
 // Serial.print(length);
 for (int i = 0; i < length; i++) {
   Serial.print((char)payload[i]);
}
 Serial.println();
 extractBeforeVerticalBar((char*)payload);
 char* result1 = extractBeforeVerticalBar((char*)payload);
 // Serial.println(result1);
 int cnt = atoi(result1);
 int channel = 0;

 int pin=0;
 if(strcmp(topic, "zgtopic1") == 0) {
     pin=13;
     channel=1;
  }
 if(strcmp(topic, "zgtopic2") == 0) {
     pin=12;
     channel=2;
  }
 if(strcmp(topic, "zgtopic3") == 0) {
     pin=14;
     channel=3;
  }
 if(strcmp(topic, "zgtopic4") == 0) {
     pin=32;
     channel=4;
  }
 if(strcmp(topic, "zgtopic5") == 0) {
     pin=33;
     channel=5;
  }            

 TaskParams taskParams = {pin,cnt};
 // xTaskCreate(ledControlTask, topic, 1024, &taskParams, tskIDLE_PRIORITY, &ledTaskHandle);
 // ledControlTask(&taskParams);
 // ledControlTask1(pin, cnt);
 sendMQ((char*)payload,pin,cnt, channel);
 Serial.println();
 free(result1);
}


char* extractBeforeVerticalBar(const char* input) {
   if (input == NULL) {
       return NULL;
  }

   // 查找竖线位置
   const char* barPos = strchr(input, '|');
   if (barPos == NULL) {
       // 没有找到竖线,返回整个字符串的副本
       size_t len = strlen(input);
       char* result = (char*)malloc(len + 1);
       if (result == NULL) {
           return NULL;
      }
       strcpy(result, input);
       return result;
  }

   // 计算竖线前字符串的长度
   size_t len = barPos - input;
   char* result = (char*)malloc(len + 1);
   if (result == NULL) {
       return NULL;
  }

   // 复制竖线前的字符串
   strncpy(result, input, len);
   result[len] = '\0'; // 手动添加字符串结束符

   return result;
}


void ledControlTask1(int pin, int count, int channel) {
  char msg[100];
  for(int cnt = 0; cnt < count; cnt++) {
   sprintf(msg,"channel:%d ON seq:%d",channel,cnt);
   Serial.println(msg);
   digitalWrite(pin, HIGH);  // turn the LED on (HIGH is the voltage level)
   vTaskDelay(200);                      // wait for a second
   sprintf(msg,"channel:%d OFF seq:%d",channel,cnt);
   Serial.println(msg);
   digitalWrite(pin, LOW);   // turn the LED off by making the voltage LOW
   vTaskDelay(300);                      // wait for a second
   // vTaskDelay(30); // 延迟50ms
}
}

void ledControlTask(void *pvParameters) {
TaskParams *params = (TaskParams *)pvParameters;
 int pin = params->pin;
 int count = params->cnt;
 
 for(int cnt = 0; cnt < count; cnt++) {
   // 任务A的工作代码
   digitalWrite(pin, HIGH);  // turn the LED on (HIGH is the voltage level)
   delay(100);                      // wait for a second
   digitalWrite(pin, LOW);   // turn the LED off by making the voltage LOW
   delay(100);                      // wait for a second
   // vTaskDelay(30); // 延迟50ms
}
 while(true){vTaskDelay(300); }
}
// 任务A:每50ms执行一次
void subscribeTask(void *pvParameters) {
 
}


void setup_wifi(){
 WiFi.begin(ssid, password);

 while (WiFi.status() != WL_CONNECTED) {
   delay(500);
   Serial.print(".");
}

 Serial.println("");
 Serial.println("WiFi connected");
 Serial.println("IP address: ");
 Serial.println(WiFi.localIP());

}


void reconnect() {
 // 循环直到我们连接
 while (!client.connected()) {
   Serial.print("Attempting MQTT connection...");
   // 尝试连接到MQTT服务器,如果连接成功,返回一个非0的值,通常是客户端ID。如果失败,返回0。
   if (client.connect(mqsid, user, mqttpassword)) {
     Serial.println("MQTT connected");
     // 一旦连接成功,设置回调函数,订阅主题等。
     client.subscribe("zgtopic1"); // 订阅主题
     client.subscribe("zgtopic2"); // 订阅主题
     client.subscribe("zgtopic3"); // 订阅主题
     client.subscribe("zgtopic4"); // 订阅主题
     client.subscribe("zgtopic5"); // 订阅主题
     Serial.println("subscribe zgtopic1-5");
  } else {
     Serial.print("failed, rc=");
     Serial.print(client.state());
     Serial.println(" try again in 5 seconds");
     // 等待5秒再重试
     delay(5000);
  }
}
}

void setup() {
 Serial.begin(115200);
 pinMode(13, OUTPUT);
 pinMode(12, OUTPUT);
 pinMode(14, OUTPUT);
 pinMode(32, OUTPUT);
 pinMode(33, OUTPUT);
 setup_wifi(); // 连接到WiFi的函数(未显示)

   // Create the queue which will have <QueueElementSize> number of elements, each of size `message_t` and pass the address to <QueueHandle>.
 for (int i=0;i<6;i++){
   QueueHandle[i] = xQueueCreate(QueueElementSize, sizeof(message_t));
   // Check if the queue was successfully created
   if (QueueHandle[i] == NULL) {
     Serial.println("Queue could not be created. Halt.");
     while (1) {
       delay(1000);  // Halt at this point as is not possible to continue
    }
  }
}

 for (int i=0; i<6; i++){
   TaskParams2 para;
   para.channel=i;
   char taskname[100];
   sprintf(taskname,"Task%d",i);
   xTaskCreate(
     TaskReadFromMQ, taskname, 2048  // Stack size
    ,
     &para  // No parameter is used
    ,
     1  // Priority
    ,
     NULL  // Task handle is not used here
  );
 
}
 

 client.setServer(mqtt_server, mqtt_port); // 设置MQTT服务器和端口号
 client.setCallback(callback); // 设置回调函数处理消息到达事件
 // xTaskCreate(callbackTask2, "topic2", 1024, NULL, tskIDLE_PRIORITY, NULL);
}




void loop() {

 if (!client.connected()) { // 如果未连接,尝试重连
     reconnect();
  }
 client.loop(); // 处理网络事件和数据传输等。
 // client2.loop();
 // vTaskDelete(NULL);
}




/*--------------------------------------------------*/
/*---------------------- Tasks ---------------------*/
/*--------------------------------------------------*/

void TaskReadFromMQ(void *pvParameters) {  // This is a task.
 message_t message;
 TaskParams2 *params = (TaskParams2 *)pvParameters;
 int channel = params->channel;
 
 for (;;) {  // A Task shall never return or exit.
   if (QueueHandle[channel] != NULL) {  // Sanity check just to make sure the queue actually exists
     int ret = xQueueReceive(QueueHandle[channel], &message, portMAX_DELAY);
     if (ret == pdPASS) {
       // The message was successfully received - send it back to Serial port and "Echo: "
       Serial.printf("Get Message from QUEUE: channel:%d pin:%d cnt:%d msg:%s \n", message.channel,message.pin,message.cnt, message.line);
       ledControlTask1(message.pin, message.cnt, message.channel);
       // The item is queued by copy, not by reference, so lets free the buffer after use.
    } else if (ret == pdFALSE) {
       Serial.println("The `TaskWriteToSerial` was unable to receive data from the Queue");
    }
  }  // Sanity check
}  // Infinite loop
}

void sendMQ(char * msg, int pin, int cnt, int channel) {  // This is a task.
 message_t message;
 if (QueueHandle[channel] != NULL && uxQueueSpacesAvailable(QueueHandle[channel]) > 0) {
   strcpy(message.line,msg);
   message.cnt = cnt;
   message.pin = pin;
   message.channel = channel;
   message.line_length = strlen(message.line);
   int ret = xQueueSend(QueueHandle[channel], (void *)&message, 0);
   if (ret == pdTRUE) {
     // The message was successfully sent.
  } else if (ret == errQUEUE_FULL) {
     // Since we are checking uxQueueSpacesAvailable this should not occur, however if more than one task should
     //   write into the same queue it can fill-up between the test and actual send attempt
     Serial.println("Unable to send data into the Queue");
  }  // Queue send check
}  // Queue sanity check
}
 

发表评论