Als ich mit der Maschine (Machine Studying) vertraut struggle, waren einige Ferramentas, die ich in der Programmiersprache Python und in den Bibliotheken Numpy, Pandas und Sklearn hatte. Dies ist sehr wahrscheinlich, und es kann zu Problemen mit Modellen mit hoher Genauigkeit und Effizienz kommen, wenn es erforderlich ist, ein Modell für viele große Datensätze auszuführen, was zu Problemen führen kann, und viel Zeit für die Ausführung benötigen.
Aufgrund der erhöhten Datenmengen, die durch die Einführung von Massive Information entstanden sind, kann der Einsatz von Technologien, die nicht verteilt werden, nach einer hohen Ausführungsgeschwindigkeit erfolgen, die einige von einer Maschine nutzen, und es müssen alle Daten im Speicher gespeichert werden, um Transformationen, Vorverarbeitungen und Vorhersagen zu realisieren . Daher ist PySpark eine Spark-Python-API, die die Ausführung in Kind von Verteilungsoperationen und die Verwendung von Kind-Otim-Speichern ermöglicht, oder sie kann mit verschiedenen Maschinen parallel verwendet werden.
Derzeit gibt es eine Bibliothek, die von Apache Spark für maschinelles Lernen entwickelt wurde und MLLib enthält. Viele Jahre später wurde das Modell von einem Wissenschaftler entwickelt, der möglicherweise mit Python und nicht mit PySpark verbunden struggle, und es struggle notwendig, dass sein Modell später auf einer höheren Stufe ausgeführt wurde.
Zu diesem Zweck kann eine Rekursion von PySpark in Verbindung mit Pandas UDF verwendet werden, die grundsätzlich in die höheren Versionen von Spark 2.3 eingeführt werden muss, um die Funktionalität und Benutzerfreundlichkeit von definierten Funktionen (UDFs) in Python zu verbessern und in der Kind der Vetorisierung auszuführen ada.
Als UDFs gerade ausgeführt wurden, weil es Probleme mit der Hochserialisierung und dem Obtain gab, wurde das Drawback erst nach kurzer Zeit gelöst, als UDFs in Java oder Scala definiert und als Teil von Python aufgerufen wurden. Da die Lösung von Pandas UDF ein Drawback darstellt, wurden sie auf Foundation von Apache Arrow erstellt und vollständig in Python verwendet.
Als seine frühere Funktion bereits genehmigt wurde, wurde das Spark-Brand von DataFrame in mehreren Paketen (10.000 Registrierungen) verwendet und es wurden mehrere Daten wie ein Array (Sequence Pandas) für die Ausführung verwendet. Ich glaube, diese Funktion ist der Meinung, dass es eine Kombination aus Linhas und keine Funktion gibt, die Linha auf Linha anwenden würde.
Um Ihre Implementierung zu erleichtern, müssen Sie ein in Bytes serialisiertes Modell erstellen und in einer ausgewählten Datei speichern und Objekte von Sklearn, XGBoost oder anderen Geräten rekonstruieren. Es gibt kein Beispiel für die Erstellung zweier Klassen, um zwei wichtige Etappen des Vorhersageprozesses eines Modells für maschinelles Lernen zu realisieren: Vorprozess und Vorhersage.
Auf der Vorverarbeitungsstufe, die ich denke, stehen uns keine Vorverarbeitungsmodule für Sklearn zur Verfügung (z. B. StandardScaler, MinMaxScaler, MaxAbsScaler, Normalizer usw.). Wichtig ist, dass die jetzt präsentierte Klasse eine Transformation und Entwicklung einer neuen Spalte durchgeführt hat, die ein Array mit normalisierten/verwalteten Werten ist, und zwar aufgrund der Spalteninformationen, die durch die Argumentation ohne Vorverarbeitungsmethode entstehen.
import pickle
from typing import Listing
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.varieties import ArrayType, FloatType
from pyspark.sql.features import array, col, pandas_udfclass PreProcessing():
def __init__(self, identify: str) -> None:
self.scaler = pickle.load(open(identify, 'rb'))
def preprocessing(self, df: DataFrame, columns: Listing[str]) -> DataFrame:
import pandas as pd
@pandas_udf(ArrayType(FloatType()))
def udf_preprocessing(samples: pd.Sequence) -> pd.Sequence:
return pd.Sequence(
[
self.scaler.transform([sample]).reshape(1, -1)[0]
for pattern in samples
]
)
return (
df
.withColumn("features_preprocessing", array(*columns))
.withColumn(
"features_preprocessing",
udf_preprocessing(col("features_preprocessing"))
)
)
df_preprocessed = (
PreProcessing("scaler.sav")
.preprocessing(df, ["column_1", "column_2"])
)
Kein Beispiel, da die Spalten „column_1“ und „column_2“ durch einen vor der Verwendung der StandardScaler-Methode verwendeten Prozess übergeben wurden, der in die Spalte „features_preprocessing“ umgewandelt wurde und die Werte der beiden hinzugefügten Spalten enthält.
Ich denke an die Vorhersage-Etappe, da es sich um Klassifizierungs-, Regressions- und andere Modelle von Sklearn handelt (z. B. LinearRegression, KNeighborsClassifier, LogisticRegression, RandomForestClassifier usw.). Es gibt einen kleinen Unterschied zwischen Regressions- und Klassifizierungsmodellen, da die Regressionsmodelle zu einem gewissen Grad um einen Wertverlust zurückgekehrt sind, der für die Klassifizierungsmodelle von Bedeutung ist oder einen größeren Anteil an der Likelihood hat, die Klassifizierungsklasse zu beeinflussen , so wird es in die Klasse zurückgeworfen, je nachdem, wie viel es kostet.
import pickle
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.varieties import ArrayType, FloatType
from pyspark.sql.features import array_max, col, expr, pandas_udfclass Predict():
def __init__(self, identify: str) -> None:
self.mannequin = pickle.load(open(identify, 'rb'))
def predict(self, df: DataFrame) -> DataFrame:
import pandas as pd
@pandas_udf(FloatType())
def udf_predict(samples: pd.Sequence) -> pd.Sequence:
return pd.Sequence(
self.mannequin.predict([sample])[0][0] for pattern in samples
)
@pandas_udf(ArrayType(FloatType()))
def udf_predict_classifier(samples: pd.Sequence) -> pd.Sequence:
return pd.Sequence(
self.mannequin.predict([sample])[0] for pattern in samples
)
if "predict_proba" in dir(self.mannequin):
return (
df
.withColumn("predict", udf_predict_classifier(col("features_preprocessing")))
.withColumn("predict_prob", array_max(col("predict")))
.withColumn("predict_class", expr("array_position(predict, predict_prob)"))
)
else:
return (
df
.withColumn("predict", udf_predict("features_preprocessing"))
)
df_predicted = Predict("mannequin.sav").predict(df_processed)
Als Schlussfolgerung oder Ergebnis der Ausführung der beiden Klassen (PreProcessing und Predict) wird das Ergebnis des Modells in der Spalte Predict zurückgegeben. Assim, es ist möglich, eine Vertriebsform und parallele Modelle auszuführen, die in Python unter Verwendung von Spark entwickelt wurden.