-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgenerate_pcap.py
executable file
·348 lines (300 loc) · 9.61 KB
/
generate_pcap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
#!/usr/bin/env python3
# © 2023 GE Vernova and/or its affiliates. All rights reserved.
import sys
import struct
import argparse
import numpy as np
# Command-line interface: every option has a default, only the output path is
# mandatory.
parser = argparse.ArgumentParser(description="Generate IEC 61850 SV streams")
parser.add_argument(
    "-a", "--app_id", type=int, help="SV stream APPID", default=0x4000
)
parser.add_argument(
    "-s", "--start_id", type=int, help="start index of svID streams", default=0
)
parser.add_argument(
    "-n", "--nb_streams", type=int, help="Number of SV streams", default=64
)
parser.add_argument(
    "-p", "--svID_prefix", type=str, help="SV ID prefix", default="svID"
)
parser.add_argument(
    "-d", "--svID_digits", type=int, help="Number of SV ID digits", default=4
)
parser.add_argument(
    "-l",
    "--loop",
    type=int,
    # The two sentences used to be glued together without a space and named
    # the field "cmpCnt"; the field written in the frame is smpCnt.
    help="Number of iterations. "
    "The smpCnt field will be increased at each loop",
    default=4000,
)
parser.add_argument(
    "-f", "--frequency", type=float, default=60, help="Loop frequency"
)
parser.add_argument(
    "-i",
    "--i_rms",
    type=float,
    default=1,
    help="RMS desired for Current channels",
)
parser.add_argument(
    "-v",
    "--v_rms",
    type=float,
    default=57,
    help="RMS desired for Voltage channels",
)
parser.add_argument(
    "--mac_source",
    type=str,
    help="Source MAC address",
    default="c4:b5:12:00:00:01",
)
parser.add_argument(
    "--mac_dest",
    type=str,
    help="Destination MAC address",
    default="01:0c:cd:01:00:01",
)
parser.add_argument(
    "--vlanID",
    type=int,
    help="VLAN ID. 0 to disable VLAN",
    default=0,
)
parser.add_argument(
    "--vlanPriority",
    type=int,
    help="VLAN Priority",
    default=4,
)
parser.add_argument(
    "output",
    type=str,
    help="Path to the output pcap file which can be replayed using tcpreplay",
)
args = parser.parse_args()
# Copy the parsed options into module-level names used by the rest of the
# script, grouped by purpose.

# Output file and timing
final_pcap = args.output
freq = args.frequency
max_counter = args.loop

# Stream identification
app_id, start_id, nb_streams = args.app_id, args.start_id, args.nb_streams
svID_prefix, nb_digits = args.svID_prefix, args.svID_digits
# Largest number that can be printed with nb_digits decimal digits
svID_max = pow(10, nb_digits) - 1

# Signal amplitudes
i_rms, v_rms = args.i_rms, args.v_rms

# Ethernet addressing and optional 802.1Q tag
mac_source, mac_dest = args.mac_source, args.mac_dest
vlanID, vlanPriority = args.vlanID, args.vlanPriority
# Validate the command-line values before any frame is built. Every failure
# prints a message on stderr and exits with status 1.

# The svID is written into the frame as ASCII bytes.
try:
    svID_prefix.encode("ascii")
except UnicodeEncodeError:
    print("Error svID_prefix must be an ASCII string", file=sys.stderr)
    sys.exit(1)

if not 1 <= nb_digits <= 8:
    print("Error nb_digits must be between 1 and 8", file=sys.stderr)
    sys.exit(1)

# The frame template stores every BER length in a single byte, the largest
# being the savPDU length (0x58 + svID length). BER short-form lengths must
# stay below 0x80, so the svID cannot exceed 0x7F - 0x58 = 39 bytes.
if len(svID_prefix) + nb_digits > 39:
    print(
        "Error svID (prefix + digits) must not exceed 39 characters",
        file=sys.stderr,
    )
    sys.exit(1)

if not 0x4000 <= app_id <= 0x4FFF:
    print("Error app_id must be between 0x4000 and 0x4FFF", file=sys.stderr)
    sys.exit(1)

# The last stream uses ID start_id + nb_streams - 1; it must still fit in
# nb_digits digits, otherwise the svID would grow and shift the frame layout.
if start_id < 0 or start_id + nb_streams - 1 > svID_max:
    print("Error in start_id", file=sys.stderr)
    sys.exit(1)

