MQTT, IoT ve endüstriyel uygulamalarda vazgeçilmez bir protokol. Bu kapsamlı rehberde, C# ve .NET kullanarak MQTT’yi sıfırdan öğrenecek, gerçek dünya örnekleriyle production-ready uygulamalar geliştireceksiniz.

MQTT Nedir ve Neden Önemli?
MQTT (Message Queuing Telemetry Transport), hafif bir publish-subscribe mesajlaşma protokolüdür. Özellikle bant genişliğinin kısıtlı olduğu ortamlar için tasarlanmıştır ve güvenilir mesaj iletimi sağlar.

Temel Bileşenler
- Broker: Mesajları alan ve dağıtan merkezi sunucu
- Publisher: Mesaj gönderen istemci
- Subscriber: Mesaj alan istemci
- Topic: Mesajların kategorize edildiği kanallar
Ne Zaman MQTT Kullanmalısınız?
- IoT sensör verilerinin toplanması
- Gerçek zamanlı bildirim sistemleri
- Endüstriyel otomasyon ve makine-makine iletişimi
- Mobil uygulamalarda push notification altyapısı

Quality of Service (QoS) Seviyeleri
MQTT’de 3 QoS seviyesi vardır:
QoS 0 – At Most Once
- En hızlı seviye, ancak teslimat garantisi yoktur
- Mesaj kaybolabilir
- Kullanım: Sensör verisi gibi sürekli akan data
// QoS 0 (AtMostOnce) example: a message for "sensors/temperature" whose payload is
// temperatureData serialized to JSON. temperatureData is declared outside this snippet.
var message = new MqttApplicationMessageBuilder()
.WithTopic("sensors/temperature")
.WithPayload(JsonSerializer.Serialize(temperatureData))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
.Build();
QoS 1 – At Least Once
- Mesajın en az bir kez iletilmesini garanti eder
- Duplikasyon olabilir
- Kullanım: Kritik olmayan bildirimler
// QoS 1 (AtLeastOnce) example: a plain-text warning published to "alerts/warning".
var message = new MqttApplicationMessageBuilder()
.WithTopic("alerts/warning")
.WithPayload("Sistem uyarısı")
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.Build();
QoS 2 – Exactly Once
- Mesajın tam olarak bir kez iletilmesini garanti eder
- En yavaş ancak en güvenilir
- Kullanım: Finansal işlemler, kritik komutlar
// QoS 2 (ExactlyOnce) example: a message for "commands/critical" whose payload is
// criticalCommand serialized to JSON. criticalCommand is declared outside this snippet.
var message = new MqttApplicationMessageBuilder()
.WithTopic("commands/critical")
.WithPayload(JsonSerializer.Serialize(criticalCommand))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)
.Build();
C# ile MQTT İstemci Geliştirme
Gerekli NuGet Paketleri
<PackageReference Include="MQTTnet" Version="4.3.4" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="4.3.4" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="8.0.0" />
Temel Publisher Implementasyonu
using MQTTnet;
using MQTTnet.Client;
using System.Text.Json;
/// <summary>
/// Contract for a client that publishes messages to an MQTT broker.
/// </summary>
public interface IMqttPublisher
{
/// <summary>
/// Publishes <paramref name="payload"/> to <paramref name="topic"/>.
/// The implementations in this file serialize the payload to JSON before sending.
/// </summary>
/// <typeparam name="T">Type of the payload object.</typeparam>
/// <param name="topic">Topic the message is published to.</param>
/// <param name="payload">Object to send as the message body.</param>
/// <param name="qos">Quality-of-service level; defaults to AtMostOnce (QoS 0).</param>
Task PublishAsync<T>(string topic, T payload, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce);
/// <summary>Opens the connection to the broker.</summary>
Task ConnectAsync();
/// <summary>Closes the connection to the broker.</summary>
Task DisconnectAsync();
}
/// <summary>
/// Minimal MQTT publisher: owns one <see cref="IMqttClient"/>, connects on demand and
/// sends payloads serialized as JSON.
/// </summary>
public class MqttPublisher : IMqttPublisher, IDisposable
{
    private readonly IMqttClient _client;
    private readonly MqttClientOptions _options;

    /// <summary>Creates the client and the TCP connection options (clean session).</summary>
    /// <param name="server">Broker host name or address.</param>
    /// <param name="port">Broker TCP port.</param>
    /// <param name="clientId">Client identifier presented to the broker.</param>
    public MqttPublisher(string server, int port, string clientId)
    {
        var mqttFactory = new MqttFactory();
        _client = mqttFactory.CreateMqttClient();

        var optionsBuilder = new MqttClientOptionsBuilder()
            .WithClientId(clientId)
            .WithTcpServer(server, port)
            .WithCleanSession();
        _options = optionsBuilder.Build();
    }

    /// <summary>
    /// Connects to the broker. The outcome is written to the console; a failure is rethrown.
    /// </summary>
    public async Task ConnectAsync()
    {
        try
        {
            await _client.ConnectAsync(_options);
            Console.WriteLine("MQTT bağlantısı başarılı");
        }
        catch (Exception connectError)
        {
            Console.WriteLine($"MQTT bağlantı hatası: {connectError.Message}");
            throw;
        }
    }

    /// <summary>
    /// Serializes <paramref name="payload"/> to JSON and publishes it without the retain flag,
    /// connecting first when the client is not connected.
    /// </summary>
    public async Task PublishAsync<T>(string topic, T payload, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce)
    {
        var needsConnection = !_client.IsConnected;
        if (needsConnection)
        {
            await ConnectAsync();
        }

        var body = JsonSerializer.Serialize(payload);
        var outgoing = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(body)
            .WithQualityOfServiceLevel(qos)
            .WithRetainFlag(false)
            .Build();

        await _client.PublishAsync(outgoing);
    }

    /// <summary>Disconnects from the broker; does nothing when already disconnected.</summary>
    public async Task DisconnectAsync()
    {
        if (!_client.IsConnected)
        {
            return;
        }

        await _client.DisconnectAsync();
    }

