Объединение всех видов генерации. Оркестрация генерации в цельный генератор транзакций¶

  • ноутбуки лучше просматривать на Github pages, т.к. при просмотре прямо в репозитории могут быть проблемы с отображением, например, обрезка вывода с широкими датафреймами. Если в адресной строке браузера есть iaroslav-dzh.github.io, то вы уже на Github pages.
    Ссылки:
    • Ссылка на этот ноутбук
    • Ссылка на страницу со всеми ноутбуками

Информация о ноутбуке

  • объединение в цельный генератор происходит в скрипте run_generator.py - ссылка в Github - это входная точка в запуск генератора. В этом ноутбуке будут приведены два новых инструмента употребляемых в run_generator.py, ими являются:
    • валидатор некоторых конфигураций, указанных в yaml файлах - класс ConfigsValidator
    • класс AllTxnsRecorder - сборка файлов с транзакциями созданными при генерации каждого раздела транзакций - легальные, compromised fraud, дропы обоих типов - в единый датафрейм и запись в файл в двух копиях
  • также будет приведен и сам скрипт run_generator.py
  • это последний ноутбук описывающий генерацию транзакций

In [1]:
import os
import pandas as pd
In [2]:
from data_generator.utils import load_configs
In [3]:
os.chdir("..")
In [6]:
# Базовые конфиги
base_cfg = load_configs("./config/base.yaml")
# Настройки легальных транзакций
legit_cfg = load_configs("./config/legit.yaml")
# Общие настройки фрода
fraud_cfg = load_configs("./config/fraud.yaml")
# Настройки compromised client фрода
compr_cfg = load_configs("./config/compr.yaml")
# Настройки для дроп фрода
drops_cfg = load_configs("./config/drops.yaml")
# Настройки времени
time_cfg = load_configs("./config/time.yaml") 

# Пути к файлам
data_paths = base_cfg["data_paths"]

# директория текущего запуска генератора. Возьмем из предыдущего ноутбука.
# Т.к. нужны транз-ции сгенерировнные всеми разделами

run_dir = './data/generated/history/generation_run_2025-07-25_121029'

Дополнительные инструменты цельной генерации¶


1. Валидатор конфигураций ConfigsValidator¶

  • модуль data_generator.validator. Ссылка на исходный код в Github

Основной функционал

  • валидация некоторых конфигураций в yaml файлах из папки configs/.
  • подразумевается использование в начале запуска генератора перед генерацией транзакций, чтобы избежать случаев когда часть данных сгенерируется, а потом из-за невыполнимых конфигов будет ошибка и генерация не завершится, но время будет потрачено. Особенно это актуально если успешно сгенерировались легальные транзакции, которые генерируются самыми первыми и дольше всего, при том значительно дольше других разделов генерации; а потом ошибка в генерации compromised фрода или дроп фрода из-за того что их конфиги невыполнимы.
    • Например, легальные транзакции и compromised fraud использовали всех клиентов под свою генерацию либо просто слишком много и для выставленного в конфигах желаемого процента дроп фрода, клиентов уже не хватает, следовательно, выходит ошибка, но время уже потрачено на генерацию легальных и compromised, и приходится менять конфиги и запускать заново. И если конфиги снова невыполнимы, то снова тратится время на генерацию прежде чем все "упадет".
    • также тексты ошибок валидатора помогают быстрее понять, что дело в конфигах и в каких именно

Примеры частей валидации:

  • валидация compromised client fraud rate. Клиентов под compromised фрод мы берем из числа клиентов использованных для генерации легальных транзакций. Но в отличии от легальных транзакций, где на одного клиента много транзакций, что значит что на 3000 клиентов можно сгенерировать например 10000 транзакций; в compromised фроде число транзакций немногим больше числа клиентов т.к. генерация под все антифрод правила кроме одного подразумевает 1 транзакцию на 1 клиента. Если fraud rate в конфигах слишком высок или входящий в него compromised fraud rate слишком высок или и то и другое, то может случиться, что клиентов для фрода, нужно больше чем было задействовано клиентов под легальные транзакции.
  • валидация drop rate. Под дропов берутся клиенты не задействованные в генерации легальных и compromised fraud транзакций. Следовательно, может быть так, что легальные и compromised возьмут слишком много клиентов и для генерации drop rate процента транзакций клиентов уже не хватит

Демонстрация ConfigsValidator¶

  • просто один метод
In [2]:
from data_generator.validator import ConfigsValidator

cfg_validator = ConfigsValidator(base_cfg, legit_cfg, fraud_cfg, drops_cfg)

