299 lines
7.9 KiB
Python

import network
import ntptime
import time
import weather_display
import weather_requests
from PicoOled13 import get
from machine import Pin, RTC, Timer
import urequests as requests
import _thread
# Weather API functions
def get_weather_data(url):
response = None
try:
response = requests.get(url)
if response.status_code == 200:
data = response.json()
return data
else:
print(f"Error: Status {response.status_code}")
print(response.text)
return None
except Exception as e:
print(f"Request failed: {e}")
return None
finally:
# Always close the response to free sockets
if response:
try:
response.close()
except:
pass
# Global variables
latitude = 50.9097
longitude = -1.4043
today_forecast = None
current_view = "simple"
station = None
display = None
weather_disp = None
setup_complete = False
weather_update_in_progress = False
# WiFi credentials
SSID = 'octopod'
PASSWORD = 'amniotic-duo-portfolio'
update_requested = False
view_change_requested = False
# Check WiFi and reconnect if needed
def ensure_wifi_connected():
global station
if not station.isconnected():
print("WiFi disconnected, reconnecting...")
station.connect(SSID, PASSWORD)
start_time = time.time()
while not station.isconnected():
if time.time() - start_time > 15:
print("Failed to reconnect to WiFi")
return False
time.sleep(1)
return True
def update_weather():
global today_forecast, weather_update_in_progress
if weather_update_in_progress:
print("Weather update already in progress, ignoring request")
return
weather_update_in_progress = True
try:
import network
wlan = network.WLAN(network.STA_IF)
if not wlan.active():
wlan.active(True)
if not wlan.isconnected():
wlan.connect(SSID, PASSWORD)
print("Reconnecting WiFi from thread...")
start_time = time.time()
while not wlan.isconnected():
if time.time() - start_time > 10:
print("Thread failed to connect WiFi")
display.auto_text("WiFi error (thread)")
return
time.sleep(0.5)
display.auto_text("Updating weather...")
print("Thread WiFi connected. IP:", wlan.ifconfig()[0])
print("Fetching daily forecast...")
display.auto_text("Getting forecast...")
daily_data = weather_requests.get_daily(latitude, longitude)
if not daily_data:
print("Failed to get daily data")
display.auto_text("Error: daily data")
return
print("Fetching hourly forecast...")
hourly_data = weather_requests.get_hourly(latitude, longitude)
if not hourly_data:
print("Failed to get hourly data")
display.auto_text("Error: hourly data")
return
new_forecast = daily_data
new_forecast['current_temp'] = hourly_data['hourly']['temperature_2m'][0]
today_forecast = new_forecast
print("Weather data updated successfully")
weather_disp.reset_display()
display.auto_text("Weather OK")
except Exception as e:
print("Weather thread exception:", e)
display.auto_text("Update error")
finally:
weather_update_in_progress = False
def trigger_weather_update():
_thread.start_new_thread(update_weather, ())
def update_display():
global display, weather_disp, today_forecast, current_view
if today_forecast:
if current_view == "detailed":
weather_disp.display_weather(today_forecast)
else:
weather_disp.display_simple_weather(today_forecast)
else:
display.auto_text("Press KEY0 to update")
def is_leap(year):
return year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)
def days_in_month(year, month):
if month == 2:
return 29 if is_leap(year) else 28
return [31, 30, 31, 30, 31, 31, 30, 31, 30, 31, 30, 31][month - 1]
def weekday(year, month, day):
if month < 3:
month += 12
year -= 1
q = day
m = month
K = year % 100
J = year // 100
return (q + 13*(m + 1)//5 + K + K//4 + J//4 + 5*J) % 7
def last_sunday(year, month):
"""Return (day, hour) of the last Sunday in a given month."""
dim = days_in_month(year, month)
for d in range(dim, dim - 7, -1):
if weekday(year, month, d) == 1:
return d
return None
def is_dst(year, month, day, hour=12):
"""Returns True if the given UTC time is during UK DST (British Summer Time)."""
march_last_sunday = last_sunday(year, 3)
dst_start = (3, march_last_sunday, 1)
# DST ends last Sunday of October at 2:00
oct_last_sunday = last_sunday(year, 10)
dst_end = (10, oct_last_sunday, 2)
current = (month, day, hour)
return dst_start <= current < dst_end
def main_loop():
global setup_complete, update_requested, view_change_requested
while not setup_complete:
time.sleep(0.1)
print("Main loop starting...")
display.auto_text("Ready. Press KEY0 to update")
update_weather()
last_weather = time.ticks_ms()
WEATHER_MS = 10 * 60 * 1000
while True:
update_display()
if time.ticks_diff(time.ticks_ms(), last_weather) >= WEATHER_MS:
update_weather()
last_weather = time.ticks_ms()
time.sleep(0.5)
# Setup function
def setup():
global station, display, weather_disp, setup_complete
station = network.WLAN(network.STA_IF)
station.active(True)
station.connect(SSID, PASSWORD)
print("Connecting to WiFi...")
timeout = 15
start_time = time.time()
while not station.isconnected():
if time.time() - start_time > timeout:
print("Failed to connect to WiFi. Check your SSID and password.")
break
time.sleep(1)
if station.isconnected():
ip = station.ifconfig()[0]
print("Connected to WiFi! IP address:", ip)
rtc = RTC()
try:
print("Syncing time with NTP server...")
for _ in range(3):
try:
ntptime.settime()
break
except:
print("NTP retry...")
time.sleep(1)
current_time = list(rtc.datetime())
year = current_time[0]
month = current_time[1]
day = current_time[2]
hour = current_time[4]
if is_dst(year, month, day, hour):
print("British Summer Time (BST) is active - adding 1 hour")
current_time[4] += 1
if current_time[4] >= 24:
current_time[4] -= 24
current_time[2] += 1
rtc.datetime(tuple(current_time))
print("RTC set with UK local time:", rtc.datetime())
except OSError as e:
print("Error syncing time:", e)
else:
print("WiFi connection not established. Restart and try again.")
raise SystemExit
display = get()
weather_disp = weather_display.WeatherDisplay(display)
button0 = Pin(display.KEY0, Pin.IN, Pin.PULL_UP)
button1 = Pin(display.KEY1, Pin.IN, Pin.PULL_UP)
button0.irq(trigger=Pin.IRQ_FALLING, handler=detailed_view)
button1.irq(trigger=Pin.IRQ_FALLING, handler=simple_view)
display.auto_text("Starting weather display...")
setup_complete = True
print("Setup complete!")
def detailed_view(pin):
global current_view
print("Detailed view")
current_view = "detailed"
def simple_view(pin):
global current_view
print("Simple view")
current_view = "simple"
# Main function
def main():
setup()
main_loop()
while True:
time.sleep(1)
if __name__ == "__main__":
main()