Дропы. Симуляция активности¶

  • ноутбуки лучше просматривать на Github pages, т.к. при просмотре прямо в репозитории могут быть проблемы с отображением, например, обрезка вывода с широкими датафреймами. Если в адресной строке браузера есть iaroslav-dzh.github.io, то вы уже на Github pages.
    Ссылки:
    • Ссылка на этот ноутбук
    • Ссылка на страницу генератора где есть все ноутбуки

Информация о ноутбуке

  • это последний ноутбук о генерации дропов. Тут объекты собирающие все воедино из прошлых двух ноутбуков:
    • обработчик одной партии полученных дропом денег DropBatchHandler
    • генератор полного "жизненного цикла" дропа DropLifecycleManager - от первой транзакции до последней транзакции когда дроп заблокирован
    • конечный симулятор активности множества дропов DropSimulator - проходит через всех клиентов выбранных под дроп фрод и генерирует полный "жизненный цикл" для каждого
    • оркестратор генерации дропов DropsRunner - сборка нужных классов и запуск генерации
In [66]:
import pandas as pd
import numpy as np
import os
import pyarrow
import yaml
from typing import Union
from pathlib import Path
In [7]:
from data_generator.general_time import *
from data_generator.utils import create_txns_df, load_configs
from data_generator.configs import DropDistributorCfg, DropPurchaserCfg
from data_generator.fraud.drops.build.config import DropConfigBuilder
from data_generator.fraud.drops.build.builder import DropBaseClasses
from data_generator.fraud.drops.txns import CreateDropTxn
In [2]:
os.chdir("..")
os.getcwd()
Out[2]:
'C:\\Users\\iaros\\My_documents\\Education\\projects\\fraud_detection_01'
In [10]:
# Базовые конфиги
base_cfg = load_configs("./config/base.yaml")
# Настройки легальных транзакций
legit_cfg = load_configs("./config/legit.yaml")
# Общие настройки фрода
fraud_cfg = load_configs("./config/fraud.yaml")
# Настройки для дроп фрода
drops_cfg = load_configs("./config/drops.yaml")
# Настройки времени
time_cfg = load_configs("./config/time.yaml") 

# Пути к файлам
data_paths = base_cfg["data_paths"]

# директория текущего запуска генератора. Возьмем из предыдущего ноутбука. Т.к. нужны данные того, что сгенерировано до дроп фрода
# предварительно удалены папки дропов dist_drops и purch_drops т.к. при создании объектов конфиг классов они будут созданы заново 

run_dir = './data/generated/history/generation_run_2025-07-25_121029'

Симуляция активности одного дропа¶


1. Обработка полученной дропом партии денег. Класс DropBatchProcessor¶

  • модуль data_generator.fraud.drops.processor. Ссылка на исходный код в Github

Основной функционал

  • обработка полученной дропом партии (батча) денег
  • определение должны ли текущая и последующие транзакции быть отклонены

Основная логика

  • метод distributor() - обработать партию денег для дропа распределителя
    • генерирует исходящие транзакции пока:
      • либо баланс не будет равен 0
      • либо пока транзакции не будут отклоняться. Но также в этом методе берется случайное число попыток на операции после первой откл. транз. Т.е. транзакции могут быть отклонены, но генерация продолжится если и пока попытки не равны 0. Попытки вычитаются с каждой операцией совершенной после первой отклоненной транз-ции
      • каждую операцию случайно выбирается будет ли это перевод/снятие или покупка криптовалюты
      • перед каждой операцией идет проверка должна ли она быть отклонена. Операции отклоняются если достигнут абсолютный лимит вх. или исх. транз-ций. Лимиты выставлются в drops.yaml
  • метод purchaser() - обработать партию денег для дропа покупателя
    • логика прекращения операций, управления попытками, и проверки на то, что транз. должна быть отклонена - такие же как и в distributor()
    • главная разница с distributor() в том что тут всегда только покупки, без других возможных вариантов
  • метод process_batch() оркестрирует distributor() и purchaser() методы. Вызывает нужный метод в зависимости от типа дропа. Тип дропа приходит из объекта DropBaseClasses класса, когда создается объект DropBatchProcessor. Т.е. под каждый тип дропа создается свой объект DropBatchProcessor
