スクリプトのお勉強 技術

PythonでAESを使用して暗号/復号する

投稿日:2019年5月26日 更新日:

1.つづき

Pythonで、PKCS#12の公開鍵で暗号、秘密鍵で復号するプログラムの続きです。
今回はAESで暗号化/復号を行い、通信しながらRSA暗号/復号,AES暗号/復号を組み合わせたいと思います。

2. AESによる暗号化/復号(aes.py)

2.1 サンプルコード(aes.py)

サンプルコード(aes.py)は以下になります。

from base64 import b64encode
from base64 import b64decode
from Cryptodome.Cipher import AES
from hashlib import sha256
import random


""" AES.CTR_MODE による暗号化/復号 """

def generate_random_secret_key(algorithm_func=sha256):
    return algorithm_func(str(random.random).encode('utf-8')).digest()


def encrypt(secret_key, raw_data):
    crypto = AES.new(secret_key, AES.MODE_CTR)
    return dict(content=b64encode(crypto.encrypt(raw_data)).decode('utf-8'),
                nonce=b64encode(crypto.nonce).decode('utf-8'))


def decrypt(secret_key, encrypted_data):
    """ encrypted_data = dict(content, nonce) """
    crypto = AES.new(secret_key, AES.MODE_CTR, nonce=b64decode(encrypted_data['nonce']))
    return crypto.decrypt(b64decode(encrypted_data['content']))


if __name__ == "__main__":
    # 簡易テスト
    key = generate_random_secret_key()

    raw_data = 'raw_test_data'
    print("raw_data = {}".format(raw_data))

    encrypted_data = encrypt(key, raw_data.encode())

    print("encrypted_data.content = {} nonce = {}".format(encrypted_data['content'], encrypted_data['nonce']))

    decrypted_data = decrypt(key, encrypted_data)

    print("decrypted_data = {}".format(decrypted_data))

ここでは暗号データを利便性のため、以下の辞書にしました。
nonceは事前に設定することも可能ですが、APIから戻ってくる値を使用しています。

{
  content: 暗号データをBase64化
  nonce: nonceをBase64化
}

2.1.1 暗号利用モードとは

AESのようなブロック暗号方式は、そのまま使用すると、その鍵と同じデータ長(==ブロックサイズ)しか暗号化できません。

「暗号利用モード」とは、ブロックサイズよりも、長いメッセージを暗号化または復号化するために、ブロック暗号を使用する方法を言います。
単に「モード」というほうが多いと思います。

素直に、暗号鍵でデータを暗号化/復号するのは「モード」で言えば、ECBになりますが、近年ではあまり使われません。
かなり昔にAESで暗号化したときは、CBCモードを使用しました。今なら違うモードを使用するでしょう。

「暗号利用モード」をどう決めるか

「暗号利用モード」は、暗号をどのようにシステム適用するかによって、何を使用するか変わってきます。だいたいは要件を加味しつつ決めることになるでしょう。

ここでは、通信に使用するので、CTRを使用しました。後述しますが、認証が不要だったのでストリーム暗号用モードから選びました。CTRのいいところは、ストリーム暗号として使用できるため、暗号データの長さが任意なことです。

コード的には、以下で設定しています。AES.MODE_CTRがそれです。

crypto = AES.new(secret_key, AES.MODE_CTR)

Cryptodomeで使用できる暗号モードと簡単な特徴は以下になります。

(1)ストリーム暗号用モード

ブロック暗号を、ストリーム暗号のように使用できるように設計されたモードのことです。

  • MODE_ECB: 素のモード
  • MODE_CBC: IVをあらかじめ決めておく必要あり。データ長は16バイト固定
  • MODE_CFB: CBCの改良版。IVが必要なのは一緒
  • MODE_OFB: CBCの改良版。IVが必要なのは一緒
  • MODE_CTR: IVが不要な代わりにnonceの処理が必要になる。データは可変
  • MODE_OPENPGP: CFBに近いらしい

(2)認証付きモード

認証(注1)付きのモードです。認証機能がついているので、完全性を保証できますが、その分処理が遅くなります。
暗号時にnonce(後述)が必要です。

