优秀的编程知识分享平台

网站首页 > 技术文章 正文

PLC 数据采集 + MySQL 存储 + Flask+ECharts 可视化系统

nanyue 2025-09-21 20:05:48 技术文章 1 ℃

该系统实现 “PLC 数据 1 秒采集→MySQL 存储→Flask 后端提供接口→BootstrapV4+ECharts 前端可视化” 的完整流程,以下是分模块实现代码(验证可行):

整体架构说明

  • 数据采集端:Python 程序,每 1 秒读取一次 PLC 数据并写入 MySQL(基于 snap7+pymysql)
  • 后端服务:Flask 框架,提供 API 接口供前端获取 MySQL 数据
  • 前端页面:BootstrapV4 布局,ECharts 实现实时数据图表展示
  • 数据流转:PLC → MySQL → Flask API → ECharts

数据采集端(PLC→MySQL)

负责每 1 秒采集一次 PLC 数据并写入 MySQL,包含断线重连机制。文件名:plc_data_collector.py

import snap7
import struct
import time
import logging
import pymysql
from pymysql import OperationalError
from snap7.exceptions import Snap7Exception

# 日志配置
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)


# 自定义位操作
def get_bit(data, byte_index, bit_index):
    """Return bit ``bit_index`` of ``data[byte_index]`` as 0 or 1.

    ``False`` is returned when ``byte_index`` lies outside ``data``.
    """
    if byte_index < 0 or byte_index >= len(data):
        return False
    shifted = data[byte_index] >> bit_index
    return shifted & 1


