Adding parallel processing via ThreadPool

This commit is contained in:
Florian Zevedei 2023-12-26 17:40:13 +01:00
parent c1018ac5a0
commit 7fcaa407ad
5 changed files with 119 additions and 18 deletions

View File

@ -32,6 +32,7 @@ internal class Program
}) })
.NoSerialization() .NoSerialization()
.UseEnglish() .UseEnglish()
.UseThreadPool()
.Build(); .Build();

View File

@ -28,6 +28,12 @@ public class MessageClient
private CancellationTokenSource _cancellationTokenSource; private CancellationTokenSource _cancellationTokenSource;
public string ApiKey { get; }
public ITelegramBotClient TelegramClient { get; set; }
private EventHandlerList Events { get; } = new();
/// <summary> /// <summary>
/// Indicates if all pending Telegram.Bot.Types.Updates should be thrown out before /// Indicates if all pending Telegram.Bot.Types.Updates should be thrown out before
// start polling. If set to true Telegram.Bot.Polling.ReceiverOptions.AllowedUpdates // start polling. If set to true Telegram.Bot.Polling.ReceiverOptions.AllowedUpdates
@ -36,6 +42,12 @@ public class MessageClient
/// </summary> /// </summary>
public bool ThrowPendingUpdates { get; set; } public bool ThrowPendingUpdates { get; set; }
public bool UseThreadPool { get; set; } = false;
public int ThreadPool_WorkerThreads { get; set; } = 1;
public int ThreadPool_IOThreads { get; set; } = 1;
public MessageClient(string apiKey) public MessageClient(string apiKey)
{ {
@ -103,11 +115,6 @@ public class MessageClient
} }
public string ApiKey { get; }
public ITelegramBotClient TelegramClient { get; set; }
private EventHandlerList Events { get; } = new();
public void Prepare() public void Prepare()
@ -124,8 +131,19 @@ public class MessageClient
receiverOptions.ThrowPendingUpdates = ThrowPendingUpdates; receiverOptions.ThrowPendingUpdates = ThrowPendingUpdates;
TelegramClient.StartReceiving(HandleUpdateAsync, HandleErrorAsync, receiverOptions, if (UseThreadPool)
_cancellationTokenSource.Token); {
ThreadPool.SetMaxThreads(ThreadPool_WorkerThreads, ThreadPool_IOThreads);
TelegramClient.StartReceiving(HandleUpdateAsyncThreadPool, HandleErrorAsyncThreadPool, receiverOptions,
_cancellationTokenSource.Token);
}
else
{
TelegramClient.StartReceiving(HandleUpdateAsync, HandleErrorAsync, receiverOptions,
_cancellationTokenSource.Token);
}
} }
public void StopReceiving() public void StopReceiving()
@ -133,6 +151,7 @@ public class MessageClient
_cancellationTokenSource.Cancel(); _cancellationTokenSource.Cancel();
} }
#region "Single Thread"
public async Task HandleUpdateAsync(ITelegramBotClient botClient, Update update, CancellationToken cancellationToken) public async Task HandleUpdateAsync(ITelegramBotClient botClient, Update update, CancellationToken cancellationToken)
{ {
@ -145,6 +164,33 @@ public class MessageClient
await OnReceiveError(new ErrorResult(exception)); await OnReceiveError(new ErrorResult(exception));
} }
#endregion
#region "Thread Pool"
public Task HandleUpdateAsyncThreadPool(ITelegramBotClient botClient, Update update, CancellationToken cancellationToken)
{
ThreadPool.QueueUserWorkItem(async a =>
{
await OnMessageLoop(new UpdateResult(update, null));
});
return Task.CompletedTask;
}
public Task HandleErrorAsyncThreadPool(ITelegramBotClient botClient, Exception exception,
CancellationToken cancellationToken)
{
ThreadPool.QueueUserWorkItem(async a =>
{
await OnReceiveError(new ErrorResult(exception));
});
return Task.CompletedTask;
}
#endregion
/// <summary> /// <summary>
/// This will return the current list of bot commands. /// This will return the current list of bot commands.
/// </summary> /// </summary>

View File

