Загрузка сгенерированных транзакций и других данных в SQL БД¶

  • ноутбуки лучше просматривать на Github pages, т.к. при просмотре прямо в репозитории могут быть проблемы с отображением, например, обрезка вывода с широкими датафреймами. Если в адресной строке браузера есть iaroslav-dzh.github.io, то вы уже на Github pages.
    Ссылки:
    • Ссылка на этот ноутбук
    • Ссылка на страницу генератора где есть все ноутбуки

Информация о ноутбуке

  • этот ноутбук уже относится к планируемому созданию веб-приложения симулирующего интерфейс антифрод системы
  • в нем загружаем созданные транзакции и некоторые другие таблицы - клиентов, счета, информацию о городах и др. - в созданную заранее базу данных PostgreSQL
  • функции из этого ноутбука находятся в модуле data_to_db.base. Ссылка на исходный код в Github
In [1]:
import pandas as pd
import geopandas as gpd
import os
import pyarrow
import psycopg2
from dotenv import load_dotenv
from sqlalchemy import create_engine, text, MetaData, Table, Column, Integer, Float, \
                       BigInteger, String, Date, Boolean, DateTime
import yaml
from data_generator.utils import load_configs
In [2]:
os.chdir('..')
In [3]:
os.getcwd()
Out[3]:
'C:\\Users\\iaros\\My_documents\\Education\\projects\\fraud_detection_01'
In [30]:
load_dotenv()

db_user = os.getenv("DB_USER")
db_pass = os.getenv("DB_PASSWORD")
db_host = os.getenv("DB_HOST")
db_name = os.getenv("DB_NAME")
In [5]:
base_cfg = load_configs("./config/base.yaml")

Создание engine для подключения к БД¶

In [32]:
# пустая postgresql БД предварительно создана

engine = create_engine(f'postgresql+psycopg2://{db_user}:{db_pass}@{db_host}:5433/{db_name}')
In [12]:
# записываем структуру БД в переменную

metadata = MetaData()

Создание и заполнение таблицы clients¶

In [6]:
# подгружаем данные всех клиентов
data_paths = base_cfg["data_paths"]
path_to_clts = data_paths["clients"]["clients"]

clients = pd.read_parquet(path_to_clts)
clients.head(2)
Out[6]:
client_id birth_date sex region city lat lon city_id home_ip
0 1 1995-12-07 female Рязанская Рязань 54.625457 39.735999 18 2.60.0.1
1 2 1970-01-29 male Москва Москва 55.753879 37.620373 1 2.60.0.2
In [16]:
clients_types = clients.dtypes
clients_types
Out[16]:
client_id              int64
city_id                int64
birth_date    datetime64[ns]
sex                   object
region                object
city                  object
timezone              object
lat                  float64
lon                  float64
population             int64
home_ip               object
dtype: object
In [47]:
clients_types["birth_date"] = "date"
clients_types
Out[47]:
client_id       int64
city_id         int64
birth_date       date
sex            object
region         object
city           object
timezone       object
lat           float64
lon           float64
population      int64
home_ip        object
dtype: object
In [48]:
def add_table_to_metadata(table_name: str, metadata, df_types: pd.Series):
    """
    Добавление объекта sqlalchemy.Table в объект sqlalchemy.MetaData.
    ------------------------
    table_name: str. Название SQL таблицы.
    metadata: sqlalchemy.MetaData.
    df_types: pd.Series. Типы данных в датафрейме, предназначенном для выгрузки в таблицу.
              Можно передать измененную серию с типами в виде строк. Значения в серии 
              будут маппиться со значениями типов sqlalchemy.
              Принимаемые лейблы типов можно узнать из ключей словаря:
              {'int64':Integer, 'BigInt': BigInteger, 'float64': Float, 'object': String,
              'datetime64[ns]': Date}
            Значения индекса серии будут использованы как навзания колонок в SQL таблице.
    """
    if not isinstance(df_types, pd.Series):
        raise TypeError(f'df_types must be pd.Series object, but got {type(df_types)}')
        
    # маппинг для pandas типов и sqlalchemy типов
    # BigInt это кастомное значение для случаев очень больших целых чисел.
    # если нужен BigInt, то тогда нужно в df_types передать серию где будет значение BigInt для соответсвующей колонки
    
    types_mapping = {'int64':Integer, 'BigInt':BigInteger, 'float64':Float, 'object':String, 'datetime64[ns]':DateTime, \
                     'date':Date, 'bool':Boolean}

    # создание генератора на основании типов в датафрейме и типов sqlalchemy
    # затем добавление таблицы в metadata
    return Table(table_name, metadata, *[Column(col_name, types_mapping[str(dtype)]) \
                                         for col_name, dtype in df_types.items()])
