我正在使用Kafka Consumer API来构建消费者。消息结构很复杂。为了构建反序列化器,我实现了 Deserializer 类并提供了必要的实现。我使用 Jackson API 进行反序列化。 我收到这个错误 “异常引发org.apache.kafka.common.errors.SerializationException:在偏移量19205124处反序列化分区staging.datafeeds.PartnerHotel-0的键/值时出错”

#POJO classes 
 
    public class Change { 
    private  Schema schema; 
    private  Payload payload; 
    //Getters and constructor 
    } 
    public class Details { 
    private List<String> effectedAttributes; 
    private List<PartnerHotel> cluster; 
    //Getters and contructor 
    } 
    public class Field { 
    private String type; 
    private Boolean optional; 
    private String field; 
    //Getters and constructor 
    } 
    public class Fields { 
    private String type; 
    private List<Field> fields; 
    private Boolean optional; 
    private String name; 
    //Getters and contructor 
    } 
    public class Geom{ 
    private int srid; 
    private String wkb; 
    //Getters and contructor 
    } 
    public class PartnerHotel{ 
    private int id; 
    private int shopId; 
    private String partnerHotelId; 
    private boolean isOnline; 
    private boolean isRemovedByUser; 
    private int mappingPriority; 
    private int hotelId; 
    private String statusHotelId; 
    private String name; 
    private String street; 
    private String zipCode; 
    private String city; 
    private String sourceCityId; 
    private String state; 
    private String stateAlpha2; 
    private String country; 
    private String alpha2; 
    private String alpha3; 
    private double latitude; 
    private double longitude; 
    private Geom geomPoint; 
    private int countryIdShop; 
    private int selectedGeoname; 
    private String propertyType; 
    private List<String> tags; 
    private int stars; 
    private String url; 
    private int nrRatings; 
    private double recommendation; 
    private long dateHotelId; 
    private long timeStamp; 
    private long lastImport; 
    //Getters and contructor 
    } 
    public class Payload { 
    private PartnerHotel before; 
    private PartnerHotel after; 
    private Source source; 
    private String op; 
    private String ts_ms; 
    //Getters and contructor 
    } 
    public class Schema { 
    private String type; 
    private Boolean optional; 
    private String name; 
    private List<Fields> fields; 
    //Getters and contructor 
    } 
    public class Source { 
    private String version; 
    private String name; 
    private String ts_usec; 
    private String txId; 
    private String lxn; 
    private Boolean snapshot; 
    private Object lastSnapshotRecord; 
    //Getters and contructor 
    } 
 
#Deserializer 
 
    public class ChangeDeserializer implements Deserializer<Change> { 
 
    public ChangeDeserializer(){ } 
 
    public void configure(Map<String, ?> map, boolean b) {} 
 
    public Change deserialize(String topic, byte[] data) { 
        if(data == null){ 
            return null; 
        } 
        try{ 
            ObjectMapper objectMapper = new ObjectMapper(); 
            Change change = objectMapper.readValue(data,Change.class); 
            return change; 
        } 
        catch(IOException exception){ 
            throw new DeserializationException("Unable to deserialize               Change", exception); 
        }} 
 
    public void close() {} 
    } 
 
#Consumer 
    public class KafkaAcnowledger { 
        public static void main(String[] args){ 
        Properties props = new Properties(); 
        props.put("bootstrap.servers", "someUrl"); 
        props.put("group.id", "test131"); 
        props.put("enable.auto.commit", "true"); 
        props.put("auto.commit.interval.ms", "1000"); 
        props.put("max.poll.records",1); 
        props.put("auto.offset.reset","earliest"); 
        props.put("key.deserializer",    "org.apache.kafka.common.serialization.LongDeserializer"); 
        props.put("value.deserializer",    "deserializer.ChangeDeserializer"); 
        KafkaConsumer<Long, Change> consumer = new KafkaConsumer(props); 
        consumer.subscribe(Arrays.asList("staging.datafeeds.PartnerHotel")); 
        while (true) { 
            try{ 
            ConsumerRecords<Long, Change> records = consumer.poll(100); 
            for (ConsumerRecord<Long, Change> record : records) 
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); 
        } 
        catch(Exception exception){ 
                System.out.println("Exception raised" + exception); 
        } 
        } 
 
 
    } 
    } 

消费者中的 poll() 看起来不错,在此处输入代码异常我收到了序列化异常。我通过 kafka-consumer-groups.sh 检查了消费者组,该消费者的组在列表中。任何方向都表示赞赏。

