# mitmproxy-based content-filtering proxy (~209 lines, 6.5 KiB, Python)
import asyncio
|
|
import sys
|
|
import re
|
|
import zlib
|
|
import json
|
|
import csv
|
|
import requests
|
|
from mitmproxy import http, ctx
|
|
from mitmproxy import options
|
|
from mitmproxy.tools import dump
|
|
import sqlite3
|
|
from bs4 import BeautifulSoup
|
|
|
|
# Method 1: Load blocked words from a CSV file
def load_blocked_words_from_csv(file_path):
    """
    Loads blocked words from a CSV file.

    Every non-empty cell in the file is treated as one blocked word.

    Args:
        file_path (str): The path to the CSV file containing blocked words.

    Returns:
        list: A list of non-empty blocked words loaded from the CSV file.
    """
    blocked_words = []
    with open(file_path, newline='') as csvfile:
        reader = csv.reader(csvfile)
        for row in reader:
            # fix: skip empty/whitespace-only cells -- an empty word fed into
            # the blocked-words regex produces an empty alternative, which
            # matches at every word boundary and blocks essentially everything.
            blocked_words.extend(cell for cell in row if cell.strip())
    return blocked_words
|
# Method 2: Load blocked words from a web service
def load_blocked_words_from_web_service(url):
    """
    Fetches blocked words from a web service.

    Args:
        url (str): The URL of the web service to fetch blocked words from.

    Returns:
        list: A list of blocked words; an empty list on any network,
        HTTP-status, or JSON-decoding error.
    """
    try:
        # fix: a timeout prevents the proxy from hanging forever on a
        # stalled or unreachable blocked-words server
        response = requests.get(url, timeout=10)
        response.raise_for_status()  # Raise an error for bad status codes
        blocked_words = response.json()  # Assuming the response is a JSON array of words
        if not isinstance(blocked_words, list):
            ctx.log.error("Blocked-words service did not return a JSON array")
            return []
        return blocked_words
    except (requests.RequestException, ValueError) as e:
        # ValueError also covers malformed JSON bodies raised by response.json()
        ctx.log.error(f"Error fetching blocked words: {e}")
        return []
|
# Load blocked words from the web service
blocked_words = ['fdhfghrth']#load_blocked_words_from_csv('Terms-to-Block.csv')
#blocked_words = load_blocked_words_from_web_service('https://example.com/api/blocked_words')

# Compile a regex pattern for blocked words.
# fix: drop empty/whitespace-only entries and guard the empty-list case --
# an empty alternative (or an empty alternation) makes the pattern match at
# every word boundary, which would block essentially all content.
_valid_words = [w for w in blocked_words if w.strip()]
if _valid_words:
    pattern = re.compile(
        r'\b(?:' + '|'.join(re.escape(word) for word in _valid_words) + r')\b',
        re.IGNORECASE,
    )
else:
    pattern = re.compile(r'(?!)')  # a pattern that never matches
|
def check_string_in_list(target_string, string_list):
    """
    Checks if any string in a list is present in the target string.

    Args:
        target_string (str): The string to search within.
        string_list (list): The list of strings to search for.

    Returns:
        bool: True if any string in the list is found in the target string,
        False otherwise.
    """
    for candidate in string_list:
        if candidate in target_string:
            return True
    return False
|
def search_blocked_words_in_json(data, pattern):
    """
    Recursively searches for blocked words in a JSON object.

    Args:
        data (dict or list or str): The JSON data to search within.
        pattern (re.Pattern): The compiled regex pattern for blocked words.

    Returns:
        list: A list of matches found in the JSON data (empty for other types,
        e.g. numbers, booleans, None).
    """
    # Strings are the leaves that actually get scanned.
    if isinstance(data, str):
        return pattern.findall(data)

    # Containers contribute the matches of their children, in order.
    if isinstance(data, dict):
        children = data.values()
    elif isinstance(data, list):
        children = data
    else:
        return []

    found = []
    for child in children:
        found += search_blocked_words_in_json(child, pattern)
    return found
