From 8035f71a1659b0129c80d1eff7e6a3d709b9225e Mon Sep 17 00:00:00 2001 From: Michael Croes Date: Mon, 31 May 2021 22:57:13 +0200 Subject: [PATCH] Apply synchronization to stream actions --- S7.Net/Internal/TaskQueue.cs | 28 ++++++++++++++++++++++++++++ S7.Net/PLC.cs | 3 +++ S7.Net/PlcAsynchronous.cs | 22 ++++++++++++++-------- S7.Net/PlcSynchronous.cs | 7 +------ 4 files changed, 46 insertions(+), 14 deletions(-) create mode 100644 S7.Net/Internal/TaskQueue.cs diff --git a/S7.Net/Internal/TaskQueue.cs b/S7.Net/Internal/TaskQueue.cs new file mode 100644 index 0000000..bd0cd6b --- /dev/null +++ b/S7.Net/Internal/TaskQueue.cs @@ -0,0 +1,28 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace S7.Net.Internal +{ + internal class TaskQueue + { + private static readonly object Sentinel = new object(); + + private Task prev = Task.FromResult(Sentinel); + + public async Task Enqueue(Func> action) + { + var tcs = new TaskCompletionSource(); + await Interlocked.Exchange(ref prev, tcs.Task).ConfigureAwait(false); + + try + { + return await action.Invoke().ConfigureAwait(false); + } + finally + { + tcs.SetResult(Sentinel); + } + } + } +} \ No newline at end of file diff --git a/S7.Net/PLC.cs b/S7.Net/PLC.cs index b7ede94..6180ad5 100644 --- a/S7.Net/PLC.cs +++ b/S7.Net/PLC.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.IO; using System.Linq; using System.Net.Sockets; +using S7.Net.Internal; using S7.Net.Protocol; using S7.Net.Types; @@ -14,6 +15,8 @@ namespace S7.Net /// public partial class Plc : IDisposable { + private readonly TaskQueue queue = new TaskQueue(); + private const int CONNECTION_TIMED_OUT_ERROR_CODE = 10060; //TCP connection to device diff --git a/S7.Net/PlcAsynchronous.cs b/S7.Net/PlcAsynchronous.cs index 3e523d8..91c2922 100644 --- a/S7.Net/PlcAsynchronous.cs +++ b/S7.Net/PlcAsynchronous.cs @@ -501,27 +501,33 @@ namespace S7.Net } } - private async Task RequestTpduAsync(byte[] requestData, CancellationToken cancellationToken = default) + private Task RequestTpduAsync(byte[] requestData, CancellationToken cancellationToken = default) { var stream = GetStreamIfAvailable(); - await stream.WriteAsync(requestData, 0, requestData.Length, cancellationToken).ConfigureAwait(false); - var response = await COTP.TPDU.ReadAsync(stream, cancellationToken).ConfigureAwait(false); + return queue.Enqueue(async () => + { + await stream.WriteAsync(requestData, 0, requestData.Length, cancellationToken).ConfigureAwait(false); + var response = await COTP.TPDU.ReadAsync(stream, cancellationToken).ConfigureAwait(false); - return response; + return response; + }); } private Task RequestTsduAsync(byte[] requestData, CancellationToken cancellationToken = default) => RequestTsduAsync(requestData, 0, requestData.Length, cancellationToken); - private async Task RequestTsduAsync(byte[] requestData, int offset, int length, CancellationToken cancellationToken = default) + private Task RequestTsduAsync(byte[] requestData, int offset, int length, CancellationToken cancellationToken = default) { var stream = GetStreamIfAvailable(); - await stream.WriteAsync(requestData, offset, length, cancellationToken).ConfigureAwait(false); - var response = await COTP.TSDU.ReadAsync(stream, cancellationToken).ConfigureAwait(false); + return queue.Enqueue(async () => + { + await stream.WriteAsync(requestData, offset, length, cancellationToken).ConfigureAwait(false); + var response = await COTP.TSDU.ReadAsync(stream, cancellationToken).ConfigureAwait(false); - return response; + return response; + }); } } } diff --git a/S7.Net/PlcSynchronous.cs b/S7.Net/PlcSynchronous.cs index 881b7bf..80e4738 100644 --- a/S7.Net/PlcSynchronous.cs +++ b/S7.Net/PlcSynchronous.cs @@ -469,12 +469,7 @@ namespace S7.Net private byte[] RequestTsdu(byte[] requestData, int offset, int length) { - var stream = GetStreamIfAvailable(); - - stream.Write(requestData, offset, length); - var response = COTP.TSDU.Read(stream); - - return response; + return RequestTsduAsync(requestData, offset, length).GetAwaiter().GetResult(); } } }