Merge pull request #307 from scamille/fb-asyncCancellationTokens

Add CancellationToken support to async functions.
This commit is contained in:
Michael Croes
2020-09-10 23:50:41 +02:00
committed by GitHub
7 changed files with 154 additions and 62 deletions

View File

@@ -13,15 +13,17 @@ namespace S7.Net.UnitTest
[TestClass] [TestClass]
public class ProtocolUnitTest public class ProtocolUnitTest
{ {
private TestContext TestContext { get; set; }
[TestMethod] [TestMethod]
public void TPKT_Read() public async Task TPKT_Read()
{ {
var m = new MemoryStream(StringToByteArray("0300002902f0803203000000010002001400000401ff0400807710000100000103000000033f8ccccd")); var m = new MemoryStream(StringToByteArray("0300002902f0803203000000010002001400000401ff0400807710000100000103000000033f8ccccd"));
var t = TPKT.Read(m); var t = TPKT.Read(m);
Assert.AreEqual(0x03, t.Version); Assert.AreEqual(0x03, t.Version);
Assert.AreEqual(0x29, t.Length); Assert.AreEqual(0x29, t.Length);
m.Position = 0; m.Position = 0;
t = TPKT.ReadAsync(m).Result; t = await TPKT.ReadAsync(m, TestContext.CancellationTokenSource.Token);
Assert.AreEqual(0x03, t.Version); Assert.AreEqual(0x03, t.Version);
Assert.AreEqual(0x29, t.Length); Assert.AreEqual(0x29, t.Length);
} }
@@ -40,7 +42,7 @@ namespace S7.Net.UnitTest
public async Task TPKT_ReadShortAsync() public async Task TPKT_ReadShortAsync()
{ {
var m = new MemoryStream(StringToByteArray("0300002902f0803203000000010002001400000401ff040080")); var m = new MemoryStream(StringToByteArray("0300002902f0803203000000010002001400000401ff040080"));
var t = await TPKT.ReadAsync(m); var t = await TPKT.ReadAsync(m, TestContext.CancellationTokenSource.Token);
} }
[TestMethod] [TestMethod]
@@ -51,7 +53,7 @@ namespace S7.Net.UnitTest
var t = COTP.TSDU.Read(m); var t = COTP.TSDU.Read(m);
Assert.IsTrue(expected.SequenceEqual(t)); Assert.IsTrue(expected.SequenceEqual(t));
m.Position = 0; m.Position = 0;
t = COTP.TSDU.ReadAsync(m).Result; t = COTP.TSDU.ReadAsync(m, TestContext.CancellationTokenSource.Token).Result;
Assert.IsTrue(expected.SequenceEqual(t)); Assert.IsTrue(expected.SequenceEqual(t));
} }

View File

@@ -10,6 +10,7 @@ using System.ServiceProcess;
using S7.Net.Types; using S7.Net.Types;
using S7.UnitTest.Helpers; using S7.UnitTest.Helpers;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Threading;
#endregion #endregion
@@ -908,6 +909,42 @@ namespace S7.Net.UnitTest
Assert.AreEqual(x % 256, res[x], string.Format("Bit {0} failed", x)); Assert.AreEqual(x % 256, res[x], string.Format("Bit {0} failed", x));
} }
} }
/// <summary>
/// Write a large amount of data and test cancellation
/// </summary>
[TestMethod]
public async Task Test_Async_WriteLargeByteArrayWithCancellation()
{
Assert.IsTrue(plc.IsConnected, "Before executing this test, the plc must be connected. Check constructor.");
var cancellationSource = new CancellationTokenSource();
var cancellationToken = cancellationSource.Token;
var randomEngine = new Random();
var data = new byte[8192];
var db = 2;
randomEngine.NextBytes(data);
cancellationSource.CancelAfter(TimeSpan.FromMilliseconds(5));
try
{
await plc.WriteBytesAsync(DataType.DataBlock, db, 0, data, cancellationToken);
}
catch(TaskCanceledException)
{
// everything is good, that is the exception we expect
Console.WriteLine("Task was cancelled as expected.");
return;
}
catch(Exception e)
{
Assert.Fail($"Wrong exception type received. Expected {typeof(TaskCanceledException)}, received {e.GetType()}.");
}
// Depending on how tests run, this can also just succeed without getting cancelled at all. Do nothing in this case.
Console.WriteLine("Task was not cancelled as expected.");
}
#endregion #endregion
} }
} }

View File

