References for Elasticsearch, PostgreSQL, Spark, Kafka, and Logstash

2017. 7. 18. 11:17 · Server Programming


<Spark installation>

https://spark.apache.org/downloads.html

Download Spark: spark-2.2.0-bin-hadoop2.7.tgz


Spark Standalone: from installation to running the examples

http://hellowuniverse.com/2017/03/08/spark-standalone-%EC%84%A4%EC%B9%98%EB%B6%80%ED%84%B0-%EC%98%88%EC%A0%9C-%ED%85%8C%EC%8A%A4%ED%8A%B8%EA%B9%8C%EC%A7%80/

./sbin/start-master.sh

./sbin/start-slave.sh spark://Hajiui-MacBook-Pro-2.local:7077


./bin/pyspark --master spark://Hajiui-MacBook-Pro-2.local:7077


Python 2.7.10 (default, Feb  7 2017, 00:08:15) 

[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.34)] on darwin

Type "help", "copyright", "credits" or "license" for more information.

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

17/07/18 13:22:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

17/07/18 13:23:14 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/


Using Python version 2.7.10 (default, Feb  7 2017 00:08:15)

SparkSession available as 'spark'.

>>> 
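
From the prompt, a quick job confirms the shell is really talking to the standalone master (a minimal sanity check; any small job will do):

>>> sc.parallelize(range(1000)).map(lambda x: x * x).sum()
332833500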


./bin/spark-submit --master spark://Hajiui-MacBook-Pro-2.local:7077 examples/src/main/python/ml/naive_bayes_example.py


17/07/18 13:20:29 INFO CodeGenerator: Code generated in 21.876471 ms

+-----+--------------------+--------------------+-----------+----------+
|label|            features|       rawPrediction|probability|prediction|
+-----+--------------------+--------------------+-----------+----------+
|  0.0|(692,[95,96,97,12...|[-174115.98587057...|  [1.0,0.0]|       0.0|
|  0.0|(692,[98,99,100,1...|[-178402.52307196...|  [1.0,0.0]|       0.0|
|  0.0|(692,[100,101,102...|[-100905.88974016...|  [1.0,0.0]|       0.0|
|  0.0|(692,[123,124,125...|[-244784.29791241...|  [1.0,0.0]|       0.0|
|  0.0|(692,[123,124,125...|[-196900.88506109...|  [1.0,0.0]|       0.0|
|  0.0|(692,[124,125,126...|[-238164.45338794...|  [1.0,0.0]|       0.0|
|  0.0|(692,[124,125,126...|[-184206.87833381...|  [1.0,0.0]|       0.0|
|  0.0|(692,[127,128,129...|[-214174.52863813...|  [1.0,0.0]|       0.0|
|  0.0|(692,[127,128,129...|[-182844.62193963...|  [1.0,0.0]|       0.0|
|  0.0|(692,[128,129,130...|[-246557.10990301...|  [1.0,0.0]|       0.0|
|  0.0|(692,[152,153,154...|[-208282.08496711...|  [1.0,0.0]|       0.0|
|  0.0|(692,[152,153,154...|[-243457.69885665...|  [1.0,0.0]|       0.0|
|  0.0|(692,[153,154,155...|[-260933.50931276...|  [1.0,0.0]|       0.0|
|  0.0|(692,[154,155,156...|[-220274.72552901...|  [1.0,0.0]|       0.0|
|  0.0|(692,[181,182,183...|[-154830.07125175...|  [1.0,0.0]|       0.0|
|  1.0|(692,[99,100,101,...|[-145978.24563975...|  [0.0,1.0]|       1.0|
|  1.0|(692,[100,101,102...|[-147916.32657832...|  [0.0,1.0]|       1.0|
|  1.0|(692,[123,124,125...|[-139663.27471685...|  [0.0,1.0]|       1.0|
|  1.0|(692,[124,125,126...|[-129013.44238751...|  [0.0,1.0]|       1.0|
|  1.0|(692,[125,126,127...|[-81829.799906049...|  [0.0,1.0]|       1.0|
+-----+--------------------+--------------------+-----------+----------+
only showing top 20 rows
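
For reference, the bundled example is roughly the following (paraphrased from the Spark 2.2 Python examples; run it from the Spark home so the relative data path resolves):

from pyspark.sql import SparkSession
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

spark = SparkSession.builder.appName("NaiveBayesExample").getOrCreate()

# Load the sample data that ships with Spark, in libsvm format.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# 60/40 train/test split with a fixed seed, then fit a multinomial model.
train, test = data.randomSplit([0.6, 0.4], 1234)
model = NaiveBayes(smoothing=1.0, modelType="multinomial").fit(train)

# The table above is the output of this show() call.
predictions = model.transform(test)
predictions.show()

# The example finishes by printing the test-set accuracy.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictions)))

spark.stop()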


<Elasticsearch installation>

https://www.elastic.co/kr/downloads/elasticsearch

./bin/elasticsearch


Check it in a web browser:


http://localhost:9200/
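
The same check can be scripted (a minimal sketch using only the Python 2 standard library):

import json
import urllib2

# GET / returns a small JSON document with the cluster name and version.
info = json.load(urllib2.urlopen("http://localhost:9200/"))
print(info["version"]["number"])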


<PostgreSQL installation>

http://postgresapp.com/

Installing via brew and via the installer from the official PostgreSQL site both left me fighting the initial password problem.

After removing everything and installing Postgres.app instead, it just ran.


macOS - PostgreSQL & PostGIS install and setup

http://ngee.tistory.com/1051


https://stackoverflow.com/questions/8037729/completely-uninstall-postgresql-9-0-4-from-mac-osx-lion

To remove the EnterpriseDB One-Click install of PostgreSQL 9.1:

  1. Open a terminal window. Terminal is found in: Applications->Utilities->Terminal
  2. Run the uninstaller:

    sudo /Library/PostgreSQL/9.1/uninstall-postgresql.app/Contents/MacOS/installbuilder.sh

    If you installed with the Postgres Installer, you can do:

    open /Library/PostgreSQL/9.2/uninstall-postgresql.app

    It will ask for the administrator password and run the uninstaller.

  3. Remove the PostgreSQL and data folders. The Wizard will notify you that these were not removed.

    sudo rm -rf /Library/PostgreSQL
  4. Remove the ini file:

    sudo rm /etc/postgres-reg.ini
  5. Remove the PostgreSQL user using System Preferences -> Users & Groups.

    1. Unlock the settings panel by clicking on the padlock and entering your password.
    2. Select the PostgreSQL user and click on the minus button.
  6. Restore your shared memory settings:

    sudo rm /etc/sysctl.conf

That should be all! The uninstall wizard will have removed all icons and start-up application files, so you don't have to worry about those.


<Elasticsearch and Kafka integration>

Elasticsearch <--> Kafka

Kafka and Logstash 1.5 Integration

https://www.elastic.co/blog/logstash-kafka-intro

[Logstash] Easy debugging of the Kafka integration

http://jjeong.tistory.com/1158

Logstash output to Kafka (producer):

$ bin/logstash -e "input { stdin {} } output { kafka { bootstrap_servers => '172.x.x.x:9024' topic_id => 'logstash_logs' } }"


Logstash input from Kafka (consumer):

$ bin/logstash -e "input { kafka { zk_connect => '172.x.x.x:2181' topic_id => 'logstash_logs' } } output { stdout { codec => rubydebug } }"


Kafka --> Elasticsearch

Kafka Connect Elasticsearch: Consuming and Indexing with Kafka Connect

https://sematext.com/blog/2017/03/06/kafka-connect-elasticsearch-how-to/

Kafka and Elasticsearch, a Perfect Match

https://qbox.io/blog/kafka-and-elasticsearch-a-perfect-match-1

kafka-elasticsearch-standalone-consumer

https://github.com/BigDataDevs/kafka-elasticsearch-consumer


Just Enough Kafka for the Elastic Stack

https://www.elastic.co/kr/blog/just-enough-kafka-for-the-elastic-stack-part1

https://www.elastic.co/kr/blog/just-enough-kafka-for-the-elastic-stack-part2


<PostgreSQL and Kafka integration>

Logstash Reference [5.4] » Input plugins » jdbc

https://www.elastic.co/guide/en/logstash/5.4/plugins-inputs-jdbc.html

https://stackoverflow.com/questions/42088523/cant-config-logstash-to-postgres


Bottled Water: Real-time integration of PostgreSQL and Kafka

https://www.confluent.io/blog/bottled-water-real-time-integration-of-postgresql-and-kafka/

https://github.com/confluentinc/bottledwater-pg

https://github.com/kayform/bwcontrol


<Kafka and Spark Streaming integration>

Building a real-time analytics system with Kafka + Spark Streaming in Python

http://hellowuniverse.com/2017/04/26/kafka-spark-streaming-with-python%EC%9C%BC%EB%A1%9C-%EC%8B%A4%EC%8B%9C%EA%B0%84-%EB%B6%84%EC%84%9D%EC%8B%9C%EC%8A%A4%ED%85%9C-%EB%A7%8C%EB%93%A4%EA%B8%B0/


Step 1: Download the code

https://kafka.apache.org/downloads

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz

Step 2: Start the server

bin/zookeeper-server-start.sh config/zookeeper.properties


bin/kafka-server-start.sh config/server.properties

Step 3: Create a topic


bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic scott


bin/kafka-topics.sh --list --zookeeper localhost:2181


bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic scott


Step 4: Send some messages (start a producer)


bin/kafka-console-producer.sh --broker-list localhost:9092 --topic scott


Step 5: Start a consumer


bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic scott --from-beginning
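
The console producer/consumer pair can also be replaced by a few lines of Python for scripted tests (a sketch, again assuming kafka-python):

from kafka import KafkaProducer  # pip install kafka-python (assumed)

# Send a couple of test messages to the 'scott' topic created above.
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('scott', b'hello kafka')
producer.send('scott', b'hello spark streaming')
producer.flush()  # block until the messages are actually delivered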

Step 6: Spark Streaming


Test with the streaming example that ships with Spark:

examples/src/main/python/streaming/kafka_wordcount.py

With the default settings, Spark prints results every second (the example uses a 1-second batch interval).
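
For reference, the core of that example is roughly the following (paraphrased from the Spark 2.2 kafka-0-8 streaming example; the real script reads the ZooKeeper address and topic from sys.argv):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 1)  # 1-second batch interval, hence output every second

# Receiver-based stream over ZooKeeper; message payloads are the values.
kvs = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer", {"scott": 1})
lines = kvs.map(lambda x: x[1])
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()
ssc.awaitTermination()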


./bin/spark-submit --master spark://Hajiui-MacBook-Pro-2.local:7077 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 examples/src/main/python/streaming/kafka_wordcount.py localhost:2181 scott

Send data to Spark Streaming with kafka-console-producer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic scott

Step 7: Logstash - writing to Kafka

./bin/logstash -e "input { stdin {} } output { kafka { topic_id => 'scott' } }"


Step 8: Insert into Logstash from PostgreSQL - writing to Kafka

Install the Logstash JDBC input plugin:

./bin/logstash-plugin install logstash-input-jdbc

Download the PostgreSQL JDBC driver:

https://jdbc.postgresql.org/download.html#current

https://jdbc.postgresql.org/download/postgresql-42.1.3.jar


INSERT INTO LOGSTASH SELECT DATA FROM DATABASE

https://www.elastic.co/blog/logstash-jdbc-input-plugin

https://discuss.elastic.co/t/getting-started-with-jdbc-input-plugin/77162


Set up the database:

CREATE TABLE contacts (
    uid serial,
    email VARCHAR(80) NOT NULL,
    first_name VARCHAR(80) NOT NULL,
    last_name VARCHAR(80) NOT NULL
);
INSERT INTO contacts(email, first_name, last_name) VALUES('jim@example.com', 'Jim', 'Smith');
INSERT INTO contacts(email, first_name, last_name) VALUES(null, 'John', 'Smith');
INSERT INTO contacts(email, first_name, last_name) VALUES('carol@example.com', 'Carol', 'Smith');
INSERT INTO contacts(email, first_name, last_name) VALUES('sam@example.com', 'Sam', null);

Note that the second and fourth INSERTs violate the NOT NULL constraints and fail, so only the Jim and Carol rows land in the table - which is why only uid 1 and 3 appear in the output below.


vi logstash_postgresql.conf


input {
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
        jdbc_user => "postgres"
        jdbc_driver_library => "postgresql-42.1.3.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        statement => "SELECT * from contacts"
    }
}

output {
    stdout {
        codec => json_lines
    }
    kafka {
        topic_id => "scott"
        codec => json_lines
    }
}


./bin/logstash -f logstash_postgresql.conf 

ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.

Sending Logstash's logs to /Users/khjeong/Downloads/logstash-5.5.0/logs which is now configured via log4j2.properties

log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

[2017-07-18T17:02:33,982][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}

[2017-07-18T17:02:34,206][INFO ][logstash.pipeline        ] Pipeline main started

[2017-07-18T17:02:34,305][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

[2017-07-18T17:02:34,880][INFO ][logstash.inputs.jdbc     ] (0.023000s) SELECT * from contacts

{"uid":1,"@timestamp":"2017-07-18T08:02:34.916Z","@version":"1","last_name":"Smith","first_name":"Jim","email":"jim@example.com"}

{"uid":3,"@timestamp":"2017-07-18T08:02:34.918Z","@version":"1","last_name":"Smith","first_name":"Carol","email":"carol@example.com"}

[2017-07-18T17:02:37,228][WARN ][logstash.agent           ] stopping pipeline {:id=>"main"}


Spark Streaming console:

-------------------------------------------

Time: 2017-07-18 17:14:44

-------------------------------------------

(u'{"uid":3,"@timestamp":"2017-07-18T08:14:14.000Z","@version":"1","last_name":"Smith","first_name":"Carol","email":"carol@example.com"}\n', 1)

(u'{"uid":1,"@timestamp":"2017-07-18T08:14:13.998Z","@version":"1","last_name":"Smith","first_name":"Jim","email":"jim@example.com"}\n', 1)


Step 9: Insert into Logstash from Elasticsearch - writing to Kafka

How to Import from CSV into Elasticsearch via Logstash and Sincedb

https://qbox.io/blog/import-csv-elasticsearch-logstash-sincedb


vi logstash_bitcoin.conf

input {
    file {
        path => "/Users/khjeong/Downloads/logstash-5.5.0/BCHARTS-MTGOXUSD.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

filter {
    csv {
        separator => ","
        # Date,Open,High,Low,Close,Volume (BTC),Volume (Currency),Weighted Price
        columns => ["Date","Open","High","Low","Close","Volume (BTC)","Volume (Currency)","Weighted Price"]
    }
}

output {
    elasticsearch {
        hosts => "http://localhost:9200"
        index => "bitcoin-prices"
    }
    stdout {}
}

Data file location:

http://www.quandl.com/api/v1/datasets/BCHARTS/MTGOXUSD.csv


./bin/logstash -f logstash_bitcoin.conf


Check the data in a web browser:

http://localhost:9200/bitcoin-prices/logs/_search


Elasticsearch input plugin

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-elasticsearch.html

Following the example in the docs, I first set query => "*" and kept getting parse errors; an empty JSON object, query => "{}", worked (the full JSON form from the docs, e.g. query => '{ "query": { "match_all": {} } }', should also parse).


vi logstash_elasticsearch.conf

input {
    elasticsearch {
        hosts => "localhost"
        index => "bitcoin-prices"
        type => "logs"
        query => "{}"
    }
}

output {
    stdout {
    }
    kafka {
        topic_id => "scott"
    }
}


./bin/logstash -f logstash_elasticsearch.conf 

ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.

Sending Logstash's logs to /Users/khjeong/Downloads/logstash-5.5.0/logs which is now configured via log4j2.properties

log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).

log4j:WARN Please initialize the log4j system properly.

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

[2017-07-18T18:40:48,934][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}

[2017-07-18T18:40:48,958][INFO ][logstash.pipeline        ] Pipeline main started

[2017-07-18T18:40:49,140][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

2017-07-18T09:05:56.833Z Hajiui-MacBook-Pro-2.local 2010-10-30,0.1876,0.199,0.1875,0.1989,26708.94,5112.034034,0.191397862813

2017-07-18T09:08:54.582Z Hajiui-MacBook-Pro-2.local 2014-01-07,1012.88,1044.2,880.0,880.0,22263.9752165,21388990.0933,960.699510547

2017-07-18T09:08:54.426Z Hajiui-MacBook-Pro-2.local 2014-02-20,264.328,271.43,109.0,111.697,101724.171543,16298837.3816,160.225806064

2017-07-18T09:08:54.436Z Hajiui-MacBook-Pro-2.local 2014-02-16,371.0,540.0,220.29327,299.72277,86061.3453883,26181409.739,304.217992653


.................


2017-07-18T09:08:56.705Z Hajiui-MacBook-Pro-2.local 2010-10-16,0.1,0.103,0.1,0.101,6284.708,633.63076692,0.100821035268

2017-07-18T09:08:56.707Z Hajiui-MacBook-Pro-2.local 2010-10-09,0.08411,0.12001,0.068,0.0938,187846.954,16104.8730071,0.085734011993

2017-07-18T09:08:56.709Z Hajiui-MacBook-Pro-2.local 2010-10-05,0.06132,0.06301,0.0609,0.0614,27526.613,1699.19527154,0.061729180831

[2017-07-18T18:40:51,981][WARN ][logstash.agent           ] stopping pipeline {:id=>"main"}



Spark Streaming console:

-------------------------------------------

Time: 2017-07-18 18:40:50

-------------------------------------------

(u'2017-07-18T09:08:55.674Z', 1)

(u'2012-02-26,4.77303,5.1,4.77303,4.922,70034.6057242,347092.755801,4.95601784591', 1)

(u'2017-07-18T09:08:56.673Z', 1)

(u'2017-07-18T09:08:56.648Z', 1)

(u'2017-07-18T09:08:55.465Z', 1)

(u'2017-07-18T09:08:56.402Z', 1)

(u'2017-07-18T09:08:55.090Z', 1)

(u'2017-07-18T09:08:55.232Z', 1)

(u'2011-11-01,3.25617,3.35,3.07001,3.15,37893.4724479,120727.902337,3.18598150389', 1)

(u'2010-09-11,0.0619,0.065,0.0619,0.06366,7749.14,491.192371,0.063386694653', 1)

...