|
class RequestLogger:
    """
    A mitmproxy addon that enforces a URL whitelist on requests and
    filters responses whose bodies contain blocked words.
    """
    def __init__(self):
        """
        Initializes the RequestLogger with an empty whitelist and starts
        the periodic whitelist refresh task.
        """
        self.whitelist = []
        # NOTE(review): assumes an event loop is already running when the
        # addon is constructed (true inside mitmproxy) -- confirm.
        asyncio.create_task(self.load_whitelist_periodically())

    async def load_whitelist(self):
        """
        Loads the whitelist of approved URLs from the SQLite database
        into self.whitelist.

        On a database error the previous whitelist is kept and the error
        is logged instead of propagating into the addon machinery.
        """
        # NOTE(review): sqlite3 calls are blocking; acceptable for a small
        # table, but consider asyncio.to_thread for large databases.
        try:
            conn = sqlite3.connect('whitelist.db')
            try:
                c = conn.cursor()
                c.execute("SELECT url FROM whitelist WHERE status='approved'")
                self.whitelist = [row[0] for row in c.fetchall()]
            finally:
                conn.close()  # fix: close the connection even if the query raises
        except sqlite3.Error as e:
            ctx.log.error(f"Error loading whitelist: {e}")

    async def load_whitelist_periodically(self):
        """
        Periodically reloads the whitelist every 20 seconds, forever.
        """
        while True:
            await self.load_whitelist()
            await asyncio.sleep(20)

    def request(self, flow: http.HTTPFlow):
        """
        Handles HTTP requests. Blocks requests to non-whitelisted hosts
        with a 403 response.

        Args:
            flow (http.HTTPFlow): The HTTP flow object.
        """
        if flow.request.pretty_host not in self.whitelist:
            ctx.log.info('Blocked request to %s' % flow.request.host)
            flow.response = http.Response.make(
                403,  # Forbidden status code
                b"Blocked by URL whitelist",  # Response content
                {"Content-Type": "text/plain"}  # Response headers
            )
            return

    def response(self, flow: http.HTTPFlow):
        """
        Handles HTTP responses. Scans HTML and JSON response bodies for
        blocked words and replaces the body when a match is found.

        Args:
            flow (http.HTTPFlow): The HTTP flow object.
        """
        content_type = flow.response.headers.get("Content-Type", "")
        content = flow.response.content

        # Handle compressed content.
        # NOTE(review): recent mitmproxy versions already expose decoded
        # bytes via .content; this manual pass is kept for safety -- confirm
        # against the mitmproxy version in use.
        encoding = flow.response.headers.get("Content-Encoding", "")
        if "gzip" in encoding:
            content = zlib.decompress(content, zlib.MAX_WBITS | 16)
        if "deflate" in encoding:
            content = zlib.decompress(content)

        # Decode content to text
        if isinstance(content, bytes):
            content = content.decode('utf-8', errors='ignore')

        # Scan only the content types we know how to parse.
        if "text/html" in content_type:
            text = BeautifulSoup(content, 'html.parser').get_text()
            matches = pattern.findall(text)
        elif "application/json" in content_type:
            # fix: malformed JSON no longer crashes the addon -- treat it
            # as containing no matches
            try:
                data = json.loads(content)
            except ValueError:
                data = None
            matches = search_blocked_words_in_json(data, pattern) if data is not None else []
        else:
            matches = []

        if matches:
            ctx.log.error(matches[0])  # log the first matched keyword
            # fix: the original assigned flow.response.text and then
            # immediately overwrote the whole response, so the assignment
            # was dead code; only the replacement response is kept.
            flow.response = http.Response.make(
                200,  # (optional) status code
                b"Hello World",  # (optional) content
                {"Content-Type": "text/html"},  # (optional) headers
            )
|
async def start_proxy(host, port):
    """
    Starts the proxy server and runs it until shutdown.

    Args:
        host (str): The host address to listen on.
        port (int): The port to listen on.

    Returns:
        DumpMaster: The master instance, after it has finished running.
    """
    proxy_options = options.Options(listen_host=host, listen_port=port)
    master = dump.DumpMaster(proxy_options, with_termlog=False, with_dumper=False)
    master.addons.add(RequestLogger())
    await master.run()
    return master
|
if __name__ == '__main__':
    # fix: validate the command line instead of dying with an
    # IndexError/ValueError traceback on missing or non-numeric arguments
    if len(sys.argv) != 3:
        sys.exit(f"usage: {sys.argv[0]} <host> <port>")
    host = sys.argv[1]
    try:
        port = int(sys.argv[2])
    except ValueError:
        sys.exit(f"port must be an integer, got {sys.argv[2]!r}")
    asyncio.run(start_proxy(host, port))