improve replay attacker, add deadlock tests

This commit is contained in:
2025-11-05 15:24:56 -05:00
parent 3bb51d1263
commit a6ba78790e
8 changed files with 622 additions and 130 deletions

146
src/main-old.py Normal file
View File

@@ -0,0 +1,146 @@
# ==============================================================================
# Author : Jake Ginesin
# Authored : 14 June 2024
# Purpose : synthesize attacker gadgets for attackers that can drop,
# replay, and reorder messages on a channel
# ==============================================================================
import sys, re, subprocess, os, shutil
from typing import List
from utility import *
from model_generate import *
def show_help() -> None:
msg=(
"Usage: \n"
" python main.py [arguments] \n\n"
"Arguments: \n"
" --model=path/to/model.pml Promela model to generate attackers on\n"
" --attacker=[replay,drop,reorder] \n"
" --chan=[chan1, chan2:int, ...] Channels to synthesize attackers on. When specifying over ranges of\n"
" channels, you can give ranges or a list of values\n"
" --nocheck Don't check channel validity\n"
# " --nchan=[nat, nat, ...] If the channel is a set of channels, how many attackers to synthesize?\n"
" --mem=[num] Size of memory. Defaults to '3' \n"
" --mem=unbounded Use the unbounded memory gadget version (not recommended)\n"
" --output=path/to/file.pml Output file name\n"
" --eval Evaluate the outputted file with Spin\n"
" --cleanup Clean up the extra files spin creates, including Korg's \n"
)
print(msg)
# assert "syntax error" not in stdout, "there seems to be a syntax error in the model"
# assert "processes created" in stdout, "the spin model creates no processes ... check to see if it compiles"
def main() -> None:
args = sys.argv[1:]
if len(args) == 0 or args[0] in ["help", "--help", "-h", "-help"]:
show_help()
sys.exit()
mem = 3 # default
for arg in args:
if arg.startswith("--model="):
model_path = arg.split("=", 1)[1]
elif arg.startswith("--attacker="):
attacker = arg.split("=", 1)[1]
elif arg.startswith("--mem="):
mem_read = arg.split("=", 1)[1]
elif arg.startswith("--chan="):
chans = arg.split("=", 1)[1]
elif arg.startswith("--output="):
out_file = arg.split("=", 1)[1]
if "--eval" in args and not "--output" in args:
out_file = "korg-promela-out.pml"
if not model_path or not attacker or not mem or not chans or not out_file:
print("error: all arguments are required. \n")
show_help()
sys.exit(1)
unbounded = mem_read == "unbounded"
if not unbounded : mem = int(mem_read)
ensure_compile(model_path)
model = fileRead(model_path)
channels = parse_channels(fileReadLines(model_path))
mchannels, mchannel_len = parse_mchannels(fileReadLines(model_path))
model_with_attacker = str;
assert mem >= 0, "memory value must be positive"
chans_togen = set()
# first, process the input
mc = chans.split(",")
for chan in mc:
if ":" in chan:
name, num_extr = chan[:chan.index(":")], chan[chan.index(":")+1:]
if "-" in num_extr:
a, b = list(map(lambda a: int(a), num_extr.split("-")))
assert a < b
assert a >= 0
assert b < mchannel_len[name]
for i in range(a,b+1):
chan_name = str(name) + "[" + str(i) + "]"
chans_togen.add(chan_name)
channels[chan_name] = mchannels[name]
else:
a = int(num_extr)
assert a >= 0
assert a < mchannel_len[name]
chan_name = str(name) + "[" + str(a) + "]"
chans_togen.add(chan_name)
channels[chan_name] = mchannels[name]
else : chans_togen.add(chan)
print(chans_togen)
for i in range(len(chans_togen)):
chan = list(chans_togen)[i]
if not "--nocheck" : assert chan in channels, "can't find "+str(chan)+" in model"
match attacker:
case "replay":
if unbounded : attacker_gadget = gen_replay_unbounded(chan, channels[chan], i)
else : attacker_gadget = gen_replay(chan, channels[chan], mem, i)
case "drop":
if unbounded : attacker_gadget = gen_drop_unbounded(chan, channels[chan], i)
else : attacker_gadget = gen_drop(chan, channels[chan], mem, i)
case "reorder":
if unbounded : attacker_gadget = gen_reorder_unbounded(chan, channels[chan], i)
else : attacker_gadget = gen_reorder(chan, channels[chan], mem, i)
case _:
print("error: inputted an invalid attacker model. \n")
sys.exit(1)
if model.rindex("};") >= model.rindex("}"):
model = model[:model.rindex("};")+2] + "\n\n" + attacker_gadget + "\n" + model[model.rindex("};")+2:]
else:
model = model[:model.rindex("}")+1] + "\n\n" + attacker_gadget + "\n" + model[model.rindex("}")+1:]
# Write the modified model to the output file
with open(out_file, 'w') as file:
file.write(model)
if "--eval" in args:
print()
print("generated Promela file with attacker model gadget... now running SPIN on "+str(out_file) + "!\n")
eval_model(out_file)
if "--cleanup" in args:
print("\nCleaning up Spin files...")
cleanup_spin_files()
try:
os.remove(out_file)
print(f"Removed: {out_file}")
except OSError:
pass
if __name__== "__main__":
main()