In [8]:
class DropBatchHandler:
    """
    Обработка полученной дропом партии (батча) денег
    ---------------------------
    drop_type: str. 'distributor' или 'purchaser'
    amt_hand: DropAmountHandler. Генератор сумм входящих/исходящих транзакций, сумм снятий.
              Управление балансом текущего дропа.
    behav_hand: DistBehaviorHandler | PurchBehaviorHandler.
                Управление поведением дропа: распределителя или покупателя.
    create_txn: CreateDropTxn. Создание транзакций.
    declined: bool. По умолчанию False. Отклоняются ли транзакции.
    txns_fm_batch: list. Транзакции дропа в текущем батче.
    """

    def __init__(self, base: DropBaseClasses, create_txn: CreateDropTxn):
        """
        base: DropBaseClasses. Объекты основных классов для дропов.
        create_txn: CreateDropTxn. Создание транзакций.
        """
        self.drop_type = base.drop_type
        self.amt_hand = base.amt_hand
        self.behav_hand = base.behav_hand
        self.create_txn = create_txn
        self.declined = False
        self.txns_fm_batch = []


    def should_decline(self):
        """
        Проверка будет ли отклонена транзакция.
        Возвращает True или False в зависимости от достижения лимитов.
        Также записывает это значение в self.declined
        """
        self.declined = self.create_txn.limit_reached()
        return self.declined


    def reset_cache(self, all=False):
        """
        Сброс кэша.
        --------
        all: bool. Если True то сбрасывает атрибуты: txns_fm_batch, declined.
             Если False то declined не сбрсывает
             Также передается в методы классов:
             DistBehaviorHandler | PurchBehaviorHandler,
             DropAmountHandler.
        """
        behav_hand = self.behav_hand
        amt_hand = self.amt_hand

        self.txns_fm_batch = []
        amt_hand.reset_cache(all=all)
        behav_hand.reset_cache(all=all)

        if not all:
            return
        
        self.declined = False


    def distributor(self):
        """
        Обработка партии(батча) денег полученных дропом
        распределителем.
        """
        behav_hand = self.behav_hand
        amt_hand = self.amt_hand
        create_txn = self.create_txn
        
        while amt_hand.balance > 0:
            declined = self.should_decline() # будет ли отклонена транзакция
            behav_hand.guide_scenario()

            if behav_hand.to_crypto: # перевод на криптобиржу или нет
                txn_out = create_txn.purchase(declined=declined)
            else: # Иначе перевод/снятие
                to_drop = behav_hand.to_drop # Пробовать ли перевести другому дропу.
                txn_out = create_txn.trf_or_atm(receive=False, 
                                    to_drop=to_drop, declined=declined)
            # Добавляем в список транз-ций батча   
            self.txns_fm_batch.append(txn_out)

            # Сколько попыток будет после первой откл. транз-ции
            behav_hand.attempts_after_decline()

            # Если это не первая отклоненная транзакция, то вычитаем попытку
            # совершить транзакцию после отклонения
            behav_hand.deduct_attempts()
            
            # Решение об остановке после отклоненной транзакции
            if behav_hand.stop_after_decline():
                break
            

    def purchaser(self):
        """
        Обработка партии(батча) денег полученных дропом
        покупателем.
        """
        behav_hand = self.behav_hand
        amt_hand = self.amt_hand
        create_txn = self.create_txn
        
        while amt_hand.balance > 0:
            declined = self.should_decline() # будет ли отклонена транзакция

            txn_out = create_txn.purchase(declined=declined)
            self.txns_fm_batch.append(txn_out)
            
            # Сколько попыток будет после первой откл. транз-ции
            behav_hand.attempts_after_decline()

            # Если это не первая отклоненная транзакция, то вычитаем попытку
            # совершить транзакцию после отклонения
            behav_hand.deduct_attempts()
            
            # Решение об остановке после отклоненной транзакции
            if behav_hand.stop_after_decline():
                break


    def process_batch(self):
        """
        Вызов соответствующего типу дропа метода для обработки
        батча денег.
        Метод выбирается исходя из self.drop_type.
        ---------
        """
        drop_type = self.drop_type

        if drop_type == "distributor":
            self.distributor()
        elif drop_type == "purchaser":
            self.purchaser()

Демонстрация DropBatchProcessor¶

In [9]:
from data_generator.fraud.drops.processor import DropBatchHandler

Функция для очищения кэшей на время демонстрации

In [11]:
def reset_caches(create_txn, behav_hand, amt_hand, time_hand, part_data, batch_hand):
    create_txn.reset_cache(only_counters=False)
    behav_hand.reset_cache(all=True)
    amt_hand.reset_cache(all=True) # batch_txns здесь
    time_hand.reset_cache()
    part_data.reset_cache()
    batch_hand.reset_cache(all=True)

1. Дропы распределители¶

  • разобьем демо на две части согласно типам дропов т.к. для каждого типа нужно создавать отдельные объекты нескольких классов и тестировать отдельные методы
In [12]:
drop_cfg_build = DropConfigBuilder(base_cfg=base_cfg, legit_cfg=legit_cfg, fraud_cfg=fraud_cfg, drop_cfg=drops_cfg, \
                                   time_cfg=time_cfg, run_dir=run_dir)
dist_configs = drop_cfg_build.build_dist_cfg()

dist_base = DropBaseClasses(drop_type="distributor", configs=dist_configs)
dist_base.build_all()

# Объекты базовых классов. Вынос в переменные для удобства
acc_hand = dist_base.acc_hand
amt_hand = dist_base.amt_hand
part_data = dist_base.part_data
time_hand = dist_base.time_hand
behav_hand = dist_base.behav_hand

# Класс генератор одиночных транзакций
create_txn = CreateDropTxn(configs=dist_configs, base=dist_base)

# Демонстрируемый класс
batch_hand = DropBatchHandler(base=dist_base, create_txn=create_txn)

# Нужно взять случайного клиента из конфиг класса и передать его данные в 2 базовых класса
drop_clients = dist_configs.clients
dist_client = list(drop_clients.itertuples(name="Client"))[5]

acc_hand.client_id = dist_client.client_id 
part_data.client_info = dist_client
part_data.client_info
Out[12]:
Client(Index=5, client_id=2995, birth_date=Timestamp('1966-06-09 00:00:00'), sex='female', region='Кировская', city='Киров', lat=58.6035313, lon=49.6679219, city_id=35, home_ip='2.60.11.20')

Метод distributor()¶

  • обработчик партии (батча) присланных дропу распределителю денег

Пример

  • Искусственно зададим баланс вместо генерации случайной суммы через создание входящей транзакции. В конечном датафрейме, который получится не будет данных о входящей транзакции; будут просто циклы обработки "полученных" денег
  • вызов метода distributor() также вложим в цикл, который прерывается после окончания работы distributor() если достигнут лимит по вх. или исх. транз-циям, что оперделяется внутри distiributor() и передается в атрибут batch_hand.declined внутри вызова distributor()
  • вложением distributor() в цикл мы имитируем многократное получение дропом денег т.е. несколько циклов обработки батча денег, как это и будет реализовано в симуляции "жизненного цикла" дропа
