PoWを使った簡易ブロックチェーンをpythonで実装

PoWを使った簡易ブロックチェーンをpythonで実装

前回は非常にシンプルな、2つのブロックをハッシュ値でつなぐブロックチェーンを作りました。
Pythonのクラスやインスタンス(オブジェクト指向)の復習にもなるので、ぜひ見てみてください。

ブロックチェーンをpythonで1から書いてみた
最近ふと、ブロックチェーンへの興味がより一層深まってきて、自分でコードを書きたくなりました。 ブロックチェーンは何かを、文章としての説明では理解していたつもりで…
pathlog.jp

ですが、さすがにこれだけではブロックチェーンとは呼べませんので、今回は Proof of Work(PoW)を実装しなぜブロックの改ざんが困難になるのかをコードレベルで確認していきます。

また、それに伴って、マイニングやタイムスタンプ、その他もろもろ(ハッシュの難易度調整、個数調整)を追加しました。

完成品コード

以下が、今回の実装をすべて含んだ完成形のコードです。

import hashlib
import time

class Block:
    def __init__(self, data, previous_hash, nonce, hash, timestamp):
        self.data = data
        self.previous_hash = previous_hash
        self.nonce = nonce
        self.hash = hash
        self.time = timestamp

    def calculate_hash(self):
        content = self.data + self.previous_hash + str(self.nonce) + str(self.time)
        hash = hashlib.sha256(content.encode()).hexdigest()
        return hash
    
    
class Blockchain:
    def __init__(self):
        self.chain = []
        self.difficulty = int(input("Difficulty: "))
        self.max_block = int(input("Number of blocks: "))
        print(f"\n--------------------\n")
        genesis_block = self.create_genesis_block()
        self.chain.append(genesis_block)
        for i in range(1, self.max_block + 1):
            self.add_block(f"block {i}")
        
    #ハッシュ関数
    def hash_block(self, data, previous_hash, nonce, timestamp):
        content = data + previous_hash + str(nonce) + str(timestamp)
        hash = hashlib.sha256(content.encode()).hexdigest()
        return hash

    #ジェネシスブロックの作成
    def create_genesis_block(self):
        genesis = self.mine_block("genesis", "0")
        return Block("genesis", "0", genesis["nonce"], genesis["hash"], genesis["timestamp"])
    
    #マイニング作業
    def mine_block(self, data, previous_hash):
        nonce = 0
        attempts = 0
        target = '0' * self.difficulty
        start_time = time.time()
        timestamp = time.time()
        while True: 
            attempts += 1
            hash = self.hash_block(data, previous_hash, nonce, timestamp)
            if hash.startswith(target):
                break
            nonce += 1
        
        end_time = time.time()
        elapsed_time = end_time - start_time

        result = {
            "nonce": nonce,
            "hash": hash,
            "timestamp":  timestamp,
            "elapsed_time": elapsed_time,
            "attempts": attempts
        }
        return result        

    #2つ目以降のブロック追加
    def add_block(self, data):
        previous_hash = self.chain[-1].hash
        mine_result = self.mine_block(data, previous_hash)
        block_number = len(self.chain)
        nonce = mine_result["nonce"]
        hash = mine_result["hash"]
        timestamp = mine_result["timestamp"]
        elapsed_time = mine_result["elapsed_time"]
        attempts = mine_result["attempts"]

        new_block = Block(data, previous_hash, nonce, hash, timestamp)
        self.chain.append(new_block)

        print(f"[Block {block_number} mined]\ndata: {data}\nnonce: {nonce}\nhash: {hash}\ntimestamp: {time.ctime(timestamp)}\ntime: {elapsed_time}\nattempts: {attempts}\n")
        print(f"--------------------\n")


    #チェーン検証
    def is_chain_valid(self):
        for i in range(1, len(self.chain)):
            current_block = self.chain[i]#現在のBlock
            previous_block = self.chain[i-1]#一つ前のBlock
            if current_block.previous_hash == previous_block.hash:

                if (previous_block.time <= current_block.time) and (current_block.time <= previous_block.time + 100):
                    
                    if current_block.hash == current_block.calculate_hash():
                        target = '0' * self.difficulty

                        if current_block.calculate_hash().startswith(target):
                            continue
                        else:
                             return False
                    else:
                        return False
                else:
                    return False
            else:
                return False
        return True
            
bc = Blockchain()
print("initial chain valid?:", bc.is_chain_valid())

だいぶ「ブロックチェーン」になってきたのではないでしょうか!

解説

このコードは、ざっくり見ると以下のような構成になっています。

class Block
    def __init__
    def calculate_hash

class Blockchain
    def __init___
    def hash_block
    def create_genesis_block
    def mine_block
    def add_block
    def is_chain_valid

