import network import ntptime import time import weather_display import weather_requests from PicoOled13 import get from machine import Pin, RTC, Timer import urequests as requests import _thread # Weather API functions def get_weather_data(url): response = None try: response = requests.get(url) if response.status_code == 200: data = response.json() return data else: print(f"Error: Status {response.status_code}") print(response.text) return None except Exception as e: print(f"Request failed: {e}") return None finally: # Always close the response to free sockets if response: try: response.close() except: pass # Global variables latitude = 50.9097 longitude = -1.4043 today_forecast = None current_view = "simple" station = None display = None weather_disp = None setup_complete = False weather_update_in_progress = False # WiFi credentials SSID = 'octopod' PASSWORD = 'amniotic-duo-portfolio' update_requested = False view_change_requested = False # Check WiFi and reconnect if needed def ensure_wifi_connected(): global station if not station.isconnected(): print("WiFi disconnected, reconnecting...") station.connect(SSID, PASSWORD) start_time = time.time() while not station.isconnected(): if time.time() - start_time > 15: print("Failed to reconnect to WiFi") return False time.sleep(1) return True def update_weather(): global today_forecast, weather_update_in_progress if weather_update_in_progress: print("Weather update already in progress, ignoring request") return weather_update_in_progress = True try: import network wlan = network.WLAN(network.STA_IF) if not wlan.active(): wlan.active(True) if not wlan.isconnected(): wlan.connect(SSID, PASSWORD) print("Reconnecting WiFi from thread...") start_time = time.time() while not wlan.isconnected(): if time.time() - start_time > 10: print("Thread failed to connect WiFi") display.auto_text("WiFi error (thread)") return time.sleep(0.5) display.auto_text("Updating weather...") print("Thread WiFi connected. IP:", wlan.ifconfig()[0]) print("Fetching daily forecast...") display.auto_text("Getting forecast...") daily_data = weather_requests.get_daily(latitude, longitude) if not daily_data: print("Failed to get daily data") display.auto_text("Error: daily data") return print("Fetching hourly forecast...") hourly_data = weather_requests.get_hourly(latitude, longitude) if not hourly_data: print("Failed to get hourly data") display.auto_text("Error: hourly data") return new_forecast = daily_data new_forecast['current_temp'] = hourly_data['hourly']['temperature_2m'][0] today_forecast = new_forecast print("Weather data updated successfully") weather_disp.reset_display() display.auto_text("Weather OK") except Exception as e: print("Weather thread exception:", e) display.auto_text("Update error") finally: weather_update_in_progress = False def trigger_weather_update(): _thread.start_new_thread(update_weather, ()) def update_display(): global display, weather_disp, today_forecast, current_view if today_forecast: if current_view == "detailed": weather_disp.display_weather(today_forecast) else: weather_disp.display_simple_weather(today_forecast) else: display.auto_text("Press KEY0 to update") def is_leap(year): return year % 4 == 0 and (year % 100 != 0 or year % 400 == 0) def days_in_month(year, month): if month == 2: return 29 if is_leap(year) else 28 return [31, 30, 31, 30, 31, 31, 30, 31, 30, 31, 30, 31][month - 1] def weekday(year, month, day): if month < 3: month += 12 year -= 1 q = day m = month K = year % 100 J = year // 100 return (q + 13*(m + 1)//5 + K + K//4 + J//4 + 5*J) % 7 def last_sunday(year, month): """Return (day, hour) of the last Sunday in a given month.""" dim = days_in_month(year, month) for d in range(dim, dim - 7, -1): if weekday(year, month, d) == 1: return d return None def is_dst(year, month, day, hour=12): """Returns True if the given UTC time is during UK DST (British Summer Time).""" march_last_sunday = last_sunday(year, 3) dst_start = (3, march_last_sunday, 1) # DST ends last Sunday of October at 2:00 oct_last_sunday = last_sunday(year, 10) dst_end = (10, oct_last_sunday, 2) current = (month, day, hour) return dst_start <= current < dst_end def main_loop(): global setup_complete, update_requested, view_change_requested while not setup_complete: time.sleep(0.1) print("Main loop starting...") display.auto_text("Ready. Press KEY0 to update") update_weather() last_weather = time.ticks_ms() WEATHER_MS = 10 * 60 * 1000 while True: update_display() if time.ticks_diff(time.ticks_ms(), last_weather) >= WEATHER_MS: update_weather() last_weather = time.ticks_ms() time.sleep(0.5) # Setup function def setup(): global station, display, weather_disp, setup_complete station = network.WLAN(network.STA_IF) station.active(True) station.connect(SSID, PASSWORD) print("Connecting to WiFi...") timeout = 15 start_time = time.time() while not station.isconnected(): if time.time() - start_time > timeout: print("Failed to connect to WiFi. Check your SSID and password.") break time.sleep(1) if station.isconnected(): ip = station.ifconfig()[0] print("Connected to WiFi! IP address:", ip) rtc = RTC() try: print("Syncing time with NTP server...") for _ in range(3): try: ntptime.settime() break except: print("NTP retry...") time.sleep(1) current_time = list(rtc.datetime()) year = current_time[0] month = current_time[1] day = current_time[2] hour = current_time[4] if is_dst(year, month, day, hour): print("British Summer Time (BST) is active - adding 1 hour") current_time[4] += 1 if current_time[4] >= 24: current_time[4] -= 24 current_time[2] += 1 rtc.datetime(tuple(current_time)) print("RTC set with UK local time:", rtc.datetime()) except OSError as e: print("Error syncing time:", e) else: print("WiFi connection not established. Restart and try again.") raise SystemExit display = get() weather_disp = weather_display.WeatherDisplay(display) button0 = Pin(display.KEY0, Pin.IN, Pin.PULL_UP) button1 = Pin(display.KEY1, Pin.IN, Pin.PULL_UP) button0.irq(trigger=Pin.IRQ_FALLING, handler=detailed_view) button1.irq(trigger=Pin.IRQ_FALLING, handler=simple_view) display.auto_text("Starting weather display...") setup_complete = True print("Setup complete!") def detailed_view(pin): global current_view print("Detailed view") current_view = "detailed" def simple_view(pin): global current_view print("Simple view") current_view = "simple" # Main function def main(): setup() main_loop() while True: time.sleep(1) if __name__ == "__main__": main()