Add Notifiarr support (#52)

This commit is contained in:
Marius Nechifor
2025-02-02 20:45:50 +02:00
parent 1713d0fd1e
commit 19b3675701
46 changed files with 834 additions and 25 deletions
@@ -14,6 +14,8 @@
<ItemGroup>
<PackageReference Include="FLM.QBittorrent" Version="1.0.0" />
<PackageReference Include="FLM.Transmission" Version="1.0.2" />
<PackageReference Include="Mapster" Version="7.4.0" />
<PackageReference Include="MassTransit" Version="8.3.6" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Http" Version="9.0.0" />
<PackageReference Include="Quartz" Version="3.13.1" />
@@ -66,7 +66,7 @@ public abstract class ArrClient
return queueResponse;
}
public virtual bool ShouldRemoveFromQueue(InstanceType instanceType, QueueRecord record, bool isPrivateDownload)
public virtual async Task<bool> ShouldRemoveFromQueue(InstanceType instanceType, QueueRecord record, bool isPrivateDownload)
{
if (_queueCleanerConfig.ImportFailedIgnorePrivate && isPrivateDownload)
{
@@ -96,7 +96,7 @@ public abstract class ArrClient
return false;
}
return _striker.StrikeAndCheckLimit(
return await _striker.StrikeAndCheckLimit(
record.DownloadId,
record.Title,
_queueCleanerConfig.ImportFailedMaxStrikes,
@@ -9,6 +9,7 @@ using Infrastructure.Verticals.ItemStriker;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Series = Domain.Models.Sonarr.Series;
namespace Infrastructure.Verticals.Arr;
@@ -26,7 +27,7 @@ public sealed class SonarrClient : ArrClient
protected override string GetQueueUrlPath(int page)
{
return $"/api/v3/queue?page={page}&pageSize=200&includeUnknownSeriesItems=true&includeSeries=true";
return $"/api/v3/queue?page={page}&pageSize=200&includeUnknownSeriesItems=true&includeSeries=true&includeEpisode=true";
}
protected override string GetQueueDeleteUrlPath(long recordId, bool removeFromClient)
@@ -7,8 +7,10 @@ using Domain.Enums;
using Domain.Models.Arr;
using Domain.Models.Arr.Queue;
using Infrastructure.Verticals.Arr;
using Infrastructure.Verticals.Context;
using Infrastructure.Verticals.DownloadClient;
using Infrastructure.Verticals.Jobs;
using Infrastructure.Verticals.Notifications;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Serilog.Context;
@@ -32,12 +34,15 @@ public sealed class ContentBlocker : GenericHandler
LidarrClient lidarrClient,
ArrQueueIterator arrArrQueueIterator,
BlocklistProvider blocklistProvider,
DownloadServiceFactory downloadServiceFactory
DownloadServiceFactory downloadServiceFactory,
NotificationPublisher notifier
) : base(
logger, downloadClientConfig,
sonarrConfig, radarrConfig, lidarrConfig,
sonarrClient, radarrClient, lidarrClient,
arrArrQueueIterator, downloadServiceFactory
arrArrQueueIterator, downloadServiceFactory,
notifier
)
{
_config = config.Value;
@@ -75,6 +80,10 @@ public sealed class ContentBlocker : GenericHandler
BlocklistType blocklistType = _blocklistProvider.GetBlocklistType(instanceType);
ConcurrentBag<string> patterns = _blocklistProvider.GetPatterns(instanceType);
ConcurrentBag<Regex> regexes = _blocklistProvider.GetRegexes(instanceType);
// push to context
ContextProvider.Set(nameof(ArrInstance) + nameof(ArrInstance.Url), instance.Url);
ContextProvider.Set(nameof(InstanceType), instanceType);
await _arrArrQueueIterator.Iterate(arrClient, instance, async items =>
{
@@ -97,6 +106,9 @@ public sealed class ContentBlocker : GenericHandler
continue;
}
// push record to context
ContextProvider.Set(nameof(QueueRecord), record);
_logger.LogDebug("searching unwanted files for {title}", record.Title);
BlockFilesResult result = await _downloadService
@@ -119,6 +131,7 @@ public sealed class ContentBlocker : GenericHandler
}
await arrClient.DeleteQueueItemAsync(instance, record, removeFromClient);
await _notifier.NotifyQueueItemDelete(removeFromClient, DeleteReason.AllFilesBlocked);
}
});
@@ -0,0 +1,24 @@
using System.Collections.Immutable;
namespace Infrastructure.Verticals.Context;
public static class ContextProvider
{
private static readonly AsyncLocal<ImmutableDictionary<string, object>> _asyncLocalDict = new();
public static void Set(string key, object value)
{
ImmutableDictionary<string, object> currentDict = _asyncLocalDict.Value ?? ImmutableDictionary<string, object>.Empty;
_asyncLocalDict.Value = currentDict.SetItem(key, value);
}
public static object? Get(string key)
{
return _asyncLocalDict.Value?.TryGetValue(key, out object? value) is true ? value : null;
}
public static T? Get<T>(string key) where T : class
{
return Get(key) as T;
}
}
@@ -3,6 +3,7 @@ using System.Text.RegularExpressions;
using Common.Configuration.ContentBlocker;
using Common.Configuration.DownloadClient;
using Common.Configuration.QueueCleaner;
using Domain.Enums;
using Domain.Models.Deluge.Response;
using Infrastructure.Verticals.ContentBlocker;
using Infrastructure.Verticals.ItemStriker;
@@ -71,8 +72,18 @@ public sealed class DelugeService : DownloadServiceBase
}
});
result.ShouldRemove = shouldRemove || IsItemStuckAndShouldRemove(status);
if (shouldRemove)
{
result.DeleteReason = DeleteReason.AllFilesBlocked;
}
result.ShouldRemove = shouldRemove || await IsItemStuckAndShouldRemove(status);
result.IsPrivate = status.Private;
if (!shouldRemove && result.ShouldRemove)
{
result.DeleteReason = DeleteReason.Stalled;
}
return result;
}
@@ -180,7 +191,7 @@ public sealed class DelugeService : DownloadServiceBase
await _client.DeleteTorrent(hash);
}
private bool IsItemStuckAndShouldRemove(TorrentStatus status)
private async Task<bool> IsItemStuckAndShouldRemove(TorrentStatus status)
{
if (_queueCleanerConfig.StalledMaxStrikes is 0)
{
@@ -206,7 +217,7 @@ public sealed class DelugeService : DownloadServiceBase
ResetStrikesOnProgress(status.Hash!, status.TotalDone);
return StrikeAndCheckLimit(status.Hash!, status.Name!);
return await StrikeAndCheckLimit(status.Hash!, status.Name!);
}
private async Task<TorrentStatus?> GetTorrentStatus(string hash)
@@ -83,8 +83,8 @@ public abstract class DownloadServiceBase : IDownloadService
/// <param name="hash">The torrent hash.</param>
/// <param name="itemName">The name or title of the item.</param>
/// <returns>True if the limit has been reached; otherwise, false.</returns>
protected bool StrikeAndCheckLimit(string hash, string itemName)
protected async Task<bool> StrikeAndCheckLimit(string hash, string itemName)
{
return _striker.StrikeAndCheckLimit(hash, itemName, _queueCleanerConfig.StalledMaxStrikes, StrikeType.Stalled);
return await _striker.StrikeAndCheckLimit(hash, itemName, _queueCleanerConfig.StalledMaxStrikes, StrikeType.Stalled);
}
}
@@ -4,6 +4,7 @@ using Common.Configuration.ContentBlocker;
using Common.Configuration.DownloadClient;
using Common.Configuration.QueueCleaner;
using Common.Helpers;
using Domain.Enums;
using Infrastructure.Verticals.ContentBlocker;
using Infrastructure.Verticals.ItemStriker;
using Microsoft.Extensions.Caching.Memory;
@@ -73,6 +74,7 @@ public sealed class QBitService : DownloadServiceBase
if (torrent is { CompletionOn: not null, Downloaded: null or 0 })
{
result.ShouldRemove = true;
result.DeleteReason = DeleteReason.AllFilesBlocked;
return result;
}
@@ -82,10 +84,16 @@ public sealed class QBitService : DownloadServiceBase
if (files?.Count is > 0 && files.All(x => x.Priority is TorrentContentPriority.Skip))
{
result.ShouldRemove = true;
result.DeleteReason = DeleteReason.AllFilesBlocked;
return result;
}
result.ShouldRemove = IsItemStuckAndShouldRemove(torrent, result.IsPrivate);
result.ShouldRemove = await IsItemStuckAndShouldRemove(torrent, result.IsPrivate);
if (result.ShouldRemove)
{
result.DeleteReason = DeleteReason.Stalled;
}
return result;
}
@@ -197,7 +205,7 @@ public sealed class QBitService : DownloadServiceBase
_client.Dispose();
}
private bool IsItemStuckAndShouldRemove(TorrentInfo torrent, bool isPrivate)
private async Task<bool> IsItemStuckAndShouldRemove(TorrentInfo torrent, bool isPrivate)
{
if (_queueCleanerConfig.StalledMaxStrikes is 0)
{
@@ -220,6 +228,6 @@ public sealed class QBitService : DownloadServiceBase
ResetStrikesOnProgress(torrent.Hash, torrent.Downloaded ?? 0);
return StrikeAndCheckLimit(torrent.Hash, torrent.Name);
return await StrikeAndCheckLimit(torrent.Hash, torrent.Name);
}
}
@@ -1,4 +1,6 @@
namespace Infrastructure.Verticals.DownloadClient;
using Domain.Enums;
namespace Infrastructure.Verticals.DownloadClient;
public sealed record StalledResult
{
@@ -7,6 +9,8 @@ public sealed record StalledResult
/// </summary>
public bool ShouldRemove { get; set; }
public DeleteReason DeleteReason { get; set; }
/// <summary>
/// True if the download is private; otherwise false.
/// </summary>
@@ -4,6 +4,7 @@ using Common.Configuration.ContentBlocker;
using Common.Configuration.DownloadClient;
using Common.Configuration.QueueCleaner;
using Common.Helpers;
using Domain.Enums;
using Infrastructure.Verticals.ContentBlocker;
using Infrastructure.Verticals.ItemStriker;
using Microsoft.Extensions.Caching.Memory;
@@ -76,9 +77,19 @@ public sealed class TransmissionService : DownloadServiceBase
shouldRemove = false;
}
}
if (shouldRemove)
{
result.DeleteReason = DeleteReason.AllFilesBlocked;
}
// remove if all files are unwanted or download is stuck
result.ShouldRemove = shouldRemove || IsItemStuckAndShouldRemove(torrent);
result.ShouldRemove = shouldRemove || await IsItemStuckAndShouldRemove(torrent);
if (!shouldRemove && result.ShouldRemove)
{
result.DeleteReason = DeleteReason.Stalled;
}
return result;
}
@@ -178,7 +189,7 @@ public sealed class TransmissionService : DownloadServiceBase
{
}
private bool IsItemStuckAndShouldRemove(TorrentInfo torrent)
private async Task<bool> IsItemStuckAndShouldRemove(TorrentInfo torrent)
{
if (_queueCleanerConfig.StalledMaxStrikes is 0)
{
@@ -205,7 +216,7 @@ public sealed class TransmissionService : DownloadServiceBase
ResetStrikesOnProgress(torrent.HashString!, torrent.DownloadedEver ?? 0);
return StrikeAndCheckLimit(torrent.HashString!, torrent.Name!);
return await StrikeAndCheckLimit(torrent.HashString!, torrent.Name!);
}
private async Task<TorrentInfo?> GetTorrentAsync(string hash)
@@ -1,6 +1,8 @@
using Common.Helpers;
using Domain.Enums;
using Infrastructure.Helpers;
using Infrastructure.Verticals.Context;
using Infrastructure.Verticals.Notifications;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
@@ -11,16 +13,18 @@ public class Striker
private readonly ILogger<Striker> _logger;
private readonly IMemoryCache _cache;
private readonly MemoryCacheEntryOptions _cacheOptions;
private readonly NotificationPublisher _notifier;
public Striker(ILogger<Striker> logger, IMemoryCache cache)
public Striker(ILogger<Striker> logger, IMemoryCache cache, NotificationPublisher notifier)
{
_logger = logger;
_cache = cache;
_notifier = notifier;
_cacheOptions = new MemoryCacheEntryOptions()
.SetSlidingExpiration(StaticConfiguration.TriggerValue + Constants.CacheLimitBuffer);
}
public bool StrikeAndCheckLimit(string hash, string itemName, ushort maxStrikes, StrikeType strikeType)
public async Task<bool> StrikeAndCheckLimit(string hash, string itemName, ushort maxStrikes, StrikeType strikeType)
{
if (maxStrikes is 0)
{
@@ -29,7 +33,7 @@ public class Striker
string key = CacheKeys.Strike(strikeType, hash);
if (!_cache.TryGetValue(key, out int? strikeCount))
if (!_cache.TryGetValue(key, out int strikeCount))
{
strikeCount = 1;
}
@@ -39,6 +43,9 @@ public class Striker
}
_logger.LogInformation("item on strike number {strike} | reason {reason} | {name}", strikeCount, strikeType.ToString(), itemName);
await _notifier.NotifyStrike(strikeType, strikeCount);
_cache.Set(key, strikeCount, _cacheOptions);
if (strikeCount < maxStrikes)
@@ -5,6 +5,7 @@ using Domain.Models.Arr;
using Domain.Models.Arr.Queue;
using Infrastructure.Verticals.Arr;
using Infrastructure.Verticals.DownloadClient;
using Infrastructure.Verticals.Notifications;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
@@ -22,6 +23,7 @@ public abstract class GenericHandler : IDisposable
protected readonly LidarrClient _lidarrClient;
protected readonly ArrQueueIterator _arrArrQueueIterator;
protected readonly IDownloadService _downloadService;
protected readonly NotificationPublisher _notifier;
protected GenericHandler(
ILogger<GenericHandler> logger,
@@ -33,7 +35,8 @@ public abstract class GenericHandler : IDisposable
RadarrClient radarrClient,
LidarrClient lidarrClient,
ArrQueueIterator arrArrQueueIterator,
DownloadServiceFactory downloadServiceFactory
DownloadServiceFactory downloadServiceFactory,
NotificationPublisher notifier
)
{
_logger = logger;
@@ -46,6 +49,7 @@ public abstract class GenericHandler : IDisposable
_lidarrClient = lidarrClient;
_arrArrQueueIterator = arrArrQueueIterator;
_downloadService = downloadServiceFactory.CreateDownloadClient();
_notifier = notifier;
}
public virtual async Task ExecuteAsync()
@@ -0,0 +1,45 @@
using Infrastructure.Verticals.Notifications.Models;
using MassTransit;
using Microsoft.Extensions.Logging;
namespace Infrastructure.Verticals.Notifications.Consumers;
public sealed class NotificationConsumer<T> : IConsumer<T> where T : Notification
{
private readonly ILogger<NotificationConsumer<T>> _logger;
private readonly NotificationService _notificationService;
public NotificationConsumer(ILogger<NotificationConsumer<T>> logger, NotificationService notificationService)
{
_logger = logger;
_notificationService = notificationService;
}
public async Task Consume(ConsumeContext<T> context)
{
try
{
switch (context.Message)
{
case FailedImportStrikeNotification failedMessage:
await _notificationService.Notify(failedMessage);
break;
case StalledStrikeNotification stalledMessage:
await _notificationService.Notify(stalledMessage);
break;
case QueueItemDeleteNotification queueItemDeleteMessage:
await _notificationService.Notify(queueItemDeleteMessage);
break;
default:
throw new NotImplementedException();
}
// prevent spamming
await Task.Delay(1000);
}
catch (Exception exception)
{
_logger.LogError(exception, "error while processing notifications");
}
}
}
@@ -0,0 +1,10 @@
namespace Infrastructure.Verticals.Notifications;
public interface INotificationFactory
{
List<INotificationProvider> OnFailedImportStrikeEnabled();
List<INotificationProvider> OnStalledStrikeEnabled();
List<INotificationProvider> OnQueueItemDeleteEnabled();
}
@@ -0,0 +1,17 @@
using Common.Configuration.Notification;
using Infrastructure.Verticals.Notifications.Models;
namespace Infrastructure.Verticals.Notifications;
public interface INotificationProvider
{
NotificationConfig Config { get; }
string Name { get; }
Task OnFailedImportStrike(FailedImportStrikeNotification notification);
Task OnStalledStrike(StalledStrikeNotification notification);
Task OnQueueItemDelete(QueueItemDeleteNotification notification);
}
@@ -0,0 +1,5 @@
namespace Infrastructure.Verticals.Notifications.Models;
public sealed record FailedImportStrikeNotification : Notification
{
}
@@ -0,0 +1,20 @@
using Domain.Enums;
namespace Infrastructure.Verticals.Notifications.Models;
public record Notification
{
public required InstanceType InstanceType { get; init; }
public required Uri InstanceUrl { get; init; }
public required string Hash { get; init; }
public required string Title { get; init; }
public required string Description { get; init; }
public Uri? Image { get; init; }
public List<NotificationField>? Fields { get; init; }
}
@@ -0,0 +1,8 @@
namespace Infrastructure.Verticals.Notifications.Models;
public sealed record NotificationField
{
public required string Title { get; init; }
public required string Text { get; init; }
}
@@ -0,0 +1,5 @@
namespace Infrastructure.Verticals.Notifications.Models;
public sealed record QueueItemDeleteNotification : Notification
{
}
@@ -0,0 +1,5 @@
namespace Infrastructure.Verticals.Notifications.Models;
public sealed record StalledStrikeNotification : Notification
{
}
@@ -0,0 +1,6 @@
namespace Infrastructure.Verticals.Notifications.Notifiarr;
public interface INotifiarrProxy
{
Task SendNotification(NotifiarrPayload payload, NotifiarrConfig config);
}
@@ -0,0 +1,30 @@
using Common.Configuration.Notification;
using Microsoft.Extensions.Configuration;
namespace Infrastructure.Verticals.Notifications.Notifiarr;
public sealed record NotifiarrConfig : NotificationConfig
{
public const string SectionName = "Notifiarr";
[ConfigurationKeyName("API_KEY")]
public string? ApiKey { get; init; }
[ConfigurationKeyName("CHANNEL_ID")]
public string? ChannelId { get; init; }
public override bool IsValid()
{
if (string.IsNullOrEmpty(ApiKey?.Trim()))
{
return false;
}
if (string.IsNullOrEmpty(ChannelId?.Trim()))
{
return false;
}
return true;
}
}
@@ -0,0 +1,12 @@
namespace Infrastructure.Verticals.Notifications.Notifiarr;
public class NotifiarrException : Exception
{
public NotifiarrException(string message) : base(message)
{
}
public NotifiarrException(string message, Exception innerException) : base(message, innerException)
{
}
}
@@ -0,0 +1,57 @@
namespace Infrastructure.Verticals.Notifications.Notifiarr;
public class NotifiarrPayload
{
public NotifiarrNotification Notification { get; set; } = new NotifiarrNotification();
public Discord Discord { get; set; }
}
public class NotifiarrNotification
{
public bool Update { get; set; }
public string Name => "Cleanuperr";
public int? Event { get; set; }
}
public class Discord
{
public string Color { get; set; } = string.Empty;
public Ping Ping { get; set; }
public Images Images { get; set; }
public Text Text { get; set; }
public Ids Ids { get; set; }
}
public class Ping
{
public string PingUser { get; set; }
public string PingRole { get; set; }
}
public class Images
{
public Uri? Thumbnail { get; set; }
public Uri? Image { get; set; }
}
public class Text
{
public string Title { get; set; } = string.Empty;
public string Icon { get; set; } = string.Empty;
public string Content { get; set; } = string.Empty;
public string Description { get; set; } = string.Empty;
public List<Field> Fields { get; set; } = new List<Field>();
public string Footer { get; set; } = string.Empty;
}
public class Field
{
public string Title { get; set; } = string.Empty;
public string Text { get; set; } = string.Empty;
public bool Inline { get; set; }
}
public class Ids
{
public string Channel { get; set; }
}
@@ -0,0 +1,75 @@
using Domain.Enums;
using Infrastructure.Verticals.Notifications.Models;
using Mapster;
using Microsoft.Extensions.Options;
namespace Infrastructure.Verticals.Notifications.Notifiarr;
public class NotifiarrProvider : NotificationProvider
{
private readonly NotifiarrConfig _config;
private readonly INotifiarrProxy _proxy;
private const string WarningColor = "f0ad4e";
private const string ImportantColor = "bb2124";
public NotifiarrProvider(IOptions<NotifiarrConfig> config, INotifiarrProxy proxy)
: base(config)
{
_config = config.Value;
_proxy = proxy;
}
public override string Name => "Notifiarr";
public override async Task OnFailedImportStrike(FailedImportStrikeNotification notification)
{
await _proxy.SendNotification(BuildPayload(notification, WarningColor), _config);
}
public override async Task OnStalledStrike(StalledStrikeNotification notification)
{
await _proxy.SendNotification(BuildPayload(notification, WarningColor), _config);
}
public override async Task OnQueueItemDelete(QueueItemDeleteNotification notification)
{
await _proxy.SendNotification(BuildPayload(notification, ImportantColor), _config);
}
private NotifiarrPayload BuildPayload(Notification notification, string color)
{
NotifiarrPayload payload = new()
{
Discord = new()
{
Color = color,
Text = new()
{
Title = notification.Title,
Icon = "https://github.com/flmorg/cleanuperr/blob/main/Logo/48.png?raw=true",
Description = notification.Description,
Fields = new()
{
new() { Title = "Instance type", Text = notification.InstanceType.ToString() },
new() { Title = "Url", Text = notification.InstanceUrl.ToString() },
new() { Title = "Download hash", Text = notification.Hash }
}
},
Ids = new Ids
{
Channel = _config.ChannelId
},
Images = new()
{
Thumbnail = new Uri("https://github.com/flmorg/cleanuperr/blob/main/Logo/48.png?raw=true"),
Image = notification.Image
}
}
};
payload.Discord.Text.Fields.AddRange(notification.Fields?.Adapt<List<Field>>() ?? []);
return payload;
}
}
@@ -0,0 +1,55 @@
using System.Text;
using Common.Helpers;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;
namespace Infrastructure.Verticals.Notifications.Notifiarr;
public class NotifiarrProxy : INotifiarrProxy
{
private readonly HttpClient _httpClient;
private const string Url = "https://notifiarr.com/api/v1/notification/passthrough/";
public NotifiarrProxy(IHttpClientFactory httpClientFactory)
{
_httpClient = httpClientFactory.CreateClient(Constants.HttpClientWithRetryName);
}
public async Task SendNotification(NotifiarrPayload payload, NotifiarrConfig config)
{
try
{
string content = JsonConvert.SerializeObject(payload, new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver()
});
using HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Post, $"{Url}{config.ApiKey}");
request.Method = HttpMethod.Post;
request.Content = new StringContent(content, Encoding.UTF8, "application/json");
using HttpResponseMessage response = await _httpClient.SendAsync(request);
response.EnsureSuccessStatusCode();
}
catch (HttpRequestException exception)
{
if (exception.StatusCode is null)
{
throw new NotifiarrException("unable to send notification", exception);
}
switch ((int)exception.StatusCode)
{
case 401:
throw new NotifiarrException("unable to send notification | API key is invalid");
case 502:
case 503:
case 504:
throw new NotifiarrException("unable to send notification | service unavailable", exception);
default:
throw new NotifiarrException("unable to send notification", exception);
}
}
}
}
@@ -0,0 +1,32 @@
namespace Infrastructure.Verticals.Notifications;
public class NotificationFactory : INotificationFactory
{
private readonly IEnumerable<INotificationProvider> _providers;
public NotificationFactory(IEnumerable<INotificationProvider> providers)
{
_providers = providers;
}
protected List<INotificationProvider> ActiveProviders() =>
_providers
.Where(x => x.Config.IsValid())
.Where(provider => provider.Config.IsEnabled)
.ToList();
public List<INotificationProvider> OnFailedImportStrikeEnabled() =>
ActiveProviders()
.Where(n => n.Config.OnImportFailedStrike)
.ToList();
public List<INotificationProvider> OnStalledStrikeEnabled() =>
ActiveProviders()
.Where(n => n.Config.OnStalledStrike)
.ToList();
public List<INotificationProvider> OnQueueItemDeleteEnabled() =>
ActiveProviders()
.Where(n => n.Config.OnQueueItemDelete)
.ToList();
}
@@ -0,0 +1,23 @@
using Common.Configuration.Notification;
using Infrastructure.Verticals.Notifications.Models;
using Microsoft.Extensions.Options;
namespace Infrastructure.Verticals.Notifications;
public abstract class NotificationProvider : INotificationProvider
{
protected NotificationProvider(IOptions<NotificationConfig> config)
{
Config = config.Value;
}
public abstract string Name { get; }
public NotificationConfig Config { get; }
public abstract Task OnFailedImportStrike(FailedImportStrikeNotification notification);
public abstract Task OnStalledStrike(StalledStrikeNotification notification);
public abstract Task OnQueueItemDelete(QueueItemDeleteNotification notification);
}
@@ -0,0 +1,99 @@
using Common.Configuration.Arr;
using Domain.Enums;
using Domain.Models.Arr.Queue;
using Infrastructure.Verticals.Context;
using Infrastructure.Verticals.Notifications.Models;
using Mapster;
using MassTransit;
using Microsoft.Extensions.Logging;
namespace Infrastructure.Verticals.Notifications;
public sealed class NotificationPublisher
{
private readonly ILogger<NotificationPublisher> _logger;
private readonly IBus _messageBus;
public NotificationPublisher(ILogger<NotificationPublisher> logger, IBus messageBus)
{
_logger = logger;
_messageBus = messageBus;
}
public async Task NotifyStrike(StrikeType strikeType, int strikeCount)
{
try
{
QueueRecord record = GetRecordFromContext();
InstanceType instanceType = GetInstanceTypeFromContext();
Uri instanceUrl = GetInstanceUrlFromContext();
Uri? imageUrl = GetImageFromContext(record, instanceType);
Notification notification = new()
{
InstanceType = instanceType,
InstanceUrl = instanceUrl,
Hash = record.DownloadId.ToLowerInvariant(),
Title = $"Strike received with reason: {strikeType}",
Description = record.Title,
Image = imageUrl,
Fields = [new() { Title = "Strike count", Text = strikeCount.ToString() }]
};
switch (strikeType)
{
case StrikeType.Stalled:
await _messageBus.Publish(notification.Adapt<StalledStrikeNotification>());
break;
case StrikeType.ImportFailed:
await _messageBus.Publish(notification.Adapt<FailedImportStrikeNotification>());
break;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "failed to notify strike");
}
}
public async Task NotifyQueueItemDelete(bool removeFromClient, DeleteReason reason)
{
QueueRecord record = GetRecordFromContext();
InstanceType instanceType = GetInstanceTypeFromContext();
Uri instanceUrl = GetInstanceUrlFromContext();
Uri? imageUrl = GetImageFromContext(record, instanceType);
Notification notification = new()
{
InstanceType = instanceType,
InstanceUrl = instanceUrl,
Hash = record.DownloadId.ToLowerInvariant(),
Title = $"Deleting item from queue with reason: {reason}",
Description = record.Title,
Image = imageUrl,
Fields = [new() { Title = "Removed from download client?", Text = removeFromClient ? "Yes" : "No" }]
};
await _messageBus.Publish(notification.Adapt<QueueItemDeleteNotification>());
}
private static QueueRecord GetRecordFromContext() =>
ContextProvider.Get<QueueRecord>(nameof(QueueRecord)) ?? throw new Exception("failed to get record from context");
private static InstanceType GetInstanceTypeFromContext() =>
(InstanceType)(ContextProvider.Get<object>(nameof(InstanceType)) ??
throw new Exception("failed to get instance type from context"));
private static Uri GetInstanceUrlFromContext() =>
ContextProvider.Get<Uri>(nameof(ArrInstance) + nameof(ArrInstance.Url)) ??
throw new Exception("failed to get instance url from context");
private static Uri GetImageFromContext(QueueRecord record, InstanceType instanceType) =>
instanceType switch
{
InstanceType.Sonarr => record.Series!.Images.FirstOrDefault(x => x.CoverType == "poster")?.RemoteUrl,
InstanceType.Radarr => record.Movie!.Images.FirstOrDefault(x => x.CoverType == "poster")?.RemoteUrl,
InstanceType.Lidarr => record.Album!.Images.FirstOrDefault(x => x.CoverType == "cover")?.Url,
_ => throw new ArgumentOutOfRangeException(nameof(instanceType))
} ?? throw new Exception("failed to get image url from context");
}
@@ -0,0 +1,61 @@
using Infrastructure.Verticals.Notifications.Models;
using Microsoft.Extensions.Logging;
namespace Infrastructure.Verticals.Notifications;
public class NotificationService
{
private readonly ILogger<NotificationService> _logger;
private readonly INotificationFactory _notificationFactory;
public NotificationService(ILogger<NotificationService> logger, INotificationFactory notificationFactory)
{
_logger = logger;
_notificationFactory = notificationFactory;
}
public async Task Notify(FailedImportStrikeNotification notification)
{
foreach (INotificationProvider provider in _notificationFactory.OnFailedImportStrikeEnabled())
{
try
{
await provider.OnFailedImportStrike(notification);
}
catch (Exception exception)
{
_logger.LogWarning(exception, "failed to send notification | provider {provider}", provider.Name);
}
}
}
public async Task Notify(StalledStrikeNotification notification)
{
foreach (INotificationProvider provider in _notificationFactory.OnStalledStrikeEnabled())
{
try
{
await provider.OnStalledStrike(notification);
}
catch (Exception exception)
{
_logger.LogWarning(exception, "failed to send notification | provider {provider}", provider.Name);
}
}
}
public async Task Notify(QueueItemDeleteNotification notification)
{
foreach (INotificationProvider provider in _notificationFactory.OnQueueItemDeleteEnabled())
{
try
{
await provider.OnQueueItemDelete(notification);
}
catch (Exception exception)
{
_logger.LogWarning(exception, "failed to send notification | provider {provider}", provider.Name);
}
}
}
}
@@ -5,8 +5,10 @@ using Domain.Enums;
using Domain.Models.Arr;
using Domain.Models.Arr.Queue;
using Infrastructure.Verticals.Arr;
using Infrastructure.Verticals.Context;
using Infrastructure.Verticals.DownloadClient;
using Infrastructure.Verticals.Jobs;
using Infrastructure.Verticals.Notifications;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Serilog.Context;
@@ -28,12 +30,14 @@ public sealed class QueueCleaner : GenericHandler
RadarrClient radarrClient,
LidarrClient lidarrClient,
ArrQueueIterator arrArrQueueIterator,
DownloadServiceFactory downloadServiceFactory
DownloadServiceFactory downloadServiceFactory,
NotificationPublisher notifier
) : base(
logger, downloadClientConfig,
sonarrConfig, radarrConfig, lidarrConfig,
sonarrClient, radarrClient, lidarrClient,
arrArrQueueIterator, downloadServiceFactory
arrArrQueueIterator, downloadServiceFactory,
notifier
)
{
_config = config.Value;
@@ -45,6 +49,10 @@ public sealed class QueueCleaner : GenericHandler
HashSet<SearchItem> itemsToBeRefreshed = [];
ArrClient arrClient = GetClient(instanceType);
// push to context
ContextProvider.Set(nameof(ArrInstance) + nameof(ArrInstance.Url), instance.Url);
ContextProvider.Set(nameof(InstanceType), instanceType);
await _arrArrQueueIterator.Iterate(arrClient, instance, async items =>
{
@@ -65,6 +73,9 @@ public sealed class QueueCleaner : GenericHandler
{
continue;
}
// push record to context
ContextProvider.Set(nameof(QueueRecord), record);
StalledResult stalledCheckResult = new();
@@ -75,7 +86,8 @@ public sealed class QueueCleaner : GenericHandler
}
// failed import check
bool shouldRemoveFromArr = arrClient.ShouldRemoveFromQueue(instanceType, record, stalledCheckResult.IsPrivate);
bool shouldRemoveFromArr = await arrClient.ShouldRemoveFromQueue(instanceType, record, stalledCheckResult.IsPrivate);
DeleteReason deleteReason = stalledCheckResult.ShouldRemove ? stalledCheckResult.DeleteReason : DeleteReason.ImportFailed;
if (!shouldRemoveFromArr && !stalledCheckResult.ShouldRemove)
{
@@ -101,6 +113,7 @@ public sealed class QueueCleaner : GenericHandler
}
await arrClient.DeleteQueueItemAsync(instance, record, removeFromClient);
await _notifier.NotifyQueueItemDelete(removeFromClient, deleteReason);
}
});