# PLC连接类(优化版)
class PLCConnection:
    """Wrap a snap7 client with reconnect, heartbeat and typed DB reads.

    ``connected`` records whether the last operation succeeded. Any failed
    read or heartbeat clears it, so the next ``ensure_connected()`` call
    triggers a reconnect.
    """

    def __init__(self, ip, rack=0, slot=1, reconnect_interval=5):
        """Store the connection parameters; no connection is opened here.

        Args:
            ip: PLC address passed to ``client.connect``.
            rack: Rack number passed to ``client.connect``.
            slot: Slot number passed to ``client.connect``.
            reconnect_interval: Seconds to wait between failed reconnect rounds.
        """
        self.ip = ip
        self.rack = rack
        self.slot = slot
        self.client = snap7.client.Client()
        self.connected = False
        self.reconnect_interval = reconnect_interval
        self.heartbeat_interval = 10  # seconds between heartbeat probes
        self.last_heartbeat = 0
        self.lock = False  # plain busy flag guarding connect(); not a thread lock

    def _drop_stale_connection(self):
        """Release whatever session the client may still hold before reconnecting."""
        if self.connected:
            self.disconnect()
            return
        # Failed reads/heartbeats clear ``connected`` without closing the client,
        # so the previous session may still be open; release it best-effort.
        try:
            self.client.disconnect()
        except Exception as e:
            logging.debug(f"PLC stale-session cleanup ignored: {e}")

    def connect(self):
        """Open a fresh connection, retrying up to three times.

        Returns:
            True when connected; False when the busy flag is set or every
            attempt failed.
        """
        if self.lock:
            logging.warning("PLC操作已加锁,等待释放...")
            return False

        self.lock = True
        try:
            self._drop_stale_connection()

            max_attempts = 3
            for attempt in range(max_attempts):
                try:
                    self.client.connect(self.ip, self.rack, self.slot)
                except Exception as e:
                    # Catch Exception, as the read helpers do, so that any
                    # client error is retried instead of escaping the loop.
                    logging.warning(f"PLC连接尝试 {attempt + 1}/{max_attempts} 失败: {str(e)}")
                    time.sleep(1)
                else:
                    self.connected = True
                    self.last_heartbeat = time.time()
                    logging.info(f"PLC连接成功: {self.ip}")
                    return True

            self.connected = False
            return False
        finally:
            self.lock = False  # always release the busy flag

    def disconnect(self):
        """Disconnect if currently connected; errors are logged, never raised."""
        if self.connected:
            try:
                self.client.disconnect()
                logging.info("PLC已断开连接")
            except Exception as e:
                logging.warning(f"PLC断开时发生警告: {str(e)}")
            finally:
                self.connected = False

    def _heartbeat_check(self):
        """Probe the PLC when the heartbeat interval has elapsed.

        Returns:
            True if no probe was due or the probe succeeded; False (and
            ``connected`` cleared) if the probe failed.
        """
        if time.time() - self.last_heartbeat > self.heartbeat_interval:
            try:
                # Reading one byte via mb_read serves as the liveness probe.
                self.client.mb_read(0, 1)
                self.last_heartbeat = time.time()
                return True
            except Exception:
                # Not a bare ``except``: Ctrl+C / SystemExit must propagate.
                self.connected = False
                logging.warning("PLC心跳检测失败,连接已断开")
                return False
        return True

    def ensure_connected(self):
        """Block until the PLC is connected and return the connection flag.

        Reconnects (sleeping ``reconnect_interval`` seconds between rounds)
        when the flag is cleared or the heartbeat probe fails.
        """
        if self.lock:
            time.sleep(0.1)  # give a concurrent connect() a moment to finish

        if not self.connected or not self._heartbeat_check():
            logging.info("PLC重连中...")
            # Short initial delay so reconnect attempts are not back-to-back.
            time.sleep(1)
            while not self.connect():
                time.sleep(self.reconnect_interval)
        return self.connected

    def _read_value(self, db_num, offset, fmt, label):
        """Read and unpack one big-endian value from a data block.

        Args:
            db_num: Data block number.
            offset: Byte offset inside the block.
            fmt: ``struct`` format; its size decides how many bytes are read.
            label: Address label used in the error log (``DBD`` / ``DBW``).

        Returns:
            The unpacked value, or None on failure (``connected`` is cleared).
        """
        if not self.ensure_connected():
            return None
        try:
            data = self.client.db_read(db_num, offset, struct.calcsize(fmt))
            return struct.unpack(fmt, data)[0]
        except Exception as e:
            logging.error(f"读取DB{db_num}.{label}{offset}失败: {str(e)}")
            self.connected = False
            return None

    def read_db_real(self, db_num, offset):
        """Read a 4-byte big-endian float, rounded to 2 decimals; None on failure."""
        value = self._read_value(db_num, offset, '>f', "DBD")
        return None if value is None else round(value, 2)

    def read_db_int(self, db_num, offset):
        """Read a 2-byte big-endian signed integer; None on failure."""
        return self._read_value(db_num, offset, '>h', "DBW")