if nb_streams > svID_max or nb_streams < 1:
    print(f"Error nb_streams must be between 1 and {svID_max}", file=sys.stderr)
    sys.exit(1)

if not 1 <= max_counter <= 65536:
    print("Error loop must be between 1 and 65536", file=sys.stderr)
    sys.exit(1)

if freq <= 0:
    print("Error frequency must be greater than 0", file=sys.stderr)
    sys.exit(1)

# Cheap length check first; the content is checked when the address is parsed.
if len(mac_source) != 17 or len(mac_dest) != 17:
    print("Error MAC address must be in the format XX:XX:XX:XX:XX:XX", file=sys.stderr)
    sys.exit(1)
def mac_string_to_bytes(mac):
    """Convert a ``XX:XX:XX:XX:XX:XX`` MAC address string to its 6 raw bytes.

    Args:
        mac: MAC address made of six colon-separated two-digit hex octets.

    Returns:
        A ``bytes`` object of length 6.

    Raises:
        ValueError: if ``mac`` is not exactly six two-digit hexadecimal
            octets separated by colons.
    """
    hex_digits = "0123456789abcdefABCDEF"
    octets = mac.split(":")
    # Check the shape explicitly: simply stripping the colons would accept
    # strings such as "c4:b5:12:00:00:::" and return fewer than 6 bytes, and
    # bytes.fromhex() silently skips whitespace.
    well_formed = len(octets) == 6 and all(
        len(octet) == 2 and all(char in hex_digits for char in octet)
        for octet in octets
    )
    if not well_formed:
        raise ValueError(f"invalid MAC address: {mac!r}")
    return bytes.fromhex("".join(octets))
# Turn both MAC address strings into raw bytes; a string that cannot be
# parsed makes the script exit with an error.
try:
    mac_source, mac_dest = map(mac_string_to_bytes, (mac_source, mac_dest))
except ValueError:
    print("Error MAC address must be in the format XX:XX:XX:XX:XX:XX", file=sys.stderr)
    sys.exit(1)

# The VLAN ID is stored in 12 bits and the priority in 3 bits of the tag.
if not 0 <= vlanID <= 4095:
    print("Error VLAN ID must be between 0 and 4095", file=sys.stderr)
    sys.exit(1)

if not 0 <= vlanPriority <= 7:
    print("Error VLAN Priority must be between 0 and 7", file=sys.stderr)
    sys.exit(1)
# pcap global header (24 bytes, little-endian):
#   magic number 0xA1B2C3D4, format version 2.4, time zone offset 0,
#   timestamp accuracy 0, snapshot length 0x40000, link type 1 (Ethernet).
HEADER = struct.pack(
    "<IHHiIII",
    0xA1B2C3D4,
    2,
    4,
    0,
    0,
    0x00040000,
    1,
)
# svID of the first stream: the prefix followed by start_id left-padded with
# zeros to nb_digits digits. It fixes the svID length used by the template.
start_id_str = str(start_id).zfill(nb_digits)
svIDFirst = "".join((svID_prefix, start_id_str))
def get_second_microsecond(ts):
    """Split a timestamp in seconds into whole seconds and microseconds.

    Args:
        ts: timestamp in seconds (the script only passes values >= 0).

    Returns:
        A ``(second, microsecond)`` tuple of integers; for ``ts >= 0`` the
        microsecond part is always in ``0..999999``.
    """
    second = int(ts)
    microsecond = int(round((ts - second) * 1000000))
    # Rounding a fraction such as 0.9999996 gives 1000000 microseconds, which
    # is not a valid sub-second value: carry it into the seconds instead.
    if microsecond >= 1000000:
        second += 1
        microsecond -= 1000000
    return (second, microsecond)
