In this article we will see how you can track bitcoin addresses used in ransomware attacks using Python and the PlutoHash platform, which provides up-to-date data extracted from the blockchain.
Ransomware is a type of malware that encrypts the files contained in the victim’s device and demands to pay a ransom to unlock them. Businesses, universities, financial institutions and health organizations are the preferred targets of criminals. This is because they are organizations willing to pay to recover sensitive data. In any case, bitcoin payments are not completely anonymous as often believed, but each transaction leaves a trace in the blockchain.
The Bitcoin protocol can be described as pseudo-anonymous. Sending and receiving payments in bitcoin is like writing books under a pseudonym, if the identity of the writer is revealed all the books would be linked to that specific writer. The same logic can be applied to bitcoin transactions, where discovering the identity of the owner of an address would link back to him all transactions made and received at that address. To better preserve the integrity of transactions, bitcoin wallets can also generate different addresses for each transaction. As described in the Bitcoin white paper:
As an additional firewall, a new (address) should be used for each transaction to keep them from being linked to a common owner. The risk is that if the owner of a (address) is revealed, linking could reveal other transactions that belonged to the same owner.
Satoshi Nakamoto
In this notebook we will see how it is possible to link multiple bitcoin addresses to the same person by analyzing multi-input transactions. We’ll start by analyzing an address (the seed address) that we know for sure has been used in a ransomware attack. Comparing input and output addresses of transactions we will see how to link these new addresses to the same owner of the seed address.
Using the PlutoHash platform, we have at our disposal all the data contained in the bitcoin blockchain and a dataset containing the seed addresses of several families of ransomware attacks. This data, along with the BlockSci libraries, is all we need for our analysis. The dataset containing seed addresses was created from an academic research regarding ransomware payments [1].
import blocksci
import pandas as pd
import warnings
warnings.filterwarnings(‘ignore’)
#instantiate the chain object
chain = blocksci.Blockchain(“/BlockSci/config_file”)
#load the dataset containing the ransomware seed addresses
seed_addresses = pd.read_csv(“/data/datasets/ransomware_addresses_list/blockchain/seed_addresses.csv”)
As a reminder, the dataset containing seed addresses and data on bitcoin transactions and addresses are available in the PlutoHash platform. Simply register for our Beta Test Program to get started.We have imported libraries and instantiated the chain object. Let’s take a look at the dataset containing the seed addresses. For each ransomware family, let’s see how many seed addreses have been collected.
seed_addresses[‘family’].value_counts()
We can see that for some ransomware families several addresses have been collected (for example, for the Locky ransomware there are over 7000). We will conduct our analysis starting from a single address, even if of course the clustering logic for addresses would remain the same.
Let’s take in this case the seed address that belongs to CryptXXX ransomware.
CryptXXX_seed_address = seed_addresses.loc[seed_addresses[‘family’] == ‘CryptXXX’]
CryptXXX_seed_address
Now we extract only the bitcoin address from the dataset and save the variable in string format. With the address in string format we can use the BlockSci libraries to create a so-called address object and, always using the BlockSci libraries, display all transactions received and made from this address.
#extract bitcoin address and convert value to string
CryptXXX_seed_address = str(CryptXXX_seed_address.iloc[0][‘address’])
#create the address object from the string
address_obj = chain.address_from_string(address_string = CryptXXX_seed_address)
address_obj
Before continuing, it is important to understand the methodology that allows us to associate different addresses to the same person. These are the conditions that we are going to check in order to be able to say that an address is linked to the ransomware attack or not.
Disclaimer: I am not an expert on blockchain analytics. If you have any doubts or believe there is an error please feel free to leave a comment! 😉
Methodology for Linking Addresses
In this section we will look at two blockchain-based heuristics (Common Spending and One-Time Change) that allow us to connect different addresses to the same actor. These heuristics that we will use to identify ransomware wallets have already been used in various academic research for clustering bitcoin addresses. To start, let’s define a bitcoin transaction as a triplet of elements:
t = (A, B, c)
- A represents the finite multiset of inputs of the transaction t
- B represents the finite multiset of outputs of the transaction t
- c represents the transaction fee
Common Spending (CS)
The first heuristic we will use for tracking ransomware addresses is called Common Spending. It is based on the fact that if two or more input addresses perform a transaction to the same address (output), then all addresses involved in the transaction are controlled by the same person. This may not be true only in the case where multiple people agree to execute a transaction, but this is a very rare case and so we will ignore this possibility. Also because we are talking about a criminal activity, and even if the transaction was performed by multiple people, they would still all be involved in the ransomware. For the heuristic validity, it is necessary that the transaction must have only one output, this is because multi-output transactions (through coin-mixers) are often used to obfuscate transaction history. We can summarize this first heuristic this way:
If two or more addresses are inputs of the same transaction with one output, then all these addresses are controlled by the same user.
One-Time Change (OTC)
The OTC heuristic is based on the standard Bitcoin mechanism where the change from the transaction is returned to a new address. When you send funds from your bitcoin wallet, the specified amount of funds is sent to the intended bitcoin address and the rest of the funds stored in the sending bitcoin address are sent to what is called an “change address” associated with the same wallet of the sender. The conditions we will use to check if a transaction is an OTC transaction were taken from a paper used for bitcoin address clustering [2]. These are the conditions that must be met:
1 Addr(B) = 2, i.e. the transaction t has exactly two outputs.
2 Addr(A) ≠ 2, i.e. the number of t inputs is not equal to two. If Addr(A) = Addr(B) = 2 the transaction is most likely shared send mixer.
3 Both outputs of transaction t, B1 and B2, are not selfchange addresses, i.e. B1, B2 ∈/ Addr(A).
4 One output of the transaction B1 did not exist before transaction t and decimal representation of the value b1 has more than 4 digits after the dot.
If the transaction satisfies the conditions of a One-Time Change transaction, input and output addresses belong to the same user.
Now that we have defined the conditions that will lead us to associate addresses with the seed address, let’s take a look at all the transactions in the blockchain where the seed address appears as an input of the transaction.
To do this we apply the input_txes
method to the address object.
inputs_txs = list(address_obj.input_txes)
print(“Number of transactions involved: “ + str(len(inputs_txs)))
inputs_txs
There are 64 transactions where the seed address appears as an input of the transaction. The transaction list provides various information:
-
len(txins)
is the number of inputs used in the transaction -
len(txouts)
is the number of outputs used in the transaction -
size_bytes
is the value in bytes of the transaction -
block_height
is the block where the transaction is located -
tx_index
is the transaction identification index, this information is not derived from the blockchain, but was added during blockchain parsing
From this list we create new_list
, a list of lists. We then convert it to a DataFrame to work better with the Python libraries.
new_list = []
for i in range(len(inputs_txs)):
new_list.append(str(inputs_txs[i]))
#split values with “,”
new_list[i] = new_list[i].split(‘,’)
df = pd.DataFrame(new_list)
df.columns = ['inputs', 'outputs', 'size', 'block_height', 'tx_index']
df.drop(columns= ["size", "block_height"], axis=1, inplace = True)
df
Values are expressed in str
format. We must then eliminate what we don’t need (for example “len(txins)”) and convert these values to int
format.
df[‘inputs’] = df[‘inputs’].str.replace(‘Tx\(len\(txins\)=’,’’)
df[‘outputs’] = df[‘outputs’].str.replace(‘len\(txouts\)=’,’’)
df[‘tx_index’] = df[‘tx_index’].str.replace(‘tx_index=’,’’)
df[‘tx_index’] = df[‘tx_index’].str.replace(‘\)’,’’)
df[‘inputs’] = df[‘inputs’].astype(int)
df[‘outputs’] = df[‘outputs’].astype(int)
df[‘tx_index’] = df[‘tx_index’].astype(int)
df
Finding transactions that satisfy the first heuristic (Common Spending) is fairly simple. Only two conditions need to be met to show that the input and output addresses belong to the same person. The first is that the inputs are >=2 and that the outputs are = 1.
We create a function that adds a column named heuristic1
and, iterating along each row, inserts 1
if the conditions are verified, 0
if they are not.
def heur1(row):
if row[‘inputs’] >= 2 and row[‘outputs’] == 1:
val = 1
else:
val = 0
return val
df[‘heuristic1’] = df.apply(heur1, axis=1)
df
df[‘heuristic1’].value_counts()
We found only two transactions that respect the first heuristic. We can say that the addresses used in these two transactions (inouts and outputs) belong to the same person.
Let’s see now, if the other transactions, respect the second heuristic. We can easily check the first two conditions of the second heuristic. The first condition requires that the outputs be equal to 2
. We count the values present in the outputs
column.
#first condition -> outputs = 2
df[‘outputs’].value_counts()
There are 62 transactions with 2 outputs and 2 transactions with 1 output (which are those that we have already identified with the first heuristic). We create a second DataFrame that contains only transactions with 2 ouputs and delete the heuristic1
column.
df2 = df.loc[df[‘outputs’]== 2]
df2.drop(columns=”heuristic1", axis=1, inplace=True)
The second condition requires that the inputs to the transaction not be 2 (Addr(A) ≠ 2).
#second condition -> inputs /= 2
df2[‘inputs’].value_counts()
We can see that there is not even a transaction composed of 2 inputs. All transactions verify the second condition.
Before continuing with the condition check, we compile a function that allows us to extract addresses (and all elements) from blocksci objects
.
def create_addresses_list(inputs):
addresses_list = []
for i in range(len(inputs)):
#convert values in str and append to the list
addresses_list.append(str(inputs[i]))
#split values with “,”
addresses_list[i] = addresses_list[i].split(‘,’)
#select value in position 1 i.e. the address
addresses_list[i] = addresses_list[i][1]
stopwords = [‘address’,’=’,’(‘,’)’,’PubkeyHashAddress’,’ ‘,’ScriptHashAddress’]
for word in stopwords:
if word in addresses_list[i]:
addresses_list[i] = addresses_list[i].replace(word,””)
addresses_list = list(dict.fromkeys(addresses_list))
return addresses_list
From df2
we create a list with transaction indexes.
tx_list = list(df2[‘tx_index’])
To verify the third condition we need to check that the output addresses are not present among the input addresses (B1, B2 ∈/ Addr(A)).
condition_dict = {}
#we use a for loop to iterate each transaction in the transaction list
for i in range(len(tx_list)):
#create tx object for each transaction
tx_obj = chain.tx_with_index(tx_list[i])
#create a list of inputs of the transaction
inputs_addresses = create_addresses_list(list(tx_obj.inputs))
#create a list of outputs of the transaction
outputs_addresses = create_addresses_list(list(tx_obj.outputs))
#compares whether the output addresses are present among the input addresses and updates condition_dict
if outputs_addresses not in inputs_addresses:
condition_dict[str(tx_list[i])] = 1
else:
condition_dict[str(tx_list[i])] = 0
#count the number of transactions that satisfy the third condition
sum(map((1).__eq__, condition_dict.values()))
62 transactions, or the total number of transactions under consideration, verify the third condition.
Now there is the fourth coindition to analyze:
One output of the transaction B1 did not exist before transaction t and decimal representation of the value b1 has more than 4 digits after the dot.
The first part of the condition requires that, for at least one output, this is the first transaction, and so that address did not exist before it. The second part requires the value of this transaction to be at least 0.00001000 btc.
We will test the fourth condition in two parts.
For the first part, we can compare the dates of the first transaction of the output addresses and the date of the transaction under analysis. If the date of the transaction matches at least one date of the first transaction of the outputs, then the first part of the condition is verified. We apply the same logic used previously for the third condition, a for loop that performs the analysis for each transaction and updates the dictionary.
d = {}
for i in range(len(tx_list)):
#create tx object
tx_obj = chain.tx_with_index(tx_list[i])
outputs_addresses = create_addresses_list(list(tx_obj.outputs))
for addresses in outputs_addresses:
addresses_list = []
addresses_list.append(chain.address_from_string(address_string = addresses).first_tx.block_time)
block_time_tx = chain.tx_with_index(tx_list[i]).block_time
if block_time_tx not in addresses_list:
d[str(tx_list[i])] = 0
#the date of the transaction is not equal to any of the dates of the first transactions of the output addresses,our dictionary will mark 0 for the index of that transaction
else:
d[str(tx_list[i])] = 1
#the transaction date is equal to at least one of the dates of the first output address transactions,our dictionary will mark 1 for the index of that transaction.
Let’s look at how many times value 1 appears in the dictionary (value indicating that the condition is verified):
print(“number of transactions that verify the condition: “ + str(sum(map((1).__eq__, d.values()))))
Again all transactions are verified for the first part of the fourth condition.
Now we need to verify that the outputs of these transactions were at least 0.00001000 btc.
To extract transaction values we need to slightly modify the previous function used to create the list of addresses. Basically we only need to change the selection index of the list, which instead of selecting the second variable, in position [1]
, will have to select the third, in position [2]
. We also change the name of the function and of the variables to make it clearer, but it remains practically the same function.
def create_value_list(transactions):
value_list = []
for i in range(len(transactions)):
#convert values in str and append to the list
value_list.append(str(transactions[i]))
#split values with “,”
value_list[i] = value_list[i].split(‘,’)
#select value in position 1 i.e. the address
value_list[i] = value_list[i][2]
stopwords = [‘value’,’=’,’)’, ‘ ‘,]
for word in stopwords:
if word in value_list[i]:
value_list[i] = value_list[i].replace(word,””)
value_list = list(dict.fromkeys(value_list))
return value_list
Let’s check the condition:
d2 = {}
for i in range(len(tx_list)):
#create tx object
tx_obj = chain.tx_with_index(tx_list[i])
outputs_transactions = create_value_list(list(tx_obj.outputs))
#convert outputs values from str to int
outputs_transactions = list(map(int, outputs_transactions))
for outputs in outputs_transactions:
if outputs_transactions[0] > 1000 or outputs_transactions[1] > 1000:
d2[str(tx_list[i])] = 1
else:
d2[str(tx_list[i])] = 0
print(“number of transactions that verify the condition: “ + str(sum(map((1).__eq__, d2.values()))))
All transactions also verify the second part of the fourth condition.
Let’s see how many addresses, involved in these transactions, can be associated with the same person.
inputs_addresses_related = create_addresses_list(list(address_obj.input_txes.inputs))
outputs_addresses_related = create_addresses_list(list(address_obj.input_txes.outputs))
print(“Addresses related to seed address: “ + str(len(inputs_addresses_related) + len(outputs_addresses_related)))
Conclusions and Next Steps
Starting with a single address, which we are certain was involved in the CryptXXX ransomware, we found 464 addresses used by the same person. the heuristics we used to assign ownership of multiple addresses to the same person are fairly accurate, but they are not error-free, mainly because methods may have been used to obfuscate the transaction history.
The next steps to perform a more complete analysis, should be to use tag-addressing techniques on the addresses, to understand how the stolen bitcoins were used. Although ransoms for ransomware attacks are demanded in bitcoin, criminals who set up these activities will almost certainly try to withdraw the illicit funds using an exchange. This means that by knowing the addresses of the exchanges, it shouldn’t be difficult to trace the stolen funds and figure out where, how and when the stolen bitcoins were sold.
At PlutoHash we are looking for talented data scientists who want to get their hands on raw data parsed by the blockchain. If you’re interested, you can join us by signing up for our Beta Tester Program.