ESP32使用队列实现并发控制外设
ESP32开发板可以通过消费MQTT消息来驱动外设(例如抖音整蛊设备中的电磁铁等)。但如果只是按顺序逐条处理消息,多个通道无法并发动作;为每个通道配置一个队列,并结合FreeRTOS为每个队列创建一个任务,即可实现多通道并发。
以下是代码(Arduino C/C++,基于ESP32的FreeRTOS)。
#include <WiFi.h>
#include <PubSubClient.h>
#include <stdio.h>
#include <string.h>
// Wi-Fi credentials passed to WiFi.begin() in setup_wifi().
const char* ssid = "FY-2.4G";
const char* password = "12345678";
// MQTT broker settings passed to client.setServer() in setup().
const char* mqtt_server = "192.168.29.39"; // MQTT server address (e.g. HiveMQ's public test server)
const int mqtt_port = 1883; // MQTT port, usually 1883 or 8883 (SSL)
// MQTT login and client id passed to client.connect() in reconnect().
const char* user = "lyh" ;
const char* mqttpassword = "12345678";
const char* mqsid = "ESP32Client";
// NOTE(review): mqsid2, ledTaskHandle, espClient2 and client2 are only mentioned in
// commented-out code in the visible part of this file.
const char* mqsid2 = "ESP32Client2";
TaskHandle_t ledTaskHandle = NULL;
WiFiClient espClient;
WiFiClient espClient2;
// MQTT client used by callback(), reconnect(), setup() and loop().
PubSubClient client(espClient);
PubSubClient client2(espClient2);
// Parameter bundle read by ledControlTask() through its pvParameters pointer.
typedef struct {
int pin; // GPIO pin to toggle
int cnt; // number of on/off cycles
} TaskParams;
// Parameter bundle read by TaskReadFromMQ() through its pvParameters pointer.
typedef struct {
int channel; // index into QueueHandle[] of the queue the task reads
char taskname[100]; // room for a task name (up to 99 characters plus NUL)
} TaskParams2;
// Size in bytes of message_t::line (message text plus its terminating NUL).
#define MAX_LINE_LENGTH (64)
// Forward declarations for functions that are used before they are defined.
// The sendMQ prototype must list the same four parameters as its definition
// and its call in callback().
char* extractBeforeVerticalBar(const char* input);
void TaskReadFromMQ(void *pvParameters);
void sendMQ(char * msg, int pin, int cnt, int channel);
// One queue per channel index; created in setup(), written by sendMQ() and
// read by TaskReadFromMQ().
QueueHandle_t QueueHandle[6];
// Number of message_t elements each queue can hold.
const int QueueElementSize = 100;
// Work item copied through the per-channel queues: filled in by sendMQ() and
// consumed by TaskReadFromMQ().
typedef struct {
int pin; // GPIO pin handed to ledControlTask1()
int cnt; // number of on/off cycles handed to ledControlTask1()
int channel; // channel number; also the index of the queue the item travels on
char line[MAX_LINE_LENGTH]; // copy of the message text
uint8_t line_length; // length of the text stored in line
} message_t;
void callback(char* topic, byte* payload, unsigned int length) {
Serial.print("Message arrived in topic: ");
Serial.println(topic);
Serial.print("Message:");
// Serial.print(length);
for (int i = 0; i < length; i++) {
Serial.print((char)payload[i]);
}
Serial.println();
extractBeforeVerticalBar((char*)payload);
char* result1 = extractBeforeVerticalBar((char*)payload);
// Serial.println(result1);
int cnt = atoi(result1);
int channel = 0;
int pin=0;
if(strcmp(topic, "zgtopic1") == 0) {
pin=13;
channel=1;
}
if(strcmp(topic, "zgtopic2") == 0) {
pin=12;
channel=2;
}
if(strcmp(topic, "zgtopic3") == 0) {
pin=14;
channel=3;
}
if(strcmp(topic, "zgtopic4") == 0) {
pin=32;
channel=4;
}
if(strcmp(topic, "zgtopic5") == 0) {
pin=33;
channel=5;
}
TaskParams taskParams = {pin,cnt};
// xTaskCreate(ledControlTask, topic, 1024, &taskParams, tskIDLE_PRIORITY, &ledTaskHandle);
// ledControlTask(&taskParams);
// ledControlTask1(pin, cnt);
sendMQ((char*)payload,pin,cnt, channel);
Serial.println();
free(result1);
}
/**
 * Returns a newly allocated copy of the part of `input` that precedes the
 * first '|'. When `input` contains no '|', the whole string is copied.
 *
 * @param input NUL-terminated string, or NULL.
 * @return malloc'd NUL-terminated string that the caller must free(), or NULL
 *         when `input` is NULL or the allocation fails.
 */
