-
Notifications
You must be signed in to change notification settings - Fork 121
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add example code showing how to accelerate data sharing with vineyard on the Fluid platform.
Signed-off-by: Ye Cao <[email protected]>
- Loading branch information
Showing
2 changed files
with
228 additions
and
0 deletions.
There are no files selected for viewing
163 changes: 163 additions & 0 deletions
163
k8s/examples/vineyard-on-fluid/linear-regression-with-vineyard.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
import fluid | ||
|
||
from fluid import constants | ||
from fluid import models | ||
|
||
# Connect to the Fluid control plane through the default kubeconfig file
# and build the client instance used by the rest of this script.
client_config = fluid.ClientConfig()
fluid_client = fluid.FluidClient(client_config)

# Create a dataset named "vineyard" in the default namespace.
# The mount name and mount point are placeholder ("dummy") values.
dataset_spec = {
    "dataset_name": "vineyard",
    "mount_name": "dummy-mount-name",
    "mount_point": "dummy-mount-point",
}
fluid_client.create_dataset(**dataset_spec)

# Fetch the dataset instance that was just created.
dataset = fluid_client.get_dataset(dataset_name=dataset_spec["dataset_name"])

# Vineyard runtime configuration: 2 replicas with 30Gi of memory cache each.
# wait=True is passed so the call is expected to block until the runtime
# is bound -- NOTE(review): confirm against the Fluid SDK.
runtime_options = {
    "runtime_type": constants.VINEYARD_RUNTIME_KIND,
    "replicas": 2,
    "cache_capacity_GiB": 30,
    "cache_medium": "MEM",
    "wait": True,
}
dataset.bind_runtime(**runtime_options)
|
||
# Shell script for the data preprocessing step.
# It installs the Python dependencies, writes ./preprocess.py through a
# heredoc, and runs it. The generated program loads /data/df.pkl, drops the
# rows whose GrLivArea exceeds 4800, splits the data 80/20 into train and
# test sets, and publishes the four parts to vineyard under fixed names.
#
# The shebang and `set -ex` come first so that a failed `pip3 install`
# aborts the step instead of being silently ignored. The heredoc delimiter
# is quoted ('EOF') so the shell never expands anything inside the Python code.
preprocess_data_script = """\
#!/bin/bash
set -ex
pip3 install numpy pandas pyarrow requests vineyard scikit-learn==1.4.0 joblib==1.3.2
cat <<'EOF' > ./preprocess.py
from sklearn.model_selection import train_test_split
import pandas as pd
import vineyard
df = pd.read_pickle('/data/df.pkl')
# Preprocess Data
df = df.drop(df[(df['GrLivArea']>4800)].index)
X = df.drop('SalePrice', axis=1) # Features
y = df['SalePrice'] # Target variable
del df
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
del X, y
vineyard.put(X_train, name="x_train", persist=True)
vineyard.put(X_test, name="x_test", persist=True)
vineyard.put(y_train, name="y_train", persist=True)
vineyard.put(y_test, name="y_test", persist=True)
EOF
python3 ./preprocess.py
"""
|
||
# Shell script for the model training step.
# It installs the Python dependencies, writes ./train.py through a heredoc,
# and runs it. The generated program fetches the "x_train" and "y_train"
# objects from vineyard, fits a LinearRegression model on them, and dumps
# the model to /data/model.pkl.
#
# The shebang and `set -ex` come first so that a failed `pip3 install`
# aborts the step instead of being silently ignored. The heredoc delimiter
# is quoted ('EOF') so the shell never expands anything inside the Python code.
train_data_script = """\
#!/bin/bash
set -ex
pip3 install numpy pandas pyarrow requests vineyard scikit-learn==1.4.0 joblib==1.3.2
cat <<'EOF' > ./train.py
from sklearn.linear_model import LinearRegression
import joblib
import pandas as pd
import vineyard
x_train_data = vineyard.get(name="x_train", fetch=True)
y_train_data = vineyard.get(name="y_train", fetch=True)
model = LinearRegression()
model.fit(x_train_data, y_train_data)
joblib.dump(model, '/data/model.pkl')
EOF
python3 ./train.py
"""
|
||
# Shell script for the model testing step.
# It installs the Python dependencies, writes ./test.py through a heredoc,
# and runs it. The generated program fetches the "x_test" and "y_test"
# objects from vineyard, loads the model from /data/model.pkl, computes the
# mean squared error of its predictions, and appends it to /data/output.txt.
#
# The shebang and `set -ex` come first so that a failed `pip3 install`
# aborts the step instead of being silently ignored. The heredoc delimiter
# is quoted ('EOF') so the shell never expands anything inside the Python code.
# The error is written with print() so every run ends with a newline;
# the file is opened in append mode, and without a separator the values of
# successive runs would run together into one unreadable number.
test_data_script = """\
#!/bin/bash
set -ex
pip3 install numpy pandas pyarrow requests vineyard scikit-learn==1.4.0 joblib==1.3.2
cat <<'EOF' > ./test.py
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
import vineyard
import joblib
import pandas as pd
x_test_data = vineyard.get(name="x_test", fetch=True)
y_test_data = vineyard.get(name="y_test", fetch=True)
model = joblib.load("/data/model.pkl")
y_pred = model.predict(x_test_data)
err = mean_squared_error(y_test_data, y_pred)
with open('/data/output.txt', 'a') as f:
    print(err, file=f)
EOF
python3 ./test.py
"""
|
||
from kubernetes.client import models as k8s_models | ||
Check notice on line 121 in k8s/examples/vineyard-on-fluid/linear-regression-with-vineyard.py Codacy Production / Codacy Static Code Analysisk8s/examples/vineyard-on-fluid/linear-regression-with-vineyard.py#L121
|
||
# Template shared by every task processor; it mounts the OSS volume at /data.
def create_processor(script):
    """Wrap a shell script in a Fluid processor with the OSS volume mounted.

    Args:
        script: Source text of the script; it is run with ``bash`` inside
            the ``python:3.10`` image.

    Returns:
        A ``models.Processor`` whose script processor mounts the
        ``pvc-oss`` persistent volume claim at ``/data``.
    """
    volume_name = "data"

    oss_claim = k8s_models.V1PersistentVolumeClaimVolumeSource(
        claim_name="pvc-oss"
    )
    oss_volume = k8s_models.V1Volume(
        name=volume_name,
        persistent_volume_claim=oss_claim,
    )
    oss_mount = k8s_models.V1VolumeMount(
        name=volume_name,
        mount_path="/data",
    )

    script_processor = models.ScriptProcessor(
        command=["bash"],
        source=script,
        image="python",
        image_tag="3.10",
        volumes=[oss_volume],
        volume_mounts=[oss_mount],
    )

    # When enabling fuse affinity scheduling, pass the following argument to
    # models.Processor to achieve the best performance of data processing:
    #   pod_metadata=models.PodMetadata(
    #       labels={"fuse.serverful.fluid.io/inject": "true"},
    #   ),
    return models.Processor(script=script_processor)
