Conecte o PLCnext Control via MQTT ao Apache Kafka
Antecedentes Técnicos
Kafka
Apache Kafka é uma estrutura para ingestão, armazenamento, processamento e redistribuição de dados. Atualmente, é amplamente implantado em empresas de todo o mundo. O site oficial de Kafka oferece mais informações sobre sua ideia e como implantá-la. Uma de suas principais características é o grande número de conectores já existentes para outras aplicações e protocolos de comunicação como o MQTT.
MQTT
O MQTT é um protocolo de mensagens leve baseado em TCP, frequentemente usado para comunicação IoT devido à sua robustez e tamanho reduzido. Detalhes sobre o padrão OASIS MQTT podem ser encontrados em seu site.
Aqui você pode encontrar um artigo do Makers Blog sobre como compilar mosquitto para PLCnext, uma implementação MQTT do Eclipse. Alternativamente, a PLCnext Store oferece aplicativos MQTT prontos.
Requisitos
- Cliente MQTT no PLCnext (consulte a seção anterior para dicas de implementação)
- o controlador está conectado a um PC/VM
- Corretor MQTT no PC/VM (por exemplo, mosquitto)
- Instância do Kafka no PC/VM (consulte o guia de início rápido do Kafka)
Configuração
A imagem a seguir mostra uma visão geral da configuração que vamos implementar para ingerir dados do controle PLCnext para Kafka. Embora seja possível usar o MQTT Proxy da Confluent para sua versão do Kafka (2), focaremos na solução mais genérica (1). É composto por um broker MQTT onde o cliente se conecta e publica mensagens e um conector que assina um tópico no broker, processa as mensagens e as encaminha para o Kafka.

Criando o conector
Neste tutorial, nosso conector se baseia no repositório evokly/kafka-connect-mqtt do GitHub, licenciado sob a Licença MIT (informações detalhadas sobre a licença). Primeiro, baixamos e extraímos o repositório. Como a versão mais recente do repositório é do final de 2016, atualizamos o
build.gradle
arquivo, substituindo as dependências antigas por suas novas versões:ext { kafkaVersion = '2.6.0' }
...
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.13'
compile "org.apache.kafka:connect-api:$kafkaVersion"
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
compile 'org.bouncycastle:bcprov-jdk15on:1.67'
compile 'org.bouncycastle:bcpkix-jdk15on:1.67'
compile 'org.bouncycastle:bcpg-jdk15on:1.67'
compile 'commons-io:commons-io:2.8.0'
compile 'org.slf4j:slf4j-api:1.7.30'
testCompile 'org.slf4j:slf4j-simple:1.7.30'
}
Neste exemplo, enviaremos mensagens String simples para Kafka. Portanto, temos que editar a classe Java
DumbProcessor.java
na pasta /kafka-connect-mqtt-master/src/main/java/com/evokly/kafka/connect/mqtt
, que é o processador de mensagens padrão:@Override
public SourceRecord[] getRecords(String kafkaTopic) {
return new SourceRecord[]{new SourceRecord(null, //sourcePartition
null, //sourceOffset
kafkaTopic, //topic
null, //partition
null, //keySchema
mTopic, //key
null, //valueSchema
mMessage.toString(), //value
new Long(123L))}; //long timestamp
}
Em seguida, construímos um Java Archive File (JAR) que contém as dependências:
./gradlew clean jar
. Copiamos o JAR de saída kafka-connect-mqtt-1.1-SNAPSHOT.jar
que pode ser encontrado na pasta /kafka-connect-mqtt-master/build/libs
para o libs
diretório de Kafka. Também precisamos de uma cópia do arquivo org.eclipse.paho.client.mqttv3-1.2.5.jar no diretório libs do Kafka. Podemos baixá-lo aqui.
Além disso, temos que criar um arquivo de configuração para o conector
mqtt.properties
no config
de Kafka pasta. O arquivo tem o seguinte conteúdo:name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1
# converters for plain String messages without schemas
key.converter = org.apache.kafka.connect.storage.StringConverter
value.converter = org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
kafka.topic=test_in # Kafka destination topic for the MQTT messages
mqtt.client_id=mqtt-kafka-123
mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60
mqtt.server_uris=tcp://172.17.0.1:1883 # address of the MQTT broker
mqtt.topic=test/# # MQTT topic where the messages should be collected
#if we want to use our own processor class
#message_processor_class=com.evokly.kafka.connect.mqtt.sample.OwnProcessor
Teste local
Agora podemos testar nosso conector localmente. Vá para o diretório do Kafka e inicie uma instância do ZooKeeper e do Broker:
# start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# start Kafka:
bin/kafka-server-start.sh config/server.properties
# start an MQTT-Broker (here a mosquitto docker container)
sudo docker run -d --name mosquitto -p 1883:1883 eclipse-mosquitto
# start the MQTT-Kafka connector
bin/connect-standalone.sh config/connect-standalone.properties config/mqtt.properties
# start a Kafka console consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_in --from-beginning --property print.value=true --property print.timestamp=true
# publish an MQTT message
mosquitto_pub -h 172.17.0.1 -p 1883 -t test/1 -m test123
A mensagem aparece no consumidor do console.
Tecnologia industrial
- Circuitos de controle do motor
- Circuitos de controle
- Eclipse Hono apoiando Apache Kafka para mensagens
- 5 vantagens do controle remoto de produção
- Controle de impedância de vias e sua influência na integridade do sinal no projeto de PCB
- Gerenciando um dispositivo PLCnext Control via SNMP
- Clustermangement no PLCnext?
- Painel PLCnext Tableau
- Relatórios PLCnext Power BI
- Aplicação Java no PLCnext Control