gRPC Python – Ler e gravar dados do processo
Este artigo descreve como acessar e gravar dados de processo simples com Python com um AXC F 3152 utilizando gRPC. (https://www.plcnext.help/te/Service_Components/gRPC_Introduction.htm)
Pré-requisito
Primeiro temos que preparar os arquivos necessários, fora do PLC, por exemplo, em uma máquina Windows.
- Instale o Python 3.9 (3.10 pode causar erros)
- Instale o pacote Python necessário para gerar o código dos arquivos .proto:
pip install grpcio-tools==1.36.1
- Baixe e descompacte o repositório que contém os arquivos .proto em https://github.com/PLCnext/gRPC
Gerar _pb2.py e _pb2_grpc.py de arquivos .proto
Em seguida, temos que gerar os arquivos python necessários a partir dos arquivos .proto fornecidos. Estes últimos estão localizados na seguinte pasta:gRPC-master/protobuf.
Use este código para criar um script Python na pasta gRPC-master, por exemplo, generate_grpc.py. O roteiro
- gera os arquivos necessários e os coloca em gRPC-master/pxc_grpc
- adaptar os caminhos de importação
import glob
import os
from pathlib import Path
# create the output directory
Path('pxc_grpc').mkdir(parents=True, exist_ok=True)
grpc_command_base = 'python -m grpc_tools.protoc -I./protobuf --python_out=pxc_grpc --grpc_python_out=pxc_grpc '
import_paths = set()
# generate the *_pb2.py and *_pb2_grpc.py files
for filename in glob.iglob('./protobuf/**', recursive=True):
if filename.endswith('.proto'):
# store the import path
path_parts = filename.split(os.sep)
import_paths.add('.'.join(path_parts[1:-1]))
grpc_command = ''.join([grpc_command_base, os.path.join('.', os.path.relpath(filename))])
stream = os.popen(grpc_command)
output = stream.read()
if output != '':
print(''.join(['error/info for file ', os.path.relpath(filename), ' - ', output]))
# get the python files in the base directory
base_pys = set()
for (dirpath, dirnames, filenames) in os.walk('./pxc_grpc'):
for f in filenames:
base_pys.add(f.split('.py')[0])
break
# reformat the stored paths to adapt the import statements
try:
import_paths.remove('')
except:
pass
import_paths = list(import_paths)
import_paths.sort(key=len)
import_paths.reverse()
# adapt the imports
for filename in glob.iglob('./pxc_grpc/**', recursive=True):
if filename.endswith('.py'):
new_lines = []
with open(filename, 'r') as file:
lines = file.readlines()
for line in lines:
if line.startswith('from'):
for import_path in import_paths:
if import_path in line:
line = line.replace(import_path, ''.join(['pxc_grpc.', import_path]), 1)
break
elif line.startswith('import'):
parts = line.split()
if parts[1] in base_pys:
line = line.replace('import', 'from pxc_grpc import')
new_lines.append(line)
with open(filename, 'w') as file:
file.write(''.join(new_lines))
Abra um shell e execute o script:
pyton generate_grpc.py
Criar um projeto de demonstração PLCnext
O projeto mostrado deve apenas demonstrar como a interface gRPC interage com o GDS. Sinta-se à vontade para usar um projeto existente. Para um projeto individual, você precisa editar os nomes das portas no script Python a seguir, por exemplo,
Arp.Plc.Eclr/MainInstance.strInput
. Prepare o PLC
Instale o pip para gerenciar seus pacotes Python:
- Conecte o controlador AXC F 3152 à Internet.
- Digite o comando curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py.
- Em seguida, digite o comando python3 get-pip.py.
Instale os pacotes necessários:
pip install grpcio protobuf==3.20.0
Crie uma pasta 'grpc_test' no diretório de projetos (/opt/plcnext/projects/).
Copie a pasta 'pxc_grpc', contendo os arquivos Python criados para 'grpc_test', por exemplo, com WinSCP.
Crie um script Python na pasta 'grpc_test' chamado 'grpc_test.py' e insira o seguinte código:
import grpc
from pxc_grpc.Plc.Gds.IDataAccessService_pb2 import IDataAccessServiceReadSingleRequest, \
IDataAccessServiceReadRequest, IDataAccessServiceWriteSingleRequest, IDataAccessServiceWriteRequest
from pxc_grpc.Plc.Gds.IDataAccessService_pb2_grpc import IDataAccessServiceStub
from pxc_grpc.Plc.Gds.WriteItem_pb2 import WriteItem
def write_single_string(stub, port_name, value):
single_write_request = IDataAccessServiceWriteSingleRequest()
single_write_request.data.PortName = port_name
single_write_request.data.Value.TypeCode = 19
single_write_request.data.Value.StringValue = value
return stub.WriteSingle(single_write_request)
def write_single_int(stub, port_name, value):
single_write_request = IDataAccessServiceWriteSingleRequest()
single_write_request.data.PortName = port_name
single_write_request.data.Value.TypeCode = 6
single_write_request.data.Value.Int16Value = value
return stub.WriteSingle(single_write_request)
def write_multiple_values(stub):
write_request = IDataAccessServiceWriteRequest()
wi1 = WriteItem()
wi1.PortName = 'Arp.Plc.Eclr/MainInstance.strInput'
wi1.Value.StringValue = "test1"
wi1.Value.TypeCode = 19
wi2 = WriteItem()
wi2.PortName = 'Arp.Plc.Eclr/MainInstance.strInput2'
wi2.Value.StringValue = "test2"
wi2.Value.TypeCode = 19
# add multiple WriteItems at once
write_request.data.extend([wi1, wi2])
# add WriteItems separately
# response1.data.append(wi1)
# response1.data.append(wi2)
return stub.Write(write_request)
def read_single_value(stub, port_name):
single_read_request = IDataAccessServiceReadSingleRequest()
single_read_request.portName=port_name
return stub.ReadSingle(single_read_request)
def read_multiple_values(stub, port_names):
read_request = IDataAccessServiceReadRequest()
read_request.portNames.extend(port_names)
return stub.Read(read_request)
if __name__ == "__main__":
# create channel and stub
channel = grpc.insecure_channel('unix:/run/plcnext/grpc.sock')
stub = IDataAccessServiceStub(channel)
print(write_single_string(stub, 'Arp.Plc.Eclr/MainInstance.strInput', 'test123'))
print(write_single_int(stub, 'Arp.Plc.Eclr/MainInstance.iInput', 18))
print(write_multiple_values(stub))
r = read_single_value(stub, 'Arp.Plc.Eclr/MainInstance.strInput')
print(r)
print(r._ReturnValue.Value.TypeCode)
print(r._ReturnValue.Value.StringValue)
r = read_multiple_values(stub, ['Arp.Plc.Eclr/MainInstance.iInput', 'Arp.Plc.Eclr/MainInstance.strInput'])
for value in r._ReturnValue:
print(value, value.Value.TypeCode)
Conecte seu PLC ao PLCnext Engineer, baixe o projeto e inicie a visualização ao vivo.
Execute o exemplo
Agora inicie o exemplo. Faça login no PLC via ssh e navegue até 'grpc_test', então inicie o script Python:
cd projects/grpc_test/
python3 grpc_test.py
O gRPC permite a interação com variáveis GDS.
Tipos de dados
Para ler e escrever variáveis, o tipo de dados é necessário, por exemplo,
wi1.Value.TypeCode = 19
. Os tipos são descritos no arquivo gerado gRPC-master/pxc_grpc/ArpTypes_pb2.py
começando na linha 242:CT_None = 0
CT_End = 0
CT_Void = 1
CT_Boolean = 2
CT_Char = 3
CT_Int8 = 4
CT_Uint8 = 5
CT_Int16 = 6
CT_Uint16 = 7
CT_Int32 = 8
CT_Uint32 = 9
CT_Int64 = 10
CT_Uint64 = 11
CT_Real32 = 12
CT_Real64 = 13
CT_Struct = 18
CT_String = 19
CT_Utf8String = 19
CT_Array = 20
CT_DateTime = 23
CT_Version = 24
CT_Guid = 25
CT_AnsiString = 26
CT_Object = 28
CT_Utf16String = 30
CT_Stream = 34
CT_Enumerator = 35
CT_SecureString = 36
CT_Enum = 37
CT_Dictionary = 38
CT_SecurityToken = 39
CT_Exception = 40
CT_IecTime = 41
CT_IecTime64 = 42
CT_IecDate = 43
CT_IecDate64 = 44
CT_IecDateTime = 45
CT_IecDateTime64 = 46
CT_IecTimeOfDay = 47
CT_IecTimeOfDay64 = 48
As variáveis de valor correspondentes, por exemplo,
r._ReturnValue.Value.StringValue
, pode ser encontrado no mesmo arquivo, começando na linha 365, por exemplo, BoolValue
, Int8Value
, StringValue
. Tecnologia industrial
- Apacer:Cartões CV110-SD e CV110-MSD lançados em todo o mundo
- Apacer:série SSD SV250 de nível industrial com velocidades de leitura / gravação de 560 e 520 MB / s
- Como entender big data:RTUs e aplicativos de controle de processo
- O que é Fresamento? - Definição, Processo e Operações
- O que é Perfuração? - Definição, Processo e Dicas
- O que é Metalurgia do Pó? - Definição e Processo
- O que é Brochar? - Processo, Trabalho e Tipos
- O que é usinagem química? - Trabalho e processo
- O que é usinagem ultrassônica? - Trabalho e processo
- O que é soldagem por pulverização? - Processo e técnicas