@ -18,7 +18,7 @@ namespace TelegramBotBase.Builder;
public class BotBaseBuilder : IAPIKeySelectionStage, IMessageLoopSelectionStage, IStartFormSelectionStage, public class BotBaseBuilder : IAPIKeySelectionStage, IMessageLoopSelectionStage, IStartFormSelectionStage,
IBuildingStage, INetworkingSelectionStage, IBotCommandsStage, ISessionSerializationStage, IBuildingStage, INetworkingSelectionStage, IBotCommandsStage, ISessionSerializationStage,
ILanguageSelectionStage ILanguageSelectionStage, IThreadingStage
{ {
private string _apiKey; private string _apiKey;
@ -87,6 +87,8 @@ public class BotBaseBuilder : IAPIKeySelectionStage, IMessageLoopSelectionStage,
DefaultLanguage(); DefaultLanguage();
UseSingleThread();
return this; return this;
} }
@ -107,6 +109,8 @@ public class BotBaseBuilder : IAPIKeySelectionStage, IMessageLoopSelectionStage,
DefaultLanguage(); DefaultLanguage();
UseSingleThread();
return this; return this;
} }
@ -125,6 +129,8 @@ public class BotBaseBuilder : IAPIKeySelectionStage, IMessageLoopSelectionStage,
DefaultLanguage(); DefaultLanguage();
UseSingleThread();
return this; return this;
} }
@ -391,34 +397,58 @@ public class BotBaseBuilder : IAPIKeySelectionStage, IMessageLoopSelectionStage,
#region "Step 7 (Language)" #region "Step 7 (Language)"
public IBuildingStage DefaultLanguage() public IThreadingStage DefaultLanguage()
{ {
return this; return this;
} }
public IBuildingStage UseEnglish() public IThreadingStage UseEnglish()
{ {
Default.Language = new English(); Default.Language = new English();
return this; return this;
} }
public IBuildingStage UseGerman() public IThreadingStage UseGerman()
{ {
Default.Language = new German(); Default.Language = new German();
return this; return this;
} }
public IBuildingStage UsePersian() public IThreadingStage UsePersian()
{ {
Default.Language = new Persian(); Default.Language = new Persian();
return this; return this;
} }
public IBuildingStage Custom(Localization language) public IThreadingStage Custom(Localization language)
{ {
Default.Language = language; Default.Language = language;
return this; return this;
} }
#endregion #endregion
#region "Step 8 (Threading)"
public IBuildingStage UseSingleThread()
{
_client.UseThreadPool = false;
return this;
}
public IBuildingStage UseThreadPool(int workerThreads = 2, int ioThreads = 1)
{
_client.UseThreadPool = true;
_client.ThreadPool_WorkerThreads = workerThreads;
_client.ThreadPool_IOThreads = ioThreads;
return this;
}
#endregion
} }

View File

@ -8,29 +8,29 @@ public interface ILanguageSelectionStage
/// Selects the default language for control usage. (English) /// Selects the default language for control usage. (English)
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
IBuildingStage DefaultLanguage(); IThreadingStage DefaultLanguage();
/// <summary> /// <summary>
/// Selects english as the default language for control labels. /// Selects english as the default language for control labels.
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
IBuildingStage UseEnglish(); IThreadingStage UseEnglish();
/// <summary> /// <summary>
/// Selects german as the default language for control labels. /// Selects german as the default language for control labels.
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
IBuildingStage UseGerman(); IThreadingStage UseGerman();
/// <summary> /// <summary>
/// Selects persian as the default language for control labels. /// Selects persian as the default language for control labels.
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
IBuildingStage UsePersian(); IThreadingStage UsePersian();
/// <summary> /// <summary>
/// Selects a custom language as the default language for control labels. /// Selects a custom language as the default language for control labels.
/// </summary> /// </summary>
/// <returns></returns> /// <returns></returns>
IBuildingStage Custom(Localization language); IThreadingStage Custom(Localization language);
} }

View File

@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.Text;
namespace TelegramBotBase.Builder.Interfaces
{
public interface IThreadingStage
{
/// <summary>
/// Uses one single thread for message loop. (Default)
/// </summary>
/// <returns></returns>
public IBuildingStage UseSingleThread();
/// <summary>
/// Using the threadpool for managing requests.
/// </summary>
/// <param name="workerThreads">Number of concurrent working threads.</param>
/// <param name="ioThreads">Number of concurrent I/O threads.</param>
/// <returns></returns>
public IBuildingStage UseThreadPool(int workerThreads = 2, int ioThreads = 1);
}
}