In [52]:
clients_tab = add_table_to_metadata(table_name='clients', metadata=metadata, df_types=clients_types)
In [53]:
# Создаем таблицу clients в нашей БД
clients_tab.create(engine)
In [36]:
# функция добавления данных из датафрейма в имеющиюся таблицу SQL

def append_df_to_sql(df, table_name, engine, if_exists='append', index=False, \
                     chunksize: int | None=None):
    """
    df: pd.DataFrame. Данные для загрузки.
    table_name: str. Название таблицы в БД.
    engine: sqlalchemy.engine.base.Engine.
    if_exists: str. Аргумент pd.DataFrame.to_sql(). По умолчанию 'append'.
    index: bool. Аргумент pd.DataFrame.to_sql(). По умолчанию False.
    """
    if not isinstance(table_name, str):
        raise TypeError(f'table_name must be a string, but got {type(table_name)}')
        
    df.to_sql(table_name, engine, if_exists=if_exists, index=index, chunksize=chunksize)
In [55]:
# добавляем данные из датафрейма в таблицу accounts

append_df_to_sql(df=clients, table_name="clients", engine=engine)
In [38]:
# функция сверки количества строк в датафрейме и строк добавленных в БД
def count_rows(engine, table_name: str, df: pd.DataFrame | None=None):

    # проверка типа для table_name
    if not isinstance(table_name, str):
        raise TypeError(f'table_name must be a string, but got {type(table_name)}')
        
    with engine.connect() as con:
        query = con.execute(text(f'SELECT COUNT(*) FROM {table_name}'))
        row_count = query.scalar() # результат запроса в виде числа
    
    if df is not None:
        df_rows = df.shape[0]
        if df_rows != row_count:
            raise ValueError(f'''Dataframe row count is not equal to SQL table row count!
Dataframe: {df_rows}
SQL table: {row_count}''')
        else:
            print(f'Dataframe and SQL table row counts are equal.\n{row_count} rows')
    else:
        print(f'{row_count} rows in the SQL table')
        return row_count
In [56]:
# сверяем количество строк в датафрейме и в БД

count_rows(engine=engine, table_name="clients", df=clients)
Dataframe and SQL table row counts are equal.
5369 rows

Создание и заполнение таблицы txns¶

In [44]:
txns_path = data_paths["generated"]["all_txns"]
all_txns = pd.read_parquet(txns_path)
all_txns.head(2)
Out[44]:
client_id txn_time unix_time amount type channel category online merchant_id trans_city trans_lat trans_lon trans_ip device_id account is_fraud is_suspicious status rule
0 3937 2025-01-01 00:02:00 1735689720 1878.0 purchase POS grocery_pos False 989.0 Пермь 58.045040 56.170369 not applicable NaN NaN False False approved not applicable
1 4275 2025-01-01 00:27:00 1735691220 1000.0 purchase POS gas_transport False 727.0 Тамбов 52.715932 41.465163 not applicable NaN NaN False False approved not applicable
In [57]:
txns_dtypes = all_txns.dtypes
txns_dtypes
Out[57]:
client_id                 int64
txn_time         datetime64[ns]
unix_time                 int64
amount                  float64
type                     object
channel                  object
category                 object
online                     bool
merchant_id             float64
trans_city               object
trans_lat               float64
trans_lon               float64
trans_ip                 object
device_id               float64
account                 float64
is_fraud                   bool
is_suspicious              bool
status                   object
rule                     object
dtype: object
In [58]:
txns_dtypes["unix_time"] = "BigInt"
In [66]:
txns_table = add_table_to_metadata(table_name='txns', metadata=metadata, df_types=txns_dtypes)
In [62]:
txns_table.create(engine)
In [70]:
append_df_to_sql(df=all_txns, table_name="txns", engine=engine)
In [71]:
# сверяем размеры датафрейма и БД таблицы

count_rows(engine, 'txns', all_txns)
Dataframe and SQL table row counts are equal.
19982 rows

Создание и заполнение таблицы accounts¶