View File

@@ -1,138 +1,243 @@
# ==============================================================================
# ==============================================================================
# Author : Jake Ginesin
# Authored : 14 June 2024
# Purpose : synthesize attacker gadgets for attackers that can drop,
# replay, and reorder messages on a channel
# ==============================================================================
import sys, re, subprocess, os, shutil
from typing import List
import sys
import subprocess
import os
import shutil
import argparse
from typing import List, Set, Dict, Tuple
from utility import *
from model_generate import *
def show_help() -> None:
msg=(
"Usage: \n"
" python main.py [arguments] \n\n"
"Arguments: \n"
" --model=path/to/model.pml Promela model to generate attackers on\n"
" --attacker=[replay,drop,reorder] \n"
" --chan=[chan1, chan2:int, ...] Channels to synthesize attackers on. When specifying over ranges of\n"
" channels, you can give ranges or a list of values\n"
" --nocheck Don't check channel validity\n"
# " --nchan=[nat, nat, ...] If the channel is a set of channels, how many attackers to synthesize?\n"
" --mem=[num] Size of memory. Defaults to '3' \n"
" --mem=unbounded Use the unbounded memory gadget version (not recommended)\n"
" --output=path/to/file.pml Outputted file name\n"
" --eval Evaluate the outputted file with Spin\n"
" --cleanup Clean up the extra files spin creates, including Korg's \n"
)
print(msg)
# assert "syntax error" not in stdout, "there seems to be a syntax error in the model"
# assert "processes created" in stdout, "the spin model creates no processes ... check to see if it compiles"
def main() -> None:
args = sys.argv[1:]
if len(args) == 0 or args[0] in ["help", "--help", "-h", "-help"]:
show_help()
sys.exit()
mem = 3 # default
for arg in args:
if arg.startswith("--model="):
model_path = arg.split("=", 1)[1]
elif arg.startswith("--attacker="):
attacker = arg.split("=", 1)[1]
elif arg.startswith("--mem="):
mem_read = arg.split("=", 1)[1]
elif arg.startswith("--chan="):
chans = arg.split("=", 1)[1]
elif arg.startswith("--output="):
out_file = arg.split("=", 1)[1]
if "--eval" in args and not "--output" in args:
out_file = "korg-promela-out.pml"
if not model_path or not attacker or not mem or not chans or not out_file:
print("error: all arguments are required. \n")
show_help()
sys.exit(1)
unbounded = mem_read == "unbounded"
if not unbounded : mem = int(mem_read)
ensure_compile(model_path)
model = fileRead(model_path)
channels = parse_channels(fileReadLines(model_path))
mchannels, mchannel_len = parse_mchannels(fileReadLines(model_path))
model_with_attacker = str;
assert mem >= 0, "memory value must be positive"
def _parse_channel_selection(
chans_str: str,
mchannels: Dict[str, List[str]],
mchannel_len: Dict[str, int],
channels: Dict[str, List[str]]
) -> Set[str]:
"""
Parses the user's channel selection string (e.g., "ch1,ch2:1-3,ch3:5").
Raises:
ValueError: If the format is invalid or ranges are out of bounds.
"""
chans_togen = set()
# first, process the input
mc = chans.split(",")
mc = chans_str.split(",")
for chan in mc:
if ":" in chan:
name, num_extr = chan[:chan.index(":")], chan[chan.index(":")+1:]
if "-" in num_extr:
a, b = list(map(lambda a: int(a), num_extr.split("-")))
assert a < b
assert a >= 0
assert b < mchannel_len[name]
for i in range(a,b+1):
chan_name = str(name) + "[" + str(i) + "]"
try:
name, num_extr = chan.split(":", 1)
if name not in mchannel_len:
raise ValueError(f"unknown multi-channel '{name}'")
max_len = mchannel_len[name]
if "-" in num_extr:
a_str, b_str = num_extr.split("-", 1)
a, b = int(a_str), int(b_str)
if not (0 <= a < b < max_len):
raise ValueError(f"invalid range '{a}-{b}' for {name} (max is {max_len-1})")
for i in range(a, b + 1):
chan_name = f"{name}[{i}]"
chans_togen.add(chan_name)
channels[chan_name] = mchannels[name]
else:
a = int(num_extr)
if not (0 <= a < max_len):
raise ValueError(f"invalid index '{a}' for {name} (max is {max_len-1})")
chan_name = f"{name}[{a}]"
chans_togen.add(chan_name)
channels[chan_name] = mchannels[name]
else:
a = int(num_extr)
assert a >= 0
assert a < mchannel_len[name]
chan_name = str(name) + "[" + str(a) + "]"
chans_togen.add(chan_name)
channels[chan_name] = mchannels[name]
except (ValueError, TypeError) as e:
raise ValueError(f"invalid channel format: '{chan}'. {e}")
else : chans_togen.add(chan)
else:
chans_togen.add(chan)
return chans_togen
print(chans_togen)
for i in range(len(chans_togen)):
chan = list(chans_togen)[i]
if not "--nocheck" : assert chan in channels, "can't find "+str(chan)+" in model"
match attacker:
case "replay":
if unbounded : attacker_gadget = gen_replay_unbounded(chan, channels[chan], i)
else : attacker_gadget = gen_replay(chan, channels[chan], mem, i)
case "drop":
if unbounded : attacker_gadget = gen_drop_unbounded(chan, channels[chan], i)
else : attacker_gadget = gen_drop(chan, channels[chan], mem, i)
case "reorder":
if unbounded : attacker_gadget = gen_reorder_unbounded(chan, channels[chan], i)
else : attacker_gadget = gen_reorder(chan, channels[chan], mem, i)
case _:
print("error: inputted an invalid attacker model. \n")
sys.exit(1)
if model.rindex("};") >= model.rindex("}"):
model = model[:model.rindex("};")+2] + "\n\n" + attacker_gadget + "\n" + model[model.rindex("};")+2:]
def _inject_attacker_code(model_content: str, attacker_gadget: str) -> str:
"""
Injects the generated attacker gadget code into the Promela model string.
Note: This injection method is fragile as it relies on finding the
last '};' or '}' in the file.
"""
try:
# Original logic: find the last '};' or '}'
if model_content.rindex("};") >= model_content.rindex("}"):
idx = model_content.rindex("};") + 2
return model_content[:idx] + "\n\n" + attacker_gadget + "\n" + model_content[idx:]
else:
model = model[:model.rindex("}")+1] + "\n\n" + attacker_gadget + "\n" + model[model.rindex("}")+1:]
idx = model_content.rindex("}") + 1
return model_content[:idx] + "\n\n" + attacker_gadget + "\n" + model_content[idx:]
except ValueError:
# Fallback: append to the end if no '}' or '};' is found
return model_content + "\n\n" + attacker_gadget
# Write the modified model to the output file
with open(out_file, 'w') as file:
file.write(model)
if "--eval" in args:
print()
print("generated Promela file with attacker model gadget... now running SPIN on "+str(out_file) + "!\n")
def parse_args() -> argparse.Namespace:
"""Configures and parses command-line arguments."""
parser = argparse.ArgumentParser(
description="Synthesize attacker gadgets for Promela models."
)
parser.add_argument(
"--model",
type=str,
required=True,
help="Promela model to generate attackers on"
)
parser.add_argument(
"--attacker",
type=str,
required=True,
choices=["replay", "drop", "reorder"],
help="The type of attacker to synthesize"
)
parser.add_argument(
"--chan",
type=str,
required=True,
help="Channels to synthesize attackers on. (e.g., 'ch1,ch2:0-2,ch3:4')"
)
parser.add_argument(
"--mem",
type=str,
default="3",
help="Size of memory. Use 'unbounded' for unbounded memory. (default: 3)"
)
parser.add_argument(
"--output",
type=str,
help="Output file name. (default: korg-promela-out.pml if --eval is used)"
)
parser.add_argument(
"--nocheck",
action="store_true",
help="Don't check channel validity"
)
parser.add_argument(
"--eval",
action="store_true",
help="Evaluate the outputted file with Spin"
)
parser.add_argument(
"--cleanup",
action="store_true",
help="Clean up the extra files spin creates, including Korg's"
)
return parser.parse_args()
def main() -> None:
args = parse_args()
# --- 1. Argument Validation ---
# Handle output file logic
out_file = args.output
if args.eval and not args.output:
out_file = "korg-promela-out.pml"
elif not args.output:
print("error: --output=path/to/file.pml is required when --eval is not used.", file=sys.stderr)
sys.exit(1)
# Handle memory logic
unbounded = (args.mem == "unbounded")
mem = 0
if not unbounded:
try:
mem = int(args.mem)
if mem < 0:
raise ValueError("memory value must be positive")
except ValueError as e:
print(f"error: invalid --mem value. {e}", file=sys.stderr)
sys.exit(1)
# --- 2. Model Reading and Compilation Check ---
# Check for file existence and read
model_lines = fileReadLines(args.model)
model_content = fileRead(args.model)
if not model_content: # fileRead returns "" on error
print(f"error: could not read model file at '{args.model}'", file=sys.stderr)
sys.exit(1)
# Ensure model compiles before proceeding
try:
ensure_compile(args.model)
except AssertionError as e:
print(f"error: Model compilation failed. {e}", file=sys.stderr)
sys.exit(1)
# --- 3. Model Parsing ---
channels = parse_channels(model_lines)
mchannels, mchannel_len = parse_mchannels(model_lines)
try:
chans_togen = _parse_channel_selection(
args.chan, mchannels, mchannel_len, channels
)
except ValueError as e:
print(f"error: Failed to parse --chan argument. {e}", file=sys.stderr)
sys.exit(1)
print(f"Generating attackers for: {chans_togen}")
# --- 4. Attacker Generation ---
for i, chan in enumerate(list(chans_togen)):
if not args.nocheck and chan not in channels:
print(f"error: can't find channel '{chan}' in model. (Use --nocheck to override)", file=sys.stderr)
sys.exit(1)
attacker_gadget = ""
chan_type = channels.get(chan, []) # Default to empty list if not found (and --nocheck)
match args.attacker:
case "replay":
attacker_gadget = gen_replay_unbounded(chan, chan_type, i) if unbounded \
else gen_replay(chan, chan_type, mem, i)
case "drop":
attacker_gadget = gen_drop_unbounded(chan, chan_type, i) if unbounded \
else gen_drop(chan, chan_type, mem, i)
case "reorder":
attacker_gadget = gen_reorder_unbounded(chan, chan_type, i) if unbounded \
else gen_reorder(chan, chan_type, mem, i)
case _:
# This should be unreachable due to argparse `choices`
print(f"error: invalid attacker model '{args.attacker}'.", file=sys.stderr)
sys.exit(1)
# Inject the generated gadget into the model
model_content = _inject_attacker_code(model_content, attacker_gadget)
# --- 5. Write Output ---
try:
with open(out_file, 'w') as file:
file.write(model_content)
except IOError as e:
print(f"error: could not write to output file {out_file}. {e}", file=sys.stderr)
sys.exit(1)
# --- 6. Evaluate and Cleanup ---
if args.eval:
print(f"\nGenerated Promela file... now running SPIN on {out_file}!\n")
eval_model(out_file)
if "--cleanup" in args:
if args.cleanup:
print("\nCleaning up Spin files...")
cleanup_spin_files()
try:
@@ -142,5 +247,5 @@ def main() -> None:
pass
if __name__== "__main__":
if __name__ == "__main__":
main()

