Запуск кластера Kafka на iSulad

Вводная часть

Apache Kafka — знакомая многим и одна из наиболее известных распределённых платформ потоковой передачи данных, предназначенной для обработки и передачи данных в реальном времени.

Будучи представленной миру opensource в 2011 году (именно в этом году, изначально разработанная внутри LinkedIn, Kafka была передана фонду Apache) к 2025 kafka проделала значительный путь в развитии и по праву заслужила звание удобного и гибкого инструмента работы с потоками данных. Отличительной особенность kafka было то, что с самого начала существования как продукта, она была предназначена для потоковой обработки очень больших объёмов данных в режиме реального времени при этом позволяя производить чтение и запись данных с высокой пропускной способностью и низкой задержкой.

Мы не будем глубоко погружаться в архитектуру Kafka (по этой теме есть достаточно статей в интернете и на сайте самого проекта – https://kafka.apache.org/documentation), но позволим себе сделать краткий обзор основных компонентов.

Основные компоненты Kafka:

– Brokers: Главный компонент, роль которого заключается собственно в хранении данных и обработки запросов на чтение и запись данных.

– Zookeeper: До недавних пор этот сервис служил для хранения конфигурации и метаданных и выступал координатором. Начиная с версии 3 начался переход на использование нового алгоритма кластеризации, благодаря которому надобность в zookeeper отпала.

– Topics: Основные логические каналы, через которые проходят данные.

– Producer: Клиентские приложения, отправляющие данные в Kafka.

– Consumer: Клиентские приложения, читающие данные из Kafka.

Уход от использования ZooKeeper

До версии Kafka 3.0 Zookeeper выполнял роль координатора, который управлял метаданными, следил за состояниями брокеров (при падении брокера — обеспечивал выбор нового лидера), но тем самым он вносил дополнительную сложность в архитектуру Kafka.

Вышедшая в 2021 году (https://www.confluent.io/blog/apache-kafka-3-0-major-improvements-and-new-features/) версия 3.0 явила миру новый протокол Kraft (https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum), который убрал зависимость от использования сервиса Zookeeper, но добавил новую роль в архитектуру Kafka под названием controller, которую можно добавлять на сервера брокеров (или делать отдельно). Теперь controller занимались слежением за состоянием брокеров и выбирали нового лидера в случае аварий.

К преимуществам нового протокола Kraft (с описанием можно ознакомится по ссылке https://developer.confluent.io/learn/kraft) можно отнести повышенную производительность в операциях обработки как данных, так и коммуникации между брокерами. К тому же, отказ от внешнего сервиса Zookeeper позволил повысить надёжность системы в целом, убрав одну из точек отказа.

От слов к делу

В сегодняшней статье мы поставим себе задачу развернуть на 3-х узлах современную реализацию кластера Kafka, запущенную в контейнере в системе контейнеризации iSulad, а в качестве операционной системы нам послужит недавно вышедший релиз OpenScaler 24.03 SP1 (скачать и попробовать свежий релиз вы всегда можете пройдя по ссылке https://repo.openscaler.ru/openScaler-24.03-LTS-SP1/ISO/).

Запускать контейнеры мы будем на выделенных виртуальных машинах, имеющих следующие IP адреса:

  • 172.17.3.91 test-srv-1
  • 172.17.3.92 test-srv-2
  • 172.17.3.93 test-srv-3

На каждом узле у нас используется ядро Linux версии: 6.6.0-76.0.0.80.os2403sp1.x86_64

Устанавливаем плагин CNI

Так как контейнеры, запущенные в iSulad работают либо в режиме отсутствия сети (none), либо используют сеть, созданную на основе плагинов CNI, нам необходимо установить эти плагины (на каждый сервер):

wget https://github.com/containernetworking/plugins/releases/download/v1.3.0/cni-plugins-linux-amd64-v1.3.0.tgz

mkdir -p /opt/cni/bin/

tar -zxvf cni-plugins-linux-amd64-v1.3.0.tgz -C /opt/cni/bin/

Запуск контейнеров в iSulad мы уже делали в статьях:

Тестирование запуска контейнеров внутри виртуальных машин в кластере kubernetes

 

Второй подход к снаряду, iSula.

Поэтому, для разнообразия, мы настроим в iSulad так, чтобы обращаться к нему через CRI (Container Runtime Interface, что это такое можно прочитать здесь https://kubernetes.io/ru/docs/concepts/architecture/cri/) интерфейс и будем использовать в работе утилиты crictl.

Установка утилиты CRI

Опять-таки, на каждый наш сервер мы установим утилиту:

wget https://github.com/kubernetes-sigs/cri-tools/releases/download/v1.32.0/crictl-v1.32.0-linux-amd64.tar.gz

tar -zxvf crictl-v1.32.0-linux-amd64.tar.gz -C /usr/local/bin/

проверяем:

Рисунок 1. Версия crictl

Установка и настройка iSulad

Ставим пакет из официального репозитория openScaler:

dnf install isulad

В нашем случае установленная версия iSulad: iSulad-2.1.5-18.os2403sp1.x86_64

Чтобы в iSulad заработал CRI интерфейс нам надо поправить конфигурационный файл /etc/isulad/daemon.json и добавить (или изменить) строки:

"enable-cri-v1": true,

пропишем адрес зеркала для образов контейнеров:

"registry-mirrors": [

"docker.io"

],

Укажем адрес и версию для образа контейнера pause (необходим при запуске пода (pod)):

"pod-sandbox-image": "registry.aliyuncs.com/google_containers/pause:3.5",

Указываем сетевой плагин и каталоги CNI:

"network-plugin": "cni",

"cni-bin-dir": "/opt/cni/bin",

"cni-conf-dir": "/etc/cni/net.d",

Получившийся в итоге конфиг выглядит вот так:

 

{

"group": "isula",

"default-runtime": "runc",

"graph": "/var/lib/isulad",

"state": "/var/run/isulad",

"log-level": "ERROR",

"pidfile": "/var/run/isulad.pid",

"log-opts": {

"log-file-mode": "0600",

"log-path": "/var/lib/isulad",

"max-file": "1",

"max-size": "30KB"

},

"log-driver": "stdout",

"container-log": {

"driver": "json-file"

},

"hook-spec": "/etc/default/isulad/hooks/default.json",

"start-timeout": "2m",

"storage-driver": "overlay2",

"storage-opts": [

"overlay2.override_kernel_check=true"

],

"enable-cri-v1": true,

"registry-mirrors": [

"docker.io"

],

"insecure-registries": [

],

"pod-sandbox-image": "registry.aliyuncs.com/google_containers/pause:3.5",

"native.umask": "normal",

"network-plugin": "cni",

"cni-bin-dir": "/opt/cni/bin",

"cni-conf-dir": "/etc/cni/net.d",

"image-layer-check": false,

"use-decrypted-key": true,

"insecure-skip-verify-enforce": false,

"cri-runtimes": {

"kata": "io.containerd.kata.v2"

}

}

Осталось только запустить iSulad командой:

systemctl start isulad

После успешного запуска iSulad на каждом хосте создадим виртуальную сеть командой:

isula network create cni0

Настройка firewall

Для работы kafka нам понадобится на каждом нашем сервере открыть следующие сетевые порты:

firewall-cmd --add-port=9091/tcp --permanent

firewall-cmd --add-port=9091/tcp

firewall-cmd --add-port=9092/tcp --permanent

firewall-cmd --add-port=9092/tcp

firewall-cmd --add-port=9093/tcp --permanent

firewall-cmd --add-port=9093/tcp

Запуск контейнеров с kafka

Наконец-то мы добрались непосредственно до запуска чего-то полезного. Приступим непосредственно к запуску контейнеров kafka на каждом из наших узлов.

создаём на каждом сервере каталог:

/srv/kafka_data

И сразу выставим на каталог права для пользователя, под которым работает контейнер kafka:

chown 1001:1001 /srv/kafka_data

Так как мы будем использовать crictl, то наши контейнеры kafka будут работать каждый в своём pod (как во взрослом k8s). Для этого нам необходимо подготовить JSON файлы с описанием пода (pod) и контейнера.

В каталоге /srv/ на каждом сервере мы помещаем соответствующие файлы container.json и pod.json.

Пример для первого узла kafka-1:

 

pod.json:

{

"metadata": {

"name": "kafka-1",

"uid": "kafka-1-pod",

"namespace": "default",

"attempt": 1

},

"hostname": "kafka-1",

"log_directory": "",

"dns_config": {

"servers": ["77.88.8.8"]

},

"port_mappings": [],

"labels": {

"group": "test"

},

"linux": {

"security_context": {

"namespace_options": {

"network": 2,

"pid": 1,

"ipc": 0

}

}

}

}

 

container.json

{

"metadata": {

"name": "kafka-1",

"namespace": "default"

},

"image": {

"image": "docker.io/bitnami/kafka:3.9.0"

},

"envs": [

{

"key": "KAFKA_ENABLE_KRAFT",

"value": "yes"

},

{

"key": "KAFKA_CFG_PROCESS_ROLES",

"value": "broker,controller"

},

{

"key": "KAFKA_CFG_CONTROLLER_LISTENER_NAMES",

"value": "CONTROLLER"

},

{

"key": "KAFKA_CFG_LISTENERS",

"value": "INTERNAL://:9091,PLAINTEXT://:9092,CONTROLLER://:9093"

},

{

"key": "KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP",

"value": "INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"

},

{

"key": "KAFKA_CFG_CONTROLLER_QUORUM_VOTERS",

"value": "1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093"

},

{

"key": "ALLOW_PLAINTEXT_LISTENER",

"value": "yes"

},

{

"key": "KAFKA_KRAFT_CLUSTER_ID",

"value": "isuladkafkacluster"

},

{

"key": "KAFKA_CFG_ADVERTISED_LISTENERS",

"value": "INTERNAL://:9091,PLAINTEXT://172.17.3.91:9092"

},

{

"key": "KAFKA_BROKER_ID",

"value": "1"

},

{

"key": "KAFKA_CFG_NODE_ID",

"value": "1"

},

{

"key": "KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE",

"value": "false"

},

{

"key": "KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR",

"value": "3"

}

],

"mounts": [{

"container_path": "/bitnami/kafka",

"host_path": "/srv/kafka_data"

},

{

"container_path": "/etc/hosts",

"host_path": "/srv/hosts",

"readonly": true

}

],

"log_path": "/tmp/kafka-1.log",

"linux": {

"security_context": {

"namespace_options": {

"network": 2,

"pid": 1

}

}

}

}

На узлах kafka-2 и kafka-3 нам надо поменять в своих JSON файлах соответствующие настройки.

Для kafka-2

в файле pod.json меняем:

"name": "kafka-2",

...

"uid": "kafka-2-pod",

...

"hostname": "kafka-2",

в файле container.json прописываем:

"name": "kafka-2",

{

"key": "KAFKA_CFG_ADVERTISED_LISTENERS",

"value": "INTERNAL://:9091,PLAINTEXT://172.17.3.92:9092"

},

{

"key": "KAFKA_BROKER_ID",

"value": "2"

},

{

"key": "KAFKA_CFG_NODE_ID",

"value": "2"

},

"log_path": "/tmp/kafka-2.log",

Для kafka-3

в файле pod.json меняем:

"name": "kafka-3",

...

"uid": "kafka-3-pod",

...

"hostname": "kafka-3",

в файле container.json прописываем:

"name": "kafka-3",

{

"key": "KAFKA_CFG_ADVERTISED_LISTENERS",

"value": "INTERNAL://:9091,PLAINTEXT://172.17.3.93:9092"

},

{

"key": "KAFKA_BROKER_ID",

"value": "3"

},

{

"key": "KAFKA_CFG_NODE_ID",

"value": "3"

},

"log_path": "/tmp/kafka-3.log",

Описание всех использованных при запуске контейнера переменных KAFKA_CFG_* можно найти в описании к используемому образу на страничке https://hub.docker.com/r/bitnami/kafka . Замечу лишь, что в целях облегчения понимания мы поднимаем кластер kafka без аутентификации и использования TLS/SSL.

Запускаем кластер.

Для этого, на каждом сервере запускаем команды:

Рисунок 2. Запуск pod и контейнер kafka

 

Убеждаемся, что под (pod) и контейнер работают и находятся в статусе Running:

Рисунок 3. Список запущенных pod.

Рисунок 4. Список запущенных контейнеров.

При этом, заметим, что crictl работает именно с pod и запущенным в нём контейнером. И он не показывает инфраструктурный контейнер pause. Его мы сможем увидеть, если посмотрим на контейнеры утилитой isula:

Рисунок 5. Вывод команды isula ps.

Лог контейнера мы можем смотреть напрямую обратившись к файлу, который мы указали в настройках контейнера:

Рисунок 6. Файл с логов контейнера

Запуск клиента kafka

Для демонстрации работы кластера мы запустим пару клиентов (их можно запускать на любом сервере, необязательно использовать наши сервера выделенные под кластер kafka, но мы поднимем на kafka-1 и kafka-3).

На kafka-1 мы запустим производителя (Producer):

Рисунок 7. Запуск producer

на kafka-3 мы запустим потребителя (Сonsumer):

Рисунок 8. Запуск consumer

Проверка работы кластера

создание тестового топика:

Рисунок 9. Создание топика.

вывод списка топиков:

Рисунок 10. Список топиков.

Смотрим описание и параметры топика:

Рисунок 11. Описание и параметры тестового топика.

Создаём тестовые сообщения на producer:

Рисунок 12. Отправка тестовых сообщений на producer.

И получаем тестовые сообщения на consumer:

Рисунок 13. Получение тестовых сообщений на consumer.

Собственно всё. Всего чего мы запланировали – мы достигли. Кластер Kafka заработал внутри контейнеров на выделенных серверах под управлением openScaler 24.03 SP1, контейнеры при этом успешно работают в runtime iSulad, которым в свою очередь мы успешно управляли при помощи стандартной утилиты crictl.