MQTT实现文件上传
文件上传
目前已经实现文本文件和图像文件
实现思路
自定义文件传输协议【前32字节:文件名;接着8字节:起始地址;后续是文件传输内容。】
- 读取图像数据为二进制数据
- 进行base64加密【base64字符串】后通过byte流上传
- 物联网平台接收byte数据,得到base64字符串
- base64解码并写入文件
设备端
python脚本模拟设备实现文件上传:
import paho.mqtt.client as mqtt
import base64
import os
def mqtt_transmit(image_path, broker="localhost", port=1883, username=None, password=None, topic="/146/D1163P0R9PA7/file/post"):
"""
此函数用于通过 MQTT 协议传输指定路径的图像文件
参数:
image_path (str): 待传输图像文件的路径
broker (str): MQTT 代理地址
port (int): MQTT 代理端口
username (str): MQTT 用户名
password (str): MQTT 密码
topic (str): 要发布的主题
"""
try:
# 提取文件名,转换为二进制并填充
file_name = os.path.basename(image_path)
file_name_bin = file_name.encode('utf-8')
file_name_padded = file_name_bin.ljust(32)
# 设定起始位置为 0 并转换为二进制
start_position = "0"
start_position_padded = start_position.ljust(8)
start_position_bin = bytes(start_position_padded, encoding='utf-8')
# 读取并编码图像文件内容
with open(image_path, "rb") as image_file:
binary_data = image_file.read()
binary_data_encoded = base64.b64encode(binary_data).decode('utf-8')
binary_data_encoded_bytes = binary_data_encoded.encode('utf-8')
# 构建传输数据
data_to_transmit = file_name_padded + start_position_bin + binary_data_encoded_bytes
# print(data_to_transmit)
# 配置并连接 MQTT 客户端
client = mqtt.Client()
if username and password:
client.username_pw_set(username=username, password=password)
client.connect(broker, port)
# 发布数据
client.publish(topic, data_to_transmit)
# 断开连接
client.disconnect()
except Exception as e:
print(f"Error: {e}")
# 使用示例
image_path = 'logo.png'
mqtt_transmit(image_path, broker="mqtt服务器地址", port=1883, username="用户名", password="密码", topic="主题")
物联网平台端
reportFile
/**
* 上报文件信息
* 约定格式:
* 前32个字节 -> 文件名 【字符串】- 文件名.后缀名 UTF_8
* 接着8个字节 -> 起始地址 【字符串数字:如"00001234" -> 得到"1234"】 UTF_8
* 后续是文件内容
*/
@Override
public void reportFile(ReportFileDataBo bo) {
try {
byte[] data = bo.getData();
if (data.length > 0) {
// 1 前32个字节是 "文件名.后缀名"
byte[] bFileName = Arrays.copyOfRange(data, 0, 32);
String sFileName = new String(bFileName, StandardCharsets.UTF_8).trim();
String pre_sFileName = sFileName.split("\\.")[0];
String sub_sFileName = sFileName.split("\\.")[1];
String baseDir = RuoYiConfig.getUploadPath();
String fileName = getAbsoluteFile(baseDir, sFileName).getAbsolutePath();
log.info("文件全名: " + fileName);
createFileIfNotExists(fileName);
// 2 接着8个字节是起始位置 "pos"
byte[] pos = Arrays.copyOfRange(data, 32, 40);
// 先转换为数字字符串
String sPosition = new String(pos, StandardCharsets.UTF_8).trim();
// 再转化为数字
int position = Integer.parseInt(sPosition);
log.info("文件起始位置: " + position);
// 3 后面是内容
byte[] content = Arrays.copyOfRange(data, 40, data.length);
String contentStr = new String(content, StandardCharsets.UTF_8).trim();
int len = contentStr.length();
log.info("文件内容: " + contentStr.substring(0, 8) + "..." + contentStr.substring(len - 8, len) + " 文件len: " + len);
List<String> FileTypeList = Arrays.asList("jpg", "png", "jpeg", "gif", "bmp", "svg");
if (FileTypeList.contains(sub_sFileName)) {
// 图片文件
saveContentAsImg(fileName, position, contentStr);
} else {
// 默认文本文件
saveContentToFile(fileName, position, contentStr);
}
// 4 写入表, 并在前端页面提供下载功能
DeviceFile deviceFile = new DeviceFile();
deviceFile.setProductId(bo.getProductId());
deviceFile.setSerialNumber(bo.getSerialNumber());
long size = new File(fileName).length();
deviceFile.setFileSize(size + " Byte");
// 文件内容预览
deviceFile.setFileContent(contentStr.substring(0, 8) + "..." + contentStr.substring(len - 8, len));
// 5 remark存放文件路径
deviceFile.setRemark(fileName);
deviceFileService.insertDeviceFileLog(deviceFile);
}
} catch (Exception e) {
log.error("接收属性数据,解析数据时异常 message={},e={}", e.getMessage(),e);
}
}
getAbsoluteFile
public static final File getAbsoluteFile(String uploadDir, String fileName) throws IOException
{
File desc = new File(uploadDir + File.separator + fileName);
if (!desc.exists())
{
if (!desc.getParentFile().exists())
{
desc.getParentFile().mkdirs();
}
}
return desc;
}
createFileIfNotExists
/**
* 根据名称创建文件,如果已经存在,则跳过。
*/
private void createFileIfNotExists(String fileName) {
File file = new File(fileName);
if (!file.exists()) {
try {
if (file.createNewFile()) {
log.info("文件 {} 创建成功", fileName);
} else {
log.error("文件 {} 创建失败", fileName);
}
} catch (IOException e) {
log.error("创建文件 {} 时出现异常", fileName, e);
}
} else {
log.info("文件 {} 已经存在,跳过创建", fileName);
}
}
saveContentAsImg
private void saveContentAsImg(String fileName, int position, String content) {
byte[] writeBytes;
try {
// 找到文件位置
File file = new File(fileName);
if (!file.exists()) {
log.error("文件 {} 不存在,无法写入内容", fileName);
}
if (position < 0) {
log.error("插入到文件 {} 的起始位置 {} 不能为负数", fileName, position);
}
// 找到写入位置
final Path path = Paths.get(fileName);
byte[] fileBytes = Files.readAllBytes(path);
// base64解码并写入文件
byte[] imageBytes = Base64.getDecoder().decode(content);
writeBytes = replaceBytes(fileBytes, imageBytes, position);
saveImageFile(fileName, writeBytes);
} catch (IOException e) {
log.error("写入文件 {} 时出现异常", fileName, e);
}
}
private static void saveImageFile(String fileName, byte[] imageBytes)
{
try (FileOutputStream fos = new FileOutputStream(fileName)) {
fos.write(imageBytes);
System.out.println("Saved image to " + fileName);
} catch (IOException e) {
e.printStackTrace();
}
}
saveContentToFile
/**
* 根据文件名称和起始位置存入内容。
* @return 返回插入后的文件内容
*/
private void saveContentToFile(String fileName, int position, String content) {
byte[] writeBytes;
try {
File file = new File(fileName);
if (!file.exists()) {
log.error("文件 {} 不存在,无法写入内容", fileName);
}
if (position < 0) {
log.error("插入到文件 {} 的起始位置 {} 不能为负数", fileName, position);
}
// 读取文件内容
final Path path = Paths.get(fileName);
byte[] fileBytes = Files.readAllBytes(path);
// 将 content 转换为字节数组
byte[] contentBytes = content.getBytes();
// 使用替换方法,并接受可能的新数组
writeBytes = replaceBytes(fileBytes, contentBytes, position);
// 写入文件
Files.write(path, writeBytes, StandardOpenOption.WRITE);
log.info("内容已经成功写入文件 {}", fileName);
} catch (IOException e) {
log.error("写入文件 {} 时出现异常", fileName, e);
}
}
replaceBytes
/**
* todo:将 contentBytes 从 position开始,长度 contentBytes.length 的内容替换 fileBytes处的内容
* todo:特殊情况,当 插入的内容 大于 原内容长度时,处理逻辑如下:
* 1,[0, position) -> result;
* 2, [position, position+contentBytes.length) -> reslut;
* 3, result.length = position+contentBytes.length.
*/
public byte[] replaceBytes(byte[] fileBytes, byte[] contentBytes, int position) {
// 如果指定位置大于原数组长度,那么从原数组末尾插入
if (position >= fileBytes.length) {
// 创建一个新的数组,长度为原数组长度加上内容长度
byte[] result = Arrays.copyOf(fileBytes, fileBytes.length + contentBytes.length);
// 复制内容到新数组的末端
System.arraycopy(contentBytes, 0, result, fileBytes.length, contentBytes.length);
return result;
} else if (contentBytes.length + position > fileBytes.length) {
// 如果超出,则按照特殊情况处理逻辑,创建一个新的数组
byte[] result = new byte[position + contentBytes.length];
// 复制原数组的[0, position)部分到新数组
System.arraycopy(fileBytes, 0, result, 0, position);
// 插入contentBytes到新数组中的[position, position + contentBytes.length)部分
System.arraycopy(contentBytes, 0, result, position, contentBytes.length);
// 返回新数组,后面不需要的部分会自动用0填充,符合Java数组初始化特性
return result;
} else {
// 如果不超出原数组长度,则在原来的数组上操作
System.arraycopy(contentBytes, 0, fileBytes, position, contentBytes.length);
// 返回原数组,因为我们在原地进行了修改
return fileBytes;
}
}
文件下载
handleDownload
async handleDownload(row) {
try {
const fileId = row.fileId || this.ids;
const responseText = await downloadDeviceUploadFile(fileId);
let fileContent = responseText;
var filePath = row.remark;
console.log('文件内容:', fileContent);
console.log('文件大小:', fileContent.length);
const parts = filePath.split(/[\\\/]/);
const fileNameWithExtension = parts[parts.length - 1];
const fileNameParts = fileNameWithExtension.split('.');
const fileName = fileNameParts.slice(0, -1).join('.');
const extension = fileNameParts[fileNameParts.length - 1];
console.log('文件名:', fileName); // 输出文件名
console.log('后缀名:', extension); // 输出后缀名
if (isImage(extension)) {
const base64Data = fileContent.includes(",") ? fileContent.split(",")[1] : fileContent;
if (base64Data) {
const contentType = `image/${extension}`; // 设定MIME类型
const blob = base64ToBlob(base64Data, contentType); // base64字符串转换为Blob对象
const link = document.createElement('a');
const timestamp = new Date().getTime();
const name = `${fileName}_${timestamp}.${extension}`;
link.href = URL.createObjectURL(blob);
link.download = name;
document.body.appendChild(link);
link.click();
document.body.removeChild(link);
} else {
console.log("无效的Base64数据");
}
} else {
// 如果不是图像文件,处理为普通Blob
const blob = new Blob([fileContent]);
const link = document.createElement('a');
const timestamp = new Date().getTime();
const name = fileName + `_${timestamp}.${extension}`;
link.href = URL.createObjectURL(blob);
link.download = name;
document.body.appendChild(link);
link.click();
document.body.removeChild(link);
}
} catch (error) {
console.error(error);
}
}
依赖代码
// 下载设备上传文件
export function downloadDeviceUploadFile(fileId) {
return request({
url: '/iotData/deviceUploadFile/download?fileId=' + fileId,
method: 'get'
})
}
export function isImage(extension) {
const imageExtensions = ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'svg'];
return imageExtensions.includes(extension.toLowerCase());
}
export function base64ToBlob(base64, contentType = '') {
// 确保字符串有效并进行必要处理
let cleanedBase64Data = base64.trim().replace(/-/g, '+').replace(/_/g, '/');
const padding = '='.repeat((4 - cleanedBase64Data.length % 4) % 4);
cleanedBase64Data += padding;
// Base64解码
const binaryString = Buffer.from(cleanedBase64Data, 'base64').toString('binary'); // 使用 Buffer 解码
const len = binaryString.length;
const bytes = new Uint8Array(len);
// 将每个字符转换为字节
for (let i = 0; i < len; i++) {
bytes[i] = binaryString.charCodeAt(i);
}
return new Blob([bytes], { type: contentType });
}
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 jungle8884@163.com