    /// <summary>Releases the underlying MQTT client.</summary>
    public void Dispose() => _client?.Dispose();
}
Subscriber (Consumer) Implementasyonu
using MQTTnet;
using MQTTnet.Client;
using System.Text.Json;
/// <summary>
/// Contract for a client that subscribes to topics on an MQTT broker.
/// </summary>
public interface IMqttSubscriber
{
/// <summary>
/// Subscribes to <paramref name="topic"/>.
/// </summary>
/// <param name="topic">Topic or topic filter to subscribe to.</param>
/// <param name="messageHandler">
/// Callback taking two strings; the callers in this file name them topic and payload.
/// </param>
Task SubscribeAsync(string topic, Func<string, string, Task> messageHandler);
/// <summary>Opens the connection to the broker.</summary>
Task ConnectAsync();
/// <summary>Closes the connection to the broker.</summary>
Task DisconnectAsync();
}
/// <summary>
/// MQTT subscriber that owns one <see cref="IMqttClient"/> and dispatches every received
/// message to the handlers registered through <see cref="SubscribeAsync"/>.
/// </summary>
public class MqttSubscriber : IMqttSubscriber, IDisposable
{
    private readonly IMqttClient _client;
    private readonly MqttClientOptions _options;

    // Topic filter -> handler. Concurrent because messages arrive on the client's receive
    // path while SubscribeAsync may be called from another thread.
    private readonly System.Collections.Concurrent.ConcurrentDictionary<string, Func<string, string, Task>> _handlers = new();

    /// <summary>Creates the client, its TCP options (clean session) and hooks the receive event.</summary>
    /// <param name="server">Broker host name or address.</param>
    /// <param name="port">Broker TCP port.</param>
    /// <param name="clientId">Client identifier presented to the broker.</param>
    public MqttSubscriber(string server, int port, string clientId)
    {
        var factory = new MqttFactory();
        _client = factory.CreateMqttClient();
        _options = new MqttClientOptionsBuilder()
            .WithClientId(clientId)
            .WithTcpServer(server, port)
            .WithCleanSession()
            .Build();
        _client.ApplicationMessageReceivedAsync += OnMessageReceived;
    }

    /// <summary>
    /// Decodes the payload as UTF-8 and invokes every handler whose topic filter matches the
    /// message topic. A failing handler is reported and does not stop the remaining handlers.
    /// </summary>
    private async Task OnMessageReceived(MqttApplicationMessageReceivedEventArgs e)
    {
        var topic = e.ApplicationMessage.Topic;

        // An empty message carries a null payload; treat it as an empty string instead of
        // letting Encoding.GetString throw.
        var rawPayload = e.ApplicationMessage.Payload;
        var payload = rawPayload is null ? string.Empty : System.Text.Encoding.UTF8.GetString(rawPayload);

        Console.WriteLine($"Mesaj alındı - Topic: {topic}, Payload: {payload}");

        foreach (var registration in _handlers)
        {
            // Filters may contain + and # wildcards, so a plain string comparison is not enough.
            if (MqttTopicFilterComparer.Compare(topic, registration.Key) != MqttTopicFilterCompareResult.IsMatch)
            {
                continue;
            }

            try
            {
                await registration.Value(topic, payload);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Mesaj işleme hatası - Topic: {topic}, Hata: {ex.Message}");
            }
        }
    }

    /// <summary>Connects to the broker.</summary>
    public async Task ConnectAsync()
    {
        await _client.ConnectAsync(_options);
    }

    /// <summary>
    /// Registers <paramref name="messageHandler"/> for <paramref name="topic"/> and subscribes
    /// at QoS 1, connecting first when necessary. Subscribing again with the same filter
    /// replaces the previous handler.
    /// </summary>
    /// <param name="topic">Topic filter; may contain + and # wildcards.</param>
    /// <param name="messageHandler">Callback invoked with (topic, payload) for each matching message.</param>
    /// <exception cref="ArgumentException"><paramref name="topic"/> is null or empty.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="messageHandler"/> is null.</exception>
    public async Task SubscribeAsync(string topic, Func<string, string, Task> messageHandler)
    {
        if (string.IsNullOrEmpty(topic))
        {
            throw new ArgumentException("Topic must not be null or empty.", nameof(topic));
        }

        ArgumentNullException.ThrowIfNull(messageHandler);

        if (!_client.IsConnected)
        {
            await ConnectAsync();
        }

        // Register before subscribing so that a message delivered right after the
        // subscription is acknowledged already finds its handler.
        _handlers[topic] = messageHandler;

        await _client.SubscribeAsync(new MqttTopicFilterBuilder()
            .WithTopic(topic)
            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
            .Build());
    }

    /// <summary>Disconnects from the broker; does nothing when already disconnected.</summary>
    public async Task DisconnectAsync()
    {
        if (_client.IsConnected)
        {
            await _client.DisconnectAsync();
        }
    }

    /// <summary>Unhooks the receive event and releases the underlying client.</summary>
    public void Dispose()
    {
        if (_client is not null)
        {
            _client.ApplicationMessageReceivedAsync -= OnMessageReceived;
            _client.Dispose();
        }
    }
}
BackgroundService ile Sürekli Dinleme
Production ortamında MQTT subscriber’ları genellikle BackgroundService olarak çalışır:
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;
/// <summary>
/// Hosted service that connects the subscriber, registers the temperature and alert
/// subscriptions, and then waits until the host requests shutdown.
/// </summary>
public class MqttBackgroundService : BackgroundService
{
    private readonly ILogger<MqttBackgroundService> _logger;
    private readonly IMqttSubscriber _subscriber;
    private readonly IServiceProvider _serviceProvider;

    public MqttBackgroundService(
        ILogger<MqttBackgroundService> logger,
        IMqttSubscriber subscriber,
        IServiceProvider serviceProvider)
    {
        _logger = logger;
        _subscriber = subscriber;
        _serviceProvider = serviceProvider;
    }

    /// <summary>
    /// Runs for the lifetime of the host. Cancellation and other failures are logged;
    /// the subscriber is disconnected on every exit path.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            await _subscriber.ConnectAsync();

            // Listen for sensor data
            await _subscriber.SubscribeAsync("sensors/+/temperature", HandleTemperatureAsync);

            // Listen for alert messages
            await _subscriber.SubscribeAsync("alerts/+", HandleAlertAsync);

            _logger.LogInformation("MQTT BackgroundService başlatıldı");