In [72]:
acc_path = data_paths["generated"]["accounts"]
accounts = pd.read_csv(acc_path)
accounts.head(2)
Out[72]:
client_id account_id is_drop
0 1 10000 False
1 2 10001 False
In [73]:
acc_dtypes = accounts.dtypes
acc_dtypes
Out[73]:
client_id     int64
account_id    int64
is_drop        bool
dtype: object
In [74]:
accounts_tab = add_table_to_metadata(table_name="accounts", metadata=metadata, df_types=acc_dtypes)
In [75]:
accounts_tab.create(engine)
In [76]:
# заполняем таблицу accounts

append_df_to_sql(df=accounts, table_name="accounts", engine=engine)
In [77]:
# сверяем количество строк

count_rows(engine, 'accounts', accounts)
Dataframe and SQL table row counts are equal.
5369 rows

Создание и заполнение таблицы cities¶

In [ ]:
cities_path = data_paths["base"]["cities"]
cities = gpd.read_file(cities_path)
cities.drop(columns="geometry", inplace=True) # это геополигоны городов. Они не нужны в БД.
cities.head()
Out[ ]:
region city timezone lat lon population city_id clients
0 Москва Москва UTC+3 55.753879 37.620373 11514330 1 663
1 Санкт-Петербург Санкт-Петербург UTC+3 59.939125 30.315822 4848742 74 180
2 Новосибирская Новосибирск UTC+7 55.028102 82.921058 1498921 70 169
3 Свердловская Екатеринбург UTC+5 56.838633 60.605489 1377738 54 155
4 Нижегородская Нижний Новгород UTC+3 56.324209 44.005395 1250615 64 109
In [81]:
cities_dtypes = cities.dtypes
cities_dtypes
Out[81]:
region         object
city           object
timezone       object
lat           float64
lon           float64
population      int64
city_id         int64
clients         int64
dtype: object
In [82]:
# создаем таблицу в БД

cities_tab = add_table_to_metadata(table_name='cities', metadata=metadata, \
                                   df_types=cities_dtypes)
In [83]:
cities_tab.create(engine)
In [84]:
# заполняем таблицу

append_df_to_sql(df=cities, table_name="cities", engine=engine)
In [85]:
# проверяем целостность
count_rows(engine, 'cities', cities)
Dataframe and SQL table row counts are equal.
77 rows

Создание и заполнение таблицы client_devices¶

In [86]:
clnt_dev_path = data_paths["base"]["client_devices"]
client_devices = pd.read_csv(clnt_dev_path)
client_devices.head(2)
Out[86]:
client_id platform device_id
0 1 iOS 1
1 2 Android 3
In [87]:
cl_dev_dtypes = client_devices.dtypes
cl_dev_dtypes
Out[87]:
client_id     int64
platform     object
device_id     int64
dtype: object
In [88]:
cl_dev_tab = add_table_to_metadata(table_name='client_devices', metadata=metadata, \
                                   df_types=cl_dev_dtypes)
In [89]:
cl_dev_tab.create(engine)
In [90]:
append_df_to_sql(df=client_devices, table_name="client_devices", engine=engine)
In [91]:
count_rows(engine, 'client_devices', client_devices)
Dataframe and SQL table row counts are equal.
9718 rows

Создание и заполнение таблицы offline_merchants¶

In [7]:
off_mer_path = data_paths["base"]["offline_merchants"]
offline_merchants = pd.read_parquet(off_mer_path)
offline_merchants.head()
Out[7]:
city city_id category merchant_id merchant_lat merchant_lon
0 Москва 1 gas_transport 1.0 55.711178 37.863932
1 Москва 1 grocery_pos 2.0 55.896746 37.370257
2 Москва 1 home 3.0 55.797594 37.382283
3 Москва 1 shopping_pos 4.0 55.723753 37.654267
4 Москва 1 kids_pets 5.0 55.907808 37.542508
In [8]:
off_mer_dtypes = offline_merchants.dtypes
off_mer_dtypes
Out[8]:
city             object
city_id           int64
category         object
merchant_id     float64
merchant_lat    float64
merchant_lon    float64
dtype: object
In [94]:
merchants_table = add_table_to_metadata(table_name='offline_merchants', metadata=metadata, \
                                        df_types=off_mer_dtypes)
In [95]:
merchants_table.create(engine)
In [96]:
append_df_to_sql(df=offline_merchants, table_name="offline_merchants", engine=engine)
In [97]:
count_rows(engine, 'offline_merchants', offline_merchants)
Dataframe and SQL table row counts are equal.
6776 rows