In [36]:
# Получаем номер счета текущего дропа
acc_hand.get_account(own=True)

# Т.к. не будет генерации входящей транз-ции, то не будет случайного выбора времени первой транзакции
# в жизненном цикле дропа. А это время генерируется с первой входящей транзакцией.
# Поэтому зададим время вручную
start_time = pd.to_datetime("2025-07-02 11:15:00", format="%Y-%m-%d %H:%M:%S")
time_hand.last_unix = pd_timestamp_to_unix(start_time)
time_hand.start_unix = pd_timestamp_to_unix(start_time)

all_txns = []
# Запускаем batch_hand.distributor() пока один из циклов работы batch_hand.distributor()
# не закончится с изменением значения batch_hand.declined
while not batch_hand.declined:
    # Задаем конкретный баланс - это как-будто бы и есть момент получения дропом денег
    amt_hand.balance = 41000
    # Задаем сценарий.
    behav_hand.scen = "atm+transfer"
    # Также прописываем распределение полученных денег частями, для разнообразия сумм
    behav_hand.in_chunks = True

    batch_hand.distributor()
    txns_fm_batch = batch_hand.txns_fm_batch
    all_txns.extend(txns_fm_batch)
    batch_hand.reset_cache(all=False)

Итоговый датафрейм

  • последние 3 транзакции отклонены. Это исходящие транзакции дропа, значит случайно сгенерированное кол-во попыток операций после первой отклоненной транзакции равно двум: первая отклоненная транз-ция + 2 попытки после неё = 3 отклоненных исходящих транзакции
In [42]:
all_txns_dist = pd.DataFrame(all_txns)
all_txns_dist
Out[42]:
client_id txn_time unix_time amount type channel category online merchant_id trans_city trans_lat trans_lon trans_ip device_id account is_fraud is_suspicious status rule
0 2995 2025-07-02 14:13:00 1751465580 13900.0 withdrawal ATM not applicable False NaN Киров 58.603531 49.667922 not applicable NaN 12835.0 False False approved not applicable
1 2995 2025-07-02 14:51:00 1751467860 13000.0 outbound transfer not applicable True NaN Киров 58.603531 49.667922 2.60.11.20 5081.0 16689.0 False False approved not applicable
2 2995 2025-07-02 16:09:00 1751472540 14100.0 outbound transfer not applicable True NaN Киров 58.603531 49.667922 2.60.11.20 5082.0 18515.0 False False approved not applicable
3 2995 2025-07-02 16:40:00 1751474400 20900.0 withdrawal ATM not applicable False NaN Киров 58.603531 49.667922 not applicable NaN 12835.0 False False approved not applicable
4 2995 2025-07-02 18:44:00 1751481840 6000.0 outbound transfer not applicable True NaN Киров 58.603531 49.667922 2.60.11.20 5081.0 24860.0 False False approved not applicable
5 2995 2025-07-03 14:13:00 1751551980 11000.0 outbound transfer not applicable True NaN Киров 58.603531 49.667922 2.60.11.20 5082.0 19439.0 False False approved not applicable
6 2995 2025-07-03 14:57:00 1751554620 3100.0 outbound transfer not applicable True NaN Киров 58.603531 49.667922 2.60.11.20 5082.0 22496.0 False False approved not applicable
7 2995 2025-07-03 16:58:00 1751561880 14100.0 withdrawal ATM not applicable False NaN Киров 58.603531 49.667922 not applicable NaN 12835.0 False False approved not applicable
8 2995 2025-07-03 19:13:00 1751569980 14100.0 outbound transfer not applicable True NaN Киров 58.603531 49.667922 2.60.11.20 5082.0 23129.0 True False declined drop_flow_cashout
9 2995 2025-07-03 21:51:00 1751579460 10600.0 outbound transfer not applicable True NaN Киров 58.603531 49.667922 2.60.11.20 5082.0 20110.0 True False declined drop_flow_cashout
10 2995 2025-07-04 15:20:00 1751642400 7100.0 purchase crypto_exchange balance_top_up True 6833.0 Киров 58.603531 49.667922 2.60.11.20 5081.0 NaN True False declined drop_flow_cashout

2. Дропы покупатели¶

  • создаем объекты классов отдельно под дропов покупателей - purchasers
In [40]:
drop_cfg_build = DropConfigBuilder(base_cfg=base_cfg, legit_cfg=legit_cfg, fraud_cfg=fraud_cfg, drop_cfg=drops_cfg, \
                                   time_cfg=time_cfg, run_dir=run_dir)
purch_configs = drop_cfg_build.build_purch_cfg()

purch_base = DropBaseClasses(drop_type="purchaser", configs=purch_configs)
purch_base.build_all()

# Объекты базовых классов. Вынос в переменные для удобства
acc_hand = purch_base.acc_hand
amt_hand = purch_base.amt_hand
part_data = purch_base.part_data
time_hand = purch_base.time_hand
behav_hand = purch_base.behav_hand

# Класс генератор одиночных транзакций
create_txn = CreateDropTxn(configs=purch_configs, base=purch_base)

# Демонстрируемый класс
batch_hand = DropBatchHandler(base=purch_base, create_txn=create_txn)

# Нужно взять случайного клиента из конфиг класса и передать его данные в 2 базовых класса
drop_clients = purch_configs.clients
purch_client = list(drop_clients.itertuples(name="Client"))[2]

