2017. 8. 8. 21:04 · Server Programming
<SparkSQL>
$ bin/spark-shell
scala> case class Trans(accNo:String, tranAmount:Double)
defined class Trans
scala> def toTrans = (trans:Seq[String]) => Trans(trans(0),trans(1).trim.toDouble)
toTrans: Seq[String] => Trans
scala> val acTransList = Array("SB10001,1000","SB10002,1200","SB10003,8000","SB10004,400","SB10005,300","SB10006,10000","SB10007,500","SB10008,56","SB10009,30","CR10010,7000","SB10002,-10")
acTransList: Array[String] = Array(SB10001,1000, SB10002,1200, SB10003,8000, SB10004,400, SB10005,300, SB10006,10000, SB10007,500, SB10008,56, SB10009,30, CR10010,7000, SB10002,-10)
scala> val acTransRDD = sc.parallelize(acTransList).map(_.split(",")).map(toTrans(_))
acTransRDD: org.apache.spark.rdd.RDD[Trans] = MapPartitionsRDD[2] at map at <console>:30
scala> var acTransDF = spark.createDataFrame(acTransRDD)
acTransDF: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
scala> acTransDF.createOrReplaceTempView("trans")
scala> acTransDF.printSchema
root
|-- accNo: string (nullable = true)
|-- tranAmount: double (nullable = true)
scala> acTransDF.show
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10001| 1000.0|
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10004| 400.0|
|SB10005| 300.0|
|SB10006| 10000.0|
|SB10007| 500.0|
|SB10008| 56.0|
|SB10009| 30.0|
|CR10010| 7000.0|
|SB10002| -10.0|
+-------+----------+
scala> val goodTransRecords = spark.sql("SELECT accNo,tranAmount FROM trans WHERE accNo like 'SB%' AND tranAmount > 0")
goodTransRecords: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
scala> goodTransRecords.createOrReplaceTempView("goodtrans")
scala> goodTransRecords.show
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10001| 1000.0|
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10004| 400.0|
|SB10005| 300.0|
|SB10006| 10000.0|
|SB10007| 500.0|
|SB10008| 56.0|
|SB10009| 30.0|
+-------+----------+
scala> val highValueTransRecords = spark.sql("SELECT accNo,tranAmount FROM goodtrans WHERE tranAmount > 1000")
highValueTransRecords: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
scala> highValueTransRecords.show
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10006| 10000.0|
+-------+----------+
scala> val badAccountRecords = spark.sql("SELECT accNo,tranAmount FROM trans WHERE accNo NOT like 'SB%'")
badAccountRecords: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
scala> badAccountRecords.show
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|CR10010| 7000.0|
+-------+----------+
scala> val badAmountRecords = spark.sql("SELECT accNo,tranAmount FROM trans WHERE tranAmount < 0")
badAmountRecords: org.apache.spark.sql.DataFrame = [accNo: string, tranAmount: double]
scala> badAmountRecords.show
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10002| -10.0|
+-------+----------+
scala> val sumAmount = spark.sql("SELECT sum(tranAmount) as sum FROM goodtrans")
sumAmount: org.apache.spark.sql.DataFrame = [sum: double]
scala> sumAmount.show
+-------+
| sum|
+-------+
|21486.0|
+-------+
scala> val maxAmount = spark.sql("SELECT max(tranAmount) as max FROM goodtrans")
maxAmount: org.apache.spark.sql.DataFrame = [max: double]
scala> maxAmount.show
+-------+
| max|
+-------+
|10000.0|
+-------+
scala> val minAmount = spark.sql("SELECT min(tranAmount) as min FROM goodtrans" )
minAmount: org.apache.spark.sql.DataFrame = [min: double]
scala> minAmount.show
+----+
| min|
+----+
|30.0|
+----+
scala> val goodAccNos = spark.sql("SELECT DISTINCT accNo FROM trans WHERE accNo like 'SB%' ORDER BY accNo")
goodAccNos: org.apache.spark.sql.DataFrame = [accNo: string]
scala> goodAccNos.show
+-------+
| accNo|
+-------+
|SB10001|
|SB10002|
|SB10003|
|SB10004|
|SB10005|
|SB10006|
|SB10007|
|SB10008|
|SB10009|
+-------+
scala> val sumAmountByMixing = goodTransRecords.map(trans => trans.getAs[Double]("tranAmount")).reduce(_+_)
sumAmountByMixing: Double = 21486.0
scala> sumAmountByMixing
res15: Double = 21486.0
scala> val maxAmountByMixing = goodTransRecords.map(trans => trans.getAs[Double]("tranAmount")).reduce((a,b) => if (a > b) a else b)
maxAmountByMixing: Double = 10000.0
scala> val minAmountByMixing = goodTransRecords.map(trans => trans.getAs[Double]("tranAmount")).reduce((a,b) => if (a < b) a else b)
minAmountByMixing: Double = 30.0
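The next step packages a Spark SQL program as a standalone application and runs it with spark-submit. The source of the PeopleJson class is not included in this post; a minimal sketch that would produce output like the run below (assuming a JSON-lines file people.json with name and age fields, built into the jar with sbt, which the target/scala-2.11 path in the command suggests) could look like this:

// PeopleJson.scala -- hypothetical sketch; the actual source is not shown in this post.
// Assumes a JSON-lines file "people.json" with records such as {"name":"John","age":25}.
import org.apache.spark.sql.SparkSession

object PeopleJson {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("PeopleJson").getOrCreate()

    // Read the JSON file into a DataFrame and inspect it.
    val peopleDF = spark.read.json("people.json")
    peopleDF.show()          // all rows
    peopleDF.printSchema()   // age: long, name: string

    // Column selection and filtering with the DataFrame API.
    peopleDF.select("name").show()
    peopleDF.filter(peopleDF("age") > 35).show()

    // The same queries through Spark SQL on a temporary view.
    peopleDF.createOrReplaceTempView("people")
    spark.sql("SELECT age, name FROM people").show()
    spark.sql("SELECT age, name FROM people WHERE age > 35").show()

    spark.stop()
  }
}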
$ ~/spark/bin/spark-submit --class "PeopleJson" target/scala-2.11/sparksql-example_2.11-1.0.jar
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
+---+----+
|age|name|
+---+----+
| 25|John|
| 30| Tom|
| 35|Song|
| 40|Kang|
+---+----+
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
+----+
|name|
+----+
|John|
| Tom|
|Song|
|Kang|
+----+
+---+----+
|age|name|
+---+----+
| 40|Kang|
+---+----+
+---+----+
|age|name|
+---+----+
| 25|John|
| 30| Tom|
| 35|Song|
| 40|Kang|
+---+----+
+---+----+
|age|name|
+---+----+
| 40|Kang|
+---+----+
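The second application, TransList, repeats the transaction example from the interactive session above. Its source is likewise not shown in the post; a sketch consistent with the output that follows could be:

// TransList.scala -- hypothetical sketch matching the run below; not the post's actual source.
import org.apache.spark.sql.SparkSession

case class Trans(accNo: String, tranAmount: Double)

object TransList {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("TransList").getOrCreate()
    import spark.implicits._

    val acTransList = Array("SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400",
      "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56", "SB10009,30",
      "CR10010,7000", "SB10002,-10")

    // Build a DataFrame of Trans records from the comma-separated strings.
    val acTransDF = spark.sparkContext.parallelize(acTransList)
      .map(_.split(","))
      .map(t => Trans(t(0), t(1).trim.toDouble))
      .toDF()

    acTransDF.printSchema()
    acTransDF.show()

    // Keep only savings-bank accounts with a positive amount.
    acTransDF.createOrReplaceTempView("trans")
    val goodTransRecords = spark.sql(
      "SELECT accNo, tranAmount FROM trans WHERE accNo LIKE 'SB%' AND tranAmount > 0")
    goodTransRecords.show()

    // Total the good transactions with an RDD-style reduce, as in the shell session.
    val sumAmount = goodTransRecords.map(r => r.getAs[Double]("tranAmount")).reduce(_ + _)
    println(sumAmount)

    spark.stop()
  }
}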
$ ~/spark/bin/spark-submit --class "TransList" target/scala-2.11/sparksql-translist_2.11-1.0.jar
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
root
|-- accNo: string (nullable = true)
|-- tranAmount: double (nullable = true)
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10001| 1000.0|
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10004| 400.0|
|SB10005| 300.0|
|SB10006| 10000.0|
|SB10007| 500.0|
|SB10008| 56.0|
|SB10009| 30.0|
|CR10010| 7000.0|
|SB10002| -10.0|
+-------+----------+
+-------+----------+
| accNo|tranAmount|
+-------+----------+
|SB10001| 1000.0|
|SB10002| 1200.0|
|SB10003| 8000.0|
|SB10004| 400.0|
|SB10005| 300.0|
|SB10006| 10000.0|
|SB10007| 500.0|
|SB10008| 56.0|
|SB10009| 30.0|
+-------+----------+
21486.0
--------------------------------------------------------------------------------------------
$ mysql -u root -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 30
Server version: 5.1.73 Source distribution
mysql> use test
mysql> create table baby_names (id int(11) primary key auto_increment, year int(11), first_name varchar(100), county varchar(100), sex varchar(5), count int(11));
Query OK, 0 rows affected (0.00 sec)
mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| baby_names |
+----------------+
1 row in set (0.00 sec)
mysql> insert into baby_names(year,first_name,county,sex,count) values(2017,'Hong','Seoul','M',1);
Query OK, 1 row affected (0.00 sec)
mysql> insert into baby_names(year,first_name,county,sex,count) values(2017,'Park','Pusan','F',1);
Query OK, 1 row affected (0.00 sec)
mysql> select * from baby_names;
+----+------+------------+--------+------+-------+
| id | year | first_name | county | sex | count |
+----+------+------------+--------+------+-------+
| 1 | 2017 | Hong | Seoul | M | 1 |
| 2 | 2017 | Park | Pusan | F | 1 |
+----+------+------------+--------+------+-------+
2 rows in set (0.00 sec)
$ ~/spark/bin/spark-shell --jars mysql-connector-java-5.1.38-bin.jar
scala> val mysqlDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost/test").option("driver","com.mysql.jdbc.Driver").option("dbtable","baby_names").option("user","root").option("password","hello.edu").load()
mysqlDF: org.apache.spark.sql.DataFrame = [id: int, year: int ... 4 more fields]
scala> mysqlDF.show
+---+----+----------+------+---+-----+
| id|year|first_name|county|sex|count|
+---+----+----------+------+---+-----+
| 1|2017| Hong| Seoul| M| 1|
| 2|2017| Park| Pusan| F| 1|
+---+----+----------+------+---+-----+
scala> mysqlDF.createOrReplaceTempView("baby_names")
scala> var babys = spark.sql("select * from baby_names")
babys: org.apache.spark.sql.DataFrame = [id: int, year: int ... 4 more fields]
scala> babys.show
+---+----+----------+------+---+-----+
| id|year|first_name|county|sex|count|
+---+----+----------+------+---+-----+
| 1|2017| Hong| Seoul| M| 1|
| 2|2017| Park| Pusan| F| 1|
+---+----+----------+------+---+-----+
scala> babys.collect.foreach(println)
[1,2017,Hong,Seoul,M,1]
[2,2017,Park,Pusan,F,1]
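The JDBC read above can also be packaged as a standalone application in the same way as PeopleJson and TransList. This is only a hypothetical sketch, not part of the original session; it reuses the connection URL, table name, and credentials from the spark-shell run, and the MySQL connector jar still has to be supplied to spark-submit with --jars:

// MysqlRead.scala -- hypothetical sketch, not shown in the original post.
import org.apache.spark.sql.SparkSession

object MysqlRead {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("MysqlRead").getOrCreate()

    // Load the baby_names table over JDBC, exactly as in the spark-shell session.
    val mysqlDF = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost/test")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "baby_names")
      .option("user", "root")
      .option("password", "hello.edu")
      .load()

    // Query it through a temporary view, as above.
    mysqlDF.createOrReplaceTempView("baby_names")
    spark.sql("SELECT * FROM baby_names").show()

    spark.stop()
  }
}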