Compare commits


6 commits

3 changed files with 267 additions and 8 deletions

createdatabase.py Executable file

@@ -0,0 +1,247 @@
#!/usr/bin/env python
import sys
import os
import re
# import time
import logging
import argparse as arg
import psycopg2 as pg
from psycopg2.extensions import AsIs
from classes.CustomFormater import CustomFormatter
from classes.Timer import Timer


# Schema name length is limited to NAMEDATALEN-1 (PGSQL source code)
# -> src/include/pg_config_manual.h
def check_schema_name(arg_value, pat=re.compile(r"^[a-z0-9A-Z]{1,63}$")):
    if not pat.match(arg_value):
        raise ValueError
    return arg_value


def parse_args():
    """
    Parse arguments
    """
    parser = arg.ArgumentParser(description='Process csv files from INSEE')
    parser.add_argument('--source', '-s',
                        help='csv source directory',
                        default='exports')
    parser.add_argument('--connection-file', '-f',
                        help='Postgresql connection file',
                        default='.pgconn'
                        )
    # As we use the AsIs adapter, the schema name is interpolated into our
    # SQL queries unquoted, so we must ensure it contains no SQL special
    # characters (comments --, trailing commas, etc.) to avoid SQL injection.
    parser.add_argument('--schema-name',
                        help='Database schema name',
                        type=check_schema_name,
                        default='insee'
                        )
    debug_group = parser.add_mutually_exclusive_group()
    debug_group.add_argument('--verbose', '-V',
                             help='Verbose output',
                             action='store_true')
    debug_group.add_argument('--debug', '-d',
                             help='Activate debug mode',
                             action='store_true')
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    t = Timer()
    # logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger()
    # console handler with a custom formatter
    tty_handler = logging.StreamHandler()
    tty_handler.setFormatter(CustomFormatter())
    logger.addHandler(tty_handler)
    if args.verbose is True:
        logger.setLevel(logging.INFO)
        logger.info('VERBOSE mode activated')
    if args.debug is True:
        logger.setLevel(logging.DEBUG)
        logger.debug('DEBUG mode activated')
    logging.debug('Import pgsql connection file {}'.format(args.connection_file))
    with open(args.connection_file) as cf:
        pg_conn = cf.read()

    t.start('Create database schema')
    conn = pg.connect(pg_conn)
    with conn.cursor() as curs:
        # Reference tables (region -> departement -> commune), then the
        # indicator and statistics tables that reference them.
        curs.execute('CREATE SCHEMA IF NOT EXISTS %s', (AsIs(args.schema_name),))
        curs.execute("""
            CREATE TABLE IF NOT EXISTS %(schema)s.region (
                id_region serial PRIMARY KEY,
                reg VARCHAR(2) UNIQUE,
                ncc VARCHAR(200),
                libelle VARCHAR(200)
            );""",
            {'schema': AsIs(args.schema_name)})
        curs.execute("""
            CREATE TABLE IF NOT EXISTS %(schema)s.departement (
                id_departement serial PRIMARY KEY,
                dep VARCHAR(3) UNIQUE,
                ncc VARCHAR(200),
                libelle VARCHAR(200),
                reg VARCHAR(2) REFERENCES %(schema)s.region(reg)
            );""",
            {'schema': AsIs(args.schema_name)})
        curs.execute("""
            CREATE TABLE IF NOT EXISTS %(schema)s.commune (
                id_commune serial PRIMARY KEY,
                com VARCHAR(5) UNIQUE,
                ncc VARCHAR(200),
                libelle VARCHAR(200),
                dep VARCHAR(3) REFERENCES %(schema)s.departement(dep),
                superf FLOAT CONSTRAINT SUPERFICIE_POSITIVE CHECK (superf > 0)
            );""",
            {'schema': AsIs(args.schema_name)})
        curs.execute("""
            CREATE TABLE IF NOT EXISTS %(schema)s.cheflieudepartement (
                dep VARCHAR(3) REFERENCES %(schema)s.departement(dep),
                cheflieudep VARCHAR(5) REFERENCES %(schema)s.commune(com),
                PRIMARY KEY (dep, cheflieudep)
            );""",
            {'schema': AsIs(args.schema_name)})
        curs.execute("""
            CREATE TABLE IF NOT EXISTS %(schema)s.cheflieuregion (
                reg VARCHAR(2) REFERENCES %(schema)s.region(reg),
                cheflieureg VARCHAR(5) REFERENCES %(schema)s.commune(com),
                PRIMARY KEY (reg, cheflieureg)
            );""",
            {'schema': AsIs(args.schema_name)})
        curs.execute("""
            CREATE TABLE IF NOT EXISTS %(schema)s.indicateur (
                id_indicateur INTEGER PRIMARY KEY,
                libelle VARCHAR(200)
            );""",
            {'schema': AsIs(args.schema_name)})
        curs.execute("""
            CREATE TABLE IF NOT EXISTS %(schema)s.statistique (
                com VARCHAR(5) REFERENCES %(schema)s.commune(com),
                id_indicateur INTEGER REFERENCES %(schema)s.indicateur(id_indicateur),
                date_debut SMALLINT,
                date_fin SMALLINT CONSTRAINT APRES_DATEDEBUT CHECK (date_fin > date_debut),
                valeur FLOAT,
                PRIMARY KEY (com, id_indicateur, date_debut)
            );""",
            {'schema': AsIs(args.schema_name)})
        # Aggregated views: statistics summed per departement and per region
        curs.execute("""
            CREATE OR REPLACE VIEW %(schema)s.view_indicateur_dep AS
            SELECT id_departement, d.dep, d.ncc, s.id_indicateur, s.date_debut,
                   SUM(s.valeur) AS valeur
            FROM %(schema)s.departement d
            INNER JOIN %(schema)s.commune c ON c.dep = d.dep
            INNER JOIN %(schema)s.statistique s ON s.com = c.com
            GROUP BY id_departement, d.dep, s.date_debut, d.ncc, s.id_indicateur
            ORDER BY id_indicateur;""",
            {'schema': AsIs(args.schema_name)})
        curs.execute("""
            CREATE OR REPLACE VIEW %(schema)s.view_indicateur_reg AS
            SELECT id_region, r.reg, r.ncc, s.id_indicateur, s.date_debut,
                   SUM(s.valeur) AS valeur
            FROM %(schema)s.region r
            INNER JOIN %(schema)s.departement d ON d.reg = r.reg
            INNER JOIN %(schema)s.commune c ON c.dep = d.dep
            INNER JOIN %(schema)s.statistique s ON s.com = c.com
            GROUP BY id_region, r.reg, s.date_debut, r.ncc, s.id_indicateur
            ORDER BY id_indicateur;""",
            {'schema': AsIs(args.schema_name)})
        curs.execute("""
            ALTER TABLE %(schema)s.region
            ADD COLUMN IF NOT EXISTS population INT;
            """,
            {'schema': AsIs(args.schema_name)})
        curs.execute("""
            ALTER TABLE %(schema)s.departement
            ADD COLUMN IF NOT EXISTS population INT;
            """,
            {'schema': AsIs(args.schema_name)})
        # Procedure filling the population columns from indicator 1 (2018)
        curs.execute("""
            CREATE OR REPLACE PROCEDURE %(schema)s.PRC_POP_REG_DEP()
            LANGUAGE plpgsql
            AS $$
            DECLARE
                REC RECORD;
            BEGIN
                FOR REC IN (SELECT id_departement, ncc, valeur
                            FROM %(schema)s.view_indicateur_dep v
                            WHERE id_indicateur = 1
                            AND date_debut = 2018) LOOP
                    UPDATE %(schema)s.departement
                    SET population = REC.valeur
                    WHERE id_departement = REC.id_departement;
                END LOOP;
                FOR REC IN (SELECT reg, SUM(population) valeur
                            FROM %(schema)s.departement d
                            GROUP BY reg) LOOP
                    UPDATE %(schema)s.region
                    SET population = REC.valeur
                    WHERE reg = REC.reg;
                END LOOP;
            END;
            $$;""",
            {'schema': AsIs(args.schema_name)})
    conn.commit()
    t.stop()

    t.start('Import data from csv files')
    with conn.cursor() as curs:
        # schema name was already validated by check_schema_name
        curs.execute('SET search_path TO {}'.format(args.schema_name))
    # Tables listed in foreign-key dependency order: dicts preserve
    # insertion order, so COPY loads parents before children.
    csvs = {
        'region': {'col': ('reg', 'ncc', 'libelle'), 'null': ''},
        'departement': {'col': ('dep', 'ncc', 'libelle', 'reg'), 'null': ''},
        'commune': {'col': ('com', 'ncc', 'libelle', 'dep', 'superf'), 'null': ''},
        'cheflieudepartement': {'col': ('cheflieudep', 'dep'), 'null': ''},
        'cheflieuregion': {'col': ('cheflieureg', 'reg'), 'null': ''},
        'indicateur': {'col': ('id_indicateur', 'libelle'), 'null': ''},
        'statistique': {
            'col': (
                'com',
                'id_indicateur',
                'date_debut',
                'date_fin',
                'valeur'
            ),
            'null': 'null'
        }
    }
    for csv, params in csvs.items():
        logging.debug('Process csv file: {} with {}'.format(csv, params))
        with open(args.source + '/' + csv + '.csv') as f:
            with conn.cursor() as curs:
                curs.copy_from(
                    f,
                    csv,
                    sep=',',
                    columns=params['col'],
                    null=params['null']
                )
        conn.commit()
    t.stop()
    conn.close()
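Not shown above: the --connection-file is read verbatim and passed to pg.connect(), so it should contain a single libpq DSN. A minimal sketch of a matching .pgconn, assuming a local database and role both named insee (all values illustrative):

host=localhost port=5432 dbname=insee user=insee password=secret

With the defaults above, the script would then be run as ./createdatabase.py --source exports --schema-name insee --verbose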


@@ -82,6 +82,12 @@ def import_towns_csv(raw_file):
     return towns.loc[towns['TYPECOM'] == 'COM', ['COM','NCC', 'LIBELLE', 'DEP']]
 
+def stats_convert(codegeo):
+    if len(codegeo) == 4:
+        return '0' + codegeo
+    if len(codegeo) == 6:
+        return codegeo[1:]
+    return codegeo
 
 def import_statistics_csv(raw_file):
     """
@@ -89,7 +95,6 @@ def import_statistics_csv(raw_file):
     """
     logger.info('import town from {}'.format(raw_file))
-    stats_convert = lambda x: x if len(str(x)) == 5 else f'0{x}'
     stats = pd.read_csv(raw_file,
                         usecols=["CODGEO","SUPERF","P18_POP","P13_POP","P08_POP","D99_POP",
                                  "NAIS1318","NAIS0813","NAIS9908","NAIS9099","NAIS8290","DECE1318",
@@ -137,10 +142,10 @@ def get_range_date(attr):
     return date[1], date[2]
 
-def export_csv(dataframe, path):
+def export_csv(dataframe, path, index=False):
     logger.debug('export csv from panda dataframe')
     try:
-        dataframe.to_csv(path, header=False, index=False)
+        dataframe.to_csv(path, header=False, index=index)
     except Exception as e:
         logger.error(
             'Error when exporting Dataframe to csv file {}. \n{}'.format(
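The new index parameter simply forwards to DataFrame.to_csv; a minimal sketch of the difference, with an illustrative frame:

import pandas as pd
indicators = pd.DataFrame({'libelle': ['Population', 'Naissances']}, index=[1, 2])
indicators.to_csv('with_index.csv', header=False, index=True)      # rows: 1,Population ...
indicators.to_csv('without_index.csv', header=False, index=False)  # rows: Population ...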
@@ -201,6 +206,7 @@ if __name__ == '__main__':
         logger.critical('can\'t find source file for statistics')
         sys.exit(1)
     statistics = import_statistics_csv(args.source + '/' + args.statistics)
+    statistics = statistics[statistics['CODGEO'].isin(towns['COM'])]
     t.stop()
     logger.debug(statistics)
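Presumably the added isin() filter drops statistics rows whose CODGEO has no matching commune, without which the COPY into statistique in createdatabase.py would violate its foreign key on com.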
@@ -228,7 +234,7 @@ if __name__ == '__main__':
     ## Create states capitals
     states_capitals = states[['CHEFLIEU','REG']]
     states_capitals.columns = ["CHEFLIEUREG","REG"]
-    departments = departments[["REG","NCC","LIBELLE"]]
+    states = states[["REG","NCC","LIBELLE"]]
     logger.debug(states_capitals)
 
     ## create statistics dataframes
@@ -253,8 +259,14 @@ if __name__ == '__main__':
             logger.debug('Process indicator {}'.format(regex))
             selection = srow.filter(regex=regex)
             for attribute, value in selection.items():
                 logger.debug('check code: {}'.format(irow['code']))
+                # If value is NaN, then do not add the line
+                if pd.isna(value):
+                    continue
                 if irow['code'].startswith('_'):
                     start, end = get_single_date(attribute)
                 else:
@@ -263,7 +275,7 @@ if __name__ == '__main__':
                 if start is None or end is None:
                     logger.error('Can\'t process line, continue to next')
                     continue
                 logger.debug(
                     'town:{}, id_indic: {}, start: {}, end: {}, value:{}'
                     .format(
@@ -283,7 +295,7 @@ if __name__ == '__main__':
     consolidated_stats = pd.DataFrame.from_dict(temp)
     t.stop()
-    indicators = indicators[['indicateur']]
 
     t.start('Process_town')
     towns = pd.merge(towns,
                      statistics[['CODGEO', 'SUPERF']],
@@ -301,8 +313,8 @@ if __name__ == '__main__':
     export_csv(states, args.export + '/region.csv')
     export_csv(dep_capitals, args.export + '/cheflieudepartement.csv')
     export_csv(states_capitals, args.export + '/cheflieuregion.csv')
-    export_csv(indicators, args.export + '/indicateur.csv')
-    export_csv(consolidated_stats, args.export + '/statistiques.csv')
+    export_csv(indicators, args.export + '/indicateur.csv', True)
+    export_csv(consolidated_stats, args.export + '/statistique.csv')
     t.stop()
     t.get_times_by_tag()
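Two details worth noting in this last hunk: the target file is renamed from statistiques.csv to statistique.csv, matching the name createdatabase.py derives from the statistique table, and indicators is now exported with index=True, presumably so the DataFrame index supplies the id_indicateur column expected by the COPY.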