Alexander Berry-Roe 92defbe958 Add initial weather display application with API integration
Includes weather data retrieval from Open-Meteo API and basic WiFi setup for connectivity. IntelliJ project configuration files and a `.gitignore` for IDE-specific files are also added.
2025-05-15 00:50:06 +01:00

326 lines
9.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import network
import ntptime
import time
import weather_display
import weather_requests
from PicoOled13 import get
from machine import Pin, RTC, Timer
import urequests as requests
import _thread
# Weather API functions
def get_weather_data(url):
response = None
try:
response = requests.get(url)
if response.status_code == 200:
data = response.json()
return data
else:
print(f"Error: Status {response.status_code}")
print(response.text)
return None
except Exception as e:
print(f"Request failed: {e}")
return None
finally:
# Always close the response to free sockets
if response:
try:
response.close()
except:
pass
# Global variables
latitude = 50.9097
longitude = -1.4043
today_forecast = None
current_view = "simple"
station = None # Global WiFi station
display = None # Global display object
weather_disp = None # Global weather display object
setup_complete = False # Flag to indicate if setup is complete
weather_update_in_progress = False # Flag to prevent multiple concurrent updates
# WiFi credentials
SSID = 'octopod'
PASSWORD = 'amniotic-duo-portfolio'
# Event flags for button presses
update_requested = False
view_change_requested = False
# Check WiFi and reconnect if needed
def ensure_wifi_connected():
global station
if not station.isconnected():
print("WiFi disconnected, reconnecting...")
station.connect(SSID, PASSWORD)
# Wait for connection with timeout
start_time = time.time()
while not station.isconnected():
if time.time() - start_time > 15: # 15 second timeout
print("Failed to reconnect to WiFi")
return False
time.sleep(1)
return True
def update_weather():
global today_forecast, weather_update_in_progress
if weather_update_in_progress:
print("Weather update already in progress, ignoring request")
return
weather_update_in_progress = True
try:
import network
wlan = network.WLAN(network.STA_IF)
if not wlan.active():
wlan.active(True)
if not wlan.isconnected():
wlan.connect(SSID, PASSWORD)
print("Reconnecting WiFi from thread...")
start_time = time.time()
while not wlan.isconnected():
if time.time() - start_time > 10:
print("Thread failed to connect WiFi")
display.auto_text("WiFi error (thread)")
return
time.sleep(0.5)
display.auto_text("Updating weather...")
print("Thread WiFi connected. IP:", wlan.ifconfig()[0])
print("Fetching daily forecast...")
display.auto_text("Getting forecast...")
daily_data = weather_requests.get_daily(latitude, longitude)
if not daily_data:
print("Failed to get daily data")
display.auto_text("Error: daily data")
return
print("Fetching hourly forecast...")
hourly_data = weather_requests.get_hourly(latitude, longitude)
if not hourly_data:
print("Failed to get hourly data")
display.auto_text("Error: hourly data")
return
new_forecast = daily_data
new_forecast['current_temp'] = hourly_data['hourly']['temperature_2m'][0]
today_forecast = new_forecast
print("Weather data updated successfully")
weather_disp.reset_display()
display.auto_text("Weather OK")
except Exception as e:
print("Weather thread exception:", e)
display.auto_text("Update error")
finally:
weather_update_in_progress = False
def trigger_weather_update():
_thread.start_new_thread(update_weather, ())
# Update display based on current data
def update_display():
global display, weather_disp, today_forecast, current_view
if today_forecast:
if current_view == "detailed":
weather_disp.display_weather(today_forecast)
else:
weather_disp.display_simple_weather(today_forecast)
else:
display.auto_text("Press KEY0 to update")
def is_leap(year):
return year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)
def days_in_month(year, month):
if month == 2:
return 29 if is_leap(year) else 28
return [31, 30, 31, 30, 31, 31, 30, 31, 30, 31, 30, 31][month - 1]
def weekday(year, month, day):
# Zellers congruence (0=Saturday, 1=Sunday, ..., 6=Friday)
if month < 3:
month += 12
year -= 1
q = day
m = month
K = year % 100
J = year // 100
return (q + 13*(m + 1)//5 + K + K//4 + J//4 + 5*J) % 7
def last_sunday(year, month):
"""Return (day, hour) of the last Sunday in a given month."""
dim = days_in_month(year, month)
for d in range(dim, dim - 7, -1):
if weekday(year, month, d) == 1: # Sunday
return d
return None # should never happen
def is_dst(year, month, day, hour=12):
"""Returns True if the given UTC time is during UK DST (British Summer Time)."""
# DST starts last Sunday of March at 1:00
march_last_sunday = last_sunday(year, 3)
dst_start = (3, march_last_sunday, 1)
# DST ends last Sunday of October at 2:00
oct_last_sunday = last_sunday(year, 10)
dst_end = (10, oct_last_sunday, 2)
current = (month, day, hour)
return dst_start <= current < dst_end
# Main loop (synchronous version)
def main_loop():
global setup_complete, update_requested, view_change_requested
# Wait for setup to complete before proceeding
while not setup_complete:
time.sleep(0.1)
print("Main loop starting...")
display.auto_text("Ready. Press KEY0 to update")
update_weather() # initial call
last_weather = time.ticks_ms()
WEATHER_MS = 10 * 60 * 1000 # 10 min in milliseconds
while True:
update_display()
# Has 10 minutes elapsed?
if time.ticks_diff(time.ticks_ms(), last_weather) >= WEATHER_MS:
update_weather()
last_weather = time.ticks_ms()
time.sleep(0.5)
# Setup function
def setup():
global station, display, weather_disp, setup_complete
# --- WiFi Connection Setup ---
station = network.WLAN(network.STA_IF)
station.active(True)
station.connect(SSID, PASSWORD)
print("Connecting to WiFi...")
timeout = 15 # Extended timeout
start_time = time.time()
while not station.isconnected():
if time.time() - start_time > timeout:
print("Failed to connect to WiFi. Check your SSID and password.")
break
time.sleep(1)
if station.isconnected():
ip = station.ifconfig()[0] # Standard way to get IP
print("Connected to WiFi! IP address:", ip)
# Set up the RTC using NTP
rtc = RTC()
try:
print("Syncing time with NTP server...")
# Retry NTP time sync
for _ in range(3):
try:
ntptime.settime()
break
except:
print("NTP retry...")
time.sleep(1)
# Get current time from RTC
current_time = list(rtc.datetime())
year = current_time[0]
month = current_time[1]
day = current_time[2]
hour = current_time[4]
# Adjust for UK time (UTC+0/+1)
if is_dst(year, month, day, hour):
print("British Summer Time (BST) is active - adding 1 hour")
current_time[4] += 1
if current_time[4] >= 24:
current_time[4] -= 24
current_time[2] += 1
# Update RTC with the adjusted time
rtc.datetime(tuple(current_time))
print("RTC set with UK local time:", rtc.datetime())
except OSError as e:
print("Error syncing time:", e)
else:
print("WiFi connection not established. Restart and try again.")
raise SystemExit
# Initialize display
display = get()
weather_disp = weather_display.WeatherDisplay(display)
# Setup button interrupts
button0 = Pin(display.KEY0, Pin.IN, Pin.PULL_UP)
button1 = Pin(display.KEY1, Pin.IN, Pin.PULL_UP)
button0.irq(trigger=Pin.IRQ_FALLING, handler=detailed_view)
button1.irq(trigger=Pin.IRQ_FALLING, handler=simple_view)
# Display initial message
display.auto_text("Starting weather display...")
# Set flag to indicate setup is complete
setup_complete = True
print("Setup complete!")
def detailed_view(pin):
global current_view
print("Detailed view")
current_view = "detailed"
def simple_view(pin):
global current_view
print("Simple view")
current_view = "simple"
# Main function
def main():
# Run setup
setup()
# Start the main loop
main_loop()
while True:
time.sleep(1)
# Run the main program
if __name__ == "__main__":
main()