MQTT

MQTT实现文件上传

文件上传

目前已经实现文本文件和图像文件

实现思路

自定义文件传输协议【前32字节:文件名;接着8字节:起始地址;后续是文件传输内容。】

  • 读取图像数据为二进制数据
  • 进行base64加密【base64字符串】后通过byte流上传
  • 物联网平台接收byte数据,得到base64字符串
  • base64解码并写入文件

设备端

python脚本模拟设备实现文件上传:

import paho.mqtt.client as mqtt
import base64
import os

def mqtt_transmit(image_path, broker="localhost", port=1883, username=None, password=None, topic="/146/D1163P0R9PA7/file/post"):
    """
    此函数用于通过 MQTT 协议传输指定路径的图像文件

    参数:
    image_path (str): 待传输图像文件的路径
    broker (str): MQTT 代理地址
    port (int): MQTT 代理端口
    username (str): MQTT 用户名
    password (str): MQTT 密码
    topic (str): 要发布的主题
    """
    try:
        # 提取文件名,转换为二进制并填充
        file_name = os.path.basename(image_path)
        file_name_bin = file_name.encode('utf-8')
        file_name_padded = file_name_bin.ljust(32)
        
        # 设定起始位置为 0 并转换为二进制
        start_position = "0"
        start_position_padded = start_position.ljust(8)
        start_position_bin = bytes(start_position_padded, encoding='utf-8')
        
        # 读取并编码图像文件内容
        with open(image_path, "rb") as image_file:
            binary_data = image_file.read()
            binary_data_encoded = base64.b64encode(binary_data).decode('utf-8')
            binary_data_encoded_bytes = binary_data_encoded.encode('utf-8')
        
        # 构建传输数据
        data_to_transmit = file_name_padded + start_position_bin + binary_data_encoded_bytes
        # print(data_to_transmit)
        
        # 配置并连接 MQTT 客户端
        client = mqtt.Client()
        if username and password:
            client.username_pw_set(username=username, password=password)
        client.connect(broker, port)
        
        # 发布数据
        client.publish(topic, data_to_transmit)
        
        # 断开连接
        client.disconnect()

    except Exception as e:
        print(f"Error: {e}")

# 使用示例
image_path = 'logo.png'
mqtt_transmit(image_path, broker="mqtt服务器地址", port=1883, username="用户名", password="密码", topic="主题")

物联网平台端

reportFile

/**
 * 上报文件信息
 * 约定格式:
 *      前32个字节 -> 文件名    【字符串】- 文件名.后缀名 UTF_8
 *      接着8个字节 -> 起始地址 【字符串数字:如"00001234" -> 得到"1234"】 UTF_8
 *      后续是文件内容
 */
@Override
public void reportFile(ReportFileDataBo bo) {
    try {
        byte[] data = bo.getData();
        if (data.length > 0) {
            // 1 前32个字节是 "文件名.后缀名"
            byte[] bFileName = Arrays.copyOfRange(data, 0, 32);
            String sFileName = new String(bFileName, StandardCharsets.UTF_8).trim();
            String pre_sFileName = sFileName.split("\\.")[0];
            String sub_sFileName = sFileName.split("\\.")[1];
            String baseDir = RuoYiConfig.getUploadPath();
            String fileName = getAbsoluteFile(baseDir, sFileName).getAbsolutePath();

            log.info("文件全名: " + fileName);
            createFileIfNotExists(fileName);

            // 2 接着8个字节是起始位置 "pos"
            byte[] pos = Arrays.copyOfRange(data, 32, 40);
            // 先转换为数字字符串
            String sPosition = new String(pos, StandardCharsets.UTF_8).trim();
            // 再转化为数字
            int position = Integer.parseInt(sPosition);
            log.info("文件起始位置: " + position);

            // 3 后面是内容
            byte[] content = Arrays.copyOfRange(data, 40, data.length);
            String contentStr = new String(content, StandardCharsets.UTF_8).trim();
            int len = contentStr.length();
            log.info("文件内容: " + contentStr.substring(0, 8) + "..." + contentStr.substring(len - 8, len) + " 文件len: " + len);
            List<String> FileTypeList = Arrays.asList("jpg", "png", "jpeg", "gif", "bmp", "svg");
            if (FileTypeList.contains(sub_sFileName)) {
                // 图片文件
                saveContentAsImg(fileName, position, contentStr);
            } else {
                // 默认文本文件
                saveContentToFile(fileName, position, contentStr);
            }

            // 4 写入表, 并在前端页面提供下载功能
            DeviceFile deviceFile = new DeviceFile();
            deviceFile.setProductId(bo.getProductId());
            deviceFile.setSerialNumber(bo.getSerialNumber());
            long size = new File(fileName).length();
            deviceFile.setFileSize(size + " Byte");

            // 文件内容预览
            deviceFile.setFileContent(contentStr.substring(0, 8) + "..." + contentStr.substring(len - 8, len));

            // 5 remark存放文件路径
            deviceFile.setRemark(fileName);
            deviceFileService.insertDeviceFileLog(deviceFile);

        }
    } catch (Exception e) {
        log.error("接收属性数据,解析数据时异常 message={},e={}", e.getMessage(),e);
    }
}