acc_hand.client_id = purch_client.client_id 
part_data.client_info = purch_client
part_data.client_info
Out[40]:
Client(Index=2, client_id=295, birth_date=Timestamp('1964-03-20 00:00:00'), sex='male', region='Карелия', city='Петрозаводск', lat=61.78909, lon=34.3596263, city_id=12, home_ip='2.60.1.23')

Метод purchaser()¶

  • обработчик партии (батча) присланных дропу покупателю денег
  • логика вся та же самая как и в демонстрации distributor()
In [44]:
reset_caches(create_txn, behav_hand, amt_hand, time_hand, part_data, batch_hand)
acc_hand.reset_cache()

# Получаем номер счета текущего дропа
acc_hand.get_account(own=True)

# Т.к. не будет генерации входящей транз-ции, то не будет случайного выбора времени первой транзакции
# в жизненном цикле дропа. А это время генерируется с первой входящей транзакцией.
# Поэтому зададим время вручную
start_time = pd.to_datetime("2025-07-02 11:15:00", format="%Y-%m-%d %H:%M:%S")
time_hand.last_unix = pd_timestamp_to_unix(start_time)
time_hand.start_unix = pd_timestamp_to_unix(start_time)

all_txns = []
# Запускаем batch_hand.distributor() пока один из циклов работы batch_hand.distributor() не
# не закончится с изменением значения batch_hand.declined
while not batch_hand.declined:
    # Задаем конкретный баланс - это как-будто бы и есть момент получения дропом денег
    amt_hand.balance = 50000
    # Задаем сценарий.
    behav_hand.scen = "split_money"
    # Также прописываем распределение полученных денег частями, для разнообразия сумм
    behav_hand.in_chunks = True

    batch_hand.purchaser()
    txns_fm_batch = batch_hand.txns_fm_batch
    all_txns.extend(txns_fm_batch)
    batch_hand.reset_cache(all=False)

Итоговый датафрейм

  • последние 4 транзакции отклонены. Это исходящие транзакции дропа, значит случайно сгенерированное кол-во попыток операций после первой отклоненной транзакции равно трем: первая отклоненная транз-ция + 3 попытки после неё = 4 отклоненных исходящих транзакции
In [45]:
all_txns_purch = pd.DataFrame(all_txns)
all_txns_purch
Out[45]:
client_id txn_time unix_time amount type channel category online merchant_id trans_city trans_lat trans_lon trans_ip device_id account is_fraud is_suspicious status rule
0 295 2025-07-02 14:11:00 1751465460 30000.0 purchase ecom shopping_net True 6963 Петрозаводск 61.78909 34.359626 2.60.1.23 496 NaN False False approved not applicable
1 295 2025-07-02 15:52:00 1751471520 16000.0 purchase ecom shopping_net True 6809 Петрозаводск 61.78909 34.359626 2.60.1.23 496 NaN False False approved not applicable
2 295 2025-07-02 17:05:00 1751475900 3000.0 purchase ecom shopping_net True 6832 Петрозаводск 61.78909 34.359626 2.60.1.23 496 NaN False False approved not applicable
3 295 2025-07-02 18:15:00 1751480100 1000.0 purchase ecom shopping_net True 6825 Петрозаводск 61.78909 34.359626 2.60.1.23 496 NaN False False approved not applicable
4 295 2025-07-02 20:00:00 1751486400 20000.0 purchase ecom shopping_net True 6896 Петрозаводск 61.78909 34.359626 2.60.1.23 496 NaN False False approved not applicable
5 295 2025-07-03 14:05:00 1751551500 14000.0 purchase ecom misc_net True 6902 Петрозаводск 61.78909 34.359626 2.60.1.23 496 NaN False False approved not applicable
6 295 2025-07-03 15:07:00 1751555220 14000.0 purchase ecom shopping_net True 6812 Петрозаводск 61.78909 34.359626 2.60.1.23 496 NaN False False approved not applicable
7 295 2025-07-03 15:38:00 1751557080 2000.0 purchase ecom shopping_net True 6906 Петрозаводск 61.78909 34.359626 2.60.1.23 496 NaN False False approved not applicable
8 295 2025-07-03 18:04:00 1751565840 38000.0 purchase ecom shopping_net True 6879 Петрозаводск 61.78909 34.359626 2.60.1.23 496 NaN True False declined drop_purchaser
9 295 2025-07-03 20:39:00 1751575140 28500.0 purchase ecom misc_net True 6858 Петрозаводск 61.78909 34.359626 2.60.1.23 496 NaN True False declined drop_purchaser
10 295 2025-07-04 16:49:00 1751647740 19000.0 purchase ecom shopping_net True 6960 Петрозаводск 61.78909 34.359626 2.60.1.23 496 NaN True False declined drop_purchaser
11 295 2025-07-04 17:36:00 1751650560 9500.0 purchase ecom shopping_net True 6872 Петрозаводск 61.78909 34.359626 2.60.1.23 496 NaN True False declined drop_purchaser



2. Симуляция полного жизненного цикла дропа. Класс DropLifecycleManager¶

  • модуль data_generator.fraud.drops.simulator. Ссылка на исходный код в Github

