import asyncio
import sys
import re
import zlib
import json
import csv
import sqlite3

import requests
from bs4 import BeautifulSoup
from mitmproxy import http, ctx
from mitmproxy import options
from mitmproxy.tools import dump


# Method 1: Load blocked words from a CSV file
def load_blocked_words_from_csv(file_path):
    """
    Loads blocked words from a CSV file.

    Args:
        file_path (str): The path to the CSV file containing blocked words.

    Returns:
        list: Every cell of the CSV, flattened into one list of words.
    """
    blocked_words = []
    with open(file_path, newline='') as csvfile:
        reader = csv.reader(csvfile)
        for row in reader:
            blocked_words.extend(row)
    return blocked_words


# Method 2: Load blocked words from a web service
def load_blocked_words_from_web_service(url):
    """
    Fetches blocked words from a web service.

    Args:
        url (str): The URL of the web service to fetch blocked words from.

    Returns:
        list: A list of blocked words; empty list on any request failure.
    """
    try:
        response = requests.get(url)
        response.raise_for_status()  # Raise an error for bad status codes
        # Assuming the response is a JSON array of words
        return response.json()
    except requests.RequestException as e:
        ctx.log.error(f"Error fetching blocked words: {e}")
        return []


# Load blocked words (swap in one of the loaders below as needed)
blocked_words = ['fdhfghrth']  # load_blocked_words_from_csv('Terms-to-Block.csv')
# blocked_words = load_blocked_words_from_web_service('https://example.com/api/blocked_words')

# Compile a regex pattern for blocked words.
if blocked_words:
    pattern = re.compile(
        r'\b(?:' + '|'.join(re.escape(word) for word in blocked_words) + r')\b',
        re.IGNORECASE,
    )
else:
    # An empty alternation would compile to r'\b(?:)\b', which matches at
    # every word boundary and flags all traffic; use a never-matching
    # pattern instead.
    pattern = re.compile(r'(?!x)x')


def check_string_in_list(target_string, string_list):
    """
    Checks if any string in a list is present in the target string.

    Args:
        target_string (str): The string to search within.
        string_list (list): The list of strings to search for.

    Returns:
        bool: True if any string in the list is found in the target string,
            False otherwise.
    """
    return any(item in target_string for item in string_list)


def search_blocked_words_in_json(data, pattern):
    """
    Recursively searches for blocked words in a JSON object.

    Args:
        data (dict or list or str): The JSON data to search within.
        pattern (re.Pattern): The compiled regex pattern for blocked words.

    Returns:
        list: A list of matches found in the JSON data.
    """
    matches = []
    if isinstance(data, dict):
        for value in data.values():
            matches.extend(search_blocked_words_in_json(value, pattern))
    elif isinstance(data, list):
        for item in data:
            matches.extend(search_blocked_words_in_json(item, pattern))
    elif isinstance(data, str):
        matches.extend(pattern.findall(data))
    return matches


class RequestLogger:
    """
    mitmproxy addon that blocks non-whitelisted hosts and filters responses
    containing blocked words.
    """

    def __init__(self):
        """
        Initializes the RequestLogger and starts the periodic whitelist
        refresh.

        NOTE: must be constructed while an asyncio event loop is running
        (start_proxy satisfies this). The task reference is kept so the
        refresh task is not garbage-collected mid-flight.
        """
        self.whitelist = []
        self._refresh_task = asyncio.create_task(self.load_whitelist_periodically())

    async def load_whitelist(self):
        """
        Reloads self.whitelist with the approved URLs from the SQLite
        database 'whitelist.db'.
        """
        conn = sqlite3.connect('whitelist.db')
        try:
            c = conn.cursor()
            c.execute("SELECT url FROM whitelist WHERE status='approved'")
            self.whitelist = [row[0] for row in c.fetchall()]
        finally:
            # Close the connection even if the query fails.
            conn.close()

    async def load_whitelist_periodically(self):
        """
        Periodically reloads the whitelist every 20 seconds.
        """
        while True:
            try:
                await self.load_whitelist()
            except sqlite3.Error as e:
                # Keep the previous whitelist and retry next cycle rather
                # than letting a transient DB error kill the refresh task.
                ctx.log.error(f"Error loading whitelist: {e}")
            await asyncio.sleep(20)

    def request(self, flow: http.HTTPFlow):
        """
        Handles HTTP requests. Blocks requests to non-whitelisted hosts.

        Args:
            flow (http.HTTPFlow): The HTTP flow object.
        """
        if flow.request.pretty_host not in self.whitelist:
            # Log the same value we matched against (pretty_host, which
            # honors the Host header), not the raw connection host.
            ctx.log.info('Blocked request to %s' % flow.request.pretty_host)
            flow.response = http.Response.make(
                403,  # Forbidden status code
                b"Blocked by URL whitelist",  # Response content
                {"Content-Type": "text/plain"},  # Response headers
            )
            return

    def response(self, flow: http.HTTPFlow):
        """
        Handles HTTP responses. Searches for blocked words in the response
        content and replaces the response if any are found.

        Args:
            flow (http.HTTPFlow): The HTTP flow object.
        """
        content_type = flow.response.headers.get("Content-Type", "")

        # mitmproxy already decodes Content-Encoding (gzip/deflate) when the
        # body is accessed via .content, so no manual zlib handling is
        # needed — decompressing an already-decoded body raises zlib.error.
        content = flow.response.content or b""
        if isinstance(content, bytes):
            content = content.decode('utf-8', errors='ignore')

        if "text/html" in content_type:
            # Match against the visible text only, not the markup.
            soup = BeautifulSoup(content, 'html.parser')
            matches = pattern.findall(soup.get_text())
        elif "application/json" in content_type:
            try:
                data = json.loads(content)
            except ValueError:
                # Malformed JSON: nothing reliable to scan.
                matches = []
            else:
                matches = search_blocked_words_in_json(data, pattern)
        else:
            matches = []

        if matches:
            ctx.log.error(matches[0])
            # Replace the entire response with a block page. (The original
            # assigned flow.response.text first and then immediately
            # overwrote flow.response with 200 "Hello World" — dead code
            # that contradicted the filtering intent.)
            flow.response = http.Response.make(
                403,  # Forbidden status code
                b"response contains filtered contents",  # Response content
                {"Content-Type": "text/plain"},  # Response headers
            )


async def start_proxy(host, port):
    """
    Starts the proxy server and blocks until it shuts down.

    Args:
        host (str): The host address to listen on.
        port (int): The port to listen on.

    Returns:
        dump.DumpMaster: The master instance, after run() completes.
    """
    opts = options.Options(listen_host=host, listen_port=port)
    master = dump.DumpMaster(
        opts,
        with_termlog=False,
        with_dumper=False,
    )
    master.addons.add(RequestLogger())
    await master.run()
    return master


if __name__ == '__main__':
    if len(sys.argv) != 3:
        # Fail fast with a usage message instead of an IndexError traceback.
        sys.exit(f"usage: {sys.argv[0]} <host> <port>")
    asyncio.run(start_proxy(sys.argv[1], int(sys.argv[2])))