getAbsoluteFile

public static final File getAbsoluteFile(String uploadDir, String fileName) throws IOException
{
    File desc = new File(uploadDir + File.separator + fileName);

    if (!desc.exists())
    {
        if (!desc.getParentFile().exists())
        {
            desc.getParentFile().mkdirs();
        }
    }
    return desc;
}

createFileIfNotExists

/**
 * 根据名称创建文件,如果已经存在,则跳过。
 */
private void createFileIfNotExists(String fileName) {
    File file = new File(fileName);
    if (!file.exists()) {
        try {
            if (file.createNewFile()) {
                log.info("文件 {} 创建成功", fileName);
            } else {
                log.error("文件 {} 创建失败", fileName);
            }
        } catch (IOException e) {
            log.error("创建文件 {} 时出现异常", fileName, e);
        }
    } else {
        log.info("文件 {} 已经存在,跳过创建", fileName);
    }
}

saveContentAsImg

private void saveContentAsImg(String fileName, int position, String content) {
    byte[] writeBytes;
    try {
        // 找到文件位置
        File file = new File(fileName);
        if (!file.exists()) {
            log.error("文件 {} 不存在,无法写入内容", fileName);
        }

        if (position < 0) {
            log.error("插入到文件 {} 的起始位置 {} 不能为负数", fileName, position);
        }

        // 找到写入位置
        final Path path = Paths.get(fileName);
        byte[] fileBytes = Files.readAllBytes(path);

        // base64解码并写入文件
        byte[] imageBytes = Base64.getDecoder().decode(content);
        writeBytes = replaceBytes(fileBytes, imageBytes, position);
        saveImageFile(fileName, writeBytes);
    } catch (IOException e) {
        log.error("写入文件 {} 时出现异常", fileName, e);
    }
}