# MySQL操作类保持不变
class MySQLHandler:
    """Own one pymysql connection/cursor pair and upsert PLC samples into a table."""

    def __init__(self, host, db, user, pwd, port=3306, table="plc_real_data"):
        """Store connection settings; the connection is opened by ``connect()``."""
        self.host = host
        self.db = db
        self.user = user
        self.pwd = pwd
        self.port = port
        self.table = table
        self.conn = None
        self.cursor = None

    def connect(self):
        """Connect, create the cursor and make sure the table exists.

        Returns:
            True on success, False when pymysql raises ``OperationalError``.
        """
        try:
            self.conn = pymysql.connect(
                host=self.host, port=self.port, user=self.user,
                password=self.pwd, database=self.db, charset="utf8mb4"
            )
            self.cursor = self.conn.cursor()
            self.create_table()
            logging.info("MySQL连接成功")
            return True
        except OperationalError as e:
            logging.error(f"MySQL连接失败: {str(e)}")
            return False

    def create_table(self):
        """Create the sample table if it is missing; failures are only logged."""
        create_sql = f"""
        CREATE TABLE IF NOT EXISTS {self.table} (
            id INT AUTO_INCREMENT PRIMARY KEY,
            collect_time DATETIME NOT NULL UNIQUE,
            temperature REAL COMMENT '温度(℃)',
            pressure REAL COMMENT '压力(MPa)',
            flow INT COMMENT '流量(L/min)',
            status TINYINT COMMENT '设备状态(0=停止,1=运行)',
            update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
        """
        try:
            self.cursor.execute(create_sql)
            self.conn.commit()
        except Exception as e:
            logging.error(f"创建表失败: {str(e)}")

    def _discard_connection(self):
        """Close and forget the current cursor/connection, ignoring close errors."""
        for resource in (self.cursor, self.conn):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as e:
                logging.debug(f"MySQL resource cleanup ignored: {e}")
        self.cursor = None
        self.conn = None

    def insert_data(self, data):
        """Upsert one sample keyed by its collection time.

        Args:
            data: Mapping with the keys ``time``, ``temp``, ``press``,
                ``flow`` and ``status``.

        Returns:
            True when the row was committed; False when reconnecting or the
            write failed. After a failed write the connection is closed and
            dropped so the next call reconnects.
        """
        if not self.conn or not self.conn.open:
            logging.warning("MySQL重连中...")
            if not self.connect():
                return False
        try:
            insert_sql = f"""
            INSERT INTO {self.table} (collect_time, temperature, pressure, flow, status)
            VALUES (%s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE 
                temperature=%s, pressure=%s, flow=%s, status=%s;
            """
            # The measurement values appear twice: once for INSERT, once for UPDATE.
            params = (
                data["time"], data["temp"], data["press"], data["flow"], data["status"],
                data["temp"], data["press"], data["flow"], data["status"]
            )
            self.cursor.execute(insert_sql, params)
            self.conn.commit()
            return True
        except Exception as e:
            logging.error(f"数据插入失败: {str(e)}")
            # Close the broken connection instead of merely dropping the
            # reference, otherwise the socket and cursor leak on every failure.
            self._discard_connection()
            return False

    def disconnect(self):
        """Close the cursor and connection if the connection is still open."""
        if self.conn and self.conn.open:
            self.cursor.close()
            self.conn.close()
            logging.info("MySQL已断开连接")


# 主函数
def main():
    """Sample the PLC about once per second and store complete samples in MySQL.

    Exits immediately when the initial MySQL connection fails. Otherwise it
    loops until interrupted with Ctrl+C, then closes both connections.
    """
    # NOTE(review): address and credentials are hard-coded sample values.
    PLC_CFG = {"ip": "192.168.10.87", "rack": 0, "slot": 1}
    MYSQL_CFG = {
        "host": "localhost", "db": "test",
        "user": "root", "pwd": "123456"
    }
    # Where each signal lives inside the PLC data block.
    DATA_MAP = {
        "temp": {"db": 1, "offset": 0},
        "press": {"db": 1, "offset": 4},
        "flow": {"db": 1, "offset": 8},
        "status": {"db": 1, "byte": 10, "bit": 0}
    }

    plc = PLCConnection(**PLC_CFG)
    mysql = MySQLHandler(**MYSQL_CFG)
    if not mysql.connect():
        logging.error("MySQL初始化失败,程序退出")
        return

    try:
        logging.info("开始1秒采集PLC数据...")
        while True:
            # Monotonic clock: a wall-clock step backwards would otherwise make
            # ``1 - elapsed`` huge and stall sampling for that long.
            start = time.monotonic()
            data = {
                "time": time.strftime("%Y-%m-%d %H:%M:%S"),
                "temp": plc.read_db_real(DATA_MAP["temp"]["db"], DATA_MAP["temp"]["offset"]),
                "press": plc.read_db_real(DATA_MAP["press"]["db"], DATA_MAP["press"]["offset"]),
                # read_db_int() already calls ensure_connected() itself.
                "flow": plc.read_db_int(DATA_MAP["flow"]["db"], DATA_MAP["flow"]["offset"]),
                "status": None
            }

            # The status bit is read directly through the client: one byte,
            # then the configured bit is extracted from it.
            if plc.ensure_connected():
                try:
                    status_data = plc.client.db_read(DATA_MAP["status"]["db"], DATA_MAP["status"]["byte"], 1)
                    data["status"] = get_bit(status_data, 0, DATA_MAP["status"]["bit"])
                except Exception as e:
                    logging.error(f"读取状态失败: {str(e)}")
                    # NOTE(review): a failed read is stored as 0, the same
                    # value as a genuine "stopped" state.
                    data["status"] = 0
                    plc.connected = False

            # Store the sample only when all three measurements were read.
            if all(v is not None for v in [data["temp"], data["press"], data["flow"]]):
                if mysql.insert_data(data):
                    logging.info(f"采集成功: {data}")
                else:
                    logging.warning("采集数据写入失败")
            else:
                logging.warning("PLC数据读取无效")

            # Sleep for whatever is left of the one-second cycle.
            elapsed = time.monotonic() - start
            time.sleep(max(0, 1 - elapsed))

    except KeyboardInterrupt:
        logging.info("用户中断采集")
    finally:
        plc.disconnect()
        mysql.disconnect()
        logging.info("采集程序退出")


