Compare commits
No commits in common. "64b7d5283ffe74ff97c07cc1e36836286f730021" and "75554257aa8c433269ecb0b0ad00b13e7d752259" have entirely different histories.
64b7d5283f...75554257aa
3 changed files with 8 additions and 267 deletions
@@ -1,247 +0,0 @@ (first changed file, deleted in its entirety)
#!/usr/bin/env python
import sys
import os
import re
# import time
import logging
import argparse as arg
import psycopg2 as pg
from psycopg2.extensions import AsIs

from classes.CustomFormater import CustomFormatter
from classes.Timer import Timer

# Maximum schema name length is NAMEDATALEN-1 (see the PostgreSQL sources,
# src/include/pg_config_manual.h).
def check_schema_name(arg_value, pat=re.compile(r"^[a-z0-9A-Z]{1,63}$")):
    if not pat.match(arg_value):
        raise ValueError
    return arg_value
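
# Illustrative behaviour of the validator above (examples added here, not part
# of the original file):
#   check_schema_name('insee')       -> 'insee'
#   check_schema_name('bad;--name')  -> raises ValueError (rejected by the regex)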


def parse_args():
    """
    Parse arguments
    """
    parser = arg.ArgumentParser(description='Process csv files from INSEE')

    parser.add_argument('--source', '-s',
                        help='csv source directory',
                        default='exports')

    parser.add_argument('--connection-file', '-f',
                        help='PostgreSQL connection file',
                        default='.pgconn'
                        )

    # Because the schema name is interpolated with AsIs() (i.e. without
    # quoting) in our SQL queries, we must make sure it contains no SQL
    # special characters (comments --, trailing commas, etc.) so as to
    # avoid SQL injection.
    parser.add_argument('--schema-name',
                        help='Database schema name',
                        type=check_schema_name,
                        default='insee'
                        )
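
    # Note: instead of whitelisting the name and interpolating it with AsIs,
    # psycopg2's sql module can quote identifiers safely. A minimal sketch,
    # assuming psycopg2 >= 2.7 (not part of the original script):
    #
    #   from psycopg2 import sql
    #   curs.execute(sql.SQL("CREATE SCHEMA IF NOT EXISTS {}").format(
    #       sql.Identifier(args.schema_name)))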

    debug_group = parser.add_mutually_exclusive_group()
    debug_group.add_argument('--verbose', '-V',
                             help='Verbose output',
                             action='store_true')
    debug_group.add_argument('--debug', '-d',
                             help='Activate debug mode',
                             action='store_true')
    return parser.parse_args()
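
# Example invocation (illustrative; the script's file name is not shown in
# this diff, "create_schema.py" is only an assumption):
#
#   python create_schema.py -s exports -f .pgconn --schema-name insee --verbose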


if __name__ == '__main__':
    args = parse_args()
    t = Timer()
    # logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger()
    tty_handler = logging.StreamHandler()

    # attach the custom formatter to the console handler
    tty_handler.setFormatter(CustomFormatter())
    logger.addHandler(tty_handler)

    if args.verbose is True:
        logger.setLevel(logging.INFO)
        logger.info('VERBOSE mode activated')

    if args.debug is True:
        logger.setLevel(logging.DEBUG)
        logger.debug('DEBUG mode activated')

    logging.debug('Import pgsql connection file {}'.format(args.connection_file))
    with open(args.connection_file) as cf:
        pg_conn = cf.read()
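    # Illustrative contents of the .pgconn file: a libpq connection string
    # passed straight to psycopg2.connect(); the values below are assumptions.
    #
    #   host=localhost port=5432 dbname=insee user=insee password=secret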

    t.start('Create database schema')
    conn = pg.connect(pg_conn)
    with conn.cursor() as curs:
        curs.execute('CREATE SCHEMA IF NOT EXISTS %s', (AsIs(args.schema_name),))

        curs.execute("""
            CREATE TABLE IF NOT EXISTS %(schema)s.region (
                id_region serial PRIMARY KEY,
                reg VARCHAR(2) unique,
                ncc VARCHAR(200),
                libelle VARCHAR(200)
            );""",
            {'schema': AsIs(args.schema_name)})

        curs.execute("""
            CREATE TABLE IF NOT EXISTS %(schema)s.departement (
                id_departement serial PRIMARY KEY,
                dep VARCHAR(3) unique,
                ncc VARCHAR(200),
                libelle VARCHAR(200),
                reg VARCHAR(2) REFERENCES %(schema)s.region(reg)
            );""",
            {'schema': AsIs(args.schema_name)})

        curs.execute("""
            CREATE TABLE IF NOT EXISTS %(schema)s.commune (
                id_commune serial PRIMARY KEY,
                com VARCHAR(5) unique,
                ncc VARCHAR(200),
                libelle VARCHAR(200),
                dep VARCHAR(3) REFERENCES %(schema)s.departement(dep),
                superf FLOAT CONSTRAINT SUPERFICIE_POSITIVE CHECK (superf > 0)
            );""",
            {'schema': AsIs(args.schema_name)})

        curs.execute("""
            CREATE TABLE IF NOT EXISTS %(schema)s.cheflieudepartement(
                dep VARCHAR(3) REFERENCES %(schema)s.departement(dep),
                cheflieudep VARCHAR(5) REFERENCES %(schema)s.commune(com),
                PRIMARY KEY (dep, cheflieudep)
            );""",
            {'schema': AsIs(args.schema_name)})

        curs.execute("""
            CREATE TABLE IF NOT EXISTS %(schema)s.cheflieuregion(
                reg VARCHAR(3) REFERENCES %(schema)s.region(reg),
                cheflieureg VARCHAR(5) REFERENCES %(schema)s.commune(com),
                PRIMARY KEY (reg, cheflieureg)
            );""",
            {'schema': AsIs(args.schema_name)})

        curs.execute("""
            CREATE TABLE IF NOT EXISTS %(schema)s.indicateur(
                id_indicateur INTEGER PRIMARY KEY,
                libelle VARCHAR(200)
            );""",
            {'schema': AsIs(args.schema_name)})

        curs.execute("""
            CREATE TABLE IF NOT EXISTS %(schema)s.statistique(
                com VARCHAR(5) REFERENCES %(schema)s.commune(com),
                id_indicateur INTEGER REFERENCES %(schema)s.indicateur(id_indicateur),
                date_debut SMALLINT,
                date_fin SMALLINT CONSTRAINT APRES_DATEDEBUT CHECK (date_fin > date_debut),
                valeur float,
                PRIMARY KEY (com, id_indicateur, date_debut)
            );""",
            {'schema': AsIs(args.schema_name)})
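
        # Note: the REFERENCES clauses above impose a load order (region before
        # departement, departement before commune, commune and indicateur before
        # statistique); the csvs dict used for the CSV import below lists the
        # files in that same order.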

        curs.execute("""
            CREATE OR REPLACE VIEW %(schema)s.view_indicateur_dep AS
                SELECT id_departement, d.dep, d.ncc, s.id_indicateur, s.date_debut, SUM(s.valeur)
                FROM %(schema)s.departement d
                INNER JOIN %(schema)s.commune c ON c.dep = d.dep
                INNER JOIN %(schema)s.statistique s ON s.com = c.com
                GROUP BY id_departement, d.dep, s.date_debut, d.ncc, s.id_indicateur
                ORDER BY id_indicateur;""",
            {'schema': AsIs(args.schema_name)})

        curs.execute("""
            CREATE OR REPLACE VIEW %(schema)s.view_indicateur_reg AS
                SELECT id_region, r.reg, r.ncc, s.id_indicateur, s.date_debut, SUM(s.valeur)
                FROM %(schema)s.region r
                INNER JOIN %(schema)s.departement d ON d.reg = r.reg
                INNER JOIN %(schema)s.commune c ON c.dep = d.dep
                INNER JOIN %(schema)s.statistique s ON s.com = c.com
                GROUP BY id_region, r.reg, s.date_debut, r.ncc, s.id_indicateur
                ORDER BY id_indicateur;""",
            {'schema': AsIs(args.schema_name)})
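
        # Illustrative query against the department view (assumes the default
        # schema name 'insee'; indicator 1 and year 2018 are the values used by
        # the procedure below; 'sum' is the unaliased aggregate column):
        #
        #   SELECT dep, ncc, sum FROM insee.view_indicateur_dep
        #   WHERE id_indicateur = 1 AND date_debut = 2018;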

        curs.execute("""
            ALTER TABLE %(schema)s.region
            ADD COLUMN IF NOT EXISTS population INT;
            """,
            {'schema': AsIs(args.schema_name)})

        curs.execute("""
            ALTER TABLE %(schema)s.departement
            ADD COLUMN IF NOT EXISTS population INT;
            """,
            {'schema': AsIs(args.schema_name)})

        curs.execute("""
            CREATE OR REPLACE PROCEDURE %(schema)s.PRC_POP_REG_DEP()
            LANGUAGE plpgsql
            AS $$
            DECLARE
                REC RECORD;
            BEGIN
                -- id_departement is selected as well: the UPDATE below needs it
                FOR REC IN (SELECT id_departement, id_indicateur, ncc, sum AS valeur
                            FROM %(schema)s.view_indicateur_dep v
                            WHERE id_indicateur = 1
                            AND date_debut = '2018') LOOP

                    UPDATE %(schema)s.departement
                    SET population = REC.valeur
                    WHERE id_departement = REC.id_departement;

                END LOOP;

                FOR REC IN (SELECT reg, SUM(population) valeur
                            FROM %(schema)s.departement d
                            GROUP BY reg) LOOP

                    UPDATE %(schema)s.region
                    SET population = REC.valeur
                    WHERE reg = REC.reg;

                END LOOP;
            END;
            $$;""",
            {'schema': AsIs(args.schema_name)})
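
        # The procedure is only defined here; once the data has been imported
        # it can be run manually (schema name 'insee' assumed):
        #
        #   CALL insee.prc_pop_reg_dep();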

    conn.commit()
    t.stop()

    t.start('Import data from csv files')
    with conn.cursor() as curs:
        curs.execute('SET search_path TO {}'.format(args.schema_name))

    # One entry per CSV file / target table: the columns to map and the string
    # that marks NULL values in the file.
    csvs = {
        'region': {'col': ('reg', 'ncc', 'libelle'), 'null': ''},
        'departement': {'col': ('dep', 'ncc', 'libelle', 'reg'), 'null': ''},
        'commune': {'col': ('com', 'ncc', 'libelle', 'dep', 'superf'), 'null': ''},
        'cheflieudepartement': {'col': ('cheflieudep', 'dep'), 'null': ''},
        'cheflieuregion': {'col': ('cheflieureg', 'reg'), 'null': ''},
        'indicateur': {'col': ('id_indicateur', 'libelle'), 'null': ''},
        'statistique': {
            'col': (
                'com',
                'id_indicateur',
                'date_debut',
                'date_fin',
                'valeur'
            ),
            'null': 'null'
        }
    }
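    # Illustrative first data line of exports/region.csv given the column
    # mapping above (sep=',', empty string read as NULL); the values are an
    # assumption, not taken from this repository:
    #
    #   84,AUVERGNE RHONE ALPES,Auvergne-Rhône-Alpes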

    for csv, params in csvs.items():
        logging.debug('Process csv file : {} with {}'.format(csv, params))
        with open(args.source + '/' + csv + '.csv') as f:
            with conn.cursor() as curs:
                curs.copy_from(
                    f,
                    csv,
                    sep=',',
                    columns=params['col'],
                    null=params['null']
                )
    conn.commit()
    t.stop()

    conn.close()

(second changed file; file names are not shown here)

@@ -82,12 +82,6 @@ def import_towns_csv(raw_file):
     return towns.loc[towns['TYPECOM'] == 'COM', ['COM','NCC', 'LIBELLE', 'DEP']]
 
 
-def stats_convert(codegeo):
-    if len(codegeo) == 4:
-        return 0 + codegeo
-    if len(codegeo) == 6:
-        return codegeo[1:]
-    return codegeo
 
 def import_statistics_csv(raw_file):
     """

@@ -95,6 +89,7 @@ def import_statistics_csv(raw_file):
     """
 
     logger.info('import town from {}'.format(raw_file))
+    stats_convert= lambda x: x if len(str(x)) == 5 else f'0{x}'
     stats = pd.read_csv(raw_file,
                         usecols=["CODGEO","SUPERF","P18_POP","P13_POP","P08_POP","D99_POP",
                                  "NAIS1318","NAIS0813","NAIS9908","NAIS9099","NAIS8290","DECE1318",
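The added stats_convert lambda left-pads short CODGEO values to the five-character
INSEE commune code, e.g. stats_convert(1001) returns '01001' while stats_convert('75056')
is returned unchanged; it replaces the stats_convert() function removed in the first hunk
of this file.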

@@ -142,10 +137,10 @@ def get_range_date(attr):
     return date[1], date[2]
 
-def export_csv(dataframe, path, index = False):
+def export_csv(dataframe, path):
     logger.debug('export csv from panda dataframe')
     try:
-        dataframe.to_csv(path ,header = False, index = index)
+        dataframe.to_csv(path ,header = False, index= False)
     except Exception as e:
         logger.error(
             'Erro when exporting Dataframe to csvfile {}. \n{}'.format(

@@ -206,7 +201,6 @@ if __name__ == '__main__':
         logger.critical('can\'t find source file for statistics')
         sys.exit(1)
     statistics = import_statistics_csv(args.source + '/' + args.statistics)
-    statistics = statistics[statistics['CODGEO'].isin(towns['COM'])]
     t.stop()
     logger.debug(statistics)
 

@@ -234,7 +228,7 @@ if __name__ == '__main__':
     ## Create states capitals
     states_capitals = states[['CHEFLIEU','REG']]
     states_capitals.columns = ["CHEFLIEUREG","REG"]
-    states = states[[ "REG","NCC","LIBELLE"]]
+    departments = departments[["REG","NCC","LIBELLE"]]
     logger.debug(states_capitals)
 
     ## create statistics dataframes

@@ -259,14 +253,8 @@ if __name__ == '__main__':
             logger.debug('Process indicator {}'.format(regex))
             selection = srow.filter(regex=regex)
 
             for attribute, value in selection.items():
                 logger.debug('check code: {}'.format(irow['code']))
-
-                # If value is NaN, then do not add the line
-                if pd.isna(value):
-                    continue
-
                 if irow['code'].startswith('_'):
                     start,end = get_single_date(attribute)
                 else:

@@ -295,7 +283,7 @@ if __name__ == '__main__':
 
     consolidated_stats = pd.DataFrame.from_dict(temp)
     t.stop()
-    indicators=indicators[['indicateur']]
+
     t.start('Process_town')
     towns = pd.merge(towns,
                      statistics[['CODGEO', 'SUPERF']],

@@ -313,8 +301,8 @@ if __name__ == '__main__':
     export_csv(states, args.export + '/region.csv')
     export_csv(dep_capitals, args.export + '/cheflieudepartement.csv')
     export_csv(states_capitals, args.export + '/cheflieuregion.csv')
-    export_csv(indicators, args.export + '/indicateur.csv', True)
-    export_csv(consolidated_stats, args.export + '/statistique.csv')
+    export_csv(indicators, args.export + '/indicateur.csv')
+    export_csv(consolidated_stats, args.export + '/statistiques.csv')
     t.stop()
 
     t.get_times_by_tag()