| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- import asyncio
- from bleak import BleakScanner, BleakClient
# --- BLE identifiers -------------------------------------------------------
# Advertised name that connect() compares against each discovered device.
target_name = "MPU6050_BLE"
SERVICE_UUID = "d86aecf2-d87d-489f-b664-b02de82b2fc0"
# NOTE(review): identical to SERVICE_UUID -- confirm the characteristic really
# shares the service's UUID on the peripheral.
CHARACTERISTIC_UUID = "d86aecf2-d87d-489f-b664-b02de82b2fc0"

# --- Connection state ------------------------------------------------------
# Address of the matched device; stays None until connect() finds it.
target_address = None
# Most recent BleakClient opened by getData(); None before the first attempt.
gclient = None

# --- Sample buffers (11 floats each) ---------------------------------------
# Latest parsed reading, overwritten by getData() after every successful read.
dataarr: list[float] = [0.0 for _ in range(11)]
# Last reading accepted by getDataArr(); also the fallback returned by
# convertToNumbers().  Kept as a separate list object from dataarr.
lastval: list[float] = list(dataarr)
def getDataArr() -> list[float]:
    """Return the current reading, or the last accepted one if it is malformed.

    The module-level ``dataarr`` is considered well-formed when it is a
    ``list`` whose elements are all ``float``.  In that case it is remembered
    in ``lastval`` and returned.  Otherwise ``lastval`` is returned unchanged.

    Returns:
        The same list object stored in ``dataarr`` (or ``lastval``); no copy
        is made.
    """
    global dataarr, lastval

    well_formed = isinstance(dataarr, list) and all(
        isinstance(item, float) for item in dataarr
    )
    if not well_formed:
        return lastval

    # Remember this reading so later malformed values can fall back to it.
    lastval = dataarr
    return dataarr
async def connect():
    """Scan for BLE devices until one named ``target_name`` is found.

    Every discovered device is printed.  When a device's name equals
    ``target_name`` its address is stored in the module-level
    ``target_address`` and returned.  If a scan finds no match, or the scan
    raises, the function waits 3 seconds and scans again, so it only returns
    once the target has been seen.

    Returns:
        The address of the matching device.
    """
    global target_address

    print("Scanning for BLE devices...")
    while True:
        try:
            for device in await BleakScanner.discover():
                print("Found:", device.name or "Unnamed", device.address)
                if device.name != target_name:
                    continue
                target_address = device.address
                print(f"✅ Found target device '{target_name}' at {target_address}")
                return target_address

            # Whole scan finished without a match: pause, then rescan.
            print("❌ Target device not found. Retrying in 3s...")
            await asyncio.sleep(3)
        except Exception as err:
            # Scan failures are reported and retried rather than propagated.
            print("⚠ BLE scan error:", err)
            await asyncio.sleep(3)
async def _poll_characteristic(client):
    """Read CHARACTERISTIC_UUID in a loop while *client* reports connected.

    Each payload is decoded as UTF-8, parsed with convertToNumbers() and
    stored in the module-level ``dataarr``, followed by a 0.05 s pause.  The
    first exception raised inside an iteration is printed and ends the loop.
    """
    global dataarr

    while client.is_connected:
        try:
            payload = await client.read_gatt_char(CHARACTERISTIC_UUID)
            dataarr = convertToNumbers(payload.decode('utf-8'))
            print("📡 Data:", dataarr)
            await asyncio.sleep(0.05)
        except Exception as err:
            print("⚠ Read error:", err)
            return


async def getData():
    """Keep a BLE connection open and stream readings into ``dataarr``.

    Runs forever: discovers the device via connect() if ``target_address`` is
    still None, opens a BleakClient on it (published as the module-level
    ``gclient``), and polls the characteristic until the link drops or a read
    fails.  Connection-level exceptions are printed, and a new attempt is made
    after a 3 second pause.
    """
    global target_address, gclient

    while True:
        try:
            if target_address is None:
                await connect()

            print("🔌 Attempting to connect to BLE device...")
            async with BleakClient(target_address) as client:
                gclient = client

                if not client.is_connected:
                    print("❌ Failed to connect. Retrying...")
                    await asyncio.sleep(2)
                    # Restart the outer loop directly; the 3 s reconnect
                    # pause below is skipped on this path.
                    continue

                print(f"✅ Connected to {target_address}")
                await _poll_characteristic(client)
        except Exception as err:
            print("⚠ Connection error:", err)

        print("🔁 Reconnecting in 3s...")
        await asyncio.sleep(3)
def convertToNumbers(stringlist) -> list[float]:
    """Parse a comma-separated string into the first 11 float values.

    The input is stripped of surrounding whitespace and split on commas;
    empty fields are skipped and every remaining field is converted with
    ``float``.

    Args:
        stringlist: Comma-separated numeric text.

    Returns:
        The first 11 parsed values.  If fewer than 11 values are present, or
        anything raises while parsing (the error is printed), the
        module-level ``lastval`` is returned instead.
    """
    try:
        fields = stringlist.strip().split(",")
        numbers = [float(field) for field in fields if field != '']
    except Exception as err:
        print("⚠️ Conversion error:", err)
        return lastval

    # An incomplete frame is discarded in favour of the last good reading.
    if len(numbers) < 11:
        return lastval
    return numbers[:11]
|