それぞれ見ていきましょう。

とその前に、まずはハッシュ関数と、時間のライブラリをインポートします。

import hashlib
import time

Blockクラス

このクラスで、一つのブロックのインスタンスを作成できるようにします。

基本的には、やっていることはシンプルで、情報の保存だけです。

インスタンス作成時に、引数としてデータや一つ前のブロックのハッシュ値、ナンス、自らのハッシュ値、タイムスタンプを受け取り、それらを保存します。

class Block:
    def __init__(self, data, previous_hash, nonce, hash, timestamp):
        self.data = data
        self.previous_hash = previous_hash
        self.nonce = nonce
        self.hash = hash
        self.time = timestamp

それに加えて、後で楽になるので、こいつ自身にもハッシュする関数を入れておきます。

def calculate_hash(self):
    content = self.data + self.previous_hash + str(self.nonce) + str(self.time)
    hash = hashlib.sha256(content.encode()).hexdigest()
    return hash

これだけになります。

メインはすべてBlockchainクラスに入っています。

Blockchainクラス

まず、それぞれがどのような役割を分担しているのか確認しましょう。

class Blockchain
    def __init___ 
        #チェーンのリストの作成・ハッシュ難易度と個数調整・ブロックを作る司令を出す。

    def hash_block 
        #メインで使うハッシュ関数

    def create_genesis_block 
        #ジェネシスブロックの作成(mine_blockを使用)

    def mine_block 
        #マイニング機構▶条件にあうハッシュ値になるまでナンスを探索する

    def add_block 
        #二回目以降のブロックの作成・追加(mine_blockを使用)、結果の表示

    def is_chain_valid 
        #前後ハッシュ値の照合、時間をチェック、ハッシュの再計算によるチェック

このようになっております。

__init__とハッシュ関数

まず、Blockchainインスタンスが作成されたら、self.chainというリストを作ります。ここにブロックを入れていきます。

それから難易度(ハッシュ値の最初に0が何個並ぶか)と生成するブロックの個数をinputで取得します。

def __init__(self):
    self.chain = []
    self.difficulty = int(input("Difficulty: "))
    self.max_block = int(input("Number of blocks: "))
    print(f"\n--------------------\n")

    genesis_block = self.create_genesis_block()
    self.chain.append(genesis_block)

    for i in range(1, self.max_block + 1):
        self.add_block(f"block {i}")

def hash_block(self, data, previous_hash, nonce, timestamp):
    content = data + previous_hash + str(nonce) + str(timestamp)
    hash = hashlib.sha256(content.encode()).hexdigest()
    return hash

そのあとは、create_genesis_blockを呼び出して、genesis_blockを作成します。それを先程のリストに入れます。

これが最初のブロックです。

そのあとは、最初のinputの値に応じて、forループでブロックを追加していきます。

二回目以降は、データとしてf”block {i}:を使用しています。つまり3つめのブロックなら、”block 3″ですね。

hash_block関数は、データ、一つ前のハッシュ値、ナンス、タイムスタンプ(ブロック生成時のもの)を引数として受け取ります。

それらをstringとして足し合わせて、SHA-256でハッシュした値を返します。

ジェネシスブロック(create_genesis_block)

次に、ジェネシスブロックの生成の関数があります。

def create_genesis_block(self):
    genesis = self.mine_block("genesis", "0")
    return Block("genesis", "0", genesis["nonce"], genesis["hash"], genesis["timestamp"])

これは、まず呼び出されたときにデータを”genesis”、previous_hashを0としてself.mine_block関数に渡して、マイニングしてもらいます。

見つかった結果をgenesisとして保存します。

これは以下のような辞書形式で返ってきます。

result = {
    "nonce": nonce,
    "hash": hash,
    "timestamp":  timestamp,
    "elapsed_time": elapsed_time,
    "attempts": attempts
}

上から見つかったナンス(調整値)、ハッシュ値、タイムスタンプ(エポック秒)、経過時間、試行回数です。

Blockインスタンスの作成にはdata, previous_hash, nonce, hash, timestampがそれぞれ必要ですから、

Block("genesis", "0", genesis["nonce"], genesis["hash"], genesis["timestamp"])

のようにして、ジェネシスブロックを作成します。

データは”genesis”、前のブロックのハッシュ値は0、ナンス、ハッシュ値、タイムスタンプを渡します。
そうして得られたBlockを返します。

こうして得られた値を、__init__で、リストに追加します。

self.chain.append(genesis_block)

これで、ジェネシスブロックの完成です!

マイニング(mine_block)

次に、今回追加した、Proof of Work(PoW)を使ったマイニングを行う関数です。

ここはとても楽しいところです。