Метод validate_drops_rate()¶

  • не хватает клиентов под дроп фрод
  • ниже в тексте ошибки предлагается снизить кол-во легальных транзакций в конфигах. Это чтобы уменьшить кол-во клиентов которые требуются под генерацию большого кол-ва легальных транзакций
  • также важно отметить, что все абсолютные числа кол-ва клиентов необходимых для того или иного раздела генерации примерные т.к. начиная с генерации легальных транзакций кол-во на каждого клиента определяется по нормальному обрезанному распределению, среднее и станд. откл. которого указывается в конфигах. То есть число легальных транзакций жестко не определено, отсюда кол-во транзакций под выставленный fraud rate считается уже исходя из фактического кол-ва легальных транз-ций. Для валидации конфигов т.к. неизвестно сколько сгенерируется легальных транзакций, берется максимально возможное кол-во легальных транзакций, учитывая вероятность возможного "переполнения" - вероятность что транз. сгенерируется больше чем расчитанное макс. возможное кол-во. Т.е. если допустимая вероятность "переполнения" выставленная в конфигах как 0.001, то это значит, что мы возьмем макс. возможное число лег. транз. как 99,999 перцентиль в распределении возможного числа лег. транз.
In [3]:
cfg_validator.validate_drops_rate()

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[3], line 1
----> 1 cfg_validator.validate_drops_rate()

File ~\My_documents\Education\projects\fraud_detection_01\data_generator\validator.py:199, in ConfigsValidator.validate_drops_rate(self)
    197 if remainder < total_drops:
    198     print("\n")
--> 199     raise ValueError(f"""Total clients number needed for drop fraud generation
    200     exceeds the available clients number.
    201     Clients number needed for drops: {total_drops}
    202     Available clients: {remainder}
    203     Legit txns clients: {legit_clients}
    204     Compr fraud clients: {compr_clients}
    205     Please do one or both:
    206     1. Reduce legit txns number.
    207     2. Reduce total fraud rate.""")
    209 if total_drops <= 0:
    210     raise ValueError(f"""\nTotal drops number cannot be 0 or lower.
    211     Total estimated drops: {total_drops}
    212     Please do one of below or combine:
    213     1. Increase drop fraud rates
    214     2. Increase total fraud rate
    215     3. Increase legit txns number""")

ValueError: Total clients number needed for drop fraud generation
            exceeds the available clients number.
            Clients number needed for drops: 2323
            Available clients: 350
            Legit txns clients: 5000
            Compr fraud clients: 19
            Please do one or both:
            1. Reduce legit txns number.
            2. Reduce total fraud rate.



2. Запись всех созданных транзакций в единый файл. Класс AllTxnsRecorder¶

  • модуль data_generator.recorder. Ссылка на исходный код в Github

Основной функционал

  • сборка целого датафрейма из всех созданных транзакций: легальных, фрода всех типов
  • запись собранного датафрейма в файл в двух директориях: индивидуальную под текущую генерацию и под последнюю генерацию

Основная логика

  • когда сгенерированы транзакции всех разделов:
    1. читает транзакции из всех отдельных файлов каждого раздела
    2. создает из них единый датафрейм и сортирует по времени транзакции
    3. пишет датафрейм в parquet файл в две директории

Демонстрация AllTxnsRecorder¶

  • просто соберем из файлов всех сгенерированных ранее транзакций один датафрейм и запишем в файл через метод build_and_write(), а затем сверим размер цельного датафрейма с суммой размеров всех отдельных датафреймов
In [7]:
from data_generator.recorder import AllTxnsRecorder

all_recorder = AllTxnsRecorder(base_cfg, legit_cfg, compr_cfg, drops_cfg, run_dir)

1. Прочитаем и запишем в переменные одиночные файлы с транзакциями¶

  • для сверки суммарной длины одиночных файлов и цельного файла
In [9]:
# Легальные транзакции. 
# соберем полный путь. run_dir определен в начале ноутбука, остальное из конфигов
leg_storage = legit_cfg["data_storage"]
leg_folder = leg_storage["folder_name"] # поддиректория для лег. транз.
leg_file = leg_storage["files"]["txns"] # название файла с лег. транз
leg_path = os.path.join(run_dir, leg_folder, leg_file)

leg_txns = pd.read_parquet(leg_path)
leg_txns.head(2)
Out[9]:
client_id txn_time unix_time amount type channel category online merchant_id trans_city trans_lat trans_lon trans_ip device_id account is_fraud is_suspicious status rule
0 3937 2025-01-01 00:02:00 1735689720 1878.0 purchase POS grocery_pos False 989.0 Пермь 58.045040 56.170369 not applicable NaN NaN False False approved not applicable
1 4275 2025-01-01 00:27:00 1735691220 1000.0 purchase POS gas_transport False 727.0 Тамбов 52.715932 41.465163 not applicable NaN NaN False False approved not applicable
In [10]:
# compromised fraud транзакции.
compr_storage = compr_cfg["data_storage"]
compr_folder = compr_storage["folder_name"] # поддиректория
compr_file = compr_storage["files"]["txns"] # название файла
compr_path = os.path.join(run_dir, compr_folder, compr_file)

compr_txns = pd.read_parquet(compr_path)
compr_txns.head(2)
Out[10]:
client_id txn_time unix_time amount type channel category online merchant_id trans_city trans_lat trans_lon trans_ip device_id account is_fraud is_suspicious status rule
0 1114 2025-01-20 11:13:47 1737371627 24741.85 purchase ecom shopping_net True 6949.0 Иваново 56.999468 40.972823 5.8.23.242 12408.0 NaN True False declined new_ip_and_device_high_amount
1 1125 2025-01-18 18:47:28 1737226048 19148.95 purchase ecom misc_net True 6835.0 Новокузнецк 53.794276 87.214405 5.8.0.153 14458.0 NaN True False declined new_device_and_high_amount
In [12]:
# транзакции дропов распределителей
dist_storage = drops_cfg["distributor"]["data_storage"]
dist_folder = dist_storage["folder_name"] # поддиректория
dist_file = dist_storage["files"]["txns"] # название файла
dist_path = os.path.join(run_dir, dist_folder, dist_file)

dist_txns = pd.read_parquet(dist_path)
dist_txns.head(2)
Out[12]:
client_id txn_time unix_time amount type channel category online merchant_id trans_city trans_lat trans_lon trans_ip device_id account is_fraud is_suspicious status rule
0 2101 2025-01-15 05:25:00 1736918700 15600.0 inbound transfer not applicable True NaN not applicable NaN NaN not applicable NaN 11986.0 False False approved not applicable
1 2101 2025-01-15 06:12:00 1736921520 9000.0 outbound transfer not applicable True NaN Волгоград 48.7072 44.517021 2.60.7.195 3548.0 22668.0 False False approved not applicable
In [13]:
# транзакции дропов покупателей
purch_storage = drops_cfg["purchaser"]["data_storage"]
purch_folder = purch_storage["folder_name"] # поддиректория
purch_file = purch_storage["files"]["txns"] # название файла
purch_path = os.path.join(run_dir, purch_folder, purch_file)

purch_txns = pd.read_parquet(purch_path)
purch_txns.head(2)
Out[13]:
client_id txn_time unix_time amount type channel category online merchant_id trans_city trans_lat trans_lon trans_ip device_id account is_fraud is_suspicious status rule
0 454 2025-01-06 07:06:00 1736147160 31300.0 inbound transfer not applicable True NaN not applicable NaN NaN not applicable NaN 10432.0 False False approved not applicable
1 454 2025-01-06 08:14:00 1736151240 21000.0 purchase ecom shopping_net True 6919.0 Саратов 51.530376 45.953026 2.60.1.177 763.0 NaN False False approved not applicable

2. сборка всех транзакций в целый датафрейм и запись в один файл через метод build_and_write()¶

  • затем прочитаем этот целый файл
In [14]:
all_recorder.build_and_write()
Building and writing all txns dataframe... completed.          

Читаем

  • файл находится прямо в директории текущего запуска генератора run_dir
In [17]:
base_storage = base_cfg["data_storage"]
all_txns_file = base_storage["files"]["all_txns"] # название файла
all_txns_path = os.path.join(run_dir, all_txns_file)

all_txns = pd.read_parquet(all_txns_path)
all_txns.head(2)
Out[17]:
client_id txn_time unix_time amount type channel category online merchant_id trans_city trans_lat trans_lon trans_ip device_id account is_fraud is_suspicious status rule
0 3937 2025-01-01 00:02:00 1735689720 1878.0 purchase POS grocery_pos False 989.0 Пермь 58.045040 56.170369 not applicable NaN NaN False False approved not applicable
1 4275 2025-01-01 00:27:00 1735691220 1000.0 purchase POS gas_transport False 727.0 Тамбов 52.715932 41.465163 not applicable NaN NaN False False approved not applicable

3. Сверяем¶

In [20]:
# суммарная длина датафреймов из отдельных файлов
sep_file_len_sum = sum([leg_txns.shape[0], compr_txns.shape[0], dist_txns.shape[0], purch_txns.shape[0]])
sep_file_len_sum
Out[20]:
19903
In [23]:
# длина датафрейма из единого файла
whole_df_len = all_txns.shape[0]

assert sep_file_len_sum == whole_df_len, "Суммарная длина отдельных датафреймов не равна длине единого датафрейма."
print(f"""Длина единого датафрейма равна суммарной длине отдельных датафреймов.
Длина: {whole_df_len}""")
Длина единого датафрейма равна суммарной длине отдельных датафреймов.
Длина: 19903



Оркестрация разделов генерации в единый генератор транзакций¶


