Python์ ๊ฐ๊ฒฐํจ๊ณผ ๊ฐ๋ ฅํ ์ํ๊ณ๋ ์ ํ๋ฆฌ์ผ์ด์ ์ ์ด๋ฉ์ผ ๊ฒ์ฆ์ ํตํฉํ๋ ๋ฐ ํ์ํ ์ ํ์ ๋๋ค. Django๋ Flask๋ก ์น ์ ํ๋ฆฌ์ผ์ด์ ์ ๊ตฌ์ถํ๋ , pandas๋ก ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋ , ์๋ํ๋ ์ํฌํ๋ก์ฐ๋ฅผ ๋ง๋ค๋ , ์ ๋ฌธ์ ์ธ ์ด๋ฉ์ผ ๊ฒ์ฆ์ ์ถ๊ฐํ๋ฉด ์ด๋ฉ์ผ ํต์ ์ด ์ค์ ์์ ์์๊ฒ ๋๋ฌํ๊ณ ๋ฐ์ ์ ํํ์ ๋ณดํธํ ์ ์์ต๋๋ค.
์ด ํฌ๊ด์ ์ธ ๊ฐ์ด๋๋ ๊ธฐ๋ณธ์ ์ธ ๋จ์ผ ์ด๋ฉ์ผ ๊ฒ์ฆ๋ถํฐ ๊ณ ๊ธ ๋ฐฐ์น ์ฒ๋ฆฌ ๋ฐ ํ๋ก๋์ ์ค๋น ๊ตฌํ๊น์ง BillionVerify ์ด๋ฉ์ผ ๊ฒ์ฆ API๋ฅผ Python๊ณผ ํตํฉํ๋ ๊ณผ์ ์ ์๋ดํฉ๋๋ค.
사전 요구 사항 및 설정
코드를 시작하기 전에 적절한 환경이 구성되어 있는지 확인하세요. 이 튜토리얼은 시스템에 Python 3.8 이상이 설치되어 있다고 가정합니다.
ํ์ํ ํจํค์ง ์ค์น
ํ์ํ ํจํค์ง๋ฅผ ์ค์นํ๋ ๊ฒ๋ถํฐ ์์ํ์ธ์. HTTP ํต์ ์ ์ํด requests ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ฌ์ฉํ์ง๋ง, ๋์ค์ ๋น๋๊ธฐ ์์
์ ์ํ aiohttp๋ ์ดํด๋ณผ ๊ฒ์
๋๋ค.
pip install requests python-dotenv
비동기 지원 및 고급 기능을 위해:
pip install aiohttp pandas
프로젝트 구조
관심사를 분리하는 깔끔한 구조로 프로젝트를 구성하세요:
email_verification/
├── __init__.py
├── client.py           # Main verification client
├── models.py           # Data models
├── exceptions.py       # Custom exceptions
├── utils.py            # Helper functions
├── batch_processor.py  # Batch verification logic
└── examples/
    ├── basic_usage.py
    ├── flask_integration.py
    └── django_integration.py
환경 설정
API 자격 증명을 안전하게 저장하기 위한 .env 파일을 생성하세요:
BILLIONVERIFY_API_KEY=your_api_key_here
BILLIONVERIFY_API_URL=https://api.billionverify.com/v1
애플리케이션에서 이러한 변수를 로드하세요:
import os
from dotenv import load_dotenv
# Copy the variables declared in a local .env file into the process environment.
load_dotenv()

# The key has no fallback: API_KEY is None when the variable is not set.
API_KEY = os.environ.get('BILLIONVERIFY_API_KEY')
# The endpoint falls back to the public v1 URL when not overridden.
API_URL = os.environ.get('BILLIONVERIFY_API_URL', 'https://api.billionverify.com/v1')
๊ธฐ๋ณธ ์ด๋ฉ์ผ ๊ฒ์ฆ
๊ฐ์ฅ ๊ฐ๋จํ ๊ตฌํ๋ถํฐ ์์ํ๊ฒ ์ต๋๋ค: Python์ requests ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ฌ์ฉํ์ฌ ๋จ์ผ ์ด๋ฉ์ผ ์ฃผ์๋ฅผ ๊ฒ์ฆํ๋ ๊ฒ์
๋๋ค.
๊ฐ๋จํ ๊ฒ์ฆ ํจ์
import requests
from typing import Dict, Any
def verify_email(
    email: str,
    api_key: str,
    *,
    base_url: str = "https://api.billionverify.com/v1",
    timeout: float = 30,
) -> Dict[str, Any]:
    """
    Verify a single email address using the BillionVerify API.

    Args:
        email: The email address to verify
        api_key: Your BillionVerify API key
        base_url: API base URL. Defaults to the public v1 endpoint, so a
            value loaded from BILLIONVERIFY_API_URL can be passed through.
        timeout: Seconds to wait for the HTTP response before giving up

    Returns:
        Dictionary containing verification results (the decoded JSON body)

    Raises:
        requests.RequestException: If the API request fails or the API
            answers with an HTTP error status
    """
    # Tolerate a trailing slash on a configured base URL.
    url = f"{base_url.rstrip('/')}/verify"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    response = requests.post(
        url,
        json={"email": email},
        headers=headers,
        timeout=timeout,
    )
    # Surface 4xx/5xx answers as requests.HTTPError instead of returning
    # an error payload as if it were a verification result.
    response.raise_for_status()
    return response.json()
# Example usage: verify one address and print the two headline flags.
if __name__ == "__main__":
    outcome = verify_email("test@example.com", API_KEY)
    valid_flag = outcome.get('is_valid')
    deliverable_flag = outcome.get('is_deliverable')
    print(f"Email valid: {valid_flag}")
    print(f"Deliverable: {deliverable_flag}")
์๋ต ์ดํดํ๊ธฐ
API๋ ์ฌ๋ฌ ๊ฒ์ฆ ์งํ๊ฐ ํฌํจ๋ ํฌ๊ด์ ์ธ ์๋ต์ ๋ฐํํฉ๋๋ค:
# Illustrative response payload, written as a Python literal (True/False
# rather than JSON true/false). The keys match the fields read by
# VerificationResult.from_dict.
{
"email": "user@example.com",
"is_valid": True,
"is_deliverable": True,
"is_disposable": False,
"is_role_based": False,
"is_catch_all": False,
"is_free_provider": True,
"risk_score": 15,
"domain": "example.com",
"mx_records": ["mx1.example.com", "mx2.example.com"],
"smtp_check": True,
"verification_time_ms": 245
}
각 필드는 유용한 정보를 제공합니다:
- is_valid: 이메일 형식이 구문적으로 올바른지 여부
- is_deliverable: 메일함이 존재하고 메일을 받을 수 있는지 여부
- is_disposable: 임시 또는 일회용 이메일 주소인지 식별
- is_role_based: info@ 또는 support@와 같은 일반 주소를 감지
- is_catch_all: 모든 주소를 수락하는 도메인인지 파악
- risk_score: 0(최저 위험)부터 100(최고 위험)까지의 수치적 평가
프로덕션 준비 클라이언트 구축
프로덕션 애플리케이션의 경우, 인증, 재시도, 속도 제한 및 오류 처리를 우아하게 처리하는 견고한 클라이언트 클래스가 필요합니다.
사용자 정의 예외
먼저 더 나은 오류 처리를 위한 사용자 정의 예외를 정의하세요:
# exceptions.py
class EmailVerificationError(Exception):
    """Root of the package's exception hierarchy; catch this to handle any verification failure."""


