Rename script

This commit is contained in:
Yorick Barbanneau 2022-04-25 23:04:21 +02:00
parent 542f67501e
commit 780663a3bc

309
csvprocess.py Executable file
View file

@ -0,0 +1,309 @@
#!/usr/bin/env python
import pandas as pd
import io
import numpy as np
import sys
import os
import re
import time
import logging
import argparse as arg
from timer.Timer import Timer
class CustomFormatter(logging.Formatter):
grey = "\x1b[0;35m"
blue = "\x1b[34;20m"
yellow = "\x1b[33;20m"
red = "\x1b[31;20m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"
format = "%(levelname)s: %(message)s (%(filename)s:%(lineno)d)"
FORMATS = {
logging.DEBUG: blue + format + reset,
logging.INFO: grey + format + reset,
logging.WARNING: yellow + format + reset,
logging.ERROR: red + format + reset,
logging.CRITICAL: bold_red + format + reset
}
def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt)
return formatter.format(record)
def parse_args():
"""
Parse arguments
"""
parser = arg.ArgumentParser('Process csv files from INSEE')
parser.add_argument('--source', '-s',
help='csv source directory',
default='csv')
parser.add_argument('--export', '-e',
help='processeced csv directory',
default='exports')
parser.add_argument('--towns',
help='town raw csv file (inside source follder)',
default='commune2021.csv')
parser.add_argument('--departments',
help='departments raw csv file (inside source follder)',
default='departement2021.csv')
parser.add_argument('--states',
help='states raw csv file (inside source follder)',
default='region2021.csv')
parser.add_argument('--statistics',
help='statistics raw csv file to import',
default='statistiques.csv')
debug_group = parser.add_mutually_exclusive_group()
debug_group.add_argument('--verbose', '-V',
help='Verbose output',
action='store_true')
debug_group.add_argument('--debug', '-d',
help='Activate debug mode',
action='store_true')
return parser.parse_args()
def import_states_csv(raw_file):
"""
Process states raw file
"""
logger.info('import states from {}'.format(raw_file))
reg_convert= lambda x: x if len(str(x)) == 2 else f'0{x}'
states = pd.read_csv(raw_file,
usecols=["REG","NCC","LIBELLE","CHEFLIEU"],
converters={'REG': reg_convert})
return states
def import_department_csv(raw_file):
"""
Process department files
"""
logger.info('import departments from {}'.format(raw_file))
reg_convert= lambda x: x if len(str(x)) == 2 else f'0{x}'
dep = pd.read_csv(raw_file,
usecols=["DEP","NCC","LIBELLE","REG","CHEFLIEU"],
converters={'REG':reg_convert})
return dep
def import_towns_csv(raw_file):
"""
Process department files
"""
logger.info('import town from {}'.format(raw_file))
towns = pd.read_csv(raw_file,
usecols=["COM","TYPECOM","NCC","LIBELLE","DEP"])
return towns.loc[towns['TYPECOM'] == 'COM', ['COM','NCC', 'LIBELLE', 'DEP']]
def import_statistics_csv(raw_file):
"""
Process stats files
"""
logger.info('import town from {}'.format(raw_file))
stats_convert= lambda x: x if len(str(x)) == 5 else f'0{x}'
stats = pd.read_csv(raw_file,
usecols=["CODGEO","SUPERF","P18_POP","P13_POP","P08_POP","D99_POP",
"NAIS1318","NAIS0813","NAIS9908","NAIS9099","NAIS8290","DECE1318",
"DECE0813","DECE9908","DECE9099","DECE8290","P18_LOG","P13_LOG",
"P08_LOG","D99_LOG","D90_LOG","D82_LOG", "P18_LOGVAC","P13_LOGVAC",
"P08_LOGVAC","D99_LOGVAC","D90_LOGVAC","D82_LOGVAC","P18_RP",
"P13_RP","P08_RP","D99_RP","D90_RP","D82_RP", "P18_RSECOCC",
"P13_RSECOCC","P08_RSECOCC","D99_RSECOCC","D90_RSECOCC",
"D82_RSECOCC"],
sep=';',
converters={'CODGEO':stats_convert}
)
return stats
def get_single_date(attr):
logger.debug('get a date from {}'.format(attr))
m = re.match('^[D,P]([0-9]{2}).*$', attr)
if m.group(1) is None:
logger.error('Cand determine single date from {}'.format(attr))
if m.group(1).startswith(tuple(['0','1','2'])):
return 2000 + int(m.group(1)), 'null'
else:
return 1900 + int(m.group(1)), 'null'
def get_range_date(attr):
logger.debug('get date range from {}'.format(attr))
m = re.match('^[A-Z]*([0-9]{2})([0-9]{2}).*$', attr)
date = {}
try:
for i in [1,2]:
logger.debug('Process two digits: {}'.format(m.group(i)))
if m.group(i).startswith(tuple(['0','1','2'])):
date[i] = 2000 + int(m.group(i))
else:
date[i] = 1900 + int(m.group(i))
except Exception as e:
logger.error(
'Error when trying to determine daterange from {} - {}'.format(
attr,
e
)
)
return None, None
return date[1], date[2]
if __name__ == '__main__':
args = parse_args()
#logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()
tty_handler = logging.StreamHandler()
# create console handler with a higher log level
tty_handler.setFormatter(CustomFormatter())
logger.addHandler(tty_handler)
if args.verbose is True:
logger.setLevel(logging.INFO)
logger.info('VERBOSE mode activated')
if args.debug is True:
logger.setLevel(logging.DEBUG)
logger.debug('DEBUG mode activated')
t = Timer(logger=logger.info)
t.start('Import_CSV')
if not os.path.exists(args.source + '/' + args.states):
logger.critical('can\'t find source file for states')
sys.exit(1)
states = import_states_csv(args.source + '/' + args.states)
t.stop()
logger.debug(states)
t.start(name='Import_CSV')
if not os.path.exists(args.source + '/' + args.departments):
logger.critical('can\'t find source file for departments')
sys.exit(1)
departments = import_department_csv(args.source + '/' + args.departments)
t.stop()
logger.debug(departments)
t.start('Import_CSV')
if not os.path.exists(args.source + '/' + args.towns):
logger.critical('can\'t find source file for departments')
sys.exit(1)
towns = import_towns_csv(args.source + '/' + args.towns)
t.stop()
logger.debug(towns)
t.start('Import_CSV')
if not os.path.exists(args.source + '/' + args.statistics):
logger.critical('can\'t find source file for statistics')
sys.exit(1)
statistics = import_statistics_csv(args.source + '/' + args.statistics)
t.stop()
logger.debug(statistics)
t.get_time_by_tag('Import_CSV')
# Create missing table : indicators
indicators = pd.DataFrame({'indicateur': [
'population',
'naissances',
'deces',
'logements',
'logements vacants',
'residences principales',
'residences secondaires et logements occasionnels'
],
'code': ['_POP','NAIS', 'DECE','_LOG', '_LOGVAC', '_RP', '_RSECOCC']},
index=[1,2,3,4,5,6,7])
logger.debug(indicators)
## Create departments capitals
dep_capitals = departments[['CHEFLIEU','DEP']]
dep_capitals.columns = ["CHEFLIEUDEP","DEP"]
departments = departments[["DEP","NCC","LIBELLE","REG"]]
logger.debug(dep_capitals)
## Create states capitals
states_capitals = states[['CHEFLIEU','REG']]
states_capitals.columns = ["CHEFLIEUREG","REG"]
departments = departments[["REG","NCC","LIBELLE"]]
logger.debug(states_capitals)
## create statistics dataframes
#
# We need to first iterate on statistics
t.start('Process_Statistics')
c_stats = pd.DataFrame(columns = ['com','id_indicateur','date_debut',
'date_fin','valeur']
)
temp = {"com" : [], "id_indicateur" : [],
"date_debut" : [],
"date_fin" : [],
"valeur" : []
}
for s_index,srow in statistics.iterrows():
for index, irow in indicators.iterrows():
if irow['code'].startswith('_'):
regex = irow['code'] + '$'
else:
regex = '^' + irow['code']
logger.debug('Process indicator {}'.format(regex))
selection = srow.filter(regex=regex)
for attribute, value in selection.items():
logger.debug('check code: {}'.format(irow['code']))
if irow['code'].startswith('_'):
start,end = get_single_date(attribute)
else:
start,end = get_range_date(attribute)
if start is None or end is None:
logger.error('Can\'t process line, continue to next')
continue
logger.debug(
'town:{}, id_indic: {}, start: {}, end: {}, value:{}'
.format(
srow['CODGEO'],
index,
start,
end,
value
)
)
temp['com'].append(srow['CODGEO'])
temp['id_indicateur'].append(index)
temp['date_debut'].append(start)
temp['date_fin'].append(end)
temp['valeur'].append(value)
t.stop()
t.start('Process_town')
print(statistics[['SUPERF','CODGEO']])
towns = pd.merge(towns,
statistics[['CODGEO', 'SUPERF']],
left_on=['COM'],
right_on=['CODGEO'],
how = 'left'
)[['COM','NCC','LIBELLE', 'DEP', 'SUPERF']]
t.stop()
logger.debug(towns)
t.get_total_time()
sys.exit()