-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathBaseModule.py
More file actions
466 lines (383 loc) · 14.8 KB
/
BaseModule.py
File metadata and controls
466 lines (383 loc) · 14.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
"""
Base Module class for ExploitDF Framework
This module provides the base functionality that all auxiliary and scanner
modules should inherit from, including variable handling, option management,
and common functionality.
"""
import sys
import os
import threading
import time
import socket
from abc import ABC, abstractmethod
from datetime import datetime
class BaseModule(ABC):
    """
    Base class for all ExploitDF modules (auxiliary, scanners, etc.)

    Holds the module metadata (``info``), two option tables (``options`` and
    ``advanced_options``), the set of option names that must be filled in
    before a run (``required_options``) and a ``running`` flag. It also
    provides console output helpers, a TCP/TLS connection helper and a bounded
    thread runner that subclasses can reuse.
    """

    # Strings (compared case-insensitively) that set a boolean option to True.
    _TRUE_STRINGS = ('true', '1', 'yes', 'on')

    # Human-readable descriptions for the option names known to the framework.
    _OPTION_DESCRIPTIONS = {
        'VERBOSE': 'Enable verbose output',
        'WORKSPACE': 'Specify workspace',
        'ShowProgress': 'Display progress',
        'ShowProgressPercent': 'Progress update percentage',
        'THREADS': 'Number of threads',
        'CPORT': 'Connection port',
        'CHOST': 'Connection host',
        'ConnectTimeout': 'Connection timeout',
        'SSL': 'Use SSL/TLS',
        'SSLVersion': 'SSL/TLS version',
        'RHOSTS': 'Target host(s)',
        'RPORT': 'Target port',
        'LHOST': 'Local host',
        'LPORT': 'Local port'
    }

    def __init__(self):
        # Module information should be overridden by subclasses
        self.info = {
            'name': 'Base Module',
            'description': 'Base module class',
            'author': 'ExploitDF Team',
            'version': '1.0',
            'references': [],
            'rank': 'Normal',
            'targets': [],
            'platform': ['Windows', 'Linux', 'macOS'],
            'type': 'auxiliary'
        }
        # Module options - configurable parameters
        self.options = {}
        # Advanced options - typically not shown unless requested
        self.advanced_options = {}
        # Required options that must be set before running
        self.required_options = set()
        # Module state
        self.running = False
        self.session_id = None
        # Default options all modules should have
        self._setup_default_options()

    def _setup_default_options(self):
        """Setup default options that all modules should have"""
        self.options.update({
            'VERBOSE': False,
            'WORKSPACE': '',
            'ShowProgress': True,
            'ShowProgressPercent': 10,
            'THREADS': 1
        })
        self.advanced_options.update({
            'CPORT': 0,
            'CHOST': '',
            'ConnectTimeout': 10,
            'SSL': False,
            'SSLVersion': 'Auto'
        })

    def _coerce_option_value(self, name, current_value, value):
        """
        Convert ``value`` to the type of the option's current value.

        Args:
            name (str): Option name (only used in error messages)
            current_value: Value currently stored for the option
            value: New value, usually a string typed by the user

        Returns:
            tuple: (ok, converted) - ``ok`` is False when conversion failed,
            in which case an error has already been printed
        """
        # bool must be tested before int because bool is a subclass of int.
        if isinstance(current_value, bool):
            if isinstance(value, bool):
                return True, value
            return True, str(value).strip().lower() in self._TRUE_STRINGS
        if isinstance(current_value, int):
            try:
                return True, int(value)
            except (TypeError, ValueError):
                print(f"Invalid integer value for {name}: {value}")
                return False, None
        if isinstance(current_value, float):
            try:
                return True, float(value)
            except (TypeError, ValueError):
                print(f"Invalid float value for {name}: {value}")
                return False, None
        return True, value

    def set_option(self, name, value):
        """
        Set an option value with type conversion

        The option is looked up in ``options`` first and then in
        ``advanced_options``; both tables use the same conversion rules.

        Args:
            name (str): Option name
            value: Option value, normally a string; values that already have
                the target type are accepted as well

        Returns:
            bool: True if the option was updated, False if the name is
            unknown or the value could not be converted
        """
        for table in (self.options, self.advanced_options):
            if name in table:
                ok, converted = self._coerce_option_value(name, table[name], value)
                if ok:
                    table[name] = converted
                return ok
        print(f"Unknown option: {name}")
        return False

    def get_option(self, name):
        """
        Get an option value

        Args:
            name (str): Option name

        Returns:
            Option value or None if not found (``options`` takes precedence
            over ``advanced_options``)
        """
        if name in self.options:
            return self.options[name]
        return self.advanced_options.get(name)

    def show_info(self):
        """Display module information"""
        info = self.info
        # Leading and trailing empty strings reproduce the blank lines that
        # surround the summary block.
        print("\n".join([
            "",
            f"Name: {info['name']}",
            f"Module: {info.get('module_path', 'N/A')}",
            f"Platform: {', '.join(info['platform'])}",
            f"Rank: {info['rank']}",
            f"Disclosed: {info.get('disclosure_date', 'N/A')}",
            "Provided by:",
            f"{info['author']}",
            "Available targets:",
            "",
        ]))
        if info['targets']:
            for i, target in enumerate(info['targets']):
                print(f" {i} {target}")
        else:
            print(" No specific targets")
        print("\nBasic options:\n")
        self.show_options()
        print("\nDescription:")
        print(f" {info['description']}")
        if info['references']:
            print("\nReferences:")
            for ref in info['references']:
                print(f" {ref}")

    def _print_option_table(self, table):
        """Print one option table (header, rule, one row per option)."""
        print(f"{'Name':<20} {'Current Setting':<20} {'Required':<10} {'Description':<30}")
        print("-" * 80)
        for name, value in table.items():
            required = "yes" if name in self.required_options else "no"
            description = self._get_option_description(name)
            print(f"{name:<20} {str(value):<20} {required:<10} {description:<30}")

    def show_options(self):
        """Display module options in a formatted table"""
        if not self.options and not self.advanced_options:
            print(" No options available")
            return
        # Only the regular options are listed here; advanced ones have their
        # own command (show_advanced_options).
        self._print_option_table(self.options)

    def show_advanced_options(self):
        """Display advanced module options"""
        if not self.advanced_options:
            print(" No advanced options available")
            return
        self._print_option_table(self.advanced_options)

    def _get_option_description(self, name):
        """Get description for an option"""
        return self._OPTION_DESCRIPTIONS.get(name, 'No description available')

    def check_requirements(self):
        """
        Check if all required options are set

        An option counts as unset when it is unknown (None), an empty string
        or a numeric zero. Boolean options are never treated as unset: False
        is a legitimate setting even though ``False == 0`` in Python.

        Returns:
            tuple: (bool, list) - (all_set, missing_options); the list is
            sorted by name so the resulting message is stable
        """
        missing = []
        for option in sorted(self.required_options, key=str):
            value = self.get_option(option)
            if value is None or value == '':
                missing.append(option)
            elif not isinstance(value, bool) and value == 0:
                missing.append(option)
        return len(missing) == 0, missing

    def _print_tagged(self, tag, message):
        """Print ``message`` prefixed with a timestamp and a status tag."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{timestamp}] [{tag}] {message}")

    def print_status(self, message):
        """Print a status message with timestamp"""
        self._print_tagged('*', message)

    def print_good(self, message):
        """Print a success message"""
        self._print_tagged('+', message)

    def print_error(self, message):
        """Print an error message"""
        self._print_tagged('-', message)

    def print_warning(self, message):
        """Print a warning message"""
        self._print_tagged('!', message)

    def vprint(self, message):
        """Print verbose message if VERBOSE is enabled"""
        if self.get_option('VERBOSE'):
            self.print_status(f"VERBOSE: {message}")

    def progress_update(self, current, total, message=""):
        """
        Update progress if ShowProgress is enabled

        A line is printed when the integer percentage is a multiple of the
        ShowProgressPercent option, or when ``current == total``.

        Args:
            current (int): Number of items processed so far
            total (int): Total number of items
            message (str): Optional text appended to the progress line
        """
        if not self.get_option('ShowProgress'):
            return
        percent = int((current / total) * 100) if total > 0 else 0
        try:
            step = int(self.get_option('ShowProgressPercent'))
        except (TypeError, ValueError):
            step = 0
        # A missing, zero or negative step would make the modulo below fail
        # or be meaningless; in that case report on every call.
        if step <= 0:
            step = 1
        if percent % step == 0 or current == total:
            status_msg = f"Progress: {current}/{total} ({percent}%)"
            if message:
                status_msg += f" - {message}"
            self.print_status(status_msg)

    def create_socket(self, host=None, port=None, timeout=None):
        """
        Create a socket connection with module options

        Args:
            host (str): Target host (uses the RHOST option if not provided)
            port (int): Target port (uses the RPORT option if not provided)
            timeout (int): Connection timeout (uses ConnectTimeout if not provided)

        Returns:
            socket.socket: Connected socket (TLS-wrapped when the SSL option
            is enabled) or None if failed
        """
        # NOTE(review): the fallback reads 'RHOST' (singular); this class does
        # not define that option, so it only works if a subclass adds it.
        host = host or self.get_option('RHOST')
        port = port or self.get_option('RPORT')
        timeout = timeout or self.get_option('ConnectTimeout')
        if not host or not port:
            self.print_error("Host and port must be specified")
            return None
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout)
            self.vprint(f"Connecting to {host}:{port}")
            sock.connect((host, int(port)))
            # Handle SSL if enabled
            if self.get_option('SSL'):
                import ssl
                context = ssl.create_default_context()
                # Certificate and hostname verification are switched off on
                # purpose: the peer's certificate is not validated at all.
                context.check_hostname = False
                context.verify_mode = ssl.CERT_NONE
                sock = context.wrap_socket(sock, server_hostname=host)
                self.vprint("SSL connection established")
            return sock
        except Exception as e:
            self.print_error(f"Connection failed: {e}")
            # Release the descriptor instead of leaking it on failure.
            if sock is not None:
                try:
                    sock.close()
                except OSError:
                    pass
            return None

    def run_threaded(self, target_func, targets, max_threads=None):
        """
        Run a function against multiple targets using threads

        At most ``max_threads`` worker threads are created; each worker pulls
        the next target from a shared iterator until none are left, so the
        number of OS threads does not grow with the number of targets.
        Exceptions raised by ``target_func`` are reported and do not stop the
        remaining targets. The call returns once every target was processed.

        Args:
            target_func: Function to run against each target
            targets: Iterable of targets
            max_threads: Maximum number of threads (uses THREADS option if not
                provided); values below 1 are treated as 1
        """
        pending = list(targets)
        if not pending:
            return
        try:
            requested = int(max_threads or self.get_option('THREADS') or 1)
        except (TypeError, ValueError):
            requested = 1
        pool_size = max(1, min(requested, len(pending)))

        remaining = iter(pending)
        remaining_lock = threading.Lock()
        exhausted = object()  # sentinel: no more targets

        def worker():
            while True:
                # Iterators are not safe to advance from several threads.
                with remaining_lock:
                    target = next(remaining, exhausted)
                if target is exhausted:
                    return
                try:
                    target_func(target)
                except Exception as e:
                    self.print_error(f"Thread error for {target}: {e}")

        # Start threads
        threads = [threading.Thread(target=worker) for _ in range(pool_size)]
        for thread in threads:
            thread.start()
        # Wait for all threads to complete
        for thread in threads:
            thread.join()

    def stop(self):
        """Stop the module execution"""
        self.running = False
        self.print_status("Module execution stopped")

    def cleanup(self):
        """Clean up resources after module execution"""
        pass

    @abstractmethod
    def run(self):
        """
        Main module execution method - must be implemented by subclasses

        Subclasses are expected to call ``super().run()`` first: it verifies
        the required options and marks the module as running.

        Returns:
            False if a required option is missing (an error is printed),
            otherwise None after ``self.running`` has been set to True
        """
        # Check requirements before running
        requirements_met, missing = self.check_requirements()
        if not requirements_met:
            self.print_error(f"Missing required options: {', '.join(missing)}")
            return False
        # Set running state
        self.running = True
        # Module-specific execution logic should be implemented in subclasses
class AuxiliaryModule(BaseModule):
    """
    Shared foundation for auxiliary modules.

    Extends the base module with the target options auxiliary modules have
    in common and marks the target list as mandatory.
    """

    def __init__(self):
        super().__init__()
        self.info['type'] = 'auxiliary'
        # Target-related defaults shared by auxiliary modules
        auxiliary_defaults = (
            ('RHOSTS', ''),
            ('RPORT', 80),
            ('THREADS', 1),
        )
        for option_name, default_value in auxiliary_defaults:
            self.options[option_name] = default_value
        # A run cannot start until the target list has been filled in
        self.required_options.add('RHOSTS')
class ScannerModule(AuxiliaryModule, BaseModule):
    """
    Base class specifically for scanner modules

    Inherits from AuxiliaryModule and adds scanning-specific functionality:
    target-list parsing and a ``run`` implementation that calls
    ``scan_target`` for every entry of the RHOSTS option.
    """

    def __init__(self):
        super().__init__()
        self.info['type'] = 'scanner'
        # Scanners typically use more threads
        self.options['THREADS'] = 10
        # Shorter timeout for scanning. ConnectTimeout is an advanced option,
        # so it is overridden where it lives instead of adding a second,
        # shadowing copy to the regular options.
        self.advanced_options['ConnectTimeout'] = 5

    def parse_targets(self, targets_string):
        """
        Parse target string into list of individual targets

        Only comma-separated lists are handled: the string is split on
        commas, surrounding whitespace is removed and empty entries are
        dropped. Ranges and CIDR notation are NOT expanded; such entries are
        returned unchanged as a single target.

        Args:
            targets_string (str): Target specification string

        Returns:
            list: List of individual target strings (empty for empty input)
        """
        if not targets_string:
            return []
        return [
            target.strip()
            for target in targets_string.split(',')
            if target.strip()
        ]

    def scan_target(self, target):
        """
        Scan a single target - should be overridden by specific scanner modules

        Args:
            target (str): Target to scan
        """
        self.print_status(f"Scanning {target}")

    def run(self):
        """
        Run the scanner against all specified targets

        Targets are scanned through ``run_threaded`` when there is more than
        one target and THREADS is greater than 1; otherwise they are scanned
        one by one, with progress output, until ``self.running`` is cleared.
        ``cleanup`` is always called once scanning has started, even if a
        scan raises, and the running flag is reset when the method returns.

        Returns:
            bool: True when the scan loop finished, False when a required
            option is missing or RHOSTS contains no targets
        """
        # Call parent run method for common checks
        if super().run() is False:
            return False
        targets = self.parse_targets(self.get_option('RHOSTS'))
        if not targets:
            # The parent marked the module as running; nothing will run.
            self.running = False
            self.print_error("No targets specified in RHOSTS")
            return False
        self.print_status(f"Scanning {len(targets)} targets...")
        try:
            # Use threading for multiple targets
            if len(targets) > 1 and self.get_option('THREADS') > 1:
                self.run_threaded(self.scan_target, targets)
            else:
                # Single-threaded execution
                for position, target in enumerate(targets, start=1):
                    if not self.running:
                        break
                    self.progress_update(position, len(targets), f"Scanning {target}")
                    self.scan_target(target)
            self.print_good("Scan completed")
        finally:
            self.running = False
            self.cleanup()
        return True