#!/usr/bin/env python
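"""Scrape LinkedIn job postings with linkedin_jobs_scraper and save each job
as a text file under jobs/<company>/<title>.txt.

Usage:
    python job_scraper.py --limit 25
"""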
import argparse
import logging
import os

from linkedin_jobs_scraper import LinkedinScraper
from linkedin_jobs_scraper.events import Events, EventData, EventMetrics
from linkedin_jobs_scraper.query import Query, QueryOptions, QueryFilters
from linkedin_jobs_scraper.filters import (
    RelevanceFilters,
    TimeFilters,
    ExperienceLevelFilters,
    OnSiteOrRemoteFilters,
)

# Change root logger level (default is WARN)
logging.basicConfig(level=logging.INFO)


def sanitize_filename(s):
    """Sanitize a string for use as a filename by keeping only letters and digits."""
    return "".join(c for c in s if c.isalpha() or c.isdigit())


def on_data(data: EventData):
    print(
        '[ON_DATA]',
        data.title,
        data.location,
        data.company,
        data.company_link,
        data.date,
        data.link,
        data.insights,
        len(data.description),
    )

    # Create a directory for each company
    company_dir = os.path.join('jobs', sanitize_filename(data.company))
    os.makedirs(company_dir, exist_ok=True)

    # Create a file for each job in the respective company directory
    job_file = os.path.join(company_dir, sanitize_filename(data.title) + '.txt')

    # Skip jobs that were already saved on a previous run
    if not os.path.exists(job_file):
        with open(job_file, 'w', encoding='utf-8') as f:
            # Write the link as the first line, followed by the job metadata
            f.write(f"Job Link: {data.link}\n")
            f.write(f"Title: {data.title}\n")
            f.write(f"Company: {data.company}\n")
            f.write(f"Location: {data.location}\n")
            f.write("\n")
            f.write("Job Description:\n")
            f.write(data.description)
    else:
        print(f'Job already exists: {job_file}')


def on_metrics(metrics: EventMetrics):
    print('[ON_METRICS]', str(metrics))


def on_error(error):
    print('[ON_ERROR]', error)


def on_end():
    print('[ON_END]')


def main(limit):
    scraper = LinkedinScraper(
        chrome_executable_path=None,  # None lets the library resolve the Chrome driver
        chrome_options=None,
        headless=True,
        max_workers=1,  # Threads (one Chrome instance each) used to run queries
        slow_mo=5,  # Seconds to wait between actions, to reduce rate limiting
        page_load_timeout=40,
    )
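
    # Illustrative sketch (assumed usage, untested): to pass custom Chrome flags,
    # build a selenium ChromeOptions object and pass it in place of None above:
    #   from selenium import webdriver
    #   opts = webdriver.ChromeOptions()
    #   opts.add_argument('--no-sandbox')
    #   scraper = LinkedinScraper(chrome_options=opts, headless=True, ...)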

    # Add event listeners
    scraper.on(Events.DATA, on_data)
    scraper.on(Events.METRICS, on_metrics)
    scraper.on(Events.ERROR, on_error)
    scraper.on(Events.END, on_end)

    queries = [
        Query(
            query='devops',
            options=QueryOptions(
                locations=['Canada'],
                apply_link=True,  # Also extract the application link (slows scraping)
                skip_promoted_jobs=False,
                page_offset=0,
                limit=limit,
                filters=QueryFilters(
                    relevance=RelevanceFilters.RECENT,
                    time=TimeFilters.MONTH,  # Alternatively TimeFilters.DAY
                    on_site_or_remote=[
                        OnSiteOrRemoteFilters.REMOTE,
                        OnSiteOrRemoteFilters.HYBRID,
                    ],
                    experience=[
                        ExperienceLevelFilters.MID_SENIOR,
                        ExperienceLevelFilters.ASSOCIATE,
                    ],
                ),
            ),
        ),
    ]
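
    # Additional Query objects can be appended above to scrape other search terms.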

    scraper.run(queries)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Scrape LinkedIn job postings into per-company text files.")
    parser.add_argument("--limit", type=int, default=1, help="Maximum number of jobs to scrape per query")
    args = parser.parse_args()
    main(args.limit)