            // Wait until the service is stopped
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("MQTT BackgroundService durduruldu");
        }
        catch (Exception failure)
        {
            _logger.LogError(failure, "MQTT BackgroundService hatası");
        }
        finally
        {
            await _subscriber.DisconnectAsync();
        }
    }

    // Resolves the temperature processor from a fresh scope for each message.
    private async Task HandleTemperatureAsync(string topic, string payload)
    {
        using var messageScope = _serviceProvider.CreateScope();
        var sensorProcessor = messageScope.ServiceProvider.GetRequiredService<ISensorDataProcessor>();
        await sensorProcessor.ProcessTemperatureData(topic, payload);
    }

    // Resolves the alert service from a fresh scope for each message.
    private async Task HandleAlertAsync(string topic, string payload)
    {
        using var messageScope = _serviceProvider.CreateScope();
        var alerts = messageScope.ServiceProvider.GetRequiredService<IAlertService>();
        await alerts.ProcessAlert(topic, payload);
    }
}
Connection Resiliency ve Retry Logic
Production ortamında bağlantı kopmaları kaçınılmaz. Polly kütüphanesi ile retry policy implementasyonu:
<PackageReference Include="Polly" Version="8.4.1" />
using Polly;
using Polly.Extensions.Http;
/// <summary>
/// MQTT publisher that retries connect and publish with exponential backoff plus jitter and
/// reconnects automatically after an unexpected connection loss.
/// </summary>
public class ResilientMqttClient : IMqttPublisher, IDisposable
{
    private readonly IMqttClient _client;
    private readonly MqttClientOptions _options;
    private readonly IAsyncPolicy _retryPolicy;
    private readonly ILogger<ResilientMqttClient> _logger;

    // Serializes connect attempts: the reconnect handler and PublishAsync can both decide to
    // connect at the same moment, and the client rejects overlapping connects.
    private readonly SemaphoreSlim _connectLock = new(1, 1);

    // Set by DisconnectAsync so that an intentional disconnect is not undone by the
    // automatic reconnect.
    private volatile bool _disconnectRequested;

    /// <summary>Creates the client, its options and the retry policy.</summary>
    /// <param name="server">Broker host name or address.</param>
    /// <param name="port">Broker TCP port.</param>
    /// <param name="clientId">Client identifier presented to the broker.</param>
    /// <param name="logger">Logger for retry and reconnect diagnostics.</param>
    public ResilientMqttClient(
        string server,
        int port,
        string clientId,
        ILogger<ResilientMqttClient> logger)
    {
        _logger = logger;
        var factory = new MqttFactory();
        _client = factory.CreateMqttClient();
        _options = new MqttClientOptionsBuilder()
            .WithClientId(clientId)
            .WithTcpServer(server, port)
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(30))
            // MQTTnet 4 renamed the v3 WithCommunicationTimeout to WithTimeout.
            .WithTimeout(TimeSpan.FromSeconds(10))
            .Build();

        // Retry policy with exponential backoff
        _retryPolicy = Policy
            .Handle<Exception>()
            .WaitAndRetryAsync(
                retryCount: 5,
                sleepDurationProvider: retryAttempt =>
                    TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)) +
                    TimeSpan.FromMilliseconds(Random.Shared.Next(0, 1000)), // Jitter
                onRetry: (outcome, delay, retryCount, context) =>
                {
                    _logger.LogWarning("MQTT bağlantısı yeniden deneniyor. Deneme: {RetryCount}, Gecikme: {Delay}ms",
                        retryCount, delay.TotalMilliseconds);
                });

        _client.DisconnectedAsync += OnDisconnected;
    }

    /// <summary>Connects unless already connected; only one attempt runs at a time.</summary>
    private async Task EnsureConnectedAsync()
    {
        if (_client.IsConnected)
        {
            return;
        }

        await _connectLock.WaitAsync();
        try
        {
            // Re-check: another caller may have connected while this one waited for the lock.
            if (!_client.IsConnected)
            {
                await _client.ConnectAsync(_options);
            }
        }
        finally
        {
            _connectLock.Release();
        }
    }

    /// <summary>Reconnects after an unexpected loss of an established connection.</summary>
    private async Task OnDisconnected(MqttClientDisconnectedEventArgs e)
    {
        _logger.LogWarning("MQTT bağlantısı koptu: {Reason}", e.Reason);

        // A failed connect attempt also raises this event. Reacting to those would start a
        // new reconnect loop for every failed attempt, so only handle the loss of a
        // connection that was actually established, and never after DisconnectAsync.
        if (_disconnectRequested || !e.ClientWasConnected)
        {
            return;
        }

        // Try to reconnect after 5 seconds
        await Task.Delay(5000);
        try
        {
            await _retryPolicy.ExecuteAsync(async () =>
            {
                if (_disconnectRequested)
                {
                    return;
                }

                await EnsureConnectedAsync();
                _logger.LogInformation("MQTT yeniden bağlantı başarılı");
            });
        }
        catch (Exception ex)
        {
            // All retries failed; the next PublishAsync/ConnectAsync call tries again.
            _logger.LogError(ex, "MQTT yeniden bağlantı başarısız");
        }
    }

    /// <summary>
    /// Serializes <paramref name="payload"/> to JSON and publishes it, connecting first when
    /// necessary. Connect and publish failures are retried by the retry policy.
    /// </summary>
    public async Task PublishAsync<T>(string topic, T payload, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce)
    {
        // Build the message once, outside the retry: a serialization error is deterministic
        // and must not be retried with backoff.
        var json = JsonSerializer.Serialize(payload);
        var message = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(json)
            .WithQualityOfServiceLevel(qos)
            .Build();

        await _retryPolicy.ExecuteAsync(async () =>
        {
            await EnsureConnectedAsync();
            await _client.PublishAsync(message);
        });
    }

    /// <summary>Connects to the broker with retries; does nothing when already connected.</summary>
    public async Task ConnectAsync()
    {
        _disconnectRequested = false;
        await _retryPolicy.ExecuteAsync(EnsureConnectedAsync);
    }

    /// <summary>Disconnects from the broker and suppresses the automatic reconnect.</summary>
    public async Task DisconnectAsync()
    {
        _disconnectRequested = true;
        if (_client.IsConnected)
        {
            await _client.DisconnectAsync();
        }
    }

    /// <summary>Stops reconnecting and releases the client and the connect lock.</summary>
    public void Dispose()
    {
        _disconnectRequested = true;
        _client.DisconnectedAsync -= OnDisconnected;
        _client.Dispose();
        _connectLock.Dispose();
    }
}
Topic Tasarımı ve Best Practices
Topic Naming Conventions
Doğru topic yapısı:
company/facility/area/device/metric
- acme/factory1/production/line1/temperature
- acme/factory1/production/line1/pressure
- acme/factory1/quality/aoi1/defect_count
Yanlış topic yapısı:
temp
sensor1_data
ALERT_SYSTEM_WARNING
Wildcards Kullanımı
// SubscribeAsync(topic, messageHandler) takes a handler as its second argument.
Func<string, string, Task> handler = (topic, payload) =>
{
    Console.WriteLine($"{topic}: {payload}");
    return Task.CompletedTask;
};
// Single-level wildcard (+): replaces exactly one level, here every device in the production
// area. The filter needs as many levels as the topic (company/facility/area/device/metric).
await subscriber.SubscribeAsync("acme/factory1/production/+/temperature", handler);
// Multi-level wildcard (#): everything below acme/factory1/production
await subscriber.SubscribeAsync("acme/factory1/production/#", handler);
// Specific subscription
await subscriber.SubscribeAsync("acme/factory1/production/line1/temperature", handler);
Güvenlik: TLS ve Authentication
TLS Konfigürasyonu
// TLS-secured connection options: port 8883 and untrusted certificates rejected.
_options = new MqttClientOptionsBuilder()
    .WithClientId(clientId)
    .WithTcpServer(server, 8883) // TLS port
    .WithTls(new MqttClientOptionsBuilderTlsParameters
    {
        UseTls = true,
        AllowUntrustedCertificates = false, // Must stay false in production
        CertificateValidationHandler = context =>
        {
            // Accept the certificate only when the platform's chain validation found no
            // errors. Returning true unconditionally would accept any certificate and
            // disable TLS server authentication.
            return context.SslPolicyErrors == System.Net.Security.SslPolicyErrors.None;
        }
    })
    .Build();