Основная логика

  • создается объект DropLifecycleManager
  • берется один клиент под дроп фрод при помощи передачи данных клиента и ID клиентав атрибуты DropTxnPartData.client_info и DropAccountHandler.client_id
  • внутри метода DropLifecycleManager.run_drop_lifecycle():
    1. получаем счет текущего клиента
    2. помечаем текущего клиента как дропа в таблице accounts в атрибуте DropAccountHandler.accounts
    3. переводим дропу деньги - создаем вх. транз-цию и добавляем в список транзакций текущего дропа. Перед этим проверяем будет ли входящая транзакция отклонена, но при первой транзакции такого не бывает.
    4. Если транзакция отклонена, то генерация прерывается. Но для первой транзакции это не актуально.
    5. случайно выбираем сценарий, как дроп будет распределять полученные деньги
    6. в соответствии с выбранным сценарием определяем будут ли полученные деньги распределены на части или будет одна операция
    7. запускаем метод обработки батча полученных денег DropBatchHandler.process_batch(), который обрабатывает полученный батч в соответствии с типом дропа
    8. сгенерированные при обработке батча транзакции добавляем в список транзакций текущего дропа
    9. снова возвращаемся к п.3 перевод новой партии денег дропу. Если при обработке предыдущего батча был достигнут абсолютный лимит транз-ций дропа, то входящая транзакция будет отклонена и жизненный цикл дропа закончится, будто после первой отклоненной входящей транзакции дропу больше не пытаются послать деньги
    10. если транзакция не отклонена, то снова запускается DropBatchHandler.process_batch() и так далее
In [ ]:
class DropLifecycleManager:
    """
    Управление полным жизненный циклом одного дропа.
    ------------------
    drop_type: str. 'distributor' или 'purchaser'
    acc_hand: DropAccountHandler. Генератор номеров счетов входящих/исходящих транзакций.
              Учет использованных счетов.
    amt_hand: DropAmountHandler. Генератор сумм входящих/исходящих транзакций, сумм снятий.
              Управление балансом текущего дропа.
    time_hand: DropTimeHandler.
               Управление временем транзакций дропа.
    behav_hand: DistBehaviorHandler | PurchBehaviorHandler.
                Управление поведением дропа: распределителя или покупателя.
    part_data: DropTxnPartData.
               Генерация части данных о транзакции дропа.
    behav_hand: DistBehaviorHandler | PurchBehaviorHandler.
                Управление поведением дропа: распределителя или покупателя.
    create_txn: CreateDropTxn. Создание транзакций.
    batch_hand: DropBatchHandler. Обработка полученной дропом партии (батча) денег
    drop_txns: list. Созданные транзакции дропа.
    """
    def __init__(self, base: DropBaseClasses, create_txn: CreateDropTxn):
        """
        base: DropBaseClasses. Объекты основных классов для дропов.
        create_txn: CreateDropTxn. Создание транзакций.
        """
        self.drop_type = base.drop_type
        self.acc_hand = base.acc_hand
        self.amt_hand = base.amt_hand
        self.time_hand = base.time_hand
        self.part_data = base.part_data
        self.behav_hand = base.behav_hand
        self.create_txn = create_txn
        self.batch_hand = DropBatchHandler(base=base, create_txn=create_txn)
        self.drop_txns = []


    def reset_all_caches(self):
        """
        Сброс кэшей когда активность дропа закончена совсем
        """
        # Сброс всего кэша batch_hand включает в себя полный сброс кэша
        # в behav_hand и amt_hand
        self.batch_hand.reset_cache(all=True)
        self.time_hand.reset_cache()
        self.part_data.reset_cache()
        self.create_txn.reset_cache()
        self.drop_txns = []
        

    def run_drop_lifecycle(self):
        # создать счет дропа, записать is_drop = True в таблице acc_hand.accounts
        acc_hand = self.acc_hand
        # получить номер счета дропа. Пишется в атрибут acc_hand.account
        acc_hand.get_account(own=True) 
        acc_hand.label_drop() # помечаем клиента как дропа в таблице acc_hand.accounts
        
        behav_hand = self.behav_hand
        batch_hand = self.batch_hand
        create_txn = self.create_txn

        while True:
            declined = batch_hand.should_decline() # статус транзакции. будет ли она отклонена
            # входящая транзакция. Новый батч денег.
            receive_txn = create_txn.trf_or_atm(declined=declined, \
                                                to_drop=False, receive=True) 
            drop_txns = self.drop_txns
            drop_txns.append(receive_txn)
            # если у дропа достигнут лимит то транзакции отклоняются. 
            # Если входящая отклонена, дропу больше не пытаются послать деньги
            if declined: 
                break

            behav_hand.sample_scenario() # выбрать сценарий
            behav_hand.in_chunks_val() # транзакции по частям или нет

            batch_hand.process_batch() # обработка полученного батча

            txns_fm_batch = batch_hand.txns_fm_batch
            drop_txns.extend(txns_fm_batch)
            # сброс кэша после завершения обработки батча
            batch_hand.reset_cache(all=False)

ДемонстрацияDropLifecycleManager¶

Импорт нужных классов и создание объектов

  • демонстрация на примере дропа распределителя. Ограничимся этим типом дропов т.к. у покупателей та же самая логика
In [46]:
from data_generator.fraud.drops.build.config import DropConfigBuilder
from data_generator.fraud.drops.build.builder import DropBaseClasses
from data_generator.fraud.drops.txns import CreateDropTxn
from data_generator.fraud.drops.simulator import DropLifecycleManager
In [47]:
drop_cfg_build = DropConfigBuilder(base_cfg=base_cfg, legit_cfg=legit_cfg, fraud_cfg=fraud_cfg, drop_cfg=drops_cfg, \
                                   time_cfg=time_cfg, run_dir=run_dir)
dist_configs = drop_cfg_build.build_dist_cfg()

dist_base = DropBaseClasses(drop_type="distributor", configs=dist_configs)
dist_base.build_all()

# Объекты базовых классов. Вынос в переменные для удобства
acc_hand = dist_base.acc_hand
amt_hand = dist_base.amt_hand
part_data = dist_base.part_data
time_hand = dist_base.time_hand
behav_hand = dist_base.behav_hand