また、MAC tag(タグ)を用いて、検証(verify)ができます。

  • MODE_CCM: Counter with CBC-MAC Mode。CTR+認証。
  • MODE_EAX: An AEAD mode。CTR+OMAC。CCMの置き換え。
  • MODE_GCM: CTR+GMAC。GCM
  • MODE_SIV: Syntethic Initialization Vector。RFC 5297に規定されてる。
  • MODE_OCB: Offset Code Book。RFC7253に規定されてる。OCB

(注1): ここでの認証とは完全性の保証を意味する。暗号したデータと復号データが同一かどうかチェックできる機能のことを「認証」という

2.1.2 nonce

nonce とは、一般的に、リプレイ攻撃を防ぐための乱数を指します。暗号機能的には、同じ鍵で行った暗号化の結果を、メッセージごとに異なるデータにするために使用します。

基本的に使い捨てで、常に違う値を使用します。

2.2 動作結果

動作させると以下になります。

$ python3 aes.py
raw_data = raw_test_data
encrypted_data.content = n45+WSLBRjZPAw8zgA== nonce = 8vgqlzs24/Y=
decrypted_data = b'raw_test_data'

ここまでで、データの暗号化は出来ました。次は実際に通信で使用できるか確認します。

3 server/client 通信サンプル(server.py)

3.1 サンプルコード

サンプルコードは以下になります。

import socketserver
from base64 import b64encode, b64decode
import socket
import threading
import json

from pkcs12 import PKCS12
from rsa import RSACipher
from aes import encrypt, decrypt, generate_random_secret_key


BUFFER=2048


class CipherRequestHandler(socketserver.BaseRequestHandler):

    def handle(self):
        # クライアントへ 受信したデータを返却する
        enc_data = self.request.recv(BUFFER)
        print("enc_data at server: {}".format(enc_data))
        dec_data = dec_process(enc_data)
        print("enc_data decrypting at server: {}".format(dec_data))
        self.request.send(dec_data)


def get_b64_to_json_load(b64_json_encrypted_data):
    return json.loads(b64decode(b64_json_encrypted_data).decode('utf-8'))


def dec_process(b64_json_enc_data):
    # dict(secret_key, enc_message)
    enc_data = get_b64_to_json_load(b64_json_enc_data)

    rsa = RSACipher()
    with open(PKCS12.PRIVATE_KEY_FILENAME) as f:
        rsa.import_privkey(f.read())

    enc_secret_key = b64decode(enc_data['secret_key'])
    # 秘密鍵で復号
    dec_key = rsa.decrypt(enc_secret_key)
    print("decrypted key: {}".format(dec_key))

    dec_data = decrypt(dec_key, enc_data['enc_message'])
    print("dec_data = {}".format(dec_data))

    return dec_data


def enc_process(raw_message):
    # 公開鍵を読み込み
    rsa = RSACipher()
    with open(PKCS12.PUBLIC_KEY_FILENAME) as f:
        rsa.import_pubkey(f.read())

    # 公開鍵で秘密鍵(secret_key)を暗号化
    secret_key = generate_random_secret_key()
    print("encrypted secret key: {}".format(secret_key))
    enc_secret_key = rsa.encrypt(secret_key)

    enc_message = encrypt(secret_key, raw_message.encode())

    enc_struct = dict(secret_key=b64encode(enc_secret_key).decode('utf-8'), enc_message=enc_message)
    print("enc_struct {}".format(enc_struct))
    enc_data = b64encode(json.dumps(enc_struct).encode('utf-8'))
    print(enc_data)

    return enc_data


def tcp_process():
    # サーバ側
    address = ('localhost', 19012)
    socketserver.TCPServer.allow_reuse_address = True
    server = socketserver.TCPServer(address, CipherRequestHandler)
    ip, port = server.server_address # 与えられたポート番号を調べる

    t = threading.Thread(target=server.serve_forever)
    t.setDaemon(True) # 終了時にハングアップしない
    t.start()

    # クライアント側

    # サーバへ接続する
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((ip, port))

    # データを送る
    message = 'Hello, world'
    print('Message : "{}"'.format(message))
    enc_data = enc_process(message)
    print('Sending Encrypt Data : "{}"'.format(enc_data))
    len_sent = s.send(enc_data)

    # レスポンスを受けとる
    response = s.recv(len_sent)
    print('Received: "{}"'.format(response))

    # クリーンアップ
    s.close()
    server.socket.close()


