Add CancellationToken support to async functions.

This commit is contained in:
Serge Camille
2020-09-06 13:46:36 +02:00
parent 730ccbf9fc
commit 6c4d4605f0
6 changed files with 104 additions and 61 deletions

View File

@@ -13,15 +13,17 @@ namespace S7.Net.UnitTest
[TestClass]
public class ProtocolUnitTest
{
private TestContext TestContext { get; set; }
[TestMethod]
public void TPKT_Read()
public async Task TPKT_Read()
{
var m = new MemoryStream(StringToByteArray("0300002902f0803203000000010002001400000401ff0400807710000100000103000000033f8ccccd"));
var t = TPKT.Read(m);
Assert.AreEqual(0x03, t.Version);
Assert.AreEqual(0x29, t.Length);
m.Position = 0;
t = TPKT.ReadAsync(m).Result;
t = await TPKT.ReadAsync(m, TestContext.CancellationTokenSource.Token);
Assert.AreEqual(0x03, t.Version);
Assert.AreEqual(0x29, t.Length);
}
@@ -40,7 +42,7 @@ namespace S7.Net.UnitTest
public async Task TPKT_ReadShortAsync()
{
var m = new MemoryStream(StringToByteArray("0300002902f0803203000000010002001400000401ff040080"));
var t = await TPKT.ReadAsync(m);
var t = await TPKT.ReadAsync(m, TestContext.CancellationTokenSource.Token);
}
[TestMethod]
@@ -51,7 +53,7 @@ namespace S7.Net.UnitTest
var t = COTP.TSDU.Read(m);
Assert.IsTrue(expected.SequenceEqual(t));
m.Position = 0;
t = COTP.TSDU.ReadAsync(m).Result;
t = COTP.TSDU.ReadAsync(m, TestContext.CancellationTokenSource.Token).Result;
Assert.IsTrue(expected.SequenceEqual(t));
}

View File