def write_bytes_le(sv, offset, data, len=-2):
    """Copy ``data`` into ``sv`` in the same order, starting at ``offset``.

    Args:
        sv: mutable byte buffer (e.g. ``bytearray``) modified in place.
        offset: index in ``sv`` receiving the first byte of ``data``.
        data: iterable of byte values to copy.
        len: index of the last element of ``data`` to copy (inclusive), so at
            most ``len + 1`` bytes are written. ``-1`` writes nothing and the
            default ``-2`` (or any lower value) copies all of ``data``.
            The name shadows the builtin but is kept for compatibility with
            existing callers.

    Raises:
        IndexError: if a write falls outside ``sv``.
    """
    remaining = len
    for position, value in enumerate(data, start=offset):
        if remaining == -1:
            break
        sv[position] = value
        # Count down through 0 so that the -1 sentinel is actually reached;
        # stopping the countdown at 0 made the limit ineffective.
        if remaining >= 0:
            remaining -= 1
def write_bytes_be(sv, offset, data):
    """Copy ``data`` into ``sv`` in reversed order, starting at ``offset``.

    The first element of ``data`` lands at ``offset + len(data) - 1`` and the
    last one at ``offset``.

    Args:
        sv: mutable byte buffer (e.g. ``bytearray``) modified in place.
        offset: lowest index of ``sv`` that is written.
        data: sized sequence of byte values.
    """
    last = offset + len(data) - 1
    for distance, value in enumerate(data):
        sv[last - distance] = value
# Optional 802.1Q VLAN tag, inserted between the source MAC and the Ethertype.
# A VLAN ID of 0 disables tagging, in which case the tag is empty.
if vlanID == 0:
    vlan_header = b""
else:
    # Tag Protocol Identifier announcing an 802.1Q tag
    tpid = 0x8100
    # Tag Control Information: priority in the top 3 bits, then one bit left
    # at 0, then the 12-bit VLAN identifier
    tci = (vlanPriority << 13) | vlanID
    # Both fields are 16-bit values in network byte order
    vlan_header = struct.pack("!HH", tpid, tci)
# Every frame offset located after the MAC addresses is shifted by this size
vlan_size = len(vlan_header)
# This data was generated from a pcap file.
# It is possible to change the SV to send. The only restriction is that the
# SV ID must be 8 bytes long and the SV must contain only one ASDU.
# To change this:
# 1. Capture an SV using Wireshark or tcpdump and generate a pcap file with
#    times in µs. The file must contain only one SV.
# 2. Drop the pcap header (0 to 0x18).
# 3. Set the timestamp to 0 (set the next 8 bytes to 0).
# 4. At offset 0x28, ensure the destination MAC address is an IEEE 802.1X
#    multicast sampled values address (01:0C:CD:04:00:00 to 01:0C:CD:04:01:FF).
# 5. Adjust SV_ID_OFFSET and SMP_CNT_OFFSET in this file.
# Template of one pcap record: record header + Ethernet frame up to and
# including the dataset tag/length. The 64 bytes of dataset (8 channels of
# 4 bytes sample + 4 bytes quality) are appended per frame by the generation
# loop, which also patches the timestamp, AppID, svID and smpCnt in place.
_sv_id_len = len(svIDFirst)
# Frame size: 0x70 bytes of fixed content + svID + optional VLAN tag, stored
# as a 32-bit little-endian value whose upper three bytes are zero.
_record_len = bytes([0x70 + _sv_id_len + vlan_size, 0x00, 0x00, 0x00])
SV_DATA = b"".join(
    (
        # pcap record timestamp (seconds + microseconds)
        b"\x00" * 8,
        # the two pcap record length fields hold the same value
        _record_len,
        _record_len,
        # Destination MAC, source MAC, optional VLAN tag
        mac_dest,
        mac_source,
        vlan_header,
        # Ethertype
        b"\x88\xBA",
        # AppId
        b"\x40\x00",
        # Length (16 bits, high byte always 0 here)
        bytes([0x00, 0x62 + _sv_id_len]),
        # Reserved 1 & Reserved 2
        b"\x00\x00\x00\x00",
        # savPDU: tag 0x60, length
        bytes([0x60, 0x58 + _sv_id_len]),
        # Number of ASDU: tag 0x80, length 1, value 1
        b"\x80\x01\x01",
        # Sequence of ASDU: tag 0xA2, length
        bytes([0xA2, 0x53 + _sv_id_len]),
        # ASDU 1: tag 0x30, length
        bytes([0x30, 0x51 + _sv_id_len]),
        # svID: tag 0x80, length, value
        bytes([0x80, _sv_id_len]),
        svIDFirst.encode("ascii"),
        # smpCnt: tag 0x82, length 2, value
        b"\x82\x02\x00\x00",
        # confRev: tag 0x83, length 4, value
        b"\x83\x04\x00\x00\x00\x01",
        # smpSync: tag 0x85, length 1, value
        b"\x85\x01\x00",
        # Sequence of data: tag 0x87, length 64 (dataset of 8 channels)
        b"\x87\x40",
    )
)
# Byte offsets of the fields patched inside the record template. Everything
# located after the MAC addresses is shifted by the optional VLAN tag.
TS_OFFSET = 0
APP_ID_OFFSET = vlan_size + 0x1E
SV_ID_OFFSET = vlan_size + 0x31
# smpCnt value: right after the svID value and the smpCnt tag + length bytes
SMP_CNT_OFFSET = SV_ID_OFFSET + len(svIDFirst) + 2

