#!/usr/bin/env python

import pandas as pd
import sys
import os
import re
import logging
import argparse as arg

from classes.Timer import Timer
from classes.CustomFormater import CustomFormatter

def parse_args():
    """
    Parse arguments
    """
    parser = arg.ArgumentParser(description='Process csv files from INSEE')
    parser.add_argument('--source', '-s',
                        help='csv source directory',
                        default='csv')
    parser.add_argument('--export', '-e',
                        help='processed csv directory',
                        default='exports')
    parser.add_argument('--towns',
                        help='towns raw csv file (inside source folder)',
                        default='commune2021.csv')
    parser.add_argument('--departments',
                        help='departments raw csv file (inside source folder)',
                        default='departement2021.csv')
    parser.add_argument('--states',
                        help='states raw csv file (inside source folder)',
                        default='region2021.csv')
    parser.add_argument('--statistics',
                        help='statistics raw csv file to import',
                        default='statistiques.csv')
    debug_group = parser.add_mutually_exclusive_group()
    debug_group.add_argument('--verbose', '-V',
                             help='Verbose output',
                             action='store_true')
    debug_group.add_argument('--debug', '-d',
                             help='Activate debug mode',
                             action='store_true')
    return parser.parse_args()
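
# Example invocation (a sketch: the filename "process_insee.py" is a
# placeholder, only the flags come from parse_args() above):
#
#   ./process_insee.py --source csv --export exports --verbose
#
# --verbose and --debug are mutually exclusive; --debug logs the most detail.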
def import_states_csv(raw_file):
    """
    Process states raw file
    """
    logger.info('import states from {}'.format(raw_file))
    # Region codes are two characters: pad single-digit codes with a leading zero
    reg_convert = lambda x: x if len(str(x)) == 2 else f'0{x}'
    states = pd.read_csv(raw_file,
                         usecols=["REG", "NCC", "LIBELLE", "CHEFLIEU"],
                         converters={'REG': reg_convert})
    return states
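
# A quick sketch of what the converter does (illustrative values; pandas
# passes converters the raw string content of each cell):
#
#   reg_convert('1')  -> '01'
#   reg_convert('84') -> '84'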
def import_department_csv(raw_file):
    """
    Process departments raw file
    """
    logger.info('import departments from {}'.format(raw_file))
    reg_convert = lambda x: x if len(str(x)) == 2 else f'0{x}'
    dep = pd.read_csv(raw_file,
                      usecols=["DEP", "NCC", "LIBELLE", "REG", "CHEFLIEU"],
                      converters={'REG': reg_convert})
    return dep
def import_towns_csv(raw_file):
    """
    Process towns raw file
    """
    logger.info('import towns from {}'.format(raw_file))
    towns = pd.read_csv(raw_file,
                        usecols=["COM", "TYPECOM", "NCC", "LIBELLE", "DEP"])
    # Keep only actual communes (TYPECOM == 'COM'), dropping associated and
    # delegated communes and municipal arrondissements
    return towns.loc[towns['TYPECOM'] == 'COM', ['COM', 'NCC', 'LIBELLE', 'DEP']]
def import_statistics_csv(raw_file):
    """
    Process statistics raw file
    """
    logger.info('import statistics from {}'.format(raw_file))
    # Commune codes (CODGEO) are five characters: pad shorter codes with a leading zero
    stats_convert = lambda x: x if len(str(x)) == 5 else f'0{x}'
    stats = pd.read_csv(raw_file,
                        usecols=["CODGEO","SUPERF","P18_POP","P13_POP","P08_POP","D99_POP",
                                 "NAIS1318","NAIS0813","NAIS9908","NAIS9099","NAIS8290","DECE1318",
                                 "DECE0813","DECE9908","DECE9099","DECE8290","P18_LOG","P13_LOG",
                                 "P08_LOG","D99_LOG","D90_LOG","D82_LOG","P18_LOGVAC","P13_LOGVAC",
                                 "P08_LOGVAC","D99_LOGVAC","D90_LOGVAC","D82_LOGVAC","P18_RP",
                                 "P13_RP","P08_RP","D99_RP","D90_RP","D82_RP","P18_RSECOCC",
                                 "P13_RSECOCC","P08_RSECOCC","D99_RSECOCC","D90_RSECOCC",
                                 "D82_RSECOCC"],
                        sep=';',
                        converters={'CODGEO': stats_convert})
    return stats
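
# Column naming convention, as this script reads it (see the INSEE
# documentation for the authoritative definitions):
#   P18_POP  -> population at the 2018 census (single date)
#   D99_LOG  -> dwellings counted in 1999 (single date)
#   NAIS1318 -> births between 2013 and 2018 (date range)
# get_single_date() and get_range_date() below decode these column names.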
def get_single_date(attr):
    logger.debug('get a date from {}'.format(attr))
    m = re.match('^[DP]([0-9]{2}).*$', attr)
    if m is None:
        logger.error('Can\'t determine single date from {}'.format(attr))
        return None, None

    # Two-digit years starting with 0, 1 or 2 belong to the 2000s, the rest
    # to the 1900s; single-date indicators have no end date, so the literal
    # string 'null' is exported in its place
    if m.group(1).startswith(tuple(['0', '1', '2'])):
        return 2000 + int(m.group(1)), 'null'
    else:
        return 1900 + int(m.group(1)), 'null'
def get_range_date(attr):
    logger.debug('get date range from {}'.format(attr))
    m = re.match('^[A-Z]*([0-9]{2})([0-9]{2}).*$', attr)
    date = {}
    try:
        for i in [1, 2]:
            logger.debug('Process two digits: {}'.format(m.group(i)))
            # Same century heuristic as get_single_date()
            if m.group(i).startswith(tuple(['0', '1', '2'])):
                date[i] = 2000 + int(m.group(i))
            else:
                date[i] = 1900 + int(m.group(i))
    except Exception as e:
        logger.error(
            'Error when trying to determine date range from {} - {}'.format(
                attr,
                e
            )
        )
        return None, None

    return date[1], date[2]
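
# Worked examples (assumed inputs, following the naming convention above):
#
#   get_single_date('P18_POP')  -> (2018, 'null')
#   get_range_date('NAIS1318')  -> (2013, 2018)
#   get_range_date('DECE9908')  -> (1999, 2008)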
def export_csv(dataframe, path, index=False):
    logger.debug('export csv from pandas dataframe')
    try:
        dataframe.to_csv(path, header=False, index=index)
    except Exception as e:
        logger.error(
            'Error when exporting Dataframe to csv file {}. \n{}'.format(
                path,
                e)
        )
        sys.exit(1)
if __name__ == '__main__':
    args = parse_args()

    logger = logging.getLogger()
    tty_handler = logging.StreamHandler()

    # attach the custom formatter to the console handler
    tty_handler.setFormatter(CustomFormatter())
    logger.addHandler(tty_handler)

    if args.verbose:
        logger.setLevel(logging.INFO)
        logger.info('VERBOSE mode activated')

    if args.debug:
        logger.setLevel(logging.DEBUG)
        logger.debug('DEBUG mode activated')

    t = Timer(logger=logger.info)

    t.start('Import_CSV')
    if not os.path.exists(args.source + '/' + args.states):
        logger.critical('can\'t find source file for states')
        sys.exit(1)
    states = import_states_csv(args.source + '/' + args.states)
    t.stop()
    logger.debug(states)

    t.start('Import_CSV')
    if not os.path.exists(args.source + '/' + args.departments):
        logger.critical('can\'t find source file for departments')
        sys.exit(1)
    departments = import_department_csv(args.source + '/' + args.departments)
    t.stop()
    logger.debug(departments)

    t.start('Import_CSV')
    if not os.path.exists(args.source + '/' + args.towns):
        logger.critical('can\'t find source file for towns')
        sys.exit(1)
    towns = import_towns_csv(args.source + '/' + args.towns)
    t.stop()
    logger.debug(towns)

    t.start('Import_CSV')
    if not os.path.exists(args.source + '/' + args.statistics):
        logger.critical('can\'t find source file for statistics')
        sys.exit(1)
    statistics = import_statistics_csv(args.source + '/' + args.statistics)
    t.stop()
    logger.debug(statistics)
    t.get_time_by_tag('Import_CSV')

    # Create the missing reference table: indicators
    indicators = pd.DataFrame({'indicateur': [
                                   'population',
                                   'naissances',
                                   'deces',
                                   'logements',
                                   'logements vacants',
                                   'residences principales',
                                   'residences secondaires et logements occasionnels'
                               ],
                               'code': ['_POP', 'NAIS', 'DECE', '_LOG', '_LOGVAC', '_RP', '_RSECOCC']},
                              index=[1, 2, 3, 4, 5, 6, 7])
    logger.debug(indicators)
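
    # Note on the 'code' column: codes starting with '_' are column-name
    # suffixes (single-census indicators such as P18_POP), the others are
    # prefixes (range indicators such as NAIS1318); the statistics loop
    # below builds its match regex accordingly.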
    ## Create departments capitals
    # .copy() avoids pandas' SettingWithCopyWarning when renaming columns below
    dep_capitals = departments[['CHEFLIEU', 'DEP']].copy()
    dep_capitals.columns = ["CHEFLIEUDEP", "DEP"]
    departments = departments[["DEP", "NCC", "LIBELLE", "REG"]]
    logger.debug(dep_capitals)

    ## Create states capitals
    states_capitals = states[['CHEFLIEU', 'REG']].copy()
    states_capitals.columns = ["CHEFLIEUREG", "REG"]
    states = states[["REG", "NCC", "LIBELLE"]]
    logger.debug(states_capitals)
    ## create statistics dataframes
    #
    # We first need to iterate over the statistics rows
    t.start('Process_Statistics')

    temp = {"com": [],
            "id_indicateur": [],
            "date_debut": [],
            "date_fin": [],
            "valeur": []
            }
    for s_index, srow in statistics.iterrows():
        for index, irow in indicators.iterrows():
            # Suffix codes ('_POP', ...) are matched at the end of the column
            # name, prefix codes ('NAIS', 'DECE') at the beginning
            if irow['code'].startswith('_'):
                regex = irow['code'] + '$'
            else:
                regex = '^' + irow['code']

            logger.debug('Process indicator {}'.format(regex))
            selection = srow.filter(regex=regex)
            for attribute, value in selection.items():
                logger.debug('check code: {}'.format(irow['code']))
                if irow['code'].startswith('_'):
                    start, end = get_single_date(attribute)
                else:
                    start, end = get_range_date(attribute)

                if start is None or end is None:
                    logger.error('Can\'t process line, continue to next')
                    continue

                logger.debug(
                    'town:{}, id_indic: {}, start: {}, end: {}, value:{}'
                    .format(
                        srow['CODGEO'],
                        index,
                        start,
                        end,
                        value
                    )
                )

                temp['com'].append(srow['CODGEO'])
                temp['id_indicateur'].append(index)
                temp['date_debut'].append(start)
                temp['date_fin'].append(end)
                temp['valeur'].append(value)

    consolidated_stats = pd.DataFrame.from_dict(temp)
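
    # Design note: accumulating plain Python lists and converting them to a
    # DataFrame once at the end is much cheaper than appending to a
    # DataFrame row by row inside the loop.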
    t.stop()

    # Keep only the indicator labels; the DataFrame index becomes the indicator id
    indicators = indicators[['indicateur']]

    t.start('Process_town')
    towns = pd.merge(towns,
                     statistics[['CODGEO', 'SUPERF']],
                     left_on=['COM'],
                     right_on=['CODGEO'],
                     how='left'
                     )[['COM', 'NCC', 'LIBELLE', 'DEP', 'SUPERF']]
    t.stop()
    logger.debug(towns)
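
    # The left merge above attaches each commune's surface area (SUPERF) from
    # the statistics file; communes without a statistics row keep a NaN SUPERF.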

    # Export all dataframes to csv
    t.start('Export_CSV')
    export_csv(towns, args.export + '/commune.csv')
    export_csv(departments, args.export + '/departement.csv')
    export_csv(states, args.export + '/region.csv')
    export_csv(dep_capitals, args.export + '/cheflieudepartement.csv')
    export_csv(states_capitals, args.export + '/cheflieuregion.csv')
    export_csv(indicators, args.export + '/indicateur.csv', index=True)
    export_csv(consolidated_stats, args.export + '/statistiques.csv')
    t.stop()

    t.get_times_by_tag()
    t.get_total_time()

    sys.exit()