def main():
    tcp_process()

    # message = 'Hello, world'
    # enc_data = enc_process(message)

    # dec_data = dec_process(enc_data)


if __name__ == '__main__':
    main()

難しいところはありませんが、どのデータが何であるかが、分かりにくかったので、一つ一つprint文で出力しています。

動作としては以下になります。

  • (1) main()前半でサーバ側(復号側)の処理
    • CipherRequestHandlerで復号します。
  • (2) mail()後半でクライアント側(暗号側)の処理
    • 暗号文(上記ではmessage = ‘Hello, world’)を送信します。
  • (3) クライアント側(暗号) -> サーバ側(復号) -> クライアント側(復号データを表示)

3.2 動作結果

動作結果は以下です。

$ python3 server.py
Message : "Hello, world"
encrypted secret key: b'+\xc5\xd3\xfd\xbf\xb2\xe3l\xd7q\xde\x0e\x81\xfb\xab`\xb4["\xe4s_\xad\xa9\x02\x91>\x05\xef\xbaA\xf6'
enc_struct {'secret_key': 'QMZkAGPltSo6V5vFDGZ4S7rcAzy2ewGHaAPv47b2Y4qPJd25u2ba9YyL76+gMl9swqpZ2y2VIa56P/0+qLCm1uC6HmoMqPSN1FQufDeCJuAC/idYPN/F9LnKMy6BaylZS7VvVbyvhe1ECwj1RwnaYR/HadXZAQuJFtZc6tq+g1RlWpL9hJq3UjBfIPBWwc18LD133etsMQh7VLroaQ8M0uRiuNnyT3c4SjJ1zs+9jh3jToR8Q+N4oaqF5fn4zHjOsIfiI/cib31Gqn0RB66AneXzsxnvmQz5MYuwvSVHUJNyDC7LI+nj9iR0ZD6TteWmp8WPmdHGTZV86srpUN09og==', 'enc_message': {'content': 'uN/JIsednvVuaZ2E', 'nonce': 'Xtg4IZV474E='}}
b'eyJzZWNyZXRfa2V5IjogIlFNWmtBR1BsdFNvNlY1dkZER1o0UzdyY0F6eTJld0dIYUFQdjQ3YjJZNHFQSmQyNXUyYmE5WXlMNzYrZ01sOXN3cXBaMnkyVklhNTZQLzArcUxDbTF1QzZIbW9NcVBTTjFGUXVmRGVDSnVBQy9pZFlQTi9GOUxuS015NkJheWxaUzdWdlZieXZoZTFFQ3dqMVJ3bmFZUi9IYWRYWkFRdUpGdFpjNnRxK2cxUmxXcEw5aEpxM1VqQmZJUEJXd2MxOExEMTMzZXRzTVFoN1ZMcm9hUThNMHVSaXVObnlUM2M0U2pKMXpzKzlqaDNqVG9SOFErTjRvYXFGNWZuNHpIak9zSWZpSS9jaWIzMUdxbjBSQjY2QW5lWHpzeG52bVF6NU1ZdXd2U1ZIVUpOeURDN0xJK25qOWlSMFpENlR0ZVdtcDhXUG1kSEdUWlY4NnNycFVOMDlvZz09IiwgImVuY19tZXNzYWdlIjogeyJjb250ZW50IjogInVOL0pJc2VkbnZWdWFaMkUiLCAibm9uY2UiOiAiWHRnNElaVjQ3NEU9In19'
Sending Encrypt Data : "b'eyJzZWNyZXRfa2V5IjogIlFNWmtBR1BsdFNvNlY1dkZER1o0UzdyY0F6eTJld0dIYUFQdjQ3YjJZNHFQSmQyNXUyYmE5WXlMNzYrZ01sOXN3cXBaMnkyVklhNTZQLzArcUxDbTF1QzZIbW9NcVBTTjFGUXVmRGVDSnVBQy9pZFlQTi9GOUxuS015NkJheWxaUzdWdlZieXZoZTFFQ3dqMVJ3bmFZUi9IYWRYWkFRdUpGdFpjNnRxK2cxUmxXcEw5aEpxM1VqQmZJUEJXd2MxOExEMTMzZXRzTVFoN1ZMcm9hUThNMHVSaXVObnlUM2M0U2pKMXpzKzlqaDNqVG9SOFErTjRvYXFGNWZuNHpIak9zSWZpSS9jaWIzMUdxbjBSQjY2QW5lWHpzeG52bVF6NU1ZdXd2U1ZIVUpOeURDN0xJK25qOWlSMFpENlR0ZVdtcDhXUG1kSEdUWlY4NnNycFVOMDlvZz09IiwgImVuY19tZXNzYWdlIjogeyJjb250ZW50IjogInVOL0pJc2VkbnZWdWFaMkUiLCAibm9uY2UiOiAiWHRnNElaVjQ3NEU9In19'"
enc_data at server: b'eyJzZWNyZXRfa2V5IjogIlFNWmtBR1BsdFNvNlY1dkZER1o0UzdyY0F6eTJld0dIYUFQdjQ3YjJZNHFQSmQyNXUyYmE5WXlMNzYrZ01sOXN3cXBaMnkyVklhNTZQLzArcUxDbTF1QzZIbW9NcVBTTjFGUXVmRGVDSnVBQy9pZFlQTi9GOUxuS015NkJheWxaUzdWdlZieXZoZTFFQ3dqMVJ3bmFZUi9IYWRYWkFRdUpGdFpjNnRxK2cxUmxXcEw5aEpxM1VqQmZJUEJXd2MxOExEMTMzZXRzTVFoN1ZMcm9hUThNMHVSaXVObnlUM2M0U2pKMXpzKzlqaDNqVG9SOFErTjRvYXFGNWZuNHpIak9zSWZpSS9jaWIzMUdxbjBSQjY2QW5lWHpzeG52bVF6NU1ZdXd2U1ZIVUpOeURDN0xJK25qOWlSMFpENlR0ZVdtcDhXUG1kSEdUWlY4NnNycFVOMDlvZz09IiwgImVuY19tZXNzYWdlIjogeyJjb250ZW50IjogInVOL0pJc2VkbnZWdWFaMkUiLCAibm9uY2UiOiAiWHRnNElaVjQ3NEU9In19'
decrypted key: b'+\xc5\xd3\xfd\xbf\xb2\xe3l\xd7q\xde\x0e\x81\xfb\xab`\xb4["\xe4s_\xad\xa9\x02\x91>\x05\xef\xbaA\xf6'
dec_data = b'Hello, world'
enc_data decrypting at server: b'Hello, world'
Received: "b'Hello, world'"

