Вопрос-ответ

How can I send large messages with Kafka (over 15MB)?

Как я могу отправлять большие сообщения с помощью Kafka (более 15 МБ)?

Я отправляю строковые сообщения на Kafka V. 0.8 с помощью Java Producer API. Если размер сообщения составляет около 15 МБ, я получаю MessageSizeTooLargeException. Я пытался установить для message.max.bytesзначение 40 МБ, но все равно получаю исключение. Небольшие сообщения работали без проблем.

Исключение появляется в производителе, у меня нет потребителя в этом приложении.

Как я могу избавиться от этого исключения?

Мой пример конфигурации производителя

private ProducerConfig kafkaConfig() {
Properties props = new Properties();
props.put("metadata.broker.list", BROKERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("message.max.bytes", "" + 1024 * 1024 * 40);
return new ProducerConfig(props);
}

Журнал ошибок:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
Переведено автоматически
Ответ 1

Вам нужно настроить три (или четыре) свойства:


  • На стороне потребителя:fetch.message.max.bytes - это определит наибольший размер сообщения, которое может быть извлечено потребителем.

  • Сторона брокера: replica.fetch.max.bytes - это позволит репликам в брокерах отправлять сообщения внутри кластера и убедиться, что сообщения реплицируются правильно. Если это слишком мало, то сообщение никогда не будет реплицировано, и, следовательно, потребитель никогда не увидит сообщение, потому что сообщение никогда не будет зафиксировано (полностью реплицировано).

  • Сторона брокера: message.max.bytes - это самый большой размер сообщения, которое может быть получено брокером от производителя.

  • Сторона брокера (для каждой темы): max.message.bytes - это максимальный размер сообщения, который брокер разрешит добавить к теме. Этот размер проверяется предварительным сжатием. (По умолчанию используется значение брокера message.max.bytes.)

Я на собственном опыте узнал о номере 2 - вы не получаете никаких исключений, сообщений или предупреждений от Kafka, поэтому обязательно учитывайте это при отправке больших сообщений.

Ответ 2

Требуются незначительные изменения для Kafka 0.10 и нового потребителя по сравнению с ответом laughing_man:


  • Брокер: изменений нет, вам все равно нужно увеличить свойства message.max.bytes и replica.fetch.max.bytes. message.max.bytes должно быть равно или меньше (*), чем replica.fetch.max.bytes.

  • Производитель: Увеличьте max.request.size, чтобы отправить сообщение большего размера.

  • Потребитель: увеличьте max.partition.fetch.bytes для получения сообщений большего размера.

(*) Прочитайте комментарии, чтобы узнать больше о message.max.bytes<=replica.fetch.max.bytes

Ответ 3

Ответ от @laughing_man довольно точный. Но все же я хотел дать рекомендацию, которую я узнал от эксперта по Kafka Стефана Маарека. Мы активно применяли это решение в наших живых системах.

Kafka не предназначен для обработки больших сообщений.

Ваш API должен использовать облачное хранилище (например, AWS S3) и просто отправлять ссылку на S3 в Kafka или любой другой message broker. Вам нужно будет найти место для сохранения ваших данных, будь то сетевой диск или что-то совсем другое, но это не должен быть посредник сообщений.

Если вы не хотите переходить к рекомендованному и надежному решению, приведенному выше,

Максимальный размер сообщения составляет 1 МБ (вызывается настройкой в ваших брокерах message.max.bytes) Apache Kafka. Если вам это действительно очень нужно, вы могли бы увеличить этот размер и обязательно увеличить сетевые буферы для ваших производителей и потребителей.

И если вы действительно хотите разделить свое сообщение, убедитесь, что каждое разделенное сообщение имеет точно такой же ключ, чтобы оно помещалось в один и тот же раздел, а содержимое вашего сообщения должно содержать “идентификатор части”, чтобы ваш потребитель мог полностью восстановить сообщение.

Если сообщение текстовое, попробуйте сжать данные, что может уменьшить размер данных, но не волшебным образом.

Опять же, вы должны использовать внешнюю систему для хранения этих данных и просто отправлять внешнюю ссылку на Kafka. Это очень распространенная архитектура, с которой вам следует работать, и она широко принята.

Имейте в виду, что Kafka работает лучше всего, только если сообщения огромны по объему, но не по размеру.

Источник: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka

Ответ 4

Идея состоит в том, чтобы сообщения одинакового размера отправлялись от производителя Kafka к брокеру Kafka, а затем принимались потребителем Kafka, т.е.

Производитель Kafka --> Брокер Kafka --> Потребитель Kafka

Предположим, что если требуется отправить сообщение размером 15 МБ, то производитель, Брокер и потребитель, все три, должны быть синхронизированы.

Производитель Kafka отправляет 15 МБ --> Брокер Kafka разрешает / сохраняет 15 МБ --> Потребитель Kafka получает 15 МБ

Поэтому настройка должна быть:

a) в Broker:

message.max.bytes=15728640 
replica.fetch.max.bytes=15728640

б) на Consumer:

fetch.message.max.bytes=15728640
java exception