Lucas преди 7 месеца
родител
ревизия
fca5f929b1
променени са 3 файла, в които са добавени 251 реда и са изтрити 51 реда
  1. BIN
      __pycache__/dobluetooth.cpython-313.pyc
  2. 75 37
      dobluetooth.py
  3. 176 14
      graphing.py

BIN
__pycache__/dobluetooth.cpython-313.pyc


+ 75 - 37
dobluetooth.py

@@ -1,47 +1,85 @@
 import asyncio
-from bleak import BleakScanner
-from bleak import BleakClient
+from bleak import BleakScanner, BleakClient
 import time
 
+dataarr = [0.0] * 6
+lastval = [0.0] * 6
+
 target_name = "Long name works now"
-global target_address
 target_address = None
+gclient = None
+
+SERVICE_UUID =        "d86aecf2-d87d-489f-b664-b02de82b2fc0"
+CHARACTERISTIC_UUID = "d86aecf2-d87d-489f-b664-b02de82b2fc0"
 
-SERVICE_UUID=        "d86aecf2-d87d-489f-b664-b02de82b2fc0"
-CHARACTERISTIC_UUID= "d86aecf2-d87d-489f-b664-b02de82b2fc0"
+def getDataArr() -> list[float]:
+    global dataarr, lastval
+    if isinstance(dataarr, list) and all(isinstance(i, float) for i in dataarr):
+        lastval = dataarr
+        return dataarr
+    else:
+        return lastval
 
 async def connect():
-    devices = await BleakScanner.discover()
-    for d in devices:
-        print(d)
-        if target_name == d.name:
-            target_address = d.address
-            print("found target {} bluetooth device with address {} ".format(target_name,target_address))
-            return target_address
-            break
-        else:
-            print("could not find device")
+    global target_address
+    print("Scanning for BLE devices...")
+    while True:
+        try:
+            devices = await BleakScanner.discover()
+            for d in devices:
+                print("Found:", d.name or "Unnamed", d.address)
+                if d.name == target_name:
+                    target_address = d.address
+                    print(f"✅ Found target device '{target_name}' at {target_address}")
+                    return target_address
+            print("❌ Target device not found. Retrying in 3s...")
+            await asyncio.sleep(3)
+        except Exception as e:
+            print("⚠ BLE scan error:", e)
+            await asyncio.sleep(3)
 
 async def getData():
-    target_address = await connect()
-    if target_address is not None:
-        async with BleakClient(target_address) as client:
-            print(f"Connected: {client.is_connected}")
-
-            while 1:
-                try:
-                    data = await client.read_gatt_char(CHARACTERISTIC_UUID)
-                    datastr = data.decode('utf-8') #convert byte to str
-                    dataarr = convertToNumbers(datastr)
-                    print("time: {} ||| numbers: {}".format(time.time(), dataarr))
-                except Exception:
-                    print("failed to get data, restarting connection.")
-
-def convertToNumbers(stringlist):
-    dlist = stringlist.split(",")
-    dlist = dlist[0:-1]
-    intlist = []
-
-    for string in dlist:
-        intlist.append(float(string))
-    return intlist
+    global target_address, gclient
+
+    while True:
+        try:
+            if target_address is None:
+                await connect()
+
+            print("🔌 Attempting to connect to BLE device...")
+            async with BleakClient(target_address) as client:
+                gclient = client
+                if not client.is_connected:
+                    print("❌ Failed to connect. Retrying...")
+                    await asyncio.sleep(2)
+                    continue
+
+                print(f"✅ Connected to {target_address}")
+                while client.is_connected:
+                    try:
+                        data = await client.read_gatt_char(CHARACTERISTIC_UUID)
+                        datastr = data.decode('utf-8')
+                        global dataarr
+                        dataarr = convertToNumbers(datastr)
+                        print("📡 Data:", dataarr)
+                        await asyncio.sleep(0.05)  # Optional: slow it down a bit
+                    except Exception as e:
+                        print("⚠ Read error:", e)
+                        break  # Break loop and reconnect
+        except Exception as e:
+            print("⚠ Connection error:", e)
+
+        print("🔁 Reconnecting in 3s...")
+        await asyncio.sleep(3)
+
+def convertToNumbers(stringlist) -> list[float]:
+    try:
+        dlist = stringlist.strip().split(",")
+        dlist = [float(val) for val in dlist if val != '']
+        if len(dlist) >= 6:
+            return dlist[0:6]  # Already scaled: [gyroX, gyroY, gyroZ, accX, accY, accZ]
+        else:
+            return lastval
+    except Exception as e:
+        print("⚠️ Conversion error:", e)
+        return lastval