if __name__ == "__main__":
    main()

Flask 后端(MySQL→API)

flask_backend.py提供两个核心接口:

  • /api/realtime:获取最新 1 条实时数据(用于仪表盘显示)
  • /api/history/<minutes>:获取最近 N 分钟的历史数据(用于趋势图),例如 /api/history/10
from flask import Flask, jsonify, send_from_directory
import pymysql
from pymysql import OperationalError
from datetime import datetime, timedelta
import os

app = Flask(__name__)

# Static file configuration: files are served from the "static" folder.
# NOTE(review): static_url_path is assigned after the app object is created and
# has no leading slash — confirm it has the intended effect on the static route.
app.static_folder = 'static'
app.static_url_path = 'templates/static'

# MySQL connection settings, passed as keyword arguments to pymysql.connect().
# Replace these sample values with your own database details.
MYSQL_CONFIG = {
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "123456",  # replace with your own password
    "database": "test",
    "charset": "utf8mb4"
}


def get_mysql_conn():
    """Open a new connection from ``MYSQL_CONFIG``; return ``None`` if it fails."""
    try:
        return pymysql.connect(**MYSQL_CONFIG)
    except OperationalError as exc:
        app.logger.error(f"MySQL连接失败: {str(exc)}")
    return None


@app.route("/api/realtime", methods=["GET"])
def get_realtime_data():
    """Return the most recent sample as JSON.

    The JSON ``code`` field is 200 with the row, 404 when the table is empty,
    or 500 when the connection or query fails.
    """
    conn = get_mysql_conn()
    if not conn:
        return jsonify({"code": 500, "msg": "数据库连接失败", "data": {}})

    query = "SELECT collect_time, temperature, pressure, flow, status FROM plc_real_data ORDER BY collect_time DESC LIMIT 1;"
    try:
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        cursor.execute(query)
        row = cursor.fetchone()
        if not row:
            return jsonify({"code": 404, "msg": "无数据", "data": {}})
        # The datetime value is sent to the client as a formatted string.
        row["collect_time"] = row["collect_time"].strftime("%Y-%m-%d %H:%M:%S")
        return jsonify({"code": 200, "msg": "success", "data": row})
    except Exception as exc:
        app.logger.error(f"查询实时数据失败: {str(exc)}")
        return jsonify({"code": 500, "msg": "查询失败", "data": {}})
    finally:
        # conn is known to be truthy here: the falsy case returned above.
        conn.close()


@app.route("/api/history/<int:minutes>", methods=["GET"])
def get_history_data(minutes):
    """Return the samples of the last ``minutes`` minutes, oldest first.

    A non-positive ``minutes`` falls back to 10. The JSON ``code`` field is
    200 on success and 500 when the connection or query fails.
    """
    if minutes <= 0:
        minutes = 10

    conn = get_mysql_conn()
    if not conn:
        return jsonify({"code": 500, "msg": "数据库连接失败", "data": {}})

    sql = """
        SELECT collect_time, temperature, pressure, flow 
        FROM plc_real_data 
        WHERE collect_time >= %s 
        ORDER BY collect_time ASC;
        """
    try:
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        window_start = datetime.now() - timedelta(minutes=minutes)
        cursor.execute(sql, (window_start,))
        rows = cursor.fetchall()
        # Only the time of day is sent for the chart's x axis.
        for row in rows:
            row["collect_time"] = row["collect_time"].strftime("%H:%M:%S")
        return jsonify({"code": 200, "msg": "success", "data": rows, "minutes": minutes})
    except Exception as exc:
        app.logger.error(f"查询历史数据失败: {str(exc)}")
        return jsonify({"code": 500, "msg": "查询失败", "data": {}})
    finally:
        # conn is known to be truthy here: the falsy case returned above.
        conn.close()