Создание и заполнение таблицы fraud_devices¶

In [98]:
fr_dev_path = data_paths["base_fraud"]["fraud_devices"]
fraud_devices = pd.read_csv(fr_dev_path)
fraud_devices.head()
Out[98]:
device_id platform
0 9719 Android
1 9720 iOS
2 9721 Windows
3 9722 Windows
4 9723 Android
In [99]:
fr_dev_dtypes = fraud_devices.dtypes
fr_dev_dtypes
Out[99]:
device_id     int64
platform     object
dtype: object
In [100]:
fr_dev_table = add_table_to_metadata(table_name='fraud_devices', metadata=metadata, \
                                     df_types=fr_dev_dtypes)
In [101]:
fr_dev_table.create(engine)
In [102]:
append_df_to_sql(df=fraud_devices, table_name="fraud_devices", engine=engine)
In [103]:
count_rows(engine, 'fraud_devices', fraud_devices)
Dataframe and SQL table row counts are equal.
5500 rows

Создание и наполнение таблицы fraud_ips¶

In [105]:
fr_ip_path = data_paths["base_fraud"]["fraud_ips"]
fraud_ips = pd.read_parquet(fr_ip_path)
fraud_ips.head(2)
Out[105]:
city lat lon fraud_ip
0 Москва 55.753879 37.620373 5.3.252.223
1 Санкт-Петербург 59.939125 30.315822 5.3.252.224
In [106]:
fr_ip_dtypes = fraud_ips.dtypes
fr_ip_dtypes
Out[106]:
city         object
lat         float64
lon         float64
fraud_ip     object
dtype: object
In [110]:
fr_ip_table = add_table_to_metadata(table_name='fraud_ips', metadata=metadata, \
                                    df_types=fr_ip_dtypes)
In [111]:
fr_ip_table.create(engine)
In [112]:
append_df_to_sql(df=fraud_ips, table_name="fraud_ips", engine=engine)
In [113]:
count_rows(engine, 'fraud_ips', fraud_ips)
Dataframe and SQL table row counts are equal.
7700 rows

Создание и наполнение таблицы rules¶

In [115]:
rules_path = data_paths["base_fraud"]["rules"]
rules = pd.read_csv(rules_path)
rules
Out[115]:
rule weight online
0 fast_geo_change 0.12500 False
1 fast_geo_change_online 0.21875 True
2 new_ip_and_device_high_amount 0.25000 True
3 new_device_and_high_amount 0.18750 True
4 trans_freq_increase 0.21875 True
In [116]:
rules_dtypes = rules.dtypes
rules_dtypes
Out[116]:
rule       object
weight    float64
online       bool
dtype: object
In [117]:
rules_table = add_table_to_metadata(table_name='rules', metadata=metadata, \
                                    df_types=rules_dtypes)
In [118]:
append_df_to_sql(df=rules, table_name="rules", engine=engine)
In [119]:
count_rows(engine, 'rules', rules)
Dataframe and SQL table row counts are equal.
5 rows

Демонстрационные запросы к наполненной БД¶

  • проверка общей корректности работы

Посчитаем процент платформ устройств с которых был совершен фрод¶

In [ ]:
# Создадим единую devices таблицу т.к. во фроде участвуют как клиенты банка с устройствами из client_devices
# так и сторонние лица с устройствами из fraud_devices.
# Потом заджоиним devices ко всем транзакциям по device_id и c условием что транзакция - фрод
# Затем посчитаем кол-во девайсов во фроде по платформам. И дальше вычислим процент платфромы во фрод
# транзакциях
plat_query = """
            WITH devices AS (SELECT platform, device_id
            				FROM
            				client_devices
            				UNION
            				SELECT platform, device_id
            				FROM
            				fraud_devices),
			
            platforms AS (SELECT platform, COUNT(t.device_id) AS devices
                            FROM txns as t
                            JOIN devices as d
                            ON t.device_id = d.device_id AND t.is_fraud = true
                            GROUP BY platform)

            SELECT platform,
                   ROUND(100 * devices / (SELECT SUM(devices) FROM platforms), 2) AS percentage
            FROM platforms
            """
In [127]:
with engine.connect() as con:
    rs = con.execute(text(plat_query))
    plat_stats = pd.DataFrame(rs.fetchall())
In [128]:
plat_stats
Out[128]:
platform percentage
0 macOS 5.90
1 Windows 58.03
2 Android 30.82
3 iOS 3.28
4 Linux 1.97