class AuthenticationError(EmailVerificationError):
    """Signals that the API rejected the supplied credentials."""


class RateLimitError(EmailVerificationError):
    """Signals that the API rate limit was exceeded.

    Attributes:
        retry_after: Seconds the caller should wait before retrying.
    """

    def __init__(self, retry_after: int = 60):
        message = f"Rate limit exceeded. Retry after {retry_after} seconds."
        super().__init__(message)
        self.retry_after = retry_after


class ValidationError(EmailVerificationError):
    """Signals that the API refused the request as invalid."""


class APIError(EmailVerificationError):
    """Signals any other API failure.

    Attributes:
        status_code: HTTP status code reported by the API.
    """

    def __init__(self, status_code: int, message: str):
        text = f"API error {status_code}: {message}"
        super().__init__(text)
        self.status_code = status_code
๋ฐ์ดํฐ ๋ชจ๋ธ
ํ์ ์์ ์๋ต ์ฒ๋ฆฌ๋ฅผ ์ํด dataclasses ๋๋ Pydantic์ ์ฌ์ฉํ์ธ์:
# models.py
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class VerificationResult:
    """Represents the result of an email verification."""

    email: str
    is_valid: bool
    is_deliverable: bool
    is_disposable: bool
    is_role_based: bool
    is_catch_all: bool
    is_free_provider: bool
    risk_score: int
    domain: str
    mx_records: List[str]
    smtp_check: bool
    verification_time_ms: int

    @classmethod
    def from_dict(cls, data: dict) -> 'VerificationResult':
        """Create a VerificationResult from API response dictionary.

        A key that is missing or explicitly ``None`` (JSON ``null``)
        falls back to a conservative default: flags become ``False``,
        ``risk_score`` becomes 100 and ``mx_records`` becomes an empty
        list.

        Args:
            data: Decoded JSON object returned by the API

        Returns:
            A populated VerificationResult
        """
        def pick(key, default):
            # dict.get(key, default) only covers a missing key; a null
            # value would otherwise leak through as None and break
            # comparisons such as ``risk_score < 50``.
            value = data.get(key)
            return default if value is None else value

        return cls(
            email=pick('email', ''),
            is_valid=pick('is_valid', False),
            is_deliverable=pick('is_deliverable', False),
            is_disposable=pick('is_disposable', False),
            is_role_based=pick('is_role_based', False),
            is_catch_all=pick('is_catch_all', False),
            is_free_provider=pick('is_free_provider', False),
            risk_score=pick('risk_score', 100),
            domain=pick('domain', ''),
            mx_records=pick('mx_records', []),
            smtp_check=pick('smtp_check', False),
            verification_time_ms=pick('verification_time_ms', 0),
        )

    def is_safe_to_send(self) -> bool:
        """Determine if it's safe to send emails to this address.

        Returns:
            True only when the address is valid, deliverable, not
            disposable, and its risk score is below 50.
        """
        return (
            self.is_valid and
            self.is_deliverable and
            not self.is_disposable and
            self.risk_score < 50
        )
메인 클라이언트 클래스
이제 완전한 기능을 갖춘 검증 클라이언트를 구현하세요:
# client.py
import time
import logging
from typing import Optional, List
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from .models import VerificationResult
from .exceptions import (
AuthenticationError,
RateLimitError,
ValidationError,
APIError
)
logger = logging.getLogger(__name__)
class EmailVerificationClient:
    """
    Production-ready client for the BillionVerify email verification API.

    Features:
    - Automatic retry with exponential backoff
    - Rate limit handling
    - Connection pooling
    - Comprehensive error handling

    Usable as a context manager; leaving the ``with`` block closes the
    underlying HTTP session.
    """

    DEFAULT_BASE_URL = "https://api.billionverify.com/v1"
    DEFAULT_TIMEOUT = 30
    MAX_RETRIES = 3
    # Wait used when a 429 carries no usable Retry-After header.
    DEFAULT_RETRY_AFTER = 60

    def __init__(
        self,
        api_key: str,
        base_url: Optional[str] = None,
        timeout: int = DEFAULT_TIMEOUT,
        max_retries: int = MAX_RETRIES
    ):
        """
        Initialize the email verification client.

        Args:
            api_key: Your BillionVerify API key
            base_url: Optional custom API base URL
            timeout: Request timeout in seconds
            max_retries: Maximum number of retry attempts

        Raises:
            ValueError: If api_key is empty
        """
        if not api_key:
            raise ValueError("API key is required")
        self.api_key = api_key
        # Strip a trailing slash so endpoint paths never contain "//".
        self.base_url = (base_url or self.DEFAULT_BASE_URL).rstrip("/")
        self.timeout = timeout
        self.max_retries = max_retries
        # Configure session with retry logic
        self.session = self._create_session()

    def _create_session(self) -> requests.Session:
        """Create a requests session with retry configuration."""
        session = requests.Session()
        # Configure retry strategy
        retry_strategy = Retry(
            total=self.max_retries,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST"],
            # Hand the last response back once retries are exhausted so
            # _handle_response can map it to RateLimitError/APIError;
            # the default would raise requests.RetryError instead.
            raise_on_status=False,
        )
        adapter = HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=10,
            pool_maxsize=20
        )
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        # Set default headers
        session.headers.update({
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
            "User-Agent": "BillionVerify-Python/1.0"
        })
        return session

    @classmethod
    def _parse_retry_after(cls, value) -> int:
        """Convert a Retry-After header value into whole seconds.

        The header may be absent or carry an HTTP date instead of a
        number; in both cases DEFAULT_RETRY_AFTER is returned.
        """
        try:
            seconds = int(value)
        except (TypeError, ValueError):
            return cls.DEFAULT_RETRY_AFTER
        return max(seconds, 0)

    def _handle_response(self, response: requests.Response) -> dict:
        """
        Handle API response and raise appropriate exceptions.

        Args:
            response: The requests Response object

        Returns:
            Parsed JSON response

        Raises:
            AuthenticationError: For 401/403 responses
            RateLimitError: For 429 responses
            ValidationError: For 400 responses
            APIError: For other error responses, or a success response
                whose body is not valid JSON
        """
        status = response.status_code
        if status == 401:
            raise AuthenticationError("Invalid API key")
        if status == 403:
            raise AuthenticationError("Access forbidden. Check API key permissions.")
        if status == 429:
            raise RateLimitError(
                self._parse_retry_after(response.headers.get('Retry-After'))
            )
        if status == 400:
            # The error body is not guaranteed to be a JSON object.
            try:
                error_data = response.json()
            except ValueError:
                error_data = None
            message = error_data.get('message') if isinstance(error_data, dict) else None
            raise ValidationError(message or 'Validation failed')
        if status >= 400:
            raise APIError(status, response.text)
        try:
            return response.json()
        except ValueError as exc:
            raise APIError(status, "Response body is not valid JSON") from exc

    def verify(self, email: str) -> VerificationResult:
        """
        Verify a single email address.

        Args:
            email: The email address to verify

        Returns:
            VerificationResult object with verification details
        """
        url = f"{self.base_url}/verify"
        logger.debug("Verifying email: %s", email)
        response = self.session.post(
            url,
            json={"email": email},
            timeout=self.timeout
        )
        data = self._handle_response(response)
        result = VerificationResult.from_dict(data)
        logger.info(
            "Verified %s: valid=%s, deliverable=%s, risk_score=%s",
            email, result.is_valid, result.is_deliverable, result.risk_score
        )
        return result

    def verify_batch(
        self,
        emails: List[str],
        callback_url: Optional[str] = None
    ) -> str:
        """
        Submit a batch of emails for verification.

        Args:
            emails: List of email addresses to verify
            callback_url: Optional webhook URL for results notification

        Returns:
            Batch ID for tracking the verification job

        Raises:
            APIError: If the response does not contain a batch ID
        """
        url = f"{self.base_url}/verify/batch"
        payload = {"emails": emails}
        if callback_url:
            payload["callback_url"] = callback_url
        response = self.session.post(
            url,
            json=payload,
            timeout=self.timeout
        )
        data = self._handle_response(response)
        batch_id = data.get('batch_id')
        if not batch_id:
            # Without an ID the job cannot be polled; fail here rather
            # than return None from a method annotated ``-> str``.
            raise APIError(response.status_code, "Response did not include a batch_id")
        logger.info("Submitted batch verification: %s (%d emails)", batch_id, len(emails))
        return batch_id

    def get_batch_status(self, batch_id: str) -> dict:
        """
        Get the status of a batch verification job.

        Args:
            batch_id: The batch ID returned from verify_batch

        Returns:
            Dictionary with batch status and progress
        """
        url = f"{self.base_url}/verify/batch/{batch_id}"
        response = self.session.get(url, timeout=self.timeout)
        return self._handle_response(response)

    def get_batch_results(self, batch_id: str) -> List[VerificationResult]:
        """
        Get the results of a completed batch verification.

        Args:
            batch_id: The batch ID returned from verify_batch

        Returns:
            List of VerificationResult objects
        """
        url = f"{self.base_url}/verify/batch/{batch_id}/results"
        response = self.session.get(url, timeout=self.timeout)
        data = self._handle_response(response)
        # A null "results" value is treated like a missing one.
        return [VerificationResult.from_dict(item) for item in data.get('results') or []]

    def close(self):
        """Close the underlying session."""
        self.session.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