|
||
# One processor per pipeline stage, each wrapping the matching shell script.
preprocess_processor = create_processor(preprocess_data_script)
train_processor = create_processor(train_data_script)
test_processor = create_processor(test_data_script)

try:
    # Create a linear regression model task workflow:
    # data preprocessing -> model training -> model testing.
    # The mount path "/var/run" is the default path of the vineyard
    # configuration file.
    flow = (
        dataset.process(processor=preprocess_processor, dataset_mountpath="/var/run")
        .process(processor=train_processor, dataset_mountpath="/var/run")
        .process(processor=test_processor, dataset_mountpath="/var/run")
    )

    # Submit the workflow to the Fluid platform and wait for it to finish.
    run = flow.run(run_id="linear-regression-with-vineyard")
    run.wait()
finally:
    # Clean up all resources even when the workflow fails; otherwise the
    # dataset and its runtime would be left behind on the cluster.
    dataset.clean_up(wait=True)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import numpy as np | ||
import pandas as pd | ||
|
||
# Generate a dataframe with size around 22G.
num_rows = 6000 * 10000

# Range of the random integers drawn for each column, as (low, high).
# np.random.randint treats `high` as exclusive, so each column holds values
# in [low, high). The columns are generated in the order listed here.
column_ranges = {
    'Id': (1, 100000),
    'MSSubClass': (20, 201),
    'LotFrontage': (50, 151),
    'LotArea': (5000, 20001),
    'OverallQual': (1, 11),
    'OverallCond': (1, 11),
    'YearBuilt': (1900, 2022),
    'YearRemodAdd': (1900, 2022),
    'MasVnrArea': (0, 1001),
    'BsmtFinSF1': (0, 2001),
    'BsmtFinSF2': (0, 1001),
    'BsmtUnfSF': (0, 2001),
    'TotalBsmtSF': (0, 3001),
    '1stFlrSF': (500, 4001),
    # Was misspelled '2andFlrSF'; named to match its sibling '1stFlrSF'.
    '2ndFlrSF': (0, 2001),
    'LowQualFinSF': (0, 201),
    'GrLivArea': (600, 5001),
    'BsmtFullBath': (0, 4),
    'BsmtHalfBath': (0, 3),
    'FullBath': (0, 5),
    'HalfBath': (0, 3),
    'BedroomAbvGr': (0, 11),
    'KitchenAbvGr': (0, 4),
    'TotRmsAbvGrd': (0, 16),
    'Fireplaces': (0, 4),
    'GarageYrBlt': (1900, 2022),
    'GarageCars': (0, 5),
    'GarageArea': (0, 1001),
    'WoodDeckSF': (0, 501),
    'OpenPorchSF': (0, 301),
    'EnclosedPorch': (0, 201),
    '3SsnPorch': (0, 101),
    'ScreenPorch': (0, 201),
    'PoolArea': (0, 301),
    'MiscVal': (0, 5001),
    'TotalRooms': (2, 11),
    'GarageAge': (1, 31),
    'RemodAge': (1, 31),
    'HouseAge': (1, 31),
    'TotalBath': (1, 5),
    'TotalPorchSF': (1, 1001),
    'TotalSF': (1000, 6001),
    'TotalArea': (1000, 6001),
    'MoSold': (1, 13),
    'YrSold': (2006, 2022),
    'SalePrice': (50000, 800001),
}