-スクリプトのお勉強, 技術

執筆者:

関連記事

Python3 – VCR.py でネットワーク系テストを簡単に作成する

1. 始めに python3で実装すると、モックテストをしたくなります。モックを使って、比較的簡単にテストできるからです。 問題はネットワーク系テスト モックテストで問題になるのは、外部に依存するテス …

悪いほうが良い? でも限度があるよね。。

自分のその時の状態によって結論が変わる https://tech.nikkeibp.co.jp/atcl/nxt/column/18/00620/040900010/を見て書こうと思いました。 今やっ …

vscodeのRemote Developmentで「権限がない」と怒られた時

小ネタです。 vscodeのRemote Developmentにてパスワードなしログイン vscodeは時々使うのですが、リモート開発するのにRemote Developmentが便利なので使ってま …

Ubuntu 20.04のMySQL8.0.22でrootパスワードをリセットする

小ネタです。 休みなので 久しぶりにローカル環境のUbuntuでMySQLにアクセスしようとして、パスワードを見事に忘れたことに気づきました。 ubuntu 20.04なので、以下の手順でパスワードの …

pipenv + Apache + Django起動設定

仕事でwebアプリケーションを作成しています。 Djangoで作成し、webサーバをApache、環境をpipenvで設定したpython3環境上で動かす予定です。 Apacheが起動するまでに、苦労 …