@@ -65,13 +65,14 @@ namespace S7.Net.UnitTest
[TestClass]
public class StreamTests
{
private TestContext TestContext { get; set; }
[TestMethod]
public async Task TPKT_ReadRestrictedStreamAsync()
{
var fullMessage = ProtocolUnitTest.StringToByteArray("0300002902f0803203000000010002001400000401ff0400807710000100000103000000033f8ccccd");
var m = new TestStream1BytePerRead(fullMessage);
var t = await TPKT.ReadAsync(m);
var t = await TPKT.ReadAsync(m, TestContext.CancellationTokenSource.Token);
Assert.AreEqual(fullMessage.Length, t.Length);
Assert.AreEqual(fullMessage.Last(), t.Data.Last());
}

View File

@@ -1,5 +1,6 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace S7.Net
@@ -66,9 +67,9 @@ namespace S7.Net
/// </summary>
/// <param name="stream">The socket to read from</param>
/// <returns>COTP DPDU instance</returns>
public static async Task<TPDU> ReadAsync(Stream stream)
public static async Task<TPDU> ReadAsync(Stream stream, CancellationToken cancellationToken)
{
var tpkt = await TPKT.ReadAsync(stream);
var tpkt = await TPKT.ReadAsync(stream, cancellationToken);
if (tpkt.Length == 0)
{
throw new TPDUInvalidException("No protocol data received");
@@ -130,9 +131,9 @@ namespace S7.Net
/// </summary>
/// <param name="stream">The stream to read from</param>
/// <returns>Data in TSDU</returns>
public static async Task<byte[]> ReadAsync(Stream stream)
public static async Task<byte[]> ReadAsync(Stream stream, CancellationToken cancellationToken)
{
var segment = await TPDU.ReadAsync(stream);
var segment = await TPDU.ReadAsync(stream, cancellationToken);
if (segment.LastDataUnit)
{
@@ -145,7 +146,7 @@ namespace S7.Net
while (!segment.LastDataUnit)
{
segment = await TPDU.ReadAsync(stream);
segment = await TPDU.ReadAsync(stream, cancellationToken);
var previousLength = buffer.Length;
Array.Resize(ref buffer, buffer.Length + segment.Data.Length);
Array.Copy(segment.Data, 0, buffer, previousLength, segment.Data.Length);

View File

@@ -6,6 +6,7 @@ using System.Net.Sockets;
using System.Threading.Tasks;
using S7.Net.Protocol;
using System.IO;
using System.Threading;
namespace S7.Net
{
@@ -17,14 +18,18 @@ namespace S7.Net
/// <summary>
/// Connects to the PLC and performs a COTP ConnectionRequest and S7 CommunicationSetup.
/// </summary>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that the cancellation will not affect opening the socket in any way and only affects data transfers for configuring the connection after the socket connection is successfully established.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>A task that represents the asynchronous open operation.</returns>
public async Task OpenAsync()
public async Task OpenAsync(CancellationToken cancellationToken = default)
{
await ConnectAsync();
cancellationToken.ThrowIfCancellationRequested();
var stream = GetStreamIfAvailable();
await stream.WriteAsync(ConnectionRequest.GetCOTPConnectionRequest(CPU, Rack, Slot), 0, 22);
var response = await COTP.TPDU.ReadAsync(stream);
var response = await COTP.TPDU.ReadAsync(stream, cancellationToken);
if (response == null)
{
throw new Exception("Error reading Connection Confirm. Malformed TPDU packet");
@@ -36,7 +41,7 @@ namespace S7.Net
await stream.WriteAsync(GetS7ConnectionSetup(), 0, 25);
var s7data = await COTP.TSDU.ReadAsync(stream);
var s7data = await COTP.TSDU.ReadAsync(stream, cancellationToken);
if (s7data == null)
throw new WrongNumberOfBytesException("No data received in response to Communication Setup");
if (s7data.Length < 2)
@@ -69,8 +74,10 @@ namespace S7.Net
/// <param name="db">Address of the memory area (if you want to read DB1, this is set to 1). This must be set also for other memory area types: counters, timers,etc.</param>
/// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param>
/// <param name="count">Byte count, if you want to read 120 bytes, set this to 120.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>Returns the bytes in an array</returns>
public async Task<byte[]> ReadBytesAsync(DataType dataType, int db, int startByteAdr, int count)
public async Task<byte[]> ReadBytesAsync(DataType dataType, int db, int startByteAdr, int count, CancellationToken cancellationToken = default)
{
var resultBytes = new byte[count];
int index = 0;
@@ -78,7 +85,7 @@ namespace S7.Net
{
//This works up to MaxPDUSize-1 on SNAP7. But not MaxPDUSize-0.
var maxToRead = Math.Min(count, MaxPDUSize - 18);
await ReadBytesWithSingleRequestAsync(dataType, db, startByteAdr + index, resultBytes, index, maxToRead);
await ReadBytesWithSingleRequestAsync(dataType, db, startByteAdr + index, resultBytes, index, maxToRead, cancellationToken);
count -= maxToRead;
index += maxToRead;
}
@@ -96,10 +103,12 @@ namespace S7.Net
/// <param name="varType">Type of the variable/s that you are reading</param>
/// <param name="bitAdr">Address of bit. If you want to read DB1.DBX200.6, set 6 to this parameter.</param>
/// <param name="varCount"></param>
public async Task<object?> ReadAsync(DataType dataType, int db, int startByteAdr, VarType varType, int varCount, byte bitAdr = 0)
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
public async Task<object?> ReadAsync(DataType dataType, int db, int startByteAdr, VarType varType, int varCount, byte bitAdr = 0, CancellationToken cancellationToken = default)
{
int cntBytes = VarTypeToByteLength(varType, varCount);
byte[] bytes = await ReadBytesAsync(dataType, db, startByteAdr, cntBytes);
byte[] bytes = await ReadBytesAsync(dataType, db, startByteAdr, cntBytes, cancellationToken);
return ParseBytes(varType, bytes, varCount, bitAdr);
}
@@ -108,11 +117,13 @@ namespace S7.Net
/// If the read was not successful, check LastErrorCode or LastErrorString.
/// </summary>
/// <param name="variable">Input strings like "DB1.DBX0.0", "DB20.DBD200", "MB20", "T45", etc.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>Returns an object that contains the value. This object must be cast accordingly.</returns>
public async Task<object?> ReadAsync(string variable)
public async Task<object?> ReadAsync(string variable, CancellationToken cancellationToken = default)
{
var adr = new PLCAddress(variable);
return await ReadAsync(adr.DataType, adr.DbNumber, adr.StartByte, adr.VarType, 1, (byte)adr.BitNumber);
return await ReadAsync(adr.DataType, adr.DbNumber, adr.StartByte, adr.VarType, 1, (byte)adr.BitNumber, cancellationToken);
}
/// <summary>
@@ -121,12 +132,14 @@ namespace S7.Net
/// <param name="structType">Type of the struct to be readed (es.: TypeOf(MyStruct)).</param>
/// <param name="db">Address of the DB.</param>
/// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>Returns a struct that must be cast.</returns>
public async Task<object?> ReadStructAsync(Type structType, int db, int startByteAdr = 0)
public async Task<object?> ReadStructAsync(Type structType, int db, int startByteAdr = 0, CancellationToken cancellationToken = default)
{
int numBytes = Types.Struct.GetStructSize(structType);
// now read the package
var resultBytes = await ReadBytesAsync(DataType.DataBlock, db, startByteAdr, numBytes);
var resultBytes = await ReadBytesAsync(DataType.DataBlock, db, startByteAdr, numBytes, cancellationToken);
// and decode it
return Types.Struct.FromBytes(structType, resultBytes);
@@ -138,10 +151,12 @@ namespace S7.Net
/// <typeparam name="T">The struct type</typeparam>
/// <param name="db">Address of the DB.</param>
/// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>Returns a nulable struct. If nothing was read null will be returned.</returns>
public async Task<T?> ReadStructAsync<T>(int db, int startByteAdr = 0) where T : struct
public async Task<T?> ReadStructAsync<T>(int db, int startByteAdr = 0, CancellationToken cancellationToken = default) where T : struct
{
return await ReadStructAsync(typeof(T), db, startByteAdr) as T?;
return await ReadStructAsync(typeof(T), db, startByteAdr, cancellationToken) as T?;
}
/// <summary>
@@ -151,8 +166,10 @@ namespace S7.Net
/// <param name="sourceClass">Instance of the class that will store the values</param>
/// <param name="db">Index of the DB; es.: 1 is for DB1</param>
/// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>The number of read bytes</returns>
public async Task<Tuple<int, object>> ReadClassAsync(object sourceClass, int db, int startByteAdr = 0)
public async Task<Tuple<int, object>> ReadClassAsync(object sourceClass, int db, int startByteAdr = 0, CancellationToken cancellationToken = default)
{
int numBytes = (int)Class.GetClassSize(sourceClass);
if (numBytes <= 0)
@@ -161,7 +178,7 @@ namespace S7.Net
}
// now read the package
var resultBytes = await ReadBytesAsync(DataType.DataBlock, db, startByteAdr, numBytes);
var resultBytes = await ReadBytesAsync(DataType.DataBlock, db, startByteAdr, numBytes, cancellationToken);
// and decode it
Class.FromBytes(sourceClass, resultBytes);
@@ -176,10 +193,12 @@ namespace S7.Net
/// <typeparam name="T">The class that will be instantiated. Requires a default constructor</typeparam>
/// <param name="db">Index of the DB; es.: 1 is for DB1</param>
/// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>An instance of the class with the values read from the PLC. If no data has been read, null will be returned</returns>
public async Task<T?> ReadClassAsync<T>(int db, int startByteAdr = 0) where T : class
public async Task<T?> ReadClassAsync<T>(int db, int startByteAdr = 0, CancellationToken cancellationToken = default) where T : class
{
return await ReadClassAsync(() => Activator.CreateInstance<T>(), db, startByteAdr);
return await ReadClassAsync(() => Activator.CreateInstance<T>(), db, startByteAdr, cancellationToken);
}
/// <summary>
@@ -190,11 +209,13 @@ namespace S7.Net
/// <param name="classFactory">Function to instantiate the class</param>
/// <param name="db">Index of the DB; es.: 1 is for DB1</param>
/// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>An instance of the class with the values read from the PLC. If no data has been read, null will be returned</returns>
public async Task<T?> ReadClassAsync<T>(Func<T> classFactory, int db, int startByteAdr = 0) where T : class
public async Task<T?> ReadClassAsync<T>(Func<T> classFactory, int db, int startByteAdr = 0, CancellationToken cancellationToken = default) where T : class
{
var instance = classFactory();
var res = await ReadClassAsync(instance, db, startByteAdr);
var res = await ReadClassAsync(instance, db, startByteAdr, cancellationToken);
int readBytes = res.Item1;
if (readBytes <= 0)
{
@@ -212,7 +233,9 @@ namespace S7.Net
/// The number of DataItems as well as the total size of the requested data can not exceed a certain limit (protocol restriction).
/// </summary>
/// <param name="dataItems">List of dataitems that contains the list of variables that must be read.</param>
public async Task<List<DataItem>> ReadMultipleVarsAsync(List<DataItem> dataItems)
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
public async Task<List<DataItem>> ReadMultipleVarsAsync(List<DataItem> dataItems, CancellationToken cancellationToken = default)
{
//Snap7 seems to choke on PDU sizes above 256 even if snap7
//replies with bigger PDU size in connection setup.
@@ -235,7 +258,7 @@ namespace S7.Net
var dataToSend = package.ToArray();
await stream.WriteAsync(dataToSend, 0, dataToSend.Length);
var s7data = await COTP.TSDU.ReadAsync(stream); //TODO use Async
var s7data = await COTP.TSDU.ReadAsync(stream, cancellationToken); //TODO use Async
if (s7data == null || s7data[14] != 0xff)
throw new PlcException(ErrorCode.WrongNumberReceivedBytes);
@@ -260,15 +283,17 @@ namespace S7.Net
/// <param name="db">Address of the memory area (if you want to read DB1, this is set to 1). This must be set also for other memory area types: counters, timers,etc.</param>
/// <param name="startByteAdr">Start byte address. If you want to write DB1.DBW200, this is 200.</param>
/// <param name="value">Bytes to write. If more than 200, multiple requests will be made.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
public async Task WriteBytesAsync(DataType dataType, int db, int startByteAdr, byte[] value)
public async Task WriteBytesAsync(DataType dataType, int db, int startByteAdr, byte[] value, CancellationToken cancellationToken = default)
{
int localIndex = 0;
int count = value.Length;
while (count > 0)
{
var maxToWrite = (int)Math.Min(count, MaxPDUSize - 35);
await WriteBytesWithASingleRequestAsync(dataType, db, startByteAdr + localIndex, value, localIndex, maxToWrite);
await WriteBytesWithASingleRequestAsync(dataType, db, startByteAdr + localIndex, value, localIndex, maxToWrite, cancellationToken);
count -= maxToWrite;
localIndex += maxToWrite;
}
@@ -282,13 +307,15 @@ namespace S7.Net
/// <param name="startByteAdr">Start byte address. If you want to write DB1.DBW200, this is 200.</param>
/// <param name="bitAdr">The address of the bit. (0-7)</param>
/// <param name="value">Bytes to write. If more than 200, multiple requests will be made.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
public async Task WriteBitAsync(DataType dataType, int db, int startByteAdr, int bitAdr, bool value)
public async Task WriteBitAsync(DataType dataType, int db, int startByteAdr, int bitAdr, bool value, CancellationToken cancellationToken = default)
{
if (bitAdr < 0 || bitAdr > 7)
throw new InvalidAddressException(string.Format("Addressing Error: You can only reference bitwise locations 0-7. Address {0} is invalid", bitAdr));
await WriteBitWithASingleRequestAsync(dataType, db, startByteAdr, bitAdr, value);
await WriteBitWithASingleRequestAsync(dataType, db, startByteAdr, bitAdr, value, cancellationToken);
}
/// <summary>
@@ -299,13 +326,15 @@ namespace S7.Net
/// <param name="startByteAdr">Start byte address. If you want to write DB1.DBW200, this is 200.</param>
/// <param name="bitAdr">The address of the bit. (0-7)</param>
/// <param name="value">Bytes to write. If more than 200, multiple requests will be made.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
public async Task WriteBitAsync(DataType dataType, int db, int startByteAdr, int bitAdr, int value)
public async Task WriteBitAsync(DataType dataType, int db, int startByteAdr, int bitAdr, int value, CancellationToken cancellationToken = default)
{
if (value < 0 || value > 1)
throw new ArgumentException("Value must be 0 or 1", nameof(value));
await WriteBitAsync(dataType, db, startByteAdr, bitAdr, value == 1);
await WriteBitAsync(dataType, db, startByteAdr, bitAdr, value == 1, cancellationToken);
}
/// <summary>
@@ -318,15 +347,17 @@ namespace S7.Net
/// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param>
/// <param name="value">Bytes to write. The lenght of this parameter can't be higher than 200. If you need more, use recursion.</param>
/// <param name="bitAdr">The address of the bit. (0-7)</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
public async Task WriteAsync(DataType dataType, int db, int startByteAdr, object value, int bitAdr = -1)
public async Task WriteAsync(DataType dataType, int db, int startByteAdr, object value, int bitAdr = -1, CancellationToken cancellationToken = default)
{
if (bitAdr != -1)
{
//Must be writing a bit value as bitAdr is specified
if (value is bool boolean)
{
await WriteBitAsync(dataType, db, startByteAdr, bitAdr, boolean);
await WriteBitAsync(dataType, db, startByteAdr, bitAdr, boolean, cancellationToken);
}
else if (value is int intValue)
{
@@ -336,11 +367,11 @@ namespace S7.Net
"Addressing Error: You can only reference bitwise locations 0-7. Address {0} is invalid",
bitAdr), nameof(bitAdr));
await WriteBitAsync(dataType, db, startByteAdr, bitAdr, intValue == 1);
await WriteBitAsync(dataType, db, startByteAdr, bitAdr, intValue == 1, cancellationToken);
}
else throw new ArgumentException("Value must be a bool or an int to write a bit", nameof(value));
}
else await WriteBytesAsync(dataType, db, startByteAdr, Serialization.SerializeValue(value));
else await WriteBytesAsync(dataType, db, startByteAdr, Serialization.SerializeValue(value), cancellationToken);
}
/// <summary>
@@ -349,11 +380,13 @@ namespace S7.Net
/// </summary>
/// <param name="variable">Input strings like "DB1.DBX0.0", "DB20.DBD200", "MB20", "T45", etc.</param>
/// <param name="value">Value to be written to the PLC</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
public async Task WriteAsync(string variable, object value)
public async Task WriteAsync(string variable, object value, CancellationToken cancellationToken = default)
{
var adr = new PLCAddress(variable);
await WriteAsync(adr.DataType, adr.DbNumber, adr.StartByte, value, adr.BitNumber);
await WriteAsync(adr.DataType, adr.DbNumber, adr.StartByte, value, adr.BitNumber, cancellationToken);
}
/// <summary>
@@ -362,11 +395,13 @@ namespace S7.Net
/// <param name="structValue">The struct to be written</param>
/// <param name="db">Db address</param>
/// <param name="startByteAdr">Start bytes on the PLC</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
public async Task WriteStructAsync(object structValue, int db, int startByteAdr = 0)
public async Task WriteStructAsync(object structValue, int db, int startByteAdr = 0, CancellationToken cancellationToken = default)
{
var bytes = Struct.ToBytes(structValue).ToList();
await WriteBytesAsync(DataType.DataBlock, db, startByteAdr, bytes.ToArray());
await WriteBytesAsync(DataType.DataBlock, db, startByteAdr, bytes.ToArray(), cancellationToken);
}
/// <summary>
@@ -375,15 +410,17 @@ namespace S7.Net
/// <param name="classValue">The class to be written</param>
/// <param name="db">Db address</param>
/// <param name="startByteAdr">Start bytes on the PLC</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
public async Task WriteClassAsync(object classValue, int db, int startByteAdr = 0)
public async Task WriteClassAsync(object classValue, int db, int startByteAdr = 0, CancellationToken cancellationToken = default)
{
byte[] bytes = new byte[(int)Class.GetClassSize(classValue)];
Types.Class.ToBytes(classValue, bytes);
await WriteBytesAsync(DataType.DataBlock, db, startByteAdr, bytes);
await WriteBytesAsync(DataType.DataBlock, db, startByteAdr, bytes, cancellationToken);
}
private async Task ReadBytesWithSingleRequestAsync(DataType dataType, int db, int startByteAdr, byte[] buffer, int offset, int count)
private async Task ReadBytesWithSingleRequestAsync(DataType dataType, int db, int startByteAdr, byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var stream = GetStreamIfAvailable();
@@ -395,9 +432,9 @@ namespace S7.Net
BuildReadDataRequestPackage(package, dataType, db, startByteAdr, count);
var dataToSend = package.ToArray();
await stream.WriteAsync(dataToSend, 0, dataToSend.Length);
await stream.WriteAsync(dataToSend, 0, dataToSend.Length, cancellationToken);
var s7data = await COTP.TSDU.ReadAsync(stream);
var s7data = await COTP.TSDU.ReadAsync(stream, cancellationToken);
AssertReadResponse(s7data, count);
Array.Copy(s7data, 18, buffer, offset, count);
@@ -419,7 +456,7 @@ namespace S7.Net
var length = S7WriteMultiple.CreateRequest(message, dataItems);
await stream.WriteAsync(message.Array, 0, length).ConfigureAwait(false);
var response = await COTP.TSDU.ReadAsync(stream).ConfigureAwait(false);
var response = await COTP.TSDU.ReadAsync(stream, CancellationToken.None).ConfigureAwait(false);
S7WriteMultiple.ParseResponse(response, response.Length, dataItems);
}
@@ -431,7 +468,7 @@ namespace S7.Net
/// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param>
/// <param name="value">Bytes to write. The lenght of this parameter can't be higher than 200. If you need more, use recursion.</param>
/// <returns>A task that represents the asynchronous write operation.</returns>
private async Task WriteBytesWithASingleRequestAsync(DataType dataType, int db, int startByteAdr, byte[] value, int dataOffset, int count)
private async Task WriteBytesWithASingleRequestAsync(DataType dataType, int db, int startByteAdr, byte[] value, int dataOffset, int count, CancellationToken cancellationToken)
{
try
@@ -441,7 +478,7 @@ namespace S7.Net
await stream.WriteAsync(dataToSend, 0, dataToSend.Length);
var s7data = await COTP.TSDU.ReadAsync(stream);
var s7data = await COTP.TSDU.ReadAsync(stream, cancellationToken);
if (s7data == null || s7data[14] != 0xff)
{
throw new PlcException(ErrorCode.WrongNumberReceivedBytes);
@@ -453,7 +490,7 @@ namespace S7.Net
}
}
private async Task WriteBitWithASingleRequestAsync(DataType dataType, int db, int startByteAdr, int bitAdr, bool bitValue)
private async Task WriteBitWithASingleRequestAsync(DataType dataType, int db, int startByteAdr, int bitAdr, bool bitValue, CancellationToken cancellationToken)
{
var stream = GetStreamIfAvailable();
@@ -463,7 +500,7 @@ namespace S7.Net
await stream.WriteAsync(dataToSend, 0, dataToSend.Length);
var s7data = await COTP.TSDU.ReadAsync(stream);
var s7data = await COTP.TSDU.ReadAsync(stream, cancellationToken);
if (s7data == null || s7data[14] != 0xff)
{
throw new PlcException(ErrorCode.WrongNumberReceivedBytes);

View File

@@ -1,5 +1,6 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace S7.Net
@@ -39,13 +40,13 @@ namespace S7.Net
/// <param name="offset">the offset in the buffer to read into</param>
/// <param name="count">the amount of bytes to read into the buffer</param>
/// <returns>returns the amount of read bytes</returns>
public static async Task<int> ReadExactAsync(this Stream stream, byte[] buffer, int offset, int count)
public static async Task<int> ReadExactAsync(this Stream stream, byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
int read = 0;
int received;
do
{
received = await stream.ReadAsync(buffer, offset + read, count - read);
received = await stream.ReadAsync(buffer, offset + read, count - read, cancellationToken);
read += received;
}
while (read < count && received > 0);

View File

@@ -1,5 +1,6 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace S7.Net
@@ -57,10 +58,10 @@ namespace S7.Net
/// </summary>
/// <param name="stream">The stream to read from</param>
/// <returns>Task TPKT Instace</returns>
public static async Task<TPKT> ReadAsync(Stream stream)
public static async Task<TPKT> ReadAsync(Stream stream, CancellationToken cancellationToken)
{
var buf = new byte[4];
int len = await stream.ReadExactAsync(buf, 0, 4);
int len = await stream.ReadExactAsync(buf, 0, 4, cancellationToken);
if (len < 4) throw new TPKTInvalidException("TPKT is incomplete / invalid");
var version = buf[0];
@@ -68,7 +69,7 @@ namespace S7.Net
var length = buf[2] * 256 + buf[3]; //BigEndian
var data = new byte[length - 4];
len = await stream.ReadExactAsync(data, 0, data.Length);
len = await stream.ReadExactAsync(data, 0, data.Length, cancellationToken);
if (len < data.Length)
throw new TPKTInvalidException("TPKT payload incomplete / invalid");