#!/usr/bin/env python import sys import io import re import psycopg2 as pg from psycopg2.extensions import AsIs import logging import argparse as arg from classes.Timer import Timer from classes.CustomFormater import CustomFormatter import locale locale.setlocale(locale.LC_ALL, 'fr_FR.UTF-8') # Schema name is NAMEDATALEN-1 (PGSQL source code) # -> src/include/pg_config_manual.h def check_schema_name(arg_value, pat=re.compile(r"^[a-z0-9A-Z]{1,63}$")): if not pat.match(arg_value): raise ValueError return arg_value def parse_args(): """ Parse arguments """ parser = arg.ArgumentParser('Process csv files from INSEE') parser.add_argument('--state', '-s', help='states raw csv file (inside source follder)', required=True ) parser.add_argument('--connection-file', '-f', help='Postgresql connexion file', default='.pgconn' ) parser.add_argument('--year', help='Specify year needed to display statistics', choices=['1982', '1990', '1999', '2008', '2013', '2018'], default=2018 ) parser.add_argument('--schema-name', help='Database schema name', type=check_schema_name, default='insee' ) debug_group = parser.add_mutually_exclusive_group() debug_group.add_argument('--verbose', '-V', help='Verbose output', action='store_true') debug_group.add_argument('--debug', '-d', help='Activate debug mode', action='store_true') return parser.parse_args() if __name__ == '__main__': args = parse_args() #logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger() tty_handler = logging.StreamHandler() # create console handler with a higher log level tty_handler.setFormatter(CustomFormatter()) logger.addHandler(tty_handler) if args.verbose is True: logger.setLevel(logging.INFO) logger.info('VERBOSE mode activated') if args.debug is True: logger.setLevel(logging.DEBUG) logger.debug('DEBUG mode activated') t = Timer(logger=logger.info) logging.debug('Import pgsql connection file {}'.format(args.connection_file)) with open(args.connection_file) as cf: pg_conn = cf.read() t.start('Get states') conn = pg.connect(pg_conn) with conn.cursor() as cur: cur.execute( """SELECT reg, libelle FROM %(schema)s.region WHERE libelle = %(state)s OR ncc = %(state)s;""", {'state': args.state, 'schema': AsIs(args.schema_name)} ) try: s_id, s_name = cur.fetchone() except Exception as e: logging.error('There is no state {}'.format(args.state)) sys.exit(1) t.stop() print('Get information about {} - id: {}'.format(s_name, s_id)) print('---') with conn.cursor() as cur: cur.execute(""" SELECT sum(c.superf) FROM %(schema)s.commune c INNER JOIN %(schema)s.departement d ON c.dep = d.dep WHERE d.reg = %(state)s """, {'state': s_id, 'schema': AsIs(args.schema_name), 'tear': args.year } ) try: surface = cur.fetchone() logger.debug(surface) except Exception as e: logging.error('There is no response for {} surface'.format(args.state)) sys.exit(1) print('surface: {:n}Km2'.format(surface[0])) with conn.cursor() as cur: cur.execute(""" SELECT sum(s.valeur)::numeric::integer FROM %(schema)s.commune c INNER JOIN %(schema)s.departement d ON c.dep = d.dep INNER JOIN %(schema)s.statistique s ON c.com = s.com WHERE s.id_indicateur = 1 AND s.date_debut = %(year)s AND d.reg = %(state)s; """, {'state': s_id, 'schema': AsIs(args.schema_name), 'year': args.year} ) try: inhabitants = cur.fetchone() logger.debug(inhabitants) except Exception as e: logging.error('There is no response for {} surface'.format(args.state)) sys.exit(1) print('Population: {:n} inhabitans'.format(inhabitants[0])) with conn.cursor() as cur: # get most populated city in state # need to cast float to int (valeur) cur.execute(""" SELECT c.libelle, s.valeur::numeric::integer FROM %(schema)s.commune c INNER JOIN %(schema)s.departement d ON c.dep = d.dep INNER JOIN %(schema)s.statistique s ON c.com = s.com WHERE s.id_indicateur = 1 AND s.date_debut = %(year)s AND d.reg = %(state)s ORDER BY s.valeur DESC LIMIT 10; """, {'state': s_id, 'schema': AsIs(args.schema_name), 'year': args.year} ) try: towns = cur.fetchall() logger.debug(towns) except Exception as e: logging.error('There is no state {}'.format(args.state)) sys.exit(1) print('Most populated cities:\n') for row in towns: print('\t{:.<40}{:.>10n}'.format(*row)) sys.exit(0)