배치 이메일 검증
대량 이메일 목록 처리에는 효율적인 배치 처리가 필요합니다. 진행 상황 추적 및 결과 관리를 포함한 강력한 배치 검증을 구현하는 방법은 다음과 같습니다.
배치 프로세서 구현
# batch_processor.py
import time
import logging
from typing import List, Callable, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from .client import EmailVerificationClient
from .models import VerificationResult
from .exceptions import RateLimitError
logger = logging.getLogger(__name__)
class BatchProcessor:
    """
    Process large email lists with progress tracking and result handling.

    Strategy, chosen by ``process_list`` from the list size:
    - up to 10 emails: one request at a time
    - more than 100 emails with ``use_async_batch``: the batch API
    - otherwise: individual requests on a thread pool
    """

    def __init__(
        self,
        client: EmailVerificationClient,
        batch_size: int = 1000,
        max_workers: int = 5,
        progress_callback: Optional[Callable[[int, int], None]] = None
    ):
        """
        Initialize the batch processor.

        Args:
            client: EmailVerificationClient instance
            batch_size: Number of emails per batch submission
            max_workers: Maximum concurrent verification threads
            progress_callback: Optional callback for progress updates,
                called as ``callback(done, total)``

        Raises:
            ValueError: If batch_size or max_workers is not positive
        """
        # A non-positive batch size would otherwise fail late inside
        # range(), or silently produce zero batches.
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        if max_workers < 1:
            raise ValueError("max_workers must be at least 1")
        self.client = client
        self.batch_size = batch_size
        self.max_workers = max_workers
        self.progress_callback = progress_callback

    def process_list(
        self,
        emails: List[str],
        use_async_batch: bool = True
    ) -> List[VerificationResult]:
        """
        Process a list of emails with optimal strategy.

        Args:
            emails: List of email addresses
            use_async_batch: Use async batch API for large lists

        Returns:
            List of verification results
        """
        total = len(emails)
        logger.info("Starting verification of %d emails", total)
        if total <= 10:
            # Small list: verify individually
            return self._process_sequential(emails)
        if use_async_batch and total > 100:
            # Large list: use batch API
            return self._process_batch_api(emails)
        # Medium list: use concurrent individual verification
        return self._process_concurrent(emails)

    def _report_progress(self, done: int, total: int) -> None:
        """Invoke the progress callback when one is configured."""
        if self.progress_callback:
            self.progress_callback(done, total)

    def _process_sequential(
        self,
        emails: List[str]
    ) -> List[VerificationResult]:
        """Process emails sequentially, in input order.

        A failure on one address is recorded as an error result rather
        than raised, so the remaining addresses are still processed.
        """
        results = []
        total = len(emails)
        for done, email in enumerate(emails, 1):
            try:
                # The rate-limit retry runs inside the try block so a
                # repeated 429 cannot escape and abort the whole list.
                results.append(self._verify_with_retry(email))
            except Exception as e:
                logger.error("Failed to verify %s: %s", email, e)
                results.append(self._create_error_result(email, str(e)))
            self._report_progress(done, total)
        return results

    def _process_concurrent(
        self,
        emails: List[str]
    ) -> List[VerificationResult]:
        """Process emails concurrently with thread pool.

        Results are returned in input order even though requests finish
        in arbitrary order.
        """
        total = len(emails)
        results: List[Optional[VerificationResult]] = [None] * total
        completed = 0
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Key futures by input position so each result lands in
            # the slot of the email it belongs to.
            future_to_index = {
                executor.submit(self._verify_with_retry, email): index
                for index, email in enumerate(emails)
            }
            for future in as_completed(future_to_index):
                index = future_to_index[future]
                email = emails[index]
                try:
                    results[index] = future.result()
                except Exception as e:
                    logger.error("Failed to verify %s: %s", email, e)
                    results[index] = self._create_error_result(email, str(e))
                completed += 1
                self._report_progress(completed, total)
        return results

    def _process_batch_api(
        self,
        emails: List[str]
    ) -> List[VerificationResult]:
        """Process emails using the async batch API.

        Batches are submitted and awaited one after another. An error
        from submission or polling propagates to the caller.
        """
        all_results = []
        total = len(emails)
        processed = 0
        # Split into batches
        batches = [
            emails[i:i + self.batch_size]
            for i in range(0, total, self.batch_size)
        ]
        for batch_num, batch in enumerate(batches, 1):
            logger.info("Submitting batch %d/%d", batch_num, len(batches))
            # Submit batch
            batch_id = self.client.verify_batch(batch)
            # Wait for completion with polling
            all_results.extend(self._wait_for_batch(batch_id))
            processed += len(batch)
            self._report_progress(processed, total)
        return all_results

    def _wait_for_batch(
        self,
        batch_id: str,
        poll_interval: int = 5,
        max_wait: int = 3600
    ) -> List[VerificationResult]:
        """
        Wait for batch verification to complete.

        Args:
            batch_id: The batch ID to wait for
            poll_interval: Seconds between status checks
            max_wait: Maximum seconds to wait

        Returns:
            List of verification results

        Raises:
            RuntimeError: If the API reports the batch as failed
            TimeoutError: If the batch is not complete within max_wait
        """
        # A monotonic clock keeps the deadline immune to wall-clock
        # adjustments while polling.
        deadline = time.monotonic() + max_wait
        while time.monotonic() < deadline:
            status = self.client.get_batch_status(batch_id)
            state = status.get('status')
            if state == 'completed':
                return self.client.get_batch_results(batch_id)
            if state == 'failed':
                raise RuntimeError(f"Batch {batch_id} failed: {status.get('error')}")
            progress = status.get('progress', 0)
            logger.debug("Batch %s progress: %s%%", batch_id, progress)
            time.sleep(poll_interval)
        raise TimeoutError(f"Batch {batch_id} did not complete within {max_wait}s")

    def _verify_with_retry(
        self,
        email: str,
        max_retries: int = 3
    ) -> VerificationResult:
        """Verify email with retry logic for rate limits.

        Sleeps for the interval carried by the RateLimitError between
        attempts and re-raises it after the final attempt. Other
        exceptions propagate immediately.
        """
        for attempt in range(max_retries):
            try:
                return self.client.verify(email)
            except RateLimitError as e:
                if attempt >= max_retries - 1:
                    raise
                logger.warning("Rate limit hit, waiting %ss", e.retry_after)
                time.sleep(e.retry_after)
        # Only reachable when max_retries <= 0.
        raise RuntimeError(f"Failed to verify {email} after {max_retries} attempts")

    @staticmethod
    def _create_error_result(email: str, error: str) -> VerificationResult:
        """Create a result object for failed verification.

        Every flag is negative and the risk score is the maximum, so
        is_safe_to_send() is False. ``error`` is accepted for interface
        compatibility; VerificationResult has no field for it, so the
        message is only available in the caller's log output.
        """
        return VerificationResult(
            email=email,
            is_valid=False,
            is_deliverable=False,
            is_disposable=False,
            is_role_based=False,
            is_catch_all=False,
            is_free_provider=False,
            risk_score=100,
            domain=email.split('@')[-1] if '@' in email else '',
            mx_records=[],
            smtp_check=False,
            verification_time_ms=0
        )
