2017. 9. 5. 13:27ㆍ서버 프로그래밍
1. Kafka 세팅
Kafka를 이용하여 메시지를 발행/구독하는데, 주고 받는 메시지의 기본 크기는 1메가 이하이다. 1메가 이상의 메시지를 주고 받을 수 있도록 하려면, Kafka의 config 파일에 message.max.bytes를 원하는 만큼 지정해주어야 한다. 100메가 정도까지 지정하고 테스트 해보았는데, 문제 없이 동작한다.
https://kafka.apache.org/08/documentation.html
message.max.bytes | 1000000 | The maximum size of a message that the server can receive. It is important that this property be in sync with the maximum fetch size your consumers use or else an unruly producer will be able to publish messages too large for consumers to consume. |
$ vi server.properties
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
message.max.bytes=100000000
############################# Log Basics #############################
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
message.max.bytes만 올리고 socket.request.max.bytes를 올리지 않으니 다음과 같은 오류가 발생한다. socket.request.max.bytes의 값도 원하는 만큼 증가를 시켜야 하는 듯하다.
[2017-09-06 13:20:28,027] WARN Unexpected error from /172.31.11.179; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 125376280 larger than 104857600)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:95)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:379)
at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
at kafka.network.Processor.poll(SocketServer.scala:499)
at kafka.network.Processor.run(SocketServer.scala:435)
at java.lang.Thread.run(Thread.java:748)
$ vi server.properties
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=300000000
message.max.bytes=300000000
2. kafka-node의 producer에서 메시지 압축하도록 수정
100메가가 넘는 메시지를 kafka로 전송은 가능한데, 압축을 하지 않고 보내면 수신측에서 문제가 생기거나 Kafka 자체에도 메모리 부족 현상이 발생한다. 메시지를 압축해서 전송하도록 수정하면 깔끔하게 처리된다.
https://www.npmjs.com/package/kafka-node
send(payloads, cb)
payloads
: Array,array ofProduceRequest
,ProduceRequest
is a JSON object like:
{topic: 'topicName'messages: 'message body' // multi messages should be a array, single message can be just a string or a KeyedMessage instancekey: 'theKey' // only needed when using keyed partitionerpartition: 0 // default 0attributes: 2 // default: 0timestamp: Date // <-- defaults to Date.now() (only available with kafka v0.10 and KafkaClient only)}
cb
: Function, the callback
attributes
controls compression of the message set. It supports the following values:
0
: No compression1
: Compress using GZip2
: Compress using snappy
What is the solution?
Thankfully, Kafka support message compression. Thankfully again, max message size in all the above equations corresponds to compressed size. Luckily, in our case the messages are text messages and the compression ratio was superb. Our 300 MB message came to 50 MB after GZIP compression. So, we have enabled compression.
3. Spark Streaming 프로그램 수정
Kafka에서 큰 사이즈의 메시지를 주고 받을 수 있도록 했다고 해도, Spark Streaming 프로그램에서도 별도의 처리를 해주지 않으면 1메가 이상의 메시지를 수신하지 못하는 문제가 있다.
다음과 같이 기본적으로 Kafka로 부터 지정된 토픽을 수신하는 코드를 그대로 사용하면 1메가 이상의 메시지를 처리할 수 없다.
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
KafkaUtils 클래스에 선언되어 있는 createStream 메소드 중에서 KafkaParams를 사용할 수 있는 메소드를 사용하도록 변경해야 한다. kafkaParams 객체 정의할 때, fetch.message.max.bytes와 fetch.size 크기를 앞에서 Kafka에서 세팅한 메시지 크기에 맞춰준다.
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> group,
"zookeeper.connection.timeout.ms" -> "10000",
"fetch.message.max.bytes" -> "100000000",
"fetch.size" -> "100000000")
val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap,StorageLevel.MEMORY_ONLY).map(_._2)
static <K,V,U extends kafka.serializer.Decoder<?>,T extends kafka.serializer.Decoder<?>> | createStream(JavaStreamingContext jssc, Class<K> keyTypeClass, Class<V> valueTypeClass, Class<U> keyDecoderClass, Class<T> valueDecoderClass, java.util.Map<String,String> kafkaParams, java.util.Map<String,Integer> topics,StorageLevel storageLevel) Create an input stream that pulls messages from Kafka Brokers. |
static JavaPairReceiverInputDStream<String,String> | createStream(JavaStreamingContext jssc, String zkQuorum, String groupId, java.util.Map<String,Integer> topics) Create an input stream that pulls messages from Kafka Brokers. |
static JavaPairReceiverInputDStream<String,String> | createStream(JavaStreamingContext jssc, String zkQuorum, String groupId, java.util.Map<String,Integer> topics, StorageLevel storageLevel) Create an input stream that pulls messages from Kafka Brokers. |
static <K,V,U extends kafka.serializer.Decoder<?>,T extends kafka.serializer.Decoder<?>> | createStream(StreamingContext ssc, scala.collection.immutable.Map<String,String> kafkaParams, scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel, scala.reflect.ClassTag<K> evidence$1, scala.reflect.ClassTag<V> evidence$2, scala.reflect.ClassTag<U> evidence$3, scala.reflect.ClassTag<T> evidence$4) Create an input stream that pulls messages from Kafka Brokers. |
static ReceiverInputDStream<scala.Tuple2<String,String>> | createStream(StreamingContext ssc, String zkQuorum, String groupId, scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel) Create an input stream that pulls messages from Kafka Brokers. |
// setup Kafka with manual parameters to allow big messaging //see spark/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala val kafkaParams = Map[String, String]( "zookeeper.connect" -> zkQuorum, "group.id" -> group, "zookeeper.connection.timeout.ms" -> "10000", "fetch.message.max.bytes" -> "10485760", // 10MB "fetch.size" -> "10485760") // not needed? val lines = kafka.KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicpMap, StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)
https://stackoverflow.com/questions/40076691/cant-acess-kafka-serializer-stringdecoder
import org.apache.spark.streaming._
import kafka.serializer.StringDecoder
You may get the following error:
error: object serializer is not a member of package org.apache.spark.streaming.kafka
There is another kafka package included by the maven package in my above example, and therefore it was imported as part of "org.apache.spark.streaming._"
To resolve, do as follows:
import org.apache.spark.streaming._
import _root_.kafka.serializer.StringDecoder
이렇게 큰 메시지를 수신할 수 있도록 해놓아도, 실제로 큰 메시지가 수신되면 자바 힙 메모리 사이즈가 모자라다는 오류가 발생한다. spark-submit 실행 시에 --driver-memory 4000M와 같은 식으로 여유있게 잡아 주면 된다.
java.lang.OutOfMemoryError: Java heap space
https://stackoverflow.com/questions/21138751/spark-java-lang-outofmemoryerror-java-heap-space
spark-1.6.1/bin/spark-submit
--class "MyClass"
--driver-memory 12g
--master local[*]
target/scala-2.10/simple-project_2.10-1.0.jar