Friday, 4 November 2022

Kafka Avro serialization and deserialization

/// <summary>
/// Decodes a binary payload into a newly constructed <typeparamref name="T"/> record.
/// </summary>
/// <typeparam name="T">
/// Record type with a parameterless constructor. The instance's own <c>Schema</c>
/// is passed for both schema arguments of the reader.
/// </typeparam>
/// <param name="bytes">The encoded payload to read from.</param>
/// <returns>The new record instance after it has been handed to the reader to fill.</returns>
public static T Deserialize<T>(byte[] bytes) where T : ISpecificRecord, new()
        {
            using (var input = new MemoryStream(bytes))
            {
                // A blank instance is created first: it supplies the schema and is
                // also the object passed to the reader for population.
                var record = new T();
                var datumReader = new SpecificDefaultReader(record.Schema, record.Schema);

                datumReader.Read(record, new BinaryDecoder(input));
                return record;
            }
        }

        public static byte[] Serialize<T>(T thisObj) where T : ISpecificRecord

        {

            using (var ms = new MemoryStream())

            {

                var enc = new BinaryEncoder(ms);

                var writer = new SpecificDefaultWriter(thisObj.Schema); // Schema comes from pre-compiled, code-gen phase

                writer.Write(thisObj, enc);

                return ms.ToArray();

            }

        }



/// <summary>
/// Publishes string-keyed, byte-array-valued messages to Kafka through a single
/// producer that is built once in the constructor and released in <see cref="Dispose"/>.
/// </summary>
public class KafkaPublish : IKafkaPublish, System.IDisposable
    {
        // Built once and reused for every publish; owned by this instance.
        private readonly IProducer<string, byte[]> _producer;

        /// <summary>
        /// Builds the underlying producer from the supplied configuration.
        /// </summary>
        /// <param name="config">Producer configuration passed to the producer builder.</param>
        /// <exception cref="System.ArgumentNullException"><paramref name="config"/> is null.</exception>
        public KafkaPublish(ProducerConfig config)
        {
            if (config == null)
            {
                throw new System.ArgumentNullException(nameof(config));
            }

            _producer = new ProducerBuilder<string, byte[]>(config).Build();
        }

        /// <summary>
        /// Sends a copy of <paramref name="message"/> to <paramref name="topic"/> and
        /// reports whether the delivery result was marked as persisted.
        /// </summary>
        /// <param name="message">Source of the key, value and headers to send.</param>
        /// <param name="topic">Name of the destination topic.</param>
        /// <returns>
        /// <c>true</c> when the delivery result's status equals
        /// <c>PersistenceStatus.Persisted</c>; otherwise <c>false</c>.
        /// </returns>
        /// <exception cref="System.ArgumentNullException">
        /// <paramref name="message"/> or <paramref name="topic"/> is null.
        /// </exception>
        /// <remarks>
        /// Exceptions raised by the producer are not caught here and propagate to the caller.
        /// </remarks>
        public async Task<bool> PublishToKafka(Message<string, byte[]> message, string topic)
        {
            if (message == null)
            {
                throw new System.ArgumentNullException(nameof(message));
            }

            if (topic == null)
            {
                throw new System.ArgumentNullException(nameof(topic));
            }

            // Only Key, Value and Headers are carried over; any other property set on
            // the incoming message is not copied to the outgoing one.
            var outgoing = new Message<string, byte[]>
            {
                Key = message.Key,
                Value = message.Value,
                Headers = message.Headers
            };

            // No need to resume on the caller's synchronization context for the status check.
            var deliveryReport = await _producer.ProduceAsync(topic, outgoing).ConfigureAwait(false);

            return deliveryReport.Status == PersistenceStatus.Persisted;
        }

        /// <summary>
        /// Releases the producer created in the constructor.
        /// </summary>
        public void Dispose()
        {
            _producer.Dispose();
        }
    }

No comments:

Post a Comment

Search Keyword