2017. 7. 18. 11:17 · Server Programming
<Spark Installation>
https://spark.apache.org/downloads.html
Download Spark: spark-2.2.0-bin-hadoop2.7.tgz
Spark Standalone: from installation to running the bundled examples.
./sbin/start-master.sh
./sbin/start-slave.sh spark://Hajiui-MacBook-Pro-2.local:7077
./bin/pyspark --master spark://Hajiui-MacBook-Pro-2.local:7077
Python 2.7.10 (default, Feb 7 2017, 00:08:15)
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.34)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/07/18 13:22:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/18 13:23:14 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/
Using Python version 2.7.10 (default, Feb 7 2017 00:08:15)
SparkSession available as 'spark'.
>>>
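A quick sanity check at the pyspark prompt confirms the standalone cluster actually executes work (a minimal sketch; any small action will do):
# Run a trivial distributed job; `sc` is the SparkContext the shell created.
rdd = sc.parallelize(range(1000))
print(rdd.filter(lambda x: x % 2 == 0).count())  # expect 500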
./bin/spark-submit --master spark://Hajiui-MacBook-Pro-2.local:7077 examples/src/main/python/ml/naive_bayes_example.py
17/07/18 13:20:29 INFO CodeGenerator: Code generated in 21.876471 ms
+-----+--------------------+--------------------+-----------+----------+
|label| features| rawPrediction|probability|prediction|
+-----+--------------------+--------------------+-----------+----------+
| 0.0|(692,[95,96,97,12...|[-174115.98587057...| [1.0,0.0]| 0.0|
| 0.0|(692,[98,99,100,1...|[-178402.52307196...| [1.0,0.0]| 0.0|
| 0.0|(692,[100,101,102...|[-100905.88974016...| [1.0,0.0]| 0.0|
| 0.0|(692,[123,124,125...|[-244784.29791241...| [1.0,0.0]| 0.0|
| 0.0|(692,[123,124,125...|[-196900.88506109...| [1.0,0.0]| 0.0|
| 0.0|(692,[124,125,126...|[-238164.45338794...| [1.0,0.0]| 0.0|
| 0.0|(692,[124,125,126...|[-184206.87833381...| [1.0,0.0]| 0.0|
| 0.0|(692,[127,128,129...|[-214174.52863813...| [1.0,0.0]| 0.0|
| 0.0|(692,[127,128,129...|[-182844.62193963...| [1.0,0.0]| 0.0|
| 0.0|(692,[128,129,130...|[-246557.10990301...| [1.0,0.0]| 0.0|
| 0.0|(692,[152,153,154...|[-208282.08496711...| [1.0,0.0]| 0.0|
| 0.0|(692,[152,153,154...|[-243457.69885665...| [1.0,0.0]| 0.0|
| 0.0|(692,[153,154,155...|[-260933.50931276...| [1.0,0.0]| 0.0|
| 0.0|(692,[154,155,156...|[-220274.72552901...| [1.0,0.0]| 0.0|
| 0.0|(692,[181,182,183...|[-154830.07125175...| [1.0,0.0]| 0.0|
| 1.0|(692,[99,100,101,...|[-145978.24563975...| [0.0,1.0]| 1.0|
| 1.0|(692,[100,101,102...|[-147916.32657832...| [0.0,1.0]| 1.0|
| 1.0|(692,[123,124,125...|[-139663.27471685...| [0.0,1.0]| 1.0|
| 1.0|(692,[124,125,126...|[-129013.44238751...| [0.0,1.0]| 1.0|
| 1.0|(692,[125,126,127...|[-81829.799906049...| [0.0,1.0]| 1.0|
+-----+--------------------+--------------------+-----------+----------+
only showing top 20 rows
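For reference, the core of the bundled naive_bayes_example.py looks roughly like this (a trimmed sketch of the Spark 2.2 example, not a verbatim copy):
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load the bundled sample data (libsvm format), split, fit, predict.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
train, test = data.randomSplit([0.6, 0.4], 1234)
model = NaiveBayes(smoothing=1.0, modelType="multinomial").fit(train)
predictions = model.transform(test)
predictions.show()  # the table above
accuracy = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction",
    metricName="accuracy").evaluate(predictions)
print("Test set accuracy = " + str(accuracy))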
<Elasticsearch Installation>
https://www.elastic.co/kr/downloads/elasticsearch
./bin/elasticsearch
Open it in a web browser to verify:
http://localhost:9200/
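The same check can be scripted from Python (standard library only; Python 2 to match the pyspark session above):
import json, urllib2
# Hit the root endpoint and print the version Elasticsearch reports.
info = json.load(urllib2.urlopen("http://localhost:9200/"))
print(info["version"]["number"])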
<PostgreSQL Installation>
I first installed with brew and then with the installer from the official PostgreSQL site, and wasted a lot of time on the initial-password problem.
After removing everything and installing Postgres.app instead, it just worked. So much easier.
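A minimal connectivity check against the fresh Postgres.app server (a sketch; assumes the default postgres database with local trust auth, and psycopg2 installed via pip):
import psycopg2
# Connect to the default database and print the server version.
conn = psycopg2.connect(host="localhost", dbname="postgres", user="postgres")
cur = conn.cursor()
cur.execute("SELECT version();")
print(cur.fetchone()[0])
conn.close()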
Mac - PostgreSQL & PostGIS install and setup
https://stackoverflow.com/questions/8037729/completely-uninstall-postgresql-9-0-4-from-mac-osx-lion
To remove the EnterpriseDB One-Click install of PostgreSQL 9.1:
- Open a terminal window. Terminal is found in: Applications->Utilities->Terminal
Run the uninstaller:
sudo /Library/PostgreSQL/9.1/uninstall-postgresql.app/Contents/MacOS/installbuilder.sh
If you installed with the Postgres Installer, you can do:
open /Library/PostgreSQL/9.2/uninstall-postgresql.app
It will ask for the administrator password and run the uninstaller.
Remove the PostgreSQL and data folders. The Wizard will notify you that these were not removed.
sudo rm -rf /Library/PostgreSQL
Remove the ini file:
sudo rm /etc/postgres-reg.ini
Remove the PostgreSQL user using System Preferences -> Users & Groups.
- Unlock the settings panel by clicking on the padlock and entering your password.
- Select the PostgreSQL user and click on the minus button.
Restore your shared memory settings:
sudo rm /etc/sysctl.conf
That should be all! The uninstall wizard would have removed all icons and start-up applications files so you don't have to worry about those.
<Elasticsearch and Kafka Integration>
Elasticsearch <--> Kafka
Kafka and Logstash 1.5 Integration
https://www.elastic.co/blog/logstash-kafka-intro
[Logstash] Easy debugging of a Kafka integration
http://jjeong.tistory.com/1158
Logstash output to Kafka (producer):
$ bin/logstash -e "input { stdin {} } output { kafka { bootstrap_servers => '172.x.x.x:9024' topic_id => 'logstash_logs' } }"
Logstash input from Kafka (consumer):
$ bin/logstash -e "input { kafka { zk_connect => '172.x.x.x:2181' topic_id => 'logstash_logs' } } output { stdout { codec => rubydebug } }"
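The same smoke test can be driven from Python with the kafka-python package (my choice here, not something the linked posts use; the broker address is an assumed local default):
from kafka import KafkaProducer
# Publish one message to the topic the Logstash consumer reads.
producer = KafkaProducer(bootstrap_servers="localhost:9092")  # assumed broker
producer.send("logstash_logs", b"hello from python")
producer.flush()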
Kafka --> Elasticsearch
Kafka Connect Elasticsearch: Consuming and Indexing with Kafka Connect
https://sematext.com/blog/2017/03/06/kafka-connect-elasticsearch-how-to/
Kafka and Elasticsearch, a Perfect Match
https://qbox.io/blog/kafka-and-elasticsearch-a-perfect-match-1
kafka-elasticsearch-standalone-consumer
https://github.com/BigDataDevs/kafka-elasticsearch-consumer
Just Enough Kafka for the Elastic Stack
https://www.elastic.co/kr/blog/just-enough-kafka-for-the-elastic-stack-part1
https://www.elastic.co/kr/blog/just-enough-kafka-for-the-elastic-stack-part2
<PostgreSQL and Kafka Integration>
Logstash Reference [5.4] » Input plugins » jdbc
https://www.elastic.co/guide/en/logstash/5.4/plugins-inputs-jdbc.html
https://stackoverflow.com/questions/42088523/cant-config-logstash-to-postgres
Bottled Water: Real-time integration of PostgreSQL and Kafka
https://www.confluent.io/blog/bottled-water-real-time-integration-of-postgresql-and-kafka/
https://github.com/confluentinc/bottledwater-pg
https://github.com/kayform/bwcontrol
<Kafka and Spark Streaming Integration>
Building a real-time analytics system with Kafka + Spark Streaming and Python
Step 1: Download the code
https://kafka.apache.org/downloads
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz
Step 2: Start the server
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
Step 3: Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic scott
bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic scott
Step 4: Send some messages (start a producer)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic scott
Step 5: Start a consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic scott --from-beginning
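The console consumer can be mirrored in Python with kafka-python (a sketch; --from-beginning corresponds to auto_offset_reset="earliest"):
from kafka import KafkaConsumer
# Read the 'scott' topic from the beginning, like the console consumer.
consumer = KafkaConsumer("scott",
                         bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest")
for msg in consumer:
    print(msg.value)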
Step 6: Spark-Streaming
Test with the example that ships with Spark Streaming:
examples/src/main/python/streaming/kafka_wordcount.py
With the default settings, Spark prints results every second.
./bin/spark-submit --master spark://Hajiui-MacBook-Pro-2.local:7077 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 examples/src/main/python/streaming/kafka_wordcount.py localhost:2181 scott
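For reference, the core of kafka_wordcount.py is roughly the following (trimmed; the 1-second batch interval passed to StreamingContext is where the once-per-second output comes from):
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 1)  # 1-second batches
# Subscribe to the 'scott' topic via ZooKeeper, as on the command line above.
kvs = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer", {"scott": 1})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b)
counts.pprint()
ssc.start()
ssc.awaitTermination()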
Send data to Spark Streaming with kafka-console-producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic scott
Step 7: Logstash - writing to Kafka
./bin/logstash -e "input { stdin {} } output { kafka { topic_id => 'scott' } }"
Step 8: Insert into Logstash from PostgreSQL - writing to Kafka
Install the Logstash JDBC input plugin:
./bin/logstash-plugin install logstash-input-jdbc
Download the PostgreSQL JDBC driver:
https://jdbc.postgresql.org/download.html#current
https://jdbc.postgresql.org/download/postgresql-42.1.3.jar
INSERT INTO LOGSTASH SELECT DATA FROM DATABASE
https://www.elastic.co/blog/logstash-jdbc-input-plugin
https://discuss.elastic.co/t/getting-started-with-jdbc-input-plugin/77162
Set up the database:
CREATE TABLE contacts (
    uid serial,
    email VARCHAR(80) NOT NULL,
    first_name VARCHAR(80) NOT NULL,
    last_name VARCHAR(80) NOT NULL
);
INSERT INTO contacts(email, first_name, last_name) VALUES('jim@example.com', 'Jim', 'Smith');
INSERT INTO contacts(email, first_name, last_name) VALUES(null, 'John', 'Smith');
INSERT INTO contacts(email, first_name, last_name) VALUES('carol@example.com', 'Carol', 'Smith');
INSERT INTO contacts(email, first_name, last_name) VALUES('sam@example.com', 'Sam', null);
Note that the John and Sam inserts violate the NOT NULL constraints on email and last_name and fail, so only uid 1 (Jim) and uid 3 (Carol) end up in the table, which matches the Logstash output below.
vi logstash_postgresql.conf
input {
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
    jdbc_user => "postgres"
    jdbc_driver_library => "postgresql-42.1.3.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    statement => "SELECT * from contacts"
  }
}
output {
  stdout {
    codec => json_lines
  }
  kafka {
    topic_id => "scott"
    codec => json_lines
  }
}
./bin/logstash -f logstash_postgresql.conf
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Sending Logstash's logs to /Users/khjeong/Downloads/logstash-5.5.0/logs which is now configured via log4j2.properties
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[2017-07-18T17:02:33,982][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2017-07-18T17:02:34,206][INFO ][logstash.pipeline ] Pipeline main started
[2017-07-18T17:02:34,305][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2017-07-18T17:02:34,880][INFO ][logstash.inputs.jdbc ] (0.023000s) SELECT * from contacts
{"uid":1,"@timestamp":"2017-07-18T08:02:34.916Z","@version":"1","last_name":"Smith","first_name":"Jim","email":"jim@example.com"}
{"uid":3,"@timestamp":"2017-07-18T08:02:34.918Z","@version":"1","last_name":"Smith","first_name":"Carol","email":"carol@example.com"}
[2017-07-18T17:02:37,228][WARN ][logstash.agent ] stopping pipeline {:id=>"main"}
Spark Streaming console:
-------------------------------------------
Time: 2017-07-18 17:14:44
-------------------------------------------
(u'{"uid":3,"@timestamp":"2017-07-18T08:14:14.000Z","@version":"1","last_name":"Smith","first_name":"Carol","email":"carol@example.com"}\n', 1)
(u'{"uid":1,"@timestamp":"2017-07-18T08:14:13.998Z","@version":"1","last_name":"Smith","first_name":"Jim","email":"jim@example.com"}\n', 1)
Step 9: Insert into Logstash from Elasticsearch - writing to Kafka
How to Import from CSV into Elasticsearch via Logstash and Sincedb
https://qbox.io/blog/import-csv-elasticsearch-logstash-sincedb
vi logstash_bitcoin.conf
input {
  file {
    path => "/Users/khjeong/Downloads/logstash-5.5.0/BCHARTS-MTGOXUSD.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
    separator => ","
    # Date,Open,High,Low,Close,Volume (BTC),Volume (Currency),Weighted Price
    columns => ["Date","Open","High","Low","Close","Volume (BTC)","Volume (Currency)","Weighted Price"]
  }
}
output {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "bitcoin-prices"
  }
  stdout {}
}
Data file location:
http://www.quandl.com/api/v1/datasets/BCHARTS/MTGOXUSD.csv
./bin/logstash -f logstash_bitcoin.conf
Check the data in a web browser:
http://localhost:9200/bitcoin-prices/logs/_search
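Or count the indexed documents from Python (standard library; _count is a stock Elasticsearch endpoint):
import json, urllib2
# How many CSV rows made it into the bitcoin-prices index?
res = json.load(urllib2.urlopen("http://localhost:9200/bitcoin-prices/_count"))
print(res["count"])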
Elasticsearch input plugin
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-elasticsearch.html
Following the docs example, I first used query => "*" and kept getting parsing errors; the query has to be a JSON body, so query => "{}" (match all) works.
vi logstash_elasticsearch.conf
input {
  elasticsearch {
    hosts => "localhost"
    index => "bitcoin-prices"
    type => "logs"
    query => "{}"
  }
}
output {
  stdout {}
  kafka {
    topic_id => "scott"
  }
}
./bin/logstash -f logstash_elasticsearch.conf
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Sending Logstash's logs to /Users/khjeong/Downloads/logstash-5.5.0/logs which is now configured via log4j2.properties
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[2017-07-18T18:40:48,934][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2017-07-18T18:40:48,958][INFO ][logstash.pipeline ] Pipeline main started
[2017-07-18T18:40:49,140][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
2017-07-18T09:05:56.833Z Hajiui-MacBook-Pro-2.local 2010-10-30,0.1876,0.199,0.1875,0.1989,26708.94,5112.034034,0.191397862813
2017-07-18T09:08:54.582Z Hajiui-MacBook-Pro-2.local 2014-01-07,1012.88,1044.2,880.0,880.0,22263.9752165,21388990.0933,960.699510547
2017-07-18T09:08:54.426Z Hajiui-MacBook-Pro-2.local 2014-02-20,264.328,271.43,109.0,111.697,101724.171543,16298837.3816,160.225806064
2017-07-18T09:08:54.436Z Hajiui-MacBook-Pro-2.local 2014-02-16,371.0,540.0,220.29327,299.72277,86061.3453883,26181409.739,304.217992653
.................
2017-07-18T09:08:56.705Z Hajiui-MacBook-Pro-2.local 2010-10-16,0.1,0.103,0.1,0.101,6284.708,633.63076692,0.100821035268
2017-07-18T09:08:56.707Z Hajiui-MacBook-Pro-2.local 2010-10-09,0.08411,0.12001,0.068,0.0938,187846.954,16104.8730071,0.085734011993
2017-07-18T09:08:56.709Z Hajiui-MacBook-Pro-2.local 2010-10-05,0.06132,0.06301,0.0609,0.0614,27526.613,1699.19527154,0.061729180831
[2017-07-18T18:40:51,981][WARN ][logstash.agent ] stopping pipeline {:id=>"main"}
Spark Streaming console:
-------------------------------------------
Time: 2017-07-18 18:40:50
-------------------------------------------
(u'2017-07-18T09:08:55.674Z', 1)
(u'2012-02-26,4.77303,5.1,4.77303,4.922,70034.6057242,347092.755801,4.95601784591', 1)
(u'2017-07-18T09:08:56.673Z', 1)
(u'2017-07-18T09:08:56.648Z', 1)
(u'2017-07-18T09:08:55.465Z', 1)
(u'2017-07-18T09:08:56.402Z', 1)
(u'2017-07-18T09:08:55.090Z', 1)
(u'2017-07-18T09:08:55.232Z', 1)
(u'2011-11-01,3.25617,3.35,3.07001,3.15,37893.4724479,120727.902337,3.18598150389', 1)
(u'2010-09-11,0.0619,0.065,0.0619,0.06366,7749.14,491.192371,0.063386694653', 1)
...