+ 176 - 14
graphing.py

@@ -1,39 +1,201 @@
+import threading
 import pygame
 import sys
+import dobluetooth
+import math
 
 # Initialize Pygame
 pygame.init()
+pygame.font.init()
+
+from collections import deque
+
+# Rolling history buffers
+graph_length = 200
+# Gyroscope history
+gyro_mag_history = deque(maxlen=graph_length)
+gyro_x_history = deque(maxlen=graph_length)
+gyro_y_history = deque(maxlen=graph_length)
+gyro_z_history = deque(maxlen=graph_length)
+
+# Accelerometer history
+acc_x_history = deque(maxlen=graph_length)
+acc_y_history = deque(maxlen=graph_length)
+acc_z_history = deque(maxlen=graph_length)
+
 
 # Set up the display
-width, height = 800, 600
+width, height = 1000, 550
 screen = pygame.display.set_mode((width, height))
-pygame.display.set_caption("Draw a Circle")
+pygame.display.set_caption("MPU BLE Visualizer")
+
+# Fonts
+font = pygame.font.SysFont("Arial", 20)
+warning_font = pygame.font.SysFont("Arial", 32, bold=True)
 
-# Define colors
+# Colors
 WHITE = (255, 255, 255)
-BLUE = (0, 102, 255)
+GRAY = (180, 180, 180)
+BLACK = (0, 0, 0)
+GYRO_COLORS = [(0, 255, 255), (255, 0, 255), (255, 255, 0)]  # rotX, rotY, rotZ
+ACCEL_COLORS = [(255, 0, 0), (0, 255, 0), (0, 0, 255)]        # accX, accY, accZ
+
+# Circle setup
+padding = 50
+circle_spacing = (width - 2 * padding) // 6
+circle_radius = 40
+circle_positions = [(padding + i * circle_spacing, 150) for i in range(6)]
+
+# Axis labels
+axis_labels = ['rotX', 'rotY', 'rotZ', 'accX', 'accY', 'accZ']
+
+# Force threshold
+HIGH_FORCE_THRESHOLD = 3.00
+
+# Start BLE thread
+def ble_thread_func():
+    import asyncio
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+    loop.run_until_complete(dobluetooth.getData())
+
+ble_thread = threading.Thread(target=ble_thread_func, daemon=True)
+ble_thread.start()
+
+def draw_graph(surface, history, color, y_base, height, height_scale=1.0):
+    if len(history) < 2:
+        return
+    points = []
+    for i, val in enumerate(history):
+        x = i + 50
+        y = y_base + height // 2 - val * height_scale
+        y = max(y_base, min(y_base + height, y))  # Clamp inside graph area
+        points.append((x, y))
+    pygame.draw.lines(surface, color, False, points, 2)
 
