299 lines
7.9 KiB
Python
299 lines
7.9 KiB
Python
import network
|
|
import ntptime
|
|
import time
|
|
import weather_display
|
|
import weather_requests
|
|
from PicoOled13 import get
|
|
from machine import Pin, RTC, Timer
|
|
import urequests as requests
|
|
import _thread
|
|
|
|
|
|
# Weather API functions
|
|
def get_weather_data(url):
|
|
response = None
|
|
try:
|
|
response = requests.get(url)
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
return data
|
|
else:
|
|
print(f"Error: Status {response.status_code}")
|
|
print(response.text)
|
|
return None
|
|
except Exception as e:
|
|
print(f"Request failed: {e}")
|
|
return None
|
|
finally:
|
|
# Always close the response to free sockets
|
|
if response:
|
|
try:
|
|
response.close()
|
|
except:
|
|
pass
|
|
|
|
|
|
# Global variables
|
|
latitude = 50.9097
|
|
longitude = -1.4043
|
|
today_forecast = None
|
|
current_view = "simple"
|
|
station = None
|
|
display = None
|
|
weather_disp = None
|
|
setup_complete = False
|
|
weather_update_in_progress = False
|
|
|
|
# WiFi credentials
|
|
SSID = 'octopod'
|
|
PASSWORD = 'amniotic-duo-portfolio'
|
|
|
|
|
|
update_requested = False
|
|
view_change_requested = False
|
|
|
|
|
|
# Check WiFi and reconnect if needed
|
|
def ensure_wifi_connected():
|
|
global station
|
|
|
|
if not station.isconnected():
|
|
print("WiFi disconnected, reconnecting...")
|
|
station.connect(SSID, PASSWORD)
|
|
|
|
|
|
start_time = time.time()
|
|
while not station.isconnected():
|
|
if time.time() - start_time > 15:
|
|
print("Failed to reconnect to WiFi")
|
|
return False
|
|
time.sleep(1)
|
|
|
|
return True
|
|
|
|
|
|
def update_weather():
|
|
global today_forecast, weather_update_in_progress
|
|
|
|
|
|
if weather_update_in_progress:
|
|
print("Weather update already in progress, ignoring request")
|
|
return
|
|
|
|
weather_update_in_progress = True
|
|
|
|
try:
|
|
import network
|
|
wlan = network.WLAN(network.STA_IF)
|
|
if not wlan.active():
|
|
wlan.active(True)
|
|
if not wlan.isconnected():
|
|
wlan.connect(SSID, PASSWORD)
|
|
print("Reconnecting WiFi from thread...")
|
|
start_time = time.time()
|
|
while not wlan.isconnected():
|
|
if time.time() - start_time > 10:
|
|
print("Thread failed to connect WiFi")
|
|
display.auto_text("WiFi error (thread)")
|
|
return
|
|
time.sleep(0.5)
|
|
|
|
display.auto_text("Updating weather...")
|
|
print("Thread WiFi connected. IP:", wlan.ifconfig()[0])
|
|
|
|
print("Fetching daily forecast...")
|
|
display.auto_text("Getting forecast...")
|
|
daily_data = weather_requests.get_daily(latitude, longitude)
|
|
if not daily_data:
|
|
print("Failed to get daily data")
|
|
display.auto_text("Error: daily data")
|
|
return
|
|
|
|
print("Fetching hourly forecast...")
|
|
hourly_data = weather_requests.get_hourly(latitude, longitude)
|
|
if not hourly_data:
|
|
print("Failed to get hourly data")
|
|
display.auto_text("Error: hourly data")
|
|
return
|
|
|
|
new_forecast = daily_data
|
|
new_forecast['current_temp'] = hourly_data['hourly']['temperature_2m'][0]
|
|
today_forecast = new_forecast
|
|
print("Weather data updated successfully")
|
|
|
|
weather_disp.reset_display()
|
|
|
|
|
|
display.auto_text("Weather OK")
|
|
|
|
except Exception as e:
|
|
print("Weather thread exception:", e)
|
|
display.auto_text("Update error")
|
|
|
|
finally:
|
|
weather_update_in_progress = False
|
|
|
|
def trigger_weather_update():
|
|
_thread.start_new_thread(update_weather, ())
|
|
|
|
def update_display():
|
|
global display, weather_disp, today_forecast, current_view
|
|
|
|
if today_forecast:
|
|
if current_view == "detailed":
|
|
weather_disp.display_weather(today_forecast)
|
|
else:
|
|
weather_disp.display_simple_weather(today_forecast)
|
|
else:
|
|
display.auto_text("Press KEY0 to update")
|
|
|
|
|
|
|
|
def is_leap(year):
|
|
return year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)
|
|
|
|
def days_in_month(year, month):
|
|
if month == 2:
|
|
return 29 if is_leap(year) else 28
|
|
return [31, 30, 31, 30, 31, 31, 30, 31, 30, 31, 30, 31][month - 1]
|
|
|
|
def weekday(year, month, day):
|
|
if month < 3:
|
|
month += 12
|
|
year -= 1
|
|
q = day
|
|
m = month
|
|
K = year % 100
|
|
J = year // 100
|
|
return (q + 13*(m + 1)//5 + K + K//4 + J//4 + 5*J) % 7
|
|
|
|
def last_sunday(year, month):
|
|
"""Return (day, hour) of the last Sunday in a given month."""
|
|
dim = days_in_month(year, month)
|
|
for d in range(dim, dim - 7, -1):
|
|
if weekday(year, month, d) == 1:
|
|
return d
|
|
return None
|
|
|
|
def is_dst(year, month, day, hour=12):
|
|
"""Returns True if the given UTC time is during UK DST (British Summer Time)."""
|
|
march_last_sunday = last_sunday(year, 3)
|
|
dst_start = (3, march_last_sunday, 1)
|
|
|
|
# DST ends last Sunday of October at 2:00
|
|
oct_last_sunday = last_sunday(year, 10)
|
|
dst_end = (10, oct_last_sunday, 2)
|
|
|
|
current = (month, day, hour)
|
|
|
|
return dst_start <= current < dst_end
|
|
|
|
|
|
def main_loop():
|
|
global setup_complete, update_requested, view_change_requested
|
|
|
|
while not setup_complete:
|
|
time.sleep(0.1)
|
|
|
|
print("Main loop starting...")
|
|
display.auto_text("Ready. Press KEY0 to update")
|
|
|
|
update_weather()
|
|
last_weather = time.ticks_ms()
|
|
WEATHER_MS = 10 * 60 * 1000
|
|
|
|
while True:
|
|
update_display()
|
|
if time.ticks_diff(time.ticks_ms(), last_weather) >= WEATHER_MS:
|
|
update_weather()
|
|
last_weather = time.ticks_ms()
|
|
|
|
time.sleep(0.5)
|
|
|
|
|
|
|
|
# Setup function
|
|
def setup():
|
|
global station, display, weather_disp, setup_complete
|
|
|
|
station = network.WLAN(network.STA_IF)
|
|
station.active(True)
|
|
station.connect(SSID, PASSWORD)
|
|
print("Connecting to WiFi...")
|
|
|
|
timeout = 15
|
|
start_time = time.time()
|
|
while not station.isconnected():
|
|
if time.time() - start_time > timeout:
|
|
print("Failed to connect to WiFi. Check your SSID and password.")
|
|
break
|
|
time.sleep(1)
|
|
|
|
if station.isconnected():
|
|
ip = station.ifconfig()[0]
|
|
print("Connected to WiFi! IP address:", ip)
|
|
rtc = RTC()
|
|
try:
|
|
print("Syncing time with NTP server...")
|
|
for _ in range(3):
|
|
try:
|
|
ntptime.settime()
|
|
break
|
|
except:
|
|
print("NTP retry...")
|
|
time.sleep(1)
|
|
|
|
current_time = list(rtc.datetime())
|
|
year = current_time[0]
|
|
month = current_time[1]
|
|
day = current_time[2]
|
|
hour = current_time[4]
|
|
if is_dst(year, month, day, hour):
|
|
print("British Summer Time (BST) is active - adding 1 hour")
|
|
current_time[4] += 1
|
|
if current_time[4] >= 24:
|
|
current_time[4] -= 24
|
|
current_time[2] += 1
|
|
|
|
rtc.datetime(tuple(current_time))
|
|
print("RTC set with UK local time:", rtc.datetime())
|
|
except OSError as e:
|
|
print("Error syncing time:", e)
|
|
else:
|
|
print("WiFi connection not established. Restart and try again.")
|
|
raise SystemExit
|
|
|
|
display = get()
|
|
weather_disp = weather_display.WeatherDisplay(display)
|
|
button0 = Pin(display.KEY0, Pin.IN, Pin.PULL_UP)
|
|
button1 = Pin(display.KEY1, Pin.IN, Pin.PULL_UP)
|
|
button0.irq(trigger=Pin.IRQ_FALLING, handler=detailed_view)
|
|
button1.irq(trigger=Pin.IRQ_FALLING, handler=simple_view)
|
|
display.auto_text("Starting weather display...")
|
|
setup_complete = True
|
|
|
|
print("Setup complete!")
|
|
|
|
|
|
def detailed_view(pin):
|
|
global current_view
|
|
print("Detailed view")
|
|
current_view = "detailed"
|
|
|
|
def simple_view(pin):
|
|
global current_view
|
|
print("Simple view")
|
|
current_view = "simple"
|
|
|
|
# Main function
|
|
def main():
|
|
setup()
|
|
main_loop()
|
|
|
|
while True:
|
|
time.sleep(1)
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |