Дропы. Симуляция активности¶
- ноутбуки лучше просматривать на Github pages, т.к. при просмотре прямо в репозитории могут быть проблемы с отображением, например, обрезка вывода с широкими датафреймами. Если в адресной строке браузера есть
iaroslav-dzh.github.io
, то вы уже на Github pages.
Ссылки:
Информация о ноутбуке
- это последний ноутбук о генерации дропов. Тут объекты собирающие все воедино из прошлых двух ноутбуков:
- обработчик одной партии полученных дропом денег
DropBatchHandler
- генератор полного "жизненного цикла" дропа
DropLifecycleManager
- от первой транзакции до последней транзакции когда дроп заблокирован - конечный симулятор активности множества дропов
DropSimulator
- проходит через всех клиентов выбранных под дроп фрод и генерирует полный "жизненный цикл" для каждого - оркестратор генерации дропов
DropsRunner
- сборка нужных классов и запуск генерации
- обработчик одной партии полученных дропом денег
import pandas as pd
import numpy as np
import os
import pyarrow
import yaml
from typing import Union
from pathlib import Path
from data_generator.general_time import *
from data_generator.utils import create_txns_df, load_configs
from data_generator.configs import DropDistributorCfg, DropPurchaserCfg
from data_generator.fraud.drops.build.config import DropConfigBuilder
from data_generator.fraud.drops.build.builder import DropBaseClasses
from data_generator.fraud.drops.txns import CreateDropTxn
os.chdir("..")
os.getcwd()
'C:\\Users\\iaros\\My_documents\\Education\\projects\\fraud_detection_01'
# Базовые конфиги
base_cfg = load_configs("./config/base.yaml")
# Настройки легальных транзакций
legit_cfg = load_configs("./config/legit.yaml")
# Общие настройки фрода
fraud_cfg = load_configs("./config/fraud.yaml")
# Настройки для дроп фрода
drops_cfg = load_configs("./config/drops.yaml")
# Настройки времени
time_cfg = load_configs("./config/time.yaml")
# Пути к файлам
data_paths = base_cfg["data_paths"]
# директория текущего запуска генератора. Возьмем из предыдущего ноутбука. Т.к. нужны данные того, что сгенерировано до дроп фрода
# предварительно удалены папки дропов dist_drops и purch_drops т.к. при создании объектов конфиг классов они будут созданы заново
run_dir = './data/generated/history/generation_run_2025-07-25_121029'
Симуляция активности одного дропа¶
1. Обработка полученной дропом партии денег. Класс DropBatchProcessor
¶
- модуль
data_generator.fraud.drops.processor
. Ссылка на исходный код в Github
Основной функционал
- обработка полученной дропом партии (батча) денег
- определение должны ли текущая и последующие транзакции быть отклонены
Основная логика
- метод
distributor()
- обработать партию денег для дропа распределителя- генерирует исходящие транзакции пока:
- либо баланс не будет равен 0
- либо пока транзакции не будут отклоняться. Но также в этом методе берется случайное число попыток на операции после первой откл. транз. Т.е. транзакции могут быть отклонены, но генерация продолжится если и пока попытки не равны 0. Попытки вычитаются с каждой операцией совершенной после первой отклоненной транз-ции
- каждую операцию случайно выбирается будет ли это перевод/снятие или покупка криптовалюты
- перед каждой операцией идет проверка должна ли она быть отклонена. Операции отклоняются если достигнут абсолютный лимит вх. или исх. транз-ций. Лимиты выставлются в
drops.yaml
- генерирует исходящие транзакции пока:
- метод
purchaser()
- обработать партию денег для дропа покупателя- логика прекращения операций, управления попытками, и проверки на то, что транз. должна быть отклонена - такие же как и в
distributor()
- главная разница с
distributor()
в том что тут всегда только покупки, без других возможных вариантов
- логика прекращения операций, управления попытками, и проверки на то, что транз. должна быть отклонена - такие же как и в
- метод
process_batch()
оркестрируетdistributor()
иpurchaser()
методы. Вызывает нужный метод в зависимости от типа дропа. Тип дропа приходит из объектаDropBaseClasses
класса, когда создается объектDropBatchProcessor
. Т.е. под каждый тип дропа создается свой объектDropBatchProcessor
class DropBatchHandler:
"""
Обработка полученной дропом партии (батча) денег
---------------------------
drop_type: str. 'distributor' или 'purchaser'
amt_hand: DropAmountHandler. Генератор сумм входящих/исходящих транзакций, сумм снятий.
Управление балансом текущего дропа.
behav_hand: DistBehaviorHandler | PurchBehaviorHandler.
Управление поведением дропа: распределителя или покупателя.
create_txn: CreateDropTxn. Создание транзакций.
declined: bool. По умолчанию False. Отклоняются ли транзакции.
txns_fm_batch: list. Транзакции дропа в текущем батче.
"""
def __init__(self, base: DropBaseClasses, create_txn: CreateDropTxn):
"""
base: DropBaseClasses. Объекты основных классов для дропов.
create_txn: CreateDropTxn. Создание транзакций.
"""
self.drop_type = base.drop_type
self.amt_hand = base.amt_hand
self.behav_hand = base.behav_hand
self.create_txn = create_txn
self.declined = False
self.txns_fm_batch = []
def should_decline(self):
"""
Проверка будет ли отклонена транзакция.
Возвращает True или False в зависимости от достижения лимитов.
Также записывает это значение в self.declined
"""
self.declined = self.create_txn.limit_reached()
return self.declined
def reset_cache(self, all=False):
"""
Сброс кэша.
--------
all: bool. Если True то сбрасывает атрибуты: txns_fm_batch, declined.
Если False то declined не сбрсывает
Также передается в методы классов:
DistBehaviorHandler | PurchBehaviorHandler,
DropAmountHandler.
"""
behav_hand = self.behav_hand
amt_hand = self.amt_hand
self.txns_fm_batch = []
amt_hand.reset_cache(all=all)
behav_hand.reset_cache(all=all)
if not all:
return
self.declined = False
def distributor(self):
"""
Обработка партии(батча) денег полученных дропом
распределителем.
"""
behav_hand = self.behav_hand
amt_hand = self.amt_hand
create_txn = self.create_txn
while amt_hand.balance > 0:
declined = self.should_decline() # будет ли отклонена транзакция
behav_hand.guide_scenario()
if behav_hand.to_crypto: # перевод на криптобиржу или нет
txn_out = create_txn.purchase(declined=declined)
else: # Иначе перевод/снятие
to_drop = behav_hand.to_drop # Пробовать ли перевести другому дропу.
txn_out = create_txn.trf_or_atm(receive=False,
to_drop=to_drop, declined=declined)
# Добавляем в список транз-ций батча
self.txns_fm_batch.append(txn_out)
# Сколько попыток будет после первой откл. транз-ции
behav_hand.attempts_after_decline()
# Если это не первая отклоненная транзакция, то вычитаем попытку
# совершить транзакцию после отклонения
behav_hand.deduct_attempts()
# Решение об остановке после отклоненной транзакции
if behav_hand.stop_after_decline():
break
def purchaser(self):
"""
Обработка партии(батча) денег полученных дропом
покупателем.
"""
behav_hand = self.behav_hand
amt_hand = self.amt_hand
create_txn = self.create_txn
while amt_hand.balance > 0:
declined = self.should_decline() # будет ли отклонена транзакция
txn_out = create_txn.purchase(declined=declined)
self.txns_fm_batch.append(txn_out)
# Сколько попыток будет после первой откл. транз-ции
behav_hand.attempts_after_decline()
# Если это не первая отклоненная транзакция, то вычитаем попытку
# совершить транзакцию после отклонения
behav_hand.deduct_attempts()
# Решение об остановке после отклоненной транзакции
if behav_hand.stop_after_decline():
break
def process_batch(self):
"""
Вызов соответствующего типу дропа метода для обработки
батча денег.
Метод выбирается исходя из self.drop_type.
---------
"""
drop_type = self.drop_type
if drop_type == "distributor":
self.distributor()
elif drop_type == "purchaser":
self.purchaser()
Демонстрация DropBatchProcessor
¶
from data_generator.fraud.drops.processor import DropBatchHandler
Функция для очищения кэшей на время демонстрации
def reset_caches(create_txn, behav_hand, amt_hand, time_hand, part_data, batch_hand):
create_txn.reset_cache(only_counters=False)
behav_hand.reset_cache(all=True)
amt_hand.reset_cache(all=True) # batch_txns здесь
time_hand.reset_cache()
part_data.reset_cache()
batch_hand.reset_cache(all=True)
1. Дропы распределители¶
- разобьем демо на две части согласно типам дропов т.к. для каждого типа нужно создавать отдельные объекты нескольких классов и тестировать отдельные методы
drop_cfg_build = DropConfigBuilder(base_cfg=base_cfg, legit_cfg=legit_cfg, fraud_cfg=fraud_cfg, drop_cfg=drops_cfg, \
time_cfg=time_cfg, run_dir=run_dir)
dist_configs = drop_cfg_build.build_dist_cfg()
dist_base = DropBaseClasses(drop_type="distributor", configs=dist_configs)
dist_base.build_all()
# Объекты базовых классов. Вынос в переменные для удобства
acc_hand = dist_base.acc_hand
amt_hand = dist_base.amt_hand
part_data = dist_base.part_data
time_hand = dist_base.time_hand
behav_hand = dist_base.behav_hand
# Класс генератор одиночных транзакций
create_txn = CreateDropTxn(configs=dist_configs, base=dist_base)
# Демонстрируемый класс
batch_hand = DropBatchHandler(base=dist_base, create_txn=create_txn)
# Нужно взять случайного клиента из конфиг класса и передать его данные в 2 базовых класса
drop_clients = dist_configs.clients
dist_client = list(drop_clients.itertuples(name="Client"))[5]
acc_hand.client_id = dist_client.client_id
part_data.client_info = dist_client
part_data.client_info
Client(Index=5, client_id=2995, birth_date=Timestamp('1966-06-09 00:00:00'), sex='female', region='Кировская', city='Киров', lat=58.6035313, lon=49.6679219, city_id=35, home_ip='2.60.11.20')
Метод distributor()
¶
- обработчик партии (батча) присланных дропу распределителю денег
Пример
- Искусственно зададим баланс вместо генерации случайной суммы через создание входящей транзакции. В конечном датафрейме, который получится не будет данных о входящей транзакции; будут просто циклы обработки "полученных" денег
- вызов метода
distributor()
также вложим в цикл, который прерывается после окончания работыdistributor()
если достигнут лимит по вх. или исх. транз-циям, что оперделяется внутриdistiributor()
и передается в атрибутbatch_hand.declined
внутри вызоваdistributor()
- вложением
distributor()
в цикл мы имитируем многократное получение дропом денег т.е. несколько циклов обработки батча денег, как это и будет реализовано в симуляции "жизненного цикла" дропа
# Получаем номер счета текущего дропа
acc_hand.get_account(own=True)
# Т.к. не будет генерации входящей транз-ции, то не будет случайного выбора времени первой транзакции
# в жизненном цикле дропа. А это время генерируется с первой входящей транзакцией.
# Поэтому зададим время вручную
start_time = pd.to_datetime("2025-07-02 11:15:00", format="%Y-%m-%d %H:%M:%S")
time_hand.last_unix = pd_timestamp_to_unix(start_time)
time_hand.start_unix = pd_timestamp_to_unix(start_time)
all_txns = []
# Запускаем batch_hand.distributor() пока один из циклов работы batch_hand.distributor()
# не закончится с изменением значения batch_hand.declined
while not batch_hand.declined:
# Задаем конкретный баланс - это как-будто бы и есть момент получения дропом денег
amt_hand.balance = 41000
# Задаем сценарий.
behav_hand.scen = "atm+transfer"
# Также прописываем распределение полученных денег частями, для разнообразия сумм
behav_hand.in_chunks = True
batch_hand.distributor()
txns_fm_batch = batch_hand.txns_fm_batch
all_txns.extend(txns_fm_batch)
batch_hand.reset_cache(all=False)
Итоговый датафрейм
- последние 3 транзакции отклонены. Это исходящие транзакции дропа, значит случайно сгенерированное кол-во попыток операций после первой отклоненной транзакции равно двум: первая отклоненная транз-ция + 2 попытки после неё = 3 отклоненных исходящих транзакции
all_txns_dist = pd.DataFrame(all_txns)
all_txns_dist
client_id | txn_time | unix_time | amount | type | channel | category | online | merchant_id | trans_city | trans_lat | trans_lon | trans_ip | device_id | account | is_fraud | is_suspicious | status | rule | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2995 | 2025-07-02 14:13:00 | 1751465580 | 13900.0 | withdrawal | ATM | not applicable | False | NaN | Киров | 58.603531 | 49.667922 | not applicable | NaN | 12835.0 | False | False | approved | not applicable |
1 | 2995 | 2025-07-02 14:51:00 | 1751467860 | 13000.0 | outbound | transfer | not applicable | True | NaN | Киров | 58.603531 | 49.667922 | 2.60.11.20 | 5081.0 | 16689.0 | False | False | approved | not applicable |
2 | 2995 | 2025-07-02 16:09:00 | 1751472540 | 14100.0 | outbound | transfer | not applicable | True | NaN | Киров | 58.603531 | 49.667922 | 2.60.11.20 | 5082.0 | 18515.0 | False | False | approved | not applicable |
3 | 2995 | 2025-07-02 16:40:00 | 1751474400 | 20900.0 | withdrawal | ATM | not applicable | False | NaN | Киров | 58.603531 | 49.667922 | not applicable | NaN | 12835.0 | False | False | approved | not applicable |
4 | 2995 | 2025-07-02 18:44:00 | 1751481840 | 6000.0 | outbound | transfer | not applicable | True | NaN | Киров | 58.603531 | 49.667922 | 2.60.11.20 | 5081.0 | 24860.0 | False | False | approved | not applicable |
5 | 2995 | 2025-07-03 14:13:00 | 1751551980 | 11000.0 | outbound | transfer | not applicable | True | NaN | Киров | 58.603531 | 49.667922 | 2.60.11.20 | 5082.0 | 19439.0 | False | False | approved | not applicable |
6 | 2995 | 2025-07-03 14:57:00 | 1751554620 | 3100.0 | outbound | transfer | not applicable | True | NaN | Киров | 58.603531 | 49.667922 | 2.60.11.20 | 5082.0 | 22496.0 | False | False | approved | not applicable |
7 | 2995 | 2025-07-03 16:58:00 | 1751561880 | 14100.0 | withdrawal | ATM | not applicable | False | NaN | Киров | 58.603531 | 49.667922 | not applicable | NaN | 12835.0 | False | False | approved | not applicable |
8 | 2995 | 2025-07-03 19:13:00 | 1751569980 | 14100.0 | outbound | transfer | not applicable | True | NaN | Киров | 58.603531 | 49.667922 | 2.60.11.20 | 5082.0 | 23129.0 | True | False | declined | drop_flow_cashout |
9 | 2995 | 2025-07-03 21:51:00 | 1751579460 | 10600.0 | outbound | transfer | not applicable | True | NaN | Киров | 58.603531 | 49.667922 | 2.60.11.20 | 5082.0 | 20110.0 | True | False | declined | drop_flow_cashout |
10 | 2995 | 2025-07-04 15:20:00 | 1751642400 | 7100.0 | purchase | crypto_exchange | balance_top_up | True | 6833.0 | Киров | 58.603531 | 49.667922 | 2.60.11.20 | 5081.0 | NaN | True | False | declined | drop_flow_cashout |
2. Дропы покупатели¶
- создаем объекты классов отдельно под дропов покупателей - purchasers
drop_cfg_build = DropConfigBuilder(base_cfg=base_cfg, legit_cfg=legit_cfg, fraud_cfg=fraud_cfg, drop_cfg=drops_cfg, \
time_cfg=time_cfg, run_dir=run_dir)
purch_configs = drop_cfg_build.build_purch_cfg()
purch_base = DropBaseClasses(drop_type="purchaser", configs=purch_configs)
purch_base.build_all()
# Объекты базовых классов. Вынос в переменные для удобства
acc_hand = purch_base.acc_hand
amt_hand = purch_base.amt_hand
part_data = purch_base.part_data
time_hand = purch_base.time_hand
behav_hand = purch_base.behav_hand
# Класс генератор одиночных транзакций
create_txn = CreateDropTxn(configs=purch_configs, base=purch_base)
# Демонстрируемый класс
batch_hand = DropBatchHandler(base=purch_base, create_txn=create_txn)
# Нужно взять случайного клиента из конфиг класса и передать его данные в 2 базовых класса
drop_clients = purch_configs.clients
purch_client = list(drop_clients.itertuples(name="Client"))[2]
acc_hand.client_id = purch_client.client_id
part_data.client_info = purch_client
part_data.client_info
Client(Index=2, client_id=295, birth_date=Timestamp('1964-03-20 00:00:00'), sex='male', region='Карелия', city='Петрозаводск', lat=61.78909, lon=34.3596263, city_id=12, home_ip='2.60.1.23')
Метод purchaser()
¶
- обработчик партии (батча) присланных дропу покупателю денег
- логика вся та же самая как и в демонстрации
distributor()
reset_caches(create_txn, behav_hand, amt_hand, time_hand, part_data, batch_hand)
acc_hand.reset_cache()
# Получаем номер счета текущего дропа
acc_hand.get_account(own=True)
# Т.к. не будет генерации входящей транз-ции, то не будет случайного выбора времени первой транзакции
# в жизненном цикле дропа. А это время генерируется с первой входящей транзакцией.
# Поэтому зададим время вручную
start_time = pd.to_datetime("2025-07-02 11:15:00", format="%Y-%m-%d %H:%M:%S")
time_hand.last_unix = pd_timestamp_to_unix(start_time)
time_hand.start_unix = pd_timestamp_to_unix(start_time)
all_txns = []
# Запускаем batch_hand.distributor() пока один из циклов работы batch_hand.distributor() не
# не закончится с изменением значения batch_hand.declined
while not batch_hand.declined:
# Задаем конкретный баланс - это как-будто бы и есть момент получения дропом денег
amt_hand.balance = 50000
# Задаем сценарий.
behav_hand.scen = "split_money"
# Также прописываем распределение полученных денег частями, для разнообразия сумм
behav_hand.in_chunks = True
batch_hand.purchaser()
txns_fm_batch = batch_hand.txns_fm_batch
all_txns.extend(txns_fm_batch)
batch_hand.reset_cache(all=False)
Итоговый датафрейм
- последние 4 транзакции отклонены. Это исходящие транзакции дропа, значит случайно сгенерированное кол-во попыток операций после первой отклоненной транзакции равно трем: первая отклоненная транз-ция + 3 попытки после неё = 4 отклоненных исходящих транзакции
all_txns_purch = pd.DataFrame(all_txns)
all_txns_purch
client_id | txn_time | unix_time | amount | type | channel | category | online | merchant_id | trans_city | trans_lat | trans_lon | trans_ip | device_id | account | is_fraud | is_suspicious | status | rule | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 295 | 2025-07-02 14:11:00 | 1751465460 | 30000.0 | purchase | ecom | shopping_net | True | 6963 | Петрозаводск | 61.78909 | 34.359626 | 2.60.1.23 | 496 | NaN | False | False | approved | not applicable |
1 | 295 | 2025-07-02 15:52:00 | 1751471520 | 16000.0 | purchase | ecom | shopping_net | True | 6809 | Петрозаводск | 61.78909 | 34.359626 | 2.60.1.23 | 496 | NaN | False | False | approved | not applicable |
2 | 295 | 2025-07-02 17:05:00 | 1751475900 | 3000.0 | purchase | ecom | shopping_net | True | 6832 | Петрозаводск | 61.78909 | 34.359626 | 2.60.1.23 | 496 | NaN | False | False | approved | not applicable |
3 | 295 | 2025-07-02 18:15:00 | 1751480100 | 1000.0 | purchase | ecom | shopping_net | True | 6825 | Петрозаводск | 61.78909 | 34.359626 | 2.60.1.23 | 496 | NaN | False | False | approved | not applicable |
4 | 295 | 2025-07-02 20:00:00 | 1751486400 | 20000.0 | purchase | ecom | shopping_net | True | 6896 | Петрозаводск | 61.78909 | 34.359626 | 2.60.1.23 | 496 | NaN | False | False | approved | not applicable |
5 | 295 | 2025-07-03 14:05:00 | 1751551500 | 14000.0 | purchase | ecom | misc_net | True | 6902 | Петрозаводск | 61.78909 | 34.359626 | 2.60.1.23 | 496 | NaN | False | False | approved | not applicable |
6 | 295 | 2025-07-03 15:07:00 | 1751555220 | 14000.0 | purchase | ecom | shopping_net | True | 6812 | Петрозаводск | 61.78909 | 34.359626 | 2.60.1.23 | 496 | NaN | False | False | approved | not applicable |
7 | 295 | 2025-07-03 15:38:00 | 1751557080 | 2000.0 | purchase | ecom | shopping_net | True | 6906 | Петрозаводск | 61.78909 | 34.359626 | 2.60.1.23 | 496 | NaN | False | False | approved | not applicable |
8 | 295 | 2025-07-03 18:04:00 | 1751565840 | 38000.0 | purchase | ecom | shopping_net | True | 6879 | Петрозаводск | 61.78909 | 34.359626 | 2.60.1.23 | 496 | NaN | True | False | declined | drop_purchaser |
9 | 295 | 2025-07-03 20:39:00 | 1751575140 | 28500.0 | purchase | ecom | misc_net | True | 6858 | Петрозаводск | 61.78909 | 34.359626 | 2.60.1.23 | 496 | NaN | True | False | declined | drop_purchaser |
10 | 295 | 2025-07-04 16:49:00 | 1751647740 | 19000.0 | purchase | ecom | shopping_net | True | 6960 | Петрозаводск | 61.78909 | 34.359626 | 2.60.1.23 | 496 | NaN | True | False | declined | drop_purchaser |
11 | 295 | 2025-07-04 17:36:00 | 1751650560 | 9500.0 | purchase | ecom | shopping_net | True | 6872 | Петрозаводск | 61.78909 | 34.359626 | 2.60.1.23 | 496 | NaN | True | False | declined | drop_purchaser |
2. Симуляция полного жизненного цикла дропа. Класс DropLifecycleManager
¶
- модуль
data_generator.fraud.drops.simulator
. Ссылка на исходный код в Github
Основная логика
- создается объект
DropLifecycleManager
- берется один клиент под дроп фрод при помощи передачи данных клиента и ID клиентав атрибуты
DropTxnPartData.client_info
иDropAccountHandler.client_id
- внутри метода
DropLifecycleManager.run_drop_lifecycle()
:- получаем счет текущего клиента
- помечаем текущего клиента как дропа в таблице
accounts
в атрибутеDropAccountHandler.accounts
- переводим дропу деньги - создаем вх. транз-цию и добавляем в список транзакций текущего дропа. Перед этим проверяем будет ли входящая транзакция отклонена, но при первой транзакции такого не бывает.
- Если транзакция отклонена, то генерация прерывается. Но для первой транзакции это не актуально.
- случайно выбираем сценарий, как дроп будет распределять полученные деньги
- в соответствии с выбранным сценарием определяем будут ли полученные деньги распределены на части или будет одна операция
- запускаем метод обработки батча полученных денег
DropBatchHandler.process_batch()
, который обрабатывает полученный батч в соответствии с типом дропа - сгенерированные при обработке батча транзакции добавляем в список транзакций текущего дропа
- снова возвращаемся к п.3 перевод новой партии денег дропу. Если при обработке предыдущего батча был достигнут абсолютный лимит транз-ций дропа, то входящая транзакция будет отклонена и жизненный цикл дропа закончится, будто после первой отклоненной входящей транзакции дропу больше не пытаются послать деньги
- если транзакция не отклонена, то снова запускается
DropBatchHandler.process_batch()
и так далее
class DropLifecycleManager:
"""
Управление полным жизненный циклом одного дропа.
------------------
drop_type: str. 'distributor' или 'purchaser'
acc_hand: DropAccountHandler. Генератор номеров счетов входящих/исходящих транзакций.
Учет использованных счетов.
amt_hand: DropAmountHandler. Генератор сумм входящих/исходящих транзакций, сумм снятий.
Управление балансом текущего дропа.
time_hand: DropTimeHandler.
Управление временем транзакций дропа.
behav_hand: DistBehaviorHandler | PurchBehaviorHandler.
Управление поведением дропа: распределителя или покупателя.
part_data: DropTxnPartData.
Генерация части данных о транзакции дропа.
behav_hand: DistBehaviorHandler | PurchBehaviorHandler.
Управление поведением дропа: распределителя или покупателя.
create_txn: CreateDropTxn. Создание транзакций.
batch_hand: DropBatchHandler. Обработка полученной дропом партии (батча) денег
drop_txns: list. Созданные транзакции дропа.
"""
def __init__(self, base: DropBaseClasses, create_txn: CreateDropTxn):
"""
base: DropBaseClasses. Объекты основных классов для дропов.
create_txn: CreateDropTxn. Создание транзакций.
"""
self.drop_type = base.drop_type
self.acc_hand = base.acc_hand
self.amt_hand = base.amt_hand
self.time_hand = base.time_hand
self.part_data = base.part_data
self.behav_hand = base.behav_hand
self.create_txn = create_txn
self.batch_hand = DropBatchHandler(base=base, create_txn=create_txn)
self.drop_txns = []
def reset_all_caches(self):
"""
Сброс кэшей когда активность дропа закончена совсем
"""
# Сброс всего кэша batch_hand включает в себя полный сброс кэша
# в behav_hand и amt_hand
self.batch_hand.reset_cache(all=True)
self.time_hand.reset_cache()
self.part_data.reset_cache()
self.create_txn.reset_cache()
self.drop_txns = []
def run_drop_lifecycle(self):
# создать счет дропа, записать is_drop = True в таблице acc_hand.accounts
acc_hand = self.acc_hand
# получить номер счета дропа. Пишется в атрибут acc_hand.account
acc_hand.get_account(own=True)
acc_hand.label_drop() # помечаем клиента как дропа в таблице acc_hand.accounts
behav_hand = self.behav_hand
batch_hand = self.batch_hand
create_txn = self.create_txn
while True:
declined = batch_hand.should_decline() # статус транзакции. будет ли она отклонена
# входящая транзакция. Новый батч денег.
receive_txn = create_txn.trf_or_atm(declined=declined, \
to_drop=False, receive=True)
drop_txns = self.drop_txns
drop_txns.append(receive_txn)
# если у дропа достигнут лимит то транзакции отклоняются.
# Если входящая отклонена, дропу больше не пытаются послать деньги
if declined:
break
behav_hand.sample_scenario() # выбрать сценарий
behav_hand.in_chunks_val() # транзакции по частям или нет
batch_hand.process_batch() # обработка полученного батча
txns_fm_batch = batch_hand.txns_fm_batch
drop_txns.extend(txns_fm_batch)
# сброс кэша после завершения обработки батча
batch_hand.reset_cache(all=False)
ДемонстрацияDropLifecycleManager
¶
Импорт нужных классов и создание объектов
- демонстрация на примере дропа распределителя. Ограничимся этим типом дропов т.к. у покупателей та же самая логика
from data_generator.fraud.drops.build.config import DropConfigBuilder
from data_generator.fraud.drops.build.builder import DropBaseClasses
from data_generator.fraud.drops.txns import CreateDropTxn
from data_generator.fraud.drops.simulator import DropLifecycleManager
drop_cfg_build = DropConfigBuilder(base_cfg=base_cfg, legit_cfg=legit_cfg, fraud_cfg=fraud_cfg, drop_cfg=drops_cfg, \
time_cfg=time_cfg, run_dir=run_dir)
dist_configs = drop_cfg_build.build_dist_cfg()
dist_base = DropBaseClasses(drop_type="distributor", configs=dist_configs)
dist_base.build_all()
# Объекты базовых классов. Вынос в переменные для удобства
acc_hand = dist_base.acc_hand
amt_hand = dist_base.amt_hand
part_data = dist_base.part_data
time_hand = dist_base.time_hand
behav_hand = dist_base.behav_hand
# Класс генератор одиночных транзакций
create_txn = CreateDropTxn(configs=dist_configs, base=dist_base)
# обработчик получаемых батчей денег
batch_hand = DropBatchHandler(base=dist_base, create_txn=create_txn)
# Демонстрируемый класс
life_manager = DropLifecycleManager(base=dist_base, create_txn=create_txn)
# Нужно взять случайного клиента из конфиг класса и передать его данные в 2 базовых класса
drop_clients = dist_configs.clients
dist_client = list(drop_clients.itertuples(name="Client"))[5]
acc_hand.client_id = dist_client.client_id
part_data.client_info = dist_client
part_data.client_info
Client(Index=5, client_id=599, birth_date=Timestamp('2002-05-12 00:00:00'), sex='female', region='Москва', city='Москва', lat=55.7538789, lon=37.6203735, city_id=1, home_ip='2.60.2.59')
Метод run_drop_lifecycle()
# Запуск жизненного цикла
life_manager.run_drop_lifecycle()
drop_txns = life_manager.drop_txns
Итоговый датафрейм
- на этот раз обратите внимание что присутствуют записи о входящих транзакциях в отличие от демонстрации для
DropBatchHandler
- также среди отклоненных транзакций одна входящая (последняя запись)
- это и есть полный результат для одного дропа
all_txns_df = pd.DataFrame(drop_txns)
all_txns_df
client_id | txn_time | unix_time | amount | type | channel | category | online | merchant_id | trans_city | trans_lat | trans_lon | trans_ip | device_id | account | is_fraud | is_suspicious | status | rule | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 599 | 2025-01-02 15:12:00 | 1735830720 | 16800.0 | inbound | transfer | not applicable | True | NaN | not applicable | NaN | NaN | not applicable | NaN | 10570.0 | False | False | approved | not applicable |
1 | 599 | 2025-01-02 16:16:00 | 1735834560 | 9000.0 | outbound | transfer | not applicable | True | NaN | Москва | 55.753879 | 37.620373 | 2.60.2.59 | 1002.0 | 16537.0 | False | False | approved | not applicable |
2 | 599 | 2025-01-02 18:17:00 | 1735841820 | 6000.0 | outbound | transfer | not applicable | True | NaN | Москва | 55.753879 | 37.620373 | 2.60.2.59 | 1002.0 | 21733.0 | False | False | approved | not applicable |
3 | 599 | 2025-01-02 19:53:00 | 1735847580 | 1800.0 | outbound | transfer | not applicable | True | NaN | Москва | 55.753879 | 37.620373 | 2.60.2.59 | 1001.0 | 18426.0 | False | False | approved | not applicable |
4 | 599 | 2025-01-02 20:59:00 | 1735851540 | 30600.0 | inbound | transfer | not applicable | True | NaN | not applicable | NaN | NaN | not applicable | NaN | 10570.0 | False | False | approved | not applicable |
5 | 599 | 2025-01-03 15:29:00 | 1735918140 | 30600.0 | outbound | transfer | not applicable | True | NaN | Москва | 55.753879 | 37.620373 | 2.60.2.59 | 1001.0 | 20706.0 | False | False | approved | not applicable |
6 | 599 | 2025-01-03 17:53:00 | 1735926780 | 15700.0 | inbound | transfer | not applicable | True | NaN | not applicable | NaN | NaN | not applicable | NaN | 10570.0 | False | False | approved | not applicable |
7 | 599 | 2025-01-03 19:03:00 | 1735930980 | 15700.0 | outbound | transfer | not applicable | True | NaN | Москва | 55.753879 | 37.620373 | 2.60.2.59 | 1002.0 | 22198.0 | False | False | approved | not applicable |
8 | 599 | 2025-01-03 19:49:00 | 1735933740 | 20400.0 | inbound | transfer | not applicable | True | NaN | not applicable | NaN | NaN | not applicable | NaN | 10570.0 | False | False | approved | not applicable |
9 | 599 | 2025-01-04 14:38:00 | 1736001480 | 20400.0 | outbound | transfer | not applicable | True | NaN | Москва | 55.753879 | 37.620373 | 2.60.2.59 | 1001.0 | 21100.0 | False | False | approved | not applicable |
10 | 599 | 2025-01-04 15:20:00 | 1736004000 | 57300.0 | inbound | transfer | not applicable | True | NaN | not applicable | NaN | NaN | not applicable | NaN | 10570.0 | False | False | approved | not applicable |
11 | 599 | 2025-01-04 17:53:00 | 1736013180 | 31300.0 | withdrawal | ATM | not applicable | False | NaN | Москва | 55.753879 | 37.620373 | not applicable | NaN | 10570.0 | False | False | approved | not applicable |
12 | 599 | 2025-01-04 20:46:00 | 1736023560 | 14000.0 | outbound | transfer | not applicable | True | NaN | Москва | 55.753879 | 37.620373 | 2.60.2.59 | 1002.0 | 22722.0 | False | False | approved | not applicable |
13 | 599 | 2025-01-04 23:38:00 | 1736033880 | 12000.0 | outbound | transfer | not applicable | True | NaN | Москва | 55.753879 | 37.620373 | 2.60.2.59 | 1001.0 | 16851.0 | True | False | declined | drop_flow_cashout |
14 | 599 | 2025-01-05 01:51:00 | 1736041860 | 9000.0 | outbound | transfer | not applicable | True | NaN | Москва | 55.753879 | 37.620373 | 2.60.2.59 | 1002.0 | 23711.0 | True | False | declined | drop_flow_cashout |
15 | 599 | 2025-01-05 15:22:00 | 1736090520 | 6000.0 | outbound | transfer | not applicable | True | NaN | Москва | 55.753879 | 37.620373 | 2.60.2.59 | 1002.0 | 22027.0 | True | False | declined | drop_flow_cashout |
16 | 599 | 2025-01-05 16:27:00 | 1736094420 | 3000.0 | purchase | crypto_exchange | balance_top_up | True | 6825.0 | Москва | 55.753879 | 37.620373 | 2.60.2.59 | 1002.0 | NaN | True | False | declined | drop_flow_cashout |
17 | 599 | 2025-01-05 17:48:00 | 1736099280 | 48100.0 | inbound | transfer | not applicable | True | NaN | not applicable | NaN | NaN | not applicable | NaN | 10570.0 | True | False | declined | drop_flow_cashout |
Симуляция активности множества дропов¶
Симулятор активности. Класс DropSimulator
¶
- модуль
data_generator.fraud.drops.simulator
. Ссылка на исходный код в Github
Основная логика
- итерирование через клиентов семплированных под определенный тип дроп фрода
- на каждой итерации симуляция полного жизненного цикла текущего дропа
- запись созданных транзакций дропа общий список всех транзакций дропов данного типа
- в конце запись измененного датафрейма
accounts
в файл в двух директориях. Запись нужна, чтобы при генерации другого типа дропов можно было узнать изменения т.к. там помечается кто из клиентов дроп; и, возможно для выгрузки актуальной таблицы в БД - запись сгенерированных транзакций в файл в две директории: созданную под текущий запуск генератора транзакций и директорию последнего запуска генератора
class DropSimulator:
"""
Генерация активности множества дропов.
------------------------
base_cfg: dict. Конфиги из base.yaml
drop_type: str. 'distributor' или 'purchaser'
drop_clients: pd.DataFrame. Клиенты которые будут дропами.
part_data: DropTxnPartData. Генерация части данных транзакции.
acc_hand: DropAccountHandler. Генератор номеров счетов входящих/исходящих
транзакций. Учет использованных счетов.
txn_recorder: FraudTxnsRecorder. Запись транзакций в файл.
life_manager: DropLifecycleManager. Управление полным жизненный циклом
одного дропа.
all_txns: list. Список для записи всех созданных транзакций.
txns_df: pd.DataFrame. Пустой датафрейм с колонками и проставленными типами
"""
def __init__(self, base_cfg, configs, base, create_txn, txn_recorder):
"""
base_cfg: dict. Конфиги из base.yaml
configs: DropDistributorCfg | DropPurchaserCfg.
Конфиги и данные для создания дроп транзакций.
base: Объекты основных классов для дропов.
create_txn: CreateDropTxn. Создание транзакций.
"""
self.base_cfg = base_cfg
self.drop_type = base.drop_type
self.drop_clients = configs.clients
self.part_data = base.part_data
self.acc_hand = base.acc_hand
self.txn_recorder = txn_recorder
self.txns_df = configs.transactions
self.life_manager = DropLifecycleManager(base=base, create_txn=create_txn)
self.run_dir = configs.run_dir
self.all_txns = []
def run(self):
"""
Полная генерация активности дропов соответсвующего типа
"""
drop_clients = self.drop_clients
part_data = self.part_data
acc_hand = self.acc_hand
life_manager = self.life_manager
all_txns = self.all_txns
txn_recorder = self.txn_recorder
# Итерируемся через семплированных клиентов под дроп
for client in drop_clients.itertuples():
# Запись данных текущего клиента в атрибуты
# некоторых классов
part_data.client_info = client
acc_hand.client_id = client.client_id
# Генерация полного цикла активности одного дропа
life_manager.run_drop_lifecycle()
# Запись транзакций дропа в общий список
drop_txns = life_manager.drop_txns
all_txns.extend(drop_txns)
# Сброс кэша дропа для следующей итерации
life_manager.reset_all_caches()
# Запись измененного датафрейма accounts в csv файл в двух экземплярах
# В папку data/generated/latest и в папку текущей генерации
accounts = acc_hand.accounts
gen_files = self.base_cfg["data_paths"]["generated"]
acc_path_01 = Path(self.run_dir) / "accounts.csv" # путь в директории текущей генерации
acc_path_02 = gen_files["accounts"] # путь в директории data/generated/latest
accounts.to_csv(acc_path_01, index=False)
accounts.to_csv(acc_path_02, index=False)
# Запись всех созданных транзакций дропов в parquet файл
txn_recorder.all_txns = pd.DataFrame(self.all_txns)
txn_recorder.write_to_file() # Это уже метод FraudTxnsRecorder
Демонстрация DropSimulator
¶
- сгенерируем активность дропов и прочитаем получившийся файл с их транзакциями
Метод run()
¶
- единственный метод класса
from data_generator.runner.drops import DropsRunner
# Продемонстриуем на дропах покупателях
drop_type = "purchaser"
drop_runner = DropsRunner(base_cfg=base_cfg, legit_cfg=legit_cfg, \
time_cfg=time_cfg, fraud_cfg=fraud_cfg, \
drops_cfg=drops_cfg, run_dir=run_dir, \
drop_type=drop_type)
drop_runner.run()
Purchaser drops generation... completed.
Соберем путь к файлу с транзакциями дропов покупателей
- одна часть это путь к директории текущего запуска генератора
- остальные две это название папки дропов покупателей и название файла с транзакциями дропов покупателей. Они хранятся в конфигах
drops.yaml
data_storage = drops_cfg["purchaser"]["data_storage"]
purch_folder_name = data_storage["folder_name"]
purch_txns_name = data_storage["files"]["txns"]
purch_txns_path = Path(run_dir, purch_folder_name, purch_txns_name) #os.path.join(run_dir, purch_folder_name, purch_txns_name)
purch_txns_path
WindowsPath('data/generated/history/generation_run_2025-07-25_121029/purch_drops/purch_drop_txns.parquet')
Читаем файл
purch_txns = pd.read_parquet(purch_txns_path)
print(f"Кол-во транзакций всего: {purch_txns.shape[0]}\n\n")
purch_txns.head()
Кол-во транзакций всего: 66
client_id | txn_time | unix_time | amount | type | channel | category | online | merchant_id | trans_city | trans_lat | trans_lon | trans_ip | device_id | account | is_fraud | is_suspicious | status | rule | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 454 | 2025-01-06 07:06:00 | 1736147160 | 31300.0 | inbound | transfer | not applicable | True | NaN | not applicable | NaN | NaN | not applicable | NaN | 10432.0 | False | False | approved | not applicable |
1 | 454 | 2025-01-06 08:14:00 | 1736151240 | 21000.0 | purchase | ecom | shopping_net | True | 6919.0 | Саратов | 51.530376 | 45.953026 | 2.60.1.177 | 763.0 | NaN | False | False | approved | not applicable |
2 | 454 | 2025-01-06 09:01:00 | 1736154060 | 8000.0 | purchase | ecom | misc_net | True | 6951.0 | Саратов | 51.530376 | 45.953026 | 2.60.1.177 | 762.0 | NaN | False | False | approved | not applicable |
3 | 454 | 2025-01-06 10:29:00 | 1736159340 | 2300.0 | purchase | ecom | shopping_net | True | 6786.0 | Саратов | 51.530376 | 45.953026 | 2.60.1.177 | 762.0 | NaN | False | False | approved | not applicable |
4 | 454 | 2025-01-06 11:30:00 | 1736163000 | 27800.0 | inbound | transfer | not applicable | True | NaN | not applicable | NaN | NaN | not applicable | NaN | 10432.0 | False | False | approved | not applicable |
Смотрим кол-во уникальных дропов в датафрейме транзакций
- при этом проверяем, что кол-во равно кол-ву дропов семплированных под генерацию
sampled_clients = drop_runner.configs.clients
assert purch_txns.client_id.nunique() == sampled_clients.shape[0]
purch_txns.client_id.nunique()
5
Запуск run()
для дропов распределителей
- это не для демонстрации. Просто, чтобы были созданы транзакции и для дропов распределителей. Чтобы был полный набор по типам транзакций для демонстрации в следующем ноутбуке
from data_generator.runner.drops import DropsRunner
# Продемонстриуем на дропах покупателях
drop_type = "distributor"
drop_runner = DropsRunner(base_cfg=base_cfg, legit_cfg=legit_cfg, \
time_cfg=time_cfg, fraud_cfg=fraud_cfg, \
drops_cfg=drops_cfg, run_dir=run_dir, \
drop_type=drop_type)
drop_runner.run()
Distributor drops generation... completed.
Оркестрация генерации дроп фрода¶
Класс DropsRunner
¶
модуль
data_generator.runner.drops
. Ссылка на исходный код в Githubоркестрация генерации дроп фрод транзакций указанного типа
это конечный уровень для дроп транзакций. Метод
run()
этого класса вызывается в файле запуска генератора всех транзакцийrun_generator.py
под каждый тип дропов создается свой объект
DropsRunner
DropsRunner
создает все объекты необходимых дляDropSimulator
классов, затем создает сам объектDropSimulator
и вызывает его методDropSimulator.run()
class DropsRunner:
"""
Запуск генерации транзакций дропов указанного типа.
---------
Атрибуты:
---------
base_cfg: dict. Конфиги из base.yaml
cfg_builder: DropConfigBuilder.
drop_type: str. Тип дропа: 'distributor' или 'purchaser'.
text: str. Текст для вставки в спиннер.
configs: DropDistributorCfg | DropPurchaserCfg.
Конфиги и данные для генерации дроп транзакций.
По умолчанию None. Создается при вызове self.build_sim().
base: DropBaseClasses. Создатель основных классов для генерации
дроп фрода. По умолчанию None. Создается при вызове self.build_sim().
txn_recorder: FraudTxnsRecorder. Запись транзакций в файл.
По умолчанию None. Создается при вызове self.build_sim().
drop_sim: DropSimulator. Генератор дроп фрода. По умолчанию None.
Создается при вызове self.build_sim().
"""
def __init__(self, base_cfg, legit_cfg, time_cfg, fraud_cfg, drops_cfg, run_dir, drop_type):
"""
base_cfg: dict. Конфиги из base.yaml
legit_cfg: dict. Конфиги из legit.yaml
time_cfg: dict. Конфиги из time.yaml
fraud_cfg: dict. Общие конфиги фрода из fraud.yaml
drops_cfg: dict. Конфиги для дроп фрода из drops.yaml
run_dir: str. Название директории для хранения сгенерированных
данных текущей генерации.
"""
self.base_cfg = base_cfg
self.cfg_builder = DropConfigBuilder(base_cfg=base_cfg, legit_cfg=legit_cfg, time_cfg=time_cfg, \
fraud_cfg=fraud_cfg, drop_cfg=drops_cfg, run_dir=run_dir)
self.drop_type = drop_type
self.text = f"{drop_type.capitalize()} drops generation"
self.configs = None
self.base = None
self.txn_recorder = None
self.drop_sim = None
def build_sim(self):
"""
Создать объект DropSimulator.
"""
drop_type = self.drop_type
base_cfg = self.base_cfg
if drop_type == "distributor":
self.configs = self.cfg_builder.build_dist_cfg()
elif drop_type == "purchaser":
self.configs = self.cfg_builder.build_purch_cfg()
self.base = DropBaseClasses(drop_type=drop_type, configs=self.configs)
self.base.build_all() # Создать объекты основных дроп классов
self.txn_recorder = FraudTxnsRecorder(configs=self.configs)
create_txn = CreateDropTxn(configs=self.configs, base=self.base)
self.drop_sim = DropSimulator(base_cfg=base_cfg, configs=self.configs, base=self.base, \
create_txn=create_txn, txn_recorder=self.txn_recorder)
@spinner_decorator
def run(self):
"""
Создать объект DropSimulator и запустить процесс генерации дроп фрода
для дропов указанного типа.
"""
self.build_sim()
self.drop_sim.run()