forked from docs/modelarts
Reviewed-by: Jiang, Beibei <beibei.jiang@t-systems.com> Co-authored-by: proposalbot <proposalbot@otc-service.com> Co-committed-by: proposalbot <proposalbot@otc-service.com>
89 lines
3.2 KiB
ReStructuredText
89 lines
3.2 KiB
ReStructuredText
:original_name: modelarts_23_0178.html

.. _modelarts_23_0178:

PySpark
=======

Training and Saving a Model
---------------------------

.. code-block::
|
|
|
|
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

# Prepare the training data as a list of (label, features) tuples.
training = spark.createDataFrame(
    [
        (1.0, Vectors.dense([0.0, 1.1, 0.1])),
        (0.0, Vectors.dense([2.0, 1.0, -1.0])),
        (0.0, Vectors.dense([2.0, 1.3, 1.0])),
        (1.0, Vectors.dense([0.0, 1.2, -0.5])),
    ],
    ["label", "features"],
)

# A LogisticRegression instance is an Estimator; configure it up front.
lr = LogisticRegression(maxIter=10, regParam=0.01)

# Fit the estimator on the training data to learn a LogisticRegressionModel.
model = lr.fit(training)

# Persist the fitted model to a local path.
model.save("/tmp/spark_model")
After the model is saved, it must be uploaded to the OBS directory before being published. The **config.json** configuration file and the **customize_service.py** inference code file must be included when the model is published. For details about the definition method, see :ref:`Model Package Specifications <modelarts_23_0091>`.

Inference Code
--------------

.. code-block::
# coding:utf-8
import collections
import json
import traceback

import pandas as pd

import model_service.log as log
from model_service.spark_model_service import SparkServingBaseService
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
# Module-level logger for this inference service.
logger = log.getLogger(__name__)
|
|
|
|
|
|
class user_Service(SparkServingBaseService):
    """Custom Spark serving class: JSON request -> prediction -> JSON response.

    NOTE(review): the non-PEP8 class name is kept as-is — the serving
    framework locates this class by name, so renaming would break callers.
    """

    # Pre-process data.
    def _preprocess(self, data):
        """Convert the raw JSON request body into a Spark DataFrame.

        :param data: JSON request body; expected shape (from the access
            below) is {"data": {"req_data": [...]}} — confirm against caller.
        :return: Spark DataFrame built from the ``req_data`` records.
        :raises Exception: if the request body does not match the expected shape.
        """
        logger.info("Begin to handle data from user data...")
        # OrderedDict hook keeps the request's key order, so DataFrame
        # column order is stable.
        req_json = json.loads(data, object_pairs_hook=collections.OrderedDict)
        try:
            # Go through pandas to turn the list of records into a Spark DataFrame.
            predict_spdf = self.spark.createDataFrame(pd.DataFrame(req_json["data"]["req_data"]))
        except Exception:
            logger.error("check your request data does meet the requirements ?")
            logger.error(traceback.format_exc())
            raise Exception("check your request data does meet the requirements ?")
        return predict_spdf

    # Perform model inference.
    def _inference(self, data):
        """Apply the saved model to the pre-processed DataFrame.

        :param data: Spark DataFrame produced by :meth:`_preprocess`.
        :return: Spark DataFrame with prediction columns appended.
        :raises Exception: if the model cannot be loaded or applied.
        """
        try:
            # BUGFIX: the training code saved a fitted LogisticRegressionModel,
            # so it must be loaded with LogisticRegressionModel.load(). Loading
            # with the LogisticRegression *estimator* class fails, and an
            # estimator has no transform() method.
            predict_model = LogisticRegressionModel.load(self.model_path)
            # Perform data inference.
            prediction_result = predict_model.transform(data)
        except Exception:
            logger.error(traceback.format_exc())
            raise Exception("Unable to load model and do dataframe transformation.")
        return prediction_result

    # Post-process data.
    def _postprocess(self, pre_data):
        """Serialize the prediction DataFrame as a list of record dicts.

        :param pre_data: Spark DataFrame returned by :meth:`_inference`.
        :return: list of row dicts (pandas ``records`` orientation).
        """
        logger.info("Get new data to respond...")
        predict_str = pre_data.toPandas().to_json(orient='records')
        predict_result = json.loads(predict_str)
        return predict_result
|