Python Code Snippets Review

Description
How to do source code review
Hash Length Extension Attack
import hashlib
from flask import Flask,redirect
from secrets import token_hex
secret = "[....]"
app = Flask(__name__)
def sign_for_payment(payment_information):
# compute signature to ensure the payment details
# cannot be tampered with
data = secret+payment_information
return hashlib.sha256(data.encode('utf-8')).hexdigest()
@app.route('/redirect_for_payment')
def redirect_for_payment():
tx_id = token_hex(16)
payment_info = "transaction_id="+tx_id+"&amount=20.00"
params =payment_info+"&sign="+sign_for_payment(payment_info)
return redirect("https://pentesterlab.com/payment?"+params, code=302)
The following code is vulnable to length extension attack because of line n.o 11
SSRF
import urllib
import os
from flask import Flask,redirect,request
from secrets import token_hex
app = Flask(__name__)
@app.route('/fetch')
def fetch():
url = request.args.get('url', '')
if url.startswith("https://pentesterlab.com"):
response = urllib.request.urlopen(url)
html = response.read()
return html
return ""
because we are using startswith
we can use https://pentesterlab.com@abdulhaq.me
or they can use https://pentesterlab.com.abdulhaq.me
to visit any website or can extract sensitive information from sending request to the internal services.
Insecure deserialization
from http.server import BaseHTTPRequestHandler, HTTPServer
from http.cookies import SimpleCookie
import base64
import pickle
class MyServer(BaseHTTPRequestHandler):
def doGET(self):
cookies = SimpleCookie(self.headers.get('Cookie'))
if cookies.get('username'):
username=pickle.loads(base64.b64decode(cookies.get('username').value))
else:
username='stranger'
self.sendresponse(200)
self.sendheader("Content-type", "text/html")
self.endheaders()
self.wfile.write(bytes("<html><head><title>Hello</title></head>", "utf-8"))
self.wfile.write(bytes("<body>", "utf-8"))
self.wfile.write(bytes("<h1>Hello %s</h1>" % username, "utf-8"))
self.wfile.write(bytes("</body></html>", "utf-8"))
if name == "main":
webServer = HTTPServer(('0.0.0.0', 1337), MyServer)
try:
webServer.serve_forever()
except KeyboardInterrupt:
pass
webServer.server_close()
print("Server stopped.")
its using pickel so we can make it load some malicious serialized object and get an RCE. Even the username isn’t escaped so there will be xss too.
Directory traversal
from http.server import BaseHTTPRequestHandler, HTTPServer
from http.cookies import SimpleCookie
class MyServer(BaseHTTPRequestHandler):
def doGET(self):
cookies = SimpleCookie(self.headers.get('Cookie'))
if cookies.get('sessionid'):
username=open(cookies.get('sessionid').value).readlines()[0]
else:
username='stranger'
self.sendresponse(200)
self.sendheader("Content-type", "text/html")
self.endheaders()
self.wfile.write(bytes("<html><head><title>Hello</title></head>", "utf-8"))
self.wfile.write(bytes("<body>", "utf-8"))
self.wfile.write(bytes("<h1>Hello %s</h1>" % username, "utf-8"))
self.wfile.write(bytes("</body></html>", "utf-8"))
if name == "main":
webServer = HTTPServer(('0.0.0.0', 1337), MyServer)
try:
webServer.serve_forever()
except KeyboardInterrupt:
pass
webServer.server_close()
print("Server stopped.")
We are getting cookie and trying to open a file and read the contents of it.
SQL Injection
def getpost(postid):
conn = getdbconnection()
numformat = re.compile(r'^\d+$', re.M)
if re.match(numformat,postid):
post = conn.execute('SELECT * FROM posts WHERE id = '+postid).fetchone()
conn.close()
if post is None:
abort(404)
return post
else:
abort(404)
here the regex is messed up we can do 12123\n UNION SELECT null, null
to get sql injection.
Regex filter bypass
from http.server import BaseHTTPRequestHandler, HTTPServer
import re
from os.path import exists
import os
class MyServer(BaseHTTPRequestHandler):
def do_GET(self):
path = os.getcwd()
pattern = r'/\.\.\/\.\.\/'
if re.match(pattern, self.path ):
self.send_response(404)
return
path += self.path
if path.endswith('/'):
path+='index.html'
print(path)
if exists(path):
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(open(path).read().encode('utf-8'))
else:
self.send_response(404)
if __name__ == "__main__":
webServer = HTTPServer(('0.0.0.0', 1337), MyServer)
try:
webServer.serve_forever()
except KeyboardInterrupt:
pass
webServer.server_close()
print("Server stopped.")
Directry traversal with a path like .././../
so its a filter bypass
Interseption
import socket
import ssl
hostname = 'pentesterlab.com'
context = ssl.createdefaultcontext()
context.check_hostname = False
with socket.createconnection((hostname, 443)) as sock:
with context.wrapsocket(sock, server_hostname=hostname) as ssock:
ssock.write("GET / HTTP/1.1\r\nHost: #{hostname}\r\n\r\n".encode('utf-8'))
print(ssock.read())
Because of context.check_hostname
is set to False
an attacker can intersept the traffic between the script and the server.
Padding oracle
from http.server import BaseHTTPRequestHandler, HTTPServer
from http.cookies import SimpleCookie
import base64
from Crypto import Random
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
KEY = Random.new().read(AES.block_size)
class MyServer(BaseHTTPRequestHandler):
def decrypt(self,cookie):
enc = base64.urlsafe_b64decode(cookie)
iv = enc[0:AES.block_size]
encd = enc[AES.block_size:]
aes = AES.new(KEY, AES.MODE_CBC, iv)
return unpad(aes.decrypt(encd), AES.block_size).decode('utf-8')
def do_GET(self):
cookies = SimpleCookie(self.headers.get('Cookie'))
if cookies.get('session_id'):
try:
username = self.decrypt(cookies.get('session_id').value)
except:
self.send_response(500)
return
else:
username='stranger'
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(bytes("Hello", "utf-8"))
self.wfile.write(bytes("", "utf-8"))
self.wfile.write(bytes("Hello %s" % username, "utf-8"))
self.wfile.write(bytes("", "utf-8"))
print(encrypt('admin'))
if __name__ == "__main__":
webServer = HTTPServer(('0.0.0.0', 1337), MyServer)
try:
webServer.serve_forever()
except KeyboardInterrupt:
pass
print("Server stopped.")
TODO
RCE
from flask import Flask, jsonify
import os
from os.path import isfile, join, exists
app = Flask(__name__)
@app.route('/files/<username>')
def list_files_for_user(username):
base = "files/"+username
if (exists(base) == False):
os.system("mkdir "+base)
return jsonify([f for f in os.listdir(base) if isfile(join(base, f))])
os.system is being passed directly to the os.system so this can lead to os command execution.