そもそもPoWとは、データにナンスと呼ばれる任意の数字を加えて、それをハッシュした値の先頭に指定された個数の0が並べば、それが認められるというものです。

この関数は、その条件に合ったときのナンスやハッシュ値、時間などを辞書形式にし、返します。

ナンスはnonce、指定する先頭の0の個数がdifficultyです。

def mine_block(self, data, previous_hash):
    nonce = 0
    attempts = 0
    target = '0' * self.difficulty
    start_time = time.time()
    timestamp = time.time()
    while True: 
        attempts += 1
        hash = self.hash_block(data, previous_hash, nonce, timestamp)
        if hash.startswith(target):
            break
        nonce += 1
    
    end_time = time.time()
    elapsed_time = end_time - start_time

    result = {
        "nonce": nonce,
        "hash": hash,
        "timestamp":  timestamp,
        "elapsed_time": elapsed_time,
        "attempts": attempts
    }
    return result  

ナンスを0、試行回数を0に初期化します。

targetは、0をdifficulty個かけたものです。difficultyが5ならば”00000″です。
ナンスを加えてハッシュした結果が、targetで始まれば、そのナンスで決定です。

経過時間計測用のstart_timeと、timestampも定義します。

ここから、マイニングの作業を定義していきます。

while True: 
    hash = self.hash_block(data, previous_hash, nonce, timestamp)
    if hash.startswith(target):
        break
    nonce += 1
    attempts += 1

whileループを用いて、ハッシュを何度も何度も試行します。

ナンス=0から始めて、だめなら+1して再挑戦、だめなら+1して再挑戦を繰り返します。(attemptsも、この回数を記録します)

そして、多くの試行を経て条件に合うものが見つかったら、breakします。

その後は以下のように処理されます。

end_time = time.time()
elapsed_time = end_time - start_time

result = {
    "nonce": nonce,
    "hash": hash,
    "timestamp":  timestamp,
    "elapsed_time": elapsed_time,
    "attempts": attempts
}
return result      

終了時間を記録し、終了時間から開始時間を引くことで、処理にかかった時間を算出します。

resultという辞書にまとめてこれらの結果を入れ、それを返します。

今回は実際のBitcoinのブロックチェーンに基づいて、時間もハッシュの対象に含めています
これにより(取引)時間も改ざん不可能となりました。

各ブロックは timestamp を含む完全再計算可能な hash を持つのです。

以上が簡易的ですが、マイニングの実装になります。

ブロックの追加(add_block)

ここでは、ジェネシスブロックを除いた、ブロックの追加作業を行っていきます。

具体的には、まずマイニングの指令を先程のマイニング関数に出して、必要な情報をかき集めて、それを使ってBlockのインスタンスを作成します。

完成形は以下のようになります。

def add_block(self, data):
    previous_hash = self.chain[-1].hash
    block_number = len(self.chain)

    #マイニングを行う
    mine_result = self.mine_block(data, previous_hash)

    #マイニングの結果より
    nonce = mine_result["nonce"]
    hash = mine_result["hash"]
    timestamp = mine_result["timestamp"]
    elapsed_time = mine_result["elapsed_time"]
    attempts = mine_result["attempts"]

    #ブロック作成
    new_block = Block(data, previous_hash, nonce, hash, timestamp)
    self.chain.append(new_block)

    #作成されたブロックの内容をprint
    print(f"[Block {block_number} mined]\ndata: {data}\nnonce: {nonce}\nhash: {hash}\ntimestamp: {time.ctime(timestamp)}\ntime: {elapsed_time}\nattempts: {attempts}\n")
    print(f"--------------------\n")

マイニングに必要なのはデータと一つ前のブロックのハッシュ値ですから、

mine_result = self.mine_block(data, previous_hash)

のようにすることで、マイニングの結果(辞書)を取得することができます。

それらをわかりやすくするために変数に代入し、↓

nonce = mine_result["nonce"]
hash = mine_result["hash"]
timestamp = mine_result["timestamp"]
elapsed_time = mine_result["elapsed_time"]
attempts = mine_result["attempts"]

ブロックを作成してチェーンリストに追加します。

new_block = Block(data, previous_hash, nonce, hash, timestamp)
self.chain.append(new_block)

あとは結果をわかりやすく表示すれば完成です。
このような形式でターミナルに表示されます。

--------------------

[Block 1 mined]
data: block 1
nonce: 1375971
hash: 000005c808a2a8cfc9d1707c4511d2db4467efaebb371af15e901b8645f613d3
timestamp: Mon Feb  2 21:47:30 2026
time: 1.0131967067718506
attempts: 1375971

--------------------

これで肝心な動く部分は完成です!

チェーン検証(is_chain_valid)

