Ordnerstruktur und Namenskonventionen
Ich habe die im ETL-Prozess zu behandelnden Daten in zwei Ordner unterteilt: Traditional_Data und Different Information. Jeder der Ordner enthält Unterordner mit unterschiedlichen Datenstadien:
- roh
- Inszenierung_1
- Inszenierung_2
- verarbeitet
Jede der Dateien ist nach Speicherort, Häufigkeit und Inhalt der Daten benannt:
Identify der Ortshäufigkeitsdatei.csv
Der Rohordner enthält die Daten in dem Zustand, in dem sie erfasst wurden. Das staging_1 verfügt über Daten mit allgemeinen Behandlungen, die manuell oder über .py-Skripte angewendet werden. Das staging_2 enthält Daten, die Datumstransformationen erforderten. Und schließlich gibt es noch den verarbeiteten Ordner, der Datendateien enthält, die von den SQL-Tabellen verwendet werden können.
transform_load
Datentransformationen wurden in einem separaten Ordner namens transform_load durchgeführt. Dies ist diejenige, die alle wichtigen Datentransformationen enthält. Es enthält die Dateien:
- etl_utils.py – ein Modul, das von den anderen Python-Dateien im Ordner verwendet wird. Es verfügt über Funktionen zum Aggregieren von Daten, zum Erkennen der Codierung und des Trennzeichens von Dateien und mehr.
- general_treatments.py – eine Datei, die Spaltennamen neu formatiert, Daten für Kalifornien abfragt und die Dateiformatierung im Allgemeinen anpasst.
- date_adjustments.py – bringt Dateien auf die gleiche Häufigkeit, indem es aggregiert, disaggregiert, den Mittelwert bildet oder die Daten unverändert beibehält – Sie können beispielsweise den Mindestlohn nicht aggregieren oder disaggregieren.
- inserting_data.py – die Datei, die den letzten Schritt des ETL ausführt. Es stellt die behandelten Daten in den PostgreSQL-Tabellen bereit, die dann für den Verbrauch durch AutoML-Modelle bereit sind.
Wenn Sie die Codierung und die Dateien überprüfen möchten, können Sie auf die gehen GitHub Repo. Sie sind alle dokumentiert, aber wenn Sie Zweifel haben oder mit uns reden möchten, können Sie hier gerne einen Kommentar hinterlassen oder mir eine Nachricht senden LinkedIn.
Frequenzunterschiede
Eine der Hauptaufgaben während des Behandlungsschritts battle die Ungleichheit in der Häufigkeit der Daten.
Das BIP wurde vierteljährlich erfasst, damit das Modell im Vergleich zu jährlichen Zeiträumen mehr Nuancen in den Mustern über die Zeit hinweg erkennen konnte.
Die Häufigkeit anderer Datenquellen variierte jedoch von täglich bis jährlich, und ich wollte die Daten in der gleichen Häufigkeit wie das BIP speichern.
Eine Möglichkeit, dieses Drawback zu umgehen, ist Resampling.
Bei hochfrequenten Daten, beispielsweise täglichen Daten, können Sie Aggregationen über die Summe der Tage in einer Woche und dann für Quartale über die Summe der Monate durchführen.
Aber bei den jährlichen Daten kann man nicht einfach aufschlüsseln und Vorhersagen treffen, weil man nicht über die Daten verfügt a priori. Das würde dazu führen Datenlecks.
Stattdessen könnte eine Schätzung der Werte durch Regressionsmethoden erfolgen. Es gibt einige referenzierte Methoden in der wissenschaftlichen Literatur wie Chow-Lin, Salazar und andere – um mehr zu erfahren, können Sie einen Blick darauf werfen Fonzo 2003.
Da ich AutoML verwende, habe ich mich nicht speziell mit diesen Methoden befasst.
In meinem Fall befanden sich die jährlichen Häufigkeitsdaten im „Expenditure and Accessible Earnings of California“, das angibt, wie hoch die Ausgaben für Waren und Dienstleistungen sind und wie hoch das Bevölkerungseinkommen ist, das nach Steuern für Ausgaben oder Ersparnisse übrig bleibt.
Ich disaggregiere sie im ETL zusammen mit den anderen Daten, aber im maschinellen Lernprozess habe ich eine Schätzung der Ausgaben und des verfügbaren Einkommens für die Testsätze der BIP-Modelle vorgenommen, wobei ich die traditionellen Daten gesammelt habe.
Bei den Ausgabenprognosen wurden das verfügbare Einkommen und das BIP ausgeschlossen, und bei den Prognosen des verfügbaren Einkommens wurden die Ausgaben und das BIP ausgeschlossen.
Auch bei den Mindestlohndaten handelte es sich um jährliche Daten, aber ich habe dafür keine besondere Datenbehandlung vorgenommen, da diese Artwork von Daten normalerweise zu Beginn des Jahres veröffentlicht werden.
Für diejenigen, die interessiert sind, finden Sie unten die Funktion, die die Häufigkeit auf vierteljährliche Foundation umgestellt hat.
def transform_to_quarterly(knowledge, file_name):
"""
Description: Resampling of the information recordsdata of the mission. This operate aggregates the file
to a quarterly frequency, utilizing strategies that matches the requirements of the recordsdata.Parameters:
knowledge (pd.DataFrame): A pandas DataFrame to be resampled;
file_name (str): A string of the file being processed on the time.
Returns:
quarterly (pd.DataFrame): A DataFrame containing the resampled quarterly knowledge.
"""
gdp = "cali-quarterly-gdp.csv"
expenditure = "cali-annual-expenditure.csv"
disposable_income = "cali-annual-disposable_income.csv"
exports = "cali-monthly-exports.csv"
imports = "cali-monthly-imports.csv"
employment = "cali-monthly-employment.csv"
minimum_wage = "cali-annual-minimum_wage.csv"
personal_income = "cali-quarterly-personal_income.csv"
google_trends = "cali-monthly-google_trends.csv"
consumer_sentiment = "us-monthly-consumer_sentiment.csv"
mobility_report = "cali-daily-mobility_report.csv"
quarterly_agreggates = [exports, imports]
quarterly_averages = [employment, google_trends, consumer_sentiment, mobility_report]
quarterly_disaggregates = [disposable_income, expenditure]
frequency = pd.infer_freq(knowledge.index)
if frequency == "MS" and file_in_list(file_name, quarterly_agreggates): # 'MS' means Month Begin in pandas nomenclature
quarterly = knowledge.resample("QS").sum() # QS signifies Quarter Begin
elif (frequency == "MS" or file_name == mobility_report) and file_in_list(file_name, quarterly_averages):
quarterly = knowledge.resample("QS").imply()
elif frequency == "AS-JAN" and file_in_list(file_name, quarterly_disaggregates): # 'AS' means Annual Begin and 'JAN' signifies that it begins in january.
quarterly = knowledge.asfreq("QS").ffill()
quarterly = quarterly / 4
elif frequency == "AS-JAN" and file_name == minimum_wage:
quarterly = knowledge.resample("QS").ffill()
else:
return knowledge
return quarterly
Daten einfügen
Nachdem die Datenverarbeitung abgeschlossen battle, battle es an der Zeit, Daten in die Datenbank einzufügen.
Ich habe die PostgreSQL-Model 15 verwendet und die Skripte für die Datenbank- und Tabellenerstellung in pgAdmin ausgeführt.
Bei der Erstellung der Tabellen habe ich bereits den Zeitraum der Analyse festgelegt, der vom ersten Quartal 2005 bis zum letzten Quartal 2022 reicht.
CREATE DATABASE california;-- Creates the SQL desk for various knowledge
CREATE TABLE alternative_data (
date DATE PRIMARY KEY,
search_hous_bubble NUMERIC (6, 4),
search_pand_assist NUMERIC (6, 4),
search_pfl NUMERIC (6, 4),
search_recession NUMERIC (6, 4),
search_unemp NUMERIC (6, 4),
us_consumer_sentiment NUMERIC(4, 2),
mob_leisure NUMERIC(8, 6),
mob_groc_pharm NUMERIC(8, 6),
mob_parks NUMERIC(8, 6),
mob_transit NUMERIC(8, 6),
mob_workplaces NUMERIC(8, 6),
mob_residential NUMERIC(8, 6)
);
INSERT INTO alternative_data (date)
SELECT generate_series('2005-01-01'::DATE, '2022-10-01'::DATE, '3 months'::INTERVAL) AS date;
-- Creates the SQL desk that comprises conventional knowledge for this mission
CREATE TABLE traditional_data (
date DATE PRIMARY KEY,
gdp NUMERIC (8, 1),
employment NUMERIC (11, 4),
expenditure NUMERIC (10, 3),
exports NUMERIC (12, 6),
imports NUMERIC (12, 6),
min_wage NUMERIC (4, 2),
personal_income NUMERIC (8, 1),
disposable_income NUMERIC (10, 3)
);
INSERT INTO traditional_data (date)
SELECT generate_series('2005-01-01'::DATE, '2022-10-01'::DATE, '3 months'::INTERVAL) AS date;
Dann habe ich alle meine Dateispalten mit einem Python-Wörterbuch wie diesem den SQL-Spalten der Tabellen zugeordnet:
df_sql = {
"disposable_income": "disposable_income",
"expenditure": "expenditure",
"STTMINWGCA": "min_wage",
"CANA": "employment",
"EXPTOTCA": "exports",
"IMPTOTCA": "imports",
...
}
Der Code würde dann die Tabellen dynamisch aktualisieren, wenn er identifiziert, welche Variable zu welcher Tabelle gehören soll. Da ich den Punkt bereits eingefügt hatte, verwendete ich eine UPDATE-Klausel.
Das UPDATE wurde verwendet, um die Werte meiner Datenrahmen in die SQL-Spalten innerhalb eines Strive/Besides-Blocks einzufügen.
attempt:
values = [filtered_df[column].to_list()[0] for column in df_vars]
values = tuple(values)set_clause = ', '.be a part of([f"{column} = %s" for column in sql_vars]) # output instance : column_1 = %s, column_2 = %s, ....
question = f"UPDATE {table_name} "
f"SET {set_clause} "
f"WHERE date = '{date_value}'; "
cursor.execute(question, values)
connection.commit()
besides (Exception, psycopg2.Error) as error:
connection.rollback() # Roll again the transaction
print("Error whereas inserting", error, values, file)