Kafka - Spark Streaming: How to Handle Messages Larger Than 1 MB

2017. 9. 5. 13:27 | Server Programming

1. Kafka Setup

Kafka is used to publish and subscribe to messages, and by default the maximum message size it accepts is 1 MB. To send and receive messages larger than 1 MB, set message.max.bytes in the Kafka config file to the size you need. I tested with values up to about 100 MB and it works without problems.
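The limit can also be raised per topic with the topic-level max.message.bytes config instead of broker-wide. A minimal sketch, assuming ZooKeeper on localhost:2181 and a hypothetical topic named big-messages:

$ bin/kafka-topics.sh --zookeeper localhost:2181 --alter \
    --topic big-messages --config max.message.bytes=100000000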

https://kafka.apache.org/08/documentation.html

message.max.bytes (default: 1000000): The maximum size of a message that the server can receive. It is important that this property be in sync with the maximum fetch size your consumers use, or else an unruly producer will be able to publish messages too large for consumers to consume.

$ vi server.properties

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

message.max.bytes=100000000

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
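
After editing server.properties, restart the broker so the new limits take effect. Assuming the standard Kafka distribution layout:

$ bin/kafka-server-stop.sh
$ bin/kafka-server-start.sh config/server.properties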


If you raise only message.max.bytes without also raising socket.request.max.bytes, the following error occurs, so it seems socket.request.max.bytes has to be increased to the desired size as well.

[2017-09-06 13:20:28,027] WARN Unexpected error from /172.31.11.179; closing connection (org.apache.kafka.common.network.Selector)

org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 125376280 larger than 104857600)

        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:95)

        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)

        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)

        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)

        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:379)

        at org.apache.kafka.common.network.Selector.poll(Selector.java:326)

        at kafka.network.Processor.poll(SocketServer.scala:499)

        at kafka.network.Processor.run(SocketServer.scala:435)

        at java.lang.Thread.run(Thread.java:748)


$ vi server.properties

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=300000000

message.max.bytes=300000000
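
In a replicated cluster, the broker setting replica.fetch.max.bytes should also be raised to at least message.max.bytes, or follower brokers will not be able to replicate the large messages:

replica.fetch.max.bytes=300000000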


2. Modify the kafka-node Producer to Compress Messages

Sending messages larger than 100 MB to Kafka is possible at this point, but if they are sent uncompressed, the receiving side runs into trouble and Kafka itself can hit out-of-memory problems. Modify the producer to compress messages before sending and everything is handled cleanly (see the sketch after the attributes list below).

https://www.npmjs.com/package/kafka-node

send(payloads, cb)

  • payloads: Array, array of ProduceRequest. ProduceRequest is a JSON object like:
{
   topic: 'topicName',
   messages: ['message body'], // multi messages should be a array, single message can be just a string or a KeyedMessage instance
   key: 'theKey', // only needed when using keyed partitioner
   partition: 0, // default 0
   attributes: 2, // default: 0
   timestamp: Date.now() // <-- defaults to Date.now() (only available with kafka v0.10 and KafkaClient only)
}
  • cb: Function, the callback

attributes controls compression of the message set. It supports the following values:

  • 0: No compression
  • 1: Compress using GZip
  • 2: Compress using snappy
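
A minimal producer sketch with GZip compression enabled via attributes. The ZooKeeper address, topic name, and payload below are placeholders, not part of the kafka-node docs:

var kafka = require('kafka-node');

// 'localhost:2181' and 'big-messages' are hypothetical; adjust to your setup.
var client = new kafka.Client('localhost:2181');
var producer = new kafka.Producer(client);

// Roughly 10 MB of 'x' as a stand-in for a real large message body.
var bigBody = new Array(10 * 1024 * 1024).join('x');

producer.on('ready', function () {
  producer.send([{
    topic: 'big-messages',
    messages: [bigBody],
    attributes: 1 // 1 = GZip (0 = none, 2 = snappy)
  }], function (err, result) {
    if (err) console.error(err);
    else console.log(result);
  });
});

producer.on('error', function (err) {
  console.error(err);
});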


https://olnrao.wordpress.com/2015/03/24/apache-kafka-case-of-large-messages-many-partitions-few-consumers/

What is the solution?

Thankfully, Kafka supports message compression. Thankfully again, max message size in all the above equations corresponds to compressed size. Luckily, in our case the messages are text messages and the compression ratio was superb. Our 300 MB message came to 50 MB after GZIP compression. So, we have enabled compression.


3. Modify the Spark Streaming Program

Even after Kafka has been configured to exchange large messages, the Spark Streaming program still cannot receive messages larger than 1 MB unless it is adjusted separately.

If you use the basic topic-subscription code below as-is, messages larger than 1 MB cannot be processed.

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)


Switch to one of the createStream overloads declared on KafkaUtils that accepts kafkaParams. When defining the kafkaParams object, set fetch.message.max.bytes and fetch.size to match the message size configured on the Kafka side above.

import _root_.kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel

val kafkaParams = Map[String, String](
  "zookeeper.connect" -> zkQuorum, "group.id" -> group,
  "zookeeper.connection.timeout.ms" -> "10000",
  "fetch.message.max.bytes" -> "100000000",
  "fetch.size" -> "100000000")

val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY).map(_._2)


https://spark.apache.org/docs/1.3.0/api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html

static <K,V,U extends kafka.serializer.Decoder<?>,T extends kafka.serializer.Decoder<?>> JavaPairReceiverInputDStream<K,V> createStream(JavaStreamingContext jssc, Class<K> keyTypeClass, Class<V> valueTypeClass, Class<U> keyDecoderClass, Class<T> valueDecoderClass, java.util.Map<String,String> kafkaParams, java.util.Map<String,Integer> topics, StorageLevel storageLevel)
    Create an input stream that pulls messages from Kafka Brokers.

static JavaPairReceiverInputDStream<String,String> createStream(JavaStreamingContext jssc, String zkQuorum, String groupId, java.util.Map<String,Integer> topics)
    Create an input stream that pulls messages from Kafka Brokers.

static JavaPairReceiverInputDStream<String,String> createStream(JavaStreamingContext jssc, String zkQuorum, String groupId, java.util.Map<String,Integer> topics, StorageLevel storageLevel)
    Create an input stream that pulls messages from Kafka Brokers.

static <K,V,U extends kafka.serializer.Decoder<?>,T extends kafka.serializer.Decoder<?>> ReceiverInputDStream<scala.Tuple2<K,V>> createStream(StreamingContext ssc, scala.collection.immutable.Map<String,String> kafkaParams, scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel, scala.reflect.ClassTag<K> evidence$1, scala.reflect.ClassTag<V> evidence$2, scala.reflect.ClassTag<U> evidence$3, scala.reflect.ClassTag<T> evidence$4)
    Create an input stream that pulls messages from Kafka Brokers.

static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(StreamingContext ssc, String zkQuorum, String groupId, scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
    Create an input stream that pulls messages from Kafka Brokers.

https://mail-archives.apache.org/mod_mbox/kafka-users/201406.mbox/%3CCAK9BuXVUtHby=3BwjK9ajwvho4xQO-CxUtx=2qm7DwGiwR7_ZQ@mail.gmail.com%3E

    // setup Kafka with manual parameters to allow big messaging
    // see spark/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> zkQuorum, "group.id" -> group,
      "zookeeper.connection.timeout.ms" -> "10000",
      "fetch.message.max.bytes" -> "10485760",    // 10MB
      "fetch.size" -> "10485760")    // not needed?
    val lines = kafka.KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicpMap, StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)

https://stackoverflow.com/questions/40076691/cant-acess-kafka-serializer-stringdecoder

import org.apache.spark.streaming._
import kafka.serializer.StringDecoder

You may get the following error:

error: object serializer is not a member of package org.apache.spark.streaming.kafka

There is another kafka package included by the maven package in my above example, and therefore it was imported as part of "org.apache.spark.streaming._"

To resolve, do as follows:

import org.apache.spark.streaming._
import _root_.kafka.serializer.StringDecoder


Even with large-message reception enabled like this, a Java heap space error occurs when a genuinely large message arrives. Allocate memory generously when running spark-submit, for example with --driver-memory 4000M.

java.lang.OutOfMemoryError: Java heap space

https://stackoverflow.com/questions/21138751/spark-java-lang-outofmemoryerror-java-heap-space

spark-1.6.1/bin/spark-submit \
  --class "MyClass" \
  --driver-memory 12g \
  --master local[*] \
  target/scala-2.10/simple-project_2.10-1.0.jar
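
When running on a real cluster rather than local[*], the receiver stores the incoming blocks on the executors, so --executor-memory may need the same generous treatment. An illustrative invocation (the cluster manager and memory sizes are placeholders):

spark-1.6.1/bin/spark-submit \
  --class "MyClass" \
  --driver-memory 12g \
  --executor-memory 12g \
  --master yarn \
  target/scala-2.10/simple-project_2.10-1.0.jar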