"""Bridge DMX data between a Gadget2 DMX interface and a PLC over OPC UA.

The main thread drives the Gadget2 DLL: it transmits the ``dmx_out_data``
buffer on ``Sending_Port`` and parses raw receive data from
``Receiving_Port`` into ``dmx_in_data``.  A daemon thread (``opcua_update``)
exchanges those two buffers with the PLC once per ``ALIVE_WINDOW_S`` and
toggles two BOOL "alive" tags.
"""

import ctypes
import os
import random
import sys
import threading
import time

from opcua import Client, ua

# Adjustable variables
Sending_Port = 1         # Port to send DMX values (fixed, adjustable)
Receiving_Port = 2       # Port to receive DMX data (fixed, adjustable)
RX_DMX_START_CH = 1      # Starting DMX channel for receive (adjustable)
RX_COUNT = 5             # Number of channels to receive (adjustable, e.g., 1 to 5)
RX_PLC_START_IDX = 100   # Starting PLC index for receive data (adjustable)
TX_PLC_START_IDX = 100   # Starting PLC index for transmit data (adjustable)
TX_COUNT = 5             # Number of channels to transmit (adjustable, e.g., 1 to 5)
TX_DMX_START_CH = 1      # Starting DMX channel for transmit (adjustable)
TX_RATE_S = 0.04         # DMX update rate (~25 FPS)
ALIVE_WINDOW_S = 1.0     # Alive tag toggle window (seconds)

# Size of the DMX buffers and the marker words tested in the raw receive stream.
DMX_UNIVERSE_SIZE = 512
RAW_BREAK_FLAG = 0x8000          # Raw word that restarts frame accumulation
RAW_FRAMING_ERROR_FLAG = 0x9000  # Raw word that is skipped

# Specify the directory and DLL path
G2_DIR = r"C:\libGadget\x64\bin"
G2_DLL = os.path.join(G2_DIR, "GadgetDLL.dll")

# OPC UA configuration
PLC_OPC_URL = "opc.tcp://10.0.11.2:4840"
DMX_IN_TAG = 'ns=4;i=12'      # BYTE[512] from DMX -> PLC (mirror 1..5 for testing)
DMX_OUT_TAG = 'ns=4;i=535'    # BYTE[512] from PLC -> DMX (send 1..5 for testing)
RX_ALIVE_TAG = 'ns=4;i=1058'  # BOOL
TX_ALIVE_TAG = 'ns=4;i=1069'  # BOOL

# Add DLL directory to the search path (Python 3.8+)
if hasattr(os, "add_dll_directory"):
    os.add_dll_directory(G2_DIR)

# Load the DLL
g = ctypes.CDLL(G2_DLL)

# Define C types
c_uint = ctypes.c_uint
c_char_p = ctypes.c_char_p
c_bool = ctypes.c_bool
c_ubyte = ctypes.c_ubyte
c_short = ctypes.c_short    # Kept for backward compatibility; no longer used for raw data
c_ushort = ctypes.c_ushort  # For raw data (unsigned short *)
c_int = ctypes.c_int        # For SetRawReceiveMode return type

# Define function signatures based on GadgetDLL.h
GetDllVersion = g.Gadget2_GetDllVersion
GetDllVersion.restype = c_char_p

Connect = g.Gadget2_Connect
Connect.restype = c_bool

Disconnect = g.Gadget2_Disconnect

GetNumDevs = g.Gadget2_GetNumGadgetDevices
GetNumDevs.restype = c_uint

GetPorts = g.Gadget2_GetPortCount
GetPorts.argtypes = [c_uint]
GetPorts.restype = c_ubyte

GetType = g.Gadget2_GetGadgetType
GetType.argtypes = [c_uint]
GetType.restype = c_char_p

GetVersion = g.Gadget2_GetGadgetVersion
GetVersion.argtypes = [c_uint]
GetVersion.restype = c_char_p

GetSerial = g.Gadget2_GetGadgetSerialNumber
GetSerial.argtypes = [c_uint]
GetSerial.restype = ctypes.c_uint32

SendDMX = g.Gadget2_SendDMX
SendDMX.argtypes = [c_uint, c_uint, ctypes.POINTER(c_ubyte), c_uint]  # DeviceNum, PortNum, Buffer, Size

SetRawReceiveMode = g.Gadget2_SetRawReceiveMode
SetRawReceiveMode.argtypes = [c_uint, c_uint]
SetRawReceiveMode.restype = c_int  # Returns 0 on success, -1 on failure

GetNumberOfRXRawBytes = g.Gadget2_GetNumberOfRXRawBytes
GetNumberOfRXRawBytes.argtypes = [c_uint, c_uint]
GetNumberOfRXRawBytes.restype = c_uint