-# Circle parameters
-circle_center = (width // 2, height // 2)
-circle_radius = 75
 
 # Main loop
 running = True
+clock = pygame.time.Clock()
+
 while running:
-    # Handle events
+    screen.fill(WHITE)
+
     for event in pygame.event.get():
         if event.type == pygame.QUIT:
             running = False
 
-    # Fill the background
-    screen.fill(WHITE)
+    try:
+        data = dobluetooth.getDataArr()
+        high_force : bool = False
+
+        if len(data) >= 6:
+            # Accelerometer values
+            accX, accY, accZ = data[3:6]
+            acc_x_history.append(accX)
+            acc_y_history.append(accY)
+            acc_z_history.append(accZ)
+
+            acc_magnitude = math.sqrt(accX**2 + accY**2 + accZ**2)
+
+            if acc_magnitude > HIGH_FORCE_THRESHOLD:
+                high_force = True
+
+            rotX, rotY, rotZ = data[0:3]
+            gyro_magnitude = math.sqrt(rotX**2 + rotY**2 + rotZ**2)
+
+            if gyro_magnitude < 10:
+                gyro_condition = "🟢 Stable"
+                gyro_color = (0, 200, 0)
+            elif gyro_magnitude < 75:
+                gyro_condition = "🟡 Moving"
+                gyro_color = (255, 200, 0)
+            else:
+                gyro_condition = "🔴 High Rotation"
+                gyro_color = (255, 50, 50)
+
+            # Create the font surface
+            gyro_status_surface = warning_font.render(f"Gyro Status: {gyro_condition}", True, gyro_color)
+
+            # Position it — move it higher up on the screen
+            gyro_status_rect = gyro_status_surface.get_rect(center=(width // 2, 250))
+
+            # Blit it to the screen
+            screen.blit(gyro_status_surface, gyro_status_rect)
+
+            # Add to history
+            gyro_mag_history.append(gyro_magnitude)
+            gyro_x_history.append(rotX)
+            gyro_y_history.append(rotY)
+            gyro_z_history.append(rotZ)
+
+            for i in range(6):
+                val = abs(data[i])
+
+                if i < 3:  # Gyroscope
+                    scale = min(val / 5.0, 1.0)
+                    base_color = GYRO_COLORS[i]
+                else:  # Accelerometer
+                    scale = min(val / 5.0, 1.0)  # ✅ Fixed scale
+                    base_color = ACCEL_COLORS[i - 3]
+
+                color = [int(c * scale) for c in base_color]
+                pygame.draw.circle(screen, color, circle_positions[i], circle_radius)
+
+                # Axis label
+                label_surface = font.render(axis_labels[i], True, BLACK)
+                label_rect = label_surface.get_rect(center=(circle_positions[i][0], circle_positions[i][1] + circle_radius + 10))
+                screen.blit(label_surface, label_rect)
+
+                # Value label
+                val_surface = font.render(f"{val:.2f}", True, BLACK)
+                val_rect = val_surface.get_rect(center=(circle_positions[i][0], circle_positions[i][1] - circle_radius - 10))
+                screen.blit(val_surface, val_rect)
+
+        else:
+            for pos in circle_positions:
+                pygame.draw.circle(screen, GRAY, pos, circle_radius)
+
+    except Exception as e:
+        print("Error parsing BLE data:", e)
+
+    # Display "High Force" warning
+    if high_force:
+        warning_text = warning_font.render("⚠ High Force Detected!", True, (255, 0, 0))
+        warning_rect = warning_text.get_rect(center=(width // 2, 50))
+        screen.blit(warning_text, warning_rect)
+
+    # === GRAPH VISUALIZATION ===
+    # Where to place the graphs
+    graph_area_top = 300# Leave top area for circles/status
+    graph_spacing = 120   # Vertical space per graph
+
+    # Graph heights
+    graph_height = 80
+
+    # Top Y positions for gyro and accel graphs
+    gyro_top = graph_area_top
+    accel_top = gyro_top + graph_spacing
+
+    # Background
+    pygame.draw.rect(screen, (245, 245, 245), (0, graph_area_top, width, 220))
+
+    # Labels
+    screen.blit(font.render("Gyro X/Y/Z (°/s)", True, (0, 0, 0)), (10, gyro_top - 15))
+    screen.blit(font.render("Accel X/Y/Z (g)", True, (0, 0, 0)), (10, accel_top - 15))
+
+    # Gyroscope graph
+    draw_graph(screen, gyro_x_history, (255, 0, 0), gyro_top, graph_height, height_scale=10)
+    draw_graph(screen, gyro_y_history, (0, 255, 0), gyro_top, graph_height, height_scale=10)
+    draw_graph(screen, gyro_z_history, (0, 0, 255), gyro_top, graph_height, height_scale=10)
 
-    # Draw the circle
-    pygame.draw.circle(screen, BLUE, circle_center, circle_radius)
+    # Accelerometer graph
+    draw_graph(screen, acc_x_history, (255, 128, 0), accel_top, graph_height, height_scale=25)
+    draw_graph(screen, acc_y_history, (0, 200, 200), accel_top, graph_height, height_scale=25)
+    draw_graph(screen, acc_z_history, (128, 0, 255), accel_top, graph_height, height_scale=25)
 
-    # Update the display
     pygame.display.flip()
+    clock.tick(60)
 
-# Quit Pygame
 pygame.quit()
 sys.exit()