Username/Password Authentication
// Connection options with username/password authentication.
// NOTE(review): "username" and "password" look like placeholders; presumably real
// credentials should come from configuration or a secret store rather than source code.
_options = new MqttClientOptionsBuilder()
.WithClientId(clientId)
.WithTcpServer(server, port)
.WithCredentials("username", "password")
.Build();
Entity Framework Core ile Data Persistence
Model Tanımları
/// <summary>
/// Persisted sensor measurement; mapped as an entity by MqttDbContext.
/// </summary>
public class SensorReading
{
/// <summary>Primary key (configured with HasKey in MqttDbContext).</summary>
public int Id { get; set; }
/// <summary>Identifier of the device that produced the reading.</summary>
public string DeviceId { get; set; } = string.Empty;
/// <summary>MQTT topic the reading was received on.</summary>
public string Topic { get; set; } = string.Empty;
/// <summary>Kind of metric; the processors in this file store "temperature" or "production".</summary>
public string MetricType { get; set; } = string.Empty;
/// <summary>Measured value.</summary>
public double Value { get; set; }
/// <summary>Timestamp of the reading; the processors copy it from the incoming message.</summary>
public DateTime Timestamp { get; set; }
/// <summary>Optional unit of <see cref="Value"/>; the processors store "°C" or "ppm".</summary>
public string? Unit { get; set; }
/// <summary>Optional extra key/value data; MqttDbContext converts it to and from JSON.</summary>
public Dictionary<string, object>? Metadata { get; set; }
}
/// <summary>
/// EF Core context that stores <see cref="SensorReading"/> rows.
/// </summary>
public class MqttDbContext : DbContext
{
    /// <summary>
    /// Parameterless constructor, kept so that existing callers which create the context
    /// without options continue to compile.
    /// </summary>
    public MqttDbContext()
    {
    }

    /// <summary>
    /// Constructor used by dependency injection. AddDbContext with an options callback
    /// requires a constructor that accepts the options and passes them to the base class;
    /// without it the provider configuration never reaches the context.
    /// </summary>
    /// <param name="options">Provider and connection settings for this context.</param>
    public MqttDbContext(DbContextOptions<MqttDbContext> options)
        : base(options)
    {
    }

    /// <summary>Stored sensor readings.</summary>
    public DbSet<SensorReading> SensorReadings { get; set; }

    /// <summary>
    /// Configures the key, the (DeviceId, Timestamp) and Topic indexes, and the JSON
    /// conversion of <see cref="SensorReading.Metadata"/>.
    /// </summary>
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<SensorReading>(entity =>
        {
            entity.HasKey(e => e.Id);
            entity.HasIndex(e => new { e.DeviceId, e.Timestamp });
            entity.HasIndex(e => e.Topic);
            // The dictionary is stored in a single column as JSON text.
            entity.Property(e => e.Metadata)
                .HasConversion(
                    v => JsonSerializer.Serialize(v, (JsonSerializerOptions?)null),
                    v => JsonSerializer.Deserialize<Dictionary<string, object>>(v, (JsonSerializerOptions?)null));
        });
    }
}
Message Handler ile Database Integration
/// <summary>
/// Contract for components that process sensor messages received over MQTT.
/// </summary>
public interface ISensorDataProcessor
{
/// <summary>Processes one temperature message.</summary>
/// <param name="topic">Topic the message was received on.</param>
/// <param name="payload">Message body; the implementations in this file parse it as JSON.</param>
Task ProcessTemperatureData(string topic, string payload);
}
/// <summary>
/// Stores incoming temperature messages as <see cref="SensorReading"/> rows and logs a
/// warning when the temperature exceeds the critical threshold.
/// </summary>
public class SensorDataProcessor : ISensorDataProcessor
{
    // Temperatures above this value (in °C) trigger an alert.
    private const double CriticalTemperature = 80;

    private readonly MqttDbContext _context;
    private readonly ILogger<SensorDataProcessor> _logger;

    public SensorDataProcessor(MqttDbContext context, ILogger<SensorDataProcessor> logger)
    {
        _context = context;
        _logger = logger;
    }