CSV ํ์ผ ๋ฐ Pandas ์์
๋ฐ์ดํฐ ์ฒ๋ฆฌ ์ํฌํ๋ก์ฐ์ ๊ฒฝ์ฐ pandas์ ํตํฉํ์ธ์:
import pandas as pd
from typing import Optional
def verify_csv_file(
    client: EmailVerificationClient,
    input_file: str,
    output_file: str,
    email_column: str = 'email',
    batch_size: int = 1000
) -> pd.DataFrame:
    """
    Verify emails from a CSV file and save results.

    Each distinct address is verified once. The verification columns
    are then joined back onto every input row, so duplicate rows in the
    input each receive the same result and the row count is preserved.

    Args:
        client: EmailVerificationClient instance
        input_file: Path to input CSV file
        output_file: Path to output CSV file
        email_column: Name of the column containing emails
        batch_size: Processing batch size

    Returns:
        DataFrame with verification results

    Raises:
        ValueError: If email_column is not present in the CSV
    """
    # Read input file
    df = pd.read_csv(input_file)
    if email_column not in df.columns:
        raise ValueError(f"Column '{email_column}' not found in CSV")
    # De-duplicate (order-preserving) before calling the API: a repeated
    # address would be verified again and would multiply rows in the
    # left join below.
    emails = df[email_column].dropna().drop_duplicates().tolist()
    # Process with progress tracking
    processor = BatchProcessor(
        client,
        batch_size=batch_size,
        progress_callback=lambda done, total: print(f"Progress: {done}/{total}")
    )
    results = processor.process_list(emails)
    # Create results DataFrame. Explicit columns keep the frame well
    # formed when there are no results at all.
    result_columns = [
        'email', 'is_valid', 'is_deliverable', 'is_disposable',
        'is_role_based', 'is_catch_all', 'risk_score', 'domain',
    ]
    results_df = pd.DataFrame(
        [
            {
                'email': r.email,
                'is_valid': r.is_valid,
                'is_deliverable': r.is_deliverable,
                'is_disposable': r.is_disposable,
                'is_role_based': r.is_role_based,
                'is_catch_all': r.is_catch_all,
                'risk_score': r.risk_score,
                'domain': r.domain
            }
            for r in results
        ],
        columns=result_columns,
    )
    # Two inputs can come back with the same reported address; keep one
    # row per address so the join stays one-to-many.
    results_df = results_df.drop_duplicates(subset='email')
    # Merge with original data
    merged = df.merge(results_df, left_on=email_column, right_on='email', how='left')
    # Save results
    merged.to_csv(output_file, index=False)
    # Print summary (counts are over distinct addresses)
    print("\nVerification Summary:")
    print(f" Total emails: {len(emails)}")
    print(f" Valid: {results_df['is_valid'].sum()}")
    print(f" Deliverable: {results_df['is_deliverable'].sum()}")
    print(f" Disposable: {results_df['is_disposable'].sum()}")
    print(f" High risk (score >= 50): {(results_df['risk_score'] >= 50).sum()}")
    return merged
