-
Notifications
You must be signed in to change notification settings - Fork 0
/
apriori_s.py
160 lines (139 loc) · 7.32 KB
/
apriori_s.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def add_len_of_sequence_to_dataset(dataset: []) -> ([], int):
"""To optimize performance and avoid checking of length of sequence every iteration, calculate the length once and
save it in the dataset (as element with index 2 in the tuple).
Additionally, this function returns the maximum value of sequence length.
:param dataset: initial dataset
:return: (dataset_with_lengths, longest_sequence_length)
"""
longest_sequence_length = 0
dataset_with_lengths = []
for sequence_tuple in dataset:
length_of_sequence = len(sequence_tuple[1])
dataset_with_lengths.append(sequence_tuple + (length_of_sequence,))
if length_of_sequence > longest_sequence_length:
longest_sequence_length = length_of_sequence
return dataset_with_lengths, longest_sequence_length
def get_frequent_elements(dataset: [], min_support: float, iteration: int) -> []:
"""Return list of elements, which in given iteration (under given list index in all sequences) occurred at least
min_support times in the whole dataset (all sequences).
:param dataset: dataset with sequence lengths
:param min_support: minimum support
:param iteration: starts from 1. Specifies which index from sequence list (sequence_tuple[1]) should be considered
:return: list of frequent elements
"""
elements_with_occurrences = {}
for sequence_tuple in dataset:
if sequence_tuple[2] >= iteration:
element = sequence_tuple[1][iteration - 1]
if element in elements_with_occurrences.keys():
elements_with_occurrences[element] += 1
else:
elements_with_occurrences[element] = 1
frequent_elements = []
for element, occurrences in elements_with_occurrences.items():
if occurrences >= min_support:
frequent_elements.append(element)
return frequent_elements
def get_candidate_sequences(current_frequent_elements: [], previous_frequent_sequences: []) -> []:
"""It takes each frequent sequence from frequent sequences from previous iteration, and appends to it each of
current frequent elements, to create candidate sequence (this way, candidate sequences' length is equal to
length of previous frequent sentence + 1).
Due to initial assumptions of the project, in the sequence there cannot be two the same elements in a row, that's
why we have to check if last element of previous frequent sequence is not equal to current frequent element.
:param current_frequent_elements: frequent elements from current iteration
:param previous_frequent_sequences: frequent sequences from previous iteration
:return: list of candidate sequences without support
"""
if previous_frequent_sequences:
candidate_sequences = []
for seq in previous_frequent_sequences:
for frequent_elem in current_frequent_elements:
if seq[-1] != frequent_elem:
candidate_sequences.append(seq + [frequent_elem])
return candidate_sequences
else: # it means this is first iteration, so candidate sequences are equal to current frequent elements
return [[element] for element in current_frequent_elements]
def get_candidate_sequences_with_support(dataset: [], candidate_sequences_without_sup: []) -> []:
"""
:param dataset: dataset with lengths of sequences
:param candidate_sequences_without_sup: cs generated by functionget_candidate_sequences()
:return: list of tuples, where index 0 of tuple is candidate sequence (list), and index 1 is number of occurrences
"""
candidate_sequences_with_support = []
for candidate_seq in candidate_sequences_without_sup:
occurrences = 0
for seq_tuple in dataset:
candidate_sequence_length = len(candidate_seq)
if seq_tuple[2] >= candidate_sequence_length: # if seq from dataset is at least as long as candidate seq
iterator = 0
for element in candidate_seq:
if element == seq_tuple[1][iterator]:
iterator += 1
else:
break
if iterator == candidate_sequence_length:
occurrences += 1
candidate_sequences_with_support.append((candidate_seq, occurrences))
return candidate_sequences_with_support
def get_frequent_sequences(candidate_sequences_with_support: [], min_support: float) -> []:
"""
:param candidate_sequences_with_support: cs generated by get_candidate_sequences_with_support()
:param min_support: minimum support (minimum number of occurrences, inclusive)
:return: list of sequences which occurred at least min_support times
"""
frequent_sequences = []
for cs in candidate_sequences_with_support:
if cs[1] >= min_support:
frequent_sequences.append(cs[0])
return frequent_sequences
def print_line_by_line(list_to_be_printed: []):
if len(list_to_be_printed):
for line in list_to_be_printed:
print(line)
else:
print("Empty")
def apriori_s(dataset: [], min_support: float):
print("Minimum support: ", min_support)
print("Original dataset:")
print_line_by_line(dataset)
dataset_with_lengths, maximum_sequence_length = add_len_of_sequence_to_dataset(dataset)
print("\nDataset with added lengths of sequences (index number 2 in the tuple):")
print_line_by_line(dataset_with_lengths)
print("\n\n****** Starting AprioriS algorithm ******")
previous_frequent_sequences = None
for i in range(1, maximum_sequence_length + 1):
print("\n------- ITERATION {} -------".format(i))
frequent_elements = get_frequent_elements(dataset_with_lengths, min_support, i)
print("Frequent elements in iteration {}: {}".format(i, frequent_elements))
cs_without_support = get_candidate_sequences(frequent_elements, previous_frequent_sequences)
cs_with_support = get_candidate_sequences_with_support(dataset_with_lengths, cs_without_support)
print("Candidate sequences with support in iteration {}:".format(i))
print_line_by_line(cs_with_support)
frequent_sequences = get_frequent_sequences(cs_with_support, min_support)
previous_frequent_sequences = frequent_sequences
print("Frequent sequences in iteration {}:".format(i))
print_line_by_line(frequent_sequences)
if not len(frequent_sequences):
print("****** No frequent sequences in interation {}, ending the algorithm ******".format(i))
def main():
dataset = [
('1', ['p1', 'p2', 'p5']),
('2', ['p2', 'p1', 'p4']),
('3', ['p1', 'p2', 'p3']),
('4', ['p2', 'p3', 'p1']),
('5', ['p2', 'p1', 'p3', 'p4']),
('6', ['p3', 'p1', 'p2', 'p5']),
('7', ['p2', 'p1', 'p2', 'p3']),
('8', ['p3', 'p1', 'p3']),
('9', ['p2', 'p3', 'p2', 'p1']),
('10', ['p3', 'p1', 'p2', 'p4']),
('11', ['p3', 'p1', 'p2', 'p1', 'p3']),
('12', ['p3', 'p1', 'p2', 'p4', 'p5']),
('13', ['p1', 'p2', 'p1', 'p2', 'p5']),
('14', ['p3', 'p1', 'p4', 'p1']),
('15', ['p2', 'p1', 'p3', 'p4'])
]
min_support = 2
apriori_s(dataset, min_support)
if __name__ == '__main__':
main()