Error: Tipo de cable ilegal para el campo Mensaje.Campo .protobuf.MessageTypeAck.sourceModuleID: 1 (0 esperado)

Tengo una aplicación que procesa y consume mensajes con kafka y el búfer de protocolo y todo funciona muy bien. Estoy serializando el búfer de protocolo con SerializeAsString() (esta aplicación fue escrita en c ++).

Ahora, he agregado el nuevo sitio web node.js que también consume mensajes y trata de decodificarlos.

Mi código js (usando el gran módulo ProtoBuf.js ):

 var builder = ProtoBuf.loadProtoFile("/home/aii/general/proto/All.proto"), protobuf = builder.build("protobuf"), Trace = protobuf.Trace, MessageType = protobuf.MessageType, MessageTypeAck = protobuf.MessageTypeAck, MessageTypeKeepAlive = protobuf.MessageTypeKeepAlive; function getMessageType(val) { return Object.keys(MessageType).filter(function(key) {return MessageType[key] === val})[0] } consumer.on('message', function (message) { try{ switch(getMessageType(message.key[0])) { case 'MESSAGE_TYPE_ACK': console.log(MessageTypeAck.decode(message.value)); break; case 'MESSAGE_TYPE_KEEP_ALIVE': console.log(MessageTypeKeepAlive.decode(message.value)); break; default: console.log("Unknown message type"); } } catch (e){ if (e.decoded) { var err = e.decoded; console.log(err); } else { console.log(e); } } }); 

Resultado:

 [Error: Illegal wire type for field Message.Field .protobuf.MessageTypeAck.sourceModuleID: 1 (0 expected)] 

Mis archivos de proto:

Trace.proto:

 package protobuf; message Trace { optional string topic = 1; optional int32 partition = 2; optional int64 offset = 3; } 

MessageType.proto

 package protobuf; enum MessageType { MESSAGE_TYPE_ACK = 1; MESSAGE_TYPE_KEEP_ALIVE = 2; } 

Mensajes.proto:

 import "Trace.proto"; package protobuf; message MessageTypeAck { repeated Trace trace = 1; optional string sourceModuleName = 2; optional int32 sourceModuleID = 3; } message MessageTypeKeepAlive { repeated Trace trace = 1; optional string sourceModuleName = 2; optional int32 sourceModuleID = 3; } 

All.proto

 import "Trace.proto" import "MessageType.proto"; import "Messages.proto" 

¿Qué estoy haciendo mal? (¿descodificar?)

Entonces, gracias a esta pregunta y respuesta de SO , ¡lo descubrí! El problema está relacionado con la forma en que consumí el búfer (por kafka), como utf-8 (predeterminado). En realidad estaba relacionado con el código que no adjunté:

 var kafka = require('kafka-node'), Consumer = kafka.Consumer, client = new kafka.Client('localhost:2181'), consumer = new Consumer( client, [ { topic: 'Genesis', partition: 0 } ], { autoCommit: false, encoding: 'buffer' } ); 

y la solución fue agregar la encoding: línea ‘buffer’ (el valor predeterminado es ‘utf-8’ como se menciona aquí ).