asyncio๋ฅผ ์ฌ์ฉํ ๋น๋๊ธฐ ๊ฒ์ฆ
๊ณ ์ฑ๋ฅ ์ ํ๋ฆฌ์ผ์ด์ ์ ๊ฒฝ์ฐ aiohttp์ ํจ๊ป Python์ asyncio๋ฅผ ์ฌ์ฉํ์ธ์:
import asyncio
import aiohttp
from typing import List, Optional
from dataclasses import dataclass
class AsyncEmailVerificationClient:
    """
    Asynchronous email verification client using aiohttp.

    At most ``concurrency_limit`` requests are in flight at once. The
    HTTP session is created lazily on first use and closed by
    ``close()`` or by leaving the ``async with`` block.
    """

    # Wait used when a 429 carries no usable Retry-After header.
    DEFAULT_RETRY_AFTER = 60

    def __init__(
        self,
        api_key: str,
        base_url: str = "https://api.billionverify.com/v1",
        concurrency_limit: int = 10,
        max_retries: int = 3
    ):
        """
        Args:
            api_key: Your BillionVerify API key
            base_url: API base URL
            concurrency_limit: Maximum number of simultaneous requests
            max_retries: How many times a rate-limited (429) request is
                retried before the error is raised
        """
        self.api_key = api_key
        self.base_url = base_url
        self.max_retries = max_retries
        self.semaphore = asyncio.Semaphore(concurrency_limit)
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        """Get or create aiohttp session."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json"
                }
            )
        return self._session

    @classmethod
    def _parse_retry_after(cls, value) -> int:
        """Convert a Retry-After header value into whole seconds.

        Falls back to DEFAULT_RETRY_AFTER when the header is missing or
        not a plain integer (for example an HTTP date).
        """
        try:
            seconds = int(value)
        except (TypeError, ValueError):
            return cls.DEFAULT_RETRY_AFTER
        return max(seconds, 0)

    async def verify(self, email: str) -> dict:
        """Verify a single email asynchronously.

        A 429 response is retried up to ``max_retries`` times, sleeping
        for the advertised Retry-After interval between attempts.

        Raises:
            aiohttp.ClientResponseError: For HTTP error statuses,
                including a 429 that outlasts the retry budget
        """
        url = f"{self.base_url}/verify"
        attempt = 0
        while True:
            async with self.semaphore:
                session = await self._get_session()
                async with session.post(url, json={"email": email}) as response:
                    if response.status != 429 or attempt >= self.max_retries:
                        response.raise_for_status()
                        return await response.json()
                    retry_after = self._parse_retry_after(
                        response.headers.get('Retry-After')
                    )
            # Sleep only after the permit and the connection are
            # released. Retrying while still holding the permit needs a
            # second one and deadlocks as soon as every permit holder
            # is rate limited at the same time.
            attempt += 1
            await asyncio.sleep(retry_after)

    async def verify_many(self, emails: List[str]) -> List[dict]:
        """Verify multiple emails concurrently.

        Returns:
            One dict per input, in input order. A failed verification
            yields ``{"email": ..., "error": ...}`` instead of raising.
        """
        outcomes = await asyncio.gather(
            *(self.verify(email) for email in emails),
            return_exceptions=True
        )
        return [
            outcome if not isinstance(outcome, Exception)
            else {"email": email, "error": str(outcome)}
            for email, outcome in zip(emails, outcomes)
        ]

    async def close(self):
        """Close the aiohttp session."""
        if self._session and not self._session.closed:
            await self._session.close()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()
# Usage example
async def main():
    """Verify three sample addresses concurrently and print each outcome."""
    emails = [
        "user1@example.com",
        "user2@example.com",
        "user3@example.com"
    ]
    async with AsyncEmailVerificationClient(api_key="your_key") as client:
        results = await client.verify_many(emails)
        for result in results:
            if "error" in result:
                print(f"Error: {result['error']}")
            else:
                print(f"{result['email']}: valid={result['is_valid']}")


# Run the async function only when executed as a script, so importing
# this module does not trigger network calls.
if __name__ == "__main__":
    asyncio.run(main())
Flask 통합
Flask 웹 애플리케이션에 이메일 검증을 통합하세요:
from flask import Flask, request, jsonify
from functools import wraps
import os
from email_verification import EmailVerificationClient, ValidationError
app = Flask(__name__)

# One shared client for the whole process, built at import time from
# the BILLIONVERIFY_API_KEY environment variable.
verification_client = EmailVerificationClient(
    api_key=os.environ.get('BILLIONVERIFY_API_KEY'),
)
def verify_email_param(f):
    """Decorator to verify email parameter in requests.

    Reads ``email`` from the JSON body (or the form data for non-JSON
    requests), verifies it, and:
    - answers 400 when the email is missing, rejected by the API, or
      not deliverable;
    - stores the result on ``request.email_verification`` and calls the
      view otherwise.

    If the verification service itself fails, the error is logged and
    the view is still called, so an outage never blocks users.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if request.is_json:
            # silent=True returns None for a malformed body instead of
            # raising; the isinstance check covers valid JSON that is
            # not an object (a list, a string, ...).
            payload = request.get_json(silent=True)
            email = payload.get('email') if isinstance(payload, dict) else None
        else:
            email = request.form.get('email')
        if not email:
            return jsonify({"error": "Email is required"}), 400
        try:
            result = verification_client.verify(email)
        except ValidationError as e:
            return jsonify({"error": str(e)}), 400
        except Exception as e:
            # Log error but don't block user
            app.logger.error(f"Email verification failed: {e}")
            return f(*args, **kwargs)
        if not result.is_deliverable:
            return jsonify({
                "error": "Please provide a valid email address",
                "details": {
                    "is_disposable": result.is_disposable,
                    "risk_score": result.risk_score
                }
            }), 400
        # Attach result to request for use in route
        request.email_verification = result
        return f(*args, **kwargs)
    return decorated_function
@app.route('/api/register', methods=['POST'])
@verify_email_param
def register():
    """User registration endpoint with email verification.

    Returns:
        201 with ``{"success": True, "user_id": ...}``, plus a
        ``warning`` entry when the address is disposable.
    """
    # The decorator accepts JSON as well as form posts, so read both.
    if request.is_json:
        data = request.get_json(silent=True) or {}
    else:
        data = request.form.to_dict()
    # Email has been verified by decorator
    email = data.get('email')
    verification = getattr(request, 'email_verification', None)
    # Warn about disposable emails but allow
    warning = None
    if verification and verification.is_disposable:
        warning = "You're using a disposable email. Some features may be limited."
    # 'email' is passed explicitly, so it must be removed from the
    # extras: create_user(email=email, **data) would raise
    # "got multiple values for keyword argument 'email'".
    # NOTE(review): the remaining fields are client-controlled;
    # whitelist them inside create_user.
    extra_fields = {key: value for key, value in data.items() if key != 'email'}
    # Create user (your implementation)
    user = create_user(email=email, **extra_fields)
    response = {"success": True, "user_id": user.id}
    if warning:
        response["warning"] = warning
    return jsonify(response), 201
@app.route('/api/verify-email', methods=['POST'])
def verify_email_endpoint():
    """Standalone email verification endpoint.

    Returns:
        200 with the verification summary, 400 when the email is
        missing or rejected by the API, 503 when the verification
        service cannot be reached or fails.
    """
    # silent=True yields None for a missing or malformed JSON body
    # instead of an unhandled error; the body may also be JSON that is
    # not an object.
    payload = request.get_json(silent=True)
    email = payload.get('email') if isinstance(payload, dict) else None
    if not email:
        return jsonify({"error": "Email is required"}), 400
    try:
        result = verification_client.verify(email)
    except ValidationError as e:
        return jsonify({"error": str(e)}), 400
    except Exception as e:
        # Keep the client-facing message generic, but record the cause.
        app.logger.error(f"Email verification failed: {e}")
        return jsonify({"error": "Verification service unavailable"}), 503
    return jsonify({
        "email": result.email,
        "is_valid": result.is_valid,
        "is_deliverable": result.is_deliverable,
        "is_disposable": result.is_disposable,
        "risk_score": result.risk_score,
        "safe_to_send": result.is_safe_to_send()
    })
if __name__ == '__main__':
    # Debug mode enables the interactive debugger, which lets anyone
    # who can reach the server run code on it. It is therefore opt-in
    # through FLASK_DEBUG=1 instead of always on.
    app.run(debug=os.getenv('FLASK_DEBUG') == '1')
Django 통합
Django 애플리케이션의 경우 재사용 가능한 폼 검증기와 미들웨어를 만드세요:
# validators.py
from django.core.exceptions import ValidationError
from django.conf import settings
from email_verification import EmailVerificationClient
def get_verification_client():
    """Return the shared verification client, building it on first use.

    The instance is memoised as an attribute on this function, so every
    call after the first returns the same client.
    """
    try:
        return get_verification_client._client
    except AttributeError:
        shared = EmailVerificationClient(
            api_key=settings.BILLIONVERIFY_API_KEY
        )
        get_verification_client._client = shared
        return get_verification_client._client
def validate_email_deliverable(email: str) -> None:
    """
    Django validator to check email deliverability.

    Usage in forms:
        email = forms.EmailField(validators=[validate_email_deliverable])

    Raises:
        ValidationError: (Django's) when the address is invalid,
            undeliverable, or disposable.

    If the verification service fails for any other reason, the error
    is logged and the address is accepted, so an outage does not block
    form submission.
    """
    import logging
    # The API client has its own ValidationError (raised on HTTP 400).
    # It is unrelated to Django's class of the same name, so it is
    # imported under an alias and translated explicitly. Left to the
    # generic handler below, an address the API rejects would be
    # accepted.
    from email_verification import ValidationError as APIValidationError

    client = get_verification_client()
    try:
        result = client.verify(email)
    except APIValidationError as exc:
        raise ValidationError("Please enter a valid email address.") from exc
    except Exception as e:
        # Log but don't block on service errors
        logging.getLogger(__name__).error(f"Email verification failed: {e}")
        return
    if not result.is_valid:
        raise ValidationError("Please enter a valid email address.")
    if not result.is_deliverable:
        raise ValidationError(
            "This email address doesn't appear to exist. "
            "Please check for typos."
        )
    if result.is_disposable:
        raise ValidationError(
            "Please use a permanent email address, "
            "not a disposable one."
        )