GetRXRawBytes = g.Gadget2_GetRXRawBytes
# DeviceNum, PortNum, Data, Length.  The data pointer must be *unsigned*:
# with a signed 16-bit element the values 0x8000 and 0x9000 read back as
# negative numbers and the marker comparisons below could never match.
GetRXRawBytes.argtypes = [c_uint, c_uint, ctypes.POINTER(c_ushort), c_uint]

# Global variables for OPC UA
opc_client = None
dmx_in_data = (ctypes.c_ubyte * DMX_UNIVERSE_SIZE)()   # 512-byte array for received DMX
dmx_out_data = (ctypes.c_ubyte * DMX_UNIVERSE_SIZE)()  # 512-byte array for sent DMX
rx_alive = False
tx_alive = False


def _drop_opc_client():
    """Disconnect and forget the current OPC UA client, never raising.

    Disconnecting a client whose connection attempt failed (or whose link
    already dropped) can itself raise; that must not escape into the caller's
    error handling, so the failure is only reported.
    """
    global opc_client
    client, opc_client = opc_client, None
    if client is None:
        return
    try:
        client.disconnect()
    except Exception as exc:
        print(f"OPC UA disconnect error: {exc}")


def _parse_raw_frame(raw_words):
    """Build a 512-value frame from raw receive words.

    A word equal to ``RAW_BREAK_FLAG`` discards what was accumulated so far,
    a word equal to ``RAW_FRAMING_ERROR_FLAG`` is skipped, and for every other
    word the low byte is appended.  Accumulation stops at 512 values; a
    shorter result is zero-padded.

    Args:
        raw_words: Iterable of integer raw words as returned by the DLL.

    Returns:
        A list of exactly ``DMX_UNIVERSE_SIZE`` integers in the range 0..255.
    """
    frame = []
    for word in raw_words:
        if word == RAW_BREAK_FLAG:
            frame = []  # Reset on break
        elif word != RAW_FRAMING_ERROR_FLAG:  # Skip framing errors
            frame.append(word & 0xFF)
            if len(frame) >= DMX_UNIVERSE_SIZE:
                break
    frame.extend([0] * (DMX_UNIVERSE_SIZE - len(frame)))
    return frame


def opcua_update():
    """Exchange DMX buffers with the PLC forever (runs in a daemon thread).

    Each cycle, connecting first when there is no client:
      1. reads ``DMX_OUT_TAG`` and copies ``TX_COUNT`` values starting at PLC
         index ``TX_PLC_START_IDX`` into ``dmx_out_data`` starting at DMX
         channel ``TX_DMX_START_CH``;
      2. toggles ``TX_ALIVE_TAG``;
      3. writes ``RX_COUNT`` received values starting at DMX channel
         ``RX_DMX_START_CH`` to ``DMX_IN_TAG``;
      4. toggles ``RX_ALIVE_TAG``.

    Any exception is printed, the client is dropped, and a fresh connection
    is attempted on the next cycle.  Cycles are spaced by ``ALIVE_WINDOW_S``.
    """
    global opc_client, rx_alive, tx_alive
    while True:
        try:
            if opc_client is None:
                opc_client = Client(PLC_OPC_URL)
                opc_client.connect()
                print("Connected to OPC UA server at", PLC_OPC_URL)

            # Read DMX_OUT data from PLC
            plc_data = opc_client.get_node(DMX_OUT_TAG).get_value()
            plc_first = TX_PLC_START_IDX - 1
            dmx_first = TX_DMX_START_CH - 1
            # The highest index read is plc_first + TX_COUNT - 1, so the array
            # only needs plc_first + TX_COUNT elements.
            if (isinstance(plc_data, (list, tuple, bytes, bytearray))
                    and len(plc_data) >= plc_first + TX_COUNT):
                for offset in range(TX_COUNT):
                    dmx_out_data[dmx_first + offset] = plc_data[plc_first + offset]
            # NOTE(review): the heartbeat toggles once per cycle even when the
            # PLC array was unusable -- confirm that is the intended meaning.
            tx_alive = not tx_alive  # Toggle TX alive
            opc_client.get_node(TX_ALIVE_TAG).set_value(
                ua.Variant(tx_alive, ua.VariantType.Boolean))

            # Write DMX_IN data to PLC (simplified write).  The values come
            # from the received DMX channels RX_DMX_START_CH.., matching what
            # the main loop reports as received.
            rx_first = RX_DMX_START_CH - 1
            plc_in_data = list(dmx_in_data[rx_first:rx_first + RX_COUNT])
            if len(plc_in_data) == RX_COUNT:
                # NOTE(review): only RX_COUNT elements are written and they are
                # not placed at RX_PLC_START_IDX inside the PLC array -- confirm
                # how the server maps this write onto the BYTE[512] tag.
                opc_client.get_node(DMX_IN_TAG).set_value(plc_in_data)
            rx_alive = not rx_alive  # Toggle RX alive
            opc_client.get_node(RX_ALIVE_TAG).set_value(
                ua.Variant(rx_alive, ua.VariantType.Boolean))
        except Exception as e:
            print(f"OPC UA error: {e}")
            # Must not raise here, or this thread would end and the PLC
            # exchange would silently stop.
            _drop_opc_client()
        time.sleep(ALIVE_WINDOW_S)


