Elasticsearch, PostgreSQL, Spark, Kafka, Logstash references

2017. 7. 18. 11:17 Server Programming

<Spark installation>


Download Spark: spark-2.2.0-bin-hadoop2.7.tgz

Spark Standalone: from installation to running the examples.



./sbin/start-slave.sh spark://Hajiui-MacBook-Pro-2.local:7077

./bin/pyspark --master spark://Hajiui-MacBook-Pro-2.local:7077

Python 2.7.10 (default, Feb  7 2017, 00:08:15) 

[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.34)] on darwin

Type "help", "copyright", "credits" or "license" for more information.

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

17/07/18 13:22:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

17/07/18 13:23:14 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0


Using Python version 2.7.10 (default, Feb  7 2017 00:08:15)

SparkSession available as 'spark'.


./bin/spark-submit --master spark://Hajiui-MacBook-Pro-2.local:7077 examples/src/main/python/ml/naive_bayes_example.py

17/07/18 13:20:29 INFO CodeGenerator: Code generated in 21.876471 ms


+-----+--------------------+--------------------+-----------+----------+
|label|            features|       rawPrediction|probability|prediction|
+-----+--------------------+--------------------+-----------+----------+
|  0.0|(692,[95,96,97,12...|[-174115.98587057...|  [1.0,0.0]|       0.0|
|  0.0|(692,[98,99,100,1...|[-178402.52307196...|  [1.0,0.0]|       0.0|
|  0.0|(692,[100,101,102...|[-100905.88974016...|  [1.0,0.0]|       0.0|
|  0.0|(692,[123,124,125...|[-244784.29791241...|  [1.0,0.0]|       0.0|
|  0.0|(692,[123,124,125...|[-196900.88506109...|  [1.0,0.0]|       0.0|
|  0.0|(692,[124,125,126...|[-238164.45338794...|  [1.0,0.0]|       0.0|
|  0.0|(692,[124,125,126...|[-184206.87833381...|  [1.0,0.0]|       0.0|
|  0.0|(692,[127,128,129...|[-214174.52863813...|  [1.0,0.0]|       0.0|
|  0.0|(692,[127,128,129...|[-182844.62193963...|  [1.0,0.0]|       0.0|
|  0.0|(692,[128,129,130...|[-246557.10990301...|  [1.0,0.0]|       0.0|
|  0.0|(692,[152,153,154...|[-208282.08496711...|  [1.0,0.0]|       0.0|
|  0.0|(692,[152,153,154...|[-243457.69885665...|  [1.0,0.0]|       0.0|
|  0.0|(692,[153,154,155...|[-260933.50931276...|  [1.0,0.0]|       0.0|
|  0.0|(692,[154,155,156...|[-220274.72552901...|  [1.0,0.0]|       0.0|
|  0.0|(692,[181,182,183...|[-154830.07125175...|  [1.0,0.0]|       0.0|
|  1.0|(692,[99,100,101,...|[-145978.24563975...|  [0.0,1.0]|       1.0|
|  1.0|(692,[100,101,102...|[-147916.32657832...|  [0.0,1.0]|       1.0|
|  1.0|(692,[123,124,125...|[-139663.27471685...|  [0.0,1.0]|       1.0|
|  1.0|(692,[124,125,126...|[-129013.44238751...|  [0.0,1.0]|       1.0|
|  1.0|(692,[125,126,127...|[-81829.799906049...|  [0.0,1.0]|       1.0|
+-----+--------------------+--------------------+-----------+----------+


only showing top 20 rows
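Why every probability in this output is exactly 1.0 or 0.0: a rough sketch in plain Python, assuming the rawPrediction column holds unnormalized per-class log scores that are exponentiated and normalized to get the probability column. The first score is copied from the first output row; the second score is a made-up placeholder, because the table truncates the vector. normalize_log_scores is a hypothetical helper, not a Spark API.

```python
import math

def normalize_log_scores(raw):
    """Turn unnormalized log scores into probabilities (log-sum-exp trick)."""
    top = max(raw)
    # Subtracting the max makes the largest term exp(0) = 1, so the sum never underflows to 0.
    weights = [math.exp(r - top) for r in raw]
    total = sum(weights)
    return [w / total for w in weights]

# First value: from the first row above. Second value: hypothetical placeholder.
raw_prediction = [-174115.98587057, -174400.0]
probability = normalize_log_scores(raw_prediction)
print([round(p, 6) for p in probability])  # prints [1.0, 0.0]
```

With log scores this far apart, the losing class gets a probability far too small to show up after rounding, so the column reads as 1.0 and 0.0.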

<Elasticsearch installation>



Open it in a web browser to verify that it is running.
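The same check can be scripted instead of done in a browser. A minimal Python 3 sketch, assuming Elasticsearch listens on http://localhost:9200 (the address used in the Logstash config later in this post) and that the root endpoint returns the usual JSON with cluster_name, version.number and tagline. summarize_root and check_elasticsearch are names invented for this example.

```python
import json
from urllib.request import urlopen

def summarize_root(info):
    """Pick out the fields of the root response that identify the node."""
    return {
        "cluster_name": info.get("cluster_name"),
        "version": info.get("version", {}).get("number"),
        "tagline": info.get("tagline"),
    }

def check_elasticsearch(base_url="http://localhost:9200", timeout=5):
    """Fetch the root endpoint; raises URLError if nothing is listening."""
    with urlopen(base_url, timeout=timeout) as resp:
        return summarize_root(json.loads(resp.read().decode("utf-8")))
```

With Elasticsearch running, print(check_elasticsearch()) should show the cluster name and version; an exception means the node is not reachable at that address.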


<PostgreSQL installation>


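I first installed it with brew and with the installer from the official PostgreSQL site, then wasted a lot of time on an initial password problem.

After removing everything and installing Postgres.app, it just ran with no trouble at all.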

mac - postgresql & postgis install and setting



To remove the EnterpriseDB One-Click install of PostgreSQL 9.1:

  1. Open a terminal window. Terminal is found in: Applications->Utilities->Terminal
  2. Run the uninstaller:

    sudo /Library/PostgreSQL/9.1/uninstall-postgresql.app/Contents/MacOS/installbuilder.sh

    If you installed with the Postgres Installer, you can do:

    open /Library/PostgreSQL/9.2/uninstall-postgresql.app

    It will ask for the administrator password and run the uninstaller.

  3. Remove the PostgreSQL and data folders. The Wizard will notify you that these were not removed.

    sudo rm -rf /Library/PostgreSQL

  4. Remove the ini file:

    sudo rm /etc/postgres-reg.ini

  5. Remove the PostgreSQL user using System Preferences -> Users & Groups.

    1. Unlock the settings panel by clicking on the padlock and entering your password.
    2. Select the PostgreSQL user and click on the minus button.
  6. Restore your shared memory settings:

    sudo rm /etc/sysctl.conf

That should be all! The uninstall wizard would have removed all icons and start-up applications files so you don't have to worry about those.
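To double-check that the uninstall left nothing behind, a small sketch that tests the two paths named in the steps above. remaining_paths is a hypothetical helper, and the path list is only what the steps mention, so other leftovers are not detected.

```python
import os

# Paths named in the uninstall steps above.
LEFTOVER_CANDIDATES = [
    "/Library/PostgreSQL",
    "/etc/postgres-reg.ini",
]

def remaining_paths(paths=LEFTOVER_CANDIDATES, exists=os.path.exists):
    """Return the paths that still exist; `exists` is injectable for testing."""
    return [p for p in paths if exists(p)]

print(remaining_paths() or "nothing left to remove")
```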

<Elasticsearch and Kafka integration>

Elasticsearch <--> Kafka

Kafka and Logstash 1.5 Integration


[Logstash] Easy debugging when integrating with Kafka.


Logstash Output Kafka (Producer)

$ bin/logstash -e "input { stdin {} } output { kafka { bootstrap_servers => '172.x.x.x:9024' topic_id => 'logstash_logs' } }"

Logstash Input Kafka (Consumer)

$ bin/logstash -e "input { kafka { zk_connect => '172.x.x.x:2181' topic_id => 'logstash_logs' } } output { stdout { codec => rubydebug } }"

Kafka --> Elasticsearch

Kafka Connect Elasticsearch: Consuming and Indexing with Kafka Connect


Kafka and Elasticsearch, a Perfect Match




Kafka is all you need for the Elastic Stack



<PostgreSQL and Kafka integration>

Logstash Reference [5.4] » Input plugins » jdbc



Bottled Water: Real-time integration of PostgreSQL and Kafka




<Kafka and Spark Streaming integration>

Building a real-time analytics system with Kafka + Spark-Streaming with Python


Step 1: Download the code



Step 2: Start the server

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties
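Before moving on to the topic commands it helps to confirm that both servers are actually up. A minimal sketch that only checks whether something accepts TCP connections on the ports used in this post (2181 for ZooKeeper, 9092 for the Kafka broker); it does not verify that the process behind the port is healthy. is_listening is a name invented for this example.

```python
import socket

def is_listening(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Ports as used in this post: 2181 for ZooKeeper, 9092 for the Kafka broker.
for name, port in [("zookeeper", 2181), ("kafka", 9092)]:
    print(name, "up" if is_listening("localhost", port) else "down")
```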

Step 3: Create a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic scott

bin/kafka-topics.sh --list --zookeeper localhost:2181

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic scott
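When these three commands get repeated a lot during testing, wrapping them in a script saves typing. A sketch, assuming it is run from the Kafka install directory and that the flags are exactly the ones shown above. topics_command and run_topics are hypothetical helper names, not part of Kafka.

```python
import subprocess

def topics_command(action, topic=None, zookeeper="localhost:2181",
                   partitions=1, replication_factor=1):
    """Build the argv for bin/kafka-topics.sh; action is create, list or delete."""
    if action not in ("create", "list", "delete"):
        raise ValueError("unsupported action: %s" % action)
    argv = ["bin/kafka-topics.sh", "--" + action, "--zookeeper", zookeeper]
    if action == "create":
        argv += ["--replication-factor", str(replication_factor),
                 "--partitions", str(partitions)]
    if action != "list":
        if topic is None:
            raise ValueError("topic is required for " + action)
        argv += ["--topic", topic]
    return argv

def run_topics(action, **kwargs):
    """Run the command; assumes the current directory is the Kafka install."""
    return subprocess.run(topics_command(action, **kwargs), check=True)

print(" ".join(topics_command("create", topic="scott")))
```

Calling run_topics("list") then does the same as the second command above.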

Step 4: Send some messages (Start a Producer)

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic scott

Step 5: Start a consumer

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic scott --from-beginning

Step 6: Spark-Streaming

Test with the example that ships with Spark Streaming.


By default, the example prints its results every second.

./bin/spark-submit --master spark://Hajiui-MacBook-Pro-2.local:7077 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 examples/src/main/python/streaming/kafka_wordcount.py localhost:2181 scott

Send data to Spark Streaming with kafka-console-producer

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic scott
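What the word count does to each batch, sketched in plain Python without Spark. This assumes the bundled kafka_wordcount.py splits every message on a single space and sums the counts per token within each batch, which is my reading of the example; word_count is a stand-in name, not the Spark code itself.

```python
from collections import Counter

def word_count(lines):
    """Count space-separated tokens in one batch of lines."""
    counts = Counter()
    for line in lines:
        # Split on a single space, not on arbitrary whitespace.
        counts.update(line.split(" "))
    return dict(counts)

batch = ["hello kafka", "hello spark streaming"]
for word, n in sorted(word_count(batch).items()):
    print((word, n))
```

The consequence for the later steps: a message without any space counts as one single "word", and a message like "timestamp host data" is counted as three separate tokens.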

Step 7: Logstash - Writing to kafka

./bin/logstash -e "input { stdin {} } output { kafka { topic_id => 'scott' } }"

Step 8: Insert into Logstash from PostgreSQL - Writing to kafka

Install the Logstash JDBC plugin

./bin/logstash-plugin install logstash-input-jdbc

Download the PostgreSQL JDBC driver






Set up the database

CREATE TABLE contacts (
    uid serial,
    email VARCHAR(80) NOT NULL,
    first_name VARCHAR(80) NOT NULL,
    last_name VARCHAR(80) NOT NULL
);
INSERT INTO contacts(email, first_name, last_name) VALUES('jim@example.com', 'Jim', 'Smith');
-- rejected: email is NOT NULL
INSERT INTO contacts(email, first_name, last_name) VALUES(null, 'John', 'Smith');
INSERT INTO contacts(email, first_name, last_name) VALUES('carol@example.com', 'Carol', 'Smith');
-- rejected: last_name is NOT NULL
INSERT INTO contacts(email, first_name, last_name) VALUES('sam@example.com', 'Sam', null);
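Only two of the four rows end up in the table, because two inserts violate a NOT NULL constraint. A runnable sketch of that behavior, using an in-memory SQLite database as a stand-in for PostgreSQL (an assumption made purely so the example runs without a server). One difference to keep in mind: PostgreSQL's serial sequence still advances on a failed insert, which is why the surviving rows later show up with uid 1 and 3, while SQLite would number them 1 and 2.

```python
import sqlite3

ROWS = [
    ("jim@example.com", "Jim", "Smith"),
    (None, "John", "Smith"),            # email is NULL
    ("carol@example.com", "Carol", "Smith"),
    ("sam@example.com", "Sam", None),   # last_name is NULL
]

def load_contacts(rows):
    """Insert rows one by one, skipping those rejected by NOT NULL."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE contacts ("
        " uid INTEGER PRIMARY KEY AUTOINCREMENT,"
        " email VARCHAR(80) NOT NULL,"
        " first_name VARCHAR(80) NOT NULL,"
        " last_name VARCHAR(80) NOT NULL)"
    )
    rejected = 0
    for row in rows:
        try:
            conn.execute(
                "INSERT INTO contacts(email, first_name, last_name) VALUES (?, ?, ?)",
                row,
            )
        except sqlite3.IntegrityError:
            rejected += 1
    emails = [r[0] for r in conn.execute("SELECT email FROM contacts ORDER BY uid")]
    conn.close()
    return emails, rejected

emails, rejected = load_contacts(ROWS)
print(emails, rejected)  # prints ['jim@example.com', 'carol@example.com'] 2
```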

vi logstash_postgresql.conf

input {
        jdbc {
                jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
                jdbc_user => "postgres"
                jdbc_driver_library => "postgresql-42.1.3.jar"
                jdbc_driver_class => "org.postgresql.Driver"
                statement => "SELECT * from contacts"
        }
}

output {
        stdout {
                codec => json_lines
        }
        kafka {
                topic_id => "scott"
                codec => json_lines
        }
}



./bin/logstash -f logstash_postgresql.conf 

ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.

Sending Logstash's logs to /Users/khjeong/Downloads/logstash-5.5.0/logs which is now configured via log4j2.properties

log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

[2017-07-18T17:02:33,982][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}

[2017-07-18T17:02:34,206][INFO ][logstash.pipeline        ] Pipeline main started

[2017-07-18T17:02:34,305][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

[2017-07-18T17:02:34,880][INFO ][logstash.inputs.jdbc     ] (0.023000s) SELECT * from contacts



[2017-07-18T17:02:37,228][WARN ][logstash.agent           ] stopping pipeline {:id=>"main"}

Spark Streaming console:


Time: 2017-07-18 17:14:44


(u'{"uid":3,"@timestamp":"2017-07-18T08:14:14.000Z","@version":"1","last_name":"Smith","first_name":"Carol","email":"carol@example.com"}\n', 1)

(u'{"uid":1,"@timestamp":"2017-07-18T08:14:13.998Z","@version":"1","last_name":"Smith","first_name":"Jim","email":"jim@example.com"}\n', 1)
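Each key in this output is one complete JSON document, since the json_lines codec writes one record per line. A short sketch of decoding such a record downstream, using the first line printed above with the u'' wrapper and the count dropped. parse_contact is a name invented for this example.

```python
import json

# One record as printed by the Spark Streaming console above; the trailing
# newline is part of the message.
record = ('{"uid":3,"@timestamp":"2017-07-18T08:14:14.000Z","@version":"1",'
          '"last_name":"Smith","first_name":"Carol","email":"carol@example.com"}\n')

def parse_contact(line):
    """Decode one json_lines record into a dict, ignoring the trailing newline."""
    return json.loads(line.strip())

contact = parse_contact(record)
print(contact["uid"], contact["first_name"], contact["email"])  # prints 3 Carol carol@example.com
print(" " in record)  # prints False
```

The record contains no space, so a word count that splits on spaces treats the whole JSON line as a single word with count 1.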

Step 9: Insert into Logstash from Elasticsearch - Writing to kafka

How to Import from CSV into Elasticsearch via Logstash and Sincedb


vi logstash_bitcoin.conf

input {
  file {
    path => "/Users/khjeong/Downloads/logstash-5.5.0/BCHARTS-MTGOXUSD.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  csv {
    separator => ","
    # Date,Open,High,Low,Close,Volume (BTC),Volume (Currency),Weighted Price
    columns => ["Date","Open","High","Low","Close","Volume (BTC)","Volume (Currency)","Weighted Price"]
  }
}

output {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "bitcoin-prices"
  }
  stdout {}
}


Data file path


./bin/logstash -f logstash_bitcoin.conf
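What the csv filter does to each line, sketched with Python's csv module: split on commas and pair the values with the column names from the config. The sample line is one of the data rows that appears in the Logstash output later in this post. parse_price_line is a hypothetical helper, and the values stay strings here because no type conversion is configured.

```python
import csv

# Column names as in the csv filter above.
COLUMNS = ["Date", "Open", "High", "Low", "Close",
           "Volume (BTC)", "Volume (Currency)", "Weighted Price"]

def parse_price_line(line):
    """Split one CSV line and pair each value with its column name."""
    values = next(csv.reader([line], delimiter=","))
    return dict(zip(COLUMNS, values))

row = parse_price_line(
    "2010-10-30,0.1876,0.199,0.1875,0.1989,26708.94,5112.034034,0.191397862813")
print(row["Date"], row["Close"], row["Weighted Price"])  # prints 2010-10-30 0.1989 0.191397862813
```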

Check the data in a web browser


Elasticsearch input plugin


Setting query => "*" as shown in the example kept producing a parsing error, so the config here uses query => "{}" instead.
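One likely explanation for the parsing error, which I have not verified against the plugin source: the query value is sent to Elasticsearch as the search request body, and that body has to be JSON. Under that assumption, a quick check shows why "*" fails while "{}" is accepted. is_json_object is a name invented for this example, and the match_all query is just another valid body for comparison.

```python
import json

def is_json_object(text):
    """Return True if text parses as a JSON object."""
    try:
        return isinstance(json.loads(text), dict)
    except ValueError:
        return False

for candidate in ['*', '{}', '{"query": {"match_all": {}}}']:
    print(repr(candidate), is_json_object(candidate))
```

Only the first candidate is rejected, which matches what happened with the config.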

vi logstash_elasticsearch.conf

input {
  elasticsearch {
    hosts => "localhost"
    index => "bitcoin-prices"
    type => "logs"
    query => "{}"
  }
}

output {
  stdout {
  }
  kafka {
    topic_id => "scott"
  }
}



./bin/logstash -f logstash_elasticsearch.conf 

ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.

Sending Logstash's logs to /Users/khjeong/Downloads/logstash-5.5.0/logs which is now configured via log4j2.properties

log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

[2017-07-18T18:40:48,934][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}

[2017-07-18T18:40:48,958][INFO ][logstash.pipeline        ] Pipeline main started

[2017-07-18T18:40:49,140][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

2017-07-18T09:05:56.833Z Hajiui-MacBook-Pro-2.local 2010-10-30,0.1876,0.199,0.1875,0.1989,26708.94,5112.034034,0.191397862813

2017-07-18T09:08:54.582Z Hajiui-MacBook-Pro-2.local 2014-01-07,1012.88,1044.2,880.0,880.0,22263.9752165,21388990.0933,960.699510547

2017-07-18T09:08:54.426Z Hajiui-MacBook-Pro-2.local 2014-02-20,264.328,271.43,109.0,111.697,101724.171543,16298837.3816,160.225806064

2017-07-18T09:08:54.436Z Hajiui-MacBook-Pro-2.local 2014-02-16,371.0,540.0,220.29327,299.72277,86061.3453883,26181409.739,304.217992653


2017-07-18T09:08:56.705Z Hajiui-MacBook-Pro-2.local 2010-10-16,0.1,0.103,0.1,0.101,6284.708,633.63076692,0.100821035268

2017-07-18T09:08:56.707Z Hajiui-MacBook-Pro-2.local 2010-10-09,0.08411,0.12001,0.068,0.0938,187846.954,16104.8730071,0.085734011993

2017-07-18T09:08:56.709Z Hajiui-MacBook-Pro-2.local 2010-10-05,0.06132,0.06301,0.0609,0.0614,27526.613,1699.19527154,0.061729180831

[2017-07-18T18:40:51,981][WARN ][logstash.agent           ] stopping pipeline {:id=>"main"}

Spark Streaming console:


Time: 2017-07-18 18:40:50


(u'2017-07-18T09:08:55.674Z', 1)

(u'2012-02-26,4.77303,5.1,4.77303,4.922,70034.6057242,347092.755801,4.95601784591', 1)

(u'2017-07-18T09:08:56.673Z', 1)

(u'2017-07-18T09:08:56.648Z', 1)

(u'2017-07-18T09:08:55.465Z', 1)

(u'2017-07-18T09:08:56.402Z', 1)

(u'2017-07-18T09:08:55.090Z', 1)

(u'2017-07-18T09:08:55.232Z', 1)

(u'2011-11-01,3.25617,3.35,3.07001,3.15,37893.4724479,120727.902337,3.18598150389', 1)

(u'2010-09-11,0.0619,0.065,0.0619,0.06366,7749.14,491.192371,0.063386694653', 1)