@@ -65,13 +65,14 @@ namespace S7.Net.UnitTest
[TestClass] [TestClass]
public class StreamTests public class StreamTests
{ {
private TestContext TestContext { get; set; }
[TestMethod] [TestMethod]
public async Task TPKT_ReadRestrictedStreamAsync() public async Task TPKT_ReadRestrictedStreamAsync()
{ {
var fullMessage = ProtocolUnitTest.StringToByteArray("0300002902f0803203000000010002001400000401ff0400807710000100000103000000033f8ccccd"); var fullMessage = ProtocolUnitTest.StringToByteArray("0300002902f0803203000000010002001400000401ff0400807710000100000103000000033f8ccccd");
var m = new TestStream1BytePerRead(fullMessage); var m = new TestStream1BytePerRead(fullMessage);
var t = await TPKT.ReadAsync(m); var t = await TPKT.ReadAsync(m, TestContext.CancellationTokenSource.Token);
Assert.AreEqual(fullMessage.Length, t.Length); Assert.AreEqual(fullMessage.Length, t.Length);
Assert.AreEqual(fullMessage.Last(), t.Data.Last()); Assert.AreEqual(fullMessage.Last(), t.Data.Last());
} }

View File

@@ -1,5 +1,6 @@
using System; using System;
using System.IO; using System.IO;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace S7.Net namespace S7.Net
@@ -66,9 +67,9 @@ namespace S7.Net
/// </summary> /// </summary>
/// <param name="stream">The socket to read from</param> /// <param name="stream">The socket to read from</param>
/// <returns>COTP DPDU instance</returns> /// <returns>COTP DPDU instance</returns>
public static async Task<TPDU> ReadAsync(Stream stream) public static async Task<TPDU> ReadAsync(Stream stream, CancellationToken cancellationToken)
{ {
var tpkt = await TPKT.ReadAsync(stream); var tpkt = await TPKT.ReadAsync(stream, cancellationToken);
if (tpkt.Length == 0) if (tpkt.Length == 0)
{ {
throw new TPDUInvalidException("No protocol data received"); throw new TPDUInvalidException("No protocol data received");
@@ -130,9 +131,9 @@ namespace S7.Net
/// </summary> /// </summary>
/// <param name="stream">The stream to read from</param> /// <param name="stream">The stream to read from</param>
/// <returns>Data in TSDU</returns> /// <returns>Data in TSDU</returns>
public static async Task<byte[]> ReadAsync(Stream stream) public static async Task<byte[]> ReadAsync(Stream stream, CancellationToken cancellationToken)
{ {
var segment = await TPDU.ReadAsync(stream); var segment = await TPDU.ReadAsync(stream, cancellationToken);
if (segment.LastDataUnit) if (segment.LastDataUnit)
{ {
@@ -145,7 +146,7 @@ namespace S7.Net
while (!segment.LastDataUnit) while (!segment.LastDataUnit)
{ {
segment = await TPDU.ReadAsync(stream); segment = await TPDU.ReadAsync(stream, cancellationToken);
var previousLength = buffer.Length; var previousLength = buffer.Length;
Array.Resize(ref buffer, buffer.Length + segment.Data.Length); Array.Resize(ref buffer, buffer.Length + segment.Data.Length);
Array.Copy(segment.Data, 0, buffer, previousLength, segment.Data.Length); Array.Copy(segment.Data, 0, buffer, previousLength, segment.Data.Length);

View File

@@ -6,6 +6,7 @@ using System.Net.Sockets;
using System.Threading.Tasks; using System.Threading.Tasks;
using S7.Net.Protocol; using S7.Net.Protocol;
using System.IO; using System.IO;
using System.Threading;
namespace S7.Net namespace S7.Net
{ {
@@ -17,14 +18,18 @@ namespace S7.Net
/// <summary> /// <summary>
/// Connects to the PLC and performs a COTP ConnectionRequest and S7 CommunicationSetup. /// Connects to the PLC and performs a COTP ConnectionRequest and S7 CommunicationSetup.
/// </summary> /// </summary>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that the cancellation will not affect opening the socket in any way and only affects data transfers for configuring the connection after the socket connection is successfully established.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>A task that represents the asynchronous open operation.</returns> /// <returns>A task that represents the asynchronous open operation.</returns>
public async Task OpenAsync() public async Task OpenAsync(CancellationToken cancellationToken = default)
{ {
await ConnectAsync(); await ConnectAsync();
cancellationToken.ThrowIfCancellationRequested();
var stream = GetStreamIfAvailable(); var stream = GetStreamIfAvailable();
await stream.WriteAsync(ConnectionRequest.GetCOTPConnectionRequest(CPU, Rack, Slot), 0, 22); await stream.WriteAsync(ConnectionRequest.GetCOTPConnectionRequest(CPU, Rack, Slot), 0, 22);
var response = await COTP.TPDU.ReadAsync(stream); var response = await COTP.TPDU.ReadAsync(stream, cancellationToken);
if (response == null) if (response == null)
{ {
throw new Exception("Error reading Connection Confirm. Malformed TPDU packet"); throw new Exception("Error reading Connection Confirm. Malformed TPDU packet");
@@ -36,7 +41,7 @@ namespace S7.Net
await stream.WriteAsync(GetS7ConnectionSetup(), 0, 25); await stream.WriteAsync(GetS7ConnectionSetup(), 0, 25);
var s7data = await COTP.TSDU.ReadAsync(stream); var s7data = await COTP.TSDU.ReadAsync(stream, cancellationToken);
if (s7data == null) if (s7data == null)
throw new WrongNumberOfBytesException("No data received in response to Communication Setup"); throw new WrongNumberOfBytesException("No data received in response to Communication Setup");
if (s7data.Length < 2) if (s7data.Length < 2)
@@ -69,8 +74,10 @@ namespace S7.Net
/// <param name="db">Address of the memory area (if you want to read DB1, this is set to 1). This must be set also for other memory area types: counters, timers,etc.</param> /// <param name="db">Address of the memory area (if you want to read DB1, this is set to 1). This must be set also for other memory area types: counters, timers,etc.</param>
/// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param> /// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param>
/// <param name="count">Byte count, if you want to read 120 bytes, set this to 120.</param> /// <param name="count">Byte count, if you want to read 120 bytes, set this to 120.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>Returns the bytes in an array</returns> /// <returns>Returns the bytes in an array</returns>
public async Task<byte[]> ReadBytesAsync(DataType dataType, int db, int startByteAdr, int count) public async Task<byte[]> ReadBytesAsync(DataType dataType, int db, int startByteAdr, int count, CancellationToken cancellationToken = default)
{ {
var resultBytes = new byte[count]; var resultBytes = new byte[count];
int index = 0; int index = 0;
@@ -78,7 +85,7 @@ namespace S7.Net
{ {
//This works up to MaxPDUSize-1 on SNAP7. But not MaxPDUSize-0. //This works up to MaxPDUSize-1 on SNAP7. But not MaxPDUSize-0.
var maxToRead = Math.Min(count, MaxPDUSize - 18); var maxToRead = Math.Min(count, MaxPDUSize - 18);
await ReadBytesWithSingleRequestAsync(dataType, db, startByteAdr + index, resultBytes, index, maxToRead); await ReadBytesWithSingleRequestAsync(dataType, db, startByteAdr + index, resultBytes, index, maxToRead, cancellationToken);
count -= maxToRead; count -= maxToRead;
index += maxToRead; index += maxToRead;
} }
@@ -96,10 +103,12 @@ namespace S7.Net
/// <param name="varType">Type of the variable/s that you are reading</param> /// <param name="varType">Type of the variable/s that you are reading</param>
/// <param name="bitAdr">Address of bit. If you want to read DB1.DBX200.6, set 6 to this parameter.</param> /// <param name="bitAdr">Address of bit. If you want to read DB1.DBX200.6, set 6 to this parameter.</param>
/// <param name="varCount"></param> /// <param name="varCount"></param>
public async Task<object?> ReadAsync(DataType dataType, int db, int startByteAdr, VarType varType, int varCount, byte bitAdr = 0) /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
public async Task<object?> ReadAsync(DataType dataType, int db, int startByteAdr, VarType varType, int varCount, byte bitAdr = 0, CancellationToken cancellationToken = default)
{ {
int cntBytes = VarTypeToByteLength(varType, varCount); int cntBytes = VarTypeToByteLength(varType, varCount);
byte[] bytes = await ReadBytesAsync(dataType, db, startByteAdr, cntBytes); byte[] bytes = await ReadBytesAsync(dataType, db, startByteAdr, cntBytes, cancellationToken);
return ParseBytes(varType, bytes, varCount, bitAdr); return ParseBytes(varType, bytes, varCount, bitAdr);
} }
@@ -108,11 +117,13 @@ namespace S7.Net
/// If the read was not successful, check LastErrorCode or LastErrorString. /// If the read was not successful, check LastErrorCode or LastErrorString.
/// </summary> /// </summary>
/// <param name="variable">Input strings like "DB1.DBX0.0", "DB20.DBD200", "MB20", "T45", etc.</param> /// <param name="variable">Input strings like "DB1.DBX0.0", "DB20.DBD200", "MB20", "T45", etc.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>Returns an object that contains the value. This object must be cast accordingly.</returns> /// <returns>Returns an object that contains the value. This object must be cast accordingly.</returns>
public async Task<object?> ReadAsync(string variable) public async Task<object?> ReadAsync(string variable, CancellationToken cancellationToken = default)
{ {
var adr = new PLCAddress(variable); var adr = new PLCAddress(variable);
return await ReadAsync(adr.DataType, adr.DbNumber, adr.StartByte, adr.VarType, 1, (byte)adr.BitNumber); return await ReadAsync(adr.DataType, adr.DbNumber, adr.StartByte, adr.VarType, 1, (byte)adr.BitNumber, cancellationToken);
} }
/// <summary> /// <summary>
@@ -121,12 +132,14 @@ namespace S7.Net
/// <param name="structType">Type of the struct to be readed (es.: TypeOf(MyStruct)).</param> /// <param name="structType">Type of the struct to be readed (es.: TypeOf(MyStruct)).</param>
/// <param name="db">Address of the DB.</param> /// <param name="db">Address of the DB.</param>
/// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param> /// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>Returns a struct that must be cast.</returns> /// <returns>Returns a struct that must be cast.</returns>
public async Task<object?> ReadStructAsync(Type structType, int db, int startByteAdr = 0) public async Task<object?> ReadStructAsync(Type structType, int db, int startByteAdr = 0, CancellationToken cancellationToken = default)
{ {
int numBytes = Types.Struct.GetStructSize(structType); int numBytes = Types.Struct.GetStructSize(structType);
// now read the package // now read the package
var resultBytes = await ReadBytesAsync(DataType.DataBlock, db, startByteAdr, numBytes); var resultBytes = await ReadBytesAsync(DataType.DataBlock, db, startByteAdr, numBytes, cancellationToken);
// and decode it // and decode it
return Types.Struct.FromBytes(structType, resultBytes); return Types.Struct.FromBytes(structType, resultBytes);
@@ -138,10 +151,12 @@ namespace S7.Net
/// <typeparam name="T">The struct type</typeparam> /// <typeparam name="T">The struct type</typeparam>
/// <param name="db">Address of the DB.</param> /// <param name="db">Address of the DB.</param>
/// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param> /// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>Returns a nulable struct. If nothing was read null will be returned.</returns> /// <returns>Returns a nulable struct. If nothing was read null will be returned.</returns>
public async Task<T?> ReadStructAsync<T>(int db, int startByteAdr = 0) where T : struct public async Task<T?> ReadStructAsync<T>(int db, int startByteAdr = 0, CancellationToken cancellationToken = default) where T : struct
{ {
return await ReadStructAsync(typeof(T), db, startByteAdr) as T?; return await ReadStructAsync(typeof(T), db, startByteAdr, cancellationToken) as T?;
} }
/// <summary> /// <summary>
@@ -151,8 +166,10 @@ namespace S7.Net
/// <param name="sourceClass">Instance of the class that will store the values</param> /// <param name="sourceClass">Instance of the class that will store the values</param>
/// <param name="db">Index of the DB; es.: 1 is for DB1</param> /// <param name="db">Index of the DB; es.: 1 is for DB1</param>
/// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param> /// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>The number of read bytes</returns> /// <returns>The number of read bytes</returns>
public async Task<Tuple<int, object>> ReadClassAsync(object sourceClass, int db, int startByteAdr = 0) public async Task<Tuple<int, object>> ReadClassAsync(object sourceClass, int db, int startByteAdr = 0, CancellationToken cancellationToken = default)
{ {
int numBytes = (int)Class.GetClassSize(sourceClass); int numBytes = (int)Class.GetClassSize(sourceClass);
if (numBytes <= 0) if (numBytes <= 0)
@@ -161,7 +178,7 @@ namespace S7.Net
} }
// now read the package // now read the package
var resultBytes = await ReadBytesAsync(DataType.DataBlock, db, startByteAdr, numBytes); var resultBytes = await ReadBytesAsync(DataType.DataBlock, db, startByteAdr, numBytes, cancellationToken);
// and decode it // and decode it
Class.FromBytes(sourceClass, resultBytes); Class.FromBytes(sourceClass, resultBytes);
@@ -176,10 +193,12 @@ namespace S7.Net
/// <typeparam name="T">The class that will be instantiated. Requires a default constructor</typeparam> /// <typeparam name="T">The class that will be instantiated. Requires a default constructor</typeparam>
/// <param name="db">Index of the DB; es.: 1 is for DB1</param> /// <param name="db">Index of the DB; es.: 1 is for DB1</param>
/// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param> /// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>An instance of the class with the values read from the PLC. If no data has been read, null will be returned</returns> /// <returns>An instance of the class with the values read from the PLC. If no data has been read, null will be returned</returns>
public async Task<T?> ReadClassAsync<T>(int db, int startByteAdr = 0) where T : class public async Task<T?> ReadClassAsync<T>(int db, int startByteAdr = 0, CancellationToken cancellationToken = default) where T : class
{ {
return await ReadClassAsync(() => Activator.CreateInstance<T>(), db, startByteAdr); return await ReadClassAsync(() => Activator.CreateInstance<T>(), db, startByteAdr, cancellationToken);
} }
/// <summary> /// <summary>
@@ -190,11 +209,13 @@ namespace S7.Net
/// <param name="classFactory">Function to instantiate the class</param> /// <param name="classFactory">Function to instantiate the class</param>
/// <param name="db">Index of the DB; es.: 1 is for DB1</param> /// <param name="db">Index of the DB; es.: 1 is for DB1</param>
/// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param> /// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>An instance of the class with the values read from the PLC. If no data has been read, null will be returned</returns> /// <returns>An instance of the class with the values read from the PLC. If no data has been read, null will be returned</returns>
public async Task<T?> ReadClassAsync<T>(Func<T> classFactory, int db, int startByteAdr = 0) where T : class public async Task<T?> ReadClassAsync<T>(Func<T> classFactory, int db, int startByteAdr = 0, CancellationToken cancellationToken = default) where T : class
{ {
var instance = classFactory(); var instance = classFactory();
var res = await ReadClassAsync(instance, db, startByteAdr); var res = await ReadClassAsync(instance, db, startByteAdr, cancellationToken);
int readBytes = res.Item1; int readBytes = res.Item1;
if (readBytes <= 0) if (readBytes <= 0)
{ {
@@ -212,7 +233,9 @@ namespace S7.Net
/// The number of DataItems as well as the total size of the requested data can not exceed a certain limit (protocol restriction). /// The number of DataItems as well as the total size of the requested data can not exceed a certain limit (protocol restriction).
/// </summary> /// </summary>
/// <param name="dataItems">List of dataitems that contains the list of variables that must be read.</param> /// <param name="dataItems">List of dataitems that contains the list of variables that must be read.</param>
public async Task<List<DataItem>> ReadMultipleVarsAsync(List<DataItem> dataItems) /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
public async Task<List<DataItem>> ReadMultipleVarsAsync(List<DataItem> dataItems, CancellationToken cancellationToken = default)
{ {
//Snap7 seems to choke on PDU sizes above 256 even if snap7 //Snap7 seems to choke on PDU sizes above 256 even if snap7
//replies with bigger PDU size in connection setup. //replies with bigger PDU size in connection setup.
@@ -235,7 +258,7 @@ namespace S7.Net
var dataToSend = package.ToArray(); var dataToSend = package.ToArray();
await stream.WriteAsync(dataToSend, 0, dataToSend.Length); await stream.WriteAsync(dataToSend, 0, dataToSend.Length);
var s7data = await COTP.TSDU.ReadAsync(stream); //TODO use Async var s7data = await COTP.TSDU.ReadAsync(stream, cancellationToken); //TODO use Async
if (s7data == null || s7data[14] != 0xff) if (s7data == null || s7data[14] != 0xff)
throw new PlcException(ErrorCode.WrongNumberReceivedBytes); throw new PlcException(ErrorCode.WrongNumberReceivedBytes);
@@ -245,6 +268,10 @@ namespace S7.Net
{ {
throw new PlcException(ErrorCode.ReadData, socketException); throw new PlcException(ErrorCode.ReadData, socketException);
} }
catch (OperationCanceledException)
{
throw;
}
catch (Exception exc) catch (Exception exc)
{ {
throw new PlcException(ErrorCode.ReadData, exc); throw new PlcException(ErrorCode.ReadData, exc);
@@ -260,15 +287,17 @@ namespace S7.Net
/// <param name="db">Address of the memory area (if you want to read DB1, this is set to 1). This must be set also for other memory area types: counters, timers,etc.</param> /// <param name="db">Address of the memory area (if you want to read DB1, this is set to 1). This must be set also for other memory area types: counters, timers,etc.</param>
/// <param name="startByteAdr">Start byte address. If you want to write DB1.DBW200, this is 200.</param> /// <param name="startByteAdr">Start byte address. If you want to write DB1.DBW200, this is 200.</param>
/// <param name="value">Bytes to write. If more than 200, multiple requests will be made.</param> /// <param name="value">Bytes to write. If more than 200, multiple requests will be made.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>A task that represents the asynchronous write operation.</returns> /// <returns>A task that represents the asynchronous write operation.</returns>
public async Task WriteBytesAsync(DataType dataType, int db, int startByteAdr, byte[] value) public async Task WriteBytesAsync(DataType dataType, int db, int startByteAdr, byte[] value, CancellationToken cancellationToken = default)
{ {
int localIndex = 0; int localIndex = 0;
int count = value.Length; int count = value.Length;
while (count > 0) while (count > 0)
{ {
var maxToWrite = (int)Math.Min(count, MaxPDUSize - 35); var maxToWrite = (int)Math.Min(count, MaxPDUSize - 35);
await WriteBytesWithASingleRequestAsync(dataType, db, startByteAdr + localIndex, value, localIndex, maxToWrite); await WriteBytesWithASingleRequestAsync(dataType, db, startByteAdr + localIndex, value, localIndex, maxToWrite, cancellationToken);
count -= maxToWrite; count -= maxToWrite;
localIndex += maxToWrite; localIndex += maxToWrite;
} }
@@ -282,13 +311,15 @@ namespace S7.Net
/// <param name="startByteAdr">Start byte address. If you want to write DB1.DBW200, this is 200.</param> /// <param name="startByteAdr">Start byte address. If you want to write DB1.DBW200, this is 200.</param>
/// <param name="bitAdr">The address of the bit. (0-7)</param> /// <param name="bitAdr">The address of the bit. (0-7)</param>
/// <param name="value">Bytes to write. If more than 200, multiple requests will be made.</param> /// <param name="value">Bytes to write. If more than 200, multiple requests will be made.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>A task that represents the asynchronous write operation.</returns> /// <returns>A task that represents the asynchronous write operation.</returns>
public async Task WriteBitAsync(DataType dataType, int db, int startByteAdr, int bitAdr, bool value) public async Task WriteBitAsync(DataType dataType, int db, int startByteAdr, int bitAdr, bool value, CancellationToken cancellationToken = default)
{ {
if (bitAdr < 0 || bitAdr > 7) if (bitAdr < 0 || bitAdr > 7)
throw new InvalidAddressException(string.Format("Addressing Error: You can only reference bitwise locations 0-7. Address {0} is invalid", bitAdr)); throw new InvalidAddressException(string.Format("Addressing Error: You can only reference bitwise locations 0-7. Address {0} is invalid", bitAdr));
await WriteBitWithASingleRequestAsync(dataType, db, startByteAdr, bitAdr, value); await WriteBitWithASingleRequestAsync(dataType, db, startByteAdr, bitAdr, value, cancellationToken);
} }
/// <summary> /// <summary>
@@ -299,13 +330,15 @@ namespace S7.Net
/// <param name="startByteAdr">Start byte address. If you want to write DB1.DBW200, this is 200.</param> /// <param name="startByteAdr">Start byte address. If you want to write DB1.DBW200, this is 200.</param>
/// <param name="bitAdr">The address of the bit. (0-7)</param> /// <param name="bitAdr">The address of the bit. (0-7)</param>
/// <param name="value">Bytes to write. If more than 200, multiple requests will be made.</param> /// <param name="value">Bytes to write. If more than 200, multiple requests will be made.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>A task that represents the asynchronous write operation.</returns> /// <returns>A task that represents the asynchronous write operation.</returns>
public async Task WriteBitAsync(DataType dataType, int db, int startByteAdr, int bitAdr, int value) public async Task WriteBitAsync(DataType dataType, int db, int startByteAdr, int bitAdr, int value, CancellationToken cancellationToken = default)
{ {
if (value < 0 || value > 1) if (value < 0 || value > 1)
throw new ArgumentException("Value must be 0 or 1", nameof(value)); throw new ArgumentException("Value must be 0 or 1", nameof(value));
await WriteBitAsync(dataType, db, startByteAdr, bitAdr, value == 1); await WriteBitAsync(dataType, db, startByteAdr, bitAdr, value == 1, cancellationToken);
} }
/// <summary> /// <summary>
@@ -318,15 +351,17 @@ namespace S7.Net
/// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param> /// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param>
/// <param name="value">Bytes to write. The lenght of this parameter can't be higher than 200. If you need more, use recursion.</param> /// <param name="value">Bytes to write. The lenght of this parameter can't be higher than 200. If you need more, use recursion.</param>
/// <param name="bitAdr">The address of the bit. (0-7)</param> /// <param name="bitAdr">The address of the bit. (0-7)</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>A task that represents the asynchronous write operation.</returns> /// <returns>A task that represents the asynchronous write operation.</returns>
public async Task WriteAsync(DataType dataType, int db, int startByteAdr, object value, int bitAdr = -1) public async Task WriteAsync(DataType dataType, int db, int startByteAdr, object value, int bitAdr = -1, CancellationToken cancellationToken = default)
{ {
if (bitAdr != -1) if (bitAdr != -1)
{ {
//Must be writing a bit value as bitAdr is specified //Must be writing a bit value as bitAdr is specified
if (value is bool boolean) if (value is bool boolean)
{ {
await WriteBitAsync(dataType, db, startByteAdr, bitAdr, boolean); await WriteBitAsync(dataType, db, startByteAdr, bitAdr, boolean, cancellationToken);
} }
else if (value is int intValue) else if (value is int intValue)
{ {
@@ -336,11 +371,11 @@ namespace S7.Net
"Addressing Error: You can only reference bitwise locations 0-7. Address {0} is invalid", "Addressing Error: You can only reference bitwise locations 0-7. Address {0} is invalid",
bitAdr), nameof(bitAdr)); bitAdr), nameof(bitAdr));
await WriteBitAsync(dataType, db, startByteAdr, bitAdr, intValue == 1); await WriteBitAsync(dataType, db, startByteAdr, bitAdr, intValue == 1, cancellationToken);
} }
else throw new ArgumentException("Value must be a bool or an int to write a bit", nameof(value)); else throw new ArgumentException("Value must be a bool or an int to write a bit", nameof(value));
} }
else await WriteBytesAsync(dataType, db, startByteAdr, Serialization.SerializeValue(value)); else await WriteBytesAsync(dataType, db, startByteAdr, Serialization.SerializeValue(value), cancellationToken);
} }
/// <summary> /// <summary>
@@ -349,11 +384,13 @@ namespace S7.Net
/// </summary> /// </summary>
/// <param name="variable">Input strings like "DB1.DBX0.0", "DB20.DBD200", "MB20", "T45", etc.</param> /// <param name="variable">Input strings like "DB1.DBX0.0", "DB20.DBD200", "MB20", "T45", etc.</param>
/// <param name="value">Value to be written to the PLC</param> /// <param name="value">Value to be written to the PLC</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>A task that represents the asynchronous write operation.</returns> /// <returns>A task that represents the asynchronous write operation.</returns>
public async Task WriteAsync(string variable, object value) public async Task WriteAsync(string variable, object value, CancellationToken cancellationToken = default)
{ {
var adr = new PLCAddress(variable); var adr = new PLCAddress(variable);
await WriteAsync(adr.DataType, adr.DbNumber, adr.StartByte, value, adr.BitNumber); await WriteAsync(adr.DataType, adr.DbNumber, adr.StartByte, value, adr.BitNumber, cancellationToken);
} }
/// <summary> /// <summary>
@@ -362,11 +399,13 @@ namespace S7.Net
/// <param name="structValue">The struct to be written</param> /// <param name="structValue">The struct to be written</param>
/// <param name="db">Db address</param> /// <param name="db">Db address</param>
/// <param name="startByteAdr">Start bytes on the PLC</param> /// <param name="startByteAdr">Start bytes on the PLC</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>A task that represents the asynchronous write operation.</returns> /// <returns>A task that represents the asynchronous write operation.</returns>
public async Task WriteStructAsync(object structValue, int db, int startByteAdr = 0) public async Task WriteStructAsync(object structValue, int db, int startByteAdr = 0, CancellationToken cancellationToken = default)
{ {
var bytes = Struct.ToBytes(structValue).ToList(); var bytes = Struct.ToBytes(structValue).ToList();
await WriteBytesAsync(DataType.DataBlock, db, startByteAdr, bytes.ToArray()); await WriteBytesAsync(DataType.DataBlock, db, startByteAdr, bytes.ToArray(), cancellationToken);
} }
/// <summary> /// <summary>
@@ -375,15 +414,17 @@ namespace S7.Net
/// <param name="classValue">The class to be written</param> /// <param name="classValue">The class to be written</param>
/// <param name="db">Db address</param> /// <param name="db">Db address</param>
/// <param name="startByteAdr">Start bytes on the PLC</param> /// <param name="startByteAdr">Start bytes on the PLC</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is None.
/// Please note that cancellation is advisory/cooperative and will not lead to immediate cancellation in all cases.</param>
/// <returns>A task that represents the asynchronous write operation.</returns> /// <returns>A task that represents the asynchronous write operation.</returns>
public async Task WriteClassAsync(object classValue, int db, int startByteAdr = 0) public async Task WriteClassAsync(object classValue, int db, int startByteAdr = 0, CancellationToken cancellationToken = default)
{ {
byte[] bytes = new byte[(int)Class.GetClassSize(classValue)]; byte[] bytes = new byte[(int)Class.GetClassSize(classValue)];
Types.Class.ToBytes(classValue, bytes); Types.Class.ToBytes(classValue, bytes);
await WriteBytesAsync(DataType.DataBlock, db, startByteAdr, bytes); await WriteBytesAsync(DataType.DataBlock, db, startByteAdr, bytes, cancellationToken);
} }
private async Task ReadBytesWithSingleRequestAsync(DataType dataType, int db, int startByteAdr, byte[] buffer, int offset, int count) private async Task ReadBytesWithSingleRequestAsync(DataType dataType, int db, int startByteAdr, byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{ {
var stream = GetStreamIfAvailable(); var stream = GetStreamIfAvailable();
@@ -395,9 +436,9 @@ namespace S7.Net
BuildReadDataRequestPackage(package, dataType, db, startByteAdr, count); BuildReadDataRequestPackage(package, dataType, db, startByteAdr, count);
var dataToSend = package.ToArray(); var dataToSend = package.ToArray();
await stream.WriteAsync(dataToSend, 0, dataToSend.Length); await stream.WriteAsync(dataToSend, 0, dataToSend.Length, cancellationToken);
var s7data = await COTP.TSDU.ReadAsync(stream); var s7data = await COTP.TSDU.ReadAsync(stream, cancellationToken);
AssertReadResponse(s7data, count); AssertReadResponse(s7data, count);
Array.Copy(s7data, 18, buffer, offset, count); Array.Copy(s7data, 18, buffer, offset, count);
@@ -419,7 +460,7 @@ namespace S7.Net
var length = S7WriteMultiple.CreateRequest(message, dataItems); var length = S7WriteMultiple.CreateRequest(message, dataItems);
await stream.WriteAsync(message.Array, 0, length).ConfigureAwait(false); await stream.WriteAsync(message.Array, 0, length).ConfigureAwait(false);
var response = await COTP.TSDU.ReadAsync(stream).ConfigureAwait(false); var response = await COTP.TSDU.ReadAsync(stream, CancellationToken.None).ConfigureAwait(false);
S7WriteMultiple.ParseResponse(response, response.Length, dataItems); S7WriteMultiple.ParseResponse(response, response.Length, dataItems);
} }
@@ -431,7 +472,7 @@ namespace S7.Net
/// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param> /// <param name="startByteAdr">Start byte address. If you want to read DB1.DBW200, this is 200.</param>
/// <param name="value">Bytes to write. The lenght of this parameter can't be higher than 200. If you need more, use recursion.</param> /// <param name="value">Bytes to write. The lenght of this parameter can't be higher than 200. If you need more, use recursion.</param>
/// <returns>A task that represents the asynchronous write operation.</returns> /// <returns>A task that represents the asynchronous write operation.</returns>
private async Task WriteBytesWithASingleRequestAsync(DataType dataType, int db, int startByteAdr, byte[] value, int dataOffset, int count) private async Task WriteBytesWithASingleRequestAsync(DataType dataType, int db, int startByteAdr, byte[] value, int dataOffset, int count, CancellationToken cancellationToken)
{ {
try try
@@ -439,21 +480,25 @@ namespace S7.Net
var stream = GetStreamIfAvailable(); var stream = GetStreamIfAvailable();
var dataToSend = BuildWriteBytesPackage(dataType, db, startByteAdr, value, dataOffset, count); var dataToSend = BuildWriteBytesPackage(dataType, db, startByteAdr, value, dataOffset, count);
await stream.WriteAsync(dataToSend, 0, dataToSend.Length); await stream.WriteAsync(dataToSend, 0, dataToSend.Length, cancellationToken);
var s7data = await COTP.TSDU.ReadAsync(stream); var s7data = await COTP.TSDU.ReadAsync(stream, cancellationToken);
if (s7data == null || s7data[14] != 0xff) if (s7data == null || s7data[14] != 0xff)
{ {
throw new PlcException(ErrorCode.WrongNumberReceivedBytes); throw new PlcException(ErrorCode.WrongNumberReceivedBytes);
} }
} }
catch (OperationCanceledException)
{
throw;
}
catch (Exception exc) catch (Exception exc)
{ {
throw new PlcException(ErrorCode.WriteData, exc); throw new PlcException(ErrorCode.WriteData, exc);
} }
} }
private async Task WriteBitWithASingleRequestAsync(DataType dataType, int db, int startByteAdr, int bitAdr, bool bitValue) private async Task WriteBitWithASingleRequestAsync(DataType dataType, int db, int startByteAdr, int bitAdr, bool bitValue, CancellationToken cancellationToken)
{ {
var stream = GetStreamIfAvailable(); var stream = GetStreamIfAvailable();
@@ -463,12 +508,16 @@ namespace S7.Net
await stream.WriteAsync(dataToSend, 0, dataToSend.Length); await stream.WriteAsync(dataToSend, 0, dataToSend.Length);
var s7data = await COTP.TSDU.ReadAsync(stream); var s7data = await COTP.TSDU.ReadAsync(stream, cancellationToken);
if (s7data == null || s7data[14] != 0xff) if (s7data == null || s7data[14] != 0xff)
{ {
throw new PlcException(ErrorCode.WrongNumberReceivedBytes); throw new PlcException(ErrorCode.WrongNumberReceivedBytes);
} }
} }
catch (OperationCanceledException)
{
throw;
}
catch (Exception exc) catch (Exception exc)
{ {
throw new PlcException(ErrorCode.WriteData, exc); throw new PlcException(ErrorCode.WriteData, exc);

View File

@@ -1,5 +1,6 @@
using System; using System;
using System.IO; using System.IO;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace S7.Net namespace S7.Net
@@ -39,13 +40,13 @@ namespace S7.Net
/// <param name="offset">the offset in the buffer to read into</param> /// <param name="offset">the offset in the buffer to read into</param>
/// <param name="count">the amount of bytes to read into the buffer</param> /// <param name="count">the amount of bytes to read into the buffer</param>
/// <returns>returns the amount of read bytes</returns> /// <returns>returns the amount of read bytes</returns>
public static async Task<int> ReadExactAsync(this Stream stream, byte[] buffer, int offset, int count) public static async Task<int> ReadExactAsync(this Stream stream, byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{ {
int read = 0; int read = 0;
int received; int received;
do do
{ {
received = await stream.ReadAsync(buffer, offset + read, count - read); received = await stream.ReadAsync(buffer, offset + read, count - read, cancellationToken);
read += received; read += received;
} }
while (read < count && received > 0); while (read < count && received > 0);

View File

@@ -1,5 +1,6 @@
using System; using System;
using System.IO; using System.IO;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace S7.Net namespace S7.Net
@@ -57,10 +58,10 @@ namespace S7.Net
/// </summary> /// </summary>
/// <param name="stream">The stream to read from</param> /// <param name="stream">The stream to read from</param>
/// <returns>Task TPKT Instace</returns> /// <returns>Task TPKT Instace</returns>
public static async Task<TPKT> ReadAsync(Stream stream) public static async Task<TPKT> ReadAsync(Stream stream, CancellationToken cancellationToken)
{ {
var buf = new byte[4]; var buf = new byte[4];
int len = await stream.ReadExactAsync(buf, 0, 4); int len = await stream.ReadExactAsync(buf, 0, 4, cancellationToken);
if (len < 4) throw new TPKTInvalidException("TPKT is incomplete / invalid"); if (len < 4) throw new TPKTInvalidException("TPKT is incomplete / invalid");
var version = buf[0]; var version = buf[0];
@@ -68,7 +69,7 @@ namespace S7.Net
var length = buf[2] * 256 + buf[3]; //BigEndian var length = buf[2] * 256 + buf[3]; //BigEndian
var data = new byte[length - 4]; var data = new byte[length - 4];
len = await stream.ReadExactAsync(data, 0, data.Length); len = await stream.ReadExactAsync(data, 0, data.Length, cancellationToken);
if (len < data.Length) if (len < data.Length)
throw new TPKTInvalidException("TPKT payload incomplete / invalid"); throw new TPKTInvalidException("TPKT payload incomplete / invalid");