Jakiś czas temu pokazałem, że czasem warto użyć bazy danych w projekcie analitycznym. Przeczytasz o tym w moim artykule pt. SQLite i Python – czy warto? Zastanówmy się więc, jak możemy użyć bazy danych do skompletowania sobie aktualnych danych o koncentracji polutantów z GIOŚ.
Po co w ogóle się zajmować tym problemem? GIOŚ systematycznie udostępnia paczki z danymi z całego roku. Udostępniają też API, które umożliwia pobranie danych z aktualnego i dwóch wcześniejszych dni kalendarzowych. Mamy więc całkiem niezłą sytuację. Ale jest tutaj też pewna luka. W momencie publikacji tego artykułu najnowsza paczka danych to rok 2017. Przez API mamy dostęp do danych z 16, 17 i 18 lutego 2019. Widzimy więc, że trochę danych nam potencjalnie brakuje. Nie uda nam się ich już w tej chwili uzyskać (musimy poczekać na paczkę), ale możemy zastanowić się jak obejść ten problem na przyszłość.
Proces
Żeby dobrać się do wyników pomiarów z czujników, musimy wiedzieć, o jaki czujnik zapytać. A żeby wiedzieć, o jaki czujnik zapytać, musimy dowiedzieć się, w jakiej jest on stacji. A skąd będziemy wiedzieć, jakie mamy dostępne stacje? Tego dowiemy się, gdy zapytamy o wszystkie dostępne stacje. Mamy więc liniowy proces, w którym zaczynamy od uzyskania ogólnej informacji (stacje), następnie schodzimy o poziom niżej (czujniki), żeby w końcu dobrać się do „miąższu” (pomiary). Proces ten wygląda więc tak:
Osobnymi kolorami oznaczyłem tutaj rodzaj zapytania do API, a strzałki pokazują, które z uzyskanych danych (id) wykorzystuję w kolejnym zapytaniu. Nie jest to więc sytuacja nadmiernie skomplikowana.
Schemat bazy danych
Powyższa struktura na tyle mi się spodobała, że postanowiłem ją wykorzystać przy projektowaniu bazy danych. W praktyce sensowne okazało się użycie jej dokładnie w takiej formie. Mamy bowiem tutaj całkiem sensownie odseparowane powtarzalnych informacji. Nie zdziwiłbym się, gdyby pod spodem serwisu wystawiającego API nie były właśnie podobne tabele do tych, które właśnie omówię.
I tak w tabeli stations umieściłem informacje uzyskane z pierwszego zapytania (o wszystkie stacje). Mam tam aktualnie 179 rekordów z 10 polami opisującymi m.in. położenie stacji pomiarowych.
W tabeli sensors każdy rekord opisuje jeden sensor. Mamy tam jego id, informację jaki polutant mierzy oraz identyfikator stacji, w której jest umieszczony. Mam tam 715 rekordów i 6 pól.
Tabela zawierająca „miąższ” nazywa się readings i składa się z minimalnej sensownej ilości pól (3), czyli id czujnika, połączonej daty i godziny oraz wartości z czujnika. Po pierwszym uruchomieniu skryptu (który znajdziesz tutaj) powinno wpaść Ci tam około 40K (sic!) odczytów.
Pozostał nam jeszcze jeden element do omówienia, czyli klucz główny. W tabeli stations będzie to id, czyli w tym przypadku unikatowy identyfikator stacji. Z racji tego, że GIOŚ może chyba wprowadzać mniejsze lub większe zmiany np. w położeniu stacji, za każdym uruchomieniem skryptu aktualizuję te rekordy. Teoretycznie może to wprowadzić nieścisłości (np. przeniesienie stacji o 100 metrów i zmiana współrzędnych na nowe), ale ufam tutaj, że jeśli zmiana miałaby byś istotnie znacząca, to GIOŚ wprowadziłby nową stację do systemu a „wygasił” starą. W tabeli sensors kluczem znów jest id, ale tym razem jest to unikatowy identyfikator czujnika. Stosuję tutaj podejście takie jak powyżej, przy czym nie widzę tutaj zapotrzebowania na aktualizację tych wpisów w bazie danych. Pozostawiłem ten pomysł niejako z lenistwa, bo narzut czasowy tej operacji jest pomijalny w tym przypadku. W tabeli readings kluczem natomiast jest para wartości id i datetime. W ten sposób mam pewność, że w tabeli znajdą się unikatowe odczyty. Jest to o tyle istotne, że za każdym odpytaniem API otrzymujemy pełne dane z trzech dni kalendarzowych. Używając takiego klucza oraz instrukcji INSERT OR REPLACE nie muszę się martwić o duplikację danych – jeśli taki klucz już występuje, rekord zostanie skasowany i wstawiony na nowo wraz z nowymi danymi.
W naszej sytuacji jest to szczególnie przydatne dlatego, że w trakcie pracy z API zaobserwowałem, że jeśli dokonujemy odczytu tuż po zmianie godziny (np. o 15:00:30), to najnowsze rekordy (podpisane 15:00:00) dostajemy z brakującymi wartościami (NaN). Wynika to pewnie z tego, że baza danych w GIOŚ nie jest jeszcze uzupełniona odpowiednimi średnimi (z okresu 14-15) a API już te dane serwuje. Jeśli więc za kilka godzin uruchomimy skrypt ponownie, to jest całkiem spora szansa, że tak powstałe wartości NaN zostaną zastąpione prawidłową średnią godzinową. Części wpisów NaN nie uda się w ten sposób poprawić. Wynika to z tego, że czasem faktycznie nie były prowadzone pomiary – mogły trwać wtedy prace serwisowe, system mógł ulec awarii albo były problemy z dostawą prądu.
Po implementacji tego procesu w Pythonie wystarczy, że uruchomimy skrypt (notebook w moim przypadku) raz na dwa dni. W ten sposób będziemy sobie zapełniać osobistą bazę danych z aktualnymi odczytami, które jeszcze nie trafiły do archiwum rocznego. W moim przypadku, na 10-letnim laptopie najdłużej trwająca operacja to odczyt danych pomiarowych i umieszczanie ich w bazie danych. Operacja ta zajmuje mi nieco ponad 5 minut czasu rzeczywistego, nie kombinowałem więc tutaj z żadnymi optymalizacjami :-).
Python
Nie byłbym sobą, gdybym nie umieścił w artykule najistotniejszych elementów kodu w celu jego dokładniejszego omówienia. Zacznijmy więc od tworzenia tabeli stations:
# Kod 1 # stations create_query = ''' CREATE TABLE IF NOT EXISTS stations ( id integer PRIMARY KEY, addressStreet text, city_commune_communeName text, city_commune_districtName text, city_commune_provinceName text, city_id integer, city_name text, gegrLat real, gegrLon real, stationName text) ''' c.execute(create_query)
Nic specjalnego się tutaj nie dzieje. Definiujemy kolumny i klucz główny. Nową tabelę tworzymy, tylko jeśli nie ma już jej w bazie danych – w ten sposób nie utracimy zgromadzonych już rekordów. Analogicznie sytuacja wygląda w przypadku tabeli sensors:
# Kod 2 # sensors create_query = ''' CREATE TABLE IF NOT EXISTS sensors ( id integer PRIMARY KEY, param_idParam integer, param_paramCode text, param_paramFormula text, param_paramName text, stationId integer) ''' c.execute(create_query)
Jest to w sumie ta sama sytuacja jak w przypadku stations, o czym pisałem zresztą powyżej. Jedna mała zmiana pojawia się natomiast w tabeli readings:
# Kod 3 # readings create_query = ''' CREATE TABLE IF NOT EXISTS readings ( id integer, datetime text, value real, PRIMARY KEY (id, datetime)) ''' c.execute(create_query)
Widzimy tutaj, że nie ustawiam klucza głównego na początku, ale dopiero gdy zdefiniuję już kolumny, które są jego składnikami. Robię to za pomocą instrukcji PRIMARY KEY (id, datetime). Widać więc, że przygotowywanie tabel w naszym wypadku nie jest niczym skomplikowanym.
Przejdźmy wiec do zapytań do API. Okazuje się, że moduły requests i pandas opędzają cały ten problem. Moduł requests umożliwia wysyłanie zapytań i sprytne przechowywanie odpowiedzi, a pandas „z palca” przerabia uzyskaną odpowiedź (JSON) na ramkę danych. Wygląda to mniej więcej tak:
# Kod 4 r = requests.get('http://api.gios.gov.pl/pjp-api/rest/station/findAll') stations = pd.io.json.json_normalize(r.json())
I całkiem mi się podoba ten sposób pracy z API. Sprawdźmy więc, jak „pakuję” dane stacji do bazy danych:
# Kod 5 for index, station in stations.iterrows(): c.execute("INSERT OR REPLACE INTO stations VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ([station["id"], station["addressStreet"], station["city.commune.communeName"], station["city.commune.districtName"], station["city.commune.provinceName"], station["city.id"], station["city.name"], station["gegrLat"], station["gegrLon"], station["stationName"]])) conn.commit()
Tutaj sprawa może być dyskusyjna. Dobrą praktyką jest bowiem unikanie iterowania po wierszach ramki danych. Nie powinno się więc wykonywać jakiejś operacji wiersz po wierszu, tylko poszukać jakiegoś sposobu, żeby zadziałać na całej ramce na raz. Szybszą operacją pewnie byłoby przerobienie całej ramki danych na tabelę. Ja jednak na tym etapie postanowiłem zastosować podejście iteracyjne, jako nieco bardziej intuicyjnie (i pewnie niewiele wolniejsze w przypadku takiej ilości danych). Przechodzę więc po wszystkich wierszach, kawałkuję ja na odpowiednie pola i umieszczam wartości z tych pól we wspomnianej instrukcji INSERT OR REPLACE. Gdy zlecę wszystkie inserty, wykonuję instrukcję commit, która faktycznie modyfikuje bazę danych.
Nieco inaczej z kolei przygotowałem umieszczanie danych czujników w badzie danych:
# Kod 6 for station_id in tqdm(pd.read_sql_query("select id from stations", conn).values): station_id = station_id[0] #print(station_id) r = requests.get('http://api.gios.gov.pl/pjp-api/rest/station/sensors/' + str(station_id)) sensors = pd.io.json.json_normalize(r.json()) for index, sensor in sensors.iterrows(): #print(sensor) c.execute("INSERT OR REPLACE INTO sensors VALUES (?, ?, ?, ?, ?, ?)", ([sensor["id"], sensor["param.idParam"], sensor["param.paramCode"], sensor["param.paramFormula"], sensor["param.paramName"], sensor["stationId"]])) conn.commit() #break
Mamy tutaj dwie pętle. Pętla wewnętrzna wygląda w zasadzie tak samo, jak w przykładzie powyżej. Pętla zewnętrzna jest natomiast nieco inna. Za pomocą instrukcji pandas read_sql_query wykonuję query select id from stations, wydobywam surowe wartości (.values) i idę po tych wartościach. W ten sposób wydobywam listę stacji z bazy danych i później używam wartości z tej listy do uzupełniania adresu kolejnych zapytań. Użyłem tutaj dodatkowo opcjonalnego modułu tqdm do stworzenia sobie paska postępu tej operacji. Commit do bazy robię po każdej stacji którą „obrobiłem” w ten sposób. Sądzę, że jest to dobry kompromis pomiędzy commitowaniem każdego rekordu indywidualnie i wszystkich uzyskanych rekordów.
Zerknijmy teraz na „miąższ”:
# Kod 7 lengths = [] for sensor_id in tqdm(pd.read_sql_query("select id from sensors", conn).values): sensor_id = sensor_id[0] #print(sensor_id) r = requests.get('http://api.gios.gov.pl/pjp-api/rest/data/getData/' + str(sensor_id)) readings = pd.io.json.json_normalize(r.json()) readingsFrame = pd.DataFrame() readingsFrame["datetime"] = [d[u'date'] for d in readings["values"].values.item()] readingsFrame["value"] = [d[u'value'] for d in readings["values"].values.item()] length = len(readingsFrame) lengths.append(length) #if length == 0: # print(sensor_id) for index, reading in readingsFrame.iterrows(): #print(reading["datetime"]) #print(reading["value"]) c.execute("INSERT OR REPLACE INTO readings VALUES (?, ?, ?)", ([int(sensor_id), reading["datetime"], reading["value"], ])) conn.commit() #break
Tutaj koncepcja jest taka sama jak powyżej – dwie pętle, pierwsza bazująca na informacji z bazy danych, a druga bazująca na informacji z API. Dołożyłem tutaj natomiast dwa dodatkowe elementy. Pierwszy element to stworzenie nowej podręcznej ramki danych readingsFrame. Tworzę tę ramkę i zapełniam dlatego, że dane, które dostaję z API, są w tym wypadku ze sobą sklejone. Sprytnie rozklejając je, zapełniam od razu ramkę danych, którą później wrzucam do bazy danych. Drugim elementem jest stworzenie listy lengths, którą zapełniam liczbami, które są długością utworzonej powyżej ramki danych.
W czasie tworzenia tego skryptu nie zgadzała mi się ilość uzyskanych rekordów w tabeli readings z moimi oczekiwaniami. Okazało się jednak, że mamy aż trzy potencjalne przyczyny powstania tej nieścisłości. Pierwsza jest taka, że niektóre sensory dostępne przez API nie posiadają odczytów. Nie wiem, z czego to wynika, ale aktualnie takich sensorów jest 21. Drugim powodem było to, że od czasu do czasu pojawiają się odczyty z godziny 00 z pierwszego dnia kalendarzowego. API nie powinno ich wystawiać (zawsze zaczyna się od godziny 01), natomiast mi trafiło się co najmniej kilka takich rekordów. Trzeci powód był taki, że czasem przy pobieraniu danych pomiarowych robiłem to na przełomie godzin. Część czujników miała więc n odczytów, a część n+1. Teraz, dzięki naocznej analizie wartości unikatowych we wspomnianej liście lengths, mogę namierzyć, na którą anomalię natrafiłem.
Co dalej?
Dalej już nie ma nic. To znaczy, jest bardzo wiele, ale na temat gromadzenia danych z API GIOŚ do własnej bazy danych chyba już nic nie zostało do omówienia. Myślę, że jedyne co warto tutaj dodać to przykład dostania się do zapisanych w ten sposób danych. To, co mi przyszło do głowy, wygląda tak:
%%time # Kod 8 query = """ SELECT readings.id as sensor_id, datetime, value, param_paramFormula, gegrLat, gegrLon, stationId as station_id FROM readings, sensors, stations WHERE readings.id = sensors.id AND sensors.stationId = stations.id """ gios_data = pd.read_sql_query(query, conn) gios_data.sort_values(by = ["sensor_id", "datetime"], inplace=True)
Widzimy tutaj query SQL, które składa się z wyboru kolumn (SELECT), z trzech tabel (FROM) gdzie rekordy są sklejane za pomocą odpowiednich pól id (WHERE). Tak uzyskana tabela jest od razu wrzucana do ramki danych, którą sobie sortuję (można by to sortowanie zrobić też w query SQL). Początek i koniec uzyskanej w ten sposób ramki wyglądają u mnie następująco:
A jeśli mamy już ramkę danych, to jesteśmy w domu ;-). Myślę, że jeśli jeszcze nie miałeś do czynienia z bazami danych, to powyższy przykład jest wystarczająco realistycznym, ale prostym wprowadzeniem w to zagadnienie. Oczywiście, jeśli byśmy chcieli, to całą tę operację moglibyśmy zrealizować za pomocą np. plików csv. Ale niekoniecznie byłoby to równie przejrzyste, a pewnie też niekoniecznie równie szybkie.
Jeśli chciałbyś przeczytać więcej o jakimś zagadnieniu związanym z danymi, to zapraszam do głosowania tutaj.
Pełny kod przykładu omówionego w artykule znajdziesz tutaj.