# forms.py
from django import forms
from .validators import validate_email_deliverable
class RegistrationForm(forms.Form):
    """Sign-up form whose email field is checked for deliverability."""

    email = forms.EmailField(
        help_text="We'll send a confirmation email to this address.",
        validators=[validate_email_deliverable],
    )
    password = forms.CharField(widget=forms.PasswordInput)

    def clean_email(self):
        """Return the submitted email lower-cased and stripped of surrounding whitespace."""
        submitted = self.cleaned_data['email']
        # Additional cleaning if needed
        lowered = submitted.lower()
        return lowered.strip()
# middleware.py
from django.http import JsonResponse
from django.conf import settings
class EmailVerificationMiddleware:
    """
    Middleware to verify emails in API requests.

    Add to MIDDLEWARE setting:
        'myapp.middleware.EmailVerificationMiddleware',

    For POST requests to one of VERIFICATION_PATHS whose JSON body has
    an ``email`` field, the address is verified. Unsafe addresses are
    answered with a 400 response; for safe ones the result is attached
    to ``request.email_verification``. Any other situation (no JSON
    body, no email, verification service failure) lets the request
    through unchanged.
    """

    VERIFICATION_PATHS = ['/api/register/', '/api/contact/']

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        # Check if path needs verification
        if request.method == 'POST' and request.path in self.VERIFICATION_PATHS:
            rejection = self._verify_request_email(request)
            if rejection is not None:
                return rejection
        return self.get_response(request)

    def _verify_request_email(self, request):
        """Return a 400 JsonResponse for an unsafe email, otherwise None."""
        import json
        import logging

        try:
            data = json.loads(request.body)
        except ValueError:
            # Body is not JSON (e.g. a form post): nothing to verify.
            return None
        email = data.get('email') if isinstance(data, dict) else None
        if not email:
            return None
        try:
            from .validators import get_verification_client
            result = get_verification_client().verify(email)
        except Exception:
            # Fail open, but leave a trace rather than hiding the
            # failure completely.
            logging.getLogger(__name__).exception(
                "Email verification failed; request passed through unverified"
            )
            return None
        if not result.is_safe_to_send():
            return JsonResponse({
                'error': 'Invalid email address',
                'details': {
                    'is_valid': result.is_valid,
                    'is_deliverable': result.is_deliverable,
                    'is_disposable': result.is_disposable
                }
            }, status=400)
        # Attach to request for views
        request.email_verification = result
        return None
응답 캐싱
지능적인 캐싱으로 API 호출을 줄이고 성능을 개선하세요:
import hashlib
import json
import time
from typing import Optional
from functools import lru_cache
class CachedEmailVerificationClient(EmailVerificationClient):
    """
    Email verification client with response caching.

    Results are kept in an in-process dictionary keyed by a hash of the
    normalised (lower-cased, stripped) address and are reused for
    ``cache_ttl`` seconds.
    """

    def __init__(
        self,
        api_key: str,
        cache_ttl: int = 86400,  # 24 hours default
        **kwargs
    ):
        """
        Args:
            api_key: Your BillionVerify API key
            cache_ttl: Seconds a cached result stays valid
            **kwargs: Forwarded to EmailVerificationClient
        """
        super().__init__(api_key, **kwargs)
        self.cache_ttl = cache_ttl
        # Maps cache key -> (result, monotonic timestamp of storage).
        self._cache = {}

    def _cache_key(self, email: str) -> str:
        """Generate cache key from email."""
        normalized = email.lower().strip()
        return hashlib.md5(normalized.encode()).hexdigest()

    def _is_fresh(self, cached_time: float) -> bool:
        """Tell whether an entry stored at cached_time is still within the TTL."""
        # The monotonic clock keeps the TTL unaffected by wall-clock
        # adjustments.
        return time.monotonic() - cached_time < self.cache_ttl

    def verify(self, email: str, skip_cache: bool = False) -> VerificationResult:
        """
        Verify email with caching.

        Args:
            email: Email to verify
            skip_cache: Force fresh verification

        Returns:
            VerificationResult from cache or API
        """
        cache_key = self._cache_key(email)
        # Check cache
        if not skip_cache:
            entry = self._cache.get(cache_key)
            if entry is not None:
                cached_data, cached_time = entry
                if self._is_fresh(cached_time):
                    return cached_data
                # Drop the stale entry now, so it does not linger if
                # the refresh below fails.
                self._cache.pop(cache_key, None)
        # Fetch from API
        result = super().verify(email)
        # Cache result
        self._cache[cache_key] = (result, time.monotonic())
        return result

    def purge_expired(self) -> int:
        """Remove every expired entry and return how many were removed.

        Expired entries are otherwise only replaced when the same
        address is verified again, so a long-running process should
        call this periodically to bound memory use.
        """
        stale_keys = [
            key for key, (_, cached_time) in self._cache.items()
            if not self._is_fresh(cached_time)
        ]
        for key in stale_keys:
            self._cache.pop(key, None)
        return len(stale_keys)

    def clear_cache(self):
        """Clear all cached results."""
        self._cache.clear()

    def remove_from_cache(self, email: str):
        """Remove specific email from cache."""
        cache_key = self._cache_key(email)
        self._cache.pop(cache_key, None)
# Redis-based caching for distributed systems
import redis
class RedisCachedClient(EmailVerificationClient):
    """Email verification client with Redis caching.

    The cache is best effort: when Redis is unreachable or holds an
    unreadable entry, the problem is logged and verification falls back
    to the API.
    """

    def __init__(
        self,
        api_key: str,
        redis_url: str = "redis://localhost:6379",
        cache_ttl: int = 86400,
        **kwargs
    ):
        """
        Args:
            api_key: Your BillionVerify API key
            redis_url: Connection URL of the Redis server
            cache_ttl: Seconds a cached result stays valid
            **kwargs: Forwarded to EmailVerificationClient
        """
        super().__init__(api_key, **kwargs)
        self.redis = redis.from_url(redis_url)
        self.cache_ttl = cache_ttl
        self.cache_prefix = "email_verify:"

    def _read_cache(self, cache_key: str) -> Optional[VerificationResult]:
        """Return the cached result for cache_key, or None on a miss or cache failure."""
        import logging
        try:
            cached = self.redis.get(cache_key)
            if not cached:
                return None
            data = json.loads(cached)
        except (redis.RedisError, ValueError) as exc:
            logging.getLogger(__name__).warning(
                "Cache read failed for %s: %s", cache_key, exc
            )
            return None
        # Anything other than a JSON object is treated as a miss.
        if not isinstance(data, dict):
            return None
        return VerificationResult.from_dict(data)

    def _write_cache(self, cache_key: str, result: VerificationResult) -> None:
        """Store result under cache_key with the configured TTL; failures are logged only."""
        import logging
        from dataclasses import asdict
        try:
            # asdict serialises every dataclass field, so the cached
            # payload stays in step with VerificationResult.
            self.redis.setex(cache_key, self.cache_ttl, json.dumps(asdict(result)))
        except (redis.RedisError, TypeError, ValueError) as exc:
            logging.getLogger(__name__).warning(
                "Cache write failed for %s: %s", cache_key, exc
            )

    def verify(self, email: str, skip_cache: bool = False) -> VerificationResult:
        """Verify with Redis caching.

        Args:
            email: Email to verify
            skip_cache: Force fresh verification

        Returns:
            VerificationResult from cache or API
        """
        cache_key = f"{self.cache_prefix}{email.lower().strip()}"
        # Check cache
        if not skip_cache:
            cached_result = self._read_cache(cache_key)
            if cached_result is not None:
                return cached_result
        # Fetch from API
        result = super().verify(email)
        # Cache result
        self._write_cache(cache_key, result)
        return result