# Класс генератор одиночных транзакций
create_txn = CreateDropTxn(configs=dist_configs, base=dist_base)

# обработчик получаемых батчей денег
batch_hand = DropBatchHandler(base=dist_base, create_txn=create_txn)

# Демонстрируемый класс
life_manager = DropLifecycleManager(base=dist_base, create_txn=create_txn)

# Нужно взять случайного клиента из конфиг класса и передать его данные в 2 базовых класса
drop_clients = dist_configs.clients
dist_client = list(drop_clients.itertuples(name="Client"))[5]

acc_hand.client_id = dist_client.client_id 
part_data.client_info = dist_client
part_data.client_info
Out[47]:
Client(Index=5, client_id=599, birth_date=Timestamp('2002-05-12 00:00:00'), sex='female', region='Москва', city='Москва', lat=55.7538789, lon=37.6203735, city_id=1, home_ip='2.60.2.59')

Метод run_drop_lifecycle()

In [57]:
# Запуск жизненного цикла
life_manager.run_drop_lifecycle()
drop_txns = life_manager.drop_txns

Итоговый датафрейм

  • на этот раз обратите внимание что присутствуют записи о входящих транзакциях в отличие от демонстрации для DropBatchHandler
  • также среди отклоненных транзакций одна входящая (последняя запись)
  • это и есть полный результат для одного дропа
In [58]:
all_txns_df = pd.DataFrame(drop_txns)
all_txns_df
Out[58]:
client_id txn_time unix_time amount type channel category online merchant_id trans_city trans_lat trans_lon trans_ip device_id account is_fraud is_suspicious status rule
0 599 2025-01-02 15:12:00 1735830720 16800.0 inbound transfer not applicable True NaN not applicable NaN NaN not applicable NaN 10570.0 False False approved not applicable
1 599 2025-01-02 16:16:00 1735834560 9000.0 outbound transfer not applicable True NaN Москва 55.753879 37.620373 2.60.2.59 1002.0 16537.0 False False approved not applicable
2 599 2025-01-02 18:17:00 1735841820 6000.0 outbound transfer not applicable True NaN Москва 55.753879 37.620373 2.60.2.59 1002.0 21733.0 False False approved not applicable
3 599 2025-01-02 19:53:00 1735847580 1800.0 outbound transfer not applicable True NaN Москва 55.753879 37.620373 2.60.2.59 1001.0 18426.0 False False approved not applicable
4 599 2025-01-02 20:59:00 1735851540 30600.0 inbound transfer not applicable True NaN not applicable NaN NaN not applicable NaN 10570.0 False False approved not applicable
5 599 2025-01-03 15:29:00 1735918140 30600.0 outbound transfer not applicable True NaN Москва 55.753879 37.620373 2.60.2.59 1001.0 20706.0 False False approved not applicable
6 599 2025-01-03 17:53:00 1735926780 15700.0 inbound transfer not applicable True NaN not applicable NaN NaN not applicable NaN 10570.0 False False approved not applicable
7 599 2025-01-03 19:03:00 1735930980 15700.0 outbound transfer not applicable True NaN Москва 55.753879 37.620373 2.60.2.59 1002.0 22198.0 False False approved not applicable
8 599 2025-01-03 19:49:00 1735933740 20400.0 inbound transfer not applicable True NaN not applicable NaN NaN not applicable NaN 10570.0 False False approved not applicable
9 599 2025-01-04 14:38:00 1736001480 20400.0 outbound transfer not applicable True NaN Москва 55.753879 37.620373 2.60.2.59 1001.0 21100.0 False False approved not applicable
10 599 2025-01-04 15:20:00 1736004000 57300.0 inbound transfer not applicable True NaN not applicable NaN NaN not applicable NaN 10570.0 False False approved not applicable
11 599 2025-01-04 17:53:00 1736013180 31300.0 withdrawal ATM not applicable False NaN Москва 55.753879 37.620373 not applicable NaN 10570.0 False False approved not applicable
12 599 2025-01-04 20:46:00 1736023560 14000.0 outbound transfer not applicable True NaN Москва 55.753879 37.620373 2.60.2.59 1002.0 22722.0 False False approved not applicable
13 599 2025-01-04 23:38:00 1736033880 12000.0 outbound transfer not applicable True NaN Москва 55.753879 37.620373 2.60.2.59 1001.0 16851.0 True False declined drop_flow_cashout
14 599 2025-01-05 01:51:00 1736041860 9000.0 outbound transfer not applicable True NaN Москва 55.753879 37.620373 2.60.2.59 1002.0 23711.0 True False declined drop_flow_cashout
15 599 2025-01-05 15:22:00 1736090520 6000.0 outbound transfer not applicable True NaN Москва 55.753879 37.620373 2.60.2.59 1002.0 22027.0 True False declined drop_flow_cashout
16 599 2025-01-05 16:27:00 1736094420 3000.0 purchase crypto_exchange balance_top_up True 6825.0 Москва 55.753879 37.620373 2.60.2.59 1002.0 NaN True False declined drop_flow_cashout
17 599 2025-01-05 17:48:00 1736099280 48100.0 inbound transfer not applicable True NaN not applicable NaN NaN not applicable NaN 10570.0 True False declined drop_flow_cashout



Симуляция активности множества дропов¶


Симулятор активности. Класс DropSimulator¶

  • модуль data_generator.fraud.drops.simulator. Ссылка на исходный код в Github

