add content blocker (#5)

* refactored code
added deluge support
added transmission support
added content blocker
added blacklist and whitelist

* increased level on some logs; updated test docker compose; updated dev appsettings

* updated docker compose and readme

* moved some logs

* fixed env var typo; fixed sonarr and radarr default download client
This commit is contained in:
Marius Nechifor
2024-11-18 20:08:01 +02:00
committed by GitHub
parent b323cb40ae
commit e0a6c7842b
154 changed files with 4752 additions and 789 deletions
@@ -12,6 +12,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="FLM.Transmission" Version="1.0.0" />
<PackageReference Include="Microsoft.Extensions.Http" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="8.0.1" />
<PackageReference Include="QBittorrent.Client" Version="1.9.24285.1" />
@@ -0,0 +1,53 @@
using Common.Configuration;
using Domain.Arr.Queue;
using Microsoft.Extensions.Logging;
namespace Infrastructure.Verticals.Arr;
public sealed class ArrQueueIterator
{
private readonly ILogger<ArrQueueIterator> _logger;
public ArrQueueIterator(ILogger<ArrQueueIterator> logger)
{
_logger = logger;
}
public async Task Iterate(ArrClient arrClient, ArrInstance arrInstance, Func<IReadOnlyList<QueueRecord>, Task> action)
{
const ushort maxPage = 100;
ushort page = 1;
int totalRecords = 0;
int processedRecords = 0;
do
{
QueueListResponse queueResponse = await arrClient.GetQueueItemsAsync(arrInstance, page);
if (totalRecords is 0)
{
totalRecords = queueResponse.TotalRecords;
_logger.LogInformation(
"{items} items found in queue | {url}",
queueResponse.TotalRecords, arrInstance.Url);
}
if (queueResponse.Records.Count is 0)
{
break;
}
await action(queueResponse.Records);
processedRecords += queueResponse.Records.Count;
if (processedRecords >= totalRecords)
{
break;
}
page++;
} while (processedRecords < totalRecords && page < maxPage);
}
}
@@ -1,6 +1,6 @@
using System.Text;
using Common.Configuration;
using Domain.Radarr;
using Domain.Models.Radarr;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
@@ -1,6 +1,6 @@
using System.Text;
using Common.Configuration;
using Domain.Sonarr;
using Domain.Models.Sonarr;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
@@ -0,0 +1,124 @@
using System.Diagnostics;
using System.Text.RegularExpressions;
using Common.Configuration.ContentBlocker;
using Domain.Enums;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Infrastructure.Verticals.ContentBlocker;
public sealed class BlocklistProvider
{
private readonly ILogger<BlocklistProvider> _logger;
private readonly ContentBlockerConfig _config;
private readonly HttpClient _httpClient;
public BlocklistType BlocklistType { get; }
public List<string> Patterns { get; } = [];
public List<Regex> Regexes { get; } = [];
public BlocklistProvider(
ILogger<BlocklistProvider> logger,
IOptions<ContentBlockerConfig> config,
IHttpClientFactory httpClientFactory)
{
_logger = logger;
_config = config.Value;
_httpClient = httpClientFactory.CreateClient();
_config.Validate();
if (_config.Blacklist?.Enabled is true)
{
BlocklistType = BlocklistType.Blacklist;
}
if (_config.Whitelist?.Enabled is true)
{
BlocklistType = BlocklistType.Whitelist;
}
}
public async Task LoadBlocklistAsync()
{
if (Patterns.Count > 0 || Regexes.Count > 0)
{
_logger.LogDebug("blocklist already loaded");
return;
}
try
{
await LoadPatternsAndRegexesAsync();
}
catch
{
_logger.LogError("failed to load {type}", BlocklistType.ToString());
throw;
}
}
private async Task LoadPatternsAndRegexesAsync()
{
string[] patterns;
if (BlocklistType is BlocklistType.Blacklist)
{
patterns = await ReadContentAsync(_config.Blacklist.Path);
}
else
{
patterns = await ReadContentAsync(_config.Whitelist.Path);
}
long startTime = Stopwatch.GetTimestamp();
ParallelOptions options = new() { MaxDegreeOfParallelism = 5 };
Parallel.ForEach(patterns, options, pattern =>
{
try
{
Regex regex = new(pattern, RegexOptions.Compiled);
Regexes.Add(regex);
}
catch (ArgumentException)
{
Patterns.Add(pattern);
}
});
TimeSpan elapsed = Stopwatch.GetElapsedTime(startTime);
_logger.LogDebug("loaded {count} patterns", Patterns.Count);
_logger.LogDebug("loaded {count} regexes", Regexes.Count);
_logger.LogDebug("blocklist loaded in {elapsed} ms", elapsed.TotalMilliseconds);
}
private async Task<string[]> ReadContentAsync(string path)
{
if (Uri.TryCreate(path, UriKind.Absolute, out var uri) && (uri.Scheme == Uri.UriSchemeHttp || uri.Scheme == Uri.UriSchemeHttps))
{
// http(s) url
return await ReadFromUrlAsync(path);
}
if (File.Exists(path))
{
// local file path
return await File.ReadAllLinesAsync(path);
}
throw new ArgumentException($"blocklist not found | {path}");
}
private async Task<string[]> ReadFromUrlAsync(string url)
{
using HttpResponseMessage response = await _httpClient.GetAsync(url);
response.EnsureSuccessStatusCode();
return (await response.Content.ReadAsStringAsync())
.Split(['\r','\n'], StringSplitOptions.RemoveEmptyEntries);
}
}
@@ -0,0 +1,98 @@
using Common.Configuration;
using Domain.Arr.Queue;
using Domain.Enums;
using Infrastructure.Verticals.Arr;
using Infrastructure.Verticals.DownloadClient;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Infrastructure.Verticals.ContentBlocker;
public sealed class ContentBlocker : IDisposable
{
private readonly ILogger<ContentBlocker> _logger;
private readonly SonarrConfig _sonarrConfig;
private readonly RadarrConfig _radarrConfig;
private readonly SonarrClient _sonarrClient;
private readonly RadarrClient _radarrClient;
private readonly ArrQueueIterator _arrArrQueueIterator;
private readonly BlocklistProvider _blocklistProvider;
private readonly IDownloadService _downloadService;
public ContentBlocker(
ILogger<ContentBlocker> logger,
IOptions<SonarrConfig> sonarrConfig,
IOptions<RadarrConfig> radarrConfig,
SonarrClient sonarrClient,
RadarrClient radarrClient,
ArrQueueIterator arrArrQueueIterator,
BlocklistProvider blocklistProvider,
DownloadServiceFactory downloadServiceFactory
)
{
_logger = logger;
_sonarrConfig = sonarrConfig.Value;
_radarrConfig = radarrConfig.Value;
_sonarrClient = sonarrClient;
_radarrClient = radarrClient;
_arrArrQueueIterator = arrArrQueueIterator;
_blocklistProvider = blocklistProvider;
_downloadService = downloadServiceFactory.CreateDownloadClient();
}
public async Task ExecuteAsync()
{
await _blocklistProvider.LoadBlocklistAsync();
await _downloadService.LoginAsync();
await ProcessArrConfigAsync(_sonarrConfig, InstanceType.Sonarr);
await ProcessArrConfigAsync(_radarrConfig, InstanceType.Radarr);
}
private async Task ProcessArrConfigAsync(ArrConfig config, InstanceType instanceType)
{
if (!config.Enabled)
{
return;
}
foreach (ArrInstance arrInstance in config.Instances)
{
try
{
await ProcessInstanceAsync(arrInstance, instanceType);
}
catch (Exception exception)
{
_logger.LogError(exception, "failed to block content for {type} instance | {url}", instanceType, arrInstance.Url);
}
}
}
private async Task ProcessInstanceAsync(ArrInstance instance, InstanceType instanceType)
{
ArrClient arrClient = GetClient(instanceType);
await _arrArrQueueIterator.Iterate(arrClient, instance, async items =>
{
foreach (QueueRecord record in items)
{
_logger.LogDebug("searching unwanted files for {title}", record.Title);
await _downloadService.BlockUnwantedFilesAsync(record.DownloadId);
}
});
}
private ArrClient GetClient(InstanceType type) =>
type switch
{
InstanceType.Sonarr => _sonarrClient,
InstanceType.Radarr => _radarrClient,
_ => throw new NotImplementedException($"instance type {type} is not yet supported")
};
public void Dispose()
{
_downloadService.Dispose();
}
}
@@ -0,0 +1,81 @@
using Domain.Enums;
using Microsoft.Extensions.Logging;
namespace Infrastructure.Verticals.ContentBlocker;
public sealed class FilenameEvaluator
{
private readonly ILogger<FilenameEvaluator> _logger;
private readonly BlocklistProvider _blocklistProvider;
public FilenameEvaluator(ILogger<FilenameEvaluator> logger, BlocklistProvider blocklistProvider)
{
_logger = logger;
_blocklistProvider = blocklistProvider;
}
// TODO create unit tests
public bool IsValid(string filename)
{
return IsValidAgainstPatterns(filename) && IsValidAgainstRegexes(filename);
}
private bool IsValidAgainstPatterns(string filename)
{
if (_blocklistProvider.Patterns.Count is 0)
{
return true;
}
return _blocklistProvider.BlocklistType switch
{
BlocklistType.Blacklist => !_blocklistProvider.Patterns.Any(pattern => MatchesPattern(filename, pattern)),
BlocklistType.Whitelist => _blocklistProvider.Patterns.Any(pattern => MatchesPattern(filename, pattern)),
_ => true
};
}
private bool IsValidAgainstRegexes(string filename)
{
if (_blocklistProvider.Regexes.Count is 0)
{
return true;
}
return _blocklistProvider.BlocklistType switch
{
BlocklistType.Blacklist => !_blocklistProvider.Regexes.Any(regex => regex.IsMatch(filename)),
BlocklistType.Whitelist => _blocklistProvider.Regexes.Any(regex => regex.IsMatch(filename)),
_ => true
};
}
private static bool MatchesPattern(string filename, string pattern)
{
bool hasStartWildcard = pattern.StartsWith('*');
bool hasEndWildcard = pattern.EndsWith('*');
if (hasStartWildcard && hasEndWildcard)
{
return filename.Contains(
pattern.Substring(1, pattern.Length - 2),
StringComparison.InvariantCultureIgnoreCase
);
}
if (hasStartWildcard)
{
return filename.EndsWith(pattern.Substring(1), StringComparison.InvariantCultureIgnoreCase);
}
if (hasEndWildcard)
{
return filename.StartsWith(
pattern.Substring(0, pattern.Length - 1),
StringComparison.InvariantCultureIgnoreCase
);
}
return filename == pattern;
}
}
@@ -0,0 +1,138 @@
using System.Net.Http.Headers;
using System.Text.Json.Serialization;
using Common.Configuration;
using Domain.Models.Deluge.Exceptions;
using Domain.Models.Deluge.Request;
using Domain.Models.Deluge.Response;
using Infrastructure.Verticals.DownloadClient.Deluge.Extensions;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
namespace Infrastructure.Verticals.DownloadClient.Deluge;
public sealed class DelugeClient
{
private readonly DelugeConfig _config;
private readonly HttpClient _httpClient;
public DelugeClient(IOptions<DelugeConfig> config, IHttpClientFactory httpClientFactory)
{
_config = config.Value;
_httpClient = httpClientFactory.CreateClient(nameof(DelugeService));
}
public async Task<bool> LoginAsync()
{
return await SendRequest<bool>("auth.login", _config.Password);
}
public async Task<bool> Logout()
{
return await SendRequest<bool>("auth.delete_session");
}
public async Task<List<DelugeTorrent>> ListTorrents(Dictionary<string, string>? filters = null)
{
filters ??= new Dictionary<string, string>();
var keys = typeof(DelugeTorrent).GetAllJsonPropertyFromType();
Dictionary<string, DelugeTorrent> result =
await SendRequest<Dictionary<string, DelugeTorrent>>("core.get_torrents_status", filters, keys);
return result.Values.ToList();
}
public async Task<List<DelugeTorrentExtended>> ListTorrentsExtended(Dictionary<string, string>? filters = null)
{
filters ??= new Dictionary<string, string>();
var keys = typeof(DelugeTorrentExtended).GetAllJsonPropertyFromType();
Dictionary<string, DelugeTorrentExtended> result =
await SendRequest<Dictionary<string, DelugeTorrentExtended>>("core.get_torrents_status", filters, keys);
return result.Values.ToList();
}
public async Task<DelugeTorrent?> GetTorrent(string hash)
{
List<DelugeTorrent> torrents = await ListTorrents(new Dictionary<string, string>() { { "hash", hash } });
return torrents.FirstOrDefault();
}
public async Task<DelugeTorrentExtended?> GetTorrentExtended(string hash)
{
List<DelugeTorrentExtended> torrents =
await ListTorrentsExtended(new Dictionary<string, string> { { "hash", hash } });
return torrents.FirstOrDefault();
}
public async Task<DelugeContents?> GetTorrentFiles(string hash)
{
return await SendRequest<DelugeContents?>("web.get_torrent_files", hash);
}
public async Task ChangeFilesPriority(string hash, List<int> priorities)
{
Dictionary<string, List<int>> filePriorities = new()
{
{ "file_priorities", priorities }
};
await SendRequest<DelugeResponse<object>>("core.set_torrent_options", hash, filePriorities);
}
private async Task<String> PostJson(String json)
{
StringContent content = new StringContent(json);
content.Headers.ContentType = new MediaTypeWithQualityHeaderValue("application/json");
var responseMessage = await _httpClient.PostAsync(new Uri(_config.Url, "/json"), content);
responseMessage.EnsureSuccessStatusCode();
var responseJson = await responseMessage.Content.ReadAsStringAsync();
return responseJson;
}
private DelugeRequest CreateRequest(string method, params object[] parameters)
{
if (String.IsNullOrWhiteSpace(method))
{
throw new ArgumentException(nameof(method));
}
return new DelugeRequest(1, method, parameters);
}
public async Task<T> SendRequest<T>(string method, params object[] parameters)
{
return await SendRequest<T>(CreateRequest(method, parameters));
}
public async Task<T> SendRequest<T>(DelugeRequest webRequest)
{
var requestJson = JsonConvert.SerializeObject(webRequest, Formatting.None, new JsonSerializerSettings
{
NullValueHandling = webRequest.NullValueHandling
});
var responseJson = await PostJson(requestJson);
var settings = new JsonSerializerSettings
{
Error = (_, args) =>
{
// Suppress the error and continue
args.ErrorContext.Handled = true;
}
};
DelugeResponse<T>? webResponse = JsonConvert.DeserializeObject<DelugeResponse<T>>(responseJson, settings);
if (webResponse?.Error != null)
{
throw new DelugeClientException(webResponse.Error.Message);
}
if (webResponse?.ResponseId != webRequest.RequestId)
{
throw new DelugeClientException("desync");
}
return webResponse.Result;
}
}
@@ -0,0 +1,164 @@
using Common.Configuration;
using Domain.Models.Deluge.Response;
using Infrastructure.Verticals.ContentBlocker;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Infrastructure.Verticals.DownloadClient.Deluge;
public sealed class DelugeService : IDownloadService
{
private readonly ILogger<DelugeService> _logger;
private readonly DelugeClient _client;
private readonly FilenameEvaluator _filenameEvaluator;
public DelugeService(
ILogger<DelugeService> logger,
IOptions<DelugeConfig> config,
IHttpClientFactory httpClientFactory,
FilenameEvaluator filenameEvaluator
)
{
_logger = logger;
_client = new (config, httpClientFactory);
_filenameEvaluator = filenameEvaluator;
}
public async Task LoginAsync()
{
await _client.LoginAsync();
}
public async Task<bool> ShouldRemoveFromArrQueueAsync(string hash)
{
hash = hash.ToLowerInvariant();
DelugeContents? contents = null;
if (!await HasMinimalStatus(hash))
{
return false;
}
try
{
contents = await _client.GetTorrentFiles(hash);
}
catch (Exception exception)
{
_logger.LogDebug(exception, "failed to find torrent {hash} in the download client", hash);
}
if (contents is null)
{
return false;
}
bool shouldRemove = true;
ProcessFiles(contents.Contents, (_, file) =>
{
if (file.Priority > 0)
{
shouldRemove = false;
}
});
return shouldRemove;
}
public async Task BlockUnwantedFilesAsync(string hash)
{
hash = hash.ToLowerInvariant();
if (!await HasMinimalStatus(hash))
{
return;
}
DelugeContents? contents = null;
try
{
contents = await _client.GetTorrentFiles(hash);
}
catch (Exception exception)
{
_logger.LogDebug(exception, "failed to find torrent {hash} in the download client", hash);
}
if (contents is null)
{
return;
}
Dictionary<int, int> priorities = [];
bool hasPriorityUpdates = false;
ProcessFiles(contents.Contents, (name, file) =>
{
int priority = file.Priority;
if (file.Priority is not 0 && !_filenameEvaluator.IsValid(name))
{
priority = 0;
hasPriorityUpdates = true;
_logger.LogInformation("unwanted file found | {file}", file.Path);
}
priorities.Add(file.Index, priority);
});
if (!hasPriorityUpdates)
{
return;
}
_logger.LogDebug("changing priorities | torrent {hash}", hash);
List<int> sortedPriorities = priorities
.OrderBy(x => x.Key)
.Select(x => x.Value)
.ToList();
await _client.ChangeFilesPriority(hash, sortedPriorities);
}
private async Task<bool> HasMinimalStatus(string hash)
{
DelugeMinimalStatus? status = await _client.SendRequest<DelugeMinimalStatus?>(
"web.get_torrent_status",
hash,
new[] { "hash" }
);
if (status?.Hash is null)
{
_logger.LogDebug("failed to find torrent {hash} in the download client", hash);
return false;
}
return true;
}
private static void ProcessFiles(Dictionary<string, DelugeFileOrDirectory> contents, Action<string, DelugeFileOrDirectory> processFile)
{
foreach (var (name, data) in contents)
{
switch (data.Type)
{
case "file":
processFile(name, data);
break;
case "dir" when data.Contents is not null:
// Recurse into subdirectories
ProcessFiles(data.Contents, processFile);
break;
}
}
}
public void Dispose()
{
}
}
@@ -0,0 +1,20 @@
using Newtonsoft.Json;
namespace Infrastructure.Verticals.DownloadClient.Deluge.Extensions;
internal static class DelugeExtensions
{
public static List<String?> GetAllJsonPropertyFromType(this Type t)
{
var type = typeof(JsonPropertyAttribute);
var props = t.GetProperties()
.Where(prop => Attribute.IsDefined(prop, type))
.ToList();
return props
.Select(x => x.GetCustomAttributes(type, true).Single())
.Cast<JsonPropertyAttribute>()
.Select(x => x.PropertyName)
.ToList();
}
}
@@ -0,0 +1,65 @@
using Common.Configuration;
using Infrastructure.Verticals.DownloadClient.Deluge;
using Infrastructure.Verticals.DownloadClient.QBittorrent;
using Infrastructure.Verticals.DownloadClient.Transmission;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
namespace Infrastructure.Verticals.DownloadClient;
public sealed class DownloadServiceFactory
{
private readonly QBitConfig _qBitConfig;
private readonly DelugeConfig _delugeConfig;
private readonly TransmissionConfig _transmissionConfig;
private readonly IServiceProvider _serviceProvider;
public DownloadServiceFactory(
IOptions<QBitConfig> qBitConfig,
IOptions<DelugeConfig> delugeConfig,
IOptions<TransmissionConfig> transmissionConfig,
IServiceProvider serviceProvider)
{
_qBitConfig = qBitConfig.Value;
_delugeConfig = delugeConfig.Value;
_transmissionConfig = transmissionConfig.Value;
_serviceProvider = serviceProvider;
_qBitConfig.Validate();
_delugeConfig.Validate();
_transmissionConfig.Validate();
int enabledCount = new[] { _qBitConfig.Enabled, _delugeConfig.Enabled, _transmissionConfig.Enabled }
.Count(enabled => enabled);
if (enabledCount > 1)
{
throw new Exception("only one download client can be enabled");
}
if (enabledCount == 0)
{
throw new Exception("no download client is enabled");
}
}
public IDownloadService CreateDownloadClient()
{
if (_qBitConfig.Enabled)
{
return _serviceProvider.GetRequiredService<QBitService>();
}
if (_delugeConfig.Enabled)
{
return _serviceProvider.GetRequiredService<DelugeService>();
}
if (_transmissionConfig.Enabled)
{
return _serviceProvider.GetRequiredService<TransmissionService>();
}
throw new NotSupportedException();
}
}
@@ -0,0 +1,10 @@
namespace Infrastructure.Verticals.DownloadClient;
public interface IDownloadService : IDisposable
{
public Task LoginAsync();
public Task<bool> ShouldRemoveFromArrQueueAsync(string hash);
public Task BlockUnwantedFilesAsync(string hash);
}
@@ -0,0 +1,95 @@
using Common.Configuration;
using Infrastructure.Verticals.ContentBlocker;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using QBittorrent.Client;
namespace Infrastructure.Verticals.DownloadClient.QBittorrent;
public sealed class QBitService : IDownloadService
{
private readonly ILogger<QBitService> _logger;
private readonly QBitConfig _config;
private readonly QBittorrentClient _client;
private readonly FilenameEvaluator _filenameEvaluator;
public QBitService(
ILogger<QBitService> logger,
IOptions<QBitConfig> config,
FilenameEvaluator filenameEvaluator
)
{
_logger = logger;
_config = config.Value;
_client = new(_config.Url);
_filenameEvaluator = filenameEvaluator;
}
public async Task LoginAsync()
{
await _client.LoginAsync(_config.Username, _config.Password);
}
public async Task<bool> ShouldRemoveFromArrQueueAsync(string hash)
{
TorrentInfo? torrent = (await _client.GetTorrentListAsync(new TorrentListQuery { Hashes = [hash] }))
.FirstOrDefault();
if (torrent is null)
{
return false;
}
// if all files were blocked by qBittorrent
if (torrent is { CompletionOn: not null, Downloaded: null or 0 })
{
return true;
}
IReadOnlyList<TorrentContent>? files = await _client.GetTorrentContentsAsync(hash);
if (files is null)
{
return false;
}
// if all files are marked as skip
if (files.All(x => x.Priority is TorrentContentPriority.Skip))
{
return true;
}
return false;
}
public async Task BlockUnwantedFilesAsync(string hash)
{
IReadOnlyList<TorrentContent>? files = await _client.GetTorrentContentsAsync(hash);
if (files is null)
{
return;
}
foreach (TorrentContent file in files)
{
if (!file.Index.HasValue)
{
continue;
}
if (file.Priority is TorrentContentPriority.Skip || _filenameEvaluator.IsValid(file.Name))
{
continue;
}
_logger.LogInformation("unwanted file found | {file}", file.Name);
await _client.SetFilePriorityAsync(hash, file.Index.Value, TorrentContentPriority.Skip);
}
}
public void Dispose()
{
_client.Dispose();
}
}
@@ -0,0 +1,141 @@
using Common.Configuration;
using Infrastructure.Verticals.ContentBlocker;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Transmission.API.RPC;
using Transmission.API.RPC.Arguments;
using Transmission.API.RPC.Entity;
namespace Infrastructure.Verticals.DownloadClient.Transmission;
public sealed class TransmissionService : IDownloadService
{
private readonly ILogger<TransmissionService> _logger;
private readonly TransmissionConfig _config;
private readonly Client _client;
private readonly FilenameEvaluator _filenameEvaluator;
private TorrentInfo[]? _torrentsCache;
public TransmissionService(
ILogger<TransmissionService> logger,
IOptions<TransmissionConfig> config,
FilenameEvaluator filenameEvaluator
)
{
_logger = logger;
_config = config.Value;
_client = new(
new Uri(_config.Url, "/transmission/rpc").ToString(),
login: _config.Username,
password: _config.Password
);
_filenameEvaluator = filenameEvaluator;
}
public async Task LoginAsync()
{
await _client.GetSessionInformationAsync();
}
public async Task<bool> ShouldRemoveFromArrQueueAsync(string hash)
{
TorrentInfo? torrent = await GetTorrentAsync(hash);
if (torrent is null)
{
return false;
}
foreach (TransmissionTorrentFileStats? stats in torrent.FileStats ?? [])
{
if (!stats.Wanted.HasValue)
{
// if any files stats are missing, do not remove
return false;
}
if (stats.Wanted.HasValue && stats.Wanted.Value)
{
// if any files are wanted, do not remove
return false;
}
}
// remove if all files are unwanted
return true;
}
public async Task BlockUnwantedFilesAsync(string hash)
{
TorrentInfo? torrent = await GetTorrentAsync(hash);
if (torrent?.FileStats is null || torrent.Files is null)
{
return;
}
List<long> unwantedFiles = [];
for (int i = 0; i < torrent.Files.Length; i++)
{
if (torrent.FileStats?[i].Wanted == null)
{
continue;
}
if (!torrent.FileStats[i].Wanted.Value || _filenameEvaluator.IsValid(torrent.Files[i].Name))
{
continue;
}
_logger.LogInformation("unwanted file found | {file}", torrent.Files[i].Name);
unwantedFiles.Add(i);
}
if (unwantedFiles.Count is 0)
{
return;
}
_logger.LogDebug("changing priorities | torrent {hash}", hash);
await _client.TorrentSetAsync(new TorrentSettings
{
Ids = [ torrent.Id ],
FilesUnwanted = unwantedFiles.ToArray(),
});
}
public void Dispose()
{
}
private async Task<TorrentInfo?> GetTorrentAsync(string hash)
{
TorrentInfo? torrent = _torrentsCache?
.FirstOrDefault(x => x.HashString.Equals(hash, StringComparison.InvariantCultureIgnoreCase));
if (_torrentsCache is null || torrent is null)
{
string[] fields = [TorrentFields.FILES, TorrentFields.FILE_STATS, TorrentFields.HASH_STRING, TorrentFields.ID];
// refresh cache
_torrentsCache = (await _client.TorrentGetAsync(fields))
?.Torrents;
}
if (_torrentsCache?.Length is null or 0)
{
_logger.LogDebug("could not list torrents | {url}", _config.Url);
}
torrent = _torrentsCache?.FirstOrDefault(x => x.HashString.Equals(hash, StringComparison.InvariantCultureIgnoreCase));
if (torrent is null)
{
_logger.LogDebug("could not find torrent | {hash} | {url}", hash, _config.Url);
}
return torrent;
}
}
@@ -0,0 +1,126 @@
using Common.Configuration;
using Domain.Arr.Queue;
using Domain.Enums;
using Infrastructure.Verticals.Arr;
using Infrastructure.Verticals.DownloadClient;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Infrastructure.Verticals.QueueCleaner;
public sealed class QueueCleaner : IDisposable
{
private readonly ILogger<QueueCleaner> _logger;
private readonly SonarrConfig _sonarrConfig;
private readonly RadarrConfig _radarrConfig;
private readonly SonarrClient _sonarrClient;
private readonly RadarrClient _radarrClient;
private readonly ArrQueueIterator _arrArrQueueIterator;
private readonly IDownloadService _downloadService;
public QueueCleaner(
ILogger<QueueCleaner> logger,
IOptions<SonarrConfig> sonarrConfig,
IOptions<RadarrConfig> radarrConfig,
SonarrClient sonarrClient,
RadarrClient radarrClient,
ArrQueueIterator arrArrQueueIterator,
DownloadServiceFactory downloadServiceFactory
)
{
_logger = logger;
_sonarrConfig = sonarrConfig.Value;
_radarrConfig = radarrConfig.Value;
_sonarrClient = sonarrClient;
_radarrClient = radarrClient;
_arrArrQueueIterator = arrArrQueueIterator;
_downloadService = downloadServiceFactory.CreateDownloadClient();
}
public async Task ExecuteAsync()
{
await _downloadService.LoginAsync();
await ProcessArrConfigAsync(_sonarrConfig, InstanceType.Sonarr);
await ProcessArrConfigAsync(_radarrConfig, InstanceType.Radarr);
// await _downloadClient.LogoutAsync();
}
private async Task ProcessArrConfigAsync(ArrConfig config, InstanceType instanceType)
{
if (!config.Enabled)
{
return;
}
foreach (ArrInstance arrInstance in config.Instances)
{
try
{
await ProcessInstanceAsync(arrInstance, instanceType);
}
catch (Exception exception)
{
_logger.LogError(exception, "failed to clean {type} instance | {url}", instanceType, arrInstance.Url);
}
}
}
private async Task ProcessInstanceAsync(ArrInstance instance, InstanceType instanceType)
{
HashSet<int> itemsToBeRefreshed = [];
ArrClient arrClient = GetClient(instanceType);
await _arrArrQueueIterator.Iterate(arrClient, instance, async items =>
{
foreach (QueueRecord record in items)
{
if (record.Protocol is not "torrent")
{
continue;
}
if (string.IsNullOrEmpty(record.DownloadId))
{
_logger.LogDebug("skip | download id is null for {title}", record.Title);
continue;
}
if (!await _downloadService.ShouldRemoveFromArrQueueAsync(record.DownloadId))
{
_logger.LogInformation("skip | {title}", record.Title);
continue;
}
itemsToBeRefreshed.Add(GetRecordId(instanceType, record));
await arrClient.DeleteQueueItemAsync(instance, record);
}
});
await arrClient.RefreshItemsAsync(instance, itemsToBeRefreshed);
}
private ArrClient GetClient(InstanceType type) =>
type switch
{
InstanceType.Sonarr => _sonarrClient,
InstanceType.Radarr => _radarrClient,
_ => throw new NotImplementedException($"instance type {type} is not yet supported")
};
private int GetRecordId(InstanceType type, QueueRecord record) =>
type switch
{
// TODO add episode id
InstanceType.Sonarr => record.SeriesId,
InstanceType.Radarr => record.MovieId,
_ => throw new NotImplementedException($"instance type {type} is not yet supported")
};
public void Dispose()
{
_downloadService.Dispose();
}
}
@@ -1,140 +0,0 @@
using Common.Configuration;
using Domain.Arr.Enums;
using Domain.Arr.Queue;
using Infrastructure.Verticals.Arr;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using QBittorrent.Client;
namespace Infrastructure.Verticals.QueueCleaner;
public sealed class QueueCleanerHandler
{
private readonly ILogger<QueueCleanerHandler> _logger;
private readonly QBitConfig _qBitConfig;
private readonly SonarrConfig _sonarrConfig;
private readonly RadarrConfig _radarrConfig;
private readonly SonarrClient _sonarrClient;
private readonly RadarrClient _radarrClient;
public QueueCleanerHandler(
ILogger<QueueCleanerHandler> logger,
IOptions<QBitConfig> qBitConfig,
IOptions<SonarrConfig> sonarrConfig,
IOptions<RadarrConfig> radarrConfig,
SonarrClient sonarrClient,
RadarrClient radarrClient)
{
_logger = logger;
_qBitConfig = qBitConfig.Value;
_sonarrConfig = sonarrConfig.Value;
_radarrConfig = radarrConfig.Value;
_sonarrClient = sonarrClient;
_radarrClient = radarrClient;
}
public async Task HandleAsync()
{
QBittorrentClient qBitClient = new(_qBitConfig.Url);
await qBitClient.LoginAsync(_qBitConfig.Username, _qBitConfig.Password);
await ProcessArrConfigAsync(qBitClient, _sonarrConfig, InstanceType.Sonarr);
await ProcessArrConfigAsync(qBitClient, _radarrConfig, InstanceType.Radarr);
}
private async Task ProcessArrConfigAsync(QBittorrentClient qBitClient, ArrConfig config, InstanceType instanceType)
{
if (!config.Enabled)
{
return;
}
foreach (ArrInstance arrInstance in config.Instances)
{
try
{
await ProcessInstanceAsync(qBitClient, arrInstance, instanceType);
}
catch (Exception exception)
{
_logger.LogError(exception, "failed to clean {type} instance | {url}", instanceType, arrInstance.Url);
}
}
}
private async Task ProcessInstanceAsync(QBittorrentClient qBitClient, ArrInstance instance, InstanceType instanceType)
{
ushort page = 1;
int totalRecords = 0;
int processedRecords = 0;
HashSet<int> itemsToBeRefreshed = [];
ArrClient arrClient = GetClient(instanceType);
do
{
QueueListResponse queueResponse = await arrClient.GetQueueItemsAsync(instance, page);
if (totalRecords is 0)
{
totalRecords = queueResponse.TotalRecords;
_logger.LogInformation(
"{items} items found in queue | {url}",
queueResponse.TotalRecords, instance.Url);
}
foreach (QueueRecord record in queueResponse.Records)
{
if (record.Protocol is not "torrent")
{
continue;
}
TorrentInfo? torrent = (await qBitClient.GetTorrentListAsync(new TorrentListQuery { Hashes = [record.DownloadId] }))
.FirstOrDefault();
if (torrent is not { CompletionOn: not null, Downloaded: null or 0 })
{
_logger.LogInformation("skip | {torrent}", record.Title);
continue;
}
itemsToBeRefreshed.Add(GetRecordId(instanceType, record));
await arrClient.DeleteQueueItemAsync(instance, record);
}
if (queueResponse.Records.Count is 0)
{
break;
}
processedRecords += queueResponse.Records.Count;
if (processedRecords >= totalRecords)
{
break;
}
page++;
} while (processedRecords < totalRecords);
await arrClient.RefreshItemsAsync(instance, itemsToBeRefreshed);
}
private ArrClient GetClient(InstanceType type) =>
type switch
{
InstanceType.Sonarr => _sonarrClient,
InstanceType.Radarr => _radarrClient,
_ => throw new NotImplementedException($"instance type {type} is not yet supported")
};
private int GetRecordId(InstanceType type, QueueRecord record) =>
type switch
{
InstanceType.Sonarr => record.SeriesId,
InstanceType.Radarr => record.MovieId,
_ => throw new NotImplementedException($"instance type {type} is not yet supported")
};
}