MQTT is an indispensable protocol in IoT and industrial applications. In this comprehensive guide, you will learn MQTT from scratch with C# and .NET and build production-ready applications through real-world examples.

What Is MQTT and Why Does It Matter?

MQTT (originally MQ Telemetry Transport) is a lightweight publish-subscribe messaging protocol. It was designed for constrained devices and low-bandwidth, unreliable networks, and it offers three delivery-guarantee levels (QoS) so that reliability can be traded against overhead for each message.

Core Components

  • Broker: Central server that receives messages and routes them to subscribers
  • Publisher: Client that sends messages
  • Subscriber: Client that receives messages
  • Topic: Hierarchical channel name used to categorize and route messages
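
To make the four roles concrete, here is a minimal in-process sketch. `ToyBroker` is a hypothetical class invented for illustration; it only shows how publishers and subscribers are decoupled through topics and does not speak the MQTT wire protocol or support wildcards.

```csharp
using System;
using System.Collections.Generic;

// Toy in-process broker (hypothetical, for illustration only).
// Publishers and subscribers never reference each other; they only share topic names.
public sealed class ToyBroker
{
    private readonly Dictionary<string, List<Action<string, string>>> _subscribers = new();

    // A subscriber registers interest in an exact topic name.
    public void Subscribe(string topic, Action<string, string> handler)
    {
        if (!_subscribers.TryGetValue(topic, out var handlers))
        {
            handlers = new List<Action<string, string>>();
            _subscribers[topic] = handlers;
        }
        handlers.Add(handler);
    }

    // A publisher hands the message to the broker, which fans it out.
    // Returns the number of subscribers that received the message.
    public int Publish(string topic, string payload)
    {
        if (!_subscribers.TryGetValue(topic, out var handlers))
        {
            return 0; // nobody is listening; the message is simply dropped
        }

        foreach (var handler in handlers)
        {
            handler(topic, payload);
        }
        return handlers.Count;
    }
}
```

A real broker adds network transport, sessions, QoS handshakes, and wildcard matching on top of this basic fan-out idea.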

When Should You Use MQTT?

  • Collecting IoT sensor data
  • Real-time notification systems
  • Industrial automation and machine-to-machine communication
  • Push notification infrastructure for mobile applications

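Quality of Service (QoS) Levels

MQTT defines three QoS levels. Each level applies per hop (publisher to broker, then broker to subscriber), and a subscriber receives a message at the lower of the published QoS and its subscription QoS.

QoS 0 – At Most Once

  • Fastest, but with no delivery guarantee
  • Messages can be lost
  • Use for: continuously flowing data such as sensor readings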
var message = new MqttApplicationMessageBuilder()
    .WithTopic("sensors/temperature")
    .WithPayload(JsonSerializer.Serialize(temperatureData))
    .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
    .Build();

QoS 1 – At Least Once

  • Guarantees that the message is delivered at least once
  • Duplicates are possible, so handlers should be idempotent
  • Use for: non-critical notifications
var message = new MqttApplicationMessageBuilder()
    .WithTopic("alerts/warning")
    .WithPayload("System warning")
    .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
    .Build();
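
Because QoS 1 can deliver the same message more than once, the receiving side usually needs some form of de-duplication. The sketch below assumes each message carries an application-level ID inside its payload (an assumption; the article's payloads do not define one). `DuplicateFilter` is a hypothetical helper, not part of MQTTnet, and it is not thread-safe as written.

```csharp
using System;
using System.Collections.Generic;

// Remembers the most recent message IDs and rejects repeats (hypothetical helper).
public sealed class DuplicateFilter
{
    private readonly int _capacity;
    private readonly HashSet<string> _seen = new();
    private readonly Queue<string> _order = new(); // oldest ID at the front

    public DuplicateFilter(int capacity = 10_000)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        _capacity = capacity;
    }

    // Returns true the first time an ID is seen and false for a repeat.
    public bool TryAccept(string messageId)
    {
        if (!_seen.Add(messageId))
        {
            return false; // duplicate delivery: skip processing
        }

        _order.Enqueue(messageId);

        // Bound memory use by forgetting the oldest ID once capacity is exceeded.
        if (_order.Count > _capacity)
        {
            _seen.Remove(_order.Dequeue());
        }
        return true;
    }
}
```

A handler would call `TryAccept` before doing any work and return early when it yields `false`. The bounded window is a trade-off: an ID that has already been evicted would be accepted again.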

QoS 2 – Exactly Once

  • Guarantees that the message is delivered exactly once
  • Slowest (it uses a four-packet handshake), but the most reliable
  • Use for: financial transactions, critical commands
var message = new MqttApplicationMessageBuilder()
    .WithTopic("commands/critical")
    .WithPayload(JsonSerializer.Serialize(criticalCommand))
    .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)
    .Build();

Building an MQTT Client in C#

Required NuGet Packages

<PackageReference Include="MQTTnet" Version="4.3.4" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="4.3.4" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="8.0.0" />

Basic Publisher Implementation

using MQTTnet;
using MQTTnet.Client;
using System.Text.Json;

public interface IMqttPublisher
{
    Task PublishAsync<T>(string topic, T payload, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce);
    Task ConnectAsync();
    Task DisconnectAsync();
}

public class MqttPublisher : IMqttPublisher, IDisposable
{
    private readonly IMqttClient _client;
    private readonly MqttClientOptions _options;
    
    public MqttPublisher(string server, int port, string clientId)
    {
        var factory = new MqttFactory();
        _client = factory.CreateMqttClient();
        
        _options = new MqttClientOptionsBuilder()
            .WithClientId(clientId)
            .WithTcpServer(server, port)
            .WithCleanSession()
            .Build();
    }
    
    public async Task ConnectAsync()
    {
        try
        {
            await _client.ConnectAsync(_options);
            Console.WriteLine("MQTT connection established");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"MQTT connection error: {ex.Message}");
            throw;
        }
    }
    
    public async Task PublishAsync<T>(string topic, T payload, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce)
    {
        if (!_client.IsConnected)
        {
            await ConnectAsync();
        }
        
        var json = JsonSerializer.Serialize(payload);
        
        var message = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(json)
            .WithQualityOfServiceLevel(qos)
            .WithRetainFlag(false)
            .Build();
            
        await _client.PublishAsync(message);
    }
    
    public async Task DisconnectAsync()
    {
        if (_client.IsConnected)
        {
            await _client.DisconnectAsync();
        }
    }
    
    public void Dispose()
    {
        _client?.Dispose();
    }
}

Subscriber (Consumer) Implementation

using MQTTnet;
using MQTTnet.Client;
using System.Text.Json;

public interface IMqttSubscriber
{
    Task SubscribeAsync(string topic, Func<string, string, Task> messageHandler);
    Task ConnectAsync();
    Task DisconnectAsync();
}

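public class MqttSubscriber : IMqttSubscriber, IDisposable
{
    private readonly IMqttClient _client;
    private readonly MqttClientOptions _options;

    // Topic filter -> handler. Concurrent because messages arrive on a background thread.
    private readonly System.Collections.Concurrent.ConcurrentDictionary<string, Func<string, string, Task>> _handlers = new();
    
    public MqttSubscriber(string server, int port, string clientId)
    {
        var factory = new MqttFactory();
        _client = factory.CreateMqttClient();
        
        _options = new MqttClientOptionsBuilder()
            .WithClientId(clientId)
            .WithTcpServer(server, port)
            .WithCleanSession()
            .Build();
            
        _client.ApplicationMessageReceivedAsync += OnMessageReceived;
    }
    
    private async Task OnMessageReceived(MqttApplicationMessageReceivedEventArgs e)
    {
        var topic = e.ApplicationMessage.Topic;
        // ConvertPayloadToString copes with an empty payload, where Encoding.UTF8.GetString(null) would throw
        var payload = e.ApplicationMessage.ConvertPayloadToString() ?? string.Empty;
        
        Console.WriteLine($"Message received - Topic: {topic}, Payload: {payload}");
        
        // Dispatch to every registered handler whose filter (including + and # wildcards) matches.
        // MqttTopicFilterComparer ships with MQTTnet 4.x; check its namespace against your package version.
        foreach (var (filter, handler) in _handlers)
        {
            if (MqttTopicFilterComparer.Compare(topic, filter) == MqttTopicFilterCompareResult.IsMatch)
            {
                await handler(topic, payload);
            }
        }
    }
    
    public async Task ConnectAsync()
    {
        await _client.ConnectAsync(_options);
    }
    
    public async Task SubscribeAsync(string topic, Func<string, string, Task> messageHandler)
    {
        if (!_client.IsConnected)
        {
            await ConnectAsync();
        }
        
        // Register the handler before subscribing so that no early message is missed
        _handlers[topic] = messageHandler;
        
        await _client.SubscribeAsync(new MqttTopicFilterBuilder()
            .WithTopic(topic)
            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
            .Build());
    }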
    
    public async Task DisconnectAsync()
    {
        if (_client.IsConnected)
        {
            await _client.DisconnectAsync();
        }
    }
    
    public void Dispose()
    {
        _client?.Dispose();
    }
}

Continuous Listening with BackgroundService

In production, MQTT subscribers usually run as a BackgroundService:

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;

public class MqttBackgroundService : BackgroundService
{
    private readonly ILogger<MqttBackgroundService> _logger;
    private readonly IMqttSubscriber _subscriber;
    private readonly IServiceProvider _serviceProvider;
    
    public MqttBackgroundService(
        ILogger<MqttBackgroundService> logger,
        IMqttSubscriber subscriber,
        IServiceProvider serviceProvider)
    {
        _logger = logger;
        _subscriber = subscriber;
        _serviceProvider = serviceProvider;
    }
    
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            await _subscriber.ConnectAsync();
            
            // Listen for sensor data
            await _subscriber.SubscribeAsync("sensors/+/temperature", async (topic, payload) =>
            {
                using var scope = _serviceProvider.CreateScope();
                var processor = scope.ServiceProvider.GetRequiredService<ISensorDataProcessor>();
                await processor.ProcessTemperatureData(topic, payload);
            });
            
            // Listen for alert messages
            await _subscriber.SubscribeAsync("alerts/+", async (topic, payload) =>
            {
                using var scope = _serviceProvider.CreateScope();
                var alertService = scope.ServiceProvider.GetRequiredService<IAlertService>();
                await alertService.ProcessAlert(topic, payload);
            });
            
            _logger.LogInformation("MQTT BackgroundService started");
            
            // Wait until the host stops the service
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("MQTT BackgroundService stopped");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "MQTT BackgroundService error");
        }
        finally
        {
            await _subscriber.DisconnectAsync();
        }
    }
}

Connection Resiliency and Retry Logic

Connection drops are inevitable in production. The following class implements a retry policy with the Polly library:

<PackageReference Include="Polly" Version="8.4.1" />

using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Client;
using Polly;
using System.Text.Json;

public class ResilientMqttClient : IMqttPublisher
{
    private readonly IMqttClient _client;
    private readonly MqttClientOptions _options;
    private readonly IAsyncPolicy _retryPolicy;
    private readonly ILogger<ResilientMqttClient> _logger;
    
    public ResilientMqttClient(
        string server, 
        int port, 
        string clientId,
        ILogger<ResilientMqttClient> logger)
    {
        _logger = logger;
        
        var factory = new MqttFactory();
        _client = factory.CreateMqttClient();
        
        _options = new MqttClientOptionsBuilder()
            .WithClientId(clientId)
            .WithTcpServer(server, port)
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(30))
            .WithTimeout(TimeSpan.FromSeconds(10)) // MQTTnet 4.x name; 3.x called this WithCommunicationTimeout
            .Build();
            
        // Retry policy with exponential backoff
        _retryPolicy = Policy
            .Handle<Exception>()
            .WaitAndRetryAsync(
                retryCount: 5,
                sleepDurationProvider: retryAttempt => 
                    TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)) + 
                    TimeSpan.FromMilliseconds(Random.Shared.Next(0, 1000)), // Jitter
                onRetry: (outcome, delay, retryCount, context) =>
                {
                    _logger.LogWarning("Retrying MQTT operation. Attempt: {RetryCount}, Delay: {Delay}ms", 
                        retryCount, delay.TotalMilliseconds);
                });
                
        _client.DisconnectedAsync += OnDisconnected;
    }
    
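    // Set by DisconnectAsync so that an intentional disconnect does not trigger a reconnect
    private volatile bool _stopping;

    private async Task OnDisconnected(MqttClientDisconnectedEventArgs e)
    {
        // Ignore intentional disconnects and failed connection attempts (which also raise this event)
        if (_stopping || !e.ClientWasConnected) return;

        _logger.LogWarning("MQTT connection lost: {Reason}", e.Reason);
        
        // Wait 5 seconds before trying to reconnect
        await Task.Delay(5000);
        
        try
        {
            await _retryPolicy.ExecuteAsync(async () =>
            {
                if (!_client.IsConnected)
                {
                    await _client.ConnectAsync(_options);
                    _logger.LogInformation("MQTT reconnected");
                }
            });
        }
        catch (Exception ex)
        {
            // All retries failed; the next PublishAsync call will try to connect again
            _logger.LogError(ex, "MQTT reconnect failed after all retries");
        }
    }
    
    public async Task PublishAsync<T>(string topic, T payload, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce)
    {
        await _retryPolicy.ExecuteAsync(async () =>
        {
            if (!_client.IsConnected)
            {
                await _client.ConnectAsync(_options);
            }
            
            var json = JsonSerializer.Serialize(payload);
            
            var message = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(json)
                .WithQualityOfServiceLevel(qos)
                .Build();
                
            await _client.PublishAsync(message);
        });
    }
    
    public async Task ConnectAsync()
    {
        _stopping = false;
        
        await _retryPolicy.ExecuteAsync(async () =>
        {
            // Connecting an already connected client throws, so check first
            if (!_client.IsConnected)
            {
                await _client.ConnectAsync(_options);
            }
        });
    }
    
    // Required by IMqttPublisher
    public async Task DisconnectAsync()
    {
        _stopping = true;
        
        if (_client.IsConnected)
        {
            await _client.DisconnectAsync();
        }
    }
}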
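
To see what the retry policy's `sleepDurationProvider` actually produces, the delay formula can be pulled out into a pure function. This is a sketch of the same arithmetic (2^attempt seconds plus up to one second of random jitter); `Backoff` and its parameters are hypothetical names and not part of Polly.

```csharp
using System;

// Stand-alone version of the delay formula used by the retry policy (hypothetical helper).
public static class Backoff
{
    // retryAttempt is 1-based, matching the value Polly passes to sleepDurationProvider.
    public static TimeSpan Delay(int retryAttempt, int maxJitterMs = 1000, Random? random = null)
    {
        if (retryAttempt < 1) throw new ArgumentOutOfRangeException(nameof(retryAttempt));
        if (maxJitterMs < 0) throw new ArgumentOutOfRangeException(nameof(maxJitterMs));

        // Exponential part: 2, 4, 8, 16, 32 seconds for attempts 1..5
        var baseDelay = TimeSpan.FromSeconds(Math.Pow(2, retryAttempt));

        // Jitter spreads reconnects out so that many clients do not retry in lockstep
        var jitter = TimeSpan.FromMilliseconds((random ?? Random.Shared).Next(0, maxJitterMs));

        return baseDelay + jitter;
    }
}
```

With five retries the base delays add up to 62 seconds (2 + 4 + 8 + 16 + 32), plus at most five seconds of jitter, so a caller of `PublishAsync` can be blocked for roughly a minute before the final exception surfaces. That upper bound is worth keeping in mind when choosing `retryCount`.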

Topic Design and Best Practices

Topic Naming Conventions

A well-structured topic hierarchy:

company/facility/area/device/metric
- acme/factory1/production/line1/temperature
- acme/factory1/production/line1/pressure  
- acme/factory1/quality/aoi1/defect_count

Poorly structured topics:

temp
sensor1_data  
ALERT_SYSTEM_WARNING
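
One way to keep topic names consistent is to build them through a small helper instead of concatenating strings by hand. The following is a minimal sketch; `TopicName.Build` is a hypothetical helper, and rejecting empty levels and forcing lower case are conventions chosen for this example rather than MQTT requirements.

```csharp
using System;
using System.Linq;

// Builds a publish topic from its levels and rejects characters that are
// not allowed in topic names (hypothetical helper, for illustration).
public static class TopicName
{
    private static readonly char[] Forbidden = { '/', '+', '#', '\0' };

    public static string Build(params string[] levels)
    {
        if (levels is null || levels.Length == 0)
            throw new ArgumentException("At least one topic level is required.", nameof(levels));

        foreach (var level in levels)
        {
            if (string.IsNullOrWhiteSpace(level))
                throw new ArgumentException("Topic levels must not be empty.", nameof(levels));

            // '/' would silently add a level; '+' and '#' are wildcards reserved for subscriptions
            if (level.IndexOfAny(Forbidden) >= 0)
                throw new ArgumentException($"Invalid character in topic level '{level}'.", nameof(levels));
        }

        return string.Join('/', levels.Select(l => l.Trim().ToLowerInvariant()));
    }
}
```

For example, `TopicName.Build("Acme", "Factory1", "production", "line1", "temperature")` yields `acme/factory1/production/line1/temperature`, while a level such as `"line1/extra"` throws instead of quietly changing the hierarchy depth.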

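Using Wildcards

// handler is any Func<string, string, Task> that receives (topic, payload)

// Single-level wildcard (+): matches exactly one level, here every device in the production area
await subscriber.SubscribeAsync("acme/factory1/production/+/temperature", handler);

// Multi-level wildcard (#): matches everything below production, and must be the last level
await subscriber.SubscribeAsync("acme/factory1/production/#", handler);

// Specific subscription
await subscriber.SubscribeAsync("acme/factory1/production/line1/temperature", handler);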
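
The matching rules behind `+` and `#` are easy to get wrong, so it can help to see them spelled out. The sketch below is a simplified matcher written for illustration; `TopicFilter.Matches` is a hypothetical helper, it does not validate malformed filters, and in real code the broker or the client library performs this matching.

```csharp
using System;

// Simplified MQTT topic filter matching (hypothetical helper, for illustration).
public static class TopicFilter
{
    public static bool Matches(string filter, string topic)
    {
        var f = filter.Split('/');
        var t = topic.Split('/');

        // Topics starting with '$' (such as $SYS/...) are not matched by a leading wildcard
        if (topic.StartsWith('$') && (f[0] == "#" || f[0] == "+"))
        {
            return false;
        }

        for (var i = 0; i < f.Length; i++)
        {
            if (f[i] == "#")
            {
                // '#' is only valid as the last level; it matches the parent and everything below it
                return i == f.Length - 1;
            }

            if (i >= t.Length) return false;                 // filter is longer than the topic
            if (f[i] != "+" && f[i] != t[i]) return false;   // '+' matches any single level
        }

        // Without '#', filter and topic must have the same number of levels
        return f.Length == t.Length;
    }
}
```

Note that `+` stands for exactly one level: the filter `acme/factory1/+/temperature` does not match `acme/factory1/production/line1/temperature`, because that topic has two levels between `factory1` and `temperature`.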

Security: TLS and Authentication

TLS Configuration

_options = new MqttClientOptionsBuilder()
    .WithClientId(clientId)
    .WithTcpServer(server, 8883) // TLS port
    .WithTls(new MqttClientOptionsBuilderTlsParameters
    {
        UseTls = true,
        AllowUntrustedCertificates = false, // must be false in production
        CertificateValidationHandler = context =>
        {
            // Accept the certificate only if the platform's chain and host name checks passed.
            // Returning true unconditionally would disable certificate validation.
            return context.SslPolicyErrors == System.Net.Security.SslPolicyErrors.None;
        }
    })
    .Build();

Username/Password Authentication

_options = new MqttClientOptionsBuilder()
    .WithClientId(clientId)
    .WithTcpServer(server, port)
    .WithCredentials("username", "password")
    .Build();
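
Hard-coded credentials like the literals above tend to end up in source control. One common alternative is to read them from environment variables at startup. The sketch below assumes the variable names `MQTT_USERNAME` and `MQTT_PASSWORD`, which are hypothetical and not defined anywhere in this guide; `MqttCredentials` is likewise an illustrative type.

```csharp
using System;

// Loads broker credentials from the environment instead of source code (hypothetical helper).
public sealed class MqttCredentials
{
    public string Username { get; }
    public string Password { get; }

    private MqttCredentials(string username, string password)
    {
        Username = username;
        Password = password;
    }

    // The variable names are assumptions; use whatever your deployment defines.
    public static MqttCredentials FromEnvironment(
        string userVariable = "MQTT_USERNAME",
        string passwordVariable = "MQTT_PASSWORD")
    {
        var user = Environment.GetEnvironmentVariable(userVariable);
        var password = Environment.GetEnvironmentVariable(passwordVariable);

        // Fail fast at startup rather than connecting anonymously by accident
        if (string.IsNullOrEmpty(user) || string.IsNullOrEmpty(password))
        {
            throw new InvalidOperationException(
                $"Environment variables {userVariable} and {passwordVariable} must both be set.");
        }

        return new MqttCredentials(user, password);
    }

    // Never include the password in logs or debugger output
    public override string ToString() => $"MqttCredentials(Username={Username}, Password=***)";
}
```

The result can then be passed to `.WithCredentials(credentials.Username, credentials.Password)`. Credentials sent over a plain TCP connection are readable on the network, so this should be combined with the TLS configuration.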

Data Persistence with Entity Framework Core

Model Definitions

using Microsoft.EntityFrameworkCore;
using System.Text.Json;

public class SensorReading
{
    public int Id { get; set; }
    public string DeviceId { get; set; } = string.Empty;
    public string Topic { get; set; } = string.Empty;
    public string MetricType { get; set; } = string.Empty;
    public double Value { get; set; }
    public DateTime Timestamp { get; set; }
    public string? Unit { get; set; }
    public Dictionary<string, object>? Metadata { get; set; }
}

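public class MqttDbContext : DbContext
{
    // Required so that AddDbContext<MqttDbContext>(...) can pass in the configured options
    public MqttDbContext(DbContextOptions<MqttDbContext> options) : base(options) { }
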
    public DbSet<SensorReading> SensorReadings { get; set; }
    
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<SensorReading>(entity =>
        {
            entity.HasKey(e => e.Id);
            entity.HasIndex(e => new { e.DeviceId, e.Timestamp });
            entity.HasIndex(e => e.Topic);
            
            entity.Property(e => e.Metadata)
                .HasConversion(
                    v => JsonSerializer.Serialize(v, (JsonSerializerOptions?)null),
                    v => JsonSerializer.Deserialize<Dictionary<string, object>>(v, (JsonSerializerOptions?)null));
        });
    }
}

Database Integration with a Message Handler

public interface ISensorDataProcessor
{
    Task ProcessTemperatureData(string topic, string payload);
}

public class SensorDataProcessor : ISensorDataProcessor
{
    private readonly MqttDbContext _context;
    private readonly ILogger<SensorDataProcessor> _logger;
    
    public SensorDataProcessor(MqttDbContext context, ILogger<SensorDataProcessor> logger)
    {
        _context = context;
        _logger = logger;
    }
    
    public async Task ProcessTemperatureData(string topic, string payload)
    {
        try
        {
            var data = JsonSerializer.Deserialize<TemperatureReading>(payload);
            if (data == null) return;
            
            var sensorReading = new SensorReading
            {
                DeviceId = ExtractDeviceIdFromTopic(topic),
                Topic = topic,
                MetricType = "temperature",
                Value = data.Temperature,
                Timestamp = data.Timestamp,
                Unit = "°C",
                Metadata = new Dictionary<string, object>
                {
                    ["humidity"] = data.Humidity,
                    ["location"] = data.Location
                }
            };
            
            _context.SensorReadings.Add(sensorReading);
            await _context.SaveChangesAsync();
            
            // Critical temperature check
            if (data.Temperature > 80)
            {
                await TriggerTemperatureAlert(sensorReading);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error while processing temperature data: {Topic}", topic);
        }
    }
    
    private string ExtractDeviceIdFromTopic(string topic)
    {
        // "acme/factory1/production/line1/temperature" -> "line1"
        var parts = topic.Split('/');
        return parts.Length > 3 ? parts[3] : "unknown";
    }
    
    private Task TriggerTemperatureAlert(SensorReading reading)
    {
        // Trigger the alerting system (this example only logs a warning)
        _logger.LogWarning("Critical temperature: {Temperature}°C, Device: {DeviceId}", 
            reading.Value, reading.DeviceId);
        return Task.CompletedTask;
    }
}

public class TemperatureReading
{
    public double Temperature { get; set; }
    public double Humidity { get; set; }
    public string Location { get; set; } = string.Empty;
    public DateTime Timestamp { get; set; }
}
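
`ExtractDeviceIdFromTopic` picks the fourth level by position and falls back to "unknown" for shorter topics such as `sensors/+/temperature` matches. If all topics follow the five-level `company/facility/area/device/metric` convention, a typed parser makes that assumption explicit and surfaces malformed topics instead of hiding them. `TopicParts` is a hypothetical type sketched for illustration.

```csharp
using System;
using System.Linq;

// Typed view of a topic that follows company/facility/area/device/metric (hypothetical type).
public readonly record struct TopicParts(
    string Company, string Facility, string Area, string Device, string Metric)
{
    // Returns false rather than guessing when the topic does not have exactly five non-empty levels.
    public static bool TryParse(string? topic, out TopicParts parts)
    {
        parts = default;
        if (string.IsNullOrEmpty(topic)) return false;

        var levels = topic.Split('/');
        if (levels.Length != 5 || levels.Any(string.IsNullOrEmpty)) return false;

        parts = new TopicParts(levels[0], levels[1], levels[2], levels[3], levels[4]);
        return true;
    }
}
```

A processor could call `TopicParts.TryParse(topic, out var parts)` and log a warning when it fails, then use `parts.Device` and `parts.Metric` instead of positional indexing.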

Deployment with Docker Compose

Eclipse Mosquitto Broker Setup

# docker-compose.yml
version: '3.8'

services:
  mosquitto:
    image: eclipse-mosquitto:2.0
    ports:
      - "1883:1883"
      - "9001:9001"
    volumes:
      - ./mosquitto/config:/mosquitto/config
      - ./mosquitto/data:/mosquitto/data
      - ./mosquitto/log:/mosquitto/log
    restart: unless-stopped
    networks:
      - mqtt-network

  mqtt-app:
    build: .
    depends_on:
      - mosquitto
      - postgres
    environment:
      - MQTT_SERVER=mosquitto
      - MQTT_PORT=1883
      - DATABASE_CONNECTION=Host=postgres;Database=mqtt_db;Username=postgres;Password=postgres123
    networks:
      - mqtt-network
    restart: unless-stopped

  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: mqtt_db
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres123
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - mqtt-network

volumes:
  postgres_data:

networks:
  mqtt-network:
    driver: bridge

Mosquitto Configuration

# mosquitto/config/mosquitto.conf
listener 1883
# Anonymous access is acceptable for local development only; require authentication in production
allow_anonymous true
persistence true
persistence_location /mosquitto/data/

# WebSocket support
listener 9001
protocol websockets

# Logging
log_dest file /mosquitto/log/mosquitto.log
log_type error
log_type warning
log_type notice
log_type information
log_timestamp true

Performance Optimization

Increasing Throughput with Batch Processing

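using System.Text.Json;
using System.Threading.Channels;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

// Register this class as a singleton: the channel and its background reader must
// outlive individual messages, otherwise nothing is ever batched.
public class BatchedSensorProcessor : ISensorDataProcessor
{
    private const int BatchSize = 100;
    private static readonly TimeSpan FlushInterval = TimeSpan.FromSeconds(5);

    private readonly IServiceScopeFactory _scopeFactory;
    private readonly Channel<SensorReading> _channel;
    private readonly ILogger<BatchedSensorProcessor> _logger;
    
    public BatchedSensorProcessor(IServiceScopeFactory scopeFactory, ILogger<BatchedSensorProcessor> logger)
    {
        _scopeFactory = scopeFactory;
        _logger = logger;
        
        var options = new BoundedChannelOptions(1000)
        {
            FullMode = BoundedChannelFullMode.Wait, // apply backpressure instead of dropping readings
            SingleReader = true,
            SingleWriter = false
        };
        
        _channel = Channel.CreateBounded<SensorReading>(options);
        
        // Drain the channel in batches on a background task
        _ = Task.Run(ProcessBatchesAsync);
    }
    
    public async Task ProcessTemperatureData(string topic, string payload)
    {
        try
        {
            var data = JsonSerializer.Deserialize<TemperatureReading>(payload);
            if (data == null) return;
            
            var sensorReading = new SensorReading
            {
                DeviceId = ExtractDeviceIdFromTopic(topic),
                Topic = topic,
                MetricType = "temperature", 
                Value = data.Temperature,
                Timestamp = data.Timestamp,
                Unit = "°C"
            };
            
            await _channel.Writer.WriteAsync(sensorReading);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error while enqueuing message: {Topic}", topic);
        }
    }
    
    private static string ExtractDeviceIdFromTopic(string topic)
    {
        // "acme/factory1/production/line1/temperature" -> "line1"
        var parts = topic.Split('/');
        return parts.Length > 3 ? parts[3] : "unknown";
    }
    
    private async Task ProcessBatchesAsync()
    {
        var batch = new List<SensorReading>(BatchSize);
        var reader = _channel.Reader;
        
        // WaitToReadAsync returns false once the writer is completed and the channel is empty
        while (await reader.WaitToReadAsync())
        {
            // Fill the batch until it is full or the flush interval elapses, whichever comes first.
            // The interval is measured from the first item of the batch, not from sensor timestamps.
            using var window = new CancellationTokenSource(FlushInterval);
            try
            {
                while (batch.Count < BatchSize)
                {
                    batch.Add(await reader.ReadAsync(window.Token));
                }
            }
            catch (OperationCanceledException)
            {
                // Flush interval elapsed: save whatever has been collected so far
            }
            catch (ChannelClosedException)
            {
                // Writer completed: save the final partial batch
            }
            
            if (batch.Count > 0)
            {
                await SaveBatchAsync(batch);
                batch.Clear();
            }
        }
    }
    
    private async Task SaveBatchAsync(List<SensorReading> batch)
    {
        try
        {
            // DbContext is scoped and not thread-safe, so each batch gets its own instance
            using var scope = _scopeFactory.CreateScope();
            var context = scope.ServiceProvider.GetRequiredService<MqttDbContext>();
            
            context.SensorReadings.AddRange(batch);
            await context.SaveChangesAsync();
            
            _logger.LogInformation("Batch saved: {Count} records", batch.Count);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to save batch: {Count} records", batch.Count);
        }
    }
}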

Monitoring and Observability

Metrics with OpenTelemetry

<PackageReference Include="OpenTelemetry" Version="1.9.0" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.9.0" />
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.9.0" />

using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading;

public class MqttMetrics
{
    private readonly Meter _meter;
    private readonly Counter<int> _messagesReceived;
    private readonly Counter<int> _messagesPublished;
    private readonly Histogram<double> _processingDuration;
    private int _activeConnections;
    
    public MqttMetrics()
    {
        _meter = new Meter("MyApp.MQTT");
        // The second positional parameter of these factory methods is the unit, so the description is passed by name
        _messagesReceived = _meter.CreateCounter<int>("mqtt_messages_received_total", description: "Total received messages");
        _messagesPublished = _meter.CreateCounter<int>("mqtt_messages_published_total", description: "Total published messages");
        _processingDuration = _meter.CreateHistogram<double>("mqtt_message_processing_duration_seconds", unit: "s", description: "Message processing duration");
        // Gauge<T> is only available from .NET 9; an observable gauge also works on .NET 8
        _meter.CreateObservableGauge("mqtt_active_connections", () => Volatile.Read(ref _activeConnections), description: "Active MQTT connections");
    }
    
    // Note: tagging by full topic can create a very large number of time series;
    // consider a coarser tag (for example the metric type) when there are many devices.
    public void RecordMessageReceived(string topic) =>
        _messagesReceived.Add(1, new KeyValuePair<string, object?>("topic", topic));
        
    public void RecordMessagePublished(string topic) => 
        _messagesPublished.Add(1, new KeyValuePair<string, object?>("topic", topic));
        
    public void RecordProcessingDuration(double durationSeconds, string topic) =>
        _processingDuration.Record(durationSeconds, new KeyValuePair<string, object?>("topic", topic));
        
    public void SetActiveConnections(int count) =>
        Volatile.Write(ref _activeConnections, count);
}

// Instrumented Processor
public class InstrumentedSensorProcessor : ISensorDataProcessor
{
    private readonly ISensorDataProcessor _inner;
    private readonly MqttMetrics _metrics;
    
    public InstrumentedSensorProcessor(ISensorDataProcessor inner, MqttMetrics metrics)
    {
        _inner = inner;
        _metrics = metrics;
    }
    
    public async Task ProcessTemperatureData(string topic, string payload)
    {
        var stopwatch = Stopwatch.StartNew();
        
        try
        {
            _metrics.RecordMessageReceived(topic);
            await _inner.ProcessTemperatureData(topic, payload);
        }
        finally
        {
            stopwatch.Stop();
            _metrics.RecordProcessingDuration(stopwatch.Elapsed.TotalSeconds, topic);
        }
    }
}

Health Checks

using Microsoft.Extensions.Diagnostics.HealthChecks;
using MQTTnet.Client;

// Note: IMqttClient must be registered in DI as the same instance the publisher or
// subscriber uses; otherwise this check reports on a client that never connects.
public class MqttHealthCheck : IHealthCheck
{
    private readonly IMqttClient _client;
    
    public MqttHealthCheck(IMqttClient client)
    {
        _client = client;
    }
    
    public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
    {
        try
        {
            if (_client.IsConnected)
            {
                return Task.FromResult(HealthCheckResult.Healthy("MQTT broker connection is active"));
            }
            else
            {
                return Task.FromResult(HealthCheckResult.Unhealthy("MQTT broker connection is closed"));
            }
        }
        catch (Exception ex)
        {
            return Task.FromResult(HealthCheckResult.Unhealthy($"MQTT health check error: {ex.Message}"));
        }
    }
}

Dependency Injection Setup

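// Program.cs
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using OpenTelemetry.Metrics;

var builder = WebApplication.CreateBuilder(args);

// The broker address comes from configuration (for example the MQTT_SERVER and MQTT_PORT
// environment variables set in docker-compose.yml) and falls back to localhost for local runs
var mqttServer = builder.Configuration["MQTT_SERVER"] ?? "localhost";
var mqttPort = builder.Configuration.GetValue("MQTT_PORT", 1883);

// Database
var connectionString = builder.Configuration.GetConnectionString("DefaultConnection")
    ?? builder.Configuration["DATABASE_CONNECTION"];
builder.Services.AddDbContext<MqttDbContext>(options =>
    options.UseNpgsql(connectionString));

// MQTT Services
builder.Services.AddSingleton<IMqttPublisher>(sp =>
{
    var logger = sp.GetRequiredService<ILogger<ResilientMqttClient>>();
    return new ResilientMqttClient(mqttServer, mqttPort, "MyApp", logger);
});

builder.Services.AddSingleton<IMqttSubscriber>(sp =>
    new MqttSubscriber(mqttServer, mqttPort, "MyAppConsumer"));

// Business Services
// BatchedSensorProcessor owns a long-lived channel, so it must be a singleton. For that reason it
// should resolve MqttDbContext through IServiceScopeFactory rather than constructor injection.
builder.Services.AddSingleton<ISensorDataProcessor, BatchedSensorProcessor>();
builder.Services.AddSingleton<MqttMetrics>();
// MqttBackgroundService also resolves IAlertService; register your implementation of it here.

// Background Services
builder.Services.AddHostedService<MqttBackgroundService>();

// Health Checks (MqttHealthCheck needs an IMqttClient registration to resolve)
builder.Services.AddHealthChecks()
    .AddCheck<MqttHealthCheck>("mqtt");

// OpenTelemetry
// AddPrometheusExporter comes from the separate OpenTelemetry.Exporter.Prometheus.AspNetCore package
builder.Services.AddOpenTelemetry()
    .WithMetrics(metrics =>
    {
        metrics.AddMeter("MyApp.MQTT")
               .AddPrometheusExporter();
    });

var app = builder.Build();

// Health check endpoint
app.MapHealthChecks("/health");

// Prometheus scrape endpoint (served at /metrics by default)
app.UseOpenTelemetryPrometheusScrapingEndpoint();

app.Run();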

Testing Strategy

Integration Test

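[Fact]
public async Task Should_Process_Temperature_Message()
{
    // Arrange
    // _mockContext (Mock<MqttDbContext>) and _mockLogger are assumed to be fields of the test class.
    // _mockContext must expose a usable SensorReadings set, otherwise the processor's Add call throws.
    using var testServer = new TestMqttServer();
    await testServer.StartAsync();
    
    var client = new MqttSubscriber("localhost", testServer.Port, "test-client");
    await client.ConnectAsync();
    
    var processor = new SensorDataProcessor(_mockContext.Object, _mockLogger.Object);
    
    // Completed by the handler, so the test does not depend on a fixed sleep
    var processed = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    
    // Act
    await client.SubscribeAsync("test/temperature", async (topic, payload) =>
    {
        await processor.ProcessTemperatureData(topic, payload);
        processed.TrySetResult();
    });
    
    var publisher = new MqttPublisher("localhost", testServer.Port, "test-publisher");
    await publisher.ConnectAsync();
    
    var tempData = new TemperatureReading 
    { 
        Temperature = 25.5, 
        Humidity = 60.0,
        Location = "Room A",
        Timestamp = DateTime.UtcNow 
    };
    
    await publisher.PublishAsync("test/temperature", tempData);
    
    // Assert: wait for the handler to finish, failing after 5 seconds instead of hanging
    await processed.Task.WaitAsync(TimeSpan.FromSeconds(5));
    _mockContext.Verify(x => x.SaveChangesAsync(default), Times.Once);
}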

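// Requires: using MQTTnet; using MQTTnet.Server; using System.Net; using System.Net.Sockets;
public class TestMqttServer : IDisposable
{
    // MQTTnet 4.x exposes the server as the MqttServer class (IMqttServer existed only in 3.x)
    private readonly MqttServer _server;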
    public int Port { get; private set; }
    
    public TestMqttServer()
    {
        Port = GetAvailablePort();
        
        var factory = new MqttFactory();
        _server = factory.CreateMqttServer(new MqttServerOptionsBuilder()
            .WithDefaultEndpoint()
            .WithDefaultEndpointPort(Port)
            .Build());
    }
    
    public async Task StartAsync()
    {
        await _server.StartAsync();
    }
    
    private static int GetAvailablePort()
    {
        using var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        socket.Bind(new IPEndPoint(IPAddress.Loopback, 0));
        return ((IPEndPoint)socket.LocalEndPoint!).Port;
    }
    
    public void Dispose()
    {
        _server?.Dispose();
    }
}

Real-World Scenario: Production Line Monitoring

// Production line data model
public class ProductionLineData
{
    public string LineId { get; set; } = string.Empty;
    public int ProductsPerMinute { get; set; }
    public double Temperature { get; set; }
    public double Pressure { get; set; }
    public bool IsRunning { get; set; }
    public List<string> ActiveAlerts { get; set; } = new();
    public DateTime Timestamp { get; set; }
}

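// Specialized processor. It exposes ProcessProductionData rather than ISensorDataProcessor's
// ProcessTemperatureData, so it is a standalone class and does not implement that interface.
public class ProductionLineProcessor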
{
    private readonly MqttDbContext _context;
    private readonly IMqttPublisher _alertPublisher;
    private readonly ILogger<ProductionLineProcessor> _logger;
    
    public ProductionLineProcessor(
        MqttDbContext context, 
        IMqttPublisher alertPublisher,
        ILogger<ProductionLineProcessor> logger)
    {
        _context = context;
        _alertPublisher = alertPublisher;
        _logger = logger;
    }
    
    public async Task ProcessProductionData(string topic, string payload)
    {
        try
        {
            var data = JsonSerializer.Deserialize<ProductionLineData>(payload);
            if (data == null) return;
            
            // Save to the database
            var reading = new SensorReading
            {
                DeviceId = data.LineId,
                Topic = topic,
                MetricType = "production",
                Value = data.ProductsPerMinute,
                Timestamp = data.Timestamp,
                Unit = "ppm",
                Metadata = new Dictionary<string, object>
                {
                    ["temperature"] = data.Temperature,
                    ["pressure"] = data.Pressure,
                    ["isRunning"] = data.IsRunning,
                    ["alerts"] = data.ActiveAlerts
                }
            };
            
            _context.SensorReadings.Add(reading);
            await _context.SaveChangesAsync();
            
            // Business rule checks
            await CheckProductionThresholds(data);
            
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error while processing production data: {Topic}", topic);
        }
    }
    
    private async Task CheckProductionThresholds(ProductionLineData data)
    {
        // Low efficiency warning
        if (data.IsRunning && data.ProductsPerMinute < 50)
        {
            var alert = new ProductionAlert
            {
                LineId = data.LineId,
                AlertType = "LOW_EFFICIENCY",
                Message = $"Production rate dropped: {data.ProductsPerMinute} ppm",
                Severity = "WARNING",
                Timestamp = DateTime.UtcNow
            };
            
            await _alertPublisher.PublishAsync($"alerts/production/{data.LineId}", alert);
        }
        
        // Critical temperature warning
        if (data.Temperature > 85)
        {
            var alert = new ProductionAlert
            {
                LineId = data.LineId,
                AlertType = "HIGH_TEMPERATURE", 
                Message = $"Critical temperature: {data.Temperature}°C",
                Severity = "CRITICAL",
                Timestamp = DateTime.UtcNow
            };
            
            await _alertPublisher.PublishAsync($"alerts/critical/{data.LineId}", alert);
        }
    }
}

public class ProductionAlert
{
    public string LineId { get; set; } = string.Empty;
    public string AlertType { get; set; } = string.Empty;
    public string Message { get; set; } = string.Empty;
    public string Severity { get; set; } = string.Empty;
    public DateTime Timestamp { get; set; }
}

Summary and Best Practices

Do

  1. Choose the QoS level to match the scenario
  2. Use retry logic and a circuit breaker
  3. Design a clean topic hierarchy
  4. Use TLS and authentication
  5. Improve throughput with batch processing
  6. Add metrics and health checks
  7. Reuse one long-lived client per process by registering it as a singleton

Avoid

  1. Using retained messages unnecessarily
  2. Overusing broad wildcards (#)
  3. Sending large payloads (>1 MB)
  4. Using clean session = false unnecessarily
  5. Neglecting exception handling

With this guide you can build production-ready MQTT applications in C# and .NET and set up reliable real-time communication in IoT and industrial systems.

By tanju.bozok

Software Architect, Developer, and Entrepreneur