Основная логика

  1. итерирование через клиентов семплированных под определенный тип дроп фрода
  2. на каждой итерации симуляция полного жизненного цикла текущего дропа
  3. запись созданных транзакций дропа общий список всех транзакций дропов данного типа
  4. в конце запись измененного датафрейма accounts в файл в двух директориях. Запись нужна, чтобы при генерации другого типа дропов можно было узнать изменения т.к. там помечается кто из клиентов дроп; и, возможно для выгрузки актуальной таблицы в БД
  5. запись сгенерированных транзакций в файл в две директории: созданную под текущий запуск генератора транзакций и директорию последнего запуска генератора
In [ ]:
class DropSimulator:
    """
    Генерация активности множества дропов.
    ------------------------
    base_cfg: dict. Конфиги из base.yaml
    drop_type: str. 'distributor' или 'purchaser'
    drop_clients: pd.DataFrame. Клиенты которые будут дропами.
    part_data: DropTxnPartData. Генерация части данных транзакции.
    acc_hand: DropAccountHandler. Генератор номеров счетов входящих/исходящих 
              транзакций. Учет использованных счетов.
    txn_recorder: FraudTxnsRecorder. Запись транзакций в файл.
    life_manager: DropLifecycleManager. Управление полным жизненный циклом
                  одного дропа.
    all_txns: list. Список для записи всех созданных транзакций.
    txns_df: pd.DataFrame. Пустой датафрейм с колонками и проставленными типами
    """

    def __init__(self, base_cfg, configs, base, create_txn, txn_recorder):
        """
        base_cfg: dict. Конфиги из base.yaml
        configs: DropDistributorCfg | DropPurchaserCfg.
                 Конфиги и данные для создания дроп транзакций.
        base: Объекты основных классов для дропов. 
        create_txn: CreateDropTxn. Создание транзакций.
        """
        self.base_cfg = base_cfg
        self.drop_type = base.drop_type
        self.drop_clients = configs.clients
        self.part_data = base.part_data
        self.acc_hand = base.acc_hand
        self.txn_recorder = txn_recorder
        self.txns_df = configs.transactions
        self.life_manager = DropLifecycleManager(base=base, create_txn=create_txn)
        self.run_dir = configs.run_dir
        self.all_txns = []
    

    def run(self):
        """
        Полная генерация активности дропов соответсвующего типа
        """
        drop_clients = self.drop_clients
        part_data = self.part_data
        acc_hand = self.acc_hand
        life_manager = self.life_manager
        all_txns = self.all_txns
        txn_recorder = self.txn_recorder

        # Итерируемся через семплированных клиентов под дроп
        for client in drop_clients.itertuples():
            # Запись данных текущего клиента в атрибуты
            # некоторых классов
            part_data.client_info = client
            acc_hand.client_id = client.client_id

            # Генерация полного цикла активности одного дропа
            life_manager.run_drop_lifecycle()
            # Запись транзакций дропа в общий список
            drop_txns = life_manager.drop_txns
            all_txns.extend(drop_txns)

            # Сброс кэша дропа для следующей итерации
            life_manager.reset_all_caches()
        
        # Запись измененного датафрейма accounts в csv файл в двух экземплярах
        # В папку data/generated/latest и в папку текущей генерации 
        accounts = acc_hand.accounts
        gen_files = self.base_cfg["data_paths"]["generated"]
        acc_path_01 = Path(self.run_dir) / "accounts.csv" # путь в директории текущей генерации
        acc_path_02 = gen_files["accounts"]  # путь в директории data/generated/latest
        accounts.to_csv(acc_path_01, index=False)
        accounts.to_csv(acc_path_02, index=False)
        
        # Запись всех созданных транзакций дропов в parquet файл
        txn_recorder.all_txns = pd.DataFrame(self.all_txns)
        
        txn_recorder.write_to_file() # Это уже метод FraudTxnsRecorder

Демонстрация DropSimulator¶

  • сгенерируем активность дропов и прочитаем получившийся файл с их транзакциями

Метод run()¶

  • единственный метод класса
In [60]:
from data_generator.runner.drops import DropsRunner

# Продемонстриуем на дропах покупателях
drop_type = "purchaser"

drop_runner = DropsRunner(base_cfg=base_cfg, legit_cfg=legit_cfg, \
                            time_cfg=time_cfg, fraud_cfg=fraud_cfg, \
                            drops_cfg=drops_cfg, run_dir=run_dir, \
                            drop_type=drop_type)
In [61]:
drop_runner.run()
Purchaser drops generation... completed.          

Соберем путь к файлу с транзакциями дропов покупателей

  • одна часть это путь к директории текущего запуска генератора
  • остальные две это название папки дропов покупателей и название файла с транзакциями дропов покупателей. Они хранятся в конфигах drops.yaml
In [64]:
data_storage = drops_cfg["purchaser"]["data_storage"]
purch_folder_name = data_storage["folder_name"]
purch_txns_name = data_storage["files"]["txns"]
purch_txns_path = Path(run_dir, purch_folder_name, purch_txns_name) #os.path.join(run_dir, purch_folder_name, purch_txns_name)
purch_txns_path
Out[64]:
WindowsPath('data/generated/history/generation_run_2025-07-25_121029/purch_drops/purch_drop_txns.parquet')

Читаем файл

In [70]:
purch_txns = pd.read_parquet(purch_txns_path)
print(f"Кол-во транзакций всего: {purch_txns.shape[0]}\n\n")
purch_txns.head()
Кол-во транзакций всего: 66