# Print DLL information
print("DLL:", G2_DLL)
print("DLL ver:", (GetDllVersion() or b'').decode())

# Initialize the Gadget2 interface
if not Connect():
    sys.exit("Connect False")

try:
    # Wait up to 5 seconds for devices to be detected
    t = time.time()
    while time.time() - t < 5 and GetNumDevs() == 0:
        time.sleep(0.1)
        print("Waiting for device detection...")

    # Get the number of Gadget devices
    n = int(GetNumDevs())
    print("devices:", n)

    if n:
        ports = int(GetPorts(0))
        print("ports:", ports)
        print("type:", (GetType(0) or b'').decode())
        print("fw :", (GetVersion(0) or b'').decode())
        print("sn :", int(GetSerial(0)))

        # Set receiving port to raw receive mode
        if SetRawReceiveMode(0, Receiving_Port) == 0:
            print(f"Port {Receiving_Port} set to raw receive mode successfully.")
        else:
            print(f"Failed to set port {Receiving_Port} to raw receive mode.")
            print(f"Note: Ensure {Receiving_Port} is valid and a DMX loopback cable is used on {Sending_Port} for receive testing.")

        # Start OPC UA update thread
        opc_thread = threading.Thread(target=opcua_update, daemon=True)
        opc_thread.start()

        # Start sending DMX values from PLC and reading to PLC
        print(f"Starting DMX bridge: Sending to port {Sending_Port}, receiving from port {Receiving_Port}. Press Ctrl+C to stop...")
        while True:
            # Send DMX data from PLC (TX)
            SendDMX(0, Sending_Port, dmx_out_data, DMX_UNIVERSE_SIZE)

            # Print sent values for the specified range
            tx_range = [dmx_out_data[TX_DMX_START_CH - 1 + i] for i in range(TX_COUNT)]
            print(f"Sent DMX values to device 0, port {Sending_Port} (channels {TX_DMX_START_CH} to {TX_DMX_START_CH + TX_COUNT - 1}): {tx_range}")

            # Attempt to read raw data from the receiving port
            num_bytes = GetNumberOfRXRawBytes(0, Receiving_Port)
            print(f"Number of raw bytes available on port {Receiving_Port}: {num_bytes}")

            if num_bytes > 0:
                raw_data = (c_ushort * num_bytes)()
                GetRXRawBytes(0, Receiving_Port, raw_data, num_bytes)

                # Print raw data sample for debugging
                print(f"Raw data sample from port {Receiving_Port}: {list(raw_data[:10])}...")

                # Each read is parsed on its own: data is not carried over
                # between reads, so a partial read yields a zero-padded frame.
                frame = _parse_raw_frame(raw_data)

                # Update dmx_in_data with the full 512-byte frame
                for channel_idx, level in enumerate(frame):
                    dmx_in_data[channel_idx] = level

                # Map received data to specific receive range
                rx_range = [dmx_in_data[i] for i in range(RX_DMX_START_CH - 1, RX_DMX_START_CH - 1 + RX_COUNT)]
                print(f"Received DMX data from port {Receiving_Port} (channels {RX_DMX_START_CH} to {RX_DMX_START_CH + RX_COUNT - 1}): {rx_range}")
            else:
                print(f"No raw data received on port {Receiving_Port}. Use a DMX loopback cable on port {Sending_Port} if needed.")

            # Sleep for the specified rate
            time.sleep(TX_RATE_S)
    else:
        print("No devices detected. Check connections and drivers.")
        input("Press Enter to exit…")
except KeyboardInterrupt:
    print("\nStopping DMX transmission.")
except Exception as e:
    print(f"Error during transmission: {e}")
finally:
    # Best-effort shutdown: report problems but always attempt both steps.
    try:
        Disconnect()
    except Exception as exc:
        print(f"Gadget2 disconnect error: {exc}")
    _drop_opc_client()
    print("Gadget2 interface disconnected.")