Загрузка сгенерированных транзакций и других данных в SQL БД¶
- ноутбуки лучше просматривать на Github pages, т.к. при просмотре прямо в репозитории могут быть проблемы с отображением, например, обрезка вывода с широкими датафреймами. Если в адресной строке браузера есть
iaroslav-dzh.github.io
, то вы уже на Github pages.
Ссылки:
Информация о ноутбуке
- этот ноутбук уже относится к планируемому созданию веб-приложения симулирующего интерфейс антифрод системы
- в нем загружаем созданные транзакции и некоторые другие таблицы - клиентов, счета, информацию о городах и др. - в созданную заранее базу данных PostgreSQL
- функции из этого ноутбука находятся в модуле
data_to_db.base
. Ссылка на исходный код в Github
In [1]:
import pandas as pd
import geopandas as gpd
import os
import pyarrow
import psycopg2
from dotenv import load_dotenv
from sqlalchemy import create_engine, text, MetaData, Table, Column, Integer, Float, \
BigInteger, String, Date, Boolean, DateTime
import yaml
from data_generator.utils import load_configs
In [2]:
os.chdir('..')
In [3]:
os.getcwd()
Out[3]:
'C:\\Users\\iaros\\My_documents\\Education\\projects\\fraud_detection_01'
In [30]:
load_dotenv()
db_user = os.getenv("DB_USER")
db_pass = os.getenv("DB_PASSWORD")
db_host = os.getenv("DB_HOST")
db_name = os.getenv("DB_NAME")
In [5]:
base_cfg = load_configs("./config/base.yaml")
Создание engine для подключения к БД¶
In [32]:
# пустая postgresql БД предварительно создана
engine = create_engine(f'postgresql+psycopg2://{db_user}:{db_pass}@{db_host}:5433/{db_name}')
In [12]:
# записываем структуру БД в переменную
metadata = MetaData()
Создание и заполнение таблицы clients
¶
In [6]:
# подгружаем данные всех клиентов
data_paths = base_cfg["data_paths"]
path_to_clts = data_paths["clients"]["clients"]
clients = pd.read_parquet(path_to_clts)
clients.head(2)
Out[6]:
client_id | birth_date | sex | region | city | lat | lon | city_id | home_ip | |
---|---|---|---|---|---|---|---|---|---|
0 | 1 | 1995-12-07 | female | Рязанская | Рязань | 54.625457 | 39.735999 | 18 | 2.60.0.1 |
1 | 2 | 1970-01-29 | male | Москва | Москва | 55.753879 | 37.620373 | 1 | 2.60.0.2 |
In [16]:
clients_types = clients.dtypes
clients_types
Out[16]:
client_id int64 city_id int64 birth_date datetime64[ns] sex object region object city object timezone object lat float64 lon float64 population int64 home_ip object dtype: object
In [47]:
clients_types["birth_date"] = "date"
clients_types
Out[47]:
client_id int64 city_id int64 birth_date date sex object region object city object timezone object lat float64 lon float64 population int64 home_ip object dtype: object
In [48]:
def add_table_to_metadata(table_name: str, metadata, df_types: pd.Series):
"""
Добавление объекта sqlalchemy.Table в объект sqlalchemy.MetaData.
------------------------
table_name: str. Название SQL таблицы.
metadata: sqlalchemy.MetaData.
df_types: pd.Series. Типы данных в датафрейме, предназначенном для выгрузки в таблицу.
Можно передать измененную серию с типами в виде строк. Значения в серии
будут маппиться со значениями типов sqlalchemy.
Принимаемые лейблы типов можно узнать из ключей словаря:
{'int64':Integer, 'BigInt': BigInteger, 'float64': Float, 'object': String,
'datetime64[ns]': Date}
Значения индекса серии будут использованы как навзания колонок в SQL таблице.
"""
if not isinstance(df_types, pd.Series):
raise TypeError(f'df_types must be pd.Series object, but got {type(df_types)}')
# маппинг для pandas типов и sqlalchemy типов
# BigInt это кастомное значение для случаев очень больших целых чисел.
# если нужен BigInt, то тогда нужно в df_types передать серию где будет значение BigInt для соответсвующей колонки
types_mapping = {'int64':Integer, 'BigInt':BigInteger, 'float64':Float, 'object':String, 'datetime64[ns]':DateTime, \
'date':Date, 'bool':Boolean}
# создание генератора на основании типов в датафрейме и типов sqlalchemy
# затем добавление таблицы в metadata
return Table(table_name, metadata, *[Column(col_name, types_mapping[str(dtype)]) \
for col_name, dtype in df_types.items()])
In [52]:
clients_tab = add_table_to_metadata(table_name='clients', metadata=metadata, df_types=clients_types)
In [53]:
# Создаем таблицу clients в нашей БД
clients_tab.create(engine)
In [36]:
# функция добавления данных из датафрейма в имеющиюся таблицу SQL
def append_df_to_sql(df, table_name, engine, if_exists='append', index=False, \
chunksize: int | None=None):
"""
df: pd.DataFrame. Данные для загрузки.
table_name: str. Название таблицы в БД.
engine: sqlalchemy.engine.base.Engine.
if_exists: str. Аргумент pd.DataFrame.to_sql(). По умолчанию 'append'.
index: bool. Аргумент pd.DataFrame.to_sql(). По умолчанию False.
"""
if not isinstance(table_name, str):
raise TypeError(f'table_name must be a string, but got {type(table_name)}')
df.to_sql(table_name, engine, if_exists=if_exists, index=index, chunksize=chunksize)
In [55]:
# добавляем данные из датафрейма в таблицу accounts
append_df_to_sql(df=clients, table_name="clients", engine=engine)
In [38]:
# функция сверки количества строк в датафрейме и строк добавленных в БД
def count_rows(engine, table_name: str, df: pd.DataFrame | None=None):
# проверка типа для table_name
if not isinstance(table_name, str):
raise TypeError(f'table_name must be a string, but got {type(table_name)}')
with engine.connect() as con:
query = con.execute(text(f'SELECT COUNT(*) FROM {table_name}'))
row_count = query.scalar() # результат запроса в виде числа
if df is not None:
df_rows = df.shape[0]
if df_rows != row_count:
raise ValueError(f'''Dataframe row count is not equal to SQL table row count!
Dataframe: {df_rows}
SQL table: {row_count}''')
else:
print(f'Dataframe and SQL table row counts are equal.\n{row_count} rows')
else:
print(f'{row_count} rows in the SQL table')
return row_count
In [56]:
# сверяем количество строк в датафрейме и в БД
count_rows(engine=engine, table_name="clients", df=clients)
Dataframe and SQL table row counts are equal. 5369 rows
Создание и заполнение таблицы txns
¶
In [44]:
txns_path = data_paths["generated"]["all_txns"]
all_txns = pd.read_parquet(txns_path)
all_txns.head(2)
Out[44]:
client_id | txn_time | unix_time | amount | type | channel | category | online | merchant_id | trans_city | trans_lat | trans_lon | trans_ip | device_id | account | is_fraud | is_suspicious | status | rule | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 3937 | 2025-01-01 00:02:00 | 1735689720 | 1878.0 | purchase | POS | grocery_pos | False | 989.0 | Пермь | 58.045040 | 56.170369 | not applicable | NaN | NaN | False | False | approved | not applicable |
1 | 4275 | 2025-01-01 00:27:00 | 1735691220 | 1000.0 | purchase | POS | gas_transport | False | 727.0 | Тамбов | 52.715932 | 41.465163 | not applicable | NaN | NaN | False | False | approved | not applicable |
In [57]:
txns_dtypes = all_txns.dtypes
txns_dtypes
Out[57]:
client_id int64 txn_time datetime64[ns] unix_time int64 amount float64 type object channel object category object online bool merchant_id float64 trans_city object trans_lat float64 trans_lon float64 trans_ip object device_id float64 account float64 is_fraud bool is_suspicious bool status object rule object dtype: object
In [58]:
txns_dtypes["unix_time"] = "BigInt"
In [66]:
txns_table = add_table_to_metadata(table_name='txns', metadata=metadata, df_types=txns_dtypes)
In [62]:
txns_table.create(engine)
In [70]:
append_df_to_sql(df=all_txns, table_name="txns", engine=engine)
In [71]:
# сверяем размеры датафрейма и БД таблицы
count_rows(engine, 'txns', all_txns)
Dataframe and SQL table row counts are equal. 19982 rows
Создание и заполнение таблицы accounts
¶
In [72]:
acc_path = data_paths["generated"]["accounts"]
accounts = pd.read_csv(acc_path)
accounts.head(2)
Out[72]:
client_id | account_id | is_drop | |
---|---|---|---|
0 | 1 | 10000 | False |
1 | 2 | 10001 | False |
In [73]:
acc_dtypes = accounts.dtypes
acc_dtypes
Out[73]:
client_id int64 account_id int64 is_drop bool dtype: object
In [74]:
accounts_tab = add_table_to_metadata(table_name="accounts", metadata=metadata, df_types=acc_dtypes)
In [75]:
accounts_tab.create(engine)
In [76]:
# заполняем таблицу accounts
append_df_to_sql(df=accounts, table_name="accounts", engine=engine)
In [77]:
# сверяем количество строк
count_rows(engine, 'accounts', accounts)
Dataframe and SQL table row counts are equal. 5369 rows
Создание и заполнение таблицы cities
¶
In [ ]:
cities_path = data_paths["base"]["cities"]
cities = gpd.read_file(cities_path)
cities.drop(columns="geometry", inplace=True) # это геополигоны городов. Они не нужны в БД.
cities.head()
Out[ ]:
region | city | timezone | lat | lon | population | city_id | clients | |
---|---|---|---|---|---|---|---|---|
0 | Москва | Москва | UTC+3 | 55.753879 | 37.620373 | 11514330 | 1 | 663 |
1 | Санкт-Петербург | Санкт-Петербург | UTC+3 | 59.939125 | 30.315822 | 4848742 | 74 | 180 |
2 | Новосибирская | Новосибирск | UTC+7 | 55.028102 | 82.921058 | 1498921 | 70 | 169 |
3 | Свердловская | Екатеринбург | UTC+5 | 56.838633 | 60.605489 | 1377738 | 54 | 155 |
4 | Нижегородская | Нижний Новгород | UTC+3 | 56.324209 | 44.005395 | 1250615 | 64 | 109 |
In [81]:
cities_dtypes = cities.dtypes
cities_dtypes
Out[81]:
region object city object timezone object lat float64 lon float64 population int64 city_id int64 clients int64 dtype: object
In [82]:
# создаем таблицу в БД
cities_tab = add_table_to_metadata(table_name='cities', metadata=metadata, \
df_types=cities_dtypes)
In [83]:
cities_tab.create(engine)
In [84]:
# заполняем таблицу
append_df_to_sql(df=cities, table_name="cities", engine=engine)
In [85]:
# проверяем целостность
count_rows(engine, 'cities', cities)
Dataframe and SQL table row counts are equal. 77 rows
Создание и заполнение таблицы client_devices
¶
In [86]:
clnt_dev_path = data_paths["base"]["client_devices"]
client_devices = pd.read_csv(clnt_dev_path)
client_devices.head(2)
Out[86]:
client_id | platform | device_id | |
---|---|---|---|
0 | 1 | iOS | 1 |
1 | 2 | Android | 3 |
In [87]:
cl_dev_dtypes = client_devices.dtypes
cl_dev_dtypes
Out[87]:
client_id int64 platform object device_id int64 dtype: object
In [88]:
cl_dev_tab = add_table_to_metadata(table_name='client_devices', metadata=metadata, \
df_types=cl_dev_dtypes)
In [89]:
cl_dev_tab.create(engine)
In [90]:
append_df_to_sql(df=client_devices, table_name="client_devices", engine=engine)
In [91]:
count_rows(engine, 'client_devices', client_devices)
Dataframe and SQL table row counts are equal. 9718 rows
Создание и заполнение таблицы offline_merchants
¶
In [7]:
off_mer_path = data_paths["base"]["offline_merchants"]
offline_merchants = pd.read_parquet(off_mer_path)
offline_merchants.head()
Out[7]:
city | city_id | category | merchant_id | merchant_lat | merchant_lon | |
---|---|---|---|---|---|---|
0 | Москва | 1 | gas_transport | 1.0 | 55.711178 | 37.863932 |
1 | Москва | 1 | grocery_pos | 2.0 | 55.896746 | 37.370257 |
2 | Москва | 1 | home | 3.0 | 55.797594 | 37.382283 |
3 | Москва | 1 | shopping_pos | 4.0 | 55.723753 | 37.654267 |
4 | Москва | 1 | kids_pets | 5.0 | 55.907808 | 37.542508 |
In [8]:
off_mer_dtypes = offline_merchants.dtypes
off_mer_dtypes
Out[8]:
city object city_id int64 category object merchant_id float64 merchant_lat float64 merchant_lon float64 dtype: object
In [94]:
merchants_table = add_table_to_metadata(table_name='offline_merchants', metadata=metadata, \
df_types=off_mer_dtypes)
In [95]:
merchants_table.create(engine)
In [96]:
append_df_to_sql(df=offline_merchants, table_name="offline_merchants", engine=engine)
In [97]:
count_rows(engine, 'offline_merchants', offline_merchants)
Dataframe and SQL table row counts are equal. 6776 rows
Создание и заполнение таблицы fraud_devices
¶
In [98]:
fr_dev_path = data_paths["base_fraud"]["fraud_devices"]
fraud_devices = pd.read_csv(fr_dev_path)
fraud_devices.head()
Out[98]:
device_id | platform | |
---|---|---|
0 | 9719 | Android |
1 | 9720 | iOS |
2 | 9721 | Windows |
3 | 9722 | Windows |
4 | 9723 | Android |
In [99]:
fr_dev_dtypes = fraud_devices.dtypes
fr_dev_dtypes
Out[99]:
device_id int64 platform object dtype: object
In [100]:
fr_dev_table = add_table_to_metadata(table_name='fraud_devices', metadata=metadata, \
df_types=fr_dev_dtypes)
In [101]:
fr_dev_table.create(engine)
In [102]:
append_df_to_sql(df=fraud_devices, table_name="fraud_devices", engine=engine)
In [103]:
count_rows(engine, 'fraud_devices', fraud_devices)
Dataframe and SQL table row counts are equal. 5500 rows
Создание и наполнение таблицы fraud_ips
¶
In [105]:
fr_ip_path = data_paths["base_fraud"]["fraud_ips"]
fraud_ips = pd.read_parquet(fr_ip_path)
fraud_ips.head(2)
Out[105]:
city | lat | lon | fraud_ip | |
---|---|---|---|---|
0 | Москва | 55.753879 | 37.620373 | 5.3.252.223 |
1 | Санкт-Петербург | 59.939125 | 30.315822 | 5.3.252.224 |
In [106]:
fr_ip_dtypes = fraud_ips.dtypes
fr_ip_dtypes
Out[106]:
city object lat float64 lon float64 fraud_ip object dtype: object
In [110]:
fr_ip_table = add_table_to_metadata(table_name='fraud_ips', metadata=metadata, \
df_types=fr_ip_dtypes)
In [111]:
fr_ip_table.create(engine)
In [112]:
append_df_to_sql(df=fraud_ips, table_name="fraud_ips", engine=engine)
In [113]:
count_rows(engine, 'fraud_ips', fraud_ips)
Dataframe and SQL table row counts are equal. 7700 rows
Создание и наполнение таблицы rules
¶
In [115]:
rules_path = data_paths["base_fraud"]["rules"]
rules = pd.read_csv(rules_path)
rules
Out[115]:
rule | weight | online | |
---|---|---|---|
0 | fast_geo_change | 0.12500 | False |
1 | fast_geo_change_online | 0.21875 | True |
2 | new_ip_and_device_high_amount | 0.25000 | True |
3 | new_device_and_high_amount | 0.18750 | True |
4 | trans_freq_increase | 0.21875 | True |
In [116]:
rules_dtypes = rules.dtypes
rules_dtypes
Out[116]:
rule object weight float64 online bool dtype: object
In [117]:
rules_table = add_table_to_metadata(table_name='rules', metadata=metadata, \
df_types=rules_dtypes)
In [118]:
append_df_to_sql(df=rules, table_name="rules", engine=engine)
In [119]:
count_rows(engine, 'rules', rules)
Dataframe and SQL table row counts are equal. 5 rows
Демонстрационные запросы к наполненной БД¶
- проверка общей корректности работы
Посчитаем процент платформ устройств с которых был совершен фрод¶
In [ ]:
# Создадим единую devices таблицу т.к. во фроде участвуют как клиенты банка с устройствами из client_devices
# так и сторонние лица с устройствами из fraud_devices.
# Потом заджоиним devices ко всем транзакциям по device_id и c условием что транзакция - фрод
# Затем посчитаем кол-во девайсов во фроде по платформам. И дальше вычислим процент платфромы во фрод
# транзакциях
plat_query = """
WITH devices AS (SELECT platform, device_id
FROM
client_devices
UNION
SELECT platform, device_id
FROM
fraud_devices),
platforms AS (SELECT platform, COUNT(t.device_id) AS devices
FROM txns as t
JOIN devices as d
ON t.device_id = d.device_id AND t.is_fraud = true
GROUP BY platform)
SELECT platform,
ROUND(100 * devices / (SELECT SUM(devices) FROM platforms), 2) AS percentage
FROM platforms
"""
In [127]:
with engine.connect() as con:
rs = con.execute(text(plat_query))
plat_stats = pd.DataFrame(rs.fetchall())
In [128]:
plat_stats
Out[128]:
platform | percentage | |
---|---|---|
0 | macOS | 5.90 |
1 | Windows | 58.03 |
2 | Android | 30.82 |
3 | iOS | 3.28 |
4 | Linux | 1.97 |