    /// <summary>
    /// Parses <paramref name="payload"/> as a <see cref="TemperatureReading"/>, saves it and
    /// raises an alert for critical temperatures. A null result is ignored; exceptions are
    /// logged and not rethrown.
    /// </summary>
    /// <param name="topic">Topic the message was received on.</param>
    /// <param name="payload">JSON body of the message.</param>
    public async Task ProcessTemperatureData(string topic, string payload)
    {
        try
        {
            var data = JsonSerializer.Deserialize<TemperatureReading>(payload);
            if (data == null) return;

            var sensorReading = new SensorReading
            {
                DeviceId = ExtractDeviceIdFromTopic(topic),
                Topic = topic,
                MetricType = "temperature",
                Value = data.Temperature,
                Timestamp = data.Timestamp,
                Unit = "°C",
                Metadata = new Dictionary<string, object>
                {
                    ["humidity"] = data.Humidity,
                    ["location"] = data.Location
                }
            };

            _context.SensorReadings.Add(sensorReading);
            await _context.SaveChangesAsync();

            // Critical temperature check
            if (data.Temperature > CriticalTemperature)
            {
                await TriggerTemperatureAlert(sensorReading);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Sıcaklık verisi işlenirken hata: {Topic}", topic);
        }
    }

    /// <summary>
    /// Returns the topic level directly before the metric name, or "unknown" when the topic
    /// has fewer than two levels.
    /// </summary>
    private string ExtractDeviceIdFromTopic(string topic)
    {
        // "acme/factory1/production/line1/temperature" -> "line1"
        // "sensors/device42/temperature"               -> "device42"
        // Counting from the end works for both layouts used in this file; a fixed index of 3
        // returned "unknown" for every "sensors/+/temperature" message.
        var parts = topic.Split('/');
        return parts.Length >= 2 ? parts[^2] : "unknown";
    }

    /// <summary>Logs a warning for a critical reading.</summary>
    private Task TriggerTemperatureAlert(SensorReading reading)
    {
        // Trigger the alert system
        _logger.LogWarning("Kritik sıcaklık: {Temperature}°C, Cihaz: {DeviceId}",
            reading.Value, reading.DeviceId);
        return Task.CompletedTask;
    }
}
/// <summary>
/// Payload of a temperature message; deserialized from JSON by the sensor processors.
/// </summary>
public class TemperatureReading
{
/// <summary>Temperature value; stored by the processors with the unit "°C".</summary>
public double Temperature { get; set; }
/// <summary>Humidity value; units are not specified here.</summary>
public double Humidity { get; set; }
/// <summary>Location label of the sensor.</summary>
public string Location { get; set; } = string.Empty;
/// <summary>Time of the measurement.</summary>
public DateTime Timestamp { get; set; }
}
Docker Compose ile Deployment
Eclipse Mosquitto Broker Setup
# docker-compose.yml
# Runs the Mosquitto broker, the MQTT application and PostgreSQL on one bridge network.
# NOTE(review): the database password is written in plain text below; presumably meant for
# local development only - confirm before using this file elsewhere.
version: '3.8'
services:
  mosquitto:
    image: eclipse-mosquitto:2.0
    ports:
      - "1883:1883"
      - "9001:9001"
    volumes:
      - ./mosquitto/config:/mosquitto/config
      - ./mosquitto/data:/mosquitto/data
      - ./mosquitto/log:/mosquitto/log
    restart: unless-stopped
    networks:
      - mqtt-network
  mqtt-app:
    build: .
    depends_on:
      - mosquitto
      - postgres
    environment:
      - MQTT_SERVER=mosquitto
      - MQTT_PORT=1883
      - DATABASE_CONNECTION=Host=postgres;Database=mqtt_db;Username=postgres;Password=postgres123
    networks:
      - mqtt-network
    restart: unless-stopped
  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: mqtt_db
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres123
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - mqtt-network
volumes:
  postgres_data:
networks:
  mqtt-network:
    driver: bridge
Mosquitto Konfigürasyonu
# mosquitto/config/mosquitto.conf
# Plain MQTT listener on port 1883.
listener 1883
# NOTE(review): clients may connect without credentials - presumably for local development only.
allow_anonymous true
# Persistence is enabled; data is kept under /mosquitto/data/.
persistence true
persistence_location /mosquitto/data/
# WebSocket support
listener 9001
protocol websockets
# Logging
log_dest file /mosquitto/log/mosquitto.log
log_type error
log_type warning
log_type notice
log_type information
log_timestamp true
Performans Optimizasyonu
Batch Processing ile Throughput Artırma
/// <summary>
/// Queues temperature readings in a bounded channel and writes them to the database in
/// batches: a batch is saved when it reaches <see cref="BatchSize"/> items or when
/// <see cref="FlushInterval"/> has passed since its first item was read.
/// </summary>
/// <remarks>
/// Disposing the processor completes the queue and waits until every queued reading has been
/// saved, so the injected context is still usable for the final flush.
/// NOTE(review): batching only pays off when one instance lives across many messages; a
/// per-message DI scope creates and disposes one instance per message.
/// </remarks>
public class BatchedSensorProcessor : ISensorDataProcessor, IAsyncDisposable, IDisposable
{
    private const int BatchSize = 100;
    private static readonly TimeSpan FlushInterval = TimeSpan.FromMilliseconds(5000);

    private readonly MqttDbContext _context;
    private readonly Channel<SensorReading> _channel;
    private readonly ChannelWriter<SensorReading> _writer;
    private readonly ILogger<BatchedSensorProcessor> _logger;
    private readonly Task _processingTask;

    public BatchedSensorProcessor(MqttDbContext context, ILogger<BatchedSensorProcessor> logger)
    {
        _context = context;
        _logger = logger;

        var options = new BoundedChannelOptions(1000)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true,
            SingleWriter = false
        };
        _channel = Channel.CreateBounded<SensorReading>(options);
        _writer = _channel.Writer;

        // Batch processing runs on a background task; the task is kept so that disposal can
        // wait for the final flush.
        _processingTask = Task.Run(ProcessBatches);
    }

    /// <summary>
    /// Parses <paramref name="payload"/> as a <see cref="TemperatureReading"/> and queues it
    /// for the next batch. A null result is ignored; exceptions are logged and not rethrown.
    /// </summary>
    /// <param name="topic">Topic the message was received on.</param>
    /// <param name="payload">JSON body of the message.</param>
    public async Task ProcessTemperatureData(string topic, string payload)
    {
        try
        {
            var data = JsonSerializer.Deserialize<TemperatureReading>(payload);
            if (data == null) return;

            var sensorReading = new SensorReading
            {
                DeviceId = ExtractDeviceIdFromTopic(topic),
                Topic = topic,
                MetricType = "temperature",
                Value = data.Temperature,
                Timestamp = data.Timestamp,
                Unit = "°C"
            };

            await _writer.WriteAsync(sensorReading);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Mesaj kuyruğa eklenirken hata: {Topic}", topic);
        }
    }

    /// <summary>
    /// Returns the topic level directly before the metric name, or "unknown" when the topic
    /// has fewer than two levels.
    /// </summary>
    private static string ExtractDeviceIdFromTopic(string topic)
    {
        // "acme/factory1/production/line1/temperature" -> "line1"
        // "sensors/device42/temperature"               -> "device42"
        var parts = topic.Split('/');
        return parts.Length >= 2 ? parts[^2] : "unknown";
    }

