diff --git a/drizzle/20241104190811_lucia.sql b/drizzle/20241104190811_lucia.sql new file mode 100644 index 0000000..691820a --- /dev/null +++ b/drizzle/20241104190811_lucia.sql @@ -0,0 +1,19 @@ +CREATE TABLE IF NOT EXISTS "sessions" ( + "id" text PRIMARY KEY NOT NULL, + "user_id" text NOT NULL, + "expires_at" timestamp with time zone NOT NULL +); +--> statement-breakpoint +CREATE TABLE IF NOT EXISTS "users" ( + "id" text PRIMARY KEY NOT NULL, + "age" integer, + "username" text NOT NULL, + "password_hash" text NOT NULL, + CONSTRAINT "users_username_unique" UNIQUE("username") +); +--> statement-breakpoint +DO $$ BEGIN + ALTER TABLE "sessions" ADD CONSTRAINT "sessions_user_id_users_id_fk" FOREIGN KEY ("user_id") REFERENCES "public"."users"("id") ON DELETE no action ON UPDATE no action; +EXCEPTION + WHEN duplicate_object THEN null; +END $$; diff --git a/drizzle/20250614083214_lucia.sql b/drizzle/20250614083214_lucia.sql deleted file mode 100644 index 5b65242..0000000 --- a/drizzle/20250614083214_lucia.sql +++ /dev/null @@ -1,43 +0,0 @@ -CREATE TABLE "floors" ( - "floor" integer PRIMARY KEY NOT NULL, - "url" text NOT NULL, - "image" text -); ---> statement-breakpoint -CREATE TABLE "plans" ( - "floor" integer PRIMARY KEY NOT NULL, - "plan" json NOT NULL -); ---> statement-breakpoint -CREATE TABLE "sensor_data" ( - "uuid" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL, - "sensor" text NOT NULL, - "temperature" real NOT NULL, - "humidity" real NOT NULL, - "pressure" real NOT NULL, - "altitude" real NOT NULL, - "time" timestamp with time zone DEFAULT now() NOT NULL -); ---> statement-breakpoint -CREATE TABLE "sensors" ( - "id" text PRIMARY KEY NOT NULL, - "user" text NOT NULL -); ---> statement-breakpoint -CREATE TABLE "sessions" ( - "id" text PRIMARY KEY NOT NULL, - "user_id" text NOT NULL, - "expires_at" timestamp with time zone NOT NULL -); ---> statement-breakpoint -CREATE TABLE "users" ( - "id" text PRIMARY KEY NOT NULL, - "age" integer, - "username" text NOT NULL, - "password_hash" text NOT NULL, - CONSTRAINT "users_username_unique" UNIQUE("username") -); ---> statement-breakpoint -ALTER TABLE "sensor_data" ADD CONSTRAINT "sensor_data_sensor_sensors_id_fk" FOREIGN KEY ("sensor") REFERENCES "public"."sensors"("id") ON DELETE no action ON UPDATE no action;--> statement-breakpoint -ALTER TABLE "sensors" ADD CONSTRAINT "sensors_user_users_id_fk" FOREIGN KEY ("user") REFERENCES "public"."users"("id") ON DELETE no action ON UPDATE no action;--> statement-breakpoint -ALTER TABLE "sessions" ADD CONSTRAINT "sessions_user_id_users_id_fk" FOREIGN KEY ("user_id") REFERENCES "public"."users"("id") ON DELETE no action ON UPDATE no action; \ No newline at end of file diff --git a/drizzle/meta/20241104190811_snapshot.json b/drizzle/meta/20241104190811_snapshot.json new file mode 100644 index 0000000..13d527c --- /dev/null +++ b/drizzle/meta/20241104190811_snapshot.json @@ -0,0 +1,103 @@ +{ + "id": "4b3474a3-7de5-4b99-878e-f839b53c52f7", + "prevId": "00000000-0000-0000-0000-000000000000", + "version": "7", + "dialect": "postgresql", + "tables": { + "public.sessions": { + "name": "sessions", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "expires_at": { + "name": "expires_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true + } + }, + "indexes": {}, + "foreignKeys": { + "sessions_user_id_users_id_fk": { + "name": "sessions_user_id_users_id_fk", + "tableFrom": "sessions", + "tableTo": "users", + "columnsFrom": ["user_id"], + "columnsTo": ["id"], + "onDelete": "no action", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.users": { + "name": "users", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "age": { + "name": "age", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "username": { + "name": "username", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "password_hash": { + "name": "password_hash", + "type": "text", + "primaryKey": false, + "notNull": true + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": { + "users_username_unique": { + "name": "users_username_unique", + "nullsNotDistinct": false, + "columns": ["username"] + } + }, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + } + }, + "enums": {}, + "schemas": {}, + "sequences": {}, + "roles": {}, + "policies": {}, + "views": {}, + "_meta": { + "columns": {}, + "schemas": {}, + "tables": {} + } +} diff --git a/drizzle/meta/20250614083214_snapshot.json b/drizzle/meta/20250614083214_snapshot.json deleted file mode 100644 index 129a3c6..0000000 --- a/drizzle/meta/20250614083214_snapshot.json +++ /dev/null @@ -1,261 +0,0 @@ -{ - "id": "23f11c9d-f98b-4321-aca4-54ec7fc4eece", - "prevId": "00000000-0000-0000-0000-000000000000", - "version": "7", - "dialect": "postgresql", - "tables": { - "public.floors": { - "name": "floors", - "schema": "", - "columns": { - "floor": { - "name": "floor", - "type": "integer", - "primaryKey": true, - "notNull": true - }, - "url": { - "name": "url", - "type": "text", - "primaryKey": false, - "notNull": true - }, - "image": { - "name": "image", - "type": "text", - "primaryKey": false, - "notNull": false - } - }, - "indexes": {}, - "foreignKeys": {}, - "compositePrimaryKeys": {}, - "uniqueConstraints": {}, - "policies": {}, - "checkConstraints": {}, - "isRLSEnabled": false - }, - "public.plans": { - "name": "plans", - "schema": "", - "columns": { - "floor": { - "name": "floor", - "type": "integer", - "primaryKey": true, - "notNull": true - }, - "plan": { - "name": "plan", - "type": "json", - "primaryKey": false, - "notNull": true - } - }, - "indexes": {}, - "foreignKeys": {}, - "compositePrimaryKeys": {}, - "uniqueConstraints": {}, - "policies": {}, - "checkConstraints": {}, - "isRLSEnabled": false - }, - "public.sensor_data": { - "name": "sensor_data", - "schema": "", - "columns": { - "uuid": { - "name": "uuid", - "type": "uuid", - "primaryKey": true, - "notNull": true, - "default": "gen_random_uuid()" - }, - "sensor": { - "name": "sensor", - "type": "text", - "primaryKey": false, - "notNull": true - }, - "temperature": { - "name": "temperature", - "type": "real", - "primaryKey": false, - "notNull": true - }, - "humidity": { - "name": "humidity", - "type": "real", - "primaryKey": false, - "notNull": true - }, - "pressure": { - "name": "pressure", - "type": "real", - "primaryKey": false, - "notNull": true - }, - "altitude": { - "name": "altitude", - "type": "real", - "primaryKey": false, - "notNull": true - }, - "time": { - "name": "time", - "type": "timestamp with time zone", - "primaryKey": false, - "notNull": true, - "default": "now()" - } - }, - "indexes": {}, - "foreignKeys": { - "sensor_data_sensor_sensors_id_fk": { - "name": "sensor_data_sensor_sensors_id_fk", - "tableFrom": "sensor_data", - "tableTo": "sensors", - "columnsFrom": ["sensor"], - "columnsTo": ["id"], - "onDelete": "no action", - "onUpdate": "no action" - } - }, - "compositePrimaryKeys": {}, - "uniqueConstraints": {}, - "policies": {}, - "checkConstraints": {}, - "isRLSEnabled": false - }, - "public.sensors": { - "name": "sensors", - "schema": "", - "columns": { - "id": { - "name": "id", - "type": "text", - "primaryKey": true, - "notNull": true - }, - "user": { - "name": "user", - "type": "text", - "primaryKey": false, - "notNull": true - } - }, - "indexes": {}, - "foreignKeys": { - "sensors_user_users_id_fk": { - "name": "sensors_user_users_id_fk", - "tableFrom": "sensors", - "tableTo": "users", - "columnsFrom": ["user"], - "columnsTo": ["id"], - "onDelete": "no action", - "onUpdate": "no action" - } - }, - "compositePrimaryKeys": {}, - "uniqueConstraints": {}, - "policies": {}, - "checkConstraints": {}, - "isRLSEnabled": false - }, - "public.sessions": { - "name": "sessions", - "schema": "", - "columns": { - "id": { - "name": "id", - "type": "text", - "primaryKey": true, - "notNull": true - }, - "user_id": { - "name": "user_id", - "type": "text", - "primaryKey": false, - "notNull": true - }, - "expires_at": { - "name": "expires_at", - "type": "timestamp with time zone", - "primaryKey": false, - "notNull": true - } - }, - "indexes": {}, - "foreignKeys": { - "sessions_user_id_users_id_fk": { - "name": "sessions_user_id_users_id_fk", - "tableFrom": "sessions", - "tableTo": "users", - "columnsFrom": ["user_id"], - "columnsTo": ["id"], - "onDelete": "no action", - "onUpdate": "no action" - } - }, - "compositePrimaryKeys": {}, - "uniqueConstraints": {}, - "policies": {}, - "checkConstraints": {}, - "isRLSEnabled": false - }, - "public.users": { - "name": "users", - "schema": "", - "columns": { - "id": { - "name": "id", - "type": "text", - "primaryKey": true, - "notNull": true - }, - "age": { - "name": "age", - "type": "integer", - "primaryKey": false, - "notNull": false - }, - "username": { - "name": "username", - "type": "text", - "primaryKey": false, - "notNull": true - }, - "password_hash": { - "name": "password_hash", - "type": "text", - "primaryKey": false, - "notNull": true - } - }, - "indexes": {}, - "foreignKeys": {}, - "compositePrimaryKeys": {}, - "uniqueConstraints": { - "users_username_unique": { - "name": "users_username_unique", - "nullsNotDistinct": false, - "columns": ["username"] - } - }, - "policies": {}, - "checkConstraints": {}, - "isRLSEnabled": false - } - }, - "enums": {}, - "schemas": {}, - "sequences": {}, - "roles": {}, - "policies": {}, - "views": {}, - "_meta": { - "columns": {}, - "schemas": {}, - "tables": {} - } -} diff --git a/drizzle/meta/_journal.json b/drizzle/meta/_journal.json index c0eeced..2c97128 100644 --- a/drizzle/meta/_journal.json +++ b/drizzle/meta/_journal.json @@ -5,8 +5,8 @@ { "idx": 0, "version": "7", - "when": 1749889934943, - "tag": "20250614083214_lucia", + "when": 1730747291671, + "tag": "20241104190811_lucia", "breakpoints": true } ] diff --git a/src/lib/server/mqtt-devices.js b/src/lib/server/mqtt-devices.js index ad0b62d..8445506 100644 --- a/src/lib/server/mqtt-devices.js +++ b/src/lib/server/mqtt-devices.js @@ -1,8 +1,15 @@ +// src/lib/server/mqtt-devices.js +// This module provides access to MQTT device data for other parts of the application + +// We'll import the maps directly from the MQTT server module + let connectedDevices = new Map(); let deviceSensorData = new Map(); +// Export the maps so the MQTT server can set them export { connectedDevices, deviceSensorData }; +// Clean up offline devices (not seen in last 5 minutes) function cleanupDevices() { const fiveMinutesAgo = new Date(Date.now() - 5 * 60 * 1000); let updated = false; @@ -17,6 +24,7 @@ function cleanupDevices() { return updated; } +// Export function to get current devices export function getCurrentDevices() { cleanupDevices(); return Array.from(connectedDevices.values()).map((device) => { @@ -28,6 +36,7 @@ export function getCurrentDevices() { }); } +// Export function to get sensor data for a specific device export function getDeviceSensorData(deviceId) { return deviceSensorData.get(deviceId) || null; } diff --git a/src/routes/(app)/[slug]/+page.server.ts b/src/routes/(app)/[slug]/+page.server.ts index 119d062..8ce7412 100644 --- a/src/routes/(app)/[slug]/+page.server.ts +++ b/src/routes/(app)/[slug]/+page.server.ts @@ -2,11 +2,14 @@ import { db } from "$lib/server/db"; import * as table from "$lib/server/db/schema"; import { eq } from "drizzle-orm"; import type { PageServerLoad } from "./$types"; +import { connect } from "mqtt"; import { getDeviceSensorData } from "$lib/server/mqtt-devices.js"; export const load: PageServerLoad = async ({ params }) => { + // Convert slug to number for floor lookup const floorNumber = Number(params.slug); + // First check if we have a saved floor configuration in the floors table const floorData = await db.select({ floor: table.floors.floor, url: table.floors.url @@ -14,8 +17,10 @@ export const load: PageServerLoad = async ({ params }) => { if (floorData.length > 0 && floorData[0].url && floorData[0].url !== "/") { try { + // Try to parse the saved configuration const config = JSON.parse(floorData[0].url); + // Add real sensor data to devices if (config.devices) { config.devices = config.devices.map(device => { const sensorData = getDeviceSensorData(device.id); @@ -36,6 +41,7 @@ export const load: PageServerLoad = async ({ params }) => { } } + // Fallback to the old canvas drawing system const floor_cnt = await db.select({ floor: table.plans.floor, json: table.plans.plan }).from(table.plans).where(eq(table.plans.floor, params.slug)); if (floor_cnt.length == 0) { await db.insert(table.plans).values({ floor: params.slug, plan: { diff --git a/src/routes/(app)/[slug]/+page.svelte b/src/routes/(app)/[slug]/+page.svelte index 45c60c6..b825992 100644 --- a/src/routes/(app)/[slug]/+page.svelte +++ b/src/routes/(app)/[slug]/+page.svelte @@ -14,6 +14,7 @@ let refreshInterval; onMount(() => { + // Connect to the SSE endpoint console.log("Attempting to connect to MQTT SSE..."); eventSource = new EventSource("/mqtt"); @@ -31,6 +32,7 @@ const messageData = JSON.parse(event.data); console.log("Floor page received SSE data:", messageData); + // Handle device updates if (messageData.devices) { console.log("Received devices:", messageData.devices); const newData = new Map(); @@ -44,6 +46,7 @@ console.log("Updated deviceSensorData:", deviceSensorData); } + // Handle MQTT message if present if (messageData.message) { mqttMessage = messageData.message; } else { @@ -56,7 +59,7 @@ eventSource.onerror = (error) => { console.error("SSE Error:", error); - eventSource.close(); + eventSource.close(); // Close the connection on error }; }); @@ -79,6 +82,7 @@ return; } + // Draw walls, doors, and furniture (simplified example) drawRegions(data.regions); drawDoors(data.doors); drawFurniture(data.furnitures); @@ -96,7 +100,7 @@ function drawDoors(doors) { doors.forEach((door) => { ctx.beginPath(); - ctx.rect(door.location.x, door.location.y, door.width, 5); + ctx.rect(door.location.x, door.location.y, door.width, 5); // Simplified door drawing ctx.stroke(); }); } @@ -451,6 +455,7 @@
{#if filteredData.length > 0} +
Latest
@@ -492,8 +497,10 @@
+
+ {#if filteredData.length > 1} {@const minValue = Math.min(...filteredData.map((d) => d[selectedSensor]))} {@const maxValue = Math.max(...filteredData.map((d) => d[selectedSensor]))} @@ -502,6 +509,7 @@ {@const chartWidth = 800 - padding * 2} {@const chartHeight = 400 - padding * 2} + + {maxValue.toFixed(1)} @@ -536,12 +545,14 @@ text-anchor="end">{minValue.toFixed(1)} + {#if filteredData.length > 0} {@const firstDate = filteredData[0].date} {@const lastDate = filteredData[filteredData.length - 1].date} {@const midIndex = Math.floor(filteredData.length / 2)} {@const midDate = filteredData[midIndex].date} + {@const formatDate = (date) => { if (timeRange === "7d") { return date.toLocaleDateString("en-US", { @@ -579,6 +590,7 @@ > {/if} + { + // Get all sensor data (like the original working version) console.log(event.locals.session.userId) const rawData = await db.select({ altitude: table.sensorData.altitude, @@ -14,9 +15,11 @@ export const GET = async (event) => { user: table.sensors.user, }).from(table.sensorData).innerJoin(table.sensors, eq(table.sensors.id, table.sensorData.sensor)).where(eq(table.sensors.user, event.locals.session.userId)); + // Scale pressure values to be in a similar range as other sensors + // Divide by 1000 to convert from Pa to kPa (more reasonable scale) const data = rawData.map(item => ({ ...item, - pressure: Math.round((item.pressure / 1000) * 10) / 10 + pressure: Math.round((item.pressure / 1000) * 10) / 10 // Convert to kPa with 1 decimal place })); console.log(`Returning ${data.length} data points`); diff --git a/src/routes/(app)/settings/+page.server.js b/src/routes/(app)/settings/+page.server.js index 20e70f4..1939028 100644 --- a/src/routes/(app)/settings/+page.server.js +++ b/src/routes/(app)/settings/+page.server.js @@ -5,13 +5,16 @@ import { fail } from "@sveltejs/kit"; import { eq } from "drizzle-orm"; export const load = async (event) => { + // Fetch all available floors const floors = await db .select({ floor: table.floors.floor }) .from(table.floors) .orderBy(table.floors.floor); + // Get real connected devices from MQTT let devices = getCurrentDevices(); + // If no real devices, provide fallback mock devices if (devices.length === 0) { devices = [ { @@ -82,6 +85,7 @@ export const actions = { await db.insert(table.sensors).values({ id: dev.id, user: event.locals.session.userId }); }); + // Check if floor exists const exists = await db .select({ floor: table.floors.floor }) .from(table.floors) @@ -91,6 +95,9 @@ export const actions = { return fail(400, { message: `Floor ${n} does not exist! Please create it first.` }); } + // Update floor with configuration + // Note: In a real implementation, you would store this data properly + // For now, we'll just update the url field as a JSON string const floorConfig = { image: floorPlanImage, devices: deviceData, diff --git a/src/routes/(app)/settings/+page.svelte b/src/routes/(app)/settings/+page.svelte index 7c79fee..9b57069 100644 --- a/src/routes/(app)/settings/+page.svelte +++ b/src/routes/(app)/settings/+page.svelte @@ -31,11 +31,13 @@ }); $effect(() => { + // Update devices when data changes if (data.devices) { availableDevices = data.devices; } }); + // Set up real-time updates via SSE onMount(() => { console.log("Settings: Attempting to connect to MQTT SSE..."); eventSource = new EventSource("/mqtt"); @@ -54,10 +56,13 @@ const messageData = JSON.parse(event.data); console.log("Settings page received SSE data:", messageData); + // Update device sensor data and device list from real-time updates if (messageData.devices) { console.log("Updating available devices with:", messageData.devices); + // Update available devices with latest data availableDevices = messageData.devices; + // Update sensor data map const newData = new Map(); messageData.devices.forEach((device) => { if (device.sensorData) { @@ -111,11 +116,14 @@ const x = ((event.clientX - rect.left) / rect.width) * 100; const y = ((event.clientY - rect.top) / rect.height) * 100; + // Check if device is already placed const existingIndex = placedDevices.findIndex((d) => d.id === draggedDevice.id); if (existingIndex >= 0) { + // Update position placedDevices[existingIndex] = { ...placedDevices[existingIndex], x, y }; } else { + // Add new device placedDevices = [...placedDevices, { ...draggedDevice, x, y }]; } @@ -236,6 +244,7 @@ return async ({ result }) => { if (result.type === "success") { saveMessage = { type: "success", text: result.data.message }; + // Clear form after successful save selectedFloorNumber = ""; } else if (result.type === "failure") { saveMessage = { diff --git a/src/routes/(auth)/login/+page.server.js b/src/routes/(auth)/login/+page.server.js index 686785a..e4b51c5 100644 --- a/src/routes/(auth)/login/+page.server.js +++ b/src/routes/(auth)/login/+page.server.js @@ -63,6 +63,7 @@ export const actions = { const userId = generateUserId(); const passwordHash = await hash(password, { + // recommended minimum parameters memoryCost: 19456, timeCost: 2, outputLen: 32, @@ -83,6 +84,7 @@ export const actions = { }; function generateUserId() { + // ID with 120 bits of entropy, or about the same as UUID v4. const bytes = crypto.getRandomValues(new Uint8Array(15)); const id = encodeBase64url(bytes); return id; diff --git a/src/routes/mqtt/+server.js b/src/routes/mqtt/+server.js index 999b0e7..75ae0aa 100644 --- a/src/routes/mqtt/+server.js +++ b/src/routes/mqtt/+server.js @@ -1,3 +1,4 @@ +// src/routes/mqtt/+server.js import { db } from "$lib/server/db"; import * as table from "$lib/server/db/schema"; import { connectedDevices, deviceSensorData, getCurrentDevices } from "$lib/server/mqtt-devices.js"; @@ -5,15 +6,20 @@ import { eq } from "drizzle-orm"; import mqtt from "mqtt"; import { writable } from "svelte/store"; +// A Svelte store to hold the latest MQTT message. +// In a real application, you might want to store more data or +// use a more robust way to manage messages, but for a basic example, this works. const latestMessage = writable("No message yet"); const devices = writable([]); let client = null; +// Function to update device status and sensor data function updateDevice(deviceId, sensorData = null) { const now = new Date(); const deviceName = `ESP8266 #${deviceId.slice(-6)}`; + // Update device info connectedDevices.set(deviceId, { id: deviceId, name: deviceName, @@ -22,6 +28,7 @@ function updateDevice(deviceId, sensorData = null) { lastSeen: now, }); + // Update sensor data if provided if (sensorData) { deviceSensorData.set(deviceId, { ...sensorData, @@ -29,9 +36,11 @@ function updateDevice(deviceId, sensorData = null) { }); } + // Update the devices store updateDevicesStore(); } +// Function to parse sensor data from MQTT payload function parseSensorData(payload) { try { const data = JSON.parse(payload); @@ -47,8 +56,10 @@ function parseSensorData(payload) { } } +// Store SSE controllers for cleanup const sseControllers = new Set(); +// Function to update devices store with latest data function updateDevicesStore() { const deviceList = Array.from(connectedDevices.values()).map((device) => { const sensorData = deviceSensorData.get(device.id); @@ -60,6 +71,7 @@ function updateDevicesStore() { devices.set(deviceList); } +// Clean up offline devices (not seen in last 5 minutes) function cleanupDevices() { const fiveMinutesAgo = new Date(Date.now() - 5 * 60 * 1000); let updated = false; @@ -76,15 +88,21 @@ function cleanupDevices() { } } +// Run cleanup every minute setInterval(cleanupDevices, 60000); +// getCurrentDevices is now imported from the shared module + +// Function to connect to MQTT function connectMqtt() { if (client && client.connected) { - return; + return; // Already connected } - const BROKER_URL = "mqtt://kada49.it:1883"; - const TOPIC = "esp8266/+/data"; + // Replace with your MQTT broker details + const BROKER_URL = "mqtt://kada49.it:1883"; // Example public broker + //const BROKER_URL = "mqtt://test.mosquitto.org:1883"; + const TOPIC = "esp8266/+/data"; // Replace with your desired topic client = mqtt.connect(BROKER_URL); @@ -102,27 +120,32 @@ function connectMqtt() { client.on("message", async (topic, message) => { const payload = message.toString(); console.log(`Received message from topic "${topic}": ${payload}`); - latestMessage.set(payload); + latestMessage.set(payload); // Update the Svelte store + // Extract device ID from topic (esp8266/DEVICE_ID/data) const topicParts = topic.split("/"); if (topicParts.length >= 3 && topicParts[0] === "esp8266" && topicParts[2] === "data") { const deviceId = topicParts[1]; console.log(`Processing data for device: ${deviceId}`); + // Parse sensor data const sensorData = parseSensorData(payload); if (sensorData) { console.log(`Parsed sensor data:`, sensorData); updateDevice(deviceId, sensorData); const devices = await db.select().from(table.sensors).where(eq(table.sensors.id, deviceId)); if (devices.length == 1) - await db.insert(table.sensorData).values({ - sensor: deviceId, - temperature: sensorData.temperature, - humidity: sensorData.humidity, - altitude: sensorData.altitude, - pressure: sensorData.pressure, - }); + await db + .insert(table.sensorData) + .values({ + sensor: deviceId, + temperature: sensorData.temperature, + humidity: sensorData.humidity, + altitude: sensorData.altitude, + pressure: sensorData.pressure, + }); } else { + // Still update device as online even if data parsing failed updateDevice(deviceId); } } @@ -131,17 +154,20 @@ function connectMqtt() { client.on("error", (err) => { console.error(`MQTT error: ${err}`); if (client) { - client.end(); + client.end(); // Close connection on error } client = null; + // Implement re-connection logic here if needed }); client.on("close", () => { console.log("MQTT connection closed."); client = null; + // Implement re-connection logic here if needed }); } +// Connect to MQTT when the server starts connectMqtt(); /** @type {import("./$types").RequestHandler} */ @@ -160,8 +186,10 @@ export async function GET({ request }) { let isConnected = true; let interval; + // Add this controller to the broadcast set sseControllers.add(controller); + // Cleanup function const cleanup = () => { console.log("Cleaning up MQTT SSE connection"); isConnected = false; @@ -175,6 +203,7 @@ export async function GET({ request }) { } }; + // Send current device data function sendDevices() { if (!isConnected) { cleanup(); @@ -188,10 +217,12 @@ export async function GET({ request }) { timestamp: new Date().toISOString(), }; + // Check if controller is still valid before enqueueing if (isConnected) { controller.enqueue(`data: ${JSON.stringify(message)}\n\n`); } } catch (error) { + // Handle controller closed error specifically if ( error.code === "ERR_INVALID_STATE" || error.message.includes("Controller is already closed") @@ -204,10 +235,13 @@ export async function GET({ request }) { } } + // Send initial data sendDevices(); + // Send updates every 2 seconds interval = setInterval(sendDevices, 2000); + // Subscribe to MQTT messages via the store for real-time updates const unsubscribe = latestMessage.subscribe((message) => { if (message !== "No message yet" && isConnected) { try { @@ -231,6 +265,7 @@ export async function GET({ request }) { } }); + // Handle client disconnection request.signal.addEventListener("abort", () => { console.log("Client disconnected from MQTT SSE stream"); cleanup();