projet_INSEE/get_states_statistics.py

167 lines
5.3 KiB
Python
Executable file

#!/usr/bin/env python
import sys
import io
import re
import psycopg2 as pg
from psycopg2.extensions import AsIs
import logging
import argparse as arg
from classes.Timer import Timer
from classes.CustomFormater import CustomFormatter
import locale
locale.setlocale(locale.LC_ALL, 'fr_FR.UTF-8')
# Schema name is NAMEDATALEN-1 (PGSQL source code)
# -> src/include/pg_config_manual.h
def check_schema_name(arg_value, pat=re.compile(r"^[a-z0-9A-Z]{1,63}$")):
if not pat.match(arg_value):
raise ValueError
return arg_value
def parse_args():
"""
Parse arguments
"""
parser = arg.ArgumentParser('Process csv files from INSEE')
parser.add_argument('--state', '-s',
help='states raw csv file (inside source follder)',
required=True
)
parser.add_argument('--connection-file', '-f',
help='Postgresql connexion file',
default='.pgconn'
)
parser.add_argument('--year',
help='Specify year needed to display statistics',
choices=['1982', '1990', '1999', '2008', '2013', '2018'],
default=2018
)
parser.add_argument('--schema-name',
help='Database schema name',
type=check_schema_name,
default='insee'
)
debug_group = parser.add_mutually_exclusive_group()
debug_group.add_argument('--verbose', '-V',
help='Verbose output',
action='store_true')
debug_group.add_argument('--debug', '-d',
help='Activate debug mode',
action='store_true')
return parser.parse_args()
if __name__ == '__main__':
args = parse_args()
#logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()
tty_handler = logging.StreamHandler()
# create console handler with a higher log level
tty_handler.setFormatter(CustomFormatter())
logger.addHandler(tty_handler)
if args.verbose is True:
logger.setLevel(logging.INFO)
logger.info('VERBOSE mode activated')
if args.debug is True:
logger.setLevel(logging.DEBUG)
logger.debug('DEBUG mode activated')
t = Timer(logger=logger.info)
logging.debug('Import pgsql connection file {}'.format(args.connection_file))
with open(args.connection_file) as cf:
pg_conn = cf.read()
t.start('Get states')
conn = pg.connect(pg_conn)
with conn.cursor() as cur:
cur.execute(
"""SELECT reg, libelle
FROM %(schema)s.region WHERE libelle = %(state)s
OR ncc = %(state)s;""",
{'state': args.state, 'schema': AsIs(args.schema_name)}
)
try:
s_id, s_name = cur.fetchone()
except Exception as e:
logging.error('There is no state {}'.format(args.state))
sys.exit(1)
t.stop()
print('Get information about {} - id: {}'.format(s_name, s_id))
print('---')
with conn.cursor() as cur:
cur.execute("""
SELECT sum(c.superf)
FROM %(schema)s.commune c
INNER JOIN %(schema)s.departement d ON c.dep = d.dep
WHERE d.reg = %(state)s
""",
{'state': s_id, 'schema': AsIs(args.schema_name), 'tear': args.year }
)
try:
surface = cur.fetchone()
logger.debug(surface)
except Exception as e:
logging.error('There is no response for {} surface'.format(args.state))
sys.exit(1)
print('surface: {:n}Km2'.format(surface[0]))
with conn.cursor() as cur:
cur.execute("""
SELECT sum(s.valeur)::numeric::integer
FROM %(schema)s.commune c
INNER JOIN %(schema)s.departement d ON c.dep = d.dep
INNER JOIN %(schema)s.statistique s ON c.com = s.com
WHERE s.id_indicateur = 1
AND s.date_debut = %(year)s
AND d.reg = %(state)s;
""",
{'state': s_id, 'schema': AsIs(args.schema_name), 'year': args.year}
)
try:
inhabitants = cur.fetchone()
logger.debug(inhabitants)
except Exception as e:
logging.error('There is no response for {} surface'.format(args.state))
sys.exit(1)
print('Population: {:n} inhabitans'.format(inhabitants[0]))
with conn.cursor() as cur:
# get most populated city in state
# need to cast float to int (valeur)
cur.execute("""
SELECT c.libelle, s.valeur::numeric::integer
FROM %(schema)s.commune c
INNER JOIN %(schema)s.departement d ON c.dep = d.dep
INNER JOIN %(schema)s.statistique s ON c.com = s.com
WHERE s.id_indicateur = 1
AND s.date_debut = %(year)s
AND d.reg = %(state)s
ORDER BY s.valeur DESC
LIMIT 10;
""",
{'state': s_id, 'schema': AsIs(args.schema_name), 'year': args.year}
)
try:
towns = cur.fetchall()
logger.debug(towns)
except Exception as e:
logging.error('There is no state {}'.format(args.state))
sys.exit(1)
print('Most populated cities:\n')
for row in towns:
print('\t{:.<40}{:.>10n}'.format(*row))
sys.exit(0)