# Output buffer, starting with the pcap global header
pcap_data = bytearray(HEADER)
# Mutable working copy of the record template
sv_data = bytearray(SV_DATA)

# Timestamp (in seconds) of the next frame
ts = 0

# Sampling: 80 samples per period of the generated signal
samples_per_cyle = 80
sampling_rate = freq * samples_per_cyle

# Multipliers applied to the RMS values before conversion to integers
scale_factor_amps, scale_factor_volts = 1000, 100

voltage_channels = ["Va", "Vb", "Vc", "Vn"]
current_channels = ["Ia", "Ib", "Ic", "In"]
# Generate max_counter samples; for each sample one frame per stream is
# appended to pcap_data. All streams of a sample share the same timestamp,
# smpCnt and channel values, only the svID differs.

# Waveform description per channel group, in the order they are written in
# the dataset: (channel names, neutral channel, peak amplitude, waveform).
channel_groups = (
    (current_channels, "In", scale_factor_amps * i_rms * np.sqrt(2), np.sin),
    (voltage_channels, "Vn", scale_factor_volts * v_rms * np.sqrt(2), np.cos),
)
# smpCnt wraps after this many samples
smp_cnt_wrap = int(sampling_rate)

for i in range(max_counter):
    second, microsecond = get_second_microsecond(ts)
    ts = (i + 1) / sampling_rate

    # tcpreplay does not support frames with a 0 timestamp, so both fields
    # get a +1 offset. Carry any overflow into the seconds so the microsecond
    # field always stays below 1000000.
    extra_seconds, microsecond = divmod(microsecond + 1, 1000000)
    second += 1 + extra_seconds

    # Explicit little-endian packing keeps the output identical on every
    # host; write_bytes_be reverses the bytes into network order.
    # Write AppID
    write_bytes_be(sv_data, APP_ID_OFFSET, struct.pack("<H", app_id))
    # Write smpCnt
    write_bytes_be(
        sv_data, SMP_CNT_OFFSET, struct.pack("<H", i % smp_cnt_wrap)
    )
    # Write the record timestamp (little-endian, like the pcap global header)
    write_bytes_le(
        sv_data, TS_OFFSET, struct.pack("<II", second, microsecond)
    )

    # Dataset of this sample: for each channel a 4-byte big-endian signed
    # value followed by 4 bytes of quality set to 0. It does not depend on
    # the stream, so it is built once per sample.
    base_phase = 2 * np.pi * freq * i * (1 / sampling_rate)
    samples = bytearray()
    for channels, neutral, amplitude, waveform in channel_groups:
        for phase_index, channel in enumerate(channels):
            if channel == neutral:
                # The neutral channel is always 0
                signal = 0
            else:
                # Successive phases are shifted by 2*pi/3
                signal = int(
                    amplitude
                    * waveform(base_phase + ((2 * np.pi / 3) * phase_index))
                )
            samples += signal.to_bytes(4, byteorder="big", signed=True)
            samples += b"\x00\x00\x00\x00"

    for st in range(nb_streams):
        # svID strings are svID_prefix followed by the zero-padded stream ID
        svID_data = f"{svID_prefix}{start_id+st:0{nb_digits}d}".encode("ascii")
        write_bytes_le(sv_data, SV_ID_OFFSET, svID_data, len(svID_data) - 1)
        pcap_data += sv_data
        pcap_data += samples
# Write the complete capture (global header + all records) in one go
with open(final_pcap, mode="wb") as f:
    f.write(pcap_data)