-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsparkcode.txt
31 lines (30 loc) · 1.73 KB
/
sparkcode.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vectors,Vector}
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
// Parse each comma-separated record of train_data into a LabeledPoint:
// column 28 is the label; columns 1-5 and 8-27 (25 values) are the features.
// Columns 0, 6 and 7 are not read.
val train = train_data.map { record =>
  val fields = record.split(',')
  // Parse the label before the features, in ascending column order.
  val label = fields(28).toDouble
  val featureColumns = (1 to 5) ++ (8 to 27)
  LabeledPoint(label, Vectors.dense(featureColumns.map(i => fields(i).toDouble).toArray))
}
// Train a linear SVM with SGD on the parsed training set for a fixed number of
// iterations. clearThreshold() returns the model itself, so it is chained here;
// with the threshold cleared, predict yields raw scores (per the MLlib docs).
val numIterations = 1000
val model = SVMWithSGD.train(train, numIterations).clearThreshold()
// Score every point of the test set with the trained model and render each
// result as the string "<score> <label>".
val scoreAndLabels = test.map { point =>
  val score = model.predict(point.features)
  s"$score ${point.label}"
}
// Print every pair exactly once: scoreAndLabels is not cached, so each extra
// action would re-run all predictions and emit the same output again.
// NOTE(review): on a cluster, foreach(println) writes to executor stdout rather
// than the driver console — confirm that is what is wanted here.
scoreAndLabels.foreach(println)
// Split each "<score> <label>" string back into its two fields.
val resultRDD = scoreAndLabels.map(_.split(" "))
// Both columns are stored as nullable strings.
val schema = StructType(List(StructField("score", StringType, true), StructField("label", StringType, true)))
// Build the rows from resultRDD, the RDD computed just above from the scored output.
val rowRDD = resultRDD.map(p => Row(p(0).trim, p(1).trim))
val resultDF = spark.createDataFrame(rowRDD, schema)
// JDBC connection settings for the MySQL target.
// NOTE(review): the credentials are hard-coded in the script; consider loading
// them from configuration or the environment instead.
val prop = new Properties()
prop.setProperty("user", "zhanghao")
prop.setProperty("password", "123456")
prop.setProperty("driver", "com.mysql.jdbc.Driver")
// Append the scored results (resultDF) to the table qiye.result.
resultDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/qiye", "qiye.result", prop)