df = pd.DataFrame({
    column: np.random.randint(low, high, size=num_rows)
    for column, (low, high) in column_ranges.items()
})
|
||
import oss2 | ||
import io | ||
Check notice on line 56 in k8s/examples/vineyard-on-fluid/prepare-dataset.py Codacy Production / Codacy Static Code Analysisk8s/examples/vineyard-on-fluid/prepare-dataset.py#L56
|
||
from oss2.credentials import EnvironmentVariableCredentialsProvider | ||
Check notice on line 57 in k8s/examples/vineyard-on-fluid/prepare-dataset.py Codacy Production / Codacy Static Code Analysisk8s/examples/vineyard-on-fluid/prepare-dataset.py#L57
|
||
# Please set your OSS accessKeyID and accessKeySecret as environment variables
# OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET
auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
# Please replace OSS_ENDPOINT and BUCKET_NAME with your OSS Endpoint and Bucket
bucket = oss2.Bucket(auth, 'OSS_ENDPOINT', 'BUCKET_NAME')

# Pickle the dataframe into an in-memory buffer and upload it as "df.pkl".
bytes_buffer = io.BytesIO()
df.to_pickle(bytes_buffer)
# Rewind and hand the buffer itself to put_object: getvalue() would create a
# second full copy of the pickled bytes, which is significant at this size.
bytes_buffer.seek(0)
# NOTE(review): a single put_object call may be subject to an OSS object size
# limit for simple uploads -- confirm; if the pickle exceeds it, write the
# pickle to a local file and use a multipart/resumable upload instead.
bucket.put_object("df.pkl", bytes_buffer)