통합 테스트
통합이 올바르게 작동하는지 확인하기 위해 포괄적인 테스트를 작성하세요:
import pytest
from unittest.mock import Mock, patch
from email_verification import EmailVerificationClient
from email_verification.models import VerificationResult
from email_verification.exceptions import AuthenticationError, RateLimitError
@pytest.fixture
def client():
    """Provide a client built with a placeholder API key."""
    test_client = EmailVerificationClient(api_key="test_key")
    return test_client
@pytest.fixture
def mock_response():
    """Provide a fresh API payload describing a valid, low-risk address."""
    payload = dict(
        email="test@example.com",
        is_valid=True,
        is_deliverable=True,
        is_disposable=False,
        is_role_based=False,
        is_catch_all=False,
        is_free_provider=False,
        risk_score=10,
        domain="example.com",
        mx_records=["mx.example.com"],
        smtp_check=True,
        verification_time_ms=150,
    )
    return payload
class TestEmailVerificationClient:
    """Tests for EmailVerificationClient.

    Each test replaces ``client.session.post`` with a mock, so no
    network traffic is generated.
    """

    def test_verify_valid_email(self, client, mock_response):
        """Test successful email verification."""
        with patch.object(client.session, 'post') as mock_post:
            mock_post.return_value.status_code = 200
            mock_post.return_value.json.return_value = mock_response
            result = client.verify("test@example.com")
        assert result.is_valid is True
        assert result.is_deliverable is True
        assert result.risk_score == 10
        # The address must reach the API exactly once, as the JSON body.
        mock_post.assert_called_once()
        assert mock_post.call_args.kwargs["json"] == {"email": "test@example.com"}

    def test_verify_disposable_email(self, client):
        """Test detection of disposable email."""
        mock_data = {
            "email": "temp@mailinator.com",
            "is_valid": True,
            "is_deliverable": True,
            "is_disposable": True,
            "is_role_based": False,
            "is_catch_all": False,
            "is_free_provider": False,
            "risk_score": 80,
            "domain": "mailinator.com",
            "mx_records": [],
            "smtp_check": True,
            "verification_time_ms": 100
        }
        with patch.object(client.session, 'post') as mock_post:
            mock_post.return_value.status_code = 200
            mock_post.return_value.json.return_value = mock_data
            result = client.verify("temp@mailinator.com")
        assert result.is_disposable is True
        assert result.risk_score == 80
        assert result.is_safe_to_send() is False

    def test_authentication_error(self, client):
        """Test handling of authentication errors."""
        with patch.object(client.session, 'post') as mock_post:
            mock_post.return_value.status_code = 401
            with pytest.raises(AuthenticationError):
                client.verify("test@example.com")

    def test_rate_limit_handling(self, client):
        """Test rate limit error handling."""
        with patch.object(client.session, 'post') as mock_post:
            mock_post.return_value.status_code = 429
            mock_post.return_value.headers = {'Retry-After': '30'}
            with pytest.raises(RateLimitError) as exc_info:
                client.verify("test@example.com")
        assert exc_info.value.retry_after == 30
class TestVerificationResult:
    """Checks for the VerificationResult model and its send-safety rule."""

    def test_from_dict(self, mock_response):
        """from_dict copies the payload fields onto the model."""
        built = VerificationResult.from_dict(mock_response)
        assert built.email == "test@example.com"
        assert built.is_valid is True
        assert built.domain == "example.com"

    def test_is_safe_to_send_valid(self, mock_response):
        """A valid, deliverable, low-risk address is safe to send to."""
        assert VerificationResult.from_dict(mock_response).is_safe_to_send() is True

    def test_is_safe_to_send_disposable(self, mock_response):
        """A disposable address is never safe to send to."""
        mock_response.update(is_disposable=True, risk_score=80)
        built = VerificationResult.from_dict(mock_response)
        assert built.is_safe_to_send() is False

    def test_is_safe_to_send_high_risk(self, mock_response):
        """A risk score of 75 is rejected by the send-safety rule."""
        mock_response.update(risk_score=75)
        built = VerificationResult.from_dict(mock_response)
        assert built.is_safe_to_send() is False
오류 처리 모범 사례
프로덕션 안정성을 위한 포괄적인 오류 처리를 구현하세요:
import logging
from typing import Optional, Callable
from functools import wraps
logger = logging.getLogger(__name__)
def with_verification_fallback(
    fallback_value: bool = True,
    log_errors: bool = True
):
    """
    Build a decorator that turns verification failures into a fallback value.

    When the wrapped callable succeeds, its result is returned unchanged.
    AuthenticationError is logged at CRITICAL level and re-raised regardless
    of ``log_errors``. RateLimitError and any other Exception are swallowed
    and ``fallback_value`` is returned instead.

    Args:
        fallback_value: Value to return when the wrapped call fails
        log_errors: Whether to log rate-limit and unexpected errors

    Returns:
        A decorator applying this fallback behaviour to a callable
    """
    def decorator(func: Callable):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except AuthenticationError:
                logger.critical("Email verification API authentication failed")
                raise  # Re-raise auth errors
            except RateLimitError as e:
                if log_errors:
                    # Lazy %-args: the message is only formatted if emitted.
                    logger.warning(
                        "Rate limit exceeded, retry after %ss", e.retry_after
                    )
                return fallback_value
            except Exception as e:
                if log_errors:
                    # This branch swallows arbitrary exceptions, so keep the
                    # traceback in the log; str(e) alone hides the origin.
                    logger.error(
                        "Email verification failed: %s", e, exc_info=True
                    )
                return fallback_value
        return wrapper
    return decorator
class SafeEmailVerifier:
    """
    Wrapper that provides safe verification with fallbacks.

    Delegates to the wrapped client and, outside strict mode, answers with a
    configurable default result when verification fails.
    """

    def __init__(
        self,
        client: "EmailVerificationClient",
        strict_mode: bool = False,
        default_result: Optional["VerificationResult"] = None
    ):
        """
        Args:
            client: Client whose ``verify`` method performs the real check
            strict_mode: When True, verification errors propagate
            default_result: Result template returned on errors in non-strict
                mode; a permissive result is built when omitted
        """
        self.client = client
        self.strict_mode = strict_mode
        # Explicit None check so a caller-supplied result is never replaced
        # merely because it happens to evaluate as falsy.
        if default_result is None:
            default_result = self._create_default_result()
        self.default_result = default_result

    def verify(self, email: str) -> "VerificationResult":
        """
        Verify email with graceful error handling.

        In non-strict mode, returns a copy of ``default_result`` (with its
        ``email`` set to the address being verified) on errors.
        In strict mode, propagates errors.
        AuthenticationError is always propagated.
        """
        import copy

        try:
            return self.client.verify(email)
        except AuthenticationError:
            # Always propagate auth errors
            raise
        except Exception as e:
            logger.error("Verification error for %s: %s", email, e)
            if self.strict_mode:
                raise
            # Honour the configured default. Copy it so the shared template
            # is not mutated when the email is filled in.
            result = copy.copy(self.default_result)
            result.email = email
            return result

    def _create_default_result(self) -> "VerificationResult":
        """Create a permissive default result."""
        return VerificationResult(
            email="",
            is_valid=True,
            is_deliverable=True,
            is_disposable=False,
            is_role_based=False,
            is_catch_all=False,
            is_free_provider=False,
            risk_score=0,
            domain="",
            mx_records=[],
            smtp_check=True,
            verification_time_ms=0
        )
