/// <summary>
/// Decodes a binary payload into a freshly constructed <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">
/// Record type exposing a <c>Schema</c> property and a public parameterless constructor.
/// </typeparam>
/// <param name="bytes">Encoded payload; wrapped in a <see cref="MemoryStream"/> for reading.</param>
/// <returns>The new <typeparamref name="T"/> instance after the reader has populated it.</returns>
public static T Deserialize<T>(byte[] bytes) where T : ISpecificRecord, new()
{
    using (var input = new MemoryStream(bytes))
    {
        var decoder = new BinaryDecoder(input);

        // A blank instance serves two purposes: it is the object the reader fills in,
        // and it supplies the schema. The same schema is passed for both constructor
        // arguments (presumably writer and reader schema - confirm against the Avro API).
        var record = new T();
        var datumReader = new SpecificDefaultReader(record.Schema, record.Schema);

        datumReader.Read(record, decoder);
        return record;
    }
}
/// <summary>
/// Encodes <paramref name="thisObj"/> into a byte array using the schema exposed by the object itself.
/// </summary>
/// <typeparam name="T">Record type exposing a <c>Schema</c> property.</typeparam>
/// <param name="thisObj">The record to encode.</param>
/// <returns>A copy of everything the writer emitted into the in-memory stream.</returns>
public static byte[] Serialize<T>(T thisObj) where T : ISpecificRecord
{
    using (var ms = new MemoryStream())
    {
        var enc = new BinaryEncoder(ms);

        // Schema comes from the pre-compiled, code-gen phase.
        // (This comment previously wrapped onto a second, uncommented line,
        // which left stray text in the method body.)
        var writer = new SpecificDefaultWriter(thisObj.Schema);

        writer.Write(thisObj, enc);
        return ms.ToArray();
    }
}
/// <summary>
/// Publishes string-keyed, byte-array-valued messages through a single producer
/// that is built once in the constructor and reused for every call.
/// </summary>
/// <remarks>
/// The instance owns its producer; call <see cref="Dispose"/> when the publisher
/// is no longer needed so the producer is released.
/// </remarks>
public class KafkaPublish : IKafkaPublish, System.IDisposable
{
    private readonly IProducer<string, byte[]> _producer;

    /// <summary>
    /// Builds the underlying producer from <paramref name="config"/>.
    /// </summary>
    /// <param name="config">Producer configuration handed to the producer builder.</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="config"/> is null.</exception>
    public KafkaPublish(ProducerConfig config)
    {
        if (config == null)
        {
            throw new System.ArgumentNullException(nameof(config));
        }

        _producer = new ProducerBuilder<string, byte[]>(config).Build();
    }

    /// <summary>
    /// Sends a copy of <paramref name="message"/> (key, value and headers) to <paramref name="topic"/>.
    /// </summary>
    /// <param name="message">Source of the key, value and headers to publish.</param>
    /// <param name="topic">Destination topic name.</param>
    /// <returns>
    /// <c>true</c> when the delivery report status equals <c>PersistenceStatus.Persisted</c>;
    /// otherwise <c>false</c>.
    /// </returns>
    /// <exception cref="System.ArgumentNullException">
    /// <paramref name="message"/> or <paramref name="topic"/> is null.
    /// </exception>
    public async Task<bool> PublishToKafka(Message<string, byte[]> message, string topic)
    {
        if (message == null)
        {
            throw new System.ArgumentNullException(nameof(message));
        }

        if (topic == null)
        {
            throw new System.ArgumentNullException(nameof(topic));
        }

        // Only Key, Value and Headers are carried over into the outgoing message;
        // any other members of the caller's message are intentionally not copied,
        // matching the original behavior.
        var outgoing = new Message<string, byte[]>
        {
            Key = message.Key,
            Value = message.Value,
            Headers = message.Headers
        };

        // No synchronization context is needed after the await, so avoid capturing one.
        var deliveryReport = await _producer.ProduceAsync(topic, outgoing).ConfigureAwait(false);

        return deliveryReport.Status == PersistenceStatus.Persisted;
    }

    /// <summary>
    /// Disposes the producer created by the constructor.
    /// </summary>
    public void Dispose()
    {
        _producer.Dispose();
    }
}
// Web-page footer captured with the snippet ("No comments: / Post a Comment"); not part of the code.