Единая точка входа. Скрипт запуска генератора run_generator.py¶

  • Ссылка на исходный код в Github

Основной функционал

  • создание индивидуальной директории под текущий запуск генератора
  • запуск всех разделов генерации: легальные транз., compromised fraud, дропы распределители, дропы покупатели
  • сборка отдельных файлов с транзакциями каждого раздела в один датафрейм и запись его в единый файл

Основная логика

  1. загрузка всех конфигов из config/
  2. валидация конфигов при помощи ConfigsValidator
  3. создание директории под текущий запуск
  4. поочередное создание объектов оркестраторов каждого раздела генерации и запуск генерации для каждого раздела. Порядок:
    • создать объект LegitRunner, запустить генерацию легальных транзакций
    • после, создать объект ComprRunner, запустить генерацию compromised fraud транзакций
    • после, создать объект DropsRunner для дропов распределителей, запустить генерацию транзакций
    • после, создать объект DropsRunner для дропов покупателей, запустить генерацию транзакций
  5. создание объекта AllTxnsRecorder на основании конфиг файлов и пути директории текущего запуска
  6. сборка единого датафрейма и запись в файл в две директории: текущего запуска и последнего запуска при помощи метода AllTxnsRecorder.build_and_write()
  7. Вывод сообщения о том где искать конечный файл со сгенерированными транзакциями
In [1]:
from pathlib import Path

from data_generator.utils import load_configs
from data_generator.validator import ConfigsValidator 
from data_generator.runner.utils import make_dir_for_run
from data_generator.runner.legit import LegitRunner
from data_generator.runner.compr import ComprRunner
from data_generator.runner.drops import DropsRunner
from data_generator.recorder import AllTxnsRecorder

# Общие настройки
base_cfg = load_configs("./config/base.yaml")
# Настройки легальных транзакций
legit_cfg = load_configs("./config/legit.yaml")
# Общие настройки фрода
fraud_cfg = load_configs("./config/fraud.yaml")
# Настройки compromised client фрода
compr_cfg = load_configs("./config/compr.yaml")
# Настройки времени
time_cfg = load_configs("./config/time.yaml")
# Настройки дроп фрода
drop_cfg = load_configs("./config/drops.yaml")


# Валидация основных конфигов перед началом генерации
cfg_validator = ConfigsValidator(base_cfg=base_cfg, legit_cfg=legit_cfg, \
                                 fraud_cfg=fraud_cfg, drop_cfg=drop_cfg)
cfg_validator.validate_all()

# Создаем папку под файлы текущей генерации
run_dir = make_dir_for_run(base_cfg=base_cfg)

# Генерация легальных транзакций
legit_runner = LegitRunner(base_cfg=base_cfg, legit_cfg=legit_cfg, \
                           time_cfg=time_cfg, run_dir=run_dir)
legit_runner.run()


# Генерация compromised client fraud транзакций
compr_runner = ComprRunner(base_cfg=base_cfg, legit_cfg=legit_cfg, \
                           time_cfg=time_cfg, fraud_cfg=fraud_cfg, \
                           compr_cfg=compr_cfg, run_dir=run_dir)
compr_runner.run()


# Генерация фрода дропов распределителей 
dist_drops_runner = DropsRunner(base_cfg=base_cfg, legit_cfg=legit_cfg, \
                                time_cfg=time_cfg, fraud_cfg=fraud_cfg, \
                                drops_cfg=drop_cfg, run_dir=run_dir, \
                                drop_type="distributor")
dist_drops_runner.run()


# Генерация фрода дропов покупателей 
purch_drops_runner = DropsRunner(base_cfg=base_cfg, legit_cfg=legit_cfg, \
                                time_cfg=time_cfg, fraud_cfg=fraud_cfg, \
                                drops_cfg=drop_cfg, run_dir=run_dir, \
                                drop_type="purchaser")
purch_drops_runner.run()


# Сборка созданных транзакций в один датафрейм и запись в один файл в двух директориях
recorder = AllTxnsRecorder(base_cfg=base_cfg, legit_cfg=legit_cfg, compr_cfg=compr_cfg, \
                           drops_cfg=drop_cfg, run_dir=run_dir)

recorder.build_and_write()

# Вывод сообщения о том куда сохранены сгенерированные транзакции
latest_path = Path(base_cfg["data_paths"]["generated"]["latest"])
print(f"""\n
Generated files are located in {run_dir} - individual folder for this run.
And in {latest_path} - contains files of the last run only.""")
input("\nPress Enter to exit...")



Конец описания генерации транзакций¶

  • на этом описание процесса генерации транзакций закончено. Следующий ноутбук будет о загрузке созданных транзакций и других таблиц в базу данных SQL. БД планируется использовать при создании веб-приложения для просмотра транзакций и фрода.