@app.route("/", methods=["GET"])
def index():
    """Serve ``index.html`` from the static folder as the front-end entry page."""
    entry_page = 'index.html'
    return send_from_directory(app.static_folder, entry_page)


@app.route("/<path:path>", methods=["GET"])
def static_files(path):
    """Serve the requested file from the static folder."""
    static_root = app.static_folder
    return send_from_directory(static_root, path)


if __name__ == "__main__":
    # Make sure the static directory exists before serving from it.
    if not os.path.exists(app.static_folder):
        os.makedirs(app.static_folder)
        app.logger.info(f"创建静态文件目录: {app.static_folder}")

    # The server listens on all interfaces, so the interactive debugger (which
    # lets a browser execute arbitrary code) must stay off by default. Enable
    # it for local development only, with FLASK_DEBUG=1.
    debug_enabled = os.environ.get("FLASK_DEBUG", "0") == "1"
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)

前端页面(Flask Static+BootstrapV4+ECharts)

在 Flask 项目根目录创建static文件夹,新建index.html,实现数据可视化(仪表盘 + 趋势图)。

<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>PLC数据监控系统</title>
    <!-- 引入BootstrapV4 -->
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@4.6.2/dist/css/bootstrap.min.css">
    <!-- 引入ECharts -->
    <script src="https://cdn.jsdelivr.net/npm/echarts@5.4.3/dist/echarts.min.js"></script>
    <style>
        .card { margin-bottom: 20px; box-shadow: 0 2px 8px rgba(0,0,0,0.1); transition: all 0.3s; }
        .card:hover { box-shadow: 0 5px 15px rgba(0,0,0,0.15); }
        .chart-container { height: 400px; width: 100%; }
        .realtime-value { font-size: 2rem; font-weight: bold; color: #007bff; }
        .status-running { color: #28a745; } /* 运行状态绿色 */
        .status-stopped { color: #dc3545; } /* 停止状态红色 */
        .refresh-indicator { color: #6c757d; font-size: 0.9rem; }
    </style>
</head>
<body class="container mt-4">
    <h1 class="mb-4 text-center">PLC实时数据监控系统</h1>

    <!-- 实时数据仪表盘 -->
    <div class="row">
        <!-- 温度卡片 -->
        <div class="col-md-3">
            <div class="card">
                <div class="card-header bg-primary text-white">
                    <h5 class="card-title mb-0">温度 (℃)</h5>
                </div>
                <div class="card-body text-center">
                    <div id="temp-value" class="realtime-value">--</div>
                    <div id="temp-time" class="text-muted mt-2">暂无数据</div>
                </div>
            </div>
        </div>
        <!-- 压力卡片 -->
        <div class="col-md-3">
            <div class="card">
                <div class="card-header bg-success text-white">
                    <h5 class="card-title mb-0">压力 (MPa)</h5>
                </div>
                <div class="card-body text-center">
                    <div id="press-value" class="realtime-value">--</div>
                    <div id="press-time" class="text-muted mt-2">暂无数据</div>
                </div>
            </div>
        </div>
        <!-- 流量卡片 -->
        <div class="col-md-3">
            <div class="card">
                <div class="card-header bg-warning text-white">
                    <h5 class="card-title mb-0">流量 (L/min)</h5>
                </div>
                <div class="card-body text-center">
                    <div id="flow-value" class="realtime-value">--</div>
                    <div id="flow-time" class="text-muted mt-2">暂无数据</div>
                </div>
            </div>
        </div>
        <!-- 状态卡片 -->
        <div class="col-md-3">
            <div class="card">
                <div class="card-header bg-danger text-white">
                    <h5 class="card-title mb-0">设备状态</h5>
                </div>
                <div class="card-body text-center">
                    <div id="status-value" class="realtime-value">--</div>
                    <div id="status-time" class="text-muted mt-2">暂无数据</div>
                </div>
            </div>
        </div>
    </div>

    <!-- 历史趋势图 -->
    <div class="card">
        <div class="card-header bg-dark text-white">
            <div class="row align-items-center">
                <div class="col-md-8">
                    <h5 class="mb-0">数据趋势图</h5>
                </div>
                <div class="col-md-4 text-right">
                    <div class="btn-group" role="group">
                        <button type="button" class="btn btn-sm btn-light time-range" data-minutes="5">5分钟</button>
                        <button type="button" class="btn btn-sm btn-light time-range" data-minutes="10">10分钟</button>
                        <button type="button" class="btn btn-sm btn-light time-range" data-minutes="30">30分钟</button>
                    </div>
                </div>
            </div>
        </div>
        <div class="card-body">
            <div id="trend-chart" class="chart-container"></div>
        </div>
    </div>

    <!-- 页脚刷新状态 -->
    <div class="text-center text-muted mb-4">
        <span id="last-refresh" class="refresh-indicator">最后刷新:从未</span>
    </div>

    <script>
        // Initialise the ECharts instance on the #trend-chart container.
        const chartDom = document.getElementById('trend-chart');
        const myChart = echarts.init(chartDom);
        // Shared option object: fetchHistoryData() fills xAxis.data and the three
        // series' data arrays, then passes the whole object to setOption().
        let chartOption = {
            tooltip: {
                trigger: 'axis',
                axisPointer: {
                    type: 'cross'
                }
            },
            legend: {
                data: ['温度(℃)', '压力(MPa)', '流量(L/min)']
            },
            grid: {
                left: '3%',
                right: '4%',
                bottom: '3%',
                containLabel: true
            },
            // Category axis holding the sample time labels.
            xAxis: {
                type: 'category',
                boundaryGap: false,
                data: []
            },
            // One value axis per signal; the series below bind to them by index.
            yAxis: [
                {
                    type: 'value',
                    name: '温度(℃)',
                    position: 'left',
                    axisLine: {
                        lineStyle: {
                            color: '#007bff'
                        }
                    }
                },
                {
                    type: 'value',
                    name: '压力(MPa)',
                    position: 'right',
                    // Presumably keeps the two right-hand axes from overlapping.
                    offset: 50,
                    axisLine: {
                        lineStyle: {
                            color: '#28a745'
                        }
                    }
                },
                {
                    type: 'value',
                    name: '流量(L/min)',
                    position: 'right',
                    axisLine: {
                        lineStyle: {
                            color: '#ffc107'
                        }
                    }
                }
            ],
            // Series order matters: index 0 = temperature, 1 = pressure, 2 = flow.
            series: [
                {
                    name: '温度(℃)',
                    type: 'line',
                    data: [],
                    symbol: 'circle',
                    symbolSize: 6,
                    lineStyle: { width: 2 },
                    itemStyle: { color: '#007bff' }
                },
                {
                    name: '压力(MPa)',
                    type: 'line',
                    data: [],
                    symbol: 'circle',
                    symbolSize: 6,
                    lineStyle: { width: 2 },
                    itemStyle: { color: '#28a745' },
                    yAxisIndex: 1
                },
                {
                    name: '流量(L/min)',
                    type: 'line',
                    data: [],
                    symbol: 'circle',
                    symbolSize: 6,
                    lineStyle: { width: 2 },
                    itemStyle: { color: '#ffc107' },
                    yAxisIndex: 2
                }
            ]
        };

        // Currently selected time range, in minutes.
        let currentMinutes = 10;

        // 从API获取实时数据
        function fetchRealtimeData() {
            // Small helper: write text into the element with the given id.
            const setText = (id, text) => {
                document.getElementById(id).textContent = text;
            };

            fetch('/api/realtime')
                .then(response => response.json())
                .then(payload => {
                    // Anything other than a successful payload leaves the cards untouched.
                    if (payload.code !== 200 || !payload.data) {
                        return;
                    }
                    const realData = payload.data;
                    const stamp = `更新于: ${realData.collect_time}`;

                    // Temperature, pressure and flow cards share the same layout.
                    [
                        ['temp', realData.temperature],
                        ['press', realData.pressure],
                        ['flow', realData.flow]
                    ].forEach(([prefix, value]) => {
                        setText(`${prefix}-value`, value);
                        setText(`${prefix}-time`, stamp);
                    });

                    // Status card: only the value 1 counts as running.
                    const running = realData.status === 1;
                    const statusEl = document.getElementById('status-value');
                    statusEl.textContent = running ? '运行中' : '已停止';
                    statusEl.className = `realtime-value ${running ? 'status-running' : 'status-stopped'}`;
                    setText('status-time', stamp);

                    // Footer: time of the last successful refresh.
                    setText('last-refresh', `最后刷新:${new Date().toLocaleString()}`);
                })
                .catch(error => {
                    console.error('获取实时数据失败:', error);
                    setText('last-refresh', `最后刷新:失败(${new Date().toLocaleTimeString()})`);
                });
        }

        // 从API获取历史数据并更新图表
        /**
         * Fetch the last `minutes` minutes of samples and redraw the trend chart.
         *
         * Polling and button clicks can overlap, so each request gets a sequence
         * number; a response older than the one already drawn is ignored.
         * An empty successful result clears the chart, so curves from a
         * previously selected range do not stay on screen.
         */
        function fetchHistoryData(minutes) {
            // Sequence counters live on the function object, so no extra globals are needed.
            fetchHistoryData.lastIssued = (fetchHistoryData.lastIssued || 0) + 1;
            const requestId = fetchHistoryData.lastIssued;

            fetch(`/api/history/${minutes}`)
                .then(response => response.json())
                .then(data => {
                    // Error payloads keep the current chart, as before.
                    if (data.code !== 200 || !Array.isArray(data.data)) {
                        return;
                    }
                    // A newer response has already been drawn: drop this stale one.
                    if (requestId < (fetchHistoryData.lastRendered || 0)) {
                        return;
                    }
                    fetchHistoryData.lastRendered = requestId;

                    // Split the rows into one array per axis/series (empty rows => empty chart).
                    const rows = data.data;
                    chartOption.xAxis.data = rows.map(item => item.collect_time);
                    chartOption.series[0].data = rows.map(item => item.temperature);
                    chartOption.series[1].data = rows.map(item => item.pressure);
                    chartOption.series[2].data = rows.map(item => item.flow);
                    myChart.setOption(chartOption);
                })
                .catch(error => {
                    console.error('获取历史数据失败:', error);
                });
        }

        // Refresh both the dashboard cards and the trend chart.
        const refreshAll = () => {
            fetchRealtimeData();
            fetchHistoryData(currentMinutes);
        };

        // Time-range buttons: highlight the clicked one and reload the chart.
        document.querySelectorAll('.time-range').forEach(button => {
            button.addEventListener('click', () => {
                // Reset every button to the inactive look first.
                document.querySelectorAll('.time-range').forEach(other => {
                    other.classList.remove('active', 'btn-primary');
                    other.classList.add('btn-light');
                });
                button.classList.remove('btn-light');
                button.classList.add('active', 'btn-primary');

                // Remember the chosen range and fetch its data.
                currentMinutes = parseInt(button.getAttribute('data-minutes'));
                fetchHistoryData(currentMinutes);
            });
        });

        // The 10-minute button is active by default.
        document.querySelector('.time-range[data-minutes="10"]').click();

        // Poll once per second.
        setInterval(() => {
            refreshAll();
        }, 1000);

        // Fetch immediately once the page has loaded.
        window.onload = () => {
            refreshAll();

            // Keep the chart sized to its container when the window changes.
            window.addEventListener('resize', () => {
                myChart.resize();
            });
        };
    </script>
</body>
</html>

Tags:

最近发表
标签列表