    /// <summary>Reads the queue until it is completed, saving one batch at a time.</summary>
    private async Task ProcessBatches()
    {
        var reader = _channel.Reader;
        var batch = new List<SensorReading>(BatchSize);

        try
        {
            // Wait without a deadline for the first item of the next batch.
            while (await reader.WaitToReadAsync())
            {
                // The flush deadline is measured from the moment the batch starts, on the
                // local clock. The device-supplied Timestamp is unsuitable: it may be skewed,
                // and checking it only when a new item arrives never flushes an idle batch.
                using var flushDeadline = new CancellationTokenSource(FlushInterval);
                try
                {
                    while (batch.Count < BatchSize && await reader.WaitToReadAsync(flushDeadline.Token))
                    {
                        while (batch.Count < BatchSize && reader.TryRead(out var reading))
                        {
                            batch.Add(reading);
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Flush interval elapsed; save what has been collected so far.
                }

                if (batch.Count > 0)
                {
                    await SaveBatch(batch);
                    batch.Clear();
                }
            }
        }
        catch (Exception ex)
        {
            // Nothing observes this task except disposal, so report failures here.
            _logger.LogError(ex, "Batch işleme döngüsü beklenmedik şekilde sonlandı");
        }
    }

    /// <summary>Saves one batch; a failure is logged and the batch is dropped.</summary>
    private async Task SaveBatch(List<SensorReading> batch)
    {
        try
        {
            _context.SensorReadings.AddRange(batch);
            await _context.SaveChangesAsync();
            _logger.LogInformation("Batch kaydedildi: {Count} kayıt", batch.Count);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Batch kaydetme hatası: {Count} kayıt", batch.Count);
        }
        finally
        {
            // Detach everything: saved rows would otherwise accumulate in the change tracker,
            // and rows from a failed batch would be re-sent with every later batch.
            _context.ChangeTracker.Clear();
        }
    }

    /// <summary>Completes the queue and waits until the remaining readings are saved.</summary>
    public async ValueTask DisposeAsync()
    {
        _writer.TryComplete();
        await _processingTask;
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Synchronous counterpart of <see cref="DisposeAsync"/>. It blocks until the final flush
    /// is done; the worker runs on the thread pool, so waiting here cannot deadlock on a
    /// synchronization context.
    /// </summary>
    public void Dispose()
    {
        _writer.TryComplete();
        _processingTask.GetAwaiter().GetResult();
        GC.SuppressFinalize(this);
    }
}
Monitoring ve Observability
OpenTelemetry ile Metrics
<PackageReference Include="OpenTelemetry" Version="1.9.0" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.9.0" />
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.9.0" />
using System.Diagnostics.Metrics;
/// <summary>
/// Metrics for the MQTT pipeline, published on the "MyApp.MQTT" meter.
/// </summary>
public class MqttMetrics : IDisposable
{
    private readonly Meter _meter;
    private readonly Counter<int> _messagesReceived;
    private readonly Counter<int> _messagesPublished;
    private readonly Histogram<double> _processingDuration;

    // Last value passed to SetActiveConnections; reported by the observable gauge.
    private int _activeConnections;

    public MqttMetrics()
    {
        _meter = new Meter("MyApp.MQTT");

        // The second positional parameter of the Create* methods is the unit, not the
        // description, so the description is passed by name.
        _messagesReceived = _meter.CreateCounter<int>("mqtt_messages_received_total", description: "Total received messages");
        _messagesPublished = _meter.CreateCounter<int>("mqtt_messages_published_total", description: "Total published messages");
        _processingDuration = _meter.CreateHistogram<double>("mqtt_message_processing_duration_seconds", description: "Message processing duration");

        // An observable gauge is used instead of Gauge<T>, which is only available from
        // .NET 9; the callback reports the most recently set value.
        _meter.CreateObservableGauge<int>(
            "mqtt_active_connections",
            () => Volatile.Read(ref _activeConnections),
            description: "Active MQTT connections");
    }

    /// <summary>Counts one received message, tagged with its topic.</summary>
    public void RecordMessageReceived(string topic) =>
        _messagesReceived.Add(1, new KeyValuePair<string, object?>("topic", topic));

    /// <summary>Counts one published message, tagged with its topic.</summary>
    public void RecordMessagePublished(string topic) =>
        _messagesPublished.Add(1, new KeyValuePair<string, object?>("topic", topic));

    /// <summary>Records how long a message took to process, tagged with its topic.</summary>
    /// <param name="durationSeconds">Processing time in seconds.</param>
    /// <param name="topic">Topic of the processed message.</param>
    public void RecordProcessingDuration(double durationSeconds, string topic) =>
        _processingDuration.Record(durationSeconds, new KeyValuePair<string, object?>("topic", topic));

    /// <summary>Sets the value reported by the active-connections gauge.</summary>
    public void SetActiveConnections(int count) =>
        Volatile.Write(ref _activeConnections, count);

    /// <summary>Disposes the meter and thereby all of its instruments.</summary>
    public void Dispose() => _meter.Dispose();
}
// Instrumented Processor
/// <summary>
/// Decorator around an <see cref="ISensorDataProcessor"/> that counts each message and
/// records how long the wrapped processor took.
/// </summary>
public class InstrumentedSensorProcessor : ISensorDataProcessor
{
    private readonly ISensorDataProcessor _inner;
    private readonly MqttMetrics _metrics;

    public InstrumentedSensorProcessor(ISensorDataProcessor inner, MqttMetrics metrics)
    {
        _metrics = metrics;
        _inner = inner;
    }

    /// <summary>
    /// Counts the message, forwards it to the wrapped processor and records the elapsed
    /// time, including when processing throws.
    /// </summary>
    public async Task ProcessTemperatureData(string topic, string payload)
    {
        var timer = Stopwatch.StartNew();
        try
        {
            _metrics.RecordMessageReceived(topic);
            await _inner.ProcessTemperatureData(topic, payload);
        }
        finally
        {
            timer.Stop();
            var elapsedSeconds = timer.Elapsed.TotalSeconds;
            _metrics.RecordProcessingDuration(elapsedSeconds, topic);
        }
    }
}
Health Checks
/// <summary>
/// Health check that reports whether the injected MQTT client is connected.
/// </summary>
public class MqttHealthCheck : IHealthCheck
{
    private readonly IMqttClient _client;

    public MqttHealthCheck(IMqttClient client) => _client = client;

    /// <summary>
    /// Healthy when the client is connected; Unhealthy when it is not or when reading the
    /// connection state throws.
    /// </summary>
    public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
    {
        HealthCheckResult status;
        try
        {
            status = _client.IsConnected
                ? HealthCheckResult.Healthy("MQTT broker bağlantısı aktif")
                : HealthCheckResult.Unhealthy("MQTT broker bağlantısı kapalı");
        }
        catch (Exception probeError)
        {
            status = HealthCheckResult.Unhealthy($"MQTT health check hatası: {probeError.Message}");
        }

        return Task.FromResult(status);
    }
}
Dependency Injection Setup
// Program.cs
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Diagnostics.HealthChecks;
var builder = WebApplication.CreateBuilder(args);

// Broker address and database connection come from configuration (environment variables
// included), matching MQTT_SERVER / MQTT_PORT / DATABASE_CONNECTION in docker-compose.yml.
// The previous hard-coded values remain the defaults.
var mqttServer = builder.Configuration["MQTT_SERVER"] ?? "localhost";
var mqttPort = int.TryParse(builder.Configuration["MQTT_PORT"], out var configuredPort) ? configuredPort : 1883;
var connectionString = builder.Configuration.GetConnectionString("DefaultConnection")
    ?? builder.Configuration["DATABASE_CONNECTION"];

// Database
builder.Services.AddDbContext<MqttDbContext>(options =>
    options.UseNpgsql(connectionString));

// MQTT Services
builder.Services.AddSingleton<IMqttPublisher>(sp =>
{
    var logger = sp.GetRequiredService<ILogger<ResilientMqttClient>>();
    return new ResilientMqttClient(mqttServer, mqttPort, "MyApp", logger);
});
builder.Services.AddSingleton<IMqttSubscriber>(sp =>
    new MqttSubscriber(mqttServer, mqttPort, "MyAppConsumer"));

// Business Services
// NOTE(review): MqttBackgroundService also resolves IAlertService, which is not registered here.
builder.Services.AddScoped<ISensorDataProcessor, BatchedSensorProcessor>();
builder.Services.AddSingleton<MqttMetrics>();

// Background Services
builder.Services.AddHostedService<MqttBackgroundService>();

// Health Checks
// NOTE(review): MqttHealthCheck takes an IMqttClient constructor argument, but no IMqttClient
// is registered in this file; the publisher and subscriber each create their own client.
builder.Services.AddHealthChecks()
    .AddCheck<MqttHealthCheck>("mqtt");

// OpenTelemetry
builder.Services.AddOpenTelemetry()
    // The lambda parameter must not be called "builder": that name is already taken by the
    // enclosing local and reusing it is a compile error.
    .WithMetrics(metrics =>
    {
        metrics.AddMeter("MyApp.MQTT")
            .AddPrometheusExporter();
    });

var app = builder.Build();

// Health check endpoint
app.MapHealthChecks("/health");

// Expose the scrape endpoint for the Prometheus exporter registered above.
app.UseOpenTelemetryPrometheusScrapingEndpoint();

app.Run();
Test Stratejisi
Integration Test
/// <summary>
/// End-to-end check: a temperature message published to the in-process test broker reaches
/// the subscriber's handler and is saved exactly once.
/// </summary>
[Fact]
public async Task Should_Process_Temperature_Message()
{
    // Arrange
    using var testServer = new TestMqttServer();
    await testServer.StartAsync();

    using var client = new MqttSubscriber("localhost", testServer.Port, "test-client");
    await client.ConnectAsync();

    var processor = new SensorDataProcessor(_mockContext.Object, _mockLogger.Object);

    // Completed by the handler, so the test waits exactly as long as processing takes
    // instead of sleeping for a fixed time.
    var processed = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

    // Act
    await client.SubscribeAsync("test/temperature", async (topic, payload) =>
    {
        try
        {
            await processor.ProcessTemperatureData(topic, payload);
            processed.TrySetResult();
        }
        catch (Exception ex)
        {
            processed.TrySetException(ex);
        }
    });

    using var publisher = new MqttPublisher("localhost", testServer.Port, "test-publisher");
    await publisher.ConnectAsync();

    var tempData = new TemperatureReading
    {
        Temperature = 25.5,
        Humidity = 60.0,
        Location = "Room A",
        Timestamp = DateTime.UtcNow
    };
    await publisher.PublishAsync("test/temperature", tempData);

    // Assert
    // Fails with a TimeoutException when the handler is never invoked.
    await processed.Task.WaitAsync(TimeSpan.FromSeconds(5));
    _mockContext.Verify(x => x.SaveChangesAsync(default), Times.Once);
}
/// <summary>
/// In-process MQTT broker for tests, listening on a free loopback port.
/// </summary>
public class TestMqttServer : IDisposable
{
    // MQTTnet 4 exposes the server as the concrete MqttServer class; the IMqttServer
    // interface belonged to MQTTnet 3.
    private readonly MqttServer _server;

    /// <summary>TCP port the broker is configured to listen on.</summary>
    public int Port { get; private set; }

    public TestMqttServer()
    {
        Port = GetAvailablePort();
        var factory = new MqttFactory();
        _server = factory.CreateMqttServer(new MqttServerOptionsBuilder()
            .WithDefaultEndpoint()
            .WithDefaultEndpointPort(Port)
            .Build());
    }

    /// <summary>Starts the broker.</summary>
    public async Task StartAsync()
    {
        await _server.StartAsync();
    }

    /// <summary>
    /// Asks the OS for a free port by binding to port 0 on loopback. The probe socket is
    /// closed before the broker binds, so another process could take the port in between.
    /// </summary>
    private static int GetAvailablePort()
    {
        using var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        socket.Bind(new IPEndPoint(IPAddress.Loopback, 0));
        return ((IPEndPoint)socket.LocalEndPoint!).Port;
    }

    /// <summary>Stops the broker when it is running, then releases it.</summary>
    public void Dispose()
    {
        if (_server is null)
        {
            return;
        }

        if (_server.IsStarted)
        {
            // Dispose cannot be awaited; stopping a local test broker completes quickly.
            _server.StopAsync().GetAwaiter().GetResult();
        }

        _server.Dispose();
    }
}
Gerçek Dünya Senaryosu: Üretim Hattı Monitoring
// Üretim hattı veri modeli
/// <summary>
/// Payload of a production-line message; deserialized from JSON by ProductionLineProcessor.
/// </summary>
public class ProductionLineData
{
/// <summary>Identifier of the production line.</summary>
public string LineId { get; set; } = string.Empty;
/// <summary>Throughput in products per minute.</summary>
public int ProductsPerMinute { get; set; }
/// <summary>Line temperature; alert messages format it with "°C".</summary>
public double Temperature { get; set; }
/// <summary>Line pressure; units are not specified here.</summary>
public double Pressure { get; set; }
/// <summary>Whether the line is currently running.</summary>
public bool IsRunning { get; set; }
/// <summary>Alerts currently active on the line; empty by default.</summary>
public List<string> ActiveAlerts { get; set; } = new();
/// <summary>Time of the measurement.</summary>
public DateTime Timestamp { get; set; }
}
// Specialized processor
/// <summary>
/// Stores production-line messages and publishes alerts when throughput or temperature
/// crosses its threshold.
/// </summary>
public class ProductionLineProcessor : ISensorDataProcessor
{
    // A running line below this throughput (products per minute) raises LOW_EFFICIENCY.
    private const int MinProductsPerMinute = 50;

    // A temperature above this value (in °C) raises HIGH_TEMPERATURE.
    private const double CriticalTemperature = 85;

    private readonly MqttDbContext _context;
    private readonly IMqttPublisher _alertPublisher;
    private readonly ILogger<ProductionLineProcessor> _logger;

    public ProductionLineProcessor(
        MqttDbContext context,
        IMqttPublisher alertPublisher,
        ILogger<ProductionLineProcessor> logger)
    {
        _context = context;
        _alertPublisher = alertPublisher;
        _logger = logger;
    }

    /// <summary>
    /// <see cref="ISensorDataProcessor"/> entry point. The class declares the interface, so
    /// its only member must exist for the class to compile; this processor handles one
    /// message type, so the call is forwarded to <see cref="ProcessProductionData"/>.
    /// </summary>
    public Task ProcessTemperatureData(string topic, string payload) =>
        ProcessProductionData(topic, payload);

    /// <summary>
    /// Parses <paramref name="payload"/> as <see cref="ProductionLineData"/>, saves it and
    /// checks the alert thresholds. A null result is ignored; exceptions are logged and not
    /// rethrown.
    /// </summary>
    /// <param name="topic">Topic the message was received on.</param>
    /// <param name="payload">JSON body of the message.</param>
    public async Task ProcessProductionData(string topic, string payload)
    {
        try
        {
            var data = JsonSerializer.Deserialize<ProductionLineData>(payload);
            if (data == null) return;

            // Save to the database
            var reading = new SensorReading
            {
                DeviceId = data.LineId,
                Topic = topic,
                MetricType = "production",
                Value = data.ProductsPerMinute,
                Timestamp = data.Timestamp,
                Unit = "ppm",
                Metadata = new Dictionary<string, object>
                {
                    ["temperature"] = data.Temperature,
                    ["pressure"] = data.Pressure,
                    ["isRunning"] = data.IsRunning,
                    ["alerts"] = data.ActiveAlerts
                }
            };

            _context.SensorReadings.Add(reading);
            await _context.SaveChangesAsync();

            // Business rule checks
            await CheckProductionThresholds(data);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Üretim verisi işlenirken hata: {Topic}", topic);
        }
    }

    /// <summary>Publishes an alert for each threshold the data violates.</summary>
    private async Task CheckProductionThresholds(ProductionLineData data)
    {
        // Alerts must not be dropped silently, so they are sent at QoS 1 instead of the
        // publisher's QoS 0 default.
        const MQTTnet.Protocol.MqttQualityOfServiceLevel alertQos =
            MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce;

        // Low efficiency warning
        if (data.IsRunning && data.ProductsPerMinute < MinProductsPerMinute)
        {
            var alert = new ProductionAlert
            {
                LineId = data.LineId,
                AlertType = "LOW_EFFICIENCY",
                Message = $"Üretim verimi düştü: {data.ProductsPerMinute} ppm",
                Severity = "WARNING",
                Timestamp = DateTime.UtcNow
            };
            await _alertPublisher.PublishAsync($"alerts/production/{data.LineId}", alert, alertQos);
        }

        // Critical temperature warning
        if (data.Temperature > CriticalTemperature)
        {
            var alert = new ProductionAlert
            {
                LineId = data.LineId,
                AlertType = "HIGH_TEMPERATURE",
                Message = $"Kritik sıcaklık: {data.Temperature}°C",
                Severity = "CRITICAL",
                Timestamp = DateTime.UtcNow
            };
            await _alertPublisher.PublishAsync($"alerts/critical/{data.LineId}", alert, alertQos);
        }
    }
}
/// <summary>
/// Alert message published by ProductionLineProcessor when a threshold is crossed.
/// </summary>
public class ProductionAlert
{
/// <summary>Identifier of the production line the alert refers to.</summary>
public string LineId { get; set; } = string.Empty;
/// <summary>Alert kind; the processor uses "LOW_EFFICIENCY" and "HIGH_TEMPERATURE".</summary>
public string AlertType { get; set; } = string.Empty;
/// <summary>Human-readable description of the alert.</summary>
public string Message { get; set; } = string.Empty;
/// <summary>Severity label; the processor uses "WARNING" and "CRITICAL".</summary>
public string Severity { get; set; } = string.Empty;
/// <summary>Time the alert was created; the processor sets it to the current UTC time.</summary>
public DateTime Timestamp { get; set; }
}
Özet ve Best Practices
Yapılması Gerekenler
- QoS seviyesini senaryoya göre seç
- Retry logic ve circuit breaker kullan
- Topic hierarchy düzgün tasarla
- TLS ve authentication kullan
- Batch processing ile performansı artır
- Metrics ve health checks ekle
- Broker bağlantısını yeniden kullanmak için MQTT istemcisini singleton olarak kaydet
Kaçınılması Gerekenler
- Retained messages gereksiz kullanma
- Broad wildcards (#) sık kullanma
- Large payloads (>1MB) gönderme
- Clean session=false gereksiz kullanma
- Exception handling ihmal etme
Bu kapsamlı rehber ile C# ve .NET kullanarak production-ready MQTT uygulamaları geliştirebilir, IoT ve endüstriyel sistemlerde güvenilir real-time iletişim kurabilirsiniz.