private static void saveImageFile(String fileName, byte[] imageBytes)
{
    try (FileOutputStream fos = new FileOutputStream(fileName)) {
        fos.write(imageBytes);
        System.out.println("Saved image to " + fileName);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

saveContentToFile

/**
 * 根据文件名称和起始位置存入内容。
 * @return 返回插入后的文件内容
 */
private void saveContentToFile(String fileName, int position, String content) {
    byte[] writeBytes;
    try {
        File file = new File(fileName);
        if (!file.exists()) {
            log.error("文件 {} 不存在,无法写入内容", fileName);
        }

        if (position < 0) {
            log.error("插入到文件 {} 的起始位置 {} 不能为负数", fileName, position);
        }

        // 读取文件内容
        final Path path = Paths.get(fileName);
        byte[] fileBytes = Files.readAllBytes(path);

        // 将 content 转换为字节数组
        byte[] contentBytes = content.getBytes();
        // 使用替换方法,并接受可能的新数组
        writeBytes = replaceBytes(fileBytes, contentBytes, position);

        // 写入文件
        Files.write(path, writeBytes, StandardOpenOption.WRITE);
        log.info("内容已经成功写入文件 {}", fileName);
    } catch (IOException e) {
        log.error("写入文件 {} 时出现异常", fileName, e);
    }
}

replaceBytes

/**
 * todo:将 contentBytes 从 position开始,长度 contentBytes.length 的内容替换 fileBytes处的内容
 * todo:特殊情况,当 插入的内容 大于 原内容长度时,处理逻辑如下:
 *          1,[0, position) -> result;
 *          2, [position, position+contentBytes.length) -> reslut;
 *          3, result.length = position+contentBytes.length.
 */
public byte[] replaceBytes(byte[] fileBytes, byte[] contentBytes, int position) {
    // 如果指定位置大于原数组长度,那么从原数组末尾插入
    if (position >= fileBytes.length) {
        // 创建一个新的数组,长度为原数组长度加上内容长度
        byte[] result = Arrays.copyOf(fileBytes, fileBytes.length + contentBytes.length);
        // 复制内容到新数组的末端
        System.arraycopy(contentBytes, 0, result, fileBytes.length, contentBytes.length);
        return result;
    } else if (contentBytes.length + position > fileBytes.length) {
        // 如果超出,则按照特殊情况处理逻辑,创建一个新的数组
        byte[] result = new byte[position + contentBytes.length];

        // 复制原数组的[0, position)部分到新数组
        System.arraycopy(fileBytes, 0, result, 0, position);

        // 插入contentBytes到新数组中的[position, position + contentBytes.length)部分
        System.arraycopy(contentBytes, 0, result, position, contentBytes.length);

        // 返回新数组,后面不需要的部分会自动用0填充,符合Java数组初始化特性
        return result;
    } else {
        // 如果不超出原数组长度,则在原来的数组上操作
        System.arraycopy(contentBytes, 0, fileBytes, position, contentBytes.length);

        // 返回原数组,因为我们在原地进行了修改
        return fileBytes;
    }
}

文件下载

handleDownload

async handleDownload(row) {
  try {
    const fileId = row.fileId || this.ids;
    const responseText = await downloadDeviceUploadFile(fileId);
    let fileContent = responseText;
    var filePath = row.remark;

    console.log('文件内容:', fileContent);
    console.log('文件大小:', fileContent.length);

    const parts = filePath.split(/[\\\/]/); 
    const fileNameWithExtension = parts[parts.length - 1];
    const fileNameParts = fileNameWithExtension.split('.');
    const fileName = fileNameParts.slice(0, -1).join('.');
    const extension = fileNameParts[fileNameParts.length - 1];

    console.log('文件名:', fileName); // 输出文件名
    console.log('后缀名:', extension); // 输出后缀名

    if (isImage(extension)) {
        const base64Data = fileContent.includes(",") ? fileContent.split(",")[1] : fileContent;
        if (base64Data) {
          const contentType = `image/${extension}`; // 设定MIME类型
          const blob = base64ToBlob(base64Data, contentType); // base64字符串转换为Blob对象
          const link = document.createElement('a');
          const timestamp = new Date().getTime();
          const name = `${fileName}_${timestamp}.${extension}`;

          link.href = URL.createObjectURL(blob);
          link.download = name;

          document.body.appendChild(link);
          link.click();
          document.body.removeChild(link);
        } else {
          console.log("无效的Base64数据");
        }        
    } else {
      // 如果不是图像文件,处理为普通Blob
      const blob = new Blob([fileContent]);
      const link = document.createElement('a');

      const timestamp = new Date().getTime();
      const name = fileName + `_${timestamp}.${extension}`;

      link.href = URL.createObjectURL(blob);
      link.download = name;
      document.body.appendChild(link);
      link.click();
      document.body.removeChild(link);
    }
  } catch (error) {
    console.error(error);
  }
}

依赖代码

// 下载设备上传文件
export function downloadDeviceUploadFile(fileId) {
  return request({
    url: '/iotData/deviceUploadFile/download?fileId=' + fileId,
    method: 'get'
  })
}

export function isImage(extension) {
  const imageExtensions = ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'svg'];
  return imageExtensions.includes(extension.toLowerCase());
}

export function base64ToBlob(base64, contentType = '') {
  // 确保字符串有效并进行必要处理
  let cleanedBase64Data = base64.trim().replace(/-/g, '+').replace(/_/g, '/');
  const padding = '='.repeat((4 - cleanedBase64Data.length % 4) % 4);
  cleanedBase64Data += padding;

  // Base64解码
  const binaryString = Buffer.from(cleanedBase64Data, 'base64').toString('binary'); // 使用 Buffer 解码
  const len = binaryString.length;
  const bytes = new Uint8Array(len);

  // 将每个字符转换为字节
  for (let i = 0; i < len; i++) {
      bytes[i] = binaryString.charCodeAt(i);
  }

  return new Blob([bytes], { type: contentType });
}

转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 jungle8884@163.com

×

喜欢就点赞,疼爱就打赏