char* extractBeforeVerticalBar(const char* input) {
    if (!input) {
        return NULL;
    }

    // Length of the prefix: up to the bar if there is one, else everything.
    const char* bar = strchr(input, '|');
    const size_t prefixLen = bar ? (size_t)(bar - input) : strlen(input);

    char* copy = (char*)malloc(prefixLen + 1);
    if (copy) {
        memcpy(copy, input, prefixLen);
        copy[prefixLen] = '\0';
    }
    return copy;
}
void ledControlTask1(int pin, int count, int channel) {
char msg[100];
for(int cnt = 0; cnt < count; cnt++) {
sprintf(msg,"channel:%d ON seq:%d",channel,cnt);
Serial.println(msg);
digitalWrite(pin, HIGH); // turn the LED on (HIGH is the voltage level)
vTaskDelay(200); // wait for a second
sprintf(msg,"channel:%d OFF seq:%d",channel,cnt);
Serial.println(msg);
digitalWrite(pin, LOW); // turn the LED off by making the voltage LOW
vTaskDelay(300); // wait for a second
// vTaskDelay(30); // 延迟50ms
}
}
void ledControlTask(void *pvParameters) {
TaskParams *params = (TaskParams *)pvParameters;
int pin = params->pin;
int count = params->cnt;
for(int cnt = 0; cnt < count; cnt++) {
// 任务A的工作代码
digitalWrite(pin, HIGH); // turn the LED on (HIGH is the voltage level)
delay(100); // wait for a second
digitalWrite(pin, LOW); // turn the LED off by making the voltage LOW
delay(100); // wait for a second
// vTaskDelay(30); // 延迟50ms
}
while(true){vTaskDelay(300); }
}
// Empty placeholder: the body does nothing and no visible code calls it.
// The original note labelled it "task A: runs every 50 ms", but no such logic
// is implemented here.
void subscribeTask(void *pvParameters) {
}
/**
 * Joins the Wi-Fi network named by `ssid` / `password` and blocks until the
 * connection is up, printing a dot every 500 ms while waiting. Afterwards the
 * assigned IP address is printed.
 */
void setup_wifi() {
  WiFi.begin(ssid, password);

  for (;;) {
    if (WiFi.status() == WL_CONNECTED) {
      break;
    }
    delay(500);
    Serial.print(".");
  }

  Serial.println("");
  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());
}
/**
 * Blocks until the MQTT client is connected, retrying every 5 seconds, then
 * subscribes to zgtopic1..zgtopic5. Every subscription that fails is reported
 * on Serial; the summary line is printed only when all of them succeeded.
 */
