init - first working version
This commit is contained in:
BIN
src/__pycache__/model_generate.cpython-313.pyc
Normal file
BIN
src/__pycache__/model_generate.cpython-313.pyc
Normal file
Binary file not shown.
BIN
src/__pycache__/utility.cpython-313.pyc
Normal file
BIN
src/__pycache__/utility.cpython-313.pyc
Normal file
Binary file not shown.
146
src/main.py
Normal file
146
src/main.py
Normal file
@@ -0,0 +1,146 @@
|
||||
# ==============================================================================
|
||||
# Author : Jake Ginesin
|
||||
# Authored : 14 June 2024
|
||||
# Purpose : synthesize attacker gadgets for attackers that can drop,
|
||||
# replay, and reorder messages on a channel
|
||||
# ==============================================================================
|
||||
import sys, re, subprocess, os, shutil
|
||||
from typing import List
|
||||
|
||||
from utility import *
|
||||
from model_generate import *
|
||||
|
||||
def show_help() -> None:
|
||||
msg=(
|
||||
"Usage: \n"
|
||||
" python main.py [arguments] \n\n"
|
||||
"Arguments: \n"
|
||||
" --model=path/to/model.pml Promela model to generate attackers on\n"
|
||||
" --attacker=[replay,drop,reorder] \n"
|
||||
" --chan=[chan1, chan2:int, ...] Channels to synthesize attackers on. When specifying over ranges of\n"
|
||||
" channels, you can give ranges or a list of values\n"
|
||||
" --nocheck Don't check channel validity\n"
|
||||
# " --nchan=[nat, nat, ...] If the channel is a set of channels, how many attackers to synthesize?\n"
|
||||
" --mem=[num] Size of memory. Defaults to '3' \n"
|
||||
" --mem=unbounded Use the unbounded memory gadget version (not recommended)\n"
|
||||
" --output=path/to/file.pml Outputted file name\n"
|
||||
" --eval Evaluate the outputted file with Spin\n"
|
||||
" --cleanup Clean up the extra files spin creates, including Korg's \n"
|
||||
)
|
||||
print(msg)
|
||||
|
||||
# assert "syntax error" not in stdout, "there seems to be a syntax error in the model"
|
||||
# assert "processes created" in stdout, "the spin model creates no processes ... check to see if it compiles"
|
||||
|
||||
def main() -> None:
|
||||
args = sys.argv[1:]
|
||||
if len(args) == 0 or args[0] in ["help", "--help", "-h", "-help"]:
|
||||
show_help()
|
||||
sys.exit()
|
||||
|
||||
mem = 3 # default
|
||||
|
||||
for arg in args:
|
||||
if arg.startswith("--model="):
|
||||
model_path = arg.split("=", 1)[1]
|
||||
elif arg.startswith("--attacker="):
|
||||
attacker = arg.split("=", 1)[1]
|
||||
elif arg.startswith("--mem="):
|
||||
mem_read = arg.split("=", 1)[1]
|
||||
elif arg.startswith("--chan="):
|
||||
chans = arg.split("=", 1)[1]
|
||||
elif arg.startswith("--output="):
|
||||
out_file = arg.split("=", 1)[1]
|
||||
|
||||
if "--eval" in args and not "--output" in args:
|
||||
out_file = "korg-promela-out.pml"
|
||||
|
||||
if not model_path or not attacker or not mem or not chans or not out_file:
|
||||
print("error: all arguments are required. \n")
|
||||
show_help()
|
||||
sys.exit(1)
|
||||
|
||||
unbounded = mem_read == "unbounded"
|
||||
if not unbounded : mem = int(mem_read)
|
||||
|
||||
ensure_compile(model_path)
|
||||
model = fileRead(model_path)
|
||||
|
||||
channels = parse_channels(fileReadLines(model_path))
|
||||
mchannels, mchannel_len = parse_mchannels(fileReadLines(model_path))
|
||||
model_with_attacker = str;
|
||||
assert mem >= 0, "memory value must be positive"
|
||||
|
||||
chans_togen = set()
|
||||
|
||||
# first, process the input
|
||||
mc = chans.split(",")
|
||||
for chan in mc:
|
||||
if ":" in chan:
|
||||
name, num_extr = chan[:chan.index(":")], chan[chan.index(":")+1:]
|
||||
if "-" in num_extr:
|
||||
a, b = list(map(lambda a: int(a), num_extr.split("-")))
|
||||
assert a < b
|
||||
assert a >= 0
|
||||
assert b < mchannel_len[name]
|
||||
for i in range(a,b+1):
|
||||
chan_name = str(name) + "[" + str(i) + "]"
|
||||
chans_togen.add(chan_name)
|
||||
channels[chan_name] = mchannels[name]
|
||||
else:
|
||||
a = int(num_extr)
|
||||
assert a >= 0
|
||||
assert a < mchannel_len[name]
|
||||
chan_name = str(name) + "[" + str(a) + "]"
|
||||
chans_togen.add(chan_name)
|
||||
channels[chan_name] = mchannels[name]
|
||||
|
||||
else : chans_togen.add(chan)
|
||||
|
||||
print(chans_togen)
|
||||
|
||||
for i in range(len(chans_togen)):
|
||||
chan = list(chans_togen)[i]
|
||||
|
||||
if not "--nocheck" : assert chan in channels, "can't find "+str(chan)+" in model"
|
||||
|
||||
match attacker:
|
||||
case "replay":
|
||||
if unbounded : attacker_gadget = gen_replay_unbounded(chan, channels[chan], i)
|
||||
else : attacker_gadget = gen_replay(chan, channels[chan], mem, i)
|
||||
case "drop":
|
||||
if unbounded : attacker_gadget = gen_drop_unbounded(chan, channels[chan], i)
|
||||
else : attacker_gadget = gen_drop(chan, channels[chan], mem, i)
|
||||
case "reorder":
|
||||
if unbounded : attacker_gadget = gen_reorder_unbounded(chan, channels[chan], i)
|
||||
else : attacker_gadget = gen_reorder(chan, channels[chan], mem, i)
|
||||
case _:
|
||||
print("error: inputted an invalid attacker model. \n")
|
||||
sys.exit(1)
|
||||
|
||||
if model.rindex("};") >= model.rindex("}"):
|
||||
model = model[:model.rindex("};")+2] + "\n\n" + attacker_gadget + "\n" + model[model.rindex("};")+2:]
|
||||
else:
|
||||
model = model[:model.rindex("}")+1] + "\n\n" + attacker_gadget + "\n" + model[model.rindex("}")+1:]
|
||||
|
||||
# Write the modified model to the output file
|
||||
with open(out_file, 'w') as file:
|
||||
file.write(model)
|
||||
|
||||
if "--eval" in args:
|
||||
print()
|
||||
print("generated Promela file with attacker model gadget... now running SPIN on "+str(out_file) + "!\n")
|
||||
eval_model(out_file)
|
||||
|
||||
if "--cleanup" in args:
|
||||
print("\nCleaning up Spin files...")
|
||||
cleanup_spin_files()
|
||||
try:
|
||||
os.remove(out_file)
|
||||
print(f"Removed: {out_file}")
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
if __name__== "__main__":
|
||||
main()
|
||||
346
src/model_generate.py
Normal file
346
src/model_generate.py
Normal file
@@ -0,0 +1,346 @@
|
||||
import sys, re, subprocess, os, shutil
|
||||
from typing import List
|
||||
|
||||
def gen_replay(chan : str, chan_type : List[str], mem : int, index : int) -> str:
|
||||
ret_string = ""
|
||||
|
||||
ret_string+= "chan attacker_mem_"+str(index)+" = ["+str(mem)+"] of " + ("{ " + str(chan_type)[1:-1] + " }") .replace("'","") + ";\n"
|
||||
ret_string+= "\n"
|
||||
|
||||
ret_string+= "active proctype attacker_replay_"+str(index)+"() {\n"
|
||||
|
||||
item_arr = []
|
||||
item_count = 0
|
||||
|
||||
# formulate string of general message input variables
|
||||
for item in chan_type:
|
||||
item_arr.append("b_" + str(item_count))
|
||||
ret_string+= str(item) + " " + item_arr[item_count] + ";\n"
|
||||
item_count+=1
|
||||
|
||||
fs = (str([item for item in item_arr])[1:-1]).replace("'","")
|
||||
|
||||
ret_string+="int i = "+str(mem)+";\n"
|
||||
ret_string+="int b;\n"
|
||||
ret_string+="CONSUME:\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> atomic {\n"
|
||||
ret_string+=" "+str(chan)+" ? <"+fs+"> -> attacker_mem_"+str(index)+" ! "+fs+";\n"
|
||||
ret_string+=" i--;\n"
|
||||
ret_string+=" if\n"
|
||||
ret_string+=" :: i == 0 -> goto REPLAY;\n"
|
||||
ret_string+=" :: i != 0 -> goto CONSUME;\n"
|
||||
ret_string+=" fi\n"
|
||||
ret_string+=" }\n"
|
||||
ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> atomic {\n"
|
||||
ret_string+=" b = len("+str(chan)+");\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: b != len("+str(chan)+") -> goto CONSUME;\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+=" }\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+="REPLAY:\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: atomic {\n"
|
||||
ret_string+=" int am;\n"
|
||||
ret_string+=" select(am : 0 .. len(attacker_mem_"+str(index)+")-1);\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: am != 0 ->\n"
|
||||
ret_string+=" am = am-1;\n"
|
||||
ret_string+=" attacker_mem_"+str(index)+" ? "+fs+" -> attacker_mem_"+str(index)+" ! "+fs+";\n"
|
||||
ret_string+=" :: am == 0 ->\n"
|
||||
ret_string+=" attacker_mem_"+str(index)+" ? "+fs+" -> "+str(chan)+" ! "+fs+";\n"
|
||||
ret_string+=" break;\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+=" }\n"
|
||||
ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> atomic {\n"
|
||||
ret_string+=" b = len("+str(chan)+");\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: b != len("+str(chan)+") -> goto REPLAY;\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+=" }\n"
|
||||
ret_string+=" :: atomic {attacker_mem_"+str(index)+" ? "+fs+"; }\n"
|
||||
ret_string+=" :: empty(attacker_mem_"+str(index)+") -> goto BREAK;\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+="BREAK:\n"
|
||||
ret_string+="}\n"
|
||||
|
||||
return ret_string
|
||||
|
||||
def gen_replay_unbounded(chan : str, chan_type : List[str], index : int) -> str:
|
||||
ret_string = ""
|
||||
|
||||
ret_string+= "chan attacker_mem_"+str(index)+" = [99] of " + ("{ " + str(chan_type)[1:-1] + " }") .replace("'","") + ";\n"
|
||||
ret_string+= "\n"
|
||||
|
||||
ret_string+= "active proctype attacker_replay_"+str(index)+"() {\n"
|
||||
|
||||
item_arr = []
|
||||
item_count = 0
|
||||
|
||||
# formulate string of general message input variables
|
||||
for item in chan_type:
|
||||
item_arr.append("b_" + str(item_count))
|
||||
ret_string+= str(item) + " " + item_arr[item_count] + ";\n"
|
||||
item_count+=1
|
||||
|
||||
fs = (str([item for item in item_arr])[1:-1]).replace("'","")
|
||||
|
||||
ret_string+="int b;\n"
|
||||
ret_string+="CONSUME:\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> atomic {\n"
|
||||
ret_string+=" "+str(chan)+" ? <"+fs+"> -> attacker_mem_"+str(index)+" ! "+fs+";\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: goto REPLAY;\n"
|
||||
ret_string+=" :: goto CONSUME;\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+=" }\n"
|
||||
ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> atomic {\n"
|
||||
ret_string+=" b = len("+str(chan)+");\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: b != len("+str(chan)+") -> goto CONSUME;\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+=" }\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+="REPLAY:\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: atomic {\n"
|
||||
ret_string+=" int am;\n"
|
||||
ret_string+=" select(am : 0 .. len(attacker_mem_"+str(index)+")-1);\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: am != 0 ->\n"
|
||||
ret_string+=" am = am-1;\n"
|
||||
ret_string+=" attacker_mem_"+str(index)+" ? "+fs+" -> attacker_mem_"+str(index)+" ! "+fs+";\n"
|
||||
ret_string+=" :: am == 0 ->\n"
|
||||
ret_string+=" attacker_mem_"+str(index)+" ? "+fs+" -> "+str(chan)+" ! "+fs+";\n"
|
||||
ret_string+=" break;\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+=" }\n"
|
||||
ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> atomic {\n"
|
||||
ret_string+=" b = len("+str(chan)+");\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: b != len("+str(chan)+") -> goto REPLAY;\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+=" }\n"
|
||||
ret_string+=" :: atomic {attacker_mem_"+str(index)+" ? "+fs+"; }\n"
|
||||
ret_string+=" :: empty(attacker_mem_"+str(index)+") -> goto BREAK;\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+="BREAK:\n"
|
||||
ret_string+="}\n"
|
||||
|
||||
return ret_string
|
||||
|
||||
|
||||
def gen_reorder(chan : str, chan_type : List[str], mem : int, index : int) -> str:
|
||||
ret_string = ""
|
||||
|
||||
ret_string+= "chan attacker_mem_"+str(index)+" = ["+str(mem)+"] of " + ("{ " + str(chan_type)[1:-1] + " }") .replace("'","") + ";\n"
|
||||
ret_string+= "\n"
|
||||
|
||||
ret_string+= "active proctype attacker_reorder_"+str(index)+"() priority 99 {\n"
|
||||
|
||||
item_arr = []
|
||||
item_count = 0
|
||||
|
||||
attacker_mem = "attacker_mem_" + str(index)
|
||||
|
||||
# formulate string of general message input variables
|
||||
for item in chan_type:
|
||||
item_arr.append("b_" + str(item_count))
|
||||
ret_string+= str(item) + " " + item_arr[item_count] + ";\n"
|
||||
item_count+=1
|
||||
|
||||
fs = (str([item for item in item_arr])[1:-1]).replace("'","")
|
||||
ret_string+="int i = "+str(mem)+";\n"
|
||||
ret_string+="int b;\n"
|
||||
ret_string+="INIT:\n"
|
||||
ret_string+="do\n"
|
||||
# ret_string+=" :: true -> {\n"
|
||||
ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> {\n"
|
||||
ret_string+=" b = len("+str(chan)+");\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: b != len("+str(chan)+") -> goto INIT;\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+=" }\n"
|
||||
ret_string+=" :: goto CONSUME;\n"
|
||||
ret_string+="od\n"
|
||||
ret_string+="CONSUME:\n"
|
||||
ret_string+="do\n"
|
||||
ret_string+=" :: "+str(chan)+" ? "+str(fs)+" -> atomic {\n"
|
||||
ret_string+=" "+str(attacker_mem)+" ! "+str(fs)+";\n"
|
||||
ret_string+=" i--;\n"
|
||||
ret_string+=" if\n"
|
||||
ret_string+=" :: i == 0 -> goto REPLAY;\n"
|
||||
ret_string+=" :: i != 0 -> goto CONSUME;\n"
|
||||
ret_string+=" fi\n"
|
||||
ret_string+=" }\n"
|
||||
ret_string+="od\n"
|
||||
ret_string+="REPLAY:\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: atomic {\n"
|
||||
ret_string+=" int am;\n"
|
||||
ret_string+=" select(am : 0 .. len("+str(attacker_mem)+")-1);\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: am != 0 -> \n"
|
||||
ret_string+=" am = am-1;\n"
|
||||
ret_string+=" "+str(attacker_mem)+" ? "+str(fs)+" -> "+str(attacker_mem)+" ! "+str(fs)+";\n"
|
||||
ret_string+=" :: am == 0 ->\n"
|
||||
ret_string+=" "+str(attacker_mem)+" ? "+str(fs)+" -> "+str(chan)+" ! "+str(fs)+";\n"
|
||||
ret_string+=" break;\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+=" }\n"
|
||||
ret_string+=" :: empty("+str(attacker_mem)+") -> goto BREAK;\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+="BREAK:\n"
|
||||
ret_string+="}\n"
|
||||
|
||||
return ret_string
|
||||
|
||||
def gen_reorder_unbounded(chan : str, chan_type : List[str], index : int) -> str:
|
||||
ret_string = ""
|
||||
|
||||
ret_string+= "chan attacker_mem_"+str(index)+" = [99] of " + ("{ " + str(chan_type)[1:-1] + " }") .replace("'","") + ";\n"
|
||||
ret_string+= "\n"
|
||||
|
||||
ret_string+= "active proctype attacker_reorder_"+str(index)+"() priority 99 {\n"
|
||||
|
||||
item_arr = []
|
||||
item_count = 0
|
||||
|
||||
attacker_mem = "attacker_mem_" + str(index)
|
||||
|
||||
# formulate string of general message input variables
|
||||
for item in chan_type:
|
||||
item_arr.append("b_" + str(item_count))
|
||||
ret_string+= str(item) + " " + item_arr[item_count] + ";\n"
|
||||
item_count+=1
|
||||
|
||||
fs = (str([item for item in item_arr])[1:-1]).replace("'","")
|
||||
ret_string+="int b;\n"
|
||||
ret_string+="INIT:\n"
|
||||
ret_string+="do\n"
|
||||
# ret_string+=" :: true -> {\n"
|
||||
ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> {\n"
|
||||
ret_string+=" b = len("+str(chan)+");\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: b != len("+str(chan)+") -> goto INIT;\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+=" }\n"
|
||||
ret_string+=" :: goto CONSUME;\n"
|
||||
ret_string+="od\n"
|
||||
ret_string+="CONSUME:\n"
|
||||
ret_string+="do\n"
|
||||
ret_string+=" :: "+str(chan)+" ? "+str(fs)+" -> atomic {\n"
|
||||
ret_string+=" "+str(attacker_mem)+" ! "+str(fs)+";\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: goto REPLAY;\n"
|
||||
ret_string+=" :: goto CONSUME;\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+=" }\n"
|
||||
ret_string+="od\n"
|
||||
ret_string+="REPLAY:\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: atomic {\n"
|
||||
ret_string+=" int am;\n"
|
||||
ret_string+=" select(am : 0 .. len("+str(attacker_mem)+")-1);\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: am != 0 -> \n"
|
||||
ret_string+=" am = am-1;\n"
|
||||
ret_string+=" "+str(attacker_mem)+" ? "+str(fs)+" -> "+str(attacker_mem)+" ! "+str(fs)+";\n"
|
||||
ret_string+=" :: am == 0 ->\n"
|
||||
ret_string+=" "+str(attacker_mem)+" ? "+str(fs)+" -> "+str(chan)+" ! "+str(fs)+";\n"
|
||||
ret_string+=" break;\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+=" }\n"
|
||||
ret_string+=" :: empty("+str(attacker_mem)+") -> goto BREAK;\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+="BREAK:\n"
|
||||
ret_string+="}\n"
|
||||
|
||||
return ret_string
|
||||
|
||||
def gen_drop(chan : str, chan_type : List[str], mem : int, index : int) -> str:
|
||||
ret_string = ""
|
||||
|
||||
ret_string+= "active proctype attacker_drop_"+str(index)+"() {\n"
|
||||
|
||||
# proctype variables
|
||||
item_arr = []
|
||||
item_count = 0
|
||||
|
||||
|
||||
# formulate string of general message input variables
|
||||
for item in chan_type:
|
||||
item_arr.append("b_" + str(item_count))
|
||||
ret_string+= str(item) + " " + item_arr[item_count] + ";\n"
|
||||
item_count+=1
|
||||
|
||||
fs = (str([item for item in item_arr])[1:-1]).replace("'","")
|
||||
|
||||
ret_string+="byte i = "+str(mem)+";\n"
|
||||
ret_string+="int b;\n"
|
||||
ret_string+="MAIN:\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: "+str(chan)+" ? ["+fs+"] -> atomic {\n"
|
||||
ret_string+=" if\n"
|
||||
ret_string+=" :: i == 0 -> goto BREAK;\n"
|
||||
ret_string+=" :: else ->\n"
|
||||
ret_string+=" "+str(chan)+" ? "+fs+";\n"
|
||||
ret_string+=" i = i - 1;\n"
|
||||
ret_string+=" goto MAIN;\n"
|
||||
ret_string+=" fi\n"
|
||||
ret_string+=" }\n"
|
||||
ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> atomic {\n"
|
||||
ret_string+=" b = len("+str(chan)+");\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: b != len("+str(chan)+") -> goto MAIN;\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+=" }\n"
|
||||
ret_string+=" :: goto BREAK;\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+="BREAK:\n"
|
||||
ret_string+="}\n"
|
||||
|
||||
return ret_string
|
||||
|
||||
def gen_drop_unbounded(chan : str, chan_type : List[str], index : int) -> str:
|
||||
ret_string = ""
|
||||
|
||||
ret_string+= "active proctype attacker_drop_"+str(index)+"() {\n"
|
||||
|
||||
# proctype variables
|
||||
item_arr = []
|
||||
item_count = 0
|
||||
|
||||
# formulate string of general message input variables
|
||||
for item in chan_type:
|
||||
item_arr.append("b_" + str(item_count))
|
||||
ret_string+= str(item) + " " + item_arr[item_count] + ";\n"
|
||||
item_count+=1
|
||||
|
||||
fs = (str([item for item in item_arr])[1:-1]).replace("'","")
|
||||
|
||||
ret_string+="int b;\n"
|
||||
ret_string+="MAIN:\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: "+str(chan)+" ? ["+fs+"] -> atomic {\n"
|
||||
ret_string+=" do \n"
|
||||
ret_string+=" :: goto BREAK;\n"
|
||||
ret_string+=" :: true ->\n"
|
||||
ret_string+=" "+str(chan)+" ? "+fs+";\n"
|
||||
ret_string+=" goto MAIN;\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+=" }\n"
|
||||
ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> atomic {\n"
|
||||
ret_string+=" b = len("+str(chan)+");\n"
|
||||
ret_string+=" do\n"
|
||||
ret_string+=" :: b != len("+str(chan)+") -> goto MAIN;\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+=" }\n"
|
||||
ret_string+=" :: goto BREAK;\n"
|
||||
ret_string+=" od\n"
|
||||
ret_string+="BREAK:\n"
|
||||
ret_string+="}\n"
|
||||
|
||||
return ret_string
|
||||
168
src/utility.py
Normal file
168
src/utility.py
Normal file
@@ -0,0 +1,168 @@
|
||||
import sys, re, subprocess, os, shutil
|
||||
from typing import List
|
||||
|
||||
def fileReadLines(fileName : str) -> str:
|
||||
try:
|
||||
txt = None
|
||||
with open(fileName, 'r') as fr:
|
||||
txt = fr.readlines()
|
||||
return txt
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
def fileRead(fileName : str) -> str:
|
||||
try:
|
||||
txt = None
|
||||
with open(fileName, 'r') as fr:
|
||||
txt = fr.read()
|
||||
return txt
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
def parse_channels(model : str) -> dict():
|
||||
channels = {}
|
||||
define_mapping = {}
|
||||
for line in model:
|
||||
if line.startswith("#define"):
|
||||
data = re.search(r"\#define\s*([A-Za-z\_\-]+)\s*([0-9])", line)
|
||||
define_mapping[data.group(1)] = int(data.group(2))
|
||||
if line.startswith("chan"):
|
||||
# parsing regular channels
|
||||
data = re.search(r"chan\s*([a-zA-Z\_\-]+).*\{(.+)\}", line)
|
||||
# note, we don't have to think very hard about parsing Promela types.
|
||||
# this is because mtype:whatever, mtype, and generic types are interchangable in Promela grammar
|
||||
name, ctype = data.group(1), data.group(2).replace(" ","").split(",")
|
||||
channels[name] = list(tuple(ctype))
|
||||
|
||||
# data_multichan = re.search(r"chan\s*([A-Za-z\_\-0-9]+)\[([A-Za-z0-9\_\-]+)\].*\{(.+)\}", line)
|
||||
# m_name, m_cvalue, m_ctype = data.group(1), data.group(2), data.group(3).replace(" ","").split(",")
|
||||
|
||||
# try:
|
||||
# m_cvalue = int(c_value)
|
||||
# except ValueError:
|
||||
# if type(m_cvalue) == str:
|
||||
# assert m_cvalue in define_mapping, "{c_value} isn't defined, yet your Promela file still parsed. Did you recursively define {c_value}?"
|
||||
# m_cvalue = define_mapping[m_cvalue]
|
||||
|
||||
# channels[m_name] = (m_cvalue, m_ctype)
|
||||
|
||||
else : continue
|
||||
# print(channels)
|
||||
return channels
|
||||
|
||||
def parse_mchannels(model : str) -> (dict(), dict()):
|
||||
channels = {}
|
||||
channel_lens = {}
|
||||
define_mapping = {}
|
||||
for line in model:
|
||||
if line.startswith("#define"):
|
||||
data = re.search(r"\#define\s*([A-Za-z\_\-]+)\s*([0-9])", line)
|
||||
define_mapping[data.group(1)] = int(data.group(2))
|
||||
# print(define_mapping)
|
||||
if line.startswith("chan"):
|
||||
# parsing multichannels
|
||||
data_multichan = re.search(r"chan\s*([A-Za-z\_\-0-9]+)\[([A-Za-z0-9\_\-]+)\].*\{(.+)\}", line)
|
||||
if data_multichan:
|
||||
m_name, m_cvalue, m_ctype = data_multichan.group(1), data_multichan.group(2), data_multichan.group(3).replace(" ","").split(",")
|
||||
else : continue
|
||||
|
||||
try:
|
||||
m_cvalue = int(m_cvalue)
|
||||
except ValueError:
|
||||
if type(m_cvalue) == str:
|
||||
assert m_cvalue in define_mapping, "{m_cvalue} isn't defined, yet your Promela file still parsed. Did you recursively define {c_value}?"
|
||||
m_cvalue = define_mapping[m_cvalue]
|
||||
|
||||
channels[m_name] = m_ctype
|
||||
channel_lens[m_name] = m_cvalue
|
||||
|
||||
|
||||
else : continue
|
||||
# print(channels)
|
||||
|
||||
return channels, channel_lens
|
||||
|
||||
def ensure_compile(model_path : str) -> None:
|
||||
cmd = ['spin', '-a', model_path]
|
||||
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
stdout, stderr = proc.communicate()
|
||||
filename = os.path.basename(model_path)
|
||||
userdir = os.getcwd()
|
||||
|
||||
# Convert bytes to string
|
||||
stdout = stdout.decode()
|
||||
stderr = stderr.decode()
|
||||
assert "error" not in stdout, "there seems to be a syntax error in the model!"
|
||||
# assert "syntax error" not in stdout, "there seems to be a syntax error in the model"
|
||||
# assert "processes created" in stdout, "the spin model creates no processes ... check to see if it compiles"
|
||||
|
||||
def eval_model(model_path : str) -> None:
|
||||
cmd = ['spin', '-run', '-a', '-DNOREDUCE', model_path]
|
||||
# Set text=True to get string output
|
||||
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
||||
|
||||
out = ""
|
||||
while True:
|
||||
output = proc.stdout.readline()
|
||||
if output == '' and proc.poll() is not None:
|
||||
break
|
||||
if output:
|
||||
out+=output
|
||||
print(output, end='')
|
||||
|
||||
# No need to decode since output is already a string
|
||||
# stdout, stderr = proc.communicate()
|
||||
|
||||
# filename = os.path.basename(model_path)
|
||||
# userdir = os.getcwd()
|
||||
|
||||
# print(stderr)
|
||||
|
||||
if "pan: wrote" in out: # we know we wrote a trail
|
||||
print("attack trace found!!!! printing!\n")
|
||||
cd = os.getcwd()
|
||||
if "/" in model_path:
|
||||
od = cd + model_path[model_path.rindex("/"):] + ".trail"
|
||||
shutil.copy(od, model_path + ".trail")
|
||||
shutil.copy(cd + "/pan", model_path[:model_path.rindex("/"):] + "/pan")
|
||||
else:
|
||||
od = cd + model_path + ".trail"
|
||||
# shutil.copy(od, model_path + ".trail")
|
||||
# shutil.copy(cd + "/pan", model_path[:model_path.rindex("/"):] + "/pan")
|
||||
|
||||
cmd = ['spin', '-t0', '-s', '-r', model_path]
|
||||
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
stdout, stderr = proc.communicate()
|
||||
filename = os.path.basename(model_path)
|
||||
userdir = os.getcwd()
|
||||
|
||||
# Convert bytes to string
|
||||
stdout = stdout.decode()
|
||||
stderr = stderr.decode()
|
||||
|
||||
print(stdout)
|
||||
else:
|
||||
print()
|
||||
print("Korg's exhaustive search is complete, no attacks found :)")
|
||||
|
||||
def cleanup_spin_files() -> None:
|
||||
"""Remove files generated by Spin"""
|
||||
files_to_remove = [
|
||||
'pan', 'pan.*', '*.trail', '_spin_nvr.tmp',
|
||||
'*.tcl', 'pan.b', 'pan.c', 'pan.h', 'pan.m', 'pan.t'
|
||||
]
|
||||
for pattern in files_to_remove:
|
||||
if '*' in pattern:
|
||||
import glob
|
||||
for f in glob.glob(pattern):
|
||||
try:
|
||||
os.remove(f)
|
||||
print(f"Removed: {f}")
|
||||
except OSError:
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
os.remove(pattern)
|
||||
print(f"Removed: {pattern}")
|
||||
except OSError:
|
||||
pass
|
||||
Reference in New Issue
Block a user