Out[70]:
client_id txn_time unix_time amount type channel category online merchant_id trans_city trans_lat trans_lon trans_ip device_id account is_fraud is_suspicious status rule
0 454 2025-01-06 07:06:00 1736147160 31300.0 inbound transfer not applicable True NaN not applicable NaN NaN not applicable NaN 10432.0 False False approved not applicable
1 454 2025-01-06 08:14:00 1736151240 21000.0 purchase ecom shopping_net True 6919.0 Саратов 51.530376 45.953026 2.60.1.177 763.0 NaN False False approved not applicable
2 454 2025-01-06 09:01:00 1736154060 8000.0 purchase ecom misc_net True 6951.0 Саратов 51.530376 45.953026 2.60.1.177 762.0 NaN False False approved not applicable
3 454 2025-01-06 10:29:00 1736159340 2300.0 purchase ecom shopping_net True 6786.0 Саратов 51.530376 45.953026 2.60.1.177 762.0 NaN False False approved not applicable
4 454 2025-01-06 11:30:00 1736163000 27800.0 inbound transfer not applicable True NaN not applicable NaN NaN not applicable NaN 10432.0 False False approved not applicable

Смотрим кол-во уникальных дропов в датафрейме транзакций

  • при этом проверяем, что кол-во равно кол-ву дропов семплированных под генерацию
In [72]:
sampled_clients = drop_runner.configs.clients

assert purch_txns.client_id.nunique() == sampled_clients.shape[0]
purch_txns.client_id.nunique()
Out[72]:
5

Запуск run() для дропов распределителей

  • это не для демонстрации. Просто, чтобы были созданы транзакции и для дропов распределителей. Чтобы был полный набор по типам транзакций для демонстрации в следующем ноутбуке
In [77]:
from data_generator.runner.drops import DropsRunner

# Продемонстриуем на дропах покупателях
drop_type = "distributor"

drop_runner = DropsRunner(base_cfg=base_cfg, legit_cfg=legit_cfg, \
                            time_cfg=time_cfg, fraud_cfg=fraud_cfg, \
                            drops_cfg=drops_cfg, run_dir=run_dir, \
                            drop_type=drop_type)
drop_runner.run()
Distributor drops generation... completed.          



Оркестрация генерации дроп фрода¶


Класс DropsRunner¶

  • модуль data_generator.runner.drops. Ссылка на исходный код в Github

  • оркестрация генерации дроп фрод транзакций указанного типа

  • это конечный уровень для дроп транзакций. Метод run() этого класса вызывается в файле запуска генератора всех транзакций run_generator.py

  • под каждый тип дропов создается свой объект DropsRunner

  • DropsRunner создает все объекты необходимых для DropSimulator классов, затем создает сам объект DropSimulator и вызывает его метод DropSimulator.run()

In [ ]:
class DropsRunner:
    """
    Запуск генерации транзакций дропов указанного типа.
    ---------
    Атрибуты:
    ---------
    base_cfg: dict. Конфиги из base.yaml
    cfg_builder: DropConfigBuilder.
    drop_type: str. Тип дропа: 'distributor' или 'purchaser'.
    text: str. Текст для вставки в спиннер.
    configs: DropDistributorCfg | DropPurchaserCfg. 
             Конфиги и данные для генерации дроп транзакций.
             По умолчанию None. Создается при вызове self.build_sim().
    base: DropBaseClasses. Создатель основных классов для генерации
          дроп фрода. По умолчанию None. Создается при вызове self.build_sim().
    txn_recorder: FraudTxnsRecorder. Запись транзакций в файл.
                  По умолчанию None. Создается при вызове self.build_sim().
    drop_sim: DropSimulator. Генератор дроп фрода. По умолчанию None. 
              Создается при вызове self.build_sim().
    """
    def __init__(self, base_cfg, legit_cfg, time_cfg, fraud_cfg, drops_cfg, run_dir, drop_type):
        """
        base_cfg: dict. Конфиги из base.yaml
        legit_cfg: dict. Конфиги из legit.yaml
        time_cfg: dict. Конфиги из time.yaml
        fraud_cfg: dict. Общие конфиги фрода из fraud.yaml
        drops_cfg: dict. Конфиги для дроп фрода из drops.yaml
        run_dir: str. Название директории для хранения сгенерированных
                 данных текущей генерации.
        """
        self.base_cfg = base_cfg
        self.cfg_builder = DropConfigBuilder(base_cfg=base_cfg, legit_cfg=legit_cfg, time_cfg=time_cfg, \
                                             fraud_cfg=fraud_cfg, drop_cfg=drops_cfg, run_dir=run_dir)
        self.drop_type = drop_type
        self.text = f"{drop_type.capitalize()} drops generation"
        self.configs = None
        self.base = None
        self.txn_recorder = None
        self.drop_sim = None
        

    def build_sim(self):
        """
        Создать объект DropSimulator.
        """
        drop_type = self.drop_type
        base_cfg = self.base_cfg

        if drop_type == "distributor":
            self.configs = self.cfg_builder.build_dist_cfg()
        elif drop_type == "purchaser":
            self.configs = self.cfg_builder.build_purch_cfg()

        self.base = DropBaseClasses(drop_type=drop_type, configs=self.configs)
        self.base.build_all() # Создать объекты основных дроп классов
        self.txn_recorder = FraudTxnsRecorder(configs=self.configs)
        create_txn = CreateDropTxn(configs=self.configs, base=self.base)

        self.drop_sim = DropSimulator(base_cfg=base_cfg, configs=self.configs, base=self.base, \
                                      create_txn=create_txn, txn_recorder=self.txn_recorder)
        
    @spinner_decorator
    def run(self):
        """
        Создать объект DropSimulator и запустить процесс генерации дроп фрода
        для дропов указанного типа.
        """
        self.build_sim()
        self.drop_sim.run()