Apply synchronization to stream actions

This commit is contained in:
Michael Croes
2021-05-31 22:57:13 +02:00
parent df4f258290
commit 8035f71a16
4 changed files with 46 additions and 14 deletions

View File

@@ -0,0 +1,28 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace S7.Net.Internal
{
internal class TaskQueue
{
private static readonly object Sentinel = new object();
private Task prev = Task.FromResult(Sentinel);
public async Task<T> Enqueue<T>(Func<Task<T>> action)
{
var tcs = new TaskCompletionSource<object>();
await Interlocked.Exchange(ref prev, tcs.Task).ConfigureAwait(false);
try
{
return await action.Invoke().ConfigureAwait(false);
}
finally
{
tcs.SetResult(Sentinel);
}
}
}
}

View File

@@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using S7.Net.Internal;
using S7.Net.Protocol;
using S7.Net.Types;
@@ -14,6 +15,8 @@ namespace S7.Net
/// </summary>
public partial class Plc : IDisposable
{
private readonly TaskQueue queue = new TaskQueue();
private const int CONNECTION_TIMED_OUT_ERROR_CODE = 10060;
//TCP connection to device

View File

@@ -501,27 +501,33 @@ namespace S7.Net
}
}
private async Task<COTP.TPDU> RequestTpduAsync(byte[] requestData, CancellationToken cancellationToken = default)
private Task<COTP.TPDU> RequestTpduAsync(byte[] requestData, CancellationToken cancellationToken = default)
{
var stream = GetStreamIfAvailable();
return queue.Enqueue(async () =>
{
await stream.WriteAsync(requestData, 0, requestData.Length, cancellationToken).ConfigureAwait(false);
var response = await COTP.TPDU.ReadAsync(stream, cancellationToken).ConfigureAwait(false);
return response;
});
}
private Task<byte[]> RequestTsduAsync(byte[] requestData, CancellationToken cancellationToken = default) =>
RequestTsduAsync(requestData, 0, requestData.Length, cancellationToken);
private async Task<byte[]> RequestTsduAsync(byte[] requestData, int offset, int length, CancellationToken cancellationToken = default)
private Task<byte[]> RequestTsduAsync(byte[] requestData, int offset, int length, CancellationToken cancellationToken = default)
{
var stream = GetStreamIfAvailable();
return queue.Enqueue(async () =>
{
await stream.WriteAsync(requestData, offset, length, cancellationToken).ConfigureAwait(false);
var response = await COTP.TSDU.ReadAsync(stream, cancellationToken).ConfigureAwait(false);
return response;
});
}
}
}

View File

@@ -469,12 +469,7 @@ namespace S7.Net
private byte[] RequestTsdu(byte[] requestData, int offset, int length)
{
var stream = GetStreamIfAvailable();
stream.Write(requestData, offset, length);
var response = COTP.TSDU.Read(stream);
return response;
return RequestTsduAsync(requestData, offset, length).GetAwaiter().GetResult();
}
}
}