모니터링 및 로깅
프로덕션 배포를 위한 적절한 모니터링을 구현하세요:
import time
import logging
from dataclasses import dataclass, field
from typing import Dict, List
from datetime import datetime, timedelta
from collections import defaultdict
@dataclass
class VerificationMetrics:
    """Running counters describing verification activity, for monitoring."""

    # Overall call counts.
    total_verifications: int = 0
    successful_verifications: int = 0
    failed_verifications: int = 0
    # Sum of verification_time_ms over successful calls only.
    total_response_time_ms: int = 0
    # Keyed counters; defaultdict lets callers increment unseen keys.
    errors_by_type: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    results_by_status: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    hourly_counts: Dict[str, int] = field(default_factory=lambda: defaultdict(int))

    @property
    def success_rate(self) -> float:
        """Share of recorded verifications that succeeded, as a percentage."""
        if not self.total_verifications:
            return 0.0
        return self.successful_verifications / self.total_verifications * 100

    @property
    def avg_response_time_ms(self) -> float:
        """Mean response time over successful verifications (0.0 if none)."""
        if not self.successful_verifications:
            return 0.0
        return self.total_response_time_ms / self.successful_verifications

    def record_success(self, result: VerificationResult):
        """Fold one successful verification result into the counters."""
        self.total_verifications += 1
        self.successful_verifications += 1
        self.total_response_time_ms += result.verification_time_ms

        # A result may count towards several status buckets at once.
        status_flags = (
            ('deliverable', 'is_deliverable'),
            ('disposable', 'is_disposable'),
            ('catch_all', 'is_catch_all'),
        )
        for bucket, attribute in status_flags:
            if getattr(result, attribute):
                self.results_by_status[bucket] += 1

        # Usage is bucketed per clock hour of the current local time.
        current_hour = datetime.now().strftime('%Y-%m-%d-%H')
        self.hourly_counts[current_hour] += 1

    def record_error(self, error_type: str):
        """Count one failed verification under the given error type."""
        self.total_verifications += 1
        self.failed_verifications += 1
        self.errors_by_type[error_type] += 1

    def get_summary(self) -> dict:
        """Return a plain-dict snapshot with formatted rate and latency."""
        rate = self.success_rate
        latency = self.avg_response_time_ms
        summary = {
            'total_verifications': self.total_verifications,
            'success_rate': f"{rate:.1f}%",
            'avg_response_time_ms': f"{latency:.0f}",
        }
        # Copy the counters into ordinary dicts for the snapshot.
        summary['errors_by_type'] = dict(self.errors_by_type)
        summary['results_distribution'] = dict(self.results_by_status)
        return summary
class MonitoredEmailVerificationClient(EmailVerificationClient):
    """EmailVerificationClient variant that records metrics and logs each call."""

    def __init__(self, *args, **kwargs):
        # All arguments are passed straight through to the base client.
        super().__init__(*args, **kwargs)
        self.metrics = VerificationMetrics()
        logger_name = f"{__name__}.{self.__class__.__name__}"
        self.logger = logging.getLogger(logger_name)

    def verify(self, email: str) -> VerificationResult:
        """
        Verify an address via the base client, tracking metrics.

        Successes and failures are both recorded in ``self.metrics`` and
        logged with structured ``extra`` fields. Any exception is re-raised
        after it has been recorded.
        """
        started_at = time.time()
        try:
            outcome = super().verify(email)
            self.metrics.record_success(outcome)

            # Only the domain is logged on success, not the full address.
            success_fields = {
                'email_domain': outcome.domain,
                'is_valid': outcome.is_valid,
                'is_deliverable': outcome.is_deliverable,
                'is_disposable': outcome.is_disposable,
                'risk_score': outcome.risk_score,
                'response_time_ms': outcome.verification_time_ms
            }
            self.logger.info("email_verification", extra=success_fields)
            return outcome
        except Exception as exc:
            failure_kind = type(exc).__name__
            self.metrics.record_error(failure_kind)

            # No result object exists here, so derive the domain from the input.
            if '@' in email:
                domain = email.split('@')[-1]
            else:
                domain = 'unknown'

            failure_fields = {
                'email_domain': domain,
                'error_type': failure_kind,
                'error_message': str(exc),
                'duration_ms': int((time.time() - started_at) * 1000)
            }
            self.logger.error("email_verification_error", extra=failure_fields)
            raise
๊ฒฐ๋ก
BillionVerify ์ด๋ฉ์ผ ๊ฒ์ฆ API๋ฅผ Python๊ณผ ํตํฉํ๋ฉด ๋ชจ๋ ์ ํ๋ฆฌ์ผ์ด์ ์ ๊ฐ๋ ฅํ ์ด๋ฉ์ผ ์ ํจ์ฑ ๊ฒ์ฌ๋ฅผ ๊ตฌ์ถํ ์ ์์ต๋๋ค. ์ด ๊ฐ์ด๋์ ํจํด๊ณผ ์์ ๋ ํ๋ก๋์ ์ฌ์ฉ์ ์ํ ๊ฒฌ๊ณ ํ ๊ธฐ๋ฐ์ ์ ๊ณตํฉ๋๋ค.
์ฑ๊ณต์ ์ธ Python ํตํฉ์ ์ํ ํต์ฌ ์์ฝ:
구조화된 클라이언트 클래스 사용 - 안정성을 위한 적절한 오류 처리, 재시도 로직 및 연결 풀링 포함
캐싱 구현 - API 호출을 줄이고 자주 검증되는 주소의 응답 시간 개선
올바른 처리 전략 선택 - 볼륨에 따라: 소규모 목록은 순차 처리, 중간 볼륨은 동시 스레드, 대규모 목록은 배치 API 사용
async/await 활용 - 많은 이메일을 빠르게 검증해야 하는 고처리량 애플리케이션에는 aiohttp 사용
프레임워크와 통합 - Django, Flask 또는 기타 Python 프레임워크와 자연스럽게 어울리는 데코레이터, 검증기 또는 미들웨어 패턴 사용
모니터링 및 측정 - 메트릭 추적으로 검증 사용량을 모니터링하여 패턴을 이해하고 비용 최적화
오류를 우아하게 처리 - 검증 서비스가 일시적으로 사용할 수 없을 때 사용자를 차단하지 않는 폴백 제공
기본 클라이언트 구현으로 시작하여 필요에 따라 캐싱, 배치 처리 및 모니터링과 같은 기능을 점진적으로 추가하세요. 모듈식 설계로 특정 요구 사항에 맞게 통합을 쉽게 사용자 정의할 수 있습니다.
프로덕션 배포의 경우 항상 예상되는 이메일 패턴 및 볼륨으로 철저히 테스트하고, 디버깅을 위한 적절한 로깅을 구현하며, 인증 실패 또는 비정상적인 오류율에 대한 알림을 설정하세요.