void reconnect() {
  static const char *const topics[] = {
    "zgtopic1", "zgtopic2", "zgtopic3", "zgtopic4", "zgtopic5",
  };

  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
    // connect() reports success as a boolean; details are in client.state().
    if (!client.connect(mqsid, user, mqttpassword)) {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);
      continue;
    }

    Serial.println("MQTT connected");
    bool allSubscribed = true;
    for (size_t i = 0; i < sizeof(topics) / sizeof(topics[0]); i++) {
      if (!client.subscribe(topics[i])) {
        allSubscribed = false;
        Serial.print("subscribe failed: ");
        Serial.println(topics[i]);
      }
    }
    if (allSubscribed) {
      Serial.println("subscribe zgtopic1-5");
    }
  }
}
void setup() {
Serial.begin(115200);
pinMode(13, OUTPUT);
pinMode(12, OUTPUT);
pinMode(14, OUTPUT);
pinMode(32, OUTPUT);
pinMode(33, OUTPUT);
setup_wifi(); // 连接到WiFi的函数(未显示)
// Create the queue which will have <QueueElementSize> number of elements, each of size `message_t` and pass the address to <QueueHandle>.
for (int i=0;i<6;i++){
QueueHandle[i] = xQueueCreate(QueueElementSize, sizeof(message_t));
// Check if the queue was successfully created
if (QueueHandle[i] == NULL) {
Serial.println("Queue could not be created. Halt.");
while (1) {
delay(1000); // Halt at this point as is not possible to continue
}
}
}
for (int i=0; i<6; i++){
TaskParams2 para;
para.channel=i;
char taskname[100];
sprintf(taskname,"Task%d",i);
xTaskCreate(
TaskReadFromMQ, taskname, 2048 // Stack size
,
¶ // No parameter is used
,
1 // Priority
,
NULL // Task handle is not used here
);
}
client.setServer(mqtt_server, mqtt_port); // 设置MQTT服务器和端口号
client.setCallback(callback); // 设置回调函数处理消息到达事件
// xTaskCreate(callbackTask2, "topic2", 1024, NULL, tskIDLE_PRIORITY, NULL);
}
/**
 * Main loop: re-establishes the MQTT connection when it has dropped, then lets
 * the client process its network traffic (which is what triggers callback()).
 */
void loop() {
  const bool online = client.connected();
  if (!online) {
    reconnect();
  }
  client.loop();
}
/*--------------------------------------------------*/
/*---------------------- Tasks ---------------------*/
/*--------------------------------------------------*/
void TaskReadFromMQ(void *pvParameters) { // This is a task.
message_t message;
TaskParams2 *params = (TaskParams2 *)pvParameters;
int channel = params->channel;
for (;;) { // A Task shall never return or exit.
if (QueueHandle[channel] != NULL) { // Sanity check just to make sure the queue actually exists
int ret = xQueueReceive(QueueHandle[channel], &message, portMAX_DELAY);
if (ret == pdPASS) {
// The message was successfully received - send it back to Serial port and "Echo: "
Serial.printf("Get Message from QUEUE: channel:%d pin:%d cnt:%d msg:%s \n", message.channel,message.pin,message.cnt, message.line);
ledControlTask1(message.pin, message.cnt, message.channel);
// The item is queued by copy, not by reference, so lets free the buffer after use.
} else if (ret == pdFALSE) {
Serial.println("The `TaskWriteToSerial` was unable to receive data from the Queue");
}
} // Sanity check
} // Infinite loop
}
/**
 * Queues one job for the reader task of `channel`. Never blocks: when the
 * queue is full the job is dropped and a message is printed.
 *
 * @param msg     Message text; at most MAX_LINE_LENGTH - 1 bytes are stored,
 *                longer text is truncated.
 * @param pin     GPIO pin the reader task should drive.
 * @param cnt     Number of on/off cycles.
 * @param channel Index into QueueHandle[] selecting the target queue.
 */
void sendMQ(char * msg, int pin, int cnt, int channel) {
  const int queueCount = (int)(sizeof(QueueHandle) / sizeof(QueueHandle[0]));
  if (msg == NULL || channel < 0 || channel >= queueCount || QueueHandle[channel] == NULL) {
    Serial.println("sendMQ: invalid message or channel, nothing queued");
    return;
  }

  message_t message;
  // Bounded copy: never read or write more than the line buffer can hold.
  const size_t maxLen = sizeof(message.line) - 1;
  const char *terminator = (const char *)memchr(msg, '\0', maxLen);
  const size_t len = (terminator != NULL) ? (size_t)(terminator - msg) : maxLen;
  memcpy(message.line, msg, len);
  memset(message.line + len, 0, sizeof(message.line) - len);  // terminate and clear the rest

  message.cnt = cnt;
  message.pin = pin;
  message.channel = channel;
  message.line_length = (uint8_t)len;

  // Timeout 0: fail immediately rather than stall the MQTT callback.
  if (xQueueSend(QueueHandle[channel], (void *)&message, 0) != pdTRUE) {
    Serial.println("Unable to send data into the Queue");
  }
}