Kafka主题中消息的结构:

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int16","optional":false,"field":"shopId"},{"type":"string","optional":false,"field":"partnerHotelId"},{"type":"boolean","optional":false,"field":"isOnline"},{"type":"boolean","optional":false,"field":"isRemovedByUser"},{"type":"int32","optional":false,"field":"mappingPriority"},{"type":"int32","optional":true,"field":"hotelId"},{"type":"string","optional":true,"field":"statusHotelId"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"dateHotelId"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"timestamp"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"lastImport"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"street"},{"type":"string","optional":true,"field":"zipcode"},{"type":"string","optional":true,"field":"city"},{"type":"string","optional":true,"field":"sourceCityId"},{"type":"string","optional":true,"field":"state"},{"type":"string","optional":true,"field":"stateAlpha2"},{"type":"string","optional":true,"field":"country"},{"type":"string","optional":true,"field":"alpha2"},{"type":"string","optional":true,"field":"alpha3"},{"type":"double","optional":true,"field":"latitude"},{"type":"double","optional":true,"field":"longitude"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"geomPoint"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedGeonames"},{"type":"int32","optional":true,"field":"countryIdShop"},{"type":"int32","optional":true,"field":"selectedGeoname"},{"type":"string","optional":true,"field":"propertyType"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"tags"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"chains"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"creditCards"},{"type":"int32","optional":true,"field":"stars"},{"type":"string","optional":true,"field":"url"},{"type":"int32","optional":true,"field":"nrRatings"},{"type":"double","optional":true,"field":"recommendation"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedPartnerHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"removedFromHotelIds"}],"optional":true,"name":"staging.datafeeds.PartnerHotel.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int16","optional":false,"field":"shopId"},{"type":"string","optional":false,"field":"partnerHotelId"},{"type":"boolean","optional":false,"field":"isOnline"},{"type":"boolean","optional":false,"field":"isRemovedByUser"},{"type":"int32","optional":false,"field":"mappingPriority"},{"type":"int32","optional":true,"field":"hotelId"},{"type":"string","optional":true,"field":"statusHotelId"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"dateHotelId"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"timestamp"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"lastImport"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"street"},{"type":"string","optional":true,"field":"zipcode"},{"type":"string","optional":true,"field":"city"},{"type":"string","optional":true,"field":"sourceCityId"},{"type":"string","optional":true,"field":"state"},{"type":"string","optional":true,"field":"stateAlpha2"},{"type":"string","optional":true,"field":"country"},{"type":"string","optional":true,"field":"alpha2"},{"type":"string","optional":true,"field":"alpha3"},{"type":"double","optional":true,"field":"latitude"},{"type":"double","optional":true,"field":"longitude"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"geomPoint"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedGeonames"},{"type":"int32","optional":true,"field":"countryIdShop"},{"type":"int32","optional":true,"field":"selectedGeoname"},{"type":"string","optional":true,"field":"propertyType"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"tags"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"chains"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"creditCards"},{"type":"int32","optional":true,"field":"stars"},{"type":"string","optional":true,"field":"url"},{"type":"int32","optional":true,"field":"nrRatings"},{"type":"double","optional":true,"field":"recommendation"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedPartnerHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"removedFromHotelIds"}],"optional":true,"name":"staging.datafeeds.PartnerHotel.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"db"},{"type":"int64","optional":true,"field":"ts_usec"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"string","optional":true,"field":"schema"},{"type":"string","optional":true,"field":"table"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"boolean","optional":true,"field":"last_snapshot_record"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"staging.datafeeds.PartnerHotel.Envelope"},"payload":{"before":null,"after":{"id":13893497,"shopId":135,"partnerHotelId":"6-42036","isOnline":false,"isRemovedByUser":false,"mappingPriority":0,"hotelId":null,"statusHotelId":"AUTO","dateHotelId":null,"timestamp":1529334013938327,"lastImport":1503491984188866,"name":"Ferienvermietung Wiedemann","street":"Chausseeberg 3","zipcode":"17429","city":"Mellenthin","sourceCityId":null,"state":null,"stateAlpha2":null,"country":"Deutschland","alpha2":"DE","alpha3":null,"latitude":53.920278,"longitude":14.013333,"geomPoint":{"wkb":"AQEAACDmEAAARuo9ldMGLEA5nWSry/VKQA==","srid":4326},"proposedGeonames":[2872064],"countryIdShop":83,"selectedGeoname":2872064,"propertyType":null,"tags":["77","36","33","34","38","43","41","123","26","29","1","7","6","70","9","1000","58","17","18","15","13","14","20","65","63","46","10","52"],"chains":[],"creditCards":[],"stars":null,"url":"http://www.buchen.travel/onepage-idealo-booking/index.php?room=6-42036","nrRatings":null,"recommendation":null,"proposedHotels":[],"proposedPartnerHotels":[],"removedFromHotelIds":[]},"source":{"version":"0.8.3.Final","name":"staging","db":"geo","ts_usec":1554391067119000,"txId":4757138,"lsn":1139303143104,"schema":"datafeeds","table":"PartnerHotel","snapshot":true,"last_snapshot_record":false},"op":"r","ts_ms":1554391067119}} 

请您参考如下方法:

您的 POJO 与您的消息不兼容,jackson 无法解析它。至少缺少几个字段,可以发现如下错误。

Unrecognized field "timestamp" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable 
Unrecognized field "zipcode" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable 
Unrecognized field "proposedGeonames" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable 
Unrecognized field "chains" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable 
Unrecognized field "creditCards" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable 
Unrecognized field "proposedHotels" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable 
Unrecognized field "proposedPartnerHotels" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable 
Unrecognized field "removedFromHotelIds" (class  com.example.kafka.Change$PartnerHotel), not marked as ignorable 
Unrecognized field "db" (class  com.example.kafka.Change$Source), not marked as ignorable 
Unrecognized field "lsn" (class  com.example.kafka.Change$Source), not marked as ignorable 
Unrecognized field "schema" (class  com.example.kafka.Change$Source), not marked as ignorable 
Unrecognized field "table" (class  com.example.kafka.Change$Source), not marked as ignorable 
Unrecognized field "last_snapshot_record" (class  com.example.kafka.Change$Source), not marked as ignorable 

要修复此问题,您必须将这些字段添加到 POJO 中或禁用未知失败:objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);。有关 Jackson 反序列化错误的更多信息可以在这里找到:jackson Unrecognized field


评论关闭
IT干货网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!