ここでは以下3つの観点から、ブロックチェーンの正しさをチェックしています。

  • ブロック同士のハッシュの繋がり
  • タイムスタンプの関係
  • PoWにおけるナンスも含めたハッシュ値の整合

この3点です。

これを一段階具体的なロジックにすると、(4段階)

  1. 現在のブロックに保存されている「一つ前のブロックのハッシュ値」と、一つ前のブロックのハッシュ値が同値である
  2. 現在のブロックの作成された時間が、一つ前のブロックの作成された時間よりも遅い
    かつ

    現在のブロックの作成された時間が、一つ前のブロックが作成されてから〇〇秒以内になっている
  3. 保存された現在のブロックのハッシュ値が、計算した結果と同値である
  4. 現在のブロックのハッシュ値を計算し、それが指定された数の0で始まる

ということになります。

本実装では簡略化のため、ブロック生成間隔の上限を100秒とハードコードします。

擬似コードにすると、

もしcurrent_block.previous_hash == previous_block.hashならOK
↓次に進む

もし(previous_block.time <= current_block.time) and (current_block.time <= previous_block.time + 100)ならOK
↓次に進む

もしcurrent_block.hash == current_block.calculate_hash()ならOK
↓次に進む

target = '0' * self.difficulty
もしcurrent_block.calculate_hash().startswith(target)ならOK
↓

完了

となります。

これをブロックごとにループで回しますので、最終的にコードは以下のようになります。

def is_chain_valid(self):
    #ブロックごとにループ
    for i in range(1, len(self.chain)):

        current_block = self.chain[i] #現在のBlock
        previous_block = self.chain[i-1] #一つ前のBlock

        #①
        if current_block.previous_hash == previous_block.hash:

            #②
            if (previous_block.time <= current_block.time) and (current_block.time <= previous_block.time + 100):
                
                #③
                if current_block.hash == current_block.calculate_hash():
                    target = '0' * self.difficulty

                    #④
                    if current_block.calculate_hash().startswith(target):
                        continue
                    else:
                            return False
                else:
                    return False
            else:
                return False
        else:
            return False
    return True

ここで、Blockクラス自身のハッシュ関数を使っています。

チェーンがうまくいっていれば、Trueを返します。

実行

bc = Blockchain()
print("initial chain valid?:", bc.is_chain_valid())

bcというBlockchainクラスのインスタンスを作成します。
is_chain_validの結果を表示するようにします。

実行結果は以下のようになります。

難易度6のブロック個数5と入力しました。

Difficulty: 6
Number of blocks: 5

--------------------

[Block 1 mined]
data: block 1
nonce: 19045810
hash: 000000bc15dfaf59dbc8f62eb688b20e4edcfa53bc092687fd6c2512c29f0f95
timestamp: Tue Feb  3 10:09:02 2026
time: 13.969925880432129
attempts: 19045811

--------------------

[Block 2 mined]
data: block 2
nonce: 19282913
hash: 0000008c88f0dab0ae6430d842fc4fff208e6b36c3bf513c3497c9d8fc9ea61e
timestamp: Tue Feb  3 10:09:16 2026
time: 14.559286117553711
attempts: 19282914

--------------------

[Block 3 mined]
data: block 3
nonce: 29935602
hash: 0000003c8c2aa0014275ed9e559d7fb9856769a679c2f95ca3aa9df81dffbfc0
timestamp: Tue Feb  3 10:09:30 2026
time: 22.947882890701294
attempts: 29935603

--------------------

[Block 4 mined]
data: block 4
nonce: 1030480
hash: 0000000dc1d9be902c11fb8c2938585a44e036548c65594eb62161a529eea927
timestamp: Tue Feb  3 10:09:53 2026
time: 0.8383839130401611
attempts: 1030481

--------------------

[Block 5 mined]
data: block 5
nonce: 10172800
hash: 0000003659f70afdf16476c33731135c8e29ba4b98aed737ecb3c36d79d7b08c
timestamp: Tue Feb  3 10:09:54 2026
time: 7.655301094055176
attempts: 10172801

--------------------

initial chain valid?: True

以上で完成です!お疲れ様でした。

おわりに

自分でブロックチェーンを書いて、着々と仕組みを深く理解していくのが、本当に楽しくなってきました。

ここまで自力で書ける自分にも驚いています。
時間を忘れてやってしまいます笑

次はこの愛着のある自作ブロックチェーンを使って、数値をいじくって観察したいと思います。

今後はこの実装を使って、以下のような実験を行う予定です。

  1. difficulty と attempts の関係の統計分析
  2. ブロック生成時間の分布の可視化
  3. nonce をランダム初期化した場合の偏り検証
  4. difficulty を途中で変更した場合の fork 実験

以上です!