Python Code Snippets Review

Last updated on

Description

How to do source code review

Hash Length Extension Attack

import hashlib
from flask import Flask,redirect
from secrets import token_hex
secret = "[....]"  # placeholder for the secret that is prepended before hashing
app = Flask(__name__)

def sign_for_payment(payment_information):
  # Sign the payment details so that tampering can be detected.
  # NOTE(review): sha256(secret + data) is open to length extension; prefer HMAC.
  data = secret+payment_information
  return hashlib.sha256(data.encode('utf-8')).hexdigest()

  
@app.route('/redirect_for_payment')
def redirect_for_payment():
    """Redirect the client to the payment page with signed parameters.

    A fresh random transaction id is combined with the fixed amount, the
    resulting string is signed with sign_for_payment(), and both are
    appended to the payment URL as the query string.
    """
    transaction = token_hex(16)
    unsigned = "transaction_id=" + transaction + "&amount=20.00"
    signature = sign_for_payment(unsigned)
    query = unsigned + "&sign=" + signature
    return redirect("https://pentesterlab.com/payment?" + query, code=302)

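The code above is vulnerable to a hash length extension attack because of line 11 (`return hashlib.sha256(data.encode('utf-8')).hexdigest()`): the signature is a plain SHA-256 over `secret + payment_information`, so an attacker who has one valid signature can append extra data to the signed string and compute a matching signature without knowing the secret. A keyed construction such as HMAC-SHA256 does not have this weakness.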

SSRF

import urllib
import os
from flask import Flask,redirect,request
from secrets import token_hex
app = Flask(__name__)


@app.route('/fetch')
def fetch():
    """Fetch the URL given in the ``url`` query parameter and return its body.

    Only URLs whose text starts with "https://pentesterlab.com" are fetched;
    any other value yields an empty response.
    """
    target = request.args.get('url', '')
    if not target.startswith("https://pentesterlab.com"):
        return ""
    # NOTE(review): a string-prefix check does not pin the host, so this
    # request can be steered to other servers (SSRF).
    return urllib.request.urlopen(target).read()

Because the check uses `startswith`, an attacker can pass `https://pentesterlab.com@abdulhaq.me` or `https://pentesterlab.com.abdulhaq.me`: both begin with the expected prefix but point at a host the attacker controls. This lets them make the server visit any website, or extract sensitive information by sending requests to internal services.

Insecure deserialization

from http.server import BaseHTTPRequestHandler, HTTPServer
from http.cookies import SimpleCookie
import base64
import pickle

class MyServer(BaseHTTPRequestHandler):
    """HTTP handler that greets the visitor named in the ``username`` cookie."""

    def do_GET(self):
        """Answer a GET request with a small HTML greeting page.

        The name is read from the base64-encoded, pickled ``username``
        cookie; when the cookie is absent the visitor is greeted as
        "stranger".
        """
        cookies = SimpleCookie(self.headers.get('Cookie'))
        if cookies.get('username'):
            # SECURITY: pickle.loads on a client-controlled cookie allows
            # arbitrary code execution. Left in place on purpose: it is the
            # flaw this example demonstrates.
            username = pickle.loads(base64.b64decode(cookies.get('username').value))
        else:
            username = 'stranger'
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()
        self.wfile.write(bytes("<html><head><title>Hello</title></head>", "utf-8"))
        self.wfile.write(bytes("<body>", "utf-8"))
        # SECURITY: username is inserted into the HTML without escaping (XSS).
        self.wfile.write(bytes("<h1>Hello %s</h1>" % username, "utf-8"))
        self.wfile.write(bytes("</body></html>", "utf-8"))

if __name__ == "__main__":
    # Listen on every interface, port 1337, until interrupted with Ctrl+C.
    webServer = HTTPServer(('0.0.0.0', 1337), MyServer)

    try:
        webServer.serve_forever()
    except KeyboardInterrupt:
        pass

    # Release the listening socket once the serve loop has been interrupted.
    webServer.server_close()
    print("Server stopped.")

It uses pickle to deserialize the cookie, so we can make it load a malicious serialized object and get RCE. The username also isn't escaped before it is written into the page, so there is XSS too.

Directory traversal

from http.server import BaseHTTPRequestHandler, HTTPServer
from http.cookies import SimpleCookie

class MyServer(BaseHTTPRequestHandler):
    """HTTP handler that greets the visitor using a file named by a cookie."""

    def do_GET(self):
        """Answer a GET request with a small HTML greeting page.

        When a ``sessionid`` cookie is present its value is used as a file
        path and the first line of that file becomes the displayed name;
        otherwise the visitor is greeted as "stranger".
        """
        cookies = SimpleCookie(self.headers.get('Cookie'))
        if cookies.get('sessionid'):
            # SECURITY: the cookie value is used as a path with no validation,
            # so any readable file can be opened (directory traversal). Left
            # in place on purpose: it is the flaw this example demonstrates.
            username = open(cookies.get('sessionid').value).readlines()[0]
        else:
            username = 'stranger'
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()
        self.wfile.write(bytes("<html><head><title>Hello</title></head>", "utf-8"))
        self.wfile.write(bytes("<body>", "utf-8"))
        self.wfile.write(bytes("<h1>Hello %s</h1>" % username, "utf-8"))
        self.wfile.write(bytes("</body></html>", "utf-8"))

if __name__ == "__main__":
    # Listen on every interface, port 1337, until interrupted with Ctrl+C.
    webServer = HTTPServer(('0.0.0.0', 1337), MyServer)

    try:
        webServer.serve_forever()
    except KeyboardInterrupt:
        pass

    # Release the listening socket once the serve loop has been interrupted.
    webServer.server_close()
    print("Server stopped.")

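The value of the `sessionid` cookie is passed straight to `open()` and the first line of that file is echoed back in the response. Because the path is never validated, an attacker can set the cookie to any path (for example `/etc/passwd` or `../../etc/passwd`) and read the first line of any file the server can access.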

SQL Injection

def getpost(postid):
    """Return the row from ``posts`` whose id equals ``postid``.

    The id has to pass the numeric regex check first; a rejected id or a
    missing row is reported through abort(404).
    """
    conn = getdbconnection()
    numformat = re.compile(r'^\d+$', re.M)
    # NOTE(review): with re.M, "$" also matches before a newline, so input
    # with extra lines passes; the id is then concatenated into the SQL text.
    if numformat.match(postid) is None:
        abort(404)
        return None
    row = conn.execute('SELECT * FROM posts WHERE id = '+postid).fetchone()
    conn.close()
    if row is None:
        abort(404)
    return row

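Here the regex is broken: because of the `re.M` flag, `$` also matches just before a newline, so `^\d+$` only validates the first line of the input. A value such as `12123\n UNION SELECT null, null` passes the check and gives us SQL injection.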

Regex filter bypass


from http.server import BaseHTTPRequestHandler, HTTPServer
import re 
from os.path import exists
import os

class MyServer(BaseHTTPRequestHandler):
    """Static-file handler that serves paths relative to the working directory."""

    def do_GET(self):
        """Serve the requested file as HTML, or answer 404.

        Requests whose path matches the ``/../../`` pattern are rejected,
        a trailing slash is completed with ``index.html``, and the file is
        sent only when it exists on disk.
        """
        root = os.getcwd()
        blocked = r'/\.\.\/\.\.\/'
        # NOTE(review): re.match only tests the beginning of the path.
        if re.match(blocked, self.path):
            self.send_response(404)
            return

        target = root + self.path
        if target.endswith('/'):
            target += 'index.html'
        print(target)

        if not exists(target):
            self.send_response(404)
            return

        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()
        self.wfile.write(open(target).read().encode('utf-8'))
          
# Script entry point: serve MyServer on every interface, port 1337.
if __name__ == "__main__":
    webServer = HTTPServer(('0.0.0.0', 1337), MyServer)
    
    try:
        # Blocks until the process is interrupted.
        webServer.serve_forever()
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop; fall through to the cleanup.
        pass
        
    # Release the listening socket before reporting shutdown.
    webServer.server_close()
    print("Server stopped.")

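Directory traversal is still possible with a path like `.././../`, so this is a filter bypass. The pattern only recognises the literal sequence `/../../`, and `re.match` only tests the beginning of the path.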

Interception

import socket
import ssl

hostname = 'pentesterlab.com'
context = ssl.create_default_context()
# SECURITY: disabling check_hostname means the certificate is no longer
# matched against the host name. Left in place on purpose: it is the flaw
# this example demonstrates.
context.check_hostname = False

# Open a TCP connection, upgrade it to TLS and issue one plain HTTP request.
with socket.create_connection((hostname, 443)) as sock:
    with context.wrap_socket(sock, server_hostname=hostname) as ssock:
        # NOTE(review): "#{hostname}" is not Python string interpolation, so
        # the Host header is sent literally; kept byte-for-byte from the
        # original snippet.
        ssock.write("GET / HTTP/1.1\r\nHost: #{hostname}\r\n\r\n".encode('utf-8'))
        print(ssock.read())

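Because `context.check_hostname` is set to `False`, the host name in the server's certificate is never checked, so an attacker who holds any certificate trusted by the client can intercept the traffic between the script and the server.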

Padding oracle

from http.server import BaseHTTPRequestHandler, HTTPServer
from http.cookies import SimpleCookie
import base64

from Crypto import Random
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad

# AES key drawn once at import time: AES.block_size random bytes.
KEY = Random.new().read(AES.block_size)

class MyServer(BaseHTTPRequestHandler):
    # HTTP handler that greets the visitor named in the encrypted
    # ``session_id`` cookie.

    def decrypt(self,cookie):
      """Decrypt a cookie value and return the plaintext as text.

      The value is URL-safe base64 decoded; the first AES.block_size bytes
      are used as the IV and the remainder is decrypted with AES-CBC under
      KEY, unpadded and decoded as UTF-8.
      """
      enc = base64.urlsafe_b64decode(cookie)
      iv = enc[0:AES.block_size]
      encd = enc[AES.block_size:]
      aes = AES.new(KEY, AES.MODE_CBC, iv)
      return unpad(aes.decrypt(encd), AES.block_size).decode('utf-8')

    def do_GET(self):
      """Answer a GET request with a greeting for the decrypted user name.

      Without a ``session_id`` cookie the visitor is greeted as "stranger".
      """
      cookies = SimpleCookie(self.headers.get('Cookie'))
      if cookies.get('session_id'):
        try:
          username = self.decrypt(cookies.get('session_id').value)
        except:
          # Any exception raised while decrypting ends the request with a 500.
          # NOTE(review): this distinguishable error response is presumably
          # the padding oracle named in the section title - confirm.
          self.send_response(500)
          return
      else:
        username='stranger'
      self.send_response(200)
      self.send_header("Content-type", "text/html")
      self.end_headers()
      # NOTE(review): compared with the other handlers in this document, the
      # four strings below look like HTML markup that was lost in
      # extraction - confirm against the original snippet.
      self.wfile.write(bytes("Hello", "utf-8"))
      self.wfile.write(bytes("", "utf-8"))
      self.wfile.write(bytes("Hello %s" % username, "utf-8"))
      self.wfile.write(bytes("", "utf-8"))
# NOTE(review): no encrypt function is defined in this snippet, so this line
# raises NameError as written.
print(encrypt('admin'))

# Script entry point: serve MyServer on every interface, port 1337.
if __name__ == "__main__":
    webServer = HTTPServer(('0.0.0.0', 1337), MyServer)
    try:
        # Blocks until the process is interrupted.
        webServer.serve_forever()
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop.
        pass

    print("Server stopped.")

TODO

RCE

from flask import Flask, jsonify
import os
from os.path import isfile, join, exists

app = Flask(__name__)


@app.route('/files/<username>')
def list_files_for_user(username):
    """Return a JSON list of the regular files in ``files/<username>``.

    The directory is created with a shell ``mkdir`` when it does not exist.
    """
    user_dir = "files/" + username
    if not exists(user_dir):
        # NOTE(review): username reaches the shell unescaped (command injection).
        os.system("mkdir " + user_dir)
    entries = os.listdir(user_dir)
    return jsonify([name for name in entries if isfile(join(user_dir, name))])

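The `username` taken from the URL is concatenated directly into the command string passed to `os.system`, so this can lead to OS command execution (for example a username containing `;` followed by another command).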