View File

@@ -1,6 +1,72 @@
import sys, re, subprocess, os, shutil
from typing import List
# def gen_replay_old(chan : str, chan_type : List[str], mem : int, index : int) -> str:
# ret_string = ""
# ret_string+= "chan attacker_mem_"+str(index)+" = ["+str(mem)+"] of " + ("{ " + str(chan_type)[1:-1] + " }") .replace("'","") + ";\n"
# ret_string+= "\n"
# ret_string+= "active proctype attacker_replay_"+str(index)+"() {\n"
# item_arr = []
# item_count = 0
# # formulate string of general message input variables
# for item in chan_type:
# item_arr.append("b_" + str(item_count))
# ret_string+= str(item) + " " + item_arr[item_count] + ";\n"
# item_count+=1
# fs = (str([item for item in item_arr])[1:-1]).replace("'","")
# ret_string+="int i = "+str(mem)+";\n"
# ret_string+="int b;\n"
# ret_string+="CONSUME:\n"
# ret_string+=" do\n"
# ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> atomic {\n"
# ret_string+=" "+str(chan)+" ? <"+fs+"> -> attacker_mem_"+str(index)+" ! "+fs+";\n"
# ret_string+=" i--;\n"
# ret_string+=" if\n"
# ret_string+=" :: i == 0 -> goto REPLAY;\n"
# ret_string+=" :: i != 0 -> goto CONSUME;\n"
# ret_string+=" fi\n"
# ret_string+=" }\n"
# ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> atomic {\n"
# ret_string+=" b = len("+str(chan)+");\n"
# ret_string+=" do\n"
# ret_string+=" :: b != len("+str(chan)+") -> goto CONSUME;\n"
# ret_string+=" od\n"
# ret_string+=" }\n"
# ret_string+=" od\n"
# ret_string+="REPLAY:\n"
# ret_string+=" do\n"
# ret_string+=" :: atomic {\n"
# ret_string+=" int am;\n"
# ret_string+=" select(am : 0 .. len(attacker_mem_"+str(index)+")-1);\n"
# ret_string+=" do\n"
# ret_string+=" :: am != 0 ->\n"
# ret_string+=" am = am-1;\n"
# ret_string+=" attacker_mem_"+str(index)+" ? "+fs+" -> attacker_mem_"+str(index)+" ! "+fs+";\n"
# ret_string+=" :: am == 0 ->\n"
# ret_string+=" attacker_mem_"+str(index)+" ? "+fs+" -> "+str(chan)+" ! "+fs+";\n"
# ret_string+=" break;\n"
# ret_string+=" od\n"
# ret_string+=" }\n"
# ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> atomic {\n"
# ret_string+=" b = len("+str(chan)+");\n"
# ret_string+=" do\n"
# ret_string+=" :: b != len("+str(chan)+") -> goto REPLAY;\n"
# ret_string+=" od\n"
# ret_string+=" }\n"
# ret_string+=" :: atomic {attacker_mem_"+str(index)+" ? "+fs+"; }\n"
# ret_string+=" :: empty(attacker_mem_"+str(index)+") -> goto BREAK;\n"
# ret_string+=" od\n"
# ret_string+="BREAK:\n"
# ret_string+="}\n"
# return ret_string
def gen_replay(chan : str, chan_type : List[str], mem : int, index : int) -> str:
ret_string = ""
@@ -29,10 +95,15 @@ def gen_replay(chan : str, chan_type : List[str], mem : int, index : int) -> str
ret_string+=" i--;\n"
ret_string+=" if\n"
ret_string+=" :: i == 0 -> goto REPLAY;\n"
ret_string+=" :: i != 0 -> goto CONSUME;\n"
ret_string+=" :: i != 0 -> {\n"
ret_string+=" do\n"
ret_string+=" :: goto CONSUME\n"
ret_string+=" :: goto REPLAY\n"
ret_string+=" od\n"
ret_string+=" }\n"
ret_string+=" fi\n"
ret_string+=" }\n"
ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> atomic {\n"
ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> {\n"
ret_string+=" b = len("+str(chan)+");\n"
ret_string+=" do\n"
ret_string+=" :: b != len("+str(chan)+") -> goto CONSUME;\n"
@@ -49,16 +120,25 @@ def gen_replay(chan : str, chan_type : List[str], mem : int, index : int) -> str
ret_string+=" am = am-1;\n"
ret_string+=" attacker_mem_"+str(index)+" ? "+fs+" -> attacker_mem_"+str(index)+" ! "+fs+";\n"
ret_string+=" :: am == 0 ->\n"
ret_string+=" attacker_mem_"+str(index)+" ? "+fs+" -> "+str(chan)+" ! "+fs+";\n"
ret_string+=" do\n"
ret_string+=" :: attacker_mem_"+str(index)+" ? ["+fs+"] -> "+str(chan)+" ! "+fs+"; break;\n"
ret_string+=" :: attacker_mem_"+str(index)+" ? "+fs+" -> "+str(chan)+" ! "+fs+"; break;\n"
ret_string+=" od\n"
ret_string+=" break;\n"
ret_string+=" od\n"
ret_string+=" }\n"
ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> atomic {\n"
ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> {\n"
ret_string+=" b = len("+str(chan)+");\n"
ret_string+=" do\n"
ret_string+=" :: b != len("+str(chan)+") -> goto REPLAY;\n"
ret_string+=" od\n"
ret_string+=" }\n"
ret_string+=" :: i != 0 -> {\n"
ret_string+=" b = len("+str(chan)+");\n"
ret_string+=" do\n"
ret_string+=" :: b != len("+str(chan)+") -> goto CONSUME;\n"
ret_string+=" od\n"
ret_string+=" }\n"
ret_string+=" :: atomic {attacker_mem_"+str(index)+" ? "+fs+"; }\n"
ret_string+=" :: empty(attacker_mem_"+str(index)+") -> goto BREAK;\n"
ret_string+=" od\n"
@@ -67,7 +147,7 @@ def gen_replay(chan : str, chan_type : List[str], mem : int, index : int) -> str
return ret_string
def gen_replay_unbounded(chan : str, chan_type : List[str], index : int) -> str:
def gen_replay_unbounded(chan : str, chan_type : List[str], mem : int, index : int) -> str:
ret_string = ""
ret_string+= "chan attacker_mem_"+str(index)+" = [99] of " + ("{ " + str(chan_type)[1:-1] + " }") .replace("'","") + ";\n"
@@ -92,11 +172,10 @@ def gen_replay_unbounded(chan : str, chan_type : List[str], index : int) -> str:
ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> atomic {\n"
ret_string+=" "+str(chan)+" ? <"+fs+"> -> attacker_mem_"+str(index)+" ! "+fs+";\n"
ret_string+=" do\n"
ret_string+=" :: goto REPLAY;\n"
ret_string+=" :: goto CONSUME;\n"
ret_string+=" :: goto CONSUME\n"
ret_string+=" :: goto REPLAY\n"
ret_string+=" od\n"
ret_string+=" }\n"
ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> atomic {\n"
ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> {\n"
ret_string+=" b = len("+str(chan)+");\n"
ret_string+=" do\n"
ret_string+=" :: b != len("+str(chan)+") -> goto CONSUME;\n"
@@ -113,16 +192,25 @@ def gen_replay_unbounded(chan : str, chan_type : List[str], index : int) -> str:
ret_string+=" am = am-1;\n"
ret_string+=" attacker_mem_"+str(index)+" ? "+fs+" -> attacker_mem_"+str(index)+" ! "+fs+";\n"
ret_string+=" :: am == 0 ->\n"
ret_string+=" attacker_mem_"+str(index)+" ? "+fs+" -> "+str(chan)+" ! "+fs+";\n"
ret_string+=" do\n"
ret_string+=" :: attacker_mem_"+str(index)+" ? ["+fs+"] -> "+str(chan)+" ! "+fs+"; break;\n"
ret_string+=" :: attacker_mem_"+str(index)+" ? "+fs+" -> "+str(chan)+" ! "+fs+"; break;\n"
ret_string+=" od\n"
ret_string+=" break;\n"
ret_string+=" od\n"
ret_string+=" }\n"
ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> atomic {\n"
ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> {\n"
ret_string+=" b = len("+str(chan)+");\n"
ret_string+=" do\n"
ret_string+=" :: b != len("+str(chan)+") -> goto REPLAY;\n"
ret_string+=" od\n"
ret_string+=" }\n"
ret_string+=" :: {\n"
ret_string+=" b = len("+str(chan)+");\n"
ret_string+=" do\n"
ret_string+=" :: b != len("+str(chan)+") -> goto CONSUME;\n"
ret_string+=" od\n"
ret_string+=" }\n"
ret_string+=" :: atomic {attacker_mem_"+str(index)+" ? "+fs+"; }\n"
ret_string+=" :: empty(attacker_mem_"+str(index)+") -> goto BREAK;\n"
ret_string+=" od\n"
@@ -131,6 +219,70 @@ def gen_replay_unbounded(chan : str, chan_type : List[str], index : int) -> str:
return ret_string
# def gen_replay_unbounded_old(chan : str, chan_type : List[str], index : int) -> str:
# ret_string = ""
# ret_string+= "chan attacker_mem_"+str(index)+" = [99] of " + ("{ " + str(chan_type)[1:-1] + " }") .replace("'","") + ";\n"
# ret_string+= "\n"
# ret_string+= "active proctype attacker_replay_"+str(index)+"() {\n"
# item_arr = []
# item_count = 0
# # formulate string of general message input variables
# for item in chan_type:
# item_arr.append("b_" + str(item_count))
# ret_string+= str(item) + " " + item_arr[item_count] + ";\n"
# item_count+=1
# fs = (str([item for item in item_arr])[1:-1]).replace("'","")
# ret_string+="int b;\n"
# ret_string+="CONSUME:\n"
# ret_string+=" do\n"
# ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> atomic {\n"
# ret_string+=" "+str(chan)+" ? <"+fs+"> -> attacker_mem_"+str(index)+" ! "+fs+";\n"
# ret_string+=" do\n"
# ret_string+=" :: goto REPLAY;\n"
# ret_string+=" :: goto CONSUME;\n"
# ret_string+=" od\n"
# ret_string+=" }\n"
# ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> atomic {\n"
# ret_string+=" b = len("+str(chan)+");\n"
# ret_string+=" do\n"
# ret_string+=" :: b != len("+str(chan)+") -> goto CONSUME;\n"
# ret_string+=" od\n"
# ret_string+=" }\n"
# ret_string+=" od\n"
# ret_string+="REPLAY:\n"
# ret_string+=" do\n"
# ret_string+=" :: atomic {\n"
# ret_string+=" int am;\n"
# ret_string+=" select(am : 0 .. len(attacker_mem_"+str(index)+")-1);\n"
# ret_string+=" do\n"
# ret_string+=" :: am != 0 ->\n"
# ret_string+=" am = am-1;\n"
# ret_string+=" attacker_mem_"+str(index)+" ? "+fs+" -> attacker_mem_"+str(index)+" ! "+fs+";\n"
# ret_string+=" :: am == 0 ->\n"
# ret_string+=" attacker_mem_"+str(index)+" ? "+fs+" -> "+str(chan)+" ! "+fs+";\n"
# ret_string+=" break;\n"
# ret_string+=" od\n"
# ret_string+=" }\n"
# ret_string+=" :: "+str(chan)+" ? ["+str(fs)+"] -> atomic {\n"
# ret_string+=" b = len("+str(chan)+");\n"
# ret_string+=" do\n"
# ret_string+=" :: b != len("+str(chan)+") -> goto REPLAY;\n"
# ret_string+=" od\n"
# ret_string+=" }\n"
# ret_string+=" :: atomic {attacker_mem_"+str(index)+" ? "+fs+"; }\n"
# ret_string+=" :: empty(attacker_mem_"+str(index)+") -> goto BREAK;\n"
# ret_string+=" od\n"
# ret_string+="BREAK:\n"
# ret_string+="}\n"
# return ret_string
def gen_reorder(chan : str, chan_type : List[str], mem : int, index : int) -> str:
ret_string = ""