import json
import random
import socket
import threading
import time
from datetime import datetime, timezone  # timezone is needed for UTC-aware timestamps

import paho.mqtt.client as mqtt
# -------------------------- 基础配置 --------------------------
MQTT_BROKER = "broker.emqx.io"  # public test broker
# MQTT_BROKER = "localhost"  # local broker (requires a Mosquitto install)
MQTT_PORT = 1883  # standard unencrypted MQTT TCP port; read by the connection code below
MQTT_BASE_TOPIC = "hydropower/"  # prefix shared by every device topic
MQTT_KEEPALIVE = 60  # keep-alive interval (seconds)
DEVICE_PUBLISH_INTERVAL = 1  # delay between consecutive device messages (seconds)
CYCLE_WAIT_TIME = 3  # pause after one full round of device publishes (seconds)
# -------------------------- 电站与设备核心配置 --------------------------
# Plant registry: plant id -> plant settings.  Each plant carries a
# ``plant_base_id`` and a ``core_equipment`` mapping of device type
# (turbine / generator / governor / inlet_valve) -> device descriptor with the
# keys ``equipment_type``, ``equipment_id``, ``metric_config_key`` and
# ``mqtt_topic``.
PLANTS_INFO = {
    "plant1": {  # East China region, Qinghe hydropower station (Francis type)
        "plant_base_id": "华东区.清河水电站.一号发电厂房",  # plant base ID (kept in Chinese)
        "core_equipment": {
            "turbine": {
                "equipment_type": "混流式水轮机",
                "equipment_id": "TBN_HL_1",
                "metric_config_key": "francis_turbine",
                "mqtt_topic": f"{MQTT_BASE_TOPIC}turbine",  # dedicated turbine topic
            },
            "generator": {
                "equipment_type": "同步发电机(混流式配套)",
                "equipment_id": "GEN_HL_1",
                "metric_config_key": "generator_standard",
                "mqtt_topic": f"{MQTT_BASE_TOPIC}generator",  # dedicated generator topic
            },
            "governor": {
                "equipment_type": "机械液压调速器",
                "equipment_id": "GOV_HL_1",
                "metric_config_key": "governor_francis",
                "mqtt_topic": f"{MQTT_BASE_TOPIC}governor",  # dedicated governor topic
            },
            "inlet_valve": {
                "equipment_type": "球形进水阀",
                "equipment_id": "VAL_HL_1",
                "metric_config_key": "inlet_valve_spherical",
                "mqtt_topic": f"{MQTT_BASE_TOPIC}inlet_valve",  # dedicated inlet-valve topic
            },
        },
    },
}
# -------------------------- 全设备指标配置 --------------------------
# Metric catalogue, keyed by each device's ``metric_config_key``.
# Every entry has three sections:
#   status_metrics  - names of discrete status fields
#   numeric_metrics - metric name -> (min, max, unit) range used for sampling
#   special_metrics - names of cumulative counters
ALL_EQUIPMENT_METRICS = {
    "francis_turbine": {  # Francis turbine
        "status_metrics": ["operation_status", "fault_code"],
        "numeric_metrics": {
            "rotational_speed": (98.5, 101.5, "rpm"),
            "bearing_vertical_vibration": (0.02, 0.15, "mm/s"),
            "bearing_horizontal_vibration": (0.02, 0.12, "mm/s"),
            "spindle_swing": (0.05, 0.3, "mm"),
            "thrust_bearing_temperature": (40, 55, "℃"),
            "guide_bearing_temperature": (35, 50, "℃"),
            "lubricating_oil_temperature": (30, 45, "℃"),
            "inlet_water_pressure": (8.5, 10.5, "MPa"),
            "draft_tube_vacuum": (-0.05, -0.02, "MPa"),
            "guide_vane_opening": (30, 90, "%"),
            "throughflow": (400, 600, "m³/s"),
            "instantaneous_power": (3000, 5000, "kW"),
        },
        "special_metrics": ["cumulative_power_generation"],
    },
    "generator_standard": {  # generator, standard type
        "status_metrics": ["grid_connection_status", "excitation_system_status", "fault_code"],
        "numeric_metrics": {
            "stator_winding_temperature": (40, 80, "℃"),
            "rotor_winding_temperature": (35, 75, "℃"),
            "stator_core_temperature": (30, 70, "℃"),
            "stator_voltage": (10.0, 11.0, "kV"),
            "stator_current": (1.8, 2.2, "kA"),
            "rotor_excitation_current": (0.3, 0.5, "kA"),
            "rotor_excitation_voltage": (180, 220, "V"),
            "shaft_radial_vibration": (0.1, 2.8, "mm/s"),
            "bearing_temperature_generator_side": (30, 50, "℃"),
        },
        "special_metrics": ["cumulative_power_generator_side"],
    },
    "governor_francis": {  # governor paired with the Francis turbine
        "status_metrics": ["control_mode", "oil_pressure_system_status", "fault_code"],
        "numeric_metrics": {
            "guide_vane_opening_command": (30, 90, "%"),
            "guide_vane_opening_feedback": (30, 90, "%"),
            "main_oil_supply_pressure": (2.5, 4.0, "MPa"),
            "oil_pressure_device_level": (40, 60, "%"),
            "governor_response_time": (0.1, 0.5, "s"),
            "speed_deviation": (-0.5, 0.5, "r/min"),
            "hydraulic_oil_temperature": (30, 45, "℃"),
        },
        "special_metrics": ["cumulative_guide_vane_operations"],
    },
    "inlet_valve_spherical": {  # inlet valve, spherical type
        "status_metrics": ["valve_open_close_status", "sealing_status", "fault_code"],
        "numeric_metrics": {
            "valve_opening": (0, 100, "%"),
            "sealing_water_pressure": (0.2, 0.5, "MPa"),
            "operating_oil_pressure": (2.0, 3.5, "MPa"),
            "valve_closing_time": (15, 30, "s"),
            "valve_stem_seal_temperature": (25, 40, "℃"),
        },
        "special_metrics": ["cumulative_valve_operations"],
    },
}
# Running totals for the cumulative ("special") metrics, indexed as
# CUMULATIVE_METRICS_INIT[plant_id][device_type][metric_name].  The starting
# values are randomised once at import time and then updated in place by the
# data generator.
CUMULATIVE_METRICS_INIT = {
    "plant1": {
        "turbine": {"cumulative_power_generation": random.uniform(500000, 2000000)},
        "generator": {"cumulative_power_generator_side": random.uniform(500000, 2000000)},
        "governor": {"cumulative_guide_vane_operations": random.randint(1000, 5000)},
        "inlet_valve": {"cumulative_valve_operations": random.randint(100, 500)},
    },
}
# -------------------------- 工具函数 --------------------------
def get_correct_timestamp():
    """Return the current UTC time as ``YYYY-MM-DD HH:MM:SS.fff UTC``.

    The fractional part is truncated (not rounded) to milliseconds and the
    literal suffix `` UTC`` marks the time standard.
    """
    now = datetime.now(timezone.utc)  # timezone-aware UTC instant
    millis = now.microsecond // 1000  # keep only the first three fractional digits
    return f"{now:%Y-%m-%d %H:%M:%S}.{millis:03d} UTC"
