网站首页 > 技术文章 正文
该系统实现 “PLC 数据 1 秒采集→MySQL 存储→Flask 后端提供接口→BootstrapV4+ECharts 前端可视化” 的完整流程,以下是分模块实现代码(验证可行):
整体架构说明
- 数据采集端:Python 程序,1 秒读取 PLC 数据并写入 MySQL(基于 snap7+pymysql)
- 后端服务:Flask 框架,提供 API 接口供前端获取 MySQL 数据
- 前端页面:BootstrapV4 布局,ECharts 实现实时数据图表展示
- 数据流转:PLC → MySQL → Flask API → ECharts
数据采集端(PLC→MySQL)
负责 1 秒采集一次 PLC 数据并写入 MySQL,包含断线重连机制。plc_data_collector.py
import snap7
import struct
import time
import logging
import pymysql
from pymysql import OperationalError
from snap7.exceptions import Snap7Exception
# 日志配置
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
# 自定义位操作
def get_bit(data, byte_index, bit_index):
return (data[byte_index] >> bit_index) & 1 if 0 <= byte_index < len(data) else False
# PLC连接类(优化版)
class PLCConnection:
def __init__(self, ip, rack=0, slot=1, reconnect_interval=5):
self.ip = ip
self.rack = rack
self.slot = slot
self.client = snap7.client.Client()
self.connected = False
self.reconnect_interval = reconnect_interval
self.heartbeat_interval = 10
self.last_heartbeat = 0
self.lock = False # 操作锁,避免并发冲突
def connect(self):
"""建立连接,确保连接前已断开旧连接"""
# 加锁防止并发操作冲突
if self.lock:
logging.warning("PLC操作已加锁,等待释放...")
return False
self.lock = True
try:
# 先断开可能存在的旧连接
if self.connected:
self.disconnect()
# 建立新连接
max_attempts = 3
for attempt in range(max_attempts):
try:
self.client.connect(self.ip, self.rack, self.slot)
self.connected = True
self.last_heartbeat = time.time()
logging.info(f"PLC连接成功: {self.ip}")
return True
except Snap7Exception as e:
logging.warning(f"PLC连接尝试 {attempt + 1}/3 失败: {str(e)}")
time.sleep(1)
self.connected = False
return False
finally:
self.lock = False # 释放锁
def disconnect(self):
"""安全断开连接,忽略断开时的错误"""
if self.connected:
try:
self.client.disconnect()
logging.info("PLC已断开连接")
except Exception as e:
logging.warning(f"PLC断开时发生警告: {str(e)}") # 仅警告,不中断流程
finally:
self.connected = False
def _heartbeat_check(self):
"""优化心跳检测,避免频繁读写"""
if time.time() - self.last_heartbeat > self.heartbeat_interval:
try:
# 读取一个字节的M区数据作为心跳检测
self.client.mb_read(0, 1)
self.last_heartbeat = time.time()
return True
except:
self.connected = False
logging.warning("PLC心跳检测失败,连接已断开")
return False
return True
def ensure_connected(self):
"""确保连接有效,增加重试间隔控制"""
if self.lock:
time.sleep(0.1) # 等待锁释放
if not self.connected or not self._heartbeat_check():
logging.info("PLC重连中...")
# 重连时增加初始延迟,避免频繁尝试
time.sleep(1)
while not self.connect():
time.sleep(self.reconnect_interval)
return self.connected
def read_db_real(self, db_num, offset):
if not self.ensure_connected():
return None
try:
data = self.client.db_read(db_num, offset, 4)
return round(struct.unpack('>f', data)[0], 2)
except Exception as e:
logging.error(f"读取DB{db_num}.DBD{offset}失败: {str(e)}")
self.connected = False
return None
def read_db_int(self, db_num, offset):
if not self.ensure_connected():
return None
try:
data = self.client.db_read(db_num, offset, 2)
return struct.unpack('>h', data)[0]
except Exception as e:
logging.error(f"读取DB{db_num}.DBW{offset}失败: {str(e)}")
self.connected = False
return None
# MySQL操作类保持不变
class MySQLHandler:
# (代码与之前相同,省略)
def __init__(self, host, db, user, pwd, port=3306, table="plc_real_data"):
self.host = host
self.db = db
self.user = user
self.pwd = pwd
self.port = port
self.table = table
self.conn = None
self.cursor = None
def connect(self):
try:
self.conn = pymysql.connect(
host=self.host, port=self.port, user=self.user,
password=self.pwd, database=self.db, charset="utf8mb4"
)
self.cursor = self.conn.cursor()
self.create_table()
logging.info("MySQL连接成功")
return True
except OperationalError as e:
logging.error(f"MySQL连接失败: {str(e)}")
return False
def create_table(self):
create_sql = f"""
CREATE TABLE IF NOT EXISTS {self.table} (
id INT AUTO_INCREMENT PRIMARY KEY,
collect_time DATETIME NOT NULL UNIQUE,
temperature REAL COMMENT '温度(℃)',
pressure REAL COMMENT '压力(MPa)',
flow INT COMMENT '流量(L/min)',
status TINYINT COMMENT '设备状态(0=停止,1=运行)',
update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
"""
try:
self.cursor.execute(create_sql)
self.conn.commit()
except Exception as e:
logging.error(f"创建表失败: {str(e)}")
def insert_data(self, data):
if not self.conn or not self.conn.open:
logging.warning("MySQL重连中...")
if not self.connect():
return False
try:
insert_sql = f"""
INSERT INTO {self.table} (collect_time, temperature, pressure, flow, status)
VALUES (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
temperature=%s, pressure=%s, flow=%s, status=%s;
"""
params = (
data["time"], data["temp"], data["press"], data["flow"], data["status"],
data["temp"], data["press"], data["flow"], data["status"]
)
self.cursor.execute(insert_sql, params)
self.conn.commit()
return True
except Exception as e:
logging.error(f"数据插入失败: {str(e)}")
self.conn = None
return False
def disconnect(self):
if self.conn and self.conn.open:
self.cursor.close()
self.conn.close()
logging.info("MySQL已断开连接")
# 主函数
def main():
PLC_CFG = {"ip": "192.168.10.87", "rack": 0, "slot": 1}
MYSQL_CFG = {
"host": "localhost", "db": "test",
"user": "root", "pwd": "123456"
}
DATA_MAP = {
"temp": {"db": 1, "offset": 0},
"press": {"db": 1, "offset": 4},
"flow": {"db": 1, "offset": 8},
"status": {"db": 1, "byte": 10, "bit": 0}
}
plc = PLCConnection(**PLC_CFG)
mysql = MySQLHandler(**MYSQL_CFG)
if not mysql.connect():
logging.error("MySQL初始化失败,程序退出")
return
try:
logging.info("开始1秒采集PLC数据...")
while True:
start = time.time()
data = {
"time": time.strftime("%Y-%m-%d %H:%M:%S"),
"temp": plc.read_db_real(DATA_MAP["temp"]["db"], DATA_MAP["temp"]["offset"]),
"press": plc.read_db_real(DATA_MAP["press"]["db"], DATA_MAP["press"]["offset"]),
"flow": plc.read_db_int(DATA_MAP["flow"]["db"],
DATA_MAP["flow"]["offset"]) if plc.ensure_connected() else None,
"status": None
}
# 优化状态读取逻辑,避免频繁调用db_read
if plc.ensure_connected():
try:
status_data = plc.client.db_read(DATA_MAP["status"]["db"], DATA_MAP["status"]["byte"], 1)
data["status"] = get_bit(status_data, 0, DATA_MAP["status"]["bit"])
except Exception as e:
logging.error(f"读取状态失败: {str(e)}")
data["status"] = 0
plc.connected = False
# 数据校验与存储
if all(v is not None for v in [data["temp"], data["press"], data["flow"]]):
if mysql.insert_data(data):
logging.info(f"采集成功: {data}")
else:
logging.warning("采集数据写入失败")
else:
logging.warning("PLC数据读取无效")
# 控制采集周期
elapsed = time.time() - start
time.sleep(max(0, 1 - elapsed))
except KeyboardInterrupt:
logging.info("用户中断采集")
finally:
plc.disconnect()
mysql.disconnect()
logging.info("采集程序退出")
if __name__ == "__main__":
main()
Flask 后端(MySQL→API)
flask_backend.py提供两个核心接口:
- /api/realtime:获取最新 1 条实时数据(用于仪表盘显示)
- /api/history:获取指定时间段的历史数据(用于趋势图)
from flask import Flask, jsonify, send_from_directory
import pymysql
from pymysql import OperationalError
from datetime import datetime, timedelta
import os
app = Flask(__name__)
# 配置静态文件目录
app.static_folder = 'static'
app.static_url_path = 'templates/static'
# MySQL配置(请修改为你的数据库信息)
MYSQL_CONFIG = {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "123456", # 替换为你的密码
"database": "test",
"charset": "utf8mb4"
}
def get_mysql_conn():
"""获取MySQL连接"""
try:
conn = pymysql.connect(**MYSQL_CONFIG)
return conn
except OperationalError as e:
app.logger.error(f"MySQL连接失败: {str(e)}")
return None
@app.route("/api/realtime", methods=["GET"])
def get_realtime_data():
"""获取最新1条实时数据"""
conn = get_mysql_conn()
if not conn:
return jsonify({"code": 500, "msg": "数据库连接失败", "data": {}})
try:
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = "SELECT collect_time, temperature, pressure, flow, status FROM plc_real_data ORDER BY collect_time DESC LIMIT 1;"
cursor.execute(sql)
data = cursor.fetchone()
if data:
data["collect_time"] = data["collect_time"].strftime("%Y-%m-%d %H:%M:%S")
return jsonify({"code": 200, "msg": "success", "data": data})
else:
return jsonify({"code": 404, "msg": "无数据", "data": {}})
except Exception as e:
app.logger.error(f"查询实时数据失败: {str(e)}")
return jsonify({"code": 500, "msg": "查询失败", "data": {}})
finally:
if conn:
conn.close()
@app.route("/api/history/<int:minutes>", methods=["GET"])
def get_history_data(minutes):
"""获取最近N分钟的历史数据"""
minutes = minutes if minutes > 0 else 10
conn = get_mysql_conn()
if not conn:
return jsonify({"code": 500, "msg": "数据库连接失败", "data": {}})
try:
cursor = conn.cursor(pymysql.cursors.DictCursor)
start_time = datetime.now() - timedelta(minutes=minutes)
sql = """
SELECT collect_time, temperature, pressure, flow
FROM plc_real_data
WHERE collect_time >= %s
ORDER BY collect_time ASC;
"""
cursor.execute(sql, (start_time,))
data = cursor.fetchall()
for item in data:
item["collect_time"] = item["collect_time"].strftime("%H:%M:%S")
return jsonify({"code": 200, "msg": "success", "data": data, "minutes": minutes})
except Exception as e:
app.logger.error(f"查询历史数据失败: {str(e)}")
return jsonify({"code": 500, "msg": "查询失败", "data": {}})
finally:
if conn:
conn.close()
@app.route("/", methods=["GET"])
def index():
"""前端页面入口"""
return send_from_directory(app.static_folder, 'index.html')
@app.route("/<path:path>", methods=["GET"])
def static_files(path):
"""提供静态文件访问"""
return send_from_directory(app.static_folder, path)
if __name__ == "__main__":
# 确保static目录存在
if not os.path.exists(app.static_folder):
os.makedirs(app.static_folder)
app.logger.info(f"创建静态文件目录: {app.static_folder}")
# 启动服务(允许外部访问)
app.run(host="0.0.0.0", port=5000, debug=True)
前端页面(Flask Static+BootstrapV4+ECharts)
在 Flask 项目根目录创建static文件夹,新建index.html,实现数据可视化(仪表盘 + 趋势图)。
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>PLC数据监控系统</title>
<!-- 引入BootstrapV4 -->
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@4.6.2/dist/css/bootstrap.min.css">
<!-- 引入ECharts -->
<script src="https://cdn.jsdelivr.net/npm/echarts@5.4.3/dist/echarts.min.js"></script>
<style>
.card { margin-bottom: 20px; box-shadow: 0 2px 8px rgba(0,0,0,0.1); transition: all 0.3s; }
.card:hover { box-shadow: 0 5px 15px rgba(0,0,0,0.15); }
.chart-container { height: 400px; width: 100%; }
.realtime-value { font-size: 2rem; font-weight: bold; color: #007bff; }
.status-running { color: #28a745; } /* 运行状态绿色 */
.status-stopped { color: #dc3545; } /* 停止状态红色 */
.refresh-indicator { color: #6c757d; font-size: 0.9rem; }
</style>
</head>
<body class="container mt-4">
<h1 class="mb-4 text-center">PLC实时数据监控系统</h1>
<!-- 实时数据仪表盘 -->
<div class="row">
<!-- 温度卡片 -->
<div class="col-md-3">
<div class="card">
<div class="card-header bg-primary text-white">
<h5 class="card-title mb-0">温度 (℃)</h5>
</div>
<div class="card-body text-center">
<div id="temp-value" class="realtime-value">--</div>
<div id="temp-time" class="text-muted mt-2">暂无数据</div>
</div>
</div>
</div>
<!-- 压力卡片 -->
<div class="col-md-3">
<div class="card">
<div class="card-header bg-success text-white">
<h5 class="card-title mb-0">压力 (MPa)</h5>
</div>
<div class="card-body text-center">
<div id="press-value" class="realtime-value">--</div>
<div id="press-time" class="text-muted mt-2">暂无数据</div>
</div>
</div>
</div>
<!-- 流量卡片 -->
<div class="col-md-3">
<div class="card">
<div class="card-header bg-warning text-white">
<h5 class="card-title mb-0">流量 (L/min)</h5>
</div>
<div class="card-body text-center">
<div id="flow-value" class="realtime-value">--</div>
<div id="flow-time" class="text-muted mt-2">暂无数据</div>
</div>
</div>
</div>
<!-- 状态卡片 -->
<div class="col-md-3">
<div class="card">
<div class="card-header bg-danger text-white">
<h5 class="card-title mb-0">设备状态</h5>
</div>
<div class="card-body text-center">
<div id="status-value" class="realtime-value">--</div>
<div id="status-time" class="text-muted mt-2">暂无数据</div>
</div>
</div>
</div>
</div>
<!-- 历史趋势图 -->
<div class="card">
<div class="card-header bg-dark text-white">
<div class="row align-items-center">
<div class="col-md-8">
<h5 class="mb-0">数据趋势图</h5>
</div>
<div class="col-md-4 text-right">
<div class="btn-group" role="group">
<button type="button" class="btn btn-sm btn-light time-range" data-minutes="5">5分钟</button>
<button type="button" class="btn btn-sm btn-light time-range" data-minutes="10">10分钟</button>
<button type="button" class="btn btn-sm btn-light time-range" data-minutes="30">30分钟</button>
</div>
</div>
</div>
</div>
<div class="card-body">
<div id="trend-chart" class="chart-container"></div>
</div>
</div>
<!-- 页脚刷新状态 -->
<div class="text-center text-muted mb-4">
<span id="last-refresh" class="refresh-indicator">最后刷新:从未</span>
</div>
<script>
// 初始化图表
const chartDom = document.getElementById('trend-chart');
const myChart = echarts.init(chartDom);
let chartOption = {
tooltip: {
trigger: 'axis',
axisPointer: {
type: 'cross'
}
},
legend: {
data: ['温度(℃)', '压力(MPa)', '流量(L/min)']
},
grid: {
left: '3%',
right: '4%',
bottom: '3%',
containLabel: true
},
xAxis: {
type: 'category',
boundaryGap: false,
data: []
},
yAxis: [
{
type: 'value',
name: '温度(℃)',
position: 'left',
axisLine: {
lineStyle: {
color: '#007bff'
}
}
},
{
type: 'value',
name: '压力(MPa)',
position: 'right',
offset: 50,
axisLine: {
lineStyle: {
color: '#28a745'
}
}
},
{
type: 'value',
name: '流量(L/min)',
position: 'right',
axisLine: {
lineStyle: {
color: '#ffc107'
}
}
}
],
series: [
{
name: '温度(℃)',
type: 'line',
data: [],
symbol: 'circle',
symbolSize: 6,
lineStyle: { width: 2 },
itemStyle: { color: '#007bff' }
},
{
name: '压力(MPa)',
type: 'line',
data: [],
symbol: 'circle',
symbolSize: 6,
lineStyle: { width: 2 },
itemStyle: { color: '#28a745' },
yAxisIndex: 1
},
{
name: '流量(L/min)',
type: 'line',
data: [],
symbol: 'circle',
symbolSize: 6,
lineStyle: { width: 2 },
itemStyle: { color: '#ffc107' },
yAxisIndex: 2
}
]
};
// 当前时间范围(分钟)
let currentMinutes = 10;
// 从API获取实时数据
function fetchRealtimeData() {
fetch('/api/realtime')
.then(response => response.json())
.then(data => {
if (data.code === 200 && data.data) {
const realData = data.data;
// 更新温度
document.getElementById('temp-value').textContent = realData.temperature;
document.getElementById('temp-time').textContent = `更新于: ${realData.collect_time}`;
// 更新压力
document.getElementById('press-value').textContent = realData.pressure;
document.getElementById('press-time').textContent = `更新于: ${realData.collect_time}`;
// 更新流量
document.getElementById('flow-value').textContent = realData.flow;
document.getElementById('flow-time').textContent = `更新于: ${realData.collect_time}`;
// 更新状态
const statusEl = document.getElementById('status-value');
statusEl.textContent = realData.status === 1 ? '运行中' : '已停止';
statusEl.className = `realtime-value ${realData.status === 1 ? 'status-running' : 'status-stopped'}`;
document.getElementById('status-time').textContent = `更新于: ${realData.collect_time}`;
// 更新最后刷新时间
document.getElementById('last-refresh').textContent = `最后刷新:${new Date().toLocaleString()}`;
}
})
.catch(error => {
console.error('获取实时数据失败:', error);
document.getElementById('last-refresh').textContent = `最后刷新:失败(${new Date().toLocaleTimeString()})`;
});
}
// 从API获取历史数据并更新图表
function fetchHistoryData(minutes) {
fetch(`/api/history/${minutes}`)
.then(response => response.json())
.then(data => {
if (data.code === 200 && data.data.length > 0) {
// 提取时间和各项数据
const times = data.data.map(item => item.collect_time);
const temps = data.data.map(item => item.temperature);
const presses = data.data.map(item => item.pressure);
const flows = data.data.map(item => item.flow);
// 更新图表数据
chartOption.xAxis.data = times;
chartOption.series[0].data = temps;
chartOption.series[1].data = presses;
chartOption.series[2].data = flows;
myChart.setOption(chartOption);
}
})
.catch(error => {
console.error('获取历史数据失败:', error);
});
}
// 时间范围按钮事件
document.querySelectorAll('.time-range').forEach(button => {
button.addEventListener('click', function() {
// 更新按钮样式
document.querySelectorAll('.time-range').forEach(btn => {
btn.classList.remove('active', 'btn-primary');
btn.classList.add('btn-light');
});
this.classList.remove('btn-light');
this.classList.add('active', 'btn-primary');
// 获取并更新数据
currentMinutes = parseInt(this.getAttribute('data-minutes'));
fetchHistoryData(currentMinutes);
});
});
// 默认激活10分钟按钮
document.querySelector('.time-range[data-minutes="10"]').click();
// 定时刷新数据(1秒一次)
setInterval(() => {
fetchRealtimeData();
fetchHistoryData(currentMinutes);
}, 1000);
// 页面加载时立即获取数据
window.onload = function() {
fetchRealtimeData();
fetchHistoryData(currentMinutes);
// 响应窗口大小变化
window.addEventListener('resize', () => {
myChart.resize();
});
};
</script>
</body>
</html>
猜你喜欢
- 2025-09-21 提升档次的CSS伪元素技巧!原来::before和::after还能这么玩
- 2025-09-21 深入理解CSS变量(Custom Properties)
- 2025-09-21 使用deepSeek生成一个贪吃蛇游戏_canvas 贪吃蛇
- 2025-09-21 CSS 电梯:纯 CSS 实现的状态机与楼层导航
- 2025-09-21 操作系统应用开发(四)压缩软件开发demo——东方仙盟筑基期
- 2025-09-21 前端学习核心_黑马程序员vue全套视频教程从vue2 0到vue3 0一套全覆盖前端学习核心框架教程
- 2025-09-21 操作系统应用开发(七)mac苹果模拟器——东方仙盟练气期
- 2025-09-21 uniapp里小程序自定义tabbar,实现中间item凸起效果
- 2025-09-21 档案系统(一)学员管理系统开发——东方仙盟筑基期
- 2024-08-05 用多了各种组件库的你还会用原生JS写轮播图吗?
- 最近发表
- 标签列表
-
- cmd/c (90)
- c++中::是什么意思 (84)
- 标签用于 (71)
- 主键只能有一个吗 (77)
- c#console.writeline不显示 (95)
- pythoncase语句 (88)
- es6includes (74)
- sqlset (76)
- apt-getinstall-y (100)
- node_modules怎么生成 (87)
- chromepost (71)
- flexdirection (73)
- c++int转char (80)
- mysqlany_value (79)
- static函数和普通函数 (84)
- el-date-picker开始日期早于结束日期 (76)
- js判断是否是json字符串 (75)
- c语言min函数头文件 (77)
- asynccallback (87)
- localstorage.removeitem (74)
- vector线程安全吗 (70)
- java (73)
- js数组插入 (83)
- mac安装java (72)
- 无效的列索引 (74)