SparkSQL Programming on CentOS 6

2017. 8. 8. 21:04 · Server Programming

<SparkSQL>


$ bin/spark-shell


scala> case class Trans(accNo:String, tranAmount:Double)

defined class Trans


scala> def toTrans = (trans:Seq[String]) => Trans(trans(0),trans(1).trim.toDouble)

toTrans: Seq[String] => Trans


scala> val acTransList = Array("SB10001,1000","SB10002,1200","SB10003,8000","SB10004,400","SB10005,300","SB10006,10000","SB10007,500","SB10008,56","SB10009,30","CR10010,7000","SB10002,-10")

acTransList: Array[String] = Array(SB10001,1000, SB10002,1200, SB10003,8000, SB10004,400, SB10005,300, SB10006,10000, SB10007,500, SB10008,56, SB10009,30, CR10010,7000, SB10002,-10)


scala> val acTransRDD = sc.parallelize(acTransList).map(_.split(",")).map(toTrans(_))

acTransRDD: org.apache.spark.rdd.RDD[Trans] = MapPartitionsRDD[2] at map at <console>:30


scala> var acTransDF = spark.createDataFrame(acTransRDD)

acTransDF: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
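
Note that spark-shell imports spark.implicits._ automatically, so the same DataFrame can also be built with the toDF() conversion on the case-class RDD:

// Equivalent to spark.createDataFrame(acTransRDD); toDF() comes from
// spark.implicits._, which the shell imports for you
val acTransDF = acTransRDD.toDF()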


scala> acTransDF.createOrReplaceTempView("trans")


scala> acTransDF.printSchema

root

 |-- accNo: string (nullable = true)

 |-- tranAmount: double (nullable = true)


scala> acTransDF.show

+-------+----------+

|  accNo|tranAmount|

+-------+----------+

|SB10001|    1000.0|

|SB10002|    1200.0|

|SB10003|    8000.0|

|SB10004|     400.0|

|SB10005|     300.0|

|SB10006|   10000.0|

|SB10007|     500.0|

|SB10008|      56.0|

|SB10009|      30.0|

|CR10010|    7000.0|

|SB10002|     -10.0|

+-------+----------+


scala> val goodTransRecords = spark.sql("SELECT accNo,tranAmount FROM trans WHERE accNo like 'SB%' AND tranAmount > 0")

goodTransRecords: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]


scala> goodTransRecords.createOrReplaceTempView("goodtrans")


scala> goodTransRecords.show

+-------+----------+

|  accNo|tranAmount|

+-------+----------+

|SB10001|    1000.0|

|SB10002|    1200.0|

|SB10003|    8000.0|

|SB10004|     400.0|

|SB10005|     300.0|

|SB10006|   10000.0|

|SB10007|     500.0|

|SB10008|      56.0|

|SB10009|      30.0|

+-------+----------+


scala> val highValueTransRecords = spark.sql("SELECT accNo,tranAmount FROM goodtrans WHERE tranAmount > 1000")

highValueTransRecords: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]


scala> highValueTransRecords.show

+-------+----------+

|  accNo|tranAmount|

+-------+----------+

|SB10002|    1200.0|

|SB10003|    8000.0|

|SB10006|   10000.0|

+-------+----------+


scala> val badAccountRecords = spark.sql("SELECT accNo,tranAmount FROM trans WHERE accNo NOT like 'SB%'")

badAccountRecords: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]


scala> badAccountRecords.show

+-------+----------+

|  accNo|tranAmount|

+-------+----------+

|CR10010|    7000.0|

+-------+----------+


scala> val badAmountRecords = spark.sql("SELECT accNo,tranAmount FROM trans WHERE tranAmount < 0")

badAmountRecords: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]


scala> badAmountRecords.show

+-------+----------+

|  accNo|tranAmount|

+-------+----------+

|SB10002|     -10.0|

+-------+----------+


scala> val sumAmount = spark.sql("SELECT sum(tranAmount) as sum FROM goodtrans")

sumAmount: org.apache.spark.sql.DataFrame = [sum: double]


scala> sumAmount.show

+-------+

|    sum|

+-------+

|21486.0|

+-------+


scala> val maxAmount = spark.sql("SELECT max(tranAmount) as max FROM goodtrans")

maxAmount: org.apache.spark.sql.DataFrame = [max: double]


scala> maxAmount.show

+-------+

|    max|

+-------+

|10000.0|

+-------+


scala> val minAmount = spark.sql("SELECT min(tranAmount) as min FROM goodtrans")

minAmount: org.apache.spark.sql.DataFrame = [min: double]


scala> minAmount.show

+----+

| min|

+----+

|30.0|

+----+


scala> val goodAccNos = spark.sql("SELECT DISTINCT accNo FROM trans WHERE accNo like 'SB%' ORDER BY accNo")

goodAccNos: org.apache.spark.sql.DataFrame = [accNo: string]


scala> goodAccNos.show

(Stage 17 progress bars trimmed; the DISTINCT/ORDER BY shuffle ran as 200 tasks)

+-------+

|  accNo|

+-------+

|SB10001|

|SB10002|

|SB10003|

|SB10004|

|SB10005|

|SB10006|

|SB10007|

|SB10008|

|SB10009|

+-------+
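
DISTINCT plus ORDER BY forces a shuffle, and Spark SQL's spark.sql.shuffle.partitions defaults to 200, which is why the trimmed progress bar above counted to 200 tasks for eleven input rows. For small local experiments the setting can be lowered before running the query:

// Fewer shuffle partitions keep tiny local queries snappy
spark.conf.set("spark.sql.shuffle.partitions", "8")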


scala> val sumAmountByMixing = goodTransRecords.map(trans => trans.getAs[Double]("tranAmount")).reduce(_+_)

sumAmountByMixing: Double = 21486.0


scala> sumAmountByMixing

res15: Double = 21486.0


scala> val maxAmountByMixing = goodTransRecords.map(trans => trans.getAs[Double]("tranAmount")).reduce((a,b) => if (a > b) a else b)

maxAmountByMixing: Double = 10000.0


scala> val minAmountByMixing = goodTransRecords.map(trans => trans.getAs[Double]("tranAmount")).reduce((a,b) => if (a < b) a else b)

minAmountByMixing: Double = 30.0
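
The same three aggregates can also be computed in a single pass with the DataFrame API instead of map/reduce; a sketch:

import org.apache.spark.sql.functions.{sum, max, min}

// One job returns sum, max and min over the good records together
goodTransRecords
  .agg(sum("tranAmount").as("sum"),
       max("tranAmount").as("max"),
       min("tranAmount").as("min"))
  .show()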


$ ~/spark/bin/spark-submit --class "PeopleJson" target/scala-2.11/sparksql-example_2.11-1.0.jar 

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

+---+----+                                                                      

|age|name|

+---+----+

| 25|John|

| 30| Tom|

| 35|Song|

| 40|Kang|

+---+----+


root

 |-- age: long (nullable = true)

 |-- name: string (nullable = true)


+----+

|name|

+----+

|John|

| Tom|

|Song|

|Kang|

+----+


+---+----+

|age|name|

+---+----+

| 40|Kang|

+---+----+


+---+----+

|age|name|

+---+----+

| 25|John|

| 30| Tom|

| 35|Song|

| 40|Kang|

+---+----+


+---+----+

|age|name|

+---+----+

| 40|Kang|

+---+----+
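
The post does not include the PeopleJson source, but from the output above it evidently reads a JSON file of age/name records, prints the schema, projects the name column, filters on age, and then repeats the queries through a temp view. A minimal sketch along those lines; the people.json path and the age > 35 predicate are assumptions:

import org.apache.spark.sql.SparkSession

object PeopleJson {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("PeopleJson").getOrCreate()
    import spark.implicits._

    // Assumed input: one JSON object per line, e.g. {"name":"John","age":25}
    val df = spark.read.json("people.json")

    df.show()                     // the full age/name table
    df.printSchema()              // age: long, name: string
    df.select("name").show()      // names only
    df.filter($"age" > 35).show() // only Kang

    // The same two queries again through Spark SQL
    df.createOrReplaceTempView("people")
    spark.sql("SELECT age, name FROM people").show()
    spark.sql("SELECT age, name FROM people WHERE age > 35").show()

    spark.stop()
  }
}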


$ ~/spark/bin/spark-submit --class "TransList" target/scala-2.11/sparksql-translist_2.11-1.0.jar 

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


root

 |-- accNo: string (nullable = true)

 |-- tranAmount: double (nullable = true)


+-------+----------+

|  accNo|tranAmount|

+-------+----------+

|SB10001|    1000.0|

|SB10002|    1200.0|

|SB10003|    8000.0|

|SB10004|     400.0|

|SB10005|     300.0|

|SB10006|   10000.0|

|SB10007|     500.0|

|SB10008|      56.0|

|SB10009|      30.0|

|CR10010|    7000.0|

|SB10002|     -10.0|

+-------+----------+


+-------+----------+

|  accNo|tranAmount|

+-------+----------+

|SB10001|    1000.0|

|SB10002|    1200.0|

|SB10003|    8000.0|

|SB10004|     400.0|

|SB10005|     300.0|

|SB10006|   10000.0|

|SB10007|     500.0|

|SB10008|      56.0|

|SB10009|      30.0|

+-------+----------+

21486.0
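
TransList's source isn't shown either; it is essentially the interactive session above packaged as an application. A sketch that would reproduce the printed schema, the two tables, and the 21486.0 sum:

import org.apache.spark.sql.SparkSession

// Top level so Spark can derive an encoder for it
case class Trans(accNo: String, tranAmount: Double)

object TransList {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("TransList").getOrCreate()
    import spark.implicits._

    val acTransList = Array("SB10001,1000", "SB10002,1200", "SB10003,8000",
      "SB10004,400", "SB10005,300", "SB10006,10000", "SB10007,500",
      "SB10008,56", "SB10009,30", "CR10010,7000", "SB10002,-10")

    val acTransDF = spark.sparkContext.parallelize(acTransList)
      .map(_.split(","))
      .map(t => Trans(t(0), t(1).trim.toDouble))
      .toDF()

    acTransDF.printSchema()
    acTransDF.show()

    acTransDF.createOrReplaceTempView("trans")
    val goodTransRecords = spark.sql(
      "SELECT accNo, tranAmount FROM trans WHERE accNo LIKE 'SB%' AND tranAmount > 0")
    goodTransRecords.show()

    // 1000 + 1200 + 8000 + 400 + 300 + 10000 + 500 + 56 + 30 = 21486.0
    println(goodTransRecords.map(_.getAs[Double]("tranAmount")).reduce(_ + _))

    spark.stop()
  }
}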


--------------------------------------------------------------------------------------------


$ mysql -u root -p

Enter password: 

Welcome to the MySQL monitor.  Commands end with ; or \g.

Your MySQL connection id is 30

Server version: 5.1.73 Source distribution


mysql> use test

mysql> create table baby_names (id int(11) primary key auto_increment, year int(11), first_name varchar(100), county varchar(100), sex varchar(5), count int(11));

Query OK, 0 rows affected (0.00 sec)


mysql> show tables;

+----------------+

| Tables_in_test |

+----------------+

| baby_names     |

+----------------+

1 row in set (0.00 sec)


mysql> insert into baby_names(year,first_name,county,sex,count) values(2017,'Hong','Seoul','M',1);

Query OK, 1 row affected (0.00 sec)


mysql> insert into baby_names(year,first_name,county,sex,count) values(2017,'Park','Pusan','F',1);

Query OK, 1 row affected (0.00 sec)


mysql> select * from baby_names;

+----+------+------------+--------+------+-------+

| id | year | first_name | county | sex  | count |

+----+------+------------+--------+------+-------+

|  1 | 2017 | Hong       | Seoul  | M    |     1 |

|  2 | 2017 | Park       | Pusan  | F    |     1 |

+----+------+------------+--------+------+-------+

2 rows in set (0.00 sec)


$ ~/spark/bin/spark-shell --jars mysql-connector-java-5.1.38-bin.jar


scala> val mysqlDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost/test").option("driver","com.mysql.jdbc.Driver").option("dbtable","baby_names").option("user","root").option("password","hello.edu").load()

mysqlDF: org.apache.spark.sql.DataFrame = [id: int, year: int ... 4 more fields]


scala> mysqlDF.show

+---+----+----------+------+---+-----+

| id|year|first_name|county|sex|count|

+---+----+----------+------+---+-----+

|  1|2017|      Hong| Seoul|  M|    1|

|  2|2017|      Park| Pusan|  F|    1|

+---+----+----------+------+---+-----+
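
As an aside, for tables larger than this two-row example the JDBC source can read in parallel by range-splitting on a numeric column; a sketch using the id column, where the bounds and partition count are assumptions for illustration:

val mysqlDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost/test")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "baby_names")
  .option("user", "root")
  .option("password", "hello.edu")
  .option("partitionColumn", "id") // split the read on the numeric id
  .option("lowerBound", "1")       // assumed id range for illustration
  .option("upperBound", "1000")
  .option("numPartitions", "4")
  .load()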


scala> mysqlDF.createOrReplaceTempView("baby_names")


scala> var babys = spark.sql("select * from baby_names")

babys: org.apache.spark.sql.DataFrame = [id: int, year: int ... 4 more fields]


scala> babys.show

+---+----+----------+------+---+-----+

| id|year|first_name|county|sex|count|

+---+----+----------+------+---+-----+

|  1|2017|      Hong| Seoul|  M|    1|

|  2|2017|      Park| Pusan|  F|    1|

+---+----+----------+------+---+-----+


scala> babys.collect.foreach(println)

[1,2017,Hong,Seoul,M,1]

[2,2017,Park,Pusan,F,1]
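
Writing a DataFrame back into MySQL uses the same options on the write side. A sketch, where append mode and the baby_names_copy target table are assumptions for illustration:

// Append the rows of babys into a (hypothetical) existing MySQL table
babys.write.format("jdbc")
  .option("url", "jdbc:mysql://localhost/test")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "baby_names_copy") // hypothetical target table
  .option("user", "root")
  .option("password", "hello.edu")
  .mode("append")
  .save()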


Attachments: sparksql-translist.zip, sparksql-example.zip
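
The attached archives presumably contain the sbt projects behind the two jars. Judging from the artifact name sparksql-example_2.11-1.0.jar, the build file would look roughly like this; the exact Spark version is an assumption (any 2.x release current in mid-2017 fits):

// build.sbt -- assumed layout; name, version and Scala version are
// taken from the artifact name sparksql-example_2.11-1.0.jar
name := "sparksql-example"

version := "1.0"

scalaVersion := "2.11.8"

// "provided": spark-submit supplies the Spark classes at runtime
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0" % "provided"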