
Exports and imports #3692

Closed · wants to merge 3 commits

Conversation

@ruslandoga ruslandoga commented Jan 16, 2024

Changes

This PR implements exports from events_v2 and sessions_v2 and imports into imported_* tables.

  • check that the export queries are correct: compare them with the CSV report queries (name=pageview filtering, etc.) and ensure that after exporting website.com and then importing it as imported.website.com, the dashboards for website.com and imported.website.com show the same statistics
  • tests tests tests
  • adapt to Add multiple imports per site #3724
  • import_id and staging tables
  • zstd? Native?
  • flash message with success or error on import

Tests

  • Automated tests have been added

Changelog

  • Entry has been added to changelog

Documentation

  • Docs have been updated

Dark mode

  • The UI has been tested both in dark and light mode

@ruslandoga ruslandoga changed the title from "add export queries" to "export+import" on Jan 17, 2024
@ruslandoga ruslandoga mentioned this pull request Jan 17, 2024
@ruslandoga ruslandoga force-pushed the export-import branch 4 times, most recently from 8469350 to 4a91d1d on January 17, 2024 06:21
end)

conn
|> put_flash(:info, "IMPORT SUCCESS")

  • seems like there are no flashes on that page

@ruslandoga ruslandoga removed the request for review from zoldar January 17, 2024 08:38
@ruslandoga ruslandoga force-pushed the export-import branch 2 times, most recently from 19f847a to 889ac16 on January 18, 2024 12:22
@ruslandoga ruslandoga changed the title from "export+import" to "Exports and imports" on Jan 18, 2024
xhr.open("PUT", url, true)
xhr.send(entry.file)
})
}

config/dev.exs Outdated
url: "http://localhost:9000",
# minio includes the port in canonical request
host: "localhost:9000",
region: "us-east-1"

I'm using minio for local/dev S3 testing

$ docker run --rm -d -p 9000:9000 -p 9001:9001 --name plausible_s3 minio/minio server /data --console-address ":9001"
1d0f20bbfa65bfd692078f5e84ebf8c53e70801f355be4557cd772ec01493d15

$ docker exec plausible_s3 mc alias set local http://localhost:9000 minioadmin minioadmin
Added `local` successfully.

$ docker exec plausible_s3 mc mb local/imports
Bucket created successfully `local/imports`.

$ docker exec plausible_s3 mc mb local/exports
Bucket created successfully `local/exports`.

select: %{
date: date(e.timestamp),
visitors: visitors(e),
pageviews: selected_as(fragment("countIf(?='pageview')", e.name), :pageviews)

I'm not sure about all these name=pageview filters, maybe they can be applied in the WHERE clause for most tables?
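
For instance, for a table that only aggregates pageviews, a minimal sketch of what pushing the filter into WHERE could look like (the query shape and field names below are illustrative, not the exact queries from this PR):

import Ecto.Query

# hypothetical: filter on name = 'pageview' once in WHERE instead of countIf(...) in SELECT
pages_q =
  from e in "events_v2",
    where: e.site_id == type(^site_id, :integer) and e.name == "pageview",
    group_by: selected_as(:date),
    select: %{
      date: selected_as(fragment("toDate(?)", e.timestamp), :date),
      pageviews: selected_as(count(), :pageviews)
    }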

:ok = ensure_supported_format(format)

statement =
"INSERT INTO {table:Identifier} SELECT {site_id:UInt64}, * FROM #{input(table)} FORMAT #{format}\n"

The staging tables approach is slightly more complicated

site_id = 1

# need session_id for temp tables to be visible
{:ok, pool} = Ch.start_link(pool_size: 1, settings: [session_id: "imports_site_#{site_id}"])

table = "imported_browsers"
format = "CSVWithNames"

# I'm using a temporary table to get automatic cleanup; with regular staging tables the cleanup could be handled by Oban instead
Ch.query!(pool, "CREATE TEMPORARY TABLE #{table}_tmp AS #{table}")

DBConnection.run(pool, fn conn ->
  statement = "INSERT INTO #{table}_tmp SELECT {site_id:UInt64}, * FROM #{input(table)} FORMAT #{format}\n"
  
  File.stream!("imported_browsers.csv", _64kb = 64000)
  |> Stream.into(Ch.stream(conn, statement, %{"site_id" => site_id}, format: format))
  |> Stream.run()
end)

# this part can be made safer with Oban (otherwise we can lose connection before sending all of the ALTER statements)
pool
|> Ch.query!("SELECT partition FROM system.parts WHERE table = {table:String} GROUP BY partition ORDER BY partition", %{"table" => table <> "_tmp"})
# this is also where the data becomes visible to the "real" tables
|> Enum.each(fn [partition] -> Ch.query!(pool, "ALTER TABLE #{table}_tmp MOVE PARTITION #{partition} TO TABLE #{table}") end)

More info on this approach: https://clickhouse.com/blog/supercharge-your-clickhouse-data-loads-part3

|> Stream.run()
end)

conn
@ruslandoga ruslandoga Jan 19, 2024

Technically I need to return the modified conn, but it doesn't seem to matter in Cowboy. I'll update it to use Enum.reduce or similar.
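
A minimal sketch of what that could look like, reusing the ch/1 helper and stream_archive pipeline from this PR and threading conn through Enum.reduce (illustrative, not the final code):

conn =
  DBConnection.run(ch(Plausible.ClickhouseRepo), fn ch_conn ->
    ch_conn
    |> Plausible.Exports.stream_archive(
      Plausible.Exports.export_queries(site.id, extname: ".csv"),
      format: "CSVWithNames"
    )
    |> Enum.reduce(conn, fn data, conn ->
      # Plug.Conn.chunk/2 returns {:ok, conn}; keep the updated conn for the next chunk
      {:ok, conn} = chunk(conn, data)
      conn
    end)
  end)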

{:ok, conn} =
repo.config()
|> Keyword.replace!(:pool_size, 1)
|> Ch.start_link()

Not sure about this part. Maybe repos could do this job instead of one-off clients, but I like that the one-off clients are linked and exit when the request or Oban job finishes.

@@ -64,7 +64,8 @@ defmodule PlausibleWeb.Endpoint do
   plug(Plug.Parsers,
     parsers: [:urlencoded, :multipart, :json],
     pass: ["*/*"],
-    json_decoder: Phoenix.json_library()
+    json_decoder: Phoenix.json_library(),
+    length: 100_000_000

Just an arbitrary large value to support direct uploads. This change would probably be present only in self-hosted.

@ruurtjan

Thanks for your work on this!

I'd be happy to donate $100 when this lands in both the cloud version and the Docker image. DM me then :)

ch_conn
|> Plausible.Exports.stream_archive(
Plausible.Exports.export_queries(site.id, extname: ".csv"),
format: "CSVWithNames"

Trying out different combinations of compression and format on plausible.io data (2023-01-01 to now), I got the following results:

  # .<format>[.<compression>]   time taken   zip size
  # .csv        4145ms   33.5MB
  # .csv.zst    6349ms    7.1MB
  # .csv.gz     5805ms    7.7MB
  # .csv.lz4    5721ms    8.5MB
  # .ch         6076ms   30.4MB
  # .ch.zst     5434ms    5.2MB
  # .ch.gz      4415ms    5.8MB
  # .ch.lz4     5382ms    7.3MB

  def data_export(conn, _params) do
    site = conn.assigns.site

    conn =
      conn
      |> put_resp_content_type("application/zip")
      |> put_resp_header("content-disposition", ~s|attachment; filename="Plausible.zip"|)
      |> send_chunked(200)

    started_at = System.monotonic_time(:millisecond)

    DBConnection.run(ch(Plausible.ClickhouseRepo), fn ch_conn ->
      ch_conn
      |> Plausible.Exports.stream_archive(
        Plausible.Exports.export_queries(site.id, extname: ".ch.lz4"),
        format: "Native",
        headers: [{"accept-encoding", "lz4"}],
        settings: [enable_http_compression: 1]
      )
      |> Stream.each(fn data -> chunk(conn, data) end)
      |> Stream.run()
    end)

    IO.inspect(System.monotonic_time(:millisecond) - started_at, label: "exported in ms")

    conn
  end

selected_as(
fragment(
"toUInt32(round(ifNotFinite(?,0)))",
sum(unquote(t).sign * unquote(t).duration) / sum(unquote(t).sign)

When does sum(sign) become 0?

@ruslandoga ruslandoga Jan 24, 2024

SELECT session_id, sum(sign) AS sign, count() AS count
FROM sessions_v2
WHERE site_id = 37
GROUP BY session_id
HAVING sign = 0

shows that all such sessions have two session rows, with one cancelling the other. Some of these sessions have only one event row, some have two.


defmacrop bounces(t) do
quote do
selected_as(sum(unquote(t).sign * unquote(t).is_bounce), :bounces)

This sometimes evaluates to -1.

@ruslandoga ruslandoga Jan 24, 2024

SELECT session_id, sum(sign * is_bounce) AS bounce
FROM sessions_v2
WHERE site_id = 37
GROUP BY session_id
HAVING bounce < 0

shows 52 sessions that can drag that aggregate below 0.
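
If we only want to keep the aggregate from going negative, a possible guard (hypothetical, not part of this PR) is to clamp it at zero:

defmacrop bounces(t) do
  quote do
    selected_as(
      # greatest(x, 0) clamps the handful of sign-imbalanced sessions so the total can't go below zero
      fragment("greatest(?, 0)", sum(unquote(t).sign * unquote(t).is_bounce)),
      :bounces
    )
  end
end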

"""
INSERT INTO {table:Identifier} \
SELECT {site_id:UInt64} AS site_id, * \
FROM s3({s3_url:String},{s3_access_key_id:String},{s3_secret_access_key:String},{s3_format:String},{s3_structure:String})\

We could also use the url table function with a signed URL from Plausible.S3.signed_url/1.
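
A rough sketch of that alternative (assuming Plausible.S3.signed_url/1 returns a presigned GET URL for the uploaded object; the s3_path, s3_format and s3_structure variables are illustrative):

statement =
  """
  INSERT INTO {table:Identifier} \
  SELECT {site_id:UInt64} AS site_id, * \
  FROM url({s3_url:String},{s3_format:String},{s3_structure:String})\
  """

Ch.query!(ch_conn, statement, %{
  "table" => table,
  "site_id" => site_id,
  # a presigned URL means no S3 credentials need to be passed to ClickHouse
  "s3_url" => Plausible.S3.signed_url(s3_path),
  "s3_format" => s3_format,
  "s3_structure" => s3_structure
})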

@@ -525,8 +525,7 @@ base_queues = [
domain_change_transition: 1,
check_accept_traffic_until: 1,
s3_data_export: 1,
s3_data_import: 1,
s3_cleaner: 10
@ruslandoga ruslandoga Jan 26, 2024

  • bucket lifecycle can be used instead, both for unfinished multipart uploads and object expiration

S3_ENDPOINT=http://localhost:9000
S3_EXPORTS_BUCKET=exports
S3_IMPORTS_BUCKET=imports
S3_HOST_FOR_CLICKHOUSE=172.19.0.4
@ruslandoga ruslandoga Jan 26, 2024

  • make it plausible_s3 or something (right now ClickHouse complains about invalid DNS name even though it's resolvable in Docker, need to check how ClickHouse CI solves it)

def request(method, url, body, headers, opts) do
req = Finch.build(method, url, headers, body)

case Finch.request(req, Plausible.Finch, opts) do

Should it be Plausible.HTTPClient.request? And if so, should Plausible.Sentry.Client be updated too, or are they different (Sentry is more meta, whereas S3 ops are part of the normal application flow)?

-|> select_merge([s, _i], %{
-  views_per_visit: s.views_per_visit
+|> select_merge([s, i], %{
+  views_per_visit: s.views_per_visit + i.views_per_visit

This needs to be weighted, e.g. by the number of visits on each side.
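
A rough sketch of a weighted version, assuming both sides expose a visits count to weight by (the visits fields are illustrative, not necessarily the columns used in this query):

|> select_merge([s, i], %{
  views_per_visit:
    fragment(
      # weight each side by its visits and guard against dividing by zero
      "if(? + ? > 0, (? * ? + ? * ?) / (? + ?), 0)",
      s.visits,
      i.visits,
      s.views_per_visit,
      s.visits,
      i.views_per_visit,
      i.visits,
      s.visits,
      i.visits
    )
})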

@@ -83,8 +83,7 @@ export default class TopStats extends React.Component {
       {this.topStatNumberLong(stat.name, stat.value)} {statName}
     </div>}

-    {stat.name === 'Current visitors' && <p className="font-normal text-xs">Last updated <SecondsSinceLastLoad lastLoadTimestamp={lastLoadTimestamp}/>s ago</p>}
-    {stat.name === 'Views per visit' && showingImported && <p className="font-normal text-xs whitespace-nowrap">Based only on native data</p>}
+    {stat.name === 'Current visitors' && <p className="font-normal text-xs">Last updated <SecondsSinceLastLoad lastLoadTimestamp={lastLoadTimestamp} />s ago</p>}

I'll move these changes to #3792

@ruslandoga ruslandoga force-pushed the export-import branch 4 times, most recently from 1dc14a0 to 46fa7eb on February 18, 2024 15:03
"""
@spec import_stream(DBConnection.t(), pos_integer, String.t(), String.t(), [Ch.query_option()]) ::
Ch.Stream.t()
def import_stream(conn, site_id, table, format, opts \\ []) do

this one is used in self-hosted only. TODO

@ruslandoga ruslandoga changed the title from "Exports and imports" to "Add CSV Importer" on Feb 18, 2024
@ruslandoga

I'm going to split this PR: one for CSVImporter, one for S3 exports, one for direct exports, and one for direct imports.

@ruslandoga ruslandoga closed this Feb 18, 2024
@ruslandoga ruslandoga changed the title from "Add CSV Importer" to "Exports and imports" on Feb 19, 2024
@johanrosenson

@ruslandoga Would the exports you are working on here allow for complete exports of the data? For instance, if we wanted to import it into a completely different system, would it contain all the data?


ruslandoga commented Feb 24, 2024

👋 @johanrosenson

No, the data would be pre-aggregated by date and use the same tables that are used for the current Google Analytics imports.


purcell commented Feb 25, 2024

So they wouldn't have sufficient detail to switch seamlessly between the self-hosted and SaaS versions, or vice versa?


ruslandoga commented Feb 26, 2024

👋 @purcell

I think it would be similar to switching from Google Analytics to Plausible. Some details get lost, and hourly charts don't show much data. And due to how Plausible calculates session data, visitor counts get over-estimated in exports. Here are some screenshots of my WIP for plausible.io (staging) data after the export/import flow.

Views per visit is 0 in imported.com dashboards but won't be once #3792 is merged

Last year:

[two dashboard screenshots]

December 2023:

[two dashboard screenshots]

December 27, 2023:

[two dashboard screenshots]