# Look up a readable message for an MQTT result/reason code in `error_map`;
# codes that are not in the map yield a generic "unknown error" string that
# embeds the code.
# NOTE(review): `error_map` is not defined anywhere in the visible source and
# the body indentation is missing in this copy — presumably a code -> message
# dict was declared here; restore it before running.
def get_mqtt_error_msg(code):
return error_map.get(code, f"未知错误(代码:{code})")
def check_server(broker, port, timeout=5.0):
    """Report whether a TCP connection to ``broker:port`` can be opened.

    Args:
        broker: Host name or IP address of the MQTT broker.
        port: TCP port to probe.
        timeout: Seconds to wait for the connection attempt before giving up.

    Returns:
        True if the TCP connection succeeded; False if it was refused, timed
        out, or the probe itself raised (for example on a name-resolution
        failure), in which case the error is printed.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Bound the probe so an unresponsive host cannot block startup.
            s.settimeout(timeout)
            return s.connect_ex((broker, port)) == 0
    except Exception as e:
        # Best-effort probe: any failure simply means "not reachable".
        print(f"服务器连通性检查失败:{str(e)}")
        return False
def generate_value(min_val, max_val):
    """Return a random value in ``[min_val, max_val]`` with range-based precision.

    The number of decimals depends on the width of the range:

    * width below 1 (e.g. vibration)      -> 3 decimals
    * width below 100 (e.g. temperature)  -> 2 decimals
    * anything wider (e.g. flow)          -> 1 decimal

    Args:
        min_val: Lower bound of the range.
        max_val: Upper bound of the range.

    Returns:
        The sampled value, rounded as described above.
    """
    value = random.uniform(min_val, max_val)
    span = max_val - min_val
    if span < 1:
        return round(value, 3)  # narrow range
    # NOTE(review): the medium/large boundary of 100 was chosen to match the
    # documented tiers (temperature ranges vs. flow ranges) — confirm.
    if span < 100:
        return round(value, 2)  # medium range
    return round(value, 1)  # wide range
# -------------------------- MQTT回调函数 --------------------------
# Connect callback; it is assigned to `client.on_connect` in
# create_plant_mqtt_client.  It prints a success or failure line for the plant
# and records the outcome in `plant_connections[plant_id]` (True / False).
# NOTE(review): in this copy the success and failure statements run one after
# the other unconditionally and `plant_id` is never bound — presumably an
# `if reason_code == 0: ... else: ...` branch and a line deriving `plant_id`
# (from `userdata`?) were lost together with the indentation; confirm against
# the original file.  `MQTT_PORT` and `plant_connections` are also not defined
# in the visible source.
def on_connect(client, userdata, flags, reason_code, properties):
# Success path: report the connection and mark the plant as connected.
print(f"✅ 电站[{plant_id}] 成功连接MQTT服务器:{MQTT_BROKER}:{MQTT_PORT}")
plant_connections[plant_id] = True
# Failure path: report the code with its message and mark the plant as disconnected.
print(f"❌ 电站[{plant_id}] 连接失败(代码:{reason_code}):{get_mqtt_error_msg(reason_code)}")
plant_connections[plant_id] = False
# Disconnect callback; it is assigned to `client.on_disconnect` in
# create_plant_mqtt_client.  It prints a warning containing the reason code and
# its message, then marks the plant as disconnected in `plant_connections`.
# NOTE(review): `plant_id` is never bound in the visible body, and the message
# says "unexpected disconnect" — presumably a line deriving `plant_id` and a
# condition on `reason_code` were lost with the indentation; confirm.
def on_disconnect(client, userdata, reason_code, properties):
print(f"⚠️ 电站[{plant_id}] 意外断开连接(代码:{reason_code}):{get_mqtt_error_msg(reason_code)}")
plant_connections[plant_id] = False
# -------------------------- 核心逻辑 --------------------------
def get_plant_devices(plant_id):
    """Return one descriptor dict per device configured for ``plant_id``.

    Args:
        plant_id: Key of the plant in ``PLANTS_INFO``.

    Returns:
        A list of dicts, in configuration order, each with the keys
        ``equipment_type_abbr`` (the device-type key under ``core_equipment``),
        ``equipment_type``, ``equipment_id``, ``metric_config_key`` and
        ``mqtt_topic``.

    Raises:
        KeyError: If ``plant_id`` is unknown or a device entry lacks one of
            the required keys.
    """
    core_equipment = PLANTS_INFO[plant_id]["core_equipment"]
    return [
        {
            "equipment_type_abbr": dev_type,
            "equipment_type": dev_cfg["equipment_type"],
            "equipment_id": dev_cfg["equipment_id"],
            "metric_config_key": dev_cfg["metric_config_key"],
            "mqtt_topic": dev_cfg["mqtt_topic"],  # topic dedicated to this device
        }
        for dev_type, dev_cfg in core_equipment.items()
    ]
# Build one simulated reading for a device: identification fields, status
# metrics, numeric metrics sampled from the configured ranges, and cumulative
# ("special") metrics whose running totals are kept in CUMULATIVE_METRICS_INIT
# and updated in place.
# NOTE(review): this copy has lost its indentation and several statements —
# `status_data`, `numeric_data` and `special_data` are never initialised, the
# dict entries after `metric_cfg = ...` have no assignment target or braces,
# and there is no `return`.  The shape of the returned payload therefore cannot
# be determined from here; restore from the original file.
def generate_device_full_data(plant_id, device_info):
global CUMULATIVE_METRICS_INIT
plant_cfg = PLANTS_INFO[plant_id]
dev_type_name = device_info["equipment_type_abbr"]
metric_cfg_key = device_info["metric_config_key"]
metric_cfg = ALL_EQUIPMENT_METRICS[metric_cfg_key]
# Identification fields (presumably the body of a base-data dict literal).
"timestamp": get_correct_timestamp(), # uses the UTC timestamp
"plant_base_id": plant_cfg["plant_base_id"],
"equipment_type": device_info["equipment_type"],
"equipment_type_abbr": dev_type_name,
"equipment_id": device_info["equipment_id"]
# Status metrics: nominal value most of the time, with a small random chance
# of an off-nominal one (1% "abnormal", 10% "manual", 1% non-zero fault code).
for metric in metric_cfg["status_metrics"]:
if "operation_status" in metric or "valve_open_close_status" in metric:
# Nominal label is "normal" for operation metrics and "fully_open" for valve
# metrics; given the enclosing condition the trailing "active" is never chosen.
status = "normal" if "operation" in metric else "fully_open" if "valve" in metric else "active"
status_data[metric] = status if random.random() > 0.01 else "abnormal"
elif "control_mode" in metric:
status_data[metric] = "automatic" if random.random() > 0.1 else "manual"
elif "fault_code" in metric:
# 0 means no fault; otherwise a random code from 1 to 10.
status_data[metric] = 0 if random.random() > 0.01 else random.randint(1, 10)
elif "status" in metric:
status_data[metric] = "normal" if random.random() > 0.01 else "abnormal"
# Numeric metrics: one sample per configured (min, max, unit) range; the unit is ignored here.
for metric, (min_val, max_val, _) in metric_cfg["numeric_metrics"].items():
numeric_data[metric] = generate_value(min_val, max_val)
# Cumulative metrics.
for metric in metric_cfg["special_metrics"]:
# NOTE(review): "cumulative_power_generator_side" does not contain the
# substring "power_generation" (nor "operations"), so as written the
# generator's cumulative metric matches neither branch — verify.
if "power_generation" in metric:
if dev_type_name == "turbine":
# Turbine: power range comes straight from the configured instantaneous_power.
power_min, power_max, _ = ALL_EQUIPMENT_METRICS[metric_cfg_key]["numeric_metrics"]["instantaneous_power"]
# NOTE(review): the next four lines presumably belong to a lost `else:` branch
# for non-turbine devices; they derive a power range as
# voltage * current * 1.732 * 0.85 (1.732 looks like sqrt(3); 0.85 is
# presumably a power factor — confirm).
voltage_min, voltage_max, _ = metric_cfg["numeric_metrics"]["stator_voltage"]
voltage = generate_value(voltage_min, voltage_max)
current_min, current_max, _ = metric_cfg["numeric_metrics"]["stator_current"]
power_min, power_max = voltage * current_min * 1.732 * 0.85, voltage * current_max * 1.732 * 0.85
instant_power = generate_value(power_min, power_max)
# Accumulate power over one publish interval, converted from seconds to hours.
CUMULATIVE_METRICS_INIT[plant_id][dev_type_name][metric] += instant_power * (DEVICE_PUBLISH_INTERVAL / 3600)
special_data[metric] = round(CUMULATIVE_METRICS_INIT[plant_id][dev_type_name][metric], 2)
elif "operations" in metric:
# Operation counters advance by one with roughly 1% probability per call.
if random.random() > 0.99:
CUMULATIVE_METRICS_INIT[plant_id][dev_type_name][metric] += 1
special_data[metric] = CUMULATIVE_METRICS_INIT[plant_id][dev_type_name][metric]
# Publish simulated data for every device of a plant: list the devices and
# their topics, then for each device generate a reading, serialise it to JSON
# (non-ASCII characters kept as-is) and publish it to the device's own topic
# with QoS 1, waiting for the publish to complete.  Sleeps
# DEVICE_PUBLISH_INTERVAL seconds between devices and CYCLE_WAIT_TIME seconds
# after a completed round.
# NOTE(review): this copy has lost its indentation and control-flow scaffolding
# — there is presumably an outer endless loop, a wait/`continue` while the
# plant is disconnected, a `break` inside the device loop, and a
# `try: ... except Exception as e:` around the publish (`e` is otherwise
# unbound).  Restore from the original file.
def publish_plant_devices(client, plant_id):
plant_devices = get_plant_devices(plant_id)
device_count = len(plant_devices)
print(f"\n电站[{plant_id}] 初始化完成:共 {device_count} 个设备,每个设备发布到独立主题")
for dev in plant_devices:
print(f" - {dev['equipment_type_abbr']}: {dev['mqtt_topic']}")
# Connection gate: report that the plant is still waiting for MQTT.
if not plant_connections[plant_id]:
print(f"⏳ 电站[{plant_id}] 等待MQTT连接...")
for device in plant_devices:
# Re-check the connection before each device (the body of this check is missing).
if not plant_connections[plant_id]:
dev_name = device["equipment_type_abbr"]
dev_topic = device["mqtt_topic"] # topic dedicated to this device
device_full_data = generate_device_full_data(plant_id, device)
json_data = json.dumps(device_full_data, ensure_ascii=False)
result = client.publish(dev_topic, json_data, qos=1)
# Block until the client reports this message as published.
result.wait_for_publish()
print(f"📤 发布【{dev_name}】到主题 {dev_topic}:{json_data}\n")
# Failure path (presumably inside an `except` clause): report and mark disconnected.
print(f"❌ 【{dev_name}】数据发布失败:{str(e)}")
plant_connections[plant_id] = False
time.sleep(DEVICE_PUBLISH_INTERVAL)
# Only announce and wait for the next round while still connected.
if plant_connections[plant_id]:
print(f"🔄 电站[{plant_id}] 完成一轮设备数据发布,等待 {CYCLE_WAIT_TIME} 秒后开始下一轮...\n")
time.sleep(CYCLE_WAIT_TIME)
# -------------------------- 客户端创建与主逻辑 --------------------------
# Create the MQTT client for one plant: builds a client id of the form
# "hydropower_pub_<plant_id>_<4 random digits>" and installs the on_connect /
# on_disconnect callbacks.
# NOTE(review): the client construction is incomplete in this copy — the
# `callback_api_version=...` line is an orphaned keyword argument (presumably
# of an `mqtt.Client(...)` call that also passed `client_id` and some
# `userdata`), `client` is never bound, and there is no `return` even though
# the caller uses the result.  Restore from the original file.
def create_plant_mqtt_client(plant_id):
client_id = f"hydropower_pub_{plant_id}_{random.randint(1000, 9999)}"
# Selects the version-2 callback API of paho-mqtt.
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
client.on_connect = on_connect
client.on_disconnect = on_disconnect
# Program entry logic: print a banner and the plant's configuration, check that
# the broker is reachable, create and connect the plant's MQTT client, wait for
# the connection (bounded by `timeout`), then hand publishing to a thread
# running publish_plant_devices; Ctrl+C and other errors are reported and the
# clients are iterated at shutdown.
# NOTE(review): this copy has lost the enclosing `def` line (presumably
# `def main():`), the `try:` matching the `except KeyboardInterrupt:` below,
# the loop over plants that binds `plant_id`, the definitions of `clients`,
# `plant_connections` and `timeout`, the network-loop start, the end of the
# `threading.Thread(...)` call and its `start()`, the bodies of the final
# `for` loop and of the `__main__` guard, and all indentation.  Restore from
# the original file.
print(" 水电站设备数据发布系统(UTC时间+按设备类型分主题)")
plant_cfg = PLANTS_INFO[plant_id]
plant_devices = get_plant_devices(plant_id)
print(f" - 电站标识:{plant_cfg['plant_base_id']}")
print(f" - 基础主题:{MQTT_BASE_TOPIC}")
print(f" - 设备数量:{len(plant_devices)}个")
print(f" - 发布间隔:每个设备消息间隔 {DEVICE_PUBLISH_INTERVAL} 秒")
print(f" - 时间戳:使用UTC标准时间(格式:YYYY-MM-DD HH:MM:SS.fff UTC)") # states which time standard is used
# Fail fast when the broker cannot be reached over TCP.
print(f"\n🔍 检查MQTT服务器 {MQTT_BROKER}:{MQTT_PORT} 连通性...")
if not check_server(MQTT_BROKER, MQTT_PORT):
print(f"❌ MQTT服务器 {MQTT_BROKER}:{MQTT_PORT} 不可达!")
if MQTT_BROKER == "localhost":
print("💡 提示:本地服务器需安装Mosquitto,或切换为公共服务器(broker.emqx.io)")
client = create_plant_mqtt_client(plant_id)
client.connect(MQTT_BROKER, MQTT_PORT, MQTT_KEEPALIVE)
clients[plant_id] = client
# Wait until on_connect flags the plant as connected or `timeout` seconds elapse
# (the loop body is missing in this copy).
start_time = time.time()
while not plant_connections[plant_id] and (time.time() - start_time) < timeout:
if not plant_connections[plant_id]:
print(f"\n⚠️ 电站[{plant_id}] 连接超时,无法启动数据发布!")
print(f"\n✅ 电站[{plant_id}] 已连接,开始按设备类型发布数据(按Ctrl+C停止)...")
# Publishing is delegated to a thread that runs publish_plant_devices.
publish_thread = threading.Thread(
target=publish_plant_devices,
args=(client, plant_id),
except KeyboardInterrupt:
print("\n\n🔌 收到退出指令,关闭系统...")
# Presumably inside a generic `except Exception as e:` clause.
print(f"\n❌ 系统异常:{str(e)}")
# Shutdown: iterate every created client (the loop body is missing in this copy